自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Dubbo異步化實踐

開發(fā) 項目管理
我們使用CompletableFuture將Dubbo接口進行了異步化,同時利用CompletableFuture的異步回調能力,減少了服務依賴之間的阻塞,增加了dubbo線程的處理請求的能力,同時利用CompletableFuture傳入的業(yè)務線程提高了服務器CPU資源的利用率,用更少的硬件資源可以處理更多的請求,為公司的降本增效貢獻了一小份力量。

1、背景

從Apach Dubbo的官網了解到從 2.7.0 版本開始,Dubbo 的所有異步編程接口開始以CompletableFuture為基礎,Dubbo接口異步化能夠極大地提高接口性能,降低接口依賴調用之間的阻塞,同時了解到我們公司大部分應用使用的是同步rpc,在公司降本增效的大背景下,我們選擇了在客服機器人組對Dubbo異步化進行落地實踐,實踐下來發(fā)現(xiàn)Dubbo異步化對接口性能提升了50%,涉及異步化的應用服務器縮減了1/3,接下來主要為大家分享一下實踐的經驗以及異步化提升的效果。

2、Dubbo異步化實現(xiàn)方式

通過CompletableFuture可以將復雜的業(yè)務邏輯從Dubbo線程池(大小默認200)切換到用戶自定義的業(yè)務線程來執(zhí)行,提升Dubbo線程池請求的處理能力,同時增加自定義業(yè)務線程池,提升服務器的資源利用率。接下來我們來看下CompletableFuture怎么異步化Dubbo接口以及其原理。

2.1 接口改造方式

getRecommendContent為老的方法,asyncGetRecommendContent為新添加的異步方法;老的方法保留,兼容沒有升級的調用方;添加新的異步方法,返回值使用CompletableFuture進行包裝:

public interface RecommendAtConnectApi {


    Result<RecommendAtConnectRes> getRecommendContent(RecommendAtConnectReq request);


    CompletableFuture<Result<RecommendAtConnectRes>> asyncGetRecommendContent(RecommendAtConnectReq request);
}

2.2 future使用方式

下面先介紹幾種常用的使用方式:

  • future的結果獲取到時轉化處理(thenApply)
CompletableFuture<String> cFuture = cAsyncService.asyncSayHello(name);
CompletableFuture<DataDTO> finalFuture = cFuture.thenApply(c -> new DataDTO());
return finalFuture;
  • 多個future組合轉化(thenCombine),超過2個可使用allOf,后面實踐有使用到
CompletableFuture<String> cFuture = cAsyncService.asyncSayHello(name);
CompletableFuture<String> dFuture = dAsyncService.asyncSayHello(name);
CompletableFuture<DataDTO> allFuture = cFuture.thenCombine(dFuture, (c, d) -> new DataDTO());
return allFuture;
  • 多個future前后依賴(thenCompose)
CompletableFuture<Optional<RecommendAtConnectDto>> taskEngineFuture = pushGsTaskEngineHandler.asyncPushHandler(connectRequest);
CompletableFuture<Optional<RecommendAtConnectDto>> refundFuture = getNextFuture(taskEngineFuture, connectRequest, unused ->pushLogisticsRefundHandler.asyncPushHandler(connectRequest));
return refundFuture;


//回調工具方法
public static CompletableFuture<Optional<RecommendAtConnectDto>> getNextFuture(CompletableFuture<Optional<RecommendAtConnectDto>> beforeFuture,
                                                                         RecommendAtConnectRequest request,
                                                                         Function<RecommendAtConnectRequest, CompletableFuture<Optional<RecommendAtConnectDto>>> function) {
    return beforeFuture.thenCompose(recommendAtConnectDto -> {
        if (!recommendAtConnectDto.isPresent()) {
            return function.apply(request);
        }
        return beforeFuture;
    });
}

還有很多其他的使用方式這里就不再一一介紹,大家感興趣了可以去看下官方文檔https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

2.3 CompletableFuture原理

//CompletableFuture源碼
volatile Object result;       // Either the result or boxed AltResult
volatile Completion stack;    // Top of Treiber stack of dependent actions

CompletableFuture有兩個非常重要的屬性result和stack,result是future中的結果,stack是future獲取到結果時回調的函數(shù)動作存儲的棧,stack是一個Completion,Completion中有指向下一個Completion的指針。

thenApply

//thenApply源碼
private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d =  new CompletableFuture<V>();
    if (e != null || !d.uniApply(this, f, null)) {
        //生成當前future的依賴
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        //CAS操作壓入棧中
        push(c);
        //嘗試運行一下
        c.tryFire(SYNC);
    }
    return d;
}

