自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

項目自從用了接口請求合并,效率直接加倍!

開發(fā) 項目管理
請求合并,批量的辦法能大幅節(jié)省被調用系統(tǒng)的連接資源,本例是以數(shù)據(jù)庫為例,其他RPC調用也是類似的道理。缺點就是請求的時間在執(zhí)行實際的邏輯之前增加了等待時間,不適合低并發(fā)的場景。

大家好,我是不才陳某~

請求合并到底有什么意義呢?我們來看下圖。

圖片

假設我們3個用戶(用戶id分別是1、2、3),現(xiàn)在他們都要查詢自己的基本信息,請求到服務器,服務器端請求數(shù)據(jù)庫,發(fā)出3次請求。我們都知道數(shù)據(jù)庫連接資源是相當寶貴的,那么我們怎么盡可能節(jié)省連接資源呢?

這里把數(shù)據(jù)庫換成被調用的遠程服務,也是同樣的道理。

我們改變下思路,如下圖所示。

圖片

我們在服務器端把請求合并,只發(fā)出一條SQL查詢數(shù)據(jù)庫,數(shù)據(jù)庫返回后,服務器端處理返回數(shù)據(jù),根據(jù)一個唯一請求ID,把數(shù)據(jù)分組,返回給對應用戶。

技術手段

  • LinkedBlockQueue 阻塞隊列
  • ScheduledThreadPoolExecutor 定時任務線程池
  • CompleteableFuture future 阻塞機制(Java 8 的 CompletableFuture 并沒有 timeout 機制,后面優(yōu)化,使用了隊列替代)

代碼實現(xiàn)

查詢用戶的代碼

public interface UserService {

Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs);
}
@Service
public class UserServiceImpl implements UserService {

@Resource
private UsersMapper usersMapper;

@Override
public Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs) {
// 全部參數(shù)
List<Long> userIds = userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
// 用in語句合并成一條SQL,避免多次請求數(shù)據(jù)庫的IO
queryWrapper.in("id", userIds);
List<Users> users = usersMapper.selectList(queryWrapper);
Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
HashMap<String, Users> result = new HashMap<>();
userReqs.forEach(val -> {
List<Users> usersList = userGroup.get(val.getUserId());
if (!CollectionUtils.isEmpty(usersList)) {
result.put(val.getRequestId(), usersList.get(0));
} else {
// 表示沒數(shù)據(jù)
result.put(val.getRequestId(), null);
}
});
return result;
}
}

合并請求的實現(xiàn)

package com.springboot.sample.service.impl;

import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;

/***
* zzq
* 包裝成批量執(zhí)行的地方
* */
@Service
public class UserWrapBatchService {
@Resource
private UserService userService;

/**
* 最大任務數(shù)
**/
public static int MAX_TASK_NUM = 100;


/**
* 請求類,code為查詢的共同特征,例如查詢商品,通過不同id的來區(qū)分
* CompletableFuture將處理結果返回
*/
public class Request {
// 請求id 唯一
String requestId;
// 參數(shù)
Long userId;
//TODO Java 8 的 CompletableFuture 并沒有 timeout 機制
CompletableFuture<Users> completableFuture;

public String getRequestId() {
return requestId;
}

public void setRequestId(String requestId) {
this.requestId = requestId;
}

public Long getUserId() {
return userId;
}

public void setUserId(Long userId) {
this.userId = userId;
}

public CompletableFuture getCompletableFuture() {
return completableFuture;
}

public void setCompletableFuture(CompletableFuture completableFuture) {
this.completableFuture = completableFuture;
}
}

/*
LinkedBlockingQueue是一個阻塞的隊列,內部采用鏈表的結果,通過兩個ReenTrantLock來保證線程安全
LinkedBlockingQueue與ArrayBlockingQueue的區(qū)別
ArrayBlockingQueue默認指定了長度,而LinkedBlockingQueue的默認長度是Integer.MAX_VALUE,也就是無界隊列,在移除的速度小于添加的速度時,容易造成OOM。
ArrayBlockingQueue的存儲容器是數(shù)組,而LinkedBlockingQueue是存儲容器是鏈表
兩者的實現(xiàn)隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現(xiàn)的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,
而LinkedBlockingQueue實現(xiàn)的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,
也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能。
*/
private final Queue<Request> queue = new LinkedBlockingQueue();

@PostConstruct
public void init() {
//定時任務線程池,創(chuàng)建一個支持定時、周期性或延時任務的限定線程數(shù)目(這里傳入的是1)的線程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(() -> {
int size = queue.size();
//如果隊列沒數(shù)據(jù),表示這段時間沒有請求,直接返回
if (size == 0) {
return;
}
List<Request> list = new ArrayList<>();
System.out.println("合并了 [" + size + "] 個請求");
//將隊列的請求消費到一個集合保存
for (int i = 0; i < size; i++) {
// 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務數(shù),等下次執(zhí)行
if (i < MAX_TASK_NUM) {
list.add(queue.poll());
}
}
//拿到我們需要去數(shù)據(jù)庫查詢的特征,保存為集合
List<Request> userReqs = new ArrayList<>();
for (Request request : list) {
userReqs.add(request);
}
//將參數(shù)傳入service處理, 這里是本地服務,也可以把userService 看成RPC之類的遠程調用
Map<String, Users> response = userService.queryUserByIdBatch(userReqs);
//將處理結果返回各自的請求
for (Request request : list) {
Users result = response.get(request.requestId);
request.completableFuture.complete(result); //completableFuture.complete方法完成賦值,這一步執(zhí)行完畢,下面future.get()阻塞的請求可以繼續(xù)執(zhí)行了
}
}, 100, 10, TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性執(zhí)行 schedule是延遲執(zhí)行 initialDelay是初始延遲 period是周期間隔 后面是單位
//這里我寫的是 初始化后100毫秒后執(zhí)行,周期性執(zhí)行10毫秒執(zhí)行一次
}

