Dubbo異步化實踐
1、背景
從Apach Dubbo的官網了解到從 2.7.0 版本開始,Dubbo 的所有異步編程接口開始以CompletableFuture為基礎,Dubbo接口異步化能夠極大地提高接口性能,降低接口依賴調用之間的阻塞,同時了解到我們公司大部分應用使用的是同步rpc,在公司降本增效的大背景下,我們選擇了在客服機器人組對Dubbo異步化進行落地實踐,實踐下來發(fā)現(xiàn)Dubbo異步化對接口性能提升了50%,涉及異步化的應用服務器縮減了1/3,接下來主要為大家分享一下實踐的經驗以及異步化提升的效果。
2、Dubbo異步化實現(xiàn)方式
通過CompletableFuture可以將復雜的業(yè)務邏輯從Dubbo線程池(大小默認200)切換到用戶自定義的業(yè)務線程來執(zhí)行,提升Dubbo線程池請求的處理能力,同時增加自定義業(yè)務線程池,提升服務器的資源利用率。接下來我們來看下CompletableFuture怎么異步化Dubbo接口以及其原理。
2.1 接口改造方式
getRecommendContent為老的方法,asyncGetRecommendContent為新添加的異步方法;老的方法保留,兼容沒有升級的調用方;添加新的異步方法,返回值使用CompletableFuture進行包裝:
public interface RecommendAtConnectApi {
Result<RecommendAtConnectRes> getRecommendContent(RecommendAtConnectReq request);
CompletableFuture<Result<RecommendAtConnectRes>> asyncGetRecommendContent(RecommendAtConnectReq request);
}
2.2 future使用方式
下面先介紹幾種常用的使用方式:
- future的結果獲取到時轉化處理(thenApply)
CompletableFuture<String> cFuture = cAsyncService.asyncSayHello(name);
CompletableFuture<DataDTO> finalFuture = cFuture.thenApply(c -> new DataDTO());
return finalFuture;
- 多個future組合轉化(thenCombine),超過2個可使用allOf,后面實踐有使用到
CompletableFuture<String> cFuture = cAsyncService.asyncSayHello(name);
CompletableFuture<String> dFuture = dAsyncService.asyncSayHello(name);
CompletableFuture<DataDTO> allFuture = cFuture.thenCombine(dFuture, (c, d) -> new DataDTO());
return allFuture;
- 多個future前后依賴(thenCompose)
CompletableFuture<Optional<RecommendAtConnectDto>> taskEngineFuture = pushGsTaskEngineHandler.asyncPushHandler(connectRequest);
CompletableFuture<Optional<RecommendAtConnectDto>> refundFuture = getNextFuture(taskEngineFuture, connectRequest, unused ->pushLogisticsRefundHandler.asyncPushHandler(connectRequest));
return refundFuture;
//回調工具方法
public static CompletableFuture<Optional<RecommendAtConnectDto>> getNextFuture(CompletableFuture<Optional<RecommendAtConnectDto>> beforeFuture,
RecommendAtConnectRequest request,
Function<RecommendAtConnectRequest, CompletableFuture<Optional<RecommendAtConnectDto>>> function) {
return beforeFuture.thenCompose(recommendAtConnectDto -> {
if (!recommendAtConnectDto.isPresent()) {
return function.apply(request);
}
return beforeFuture;
});
}
還有很多其他的使用方式這里就不再一一介紹,大家感興趣了可以去看下官方文檔https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
2.3 CompletableFuture原理
//CompletableFuture源碼
volatile Object result; // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions
CompletableFuture有兩個非常重要的屬性result和stack,result是future中的結果,stack是future獲取到結果時回調的函數(shù)動作存儲的棧,stack是一個Completion,Completion中有指向下一個Completion的指針。
thenApply
//thenApply源碼
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
//生成當前future的依賴
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
//CAS操作壓入棧中
push(c);
//嘗試運行一下
c.tryFire(SYNC);
}
return d;
}
thenApply的原理比較簡單在調用的時候會將回調的邏輯生成UniApply壓入棧中,UniApply中包含了返回的future和當前的feture,等到當前future有結果返回時,會回調執(zhí)行棧中的函數(shù)f。
thenCombine
//thenCombine源碼
private <U,V> CompletableFuture<V> biApplyStage(
Executor e, CompletionStage<U> o,
BiFunction<? super T,? super U,? extends V> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.biApply(this, b, f, null)) {
//生成二元依賴的BiCompletion
BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
//將其壓入當前和組合的future棧中
bipush(b, c);
c.tryFire(SYNC);
}
return d;
}
thenCombine依賴兩個future,返回一個新的future,當依賴的兩個future都有結果返回之后,回調傳入的函數(shù)動作。
thenCompose
//thenCompose源碼
private <V> CompletableFuture<V> uniComposeStage(
Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
if (f == null) throw new NullPointerException();
Object r; Throwable x;
//如果線程池為空且當前future已經有結果
if (e == null && (r = result) != null) {
// try to return function result directly
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
return new CompletableFuture<V>(encodeThrowable(x, r));
}
r = null;
}
try {
@SuppressWarnings("unchecked") T t = (T) r;
//將當前處理結果作為f的輸入,并執(zhí)行f得到新的future g
CompletableFuture<V> g = f.apply(t).toCompletableFuture();
Object s = g.result;
//如果已經有結果直接返回
if (s != null)
return new CompletableFuture<V>(encodeRelay(s));
//new一個返回的future
CompletableFuture<V> d = new CompletableFuture<V>();
//生成一個元依賴的UniCompletion
UniRelay<V> copy = new UniRelay<V>(d, g);
//將其壓入g的棧中
g.push(copy);
copy.tryFire(SYNC);
return d;
} catch (Throwable ex) {
return new CompletableFuture<V>(encodeThrowable(ex));
}
}
//如果當前結果為空,則直接生成當前feture的依賴,壓入棧中
CompletableFuture<V> d = new CompletableFuture<V>();
UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
return d;
}
CompletableFuture底層借助了魔法類Unsafe的相關CAS方法,除了get或join阻塞之外,其他方法都實現(xiàn)了無鎖操作。
3、實踐經驗
3.1 機器人場景選擇
這次實踐主要選擇了機器人的3個場景進行改造:訂單詳情頁和聊天頁猜你想問以及輸入聯(lián)想。選擇這3個場景的原因如下:
- 接口qps高,異步化ROI高
- 大量調用外部接口,屬于IO密集型場景,異步化提升效果明顯
- 出于安全和穩(wěn)定性的考慮,機器人核心的對話接口不受這3個接口異步化的影響
3.2 最佳實踐
3.2.1 梳理接口的先后依賴關系
不管是新的功能的開發(fā)還是老的代碼的改造這一步都至關重要,我們可以像梳理電路圖一樣梳理接口之間的先后依賴關系,將并行關系和串行關系梳理出來,筆者在實踐之后才明白這個道理,希望這份經驗能幫助大家少走一些彎路:
- 圖中每個CF為接口或者service返回的CompletableFuture
- CF1、CF2和CF3同一層的代表它們是并行的關系,CF2和CF4前后代表它們是依賴的關系
- 最后組裝3條并行鏈路的結果一起返回
3.2.2 代碼編寫
這里基于上述的梳理出來的圖例寫一下具體的代碼
public CompletableFuture<CFResponse> getResult(){
//并行3條鏈路
CompletableFuture<CF1Response> cf1 = cf1Service.getResult();
CompletableFuture<CF2CombineResponse> cf2Combine = getCf2Combine();
CompletableFuture<CF3CombineResponse> cf3Combine = getCf3Combine();
//組合3個future,轉化結果
CompletableFuture<Void> finalFuture = CompletableFuture.allOf(cf1, cf2Combine, cf3Combine);
return finalFuture.thenApply((unused, r) -> new CFResponse(cf1.get().getCf1Value() +
cf2Combine.get().getCf2CombineValue() + cf3Combine.get().getCf3CombineValue()));
}
//第二條鏈路的執(zhí)行
private CompletableFuture<CF2CombineResponse> getCf2Combine() {
CompletableFuture<CF2Response> cf2 = cf2Service.getResult();
return cf2.thenCompose(cf2Response -> {
CompletableFuture<CF4Response> cf3 = cf4Service.getResult(cf2Response.getCf2Value());
return cf3.thenApply(cf4Response -> new CF2CombineResponse(cf4Response.getCf4Value()));
});
}
//第三條鏈路的執(zhí)行
private CompletableFuture<CF3CombineResponse> getCf3Combine() {
CompletableFuture<CF3Response> cf3 = cf3Service.getResult();
return cf3.thenCompose(cf3Response -> {
CompletableFuture<CF5Response> cf5 = cf5Service.getResult(cf3Response.getCf3Value());
CompletableFuture<CF6Response> cf6 = cf6Service.getResult(cf3Response.getCf3Value());
return CompletableFuture.allOf(cf5, cf6).thenCompose(unused -> cf7Service.getResult(cf5.get().getCf5Value(), cf6.get().getCf6Value()));
});
}
實際改造代碼片段
接口:
public interface RecommendAtConnectApi {
/**
* 聊天頁
* @param request
* @return
*/
Result<RecommendAtConnectRes> getRecommendContentNew(RecommendAtConnectReq request);
/**
* 聊天頁異步
* @param request
* @return
*/
CompletableFuture<Result<RecommendAtConnectRes>> asyncGetRecommendContentNew(RecommendAtConnectReq request);
}
thenApply結果轉化
public CompletableFuture<RecommendAtConnectRes> asyncGetRecommendContent(RecommendAtConnectReq request) {
RecommendAtConnectRequest recommendAtConnectRequest = getRecommendAtConnectRequest(request);
CompletableFuture<RecommendAtConnectDto> future = recommendAtConnectEventHandlerChain.asyncHandlerOfRecommendAtConnect(recommendAtConnectRequest);
return Objects.isNull(future)? null: future.thenApply(this::dtoToRes);
}
前后future依賴:
//future編排
CompletableFuture<Optional<RecommendAtConnectDto>> taskEngineFuture = pushGsTaskEngineHandler.asyncPushHandler(connectRequest);
CompletableFuture<Optional<RecommendAtConnectDto>> refundFuture = getNextFuture(taskEngineFuture, connectRequest, unused ->pushLogisticsRefundHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> serviceCaseFuture = getNextFuture(refundFuture, connectRequest, unused ->pushServiceCaseHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> orderFuture = getNextFuture(serviceCaseFuture, connectRequest, unused->pushOrderSourcePredictHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> spuFuture = getNextFuture(orderFuture, connectRequest, unused->pushSpuSourcePredictHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> customerCenterFuture = getNextFuture(spuFuture, connectRequest, unused->pushCustomerCenterSourcePredictHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> guessQuestionFuture = getNextFuture(customerCenterFuture,connectRequest, unused -> pushGuessQuestionHandler.asyncPushHandler(connectRequest));
finalFuture = getNextFuture(guessQuestionFuture, connectRequest, unused -> pushWelcomeHandler.asyncPushHandler(connectRequest));
//回調工具方法
public static CompletableFuture<Optional<RecommendAtConnectDto>> getNextFuture(CompletableFuture<Optional<RecommendAtConnectDto>> beforeFuture,
RecommendAtConnectRequest request,
Function<RecommendAtConnectRequest, CompletableFuture<Optional<RecommendAtConnectDto>>> function) {
return beforeFuture.thenCompose(recommendAtConnectDto -> {
if (!recommendAtConnectDto.isPresent()) {
return function.apply(request);
}
return beforeFuture;
});
}
3.2.3 線程池
自定義業(yè)務線程池
處理具體的業(yè)務邏輯時,如果不傳入線程池,默認使用ForkJoinPool的commonPool,其線程數(shù)量默認是CPU的核心數(shù)量-1,推薦傳入自定義的業(yè)務線程池,防止阻塞dubbo線程。
//自定義dubbo業(yè)務線程池
@Bean(name = "dubboAsyncBizExecutor")
public ThreadPoolTaskExecutor dubboAsyncBizExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(200);
executor.setMaxPoolSize(200);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("dubboAsyncBizExecutor-");
executor.setRejectedExecutionHandler((r, executor1) -> log.error("dubbo async biz task exceed limit"));
return executor;
}
public CompletableFuture<Result<GuessQuestionResponse>> asyncPredictQuestion(PredictQuestionExtRequest request) {
log.info("asyncPredictQuestion start");
CompletableFuture<Result<GuessQuestionResponse>> resultCompletableFuture =
CompletableFuture.supplyAsync(() -> predictQuestionNew(request), dubboAsyncBizExecutor);
log.info("asyncPredictQuestion end");
return resultCompletableFuture;
}
同步和異步線程隔離(目前最新正式版本3.2.0支持)
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<!-- NOTE: we need config executor-management-mode="isolation" -->
<dubbo:application name="demo-provider" executor-management-mode="isolation">
</dubbo:application>
<bean id="syncService" class="org.apache.dubbo.config.spring.impl.SyncServiceImpl"/>
<bean id="asyncService" class="org.apache.dubbo.config.spring.impl.AsyncServiceImpl"/>
<!-- customized thread pool -->
<bean id="executor-sync-service"
class="org.apache.dubbo.config.spring.isolation.spring.support.SyncServiceExecutor"/>
<bean id="executor-async-service"
class="org.apache.dubbo.config.spring.isolation.spring.support.AsyncServiceExecutor"/>
<dubbo:service executor="executor-sync-service"
interface="org.apache.dubbo.config.spring.api.SyncService" version="1.0.0"
timeout="3000" ref="syncService" />
<dubbo:service executor="executor-async-service"
interface="org.apache.dubbo.config.spring.api.AsyncService" version="1.0.0"
timeout="5000" ref="asyncService" />
</beans>
3.2.4 異常處理
CompletableFuture異常處理使用回調exceptionally,當CompletableFuture執(zhí)行的過程拋出了異常,會使用CompletionException進行封裝然后拋出。
CompletableFuture<RecommendAtConnectDto> asyncPushContent(RecommendAtConnectRequest connectRequest) {
//業(yè)務方法,內部會發(fā)起異步rpc調用
CompletableFuture<String> future = orderSourcePredictHandlerChain.asyncHandleOfPredict(connectRequest);
//這里回調方法thenApply,如果發(fā)生異常thenApply內部會通過new CompletionException(throwable) 對異常進行包裝
return Objects.isNull(future)? null : future.thenApply(messageBody->{
if (StrUtil.isBlank(messageBody)){
log.info(" async orderSourcePredictHandlerChain.handleOfPredict fail, connectRequest:{}", JSON.toJSONString(connectRequest));
return null;
}
RecommendAtConnectDto connectDto = RecommendAtConnectDtoUtil.getDto(
messageBody, connectRequest.getSessionId(),
connectRequest.getCreateChatReq().getUserId(), MessageBodyTypeEnum.MULTI_STAGE.getCode(), EventEnum.PUSH_MULTI_STAGE_MESSAGE.getCode());
return connectDto;
}).exceptionally(err -> {
//通過exceptionally 捕獲異常,這里的err已經被thenApply包裝過,因此需要通過Throwable.getCause()提取異常
log.error("orderSourcePredictHandlerChain.handleOfPredict Exception cnotallow={}", JSON.toJSONString(connectRequest), ExceptionUtils.extractRealException(err));
return 0;
});
}
異常使用自定義工具類ExceptionUtils進行提取。
public class ExceptionUtils {
public static Throwable extractRealException(Throwable throwable) {
//這里判斷異常類型是否為CompletionException、ExecutionException,如果是則進行提取,否則直接返回。
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}
3.2.5 穩(wěn)定性保障
- 改造的過程從上到下改動的同步方法保持不變,新增異步的方法進行支持
- 改造的接口是上游服務端依賴的,和上游服務端溝通,通過AB控制調用同步和異步接口
- 改造的接口是App端依賴的,在接口實現(xiàn)處通過AB控制調用異步和同步service
- 通過以上三種方法可以實現(xiàn)一鍵回滾到最初的邏輯
3.3 遇到的問題
- CompletableFuture回調方法中打印的日志會丟失traceId,已找監(jiān)控團隊幫忙支持解決,但是會增加應用gc的次數(shù),現(xiàn)在生產上是白名單應用開放中
- 異步接口線程池和同步接口線程池隔離在dubbo最新發(fā)布的正式版本3.2.0支持
- CompletableFuture.thenCompose不支持返回null,需要將返回值用Optional包裝返回
- 打印日志的位置變更,由于返回值是future,拿不到真實的結果,只能在回調之中打印日志才能看到真實的結果
- 監(jiān)控平臺監(jiān)控的平均耗時不包含回調的耗時,對于排查接口性能問題會增加一些難度,例如5月10日遇到了一個異步接口耗時同比增加了50%,但是從監(jiān)控平臺上看到平均耗時并沒有明顯增加
4、異步化收益
- 壓測接口性能提升了50%
- 線上接口RT降低1/4左右,其中輸入聯(lián)想接口RT由173.04ms降為119.43ms
- 服務器資源縮減了1/3
- 服務器資源利用率提升
異步化之前CPU的使用率:
異步化縮減機器之后CPU的使用率:
可以看到dubbo異步化之后,服務器cpu的使用率由18左右提升到了50%左右,大家在進行機器縮減時需要關注一下CPU的使用率,當CPU的使用率超過60%時就會引發(fā)報警,這個就是我們縮減的極限了,如果在繼續(xù)縮減在一些流量高峰或者流量飆升的場景會出現(xiàn)風險。
5、其他
- Dubbo異步化對編程者的代碼水平和架構能力都有一定的要求,同時在對老的代碼異步化的過程中,通過對上述接口先后調用關系的梳理也能發(fā)現(xiàn)很多代碼不合理或者有性能問題的地方,對代碼質量的提高也有一定的好處,其實就算不是想異步化,而是想提高代碼的并發(fā)度,這種前后依賴關系的梳理也是必不可少的,只不過異步化是將程序的并發(fā)度提升到極致的一種表現(xiàn)。
- Dubbo異步化編程和以往的同步編程習慣可能有所不同,但是轉念一想,是不是異步化才是現(xiàn)實世界中更加真實的寫照,更加的符合現(xiàn)實世界運轉的規(guī)律,我們在規(guī)劃做一件事情時,往往會將事情進行拆解,然后同時(是指同一段時間不是同一刻)去做沒有先后依賴關系的多件事情,而不是做一件事,然后一直等到有結果了再去做其他事情。
- 通過壓測我們發(fā)現(xiàn)當壓測qps不斷提高依賴的接口或者組件的耗時增加比較明顯,且慢慢成為性能提升的瓶頸時,異步帶來的提升效果會受到此瓶頸的制約,帶來提升會有一定比例的折扣,所以大家在做異步化實踐時,需要稍微降低一些提升的預期。
6、總結
通過這次實踐,我們使用CompletableFuture將Dubbo接口進行了異步化,同時利用CompletableFuture的異步回調能力,減少了服務依賴之間的阻塞,增加了dubbo線程的處理請求的能力,同時利用CompletableFuture傳入的業(yè)務線程提高了服務器CPU資源的利用率,用更少的硬件資源可以處理更多的請求,為公司的降本增效貢獻了一小份力量。