thenApply的原理比較簡單在調用的時候會將回調的邏輯生成UniApply壓入棧中,UniApply中包含了返回的future和當前的feture,等到當前future有結果返回時,會回調執(zhí)行棧中的函數(shù)f。

thenCombine

//thenCombine源碼
private <U,V> CompletableFuture<V> biApplyStage(
    Executor e, CompletionStage<U> o,
    BiFunction<? super T,? super U,? extends V> f) {
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<V> d = new CompletableFuture<V>();
    if (e != null || !d.biApply(this, b, f, null)) {
        //生成二元依賴的BiCompletion
        BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
        //將其壓入當前和組合的future棧中
        bipush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}

thenCombine依賴兩個future,返回一個新的future,當依賴的兩個future都有結果返回之后,回調傳入的函數(shù)動作。

thenCompose

//thenCompose源碼
private <V> CompletableFuture<V> uniComposeStage(
    Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
    if (f == null) throw new NullPointerException();
    Object r; Throwable x;
    //如果線程池為空且當前future已經有結果
    if (e == null && (r = result) != null) {
        // try to return function result directly
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                return new CompletableFuture<V>(encodeThrowable(x, r));
            }
            r = null;
        }
        try {
            @SuppressWarnings("unchecked") T t = (T) r;
            //將當前處理結果作為f的輸入,并執(zhí)行f得到新的future g
            CompletableFuture<V> g = f.apply(t).toCompletableFuture();
            Object s = g.result;
            //如果已經有結果直接返回
            if (s != null)
                return new CompletableFuture<V>(encodeRelay(s));
            //new一個返回的future
            CompletableFuture<V> d = new CompletableFuture<V>();
            //生成一個元依賴的UniCompletion
            UniRelay<V> copy = new UniRelay<V>(d, g);
            //將其壓入g的棧中
            g.push(copy);
            copy.tryFire(SYNC);
            return d;
        } catch (Throwable ex) {
            return new CompletableFuture<V>(encodeThrowable(ex));
        }
    }
    //如果當前結果為空,則直接生成當前feture的依賴,壓入棧中
    CompletableFuture<V> d = new CompletableFuture<V>();
    UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
    push(c);
    c.tryFire(SYNC);
    return d;
}

CompletableFuture底層借助了魔法類Unsafe的相關CAS方法,除了get或join阻塞之外,其他方法都實現(xiàn)了無鎖操作。

3、實踐經驗

3.1 機器人場景選擇

這次實踐主要選擇了機器人的3個場景進行改造:訂單詳情頁和聊天頁猜你想問以及輸入聯(lián)想。選擇這3個場景的原因如下:

  • 接口qps高,異步化ROI高
  • 大量調用外部接口,屬于IO密集型場景,異步化提升效果明顯
  • 出于安全和穩(wěn)定性的考慮,機器人核心的對話接口不受這3個接口異步化的影響

3.2 最佳實踐

3.2.1 梳理接口的先后依賴關系

不管是新的功能的開發(fā)還是老的代碼的改造這一步都至關重要,我們可以像梳理電路圖一樣梳理接口之間的先后依賴關系,將并行關系和串行關系梳理出來,筆者在實踐之后才明白這個道理,希望這份經驗能幫助大家少走一些彎路:

圖片

  • 圖中每個CF為接口或者service返回的CompletableFuture
  • CF1、CF2和CF3同一層的代表它們是并行的關系,CF2和CF4前后代表它們是依賴的關系
  • 最后組裝3條并行鏈路的結果一起返回

3.2.2 代碼編寫

這里基于上述的梳理出來的圖例寫一下具體的代碼

public CompletableFuture<CFResponse> getResult(){


    //并行3條鏈路
    CompletableFuture<CF1Response> cf1 = cf1Service.getResult();
    CompletableFuture<CF2CombineResponse> cf2Combine = getCf2Combine();
    CompletableFuture<CF3CombineResponse> cf3Combine = getCf3Combine();


    //組合3個future,轉化結果
    CompletableFuture<Void> finalFuture = CompletableFuture.allOf(cf1, cf2Combine, cf3Combine);
    return finalFuture.thenApply((unused, r) -> new CFResponse(cf1.get().getCf1Value() + 
            cf2Combine.get().getCf2CombineValue() + cf3Combine.get().getCf3CombineValue()));
}