public Users queryUser(Long userId) {
Request request = new Request();
// 這里用UUID做請求id
request.requestId = UUID.randomUUID().toString().replace("-", "");
request.userId = userId;
CompletableFuture<Users> future = new CompletableFuture<>();
request.completableFuture = future;
//將對象傳入隊列
queue.offer(request);
//如果這時候沒完成賦值,那么就會阻塞,直到能夠拿到值
try {
return future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
}

控制層調用

/***
* 請求合并
* */
@RequestMapping("/merge")
public Callable<Users> merge(Long userId) {
return new Callable<Users>() {
@Override
public Users call() throws Exception {
return userBatchService.queryUser(userId);
}
};
}

Callable是什么可以參考:

??https://blog.csdn.net/baidu_19473529/article/details/123596792??

模擬高并發(fā)查詢的代碼

package com.springboot.sample;

import org.springframework.web.client.RestTemplate;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class TestBatch {
private static int threadCount = 30;

private final static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadCount); //為保證30個線程同時并發(fā)運行

private static final RestTemplate restTemplate = new RestTemplate();

public static void main(String[] args) {


for (int i = 0; i < threadCount; i++) {//循環(huán)開30個線程
new Thread(new Runnable() {
public void run() {
COUNT_DOWN_LATCH.countDown();//每次減一
try {
COUNT_DOWN_LATCH.await(); //此處等待狀態(tài),為了讓30個線程同時進行
} catch (InterruptedException e) {
e.printStackTrace();
}

for (int j = 1; j <= 3; j++) {
int param = new Random().nextInt(4);
if (param <=0){
param++;
}
String responseBody = restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId=" + param, String.class);
System.out.println(Thread.currentThread().getName() + "參數(shù) " + param + " 返回值 " + responseBody);
}
}
}).start();

}
}
}

測試效果

圖片

圖片

要注意的問題

  • Java 8 的 CompletableFuture 并沒有 timeout 機制
  • 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務數(shù),等下次執(zhí)行(本例中加了MAX_TASK_NUM判斷)

使用隊列的超時解決Java 8 的 CompletableFuture 并沒有 timeout 機制

核心代碼

package com.springboot.sample.service.impl;

import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;

