轉轉門店商詳頁異步編程的實踐
- 1 背景
- 2 CompletableFuture的發(fā)展
- 2.1 Future簡介
- 2.2 Future的局限性
- 3 關于CompletableFuture
- 3.1 CompletableFuture提供的能力
- 3.2 示例場景的解決方案
- 3.3 實際應用效果
- 4 總結
- 5 參考資料
1 背景
轉轉門店商詳頁為用戶在小程序、APP等終端提供下單、預約看機的能力。下圖為轉轉門店小程序的商詳頁。
圖片
起初業(yè)務體量小,QPS低,內容簡單,接口采用簡單的串行實現,但隨著門店業(yè)務的發(fā)展,頁面信息的豐富,QPS的提升,串行實現的缺點也顯露出來:訪問耗時高,代碼耦合嚴重。
假設門店商詳頁有以下幾個模塊,每個模塊需要如下時間才能完成:
- 獲取商品基礎信息 0.5s
- 獲取優(yōu)惠信息 1s
- 獲取門店信息 1s
- 獲取驗機評估報告信息 1s
- 獲取標品參數信息 1s
- 組裝返回商品信息 0.5s
串行執(zhí)行每個模塊,那么一共需要5s才能返回給調用方,如果RPC接口產生超時等,會比5s還要長,顯然是不能接受的。
圖片
如果有多個線程并行完成各個模塊,可能2s內就能返回信息。
圖片
在JDK1.8中引入了CompletableFuture實現類,擴展了Future和CompletionStage,實現異步多線程任務執(zhí)行。
本文主要以獲取門店商詳頁信息為例介紹CompletableFuture的常用方法和使用。
2 CompletableFuture的發(fā)展
常見的線程創(chuàng)建方式有兩種,一是直接繼承Thread,另一種是實現Runnable接口。但這兩種方式有個缺點,不支持獲取線程執(zhí)行結果,所以在JDK1.5之后,提供了Callable和Future,可以在任務執(zhí)行后獲取執(zhí)行結果。
2.1 Future簡介
Future類位于java.util.concurrent
包下,從下面的源碼可以看出,Future主要提供了三種能力:
- 關閉執(zhí)行中的任務
- 判斷任務是否執(zhí)行完成
- 獲取任務執(zhí)行的結果
package java.util.concurrent;
public interface Future<V> {
// 取消執(zhí)行中的任務
boolean cancel(boolean mayInterruptIfRunning);
// 判斷任務是否被取消成功
boolean isCancelled();
// 判斷任務是否執(zhí)行完成
boolean isDone();
// 獲取任務執(zhí)行結果
V get() throws InterruptedException, ExecutionException;
// 在規(guī)定時間內獲取任務執(zhí)行結果,若規(guī)定時間任務還沒執(zhí)行完,則返回null,而非拋異常
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
2.2 Future的局限性
Future通過isDone()
判斷任務是否執(zhí)行完成,get()
獲取任務執(zhí)行的結果,解決了創(chuàng)建線程異步執(zhí)行任務不能獲取執(zhí)行結果的問題,在任務異步執(zhí)行中,主線程在等待過程中可以做其他事,但其本身也存在一定的局限性。
- 并行執(zhí)行多任務獲取結果主線程長時間阻塞:當需要將多個模塊的任務異步執(zhí)行時,使用for循環(huán)遍歷任務列表,通過
isDone()
輪詢判斷任務是否執(zhí)行完成,通過get()
方法獲取任務執(zhí)行結果,且結果獲取依賴任務執(zhí)行順序。但因Future的get()
方法是主線程阻塞等待獲取執(zhí)行結果,所以在結果返回前,主線程不能處理其他任務,長時間阻塞,可能會產生block,在使用時考慮用超時時間的get()
方法。 - 不能鏈式執(zhí)行任務:如上述場景,希望在獲取商品基礎信息后執(zhí)行獲取優(yōu)惠信息任務,Future沒有提供這種能力,不能實現鏈式調用。
- 不能將多個任務執(zhí)行的結果組合:在上述場景中,希望獲取商詳所需的各個模塊信息后,組合成調用方需要的結果,但Future不支持。
- 不能處理異常:Future沒有異常處理的能力。
綜上所述,阻塞主線程獲取結果的方式與異步編程的初衷相違背,輪詢判斷任務是否執(zhí)行完成會耗費不必要的CPU資源,為優(yōu)化上述問題,在JDK1.8時引入了CompletableFuture實現類,提供異步鏈式編程的能力。
3 關于CompletableFuture
CompletableFuture擴展了Future接口,實現了CompletionStage,提供了函數式編程的能力,通過回調的方式處理計算結果,并提供了轉換和組合CompletableFuture的方法,從而簡化異步編程的復雜性。下圖為CompletableFuture類圖。
圖片
3.1 CompletableFuture提供的能力
CompletableFuture依然有Future的能力,可以通過get()
獲取結果,但不推薦使用,那么CompletableFuture提供了哪些能力呢?
3.1.1 創(chuàng)建異步任務
CompletableFuture提供了四種創(chuàng)建異步對象的方法,如下:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
其中runAsync
表示無返回值的異步對象,supplyAsync
為有返回值的異步對象,仍然可以通過如果不指定線程池會默認使用ForkJoinPool.commonPool()
,建議使用自定義的線程池,和其他場景區(qū)分開,避免線程資源競爭,也易于監(jiān)控線程使用情況。
在上述獲取商品詳情頁信息的示例中,想獲取驗機評估報告信息,可以使用無返回值的runAsync
創(chuàng)建對象,使用自定義的線程池testThreadPool,結果集使用自定義的對象ResultDTO接收,里面定義各個模塊結果的成員變量,如果使用Map接收結果,需要注意線程安全問題。
ExecutorService testThreadPool = Executors.newFixedThreadPool(10);
ResultDTO resultDTO = new ResultDTO();
CompletableFuture<Void> baseInfoFuture = CompletableFuture.runAsync(() -> {
System.out.println("獲取商品驗機評估報告任務當前線程" + Thread.currentThread().getId());
//獲取報告rpc接口邏輯
resultMap.put("baseInfo", "");
}, testThreadPool).exceptionally(e -> {
System.out.println(e.getMessage());
return null;
});
3.1.2 異步回調
CompletableFuture可以在任務處理結束異步回調,主要提供了以下回調方法:
//當上一個任務執(zhí)行結束時,回調方法接收上一段任務執(zhí)行結果或異常,返回上一段任務結果
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
//當上一個任務執(zhí)行結束時,回調方法接收上一段任務執(zhí)行結果或異常,返回回調方法的結果
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
//異步回調
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
//當上一個任務執(zhí)行出現異常時,返回回調中的結果
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
3.1.3 異步編排
CompletionStage接口定義了異步編排的各種場景,CompletableFuture實現了該接口,每個CompletionStage表示異步編排的某個階段,任務執(zhí)行可能在上一個階段完成時觸發(fā),也可能后面幾個階段全部完成或某個階段完成時觸發(fā)。
原理大致是每個新建的CompletableFuture
對象,會將當前方法需要執(zhí)行的任務封裝成Completion
對象,將其壓入到上個方法中創(chuàng)建的異步任務棧中,Completion為一個鏈表結構,保存依賴的任務,CAS出鏈表后,tryFire()
執(zhí)行依賴任務,從而實現任務串聯。
比較常用的場景有在上一階段完成時觸發(fā)的串行場景,多個階段都執(zhí)行完成觸發(fā)的AND,或者多個階段中某一階段執(zhí)行完觸發(fā)的OR。下面為方法示例。
//異步接收上一階段任務結果,且有返回值
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
//異步接收上一階段任務結果,但無返回值
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
//等待全部異步任務執(zhí)行結束
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
//某個任務執(zhí)行結束即返回
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
在上述獲取商詳單示例中,可以使用thenAcceptAsync
在獲取商品基礎信息后異步獲取優(yōu)惠信息,如下代碼。
CompletableFuture<Void> productBaseInfoFuture = CompletableFuture.runAsync(() -> {
ProductInfoDTO base = new ProductInfoDTO();
BaseInfoDTO baseInfoDTO = rpcxx;
resultDTO.setBaseInfoDTO(baseInfoDTO);
}, testThreadPool).thenAcceptAsync(() -> {
CouponInfoDTO couponInfoDTO = rpcxx;
resultDTO.setCouponInfoDTO(couponInfoDTO);
}, testThreadPool);
返回結果:
圖片
3.2 示例場景的解決方案
通過上述分析,可以將獲取商品詳情頁的步驟分為三步,分別為:
- 獲取商品基礎信息、商品驗機評估報告信息、商品標品參數信息、門店信息;
- 獲取商品優(yōu)惠信息,需要等1結束;
- 將上述信息RPC結果組裝返回,需要等1,2結束。
因為多任務異步并行執(zhí)行,最終耗時將取決于耗時最長的鏈路。如下圖所示:
圖片
代碼示例:
ExecutorService testThreadPool = Executors.newFixedThreadPool(10);
ResultDTO resultDTO = new ResultDTO();
//基礎信息
CompletableFuture<Void> productBaseInfoFuture = CompletableFuture.runAsync(() -> {
BaseInfoDTO baseInfoDTO = rpcxx;
resultDTO.setBaseInfoDTO(baseInfoDTO);
}, testThreadPool);
//優(yōu)惠信息
CompletableFuture<Void> couponInfoFuture = productBaseInfoFuture.thenAcceptAsync(() -> {
CouponInfoDTO couponInfoDTO = rpcxx;
resultDTO.setCouponInfoDTO(couponInfoDTO);
}, testThreadPool);
//驗機評估報告信息
CompletableFuture<Void> qcInfoFuture = CompletableFuture.runAsync(() -> {
QcInfoDTO qcInfoDTO = rpcxx;
resultDTO.setQcInfoDTO(qcInfoDTO);
}, testThreadPool);
//門店信息
CompletableFuture<Void> storeInfoFuture = CompletableFuture.runAsync(() -> {
StoreInfoDTO storeInfoDTO = rpcxx;
resultDTO.setStoreInfoDTO(storeInfoDTO);
}, testThreadPool);
//標品參數信息
CompletableFuture<Void> spuInfoFuture = CompletableFuture.runAsync(() -> {
SpuInfoDTO spuInfoDTO = rpcxx;
resultDTO.setSpuInfoDTO(spuInfoDTO);
}, testThreadPool);
//組裝結果
CompletableFuture<Void> allQuery = CompletableFuture.allOf(couponInfoFuture, qcInfoFuture, storeInfoFuture, spuInfoFuture);
CompletableFuture<Void> buildFuture = allQuery.thenAcceptAsync((result) -> {
//組裝邏輯
return null;
}).join();
以上即為獲取門店商品詳情頁異步編排的實現邏輯,但也發(fā)現,該方案創(chuàng)建多個異步任務,執(zhí)行邏輯不一樣但流程大致相同,類似的代碼重復寫多遍,不便于擴展和閱讀。
在此基礎上可以優(yōu)化為使用CompletableFuture+簡單工廠+策略模式,將上述步驟中的每個模塊都作為策略handler,且策略之間有權重依賴關系,模塊類型作為工廠類型,將模塊類型放進列表中,使用CompletableFuture.allOf()
異步執(zhí)行列表中的任務。
偽代碼如下:
List<String> eventList = Arrays.asList("xx", "xxx");
CompletableFuture.allOf(eventList.stream().map(event ->
CompletableFuture.runAsync(() -> {
//通過工廠類型獲取策略實現handler
if (Objects.nonNull(handler)) {
//如果存在則執(zhí)行
}
}, testThreadPool)).toArray(CompletableFuture[]::new)).join();
3.3 實際應用效果
最終轉轉門店商詳頁接口重構采用CompletableFuture+簡單工廠+策略模式的方案,解耦堆疊的代碼塊,使功能更易擴展,可閱讀性更高,在重構代碼上線后,效果明顯。
4 總結
以上就是關于使用CompletableFuture進行異步編程的全部內容,在此基礎上我們還要注意以下幾點:
- 當有多個任務可以異步并行執(zhí)行時,使用CompletableFuture,任務越多效果越明顯;
- 使用CompletableFuture可以將多個任務串聯執(zhí)行,也可以利用組合方式將任務排列由列表變成樹結構;
- 在使用集合接收多線程處理任務的結果時,需要考慮線程安全問題;
- 當任務執(zhí)行有相互依賴關系時,需考慮任務超時主動結束,避免系統(tǒng)block。