//第二條鏈路的執(zhí)行
private CompletableFuture<CF2CombineResponse> getCf2Combine() {
    CompletableFuture<CF2Response> cf2 = cf2Service.getResult();
    return cf2.thenCompose(cf2Response -> {
        CompletableFuture<CF4Response> cf3 = cf4Service.getResult(cf2Response.getCf2Value());
        return cf3.thenApply(cf4Response -> new CF2CombineResponse(cf4Response.getCf4Value()));
    });
}


//第三條鏈路的執(zhí)行
private CompletableFuture<CF3CombineResponse> getCf3Combine() {
    CompletableFuture<CF3Response> cf3 = cf3Service.getResult();
    return cf3.thenCompose(cf3Response -> {
        CompletableFuture<CF5Response> cf5 = cf5Service.getResult(cf3Response.getCf3Value());
        CompletableFuture<CF6Response> cf6 = cf6Service.getResult(cf3Response.getCf3Value());
        return CompletableFuture.allOf(cf5, cf6).thenCompose(unused -> cf7Service.getResult(cf5.get().getCf5Value(), cf6.get().getCf6Value()));
    });
}

實際改造代碼片段

接口:

public interface RecommendAtConnectApi {


    /**
     * 聊天頁
     * @param request
     * @return
     */
    Result<RecommendAtConnectRes> getRecommendContentNew(RecommendAtConnectReq request);


    /**
     * 聊天頁異步
     * @param request
     * @return
     */
    CompletableFuture<Result<RecommendAtConnectRes>> asyncGetRecommendContentNew(RecommendAtConnectReq request);
}

thenApply結果轉化

public CompletableFuture<RecommendAtConnectRes> asyncGetRecommendContent(RecommendAtConnectReq request) {
    RecommendAtConnectRequest recommendAtConnectRequest = getRecommendAtConnectRequest(request);
    CompletableFuture<RecommendAtConnectDto> future = recommendAtConnectEventHandlerChain.asyncHandlerOfRecommendAtConnect(recommendAtConnectRequest);
    return Objects.isNull(future)? null: future.thenApply(this::dtoToRes);
}

前后future依賴:

//future編排
CompletableFuture<Optional<RecommendAtConnectDto>> taskEngineFuture = pushGsTaskEngineHandler.asyncPushHandler(connectRequest);