/***
* zzq
* 包裝成批量執(zhí)行的地方,使用queue解決超時問題
* */
@Service
public class UserWrapBatchQueueService {
@Resource
private UserService userService;

/**
* 最大任務數(shù)
**/
public static int MAX_TASK_NUM = 100;


/**
* 請求類,code為查詢的共同特征,例如查詢商品,通過不同id的來區(qū)分
* CompletableFuture將處理結果返回
*/
public class Request {
// 請求id
String requestId;

// 參數(shù)
Long userId;
// 隊列,這個有超時機制
LinkedBlockingQueue<Users> usersQueue;


public String getRequestId() {
return requestId;
}

public void setRequestId(String requestId) {
this.requestId = requestId;
}

public Long getUserId() {
return userId;
}

public void setUserId(Long userId) {
this.userId = userId;
}

public LinkedBlockingQueue<Users> getUsersQueue() {
return usersQueue;
}

public void setUsersQueue(LinkedBlockingQueue<Users> usersQueue) {
this.usersQueue = usersQueue;
}
}

/*
LinkedBlockingQueue是一個阻塞的隊列,內部采用鏈表的結果,通過兩個ReenTrantLock來保證線程安全
LinkedBlockingQueue與ArrayBlockingQueue的區(qū)別
ArrayBlockingQueue默認指定了長度,而LinkedBlockingQueue的默認長度是Integer.MAX_VALUE,也就是無界隊列,在移除的速度小于添加的速度時,容易造成OOM。
ArrayBlockingQueue的存儲容器是數(shù)組,而LinkedBlockingQueue是存儲容器是鏈表
兩者的實現(xiàn)隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現(xiàn)的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,
而LinkedBlockingQueue實現(xiàn)的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,
也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能。
*/
private final Queue<Request> queue = new LinkedBlockingQueue();

@PostConstruct
public void init() {
//定時任務線程池,創(chuàng)建一個支持定時、周期性或延時任務的限定線程數(shù)目(這里傳入的是1)的線程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(() -> {
int size = queue.size();
//如果隊列沒數(shù)據(jù),表示這段時間沒有請求,直接返回
if (size == 0) {
return;
}
List<Request> list = new ArrayList<>();
System.out.println("合并了 [" + size + "] 個請求");
//將隊列的請求消費到一個集合保存
for (int i = 0; i < size; i++) {
// 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數(shù)量,超過最大任務數(shù),等下次執(zhí)行
if (i < MAX_TASK_NUM) {
list.add(queue.poll());
}
}
//拿到我們需要去數(shù)據(jù)庫查詢的特征,保存為集合
List<Request> userReqs = new ArrayList<>();
for (Request request : list) {
userReqs.add(request);
}
//將參數(shù)傳入service處理, 這里是本地服務,也可以把userService 看成RPC之類的遠程調用
Map<String, Users> response = userService.queryUserByIdBatchQueue(userReqs);
for (Request userReq : userReqs) {
// 這里再把結果放到隊列里
Users users = response.get(userReq.getRequestId());
userReq.usersQueue.offer(users);
}

}, 100, 10, TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性執(zhí)行 schedule是延遲執(zhí)行 initialDelay是初始延遲 period是周期間隔 后面是單位
//這里我寫的是 初始化后100毫秒后執(zhí)行,周期性執(zhí)行10毫秒執(zhí)行一次
}

public Users queryUser(Long userId) {
Request request = new Request();
// 這里用UUID做請求id
request.requestId = UUID.randomUUID().toString().replace("-", "");
request.userId = userId;
LinkedBlockingQueue<Users> usersQueue = new LinkedBlockingQueue<>();
request.usersQueue = usersQueue;
//將對象傳入隊列
queue.offer(request);
//取出元素時,如果隊列為空,給定阻塞多少毫秒再隊列取值,這里是3秒
try {
return usersQueue.poll(3000,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
...省略..

@Override
public Map<String, Users> queryUserByIdBatchQueue(List<UserWrapBatchQueueService.Request> userReqs) {
// 全部參數(shù)
List<Long> userIds = userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
// 用in語句合并成一條SQL,避免多次請求數(shù)據(jù)庫的IO
queryWrapper.in("id", userIds);
List<Users> users = usersMapper.selectList(queryWrapper);
Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
HashMap<String, Users> result = new HashMap<>();
// 數(shù)據(jù)分組
userReqs.forEach(val -> {
List<Users> usersList = userGroup.get(val.getUserId());
if (!CollectionUtils.isEmpty(usersList)) {
result.put(val.getRequestId(), usersList.get(0));
} else {
// 表示沒數(shù)據(jù) , 這里要new,不然加入隊列會空指針
result.put(val.getRequestId(), new Users());
}
});
return result;
}

...省略...

小結

請求合并,批量的辦法能大幅節(jié)省被調用系統(tǒng)的連接資源,本例是以數(shù)據(jù)庫為例,其他RPC調用也是類似的道理。缺點就是請求的時間在執(zhí)行實際的邏輯之前增加了等待時間,不適合低并發(fā)的場景。

源碼:https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5

責任編輯:武曉燕 來源: 碼猿技術專欄
相關推薦

2022-09-22 08:42:14

接口請求合并技巧

2023-03-27 08:25:28

技巧技術吞吐率

2021-02-02 15:38:19

Disruptor緩存Java

2021-03-08 08:02:40

IDEA插件JSON

2022-01-05 08:29:22

監(jiān)控Prometheus Post

2022-02-23 11:47:57

CharlesFiddler抓包

2025-01-08 09:35:55

Spring性能監(jiān)控

2023-10-30 09:46:08

接口重試技巧

2025-04-09 12:48:13

模型AI數(shù)據(jù)

2021-03-26 15:18:11

代碼工具Mockoon

2021-05-31 09:02:55

KPI考核工具公司

2022-01-27 08:12:50

Potplayer播放器

2022-11-24 08:01:24

HTTPClienIDEA

2024-08-27 09:16:15

接口代碼狀態(tài)

2012-04-09 14:28:53

Java

2024-07-10 08:39:49

2025-03-04 00:00:00

2021-12-21 09:50:02

Java請求合并代碼

2024-06-21 09:19:45

代碼接口重復請求開發(fā)

2017-12-01 08:54:18

SpringCloudHystrix
點贊
收藏

51CTO技術棧公眾號