新聞中心
大家好,我是不才陳某~

請(qǐng)求合并到底有什么意義呢?我們來看下圖。
假設(shè)我們3個(gè)用戶(用戶id分別是1、2、3),現(xiàn)在他們都要查詢自己的基本信息,請(qǐng)求到服務(wù)器,服務(wù)器端請(qǐng)求數(shù)據(jù)庫,發(fā)出3次請(qǐng)求。我們都知道數(shù)據(jù)庫連接資源是相當(dāng)寶貴的,那么我們?cè)趺幢M可能節(jié)省連接資源呢?
這里把數(shù)據(jù)庫換成被調(diào)用的遠(yuǎn)程服務(wù),也是同樣的道理。
我們改變下思路,如下圖所示。
我們?cè)诜?wù)器端把請(qǐng)求合并,只發(fā)出一條SQL查詢數(shù)據(jù)庫,數(shù)據(jù)庫返回后,服務(wù)器端處理返回?cái)?shù)據(jù),根據(jù)一個(gè)唯一請(qǐng)求ID,把數(shù)據(jù)分組,返回給對(duì)應(yīng)用戶。
技術(shù)手段
- LinkedBlockQueue 阻塞隊(duì)列
- ScheduledThreadPoolExecutor 定時(shí)任務(wù)線程池
- CompleteableFuture future 阻塞機(jī)制(Java 8 的 CompletableFuture 并沒有 timeout 機(jī)制,后面優(yōu)化,使用了隊(duì)列替代)
代碼實(shí)現(xiàn)
查詢用戶的代碼
public interface UserService {
Map queryUserByIdBatch(List userReqs);
}
@Service
public class UserServiceImpl implements UserService {
@Resource
private UsersMapper usersMapper;
@Override
public Map queryUserByIdBatch(List userReqs) {
// 全部參數(shù)
List userIds = userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
QueryWrapper queryWrapper = new QueryWrapper<>();
// 用in語句合并成一條SQL,避免多次請(qǐng)求數(shù)據(jù)庫的IO
queryWrapper.in("id", userIds);
List users = usersMapper.selectList(queryWrapper);
Map> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
HashMap result = new HashMap<>();
userReqs.forEach(val -> {
List usersList = userGroup.get(val.getUserId());
if (!CollectionUtils.isEmpty(usersList)) {
result.put(val.getRequestId(), usersList.get(0));
} else {
// 表示沒數(shù)據(jù)
result.put(val.getRequestId(), null);
}
});
return result;
}
} 合并請(qǐng)求的實(shí)現(xiàn)
package com.springboot.sample.service.impl;
import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
/***
* zzq
* 包裝成批量執(zhí)行的地方
* */
@Service
public class UserWrapBatchService {
@Resource
private UserService userService;
/**
* 最大任務(wù)數(shù)
**/
public static int MAX_TASK_NUM = 100;
/**
* 請(qǐng)求類,code為查詢的共同特征,例如查詢商品,通過不同id的來區(qū)分
* CompletableFuture將處理結(jié)果返回
*/
public class Request {
// 請(qǐng)求id 唯一
String requestId;
// 參數(shù)
Long userId;
//TODO Java 8 的 CompletableFuture 并沒有 timeout 機(jī)制
CompletableFuturecompletableFuture;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public CompletableFuture getCompletableFuture() {
return completableFuture;
}
public void setCompletableFuture(CompletableFuture completableFuture) {
this.completableFuture = completableFuture;
}
}
/*
LinkedBlockingQueue是一個(gè)阻塞的隊(duì)列,內(nèi)部采用鏈表的結(jié)果,通過兩個(gè)ReenTrantLock來保證線程安全
LinkedBlockingQueue與ArrayBlockingQueue的區(qū)別
ArrayBlockingQueue默認(rèn)指定了長度,而LinkedBlockingQueue的默認(rèn)長度是Integer.MAX_VALUE,也就是無界隊(duì)列,在移除的速度小于添加的速度時(shí),容易造成OOM。
ArrayBlockingQueue的存儲(chǔ)容器是數(shù)組,而LinkedBlockingQueue是存儲(chǔ)容器是鏈表
兩者的實(shí)現(xiàn)隊(duì)列添加或移除的鎖不一樣,ArrayBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個(gè)ReenterLock鎖,
而LinkedBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊(duì)列的吞吐量,
也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來提高整個(gè)隊(duì)列的并發(fā)性能。
*/
private final Queuequeue = new LinkedBlockingQueue();
@PostConstruct
public void init() {
//定時(shí)任務(wù)線程池,創(chuàng)建一個(gè)支持定時(shí)、周期性或延時(shí)任務(wù)的限定線程數(shù)目(這里傳入的是1)的線程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
int size = queue.size();
//如果隊(duì)列沒數(shù)據(jù),表示這段時(shí)間沒有請(qǐng)求,直接返回
if (size == 0) {
return;
}
Listlist = new ArrayList<>();
System.out.println("合并了 [" + size + "] 個(gè)請(qǐng)求");
//將隊(duì)列的請(qǐng)求消費(fèi)到一個(gè)集合保存
for (int i = 0; i < size; i++) {
// 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務(wù)數(shù),等下次執(zhí)行
if (i < MAX_TASK_NUM) {
list.add(queue.poll());
}
}
//拿到我們需要去數(shù)據(jù)庫查詢的特征,保存為集合
ListuserReqs = new ArrayList<>();
for (Request request : list) {
userReqs.add(request);
}
//將參數(shù)傳入service處理, 這里是本地服務(wù),也可以把userService 看成RPC之類的遠(yuǎn)程調(diào)用
Mapresponse = userService.queryUserByIdBatch(userReqs);
//將處理結(jié)果返回各自的請(qǐng)求
for (Request request : list) {
Users result = response.get(request.requestId);
request.completableFuture.complete(result); //completableFuture.complete方法完成賦值,這一步執(zhí)行完畢,下面future.get()阻塞的請(qǐng)求可以繼續(xù)執(zhí)行了
}
}, 100, 10, TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性執(zhí)行 schedule是延遲執(zhí)行 initialDelay是初始延遲 period是周期間隔 后面是單位
//這里我寫的是 初始化后100毫秒后執(zhí)行,周期性執(zhí)行10毫秒執(zhí)行一次
}
public Users queryUser(Long userId) {
Request request = new Request();
// 這里用UUID做請(qǐng)求id
request.requestId = UUID.randomUUID().toString().replace("-", "");
request.userId = userId;
CompletableFuturefuture = new CompletableFuture<>();
request.completableFuture = future;
//將對(duì)象傳入隊(duì)列
queue.offer(request);
//如果這時(shí)候沒完成賦值,那么就會(huì)阻塞,直到能夠拿到值
try {
return future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
}
控制層調(diào)用
/***
* 請(qǐng)求合并
* */
@RequestMapping("/merge")
public Callablemerge(Long userId) {
return new Callable() {
@Override
public Users call() throws Exception {
return userBatchService.queryUser(userId);
}
};
}
Callable是什么可以參考:
??https://blog.csdn.net/baidu_19473529/article/details/123596792??
模擬高并發(fā)查詢的代碼
package com.springboot.sample;
import org.springframework.web.client.RestTemplate;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class TestBatch {
private static int threadCount = 30;
private final static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadCount); //為保證30個(gè)線程同時(shí)并發(fā)運(yùn)行
private static final RestTemplate restTemplate = new RestTemplate();
public static void main(String[] args) {
for (int i = 0; i < threadCount; i++) {//循環(huán)開30個(gè)線程
new Thread(new Runnable() {
public void run() {
COUNT_DOWN_LATCH.countDown();//每次減一
try {
COUNT_DOWN_LATCH.await(); //此處等待狀態(tài),為了讓30個(gè)線程同時(shí)進(jìn)行
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 1; j <= 3; j++) {
int param = new Random().nextInt(4);
if (param <=0){
param++;
}
String responseBody = restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId=" + param, String.class);
System.out.println(Thread.currentThread().getName() + "參數(shù) " + param + " 返回值 " + responseBody);
}
}
}).start();
}
}
}
測試效果
要注意的問題
- Java 8 的 CompletableFuture 并沒有 timeout 機(jī)制
- 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務(wù)數(shù),等下次執(zhí)行(本例中加了MAX_TASK_NUM判斷)
使用隊(duì)列的超時(shí)解決Java 8 的 CompletableFuture 并沒有 timeout 機(jī)制
核心代碼
package com.springboot.sample.service.impl;
import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
/***
* zzq
* 包裝成批量執(zhí)行的地方,使用queue解決超時(shí)問題
* */
@Service
public class UserWrapBatchQueueService {
@Resource
private UserService userService;
/**
* 最大任務(wù)數(shù)
**/
public static int MAX_TASK_NUM = 100;
/**
* 請(qǐng)求類,code為查詢的共同特征,例如查詢商品,通過不同id的來區(qū)分
* CompletableFuture將處理結(jié)果返回
*/
public class Request {
// 請(qǐng)求id
String requestId;
// 參數(shù)
Long userId;
// 隊(duì)列,這個(gè)有超時(shí)機(jī)制
LinkedBlockingQueueusersQueue;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public LinkedBlockingQueuegetUsersQueue() {
return usersQueue;
}
public void setUsersQueue(LinkedBlockingQueueusersQueue) {
this.usersQueue = usersQueue;
}
}
/*
LinkedBlockingQueue是一個(gè)阻塞的隊(duì)列,內(nèi)部采用鏈表的結(jié)果,通過兩個(gè)ReenTrantLock來保證線程安全
LinkedBlockingQueue與ArrayBlockingQueue的區(qū)別
ArrayBlockingQueue默認(rèn)指定了長度,而LinkedBlockingQueue的默認(rèn)長度是Integer.MAX_VALUE,也就是無界隊(duì)列,在移除的速度小于添加的速度時(shí),容易造成OOM。
ArrayBlockingQueue的存儲(chǔ)容器是數(shù)組,而LinkedBlockingQueue是存儲(chǔ)容器是鏈表
兩者的實(shí)現(xiàn)隊(duì)列添加或移除的鎖不一樣,ArrayBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個(gè)ReenterLock鎖,
而LinkedBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊(duì)列的吞吐量,
也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來提高整個(gè)隊(duì)列的并發(fā)性能。
*/
private final Queuequeue = new LinkedBlockingQueue();
@PostConstruct
public void init() {
//定時(shí)任務(wù)線程池,創(chuàng)建一個(gè)支持定時(shí)、周期性或延時(shí)任務(wù)的限定線程數(shù)目(這里傳入的是1)的線程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
int size = queue.size();
//如果隊(duì)列沒數(shù)據(jù),表示這段時(shí)間沒有請(qǐng)求,直接返回
if (size == 0) {
return;
}
Listlist = new ArrayList<>();
System.out.println("合并了 [" + size + "] 個(gè)請(qǐng)求");
//將隊(duì)列的請(qǐng)求消費(fèi)到一個(gè)集合保存
for (int i = 0; i < size; i++) {
// 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務(wù)數(shù),等下次執(zhí)行
if (i < MAX_TASK_NUM) {
list.add(queue.poll());
}
}
//拿到我們需要去數(shù)據(jù)庫查詢的特征,保存為集合
ListuserReqs = new ArrayList<>();
for (Request request : list) {
userReqs.add(request);
}
//將參數(shù)傳入service處理, 這里是本地服務(wù),也可以把userService 看成RPC之類的遠(yuǎn)程調(diào)用
Mapresponse = userService.queryUserByIdBatchQueue(userReqs);
for (Request userReq : userReqs) {
// 這里再把結(jié)果放到隊(duì)列里
Users users = response.get(userReq.getRequestId());
userReq.usersQueue.offer(users);
}
}, 100, 10, TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性執(zhí)行 schedule是延遲執(zhí)行 initialDelay是初始延遲 period是周期間隔 后面是單位
//這里我寫的是 初始化后100毫秒后執(zhí)行,周期性執(zhí)行10毫秒執(zhí)行一次
}
public Users queryUser(Long userId) {
Request request = new Request();
// 這里用UUID做請(qǐng)求id
request.requestId = UUID.randomUUID().toString().replace("-", "");
request.userId = userId;
LinkedBlockingQueueusersQueue = new LinkedBlockingQueue<>();
request.usersQueue = usersQueue;
//將對(duì)象傳入隊(duì)列
queue.offer(request);
//取出元素時(shí),如果隊(duì)列為空,給定阻塞多少毫秒再隊(duì)列取值,這里是3秒
try {
return usersQueue.poll(3000,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
...省略..
@Override
public MapqueryUserByIdBatchQueue(List userReqs) {
// 全部參數(shù)
ListuserIds = userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper = new QueryWrapper<>();
// 用in語句合并成一條SQL,避免多次請(qǐng)求數(shù)據(jù)庫的IO
queryWrapper.in("id", userIds);
Listusers = usersMapper.selectList(queryWrapper);
Map> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult = new HashMap<>();
// 數(shù)據(jù)分組
userReqs.forEach(val -> {
ListusersList = userGroup.get(val.getUserId());
if (!CollectionUtils.isEmpty(usersList)) {
result.put(val.getRequestId(), usersList.get(0));
} else {
// 表示沒數(shù)據(jù) , 這里要new,不然加入隊(duì)列會(huì)空指針
result.put(val.getRequestId(), new Users());
}
});
return result;
}
...省略...
小結(jié)
請(qǐng)求合并,批量的辦法能大幅節(jié)省被調(diào)用系統(tǒng)的連接資源,本例是以數(shù)據(jù)庫為例,其他RPC調(diào)用也是類似的道理。缺點(diǎn)就是請(qǐng)求的時(shí)間在執(zhí)行實(shí)際的邏輯之前增加了等待時(shí)間,不適合低并發(fā)的場景。
源碼:https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5
當(dāng)前文章:項(xiàng)目自從用了接口請(qǐng)求合并,效率直接加倍!
網(wǎng)址分享:http://fisionsoft.com.cn/article/coidiog.html


咨詢
建站咨詢