CompletableFuture<Optional<RecommendAtConnectDto>> refundFuture = getNextFuture(taskEngineFuture, connectRequest, unused ->pushLogisticsRefundHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> serviceCaseFuture = getNextFuture(refundFuture, connectRequest, unused ->pushServiceCaseHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> orderFuture = getNextFuture(serviceCaseFuture, connectRequest, unused->pushOrderSourcePredictHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> spuFuture = getNextFuture(orderFuture, connectRequest, unused->pushSpuSourcePredictHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> customerCenterFuture = getNextFuture(spuFuture, connectRequest, unused->pushCustomerCenterSourcePredictHandler.asyncPushHandler(connectRequest));
CompletableFuture<Optional<RecommendAtConnectDto>> guessQuestionFuture = getNextFuture(customerCenterFuture,connectRequest, unused ->  pushGuessQuestionHandler.asyncPushHandler(connectRequest));
finalFuture = getNextFuture(guessQuestionFuture, connectRequest, unused -> pushWelcomeHandler.asyncPushHandler(connectRequest));


//回調工具方法
public static CompletableFuture<Optional<RecommendAtConnectDto>> getNextFuture(CompletableFuture<Optional<RecommendAtConnectDto>> beforeFuture,
                                                                         RecommendAtConnectRequest request,
                                                                         Function<RecommendAtConnectRequest, CompletableFuture<Optional<RecommendAtConnectDto>>> function) {
    return beforeFuture.thenCompose(recommendAtConnectDto -> {
        if (!recommendAtConnectDto.isPresent()) {
            return function.apply(request);
        }
        return beforeFuture;
    });
}

3.2.3 線程池

自定義業(yè)務線程池

處理具體的業(yè)務邏輯時,如果不傳入線程池,默認使用ForkJoinPool的commonPool,其線程數(shù)量默認是CPU的核心數(shù)量-1,推薦傳入自定義的業(yè)務線程池,防止阻塞dubbo線程。

//自定義dubbo業(yè)務線程池
@Bean(name = "dubboAsyncBizExecutor")
public ThreadPoolTaskExecutor dubboAsyncBizExecutor(){
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(200);
    executor.setMaxPoolSize(200);
    executor.setQueueCapacity(50);
    executor.setThreadNamePrefix("dubboAsyncBizExecutor-");
    executor.setRejectedExecutionHandler((r, executor1) -> log.error("dubbo async biz  task exceed limit"));
    return executor;
}


public CompletableFuture<Result<GuessQuestionResponse>> asyncPredictQuestion(PredictQuestionExtRequest request) {
    log.info("asyncPredictQuestion start");
    CompletableFuture<Result<GuessQuestionResponse>> resultCompletableFuture =
            CompletableFuture.supplyAsync(() -> predictQuestionNew(request), dubboAsyncBizExecutor);
    log.info("asyncPredictQuestion end");
    return resultCompletableFuture;
}
同步和異步線程隔離(目前最新正式版本3.2.0支持)
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
        xmlns="http://www.springframework.org/schema/beans"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
        http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">


<!-- NOTE: we need config executor-management-mode="isolation" -->
<dubbo:application name="demo-provider" executor-management-mode="isolation">
</dubbo:application>






<bean id="syncService" class="org.apache.dubbo.config.spring.impl.SyncServiceImpl"/>
<bean id="asyncService" class="org.apache.dubbo.config.spring.impl.AsyncServiceImpl"/>




<!-- customized thread pool -->
<bean id="executor-sync-service"
class="org.apache.dubbo.config.spring.isolation.spring.support.SyncServiceExecutor"/>
<bean id="executor-async-service"
class="org.apache.dubbo.config.spring.isolation.spring.support.AsyncServiceExecutor"/>




<dubbo:service executor="executor-sync-service"
interface="org.apache.dubbo.config.spring.api.SyncService" version="1.0.0" 
        timeout="3000" ref="syncService" />




<dubbo:service executor="executor-async-service"
interface="org.apache.dubbo.config.spring.api.AsyncService" version="1.0.0"
        timeout="5000" ref="asyncService" />


</beans>

3.2.4 異常處理

CompletableFuture異常處理使用回調exceptionally,當CompletableFuture執(zhí)行的過程拋出了異常,會使用CompletionException進行封裝然后拋出。

CompletableFuture<RecommendAtConnectDto> asyncPushContent(RecommendAtConnectRequest connectRequest) {
    //業(yè)務方法,內部會發(fā)起異步rpc調用
    CompletableFuture<String> future = orderSourcePredictHandlerChain.asyncHandleOfPredict(connectRequest);
    //這里回調方法thenApply,如果發(fā)生異常thenApply內部會通過new CompletionException(throwable) 對異常進行包裝
    return Objects.isNull(future)? null : future.thenApply(messageBody->{ 
        if (StrUtil.isBlank(messageBody)){
            log.info(" async orderSourcePredictHandlerChain.handleOfPredict fail, connectRequest:{}", JSON.toJSONString(connectRequest));
            return null;
        }
        RecommendAtConnectDto connectDto = RecommendAtConnectDtoUtil.getDto(
                messageBody, connectRequest.getSessionId(),
                connectRequest.getCreateChatReq().getUserId(), MessageBodyTypeEnum.MULTI_STAGE.getCode(), EventEnum.PUSH_MULTI_STAGE_MESSAGE.getCode());
        return connectDto;
    }).exceptionally(err -> {
        //通過exceptionally 捕獲異常,這里的err已經被thenApply包裝過,因此需要通過Throwable.getCause()提取異常
         log.error("orderSourcePredictHandlerChain.handleOfPredict Exception cnotallow={}", JSON.toJSONString(connectRequest), ExceptionUtils.extractRealException(err));
         return 0;
      });
}

異常使用自定義工具類ExceptionUtils進行提取。

public class ExceptionUtils {
    public static Throwable extractRealException(Throwable throwable) {
          //這里判斷異常類型是否為CompletionException、ExecutionException,如果是則進行提取,否則直接返回。
        if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
            if (throwable.getCause() != null) {
                return throwable.getCause();
            }
        }
        return throwable;
    }
}

3.2.5 穩(wěn)定性保障

  • 改造的過程從上到下改動的同步方法保持不變,新增異步的方法進行支持
  • 改造的接口是上游服務端依賴的,和上游服務端溝通,通過AB控制調用同步和異步接口
  • 改造的接口是App端依賴的,在接口實現(xiàn)處通過AB控制調用異步和同步service
  • 通過以上三種方法可以實現(xiàn)一鍵回滾到最初的邏輯

3.3 遇到的問題

  • CompletableFuture回調方法中打印的日志會丟失traceId,已找監(jiān)控團隊幫忙支持解決,但是會增加應用gc的次數(shù),現(xiàn)在生產上是白名單應用開放中
  • 異步接口線程池和同步接口線程池隔離在dubbo最新發(fā)布的正式版本3.2.0支持
  • CompletableFuture.thenCompose不支持返回null,需要將返回值用Optional包裝返回
  • 打印日志的位置變更,由于返回值是future,拿不到真實的結果,只能在回調之中打印日志才能看到真實的結果
  • 監(jiān)控平臺監(jiān)控的平均耗時不包含回調的耗時,對于排查接口性能問題會增加一些難度,例如5月10日遇到了一個異步接口耗時同比增加了50%,但是從監(jiān)控平臺上看到平均耗時并沒有明顯增加

4、異步化收益

  • 壓測接口性能提升了50%

    圖片

  • 線上接口RT降低1/4左右,其中輸入聯(lián)想接口RT由173.04ms降為119.43ms
  • 服務器資源縮減了1/3

圖片

  • 服務器資源利用率提升

異步化之前CPU的使用率:

圖片

異步化縮減機器之后CPU的使用率:

圖片

可以看到dubbo異步化之后,服務器cpu的使用率由18左右提升到了50%左右,大家在進行機器縮減時需要關注一下CPU的使用率,當CPU的使用率超過60%時就會引發(fā)報警,這個就是我們縮減的極限了,如果在繼續(xù)縮減在一些流量高峰或者流量飆升的場景會出現(xiàn)風險。

5、其他

  • Dubbo異步化對編程者的代碼水平和架構能力都有一定的要求,同時在對老的代碼異步化的過程中,通過對上述接口先后調用關系的梳理也能發(fā)現(xiàn)很多代碼不合理或者有性能問題的地方,對代碼質量的提高也有一定的好處,其實就算不是想異步化,而是想提高代碼的并發(fā)度,這種前后依賴關系的梳理也是必不可少的,只不過異步化是將程序的并發(fā)度提升到極致的一種表現(xiàn)。
  • Dubbo異步化編程和以往的同步編程習慣可能有所不同,但是轉念一想,是不是異步化才是現(xiàn)實世界中更加真實的寫照,更加的符合現(xiàn)實世界運轉的規(guī)律,我們在規(guī)劃做一件事情時,往往會將事情進行拆解,然后同時(是指同一段時間不是同一刻)去做沒有先后依賴關系的多件事情,而不是做一件事,然后一直等到有結果了再去做其他事情。
  • 通過壓測我們發(fā)現(xiàn)當壓測qps不斷提高依賴的接口或者組件的耗時增加比較明顯,且慢慢成為性能提升的瓶頸時,異步帶來的提升效果會受到此瓶頸的制約,帶來提升會有一定比例的折扣,所以大家在做異步化實踐時,需要稍微降低一些提升的預期。

6、總結

通過這次實踐,我們使用CompletableFuture將Dubbo接口進行了異步化,同時利用CompletableFuture的異步回調能力,減少了服務依賴之間的阻塞,增加了dubbo線程的處理請求的能力,同時利用CompletableFuture傳入的業(yè)務線程提高了服務器CPU資源的利用率,用更少的硬件資源可以處理更多的請求,為公司的降本增效貢獻了一小份力量。

責任編輯:武曉燕 來源: 得物技術
相關推薦

2017-11-14 10:23:20

HTTP服務異步

2022-05-13 12:34:16

美團開發(fā)實踐

2022-08-15 08:01:35

微服務框架RPC

2022-09-09 10:01:11

服務網格云原生交付請求

2022-07-01 08:14:28

Dubbo異步代碼

2022-01-27 08:27:23

Dubbo上下線設計

2021-08-09 10:21:42

云原生Dubbo3.0 服務治理

2021-01-28 11:40:34

Dubbo異步配置

2023-09-07 20:04:06

前后端趨勢Node.js

2022-03-15 18:33:34

URL重構Dubbo3.0

2014-07-07 09:19:32

異步異步編程

2009-08-21 11:02:55

C#異步調用

2023-02-20 15:29:46

異步編碼多線程

2018-10-18 09:34:16

高并發(fā)異步化并行化

2023-07-27 07:00:01

轉轉門店商編程

2019-01-11 09:41:56

網易考拉服務架構微服務

2012-02-17 09:33:52

虛擬化桌面虛擬化

2022-05-09 08:34:01

FeignhttpJava

2022-06-17 09:30:00

參數(shù)化測試TestNG測試

2017-07-24 13:58:49

Android組件化插件化
點贊
收藏

51CTO技術棧公眾號