實(shí)現(xiàn)異步編程,這個(gè)工具類你得掌握!
本文轉(zhuǎn)載自微信公眾號(hào)「月伴飛魚」,作者日常加油站。轉(zhuǎn)載本文請(qǐng)聯(lián)系月伴飛魚公眾號(hào)。
前言
最近看公司代碼,多線程編程用的比較多,其中有對(duì)CompletableFuture的使用,所以想寫篇文章總結(jié)下
在日常的Java8項(xiàng)目開發(fā)中,CompletableFuture是很強(qiáng)大的并行開發(fā)工具,其語法貼近java8的語法風(fēng)格,與stream一起使用也能大大增加代碼的簡潔性
大家可以多應(yīng)用到工作中,提升接口性能,優(yōu)化代碼
基本介紹
CompletableFuture是Java 8新增的一個(gè)類,用于異步編程,繼承了Future和CompletionStage
這個(gè)Future主要具備對(duì)請(qǐng)求結(jié)果獨(dú)立處理的功能,CompletionStage用于實(shí)現(xiàn)流式處理,實(shí)現(xiàn)異步請(qǐng)求的各個(gè)階段組合或鏈?zhǔn)教幚?,因此completableFuture能實(shí)現(xiàn)整個(gè)異步調(diào)用接口的扁平化和流式處理,解決原有Future處理一系列鏈?zhǔn)疆惒秸?qǐng)求時(shí)的復(fù)雜編碼
Future的局限性
1、Future 的結(jié)果在非阻塞的情況下,不能執(zhí)行更進(jìn)一步的操作
我們知道,使用Future時(shí)只能通過isDone()方法判斷任務(wù)是否完成,或者通過get()方法阻塞線程等待結(jié)果返回,它不能非阻塞的情況下,執(zhí)行更進(jìn)一步的操作。
2、不能組合多個(gè)Future的結(jié)果
假設(shè)你有多個(gè)Future異步任務(wù),你希望最快的任務(wù)執(zhí)行完時(shí),或者所有任務(wù)都執(zhí)行完后,進(jìn)行一些其他操作
3、多個(gè)Future不能組成鏈?zhǔn)秸{(diào)用
當(dāng)異步任務(wù)之間有依賴關(guān)系時(shí),F(xiàn)uture不能將一個(gè)任務(wù)的結(jié)果傳給另一個(gè)異步任務(wù),多個(gè)Future無法創(chuàng)建鏈?zhǔn)降墓ぷ髁鳌?/p>
4、沒有異常處理
現(xiàn)在使用CompletableFuture能幫助我們完成上面的事情,讓我們編寫更強(qiáng)大、更優(yōu)雅的異步程序
基本使用
創(chuàng)建異步任務(wù)
通常可以使用下面幾個(gè)CompletableFuture的靜態(tài)方法創(chuàng)建一個(gè)異步任務(wù)
- public static CompletableFuture<Void> runAsync(Runnable runnable); //創(chuàng)建無返回值的異步任務(wù)
- public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor); //無返回值,可指定線程池(默認(rèn)使用ForkJoinPool.commonPool)
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); //創(chuàng)建有返回值的異步任務(wù)
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); //有返回值,可指定線程池
使用示例:
- Executor executor = Executors.newFixedThreadPool(10);
- CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
- //do something
- }, executor);
- int poiId = 111;
- CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
- PoiDTO poi = poiService.loadById(poiId);
- return poi.getName();
- });
- // Block and get the result of the Future
- String poiName = future.get();
使用回調(diào)方法
通過future.get()方法獲取異步任務(wù)的結(jié)果,還是會(huì)阻塞的等待任務(wù)完成
CompletableFuture提供了幾個(gè)回調(diào)方法,可以不阻塞主線程,在異步任務(wù)完成后自動(dòng)執(zhí)行回調(diào)方法中的代碼
- public CompletableFuture<Void> thenRun(Runnable runnable); //無參數(shù)、無返回值
- public CompletableFuture<Void> thenAccept(Consumer<? super T> action); //接受參數(shù),無返回值
- public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); //接受參數(shù)T,有返回值U
使用示例:
- CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
- .thenRun(() -> System.out.println("do other things. 比如異步打印日志或發(fā)送消息"));
- //如果只想在一個(gè)CompletableFuture任務(wù)執(zhí)行完后,進(jìn)行一些后續(xù)的處理,不需要返回值,那么可以用thenRun回調(diào)方法來完成。
- //如果主線程不依賴thenRun中的代碼執(zhí)行完成,也不需要使用get()方法阻塞主線程。
- CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
- .thenAccept((s) -> System.out.println(s + " world"));
- //輸出:Hello world
- //回調(diào)方法希望使用異步任務(wù)的結(jié)果,并不需要返回值,那么可以使用thenAccept方法
- CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
- PoiDTO poi = poiService.loadById(poiId);
- return poi.getMainCategory();
- }).thenApply((s) -> isMainPoi(s)); // boolean isMainPoi(int poiId);
- future.get();
- //希望將異步任務(wù)的結(jié)果做進(jìn)一步處理,并需要返回值,則使用thenApply方法。
- //如果主線程要獲取回調(diào)方法的返回,還是要用get()方法阻塞得到
組合兩個(gè)異步任務(wù)
- //thenCompose方法中的異步任務(wù)依賴調(diào)用該方法的異步任務(wù)
- public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
- //用于兩個(gè)獨(dú)立的異步任務(wù)都完成的時(shí)候
- public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
- BiFunction<? super T,? super U,? extends V> fn);
使用示例:
- CompletableFuture<List<Integer>> poiFuture = CompletableFuture.supplyAsync(
- () -> poiService.queryPoiIds(cityId, poiId)
- );
- //第二個(gè)任務(wù)是返回CompletableFuture的異步方法
- CompletableFuture<List<DealGroupDTO>> getDeal(List<Integer> poiIds){
- return CompletableFuture.supplyAsync(() -> poiService.queryPoiIds(poiIds));
- }
- //thenCompose
- CompletableFuture<List<DealGroupDTO>> resultFuture = poiFuture.thenCompose(poiIds -> getDeal(poiIds));
- resultFuture.get();
thenCompose和thenApply的功能類似,兩者區(qū)別在于thenCompose接受一個(gè)返回CompletableFuture的Function,當(dāng)想從回調(diào)方法返回的CompletableFuture中直接獲取結(jié)果U時(shí),就用thenCompose
如果使用thenApply,返回結(jié)果resultFuture的類型是CompletableFuture>>,而不是CompletableFuture>
- CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
- .thenCombine(CompletableFuture.supplyAsync(() -> "world"), (s1, s2) -> s1 + s2);
- //future.get()
組合多個(gè)CompletableFuture
當(dāng)需要多個(gè)異步任務(wù)都完成時(shí),再進(jìn)行后續(xù)處理,可以使用allOf方法
- CompletableFuture<Void> poiIDTOFuture = CompletableFuture
- .supplyAsync(() -> poiService.loadPoi(poiId))
- .thenAccept(poi -> {
- model.setModelTitle(poi.getShopName());
- //do more thing
- });
- CompletableFuture<Void> productFuture = CompletableFuture
- .supplyAsync(() -> productService.findAllByPoiIdOrderByUpdateTimeDesc(poiId))
- .thenAccept(list -> {
- model.setDefaultCount(list.size());
- model.setMoreDesc("more");
- });
- //future3等更多異步任務(wù),這里就不一一寫出來了
- CompletableFuture.allOf(poiIDTOFuture, productFuture, future3, ...).join(); //allOf組合所有異步任務(wù),并使用join獲取結(jié)果
該方法挺適合C端的業(yè)務(wù),比如通過poiId異步的從多個(gè)服務(wù)拿門店信息,然后組裝成自己需要的模型,最后所有門店信息都填充完后返回
這里使用了join方法獲取結(jié)果,它和get方法一樣阻塞的等待任務(wù)完成
多個(gè)異步任務(wù)有任意一個(gè)完成時(shí)就返回結(jié)果,可以使用anyOf方法
- CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
- try {
- TimeUnit.SECONDS.sleep(2);
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
- return "Result of Future 1";
- });
- CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
- return "Result of Future 2";
- });
- CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
- return "Result of Future 3";
- });
- CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
- System.out.println(anyOfFuture.get()); // Result of Future 2
異常處理
- Integer age = -1;
- CompletableFuture<Void> maturityFuture = CompletableFuture.supplyAsync(() -> {
- if(age < 0) {
- throw new IllegalArgumentException("Age can not be negative");
- }
- if(age > 18) {
- return "Adult";
- } else {
- return "Child";
- }
- }).exceptionally(ex -> {
- System.out.println("Oops! We have an exception - " + ex.getMessage());
- return "Unknown!";
- }).thenAccept(s -> System.out.print(s));
- //Unkown!
exceptionally方法可以處理異步任務(wù)的異常,在出現(xiàn)異常時(shí),給異步任務(wù)鏈一個(gè)從錯(cuò)誤中恢復(fù)的機(jī)會(huì),可以在這里記錄異?;蚍祷匾粋€(gè)默認(rèn)值
使用handler方法也可以處理異常,并且無論是否發(fā)生異常它都會(huì)被調(diào)用
- Integer age = -1;
- CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
- if(age < 0) {
- throw new IllegalArgumentException("Age can not be negative");
- }
- if(age > 18) {
- return "Adult";
- } else {
- return "Child";
- }
- }).handle((res, ex) -> {
- if(ex != null) {
- System.out.println("Oops! We have an exception - " + ex.getMessage());
- return "Unknown!";
- }
- return res;
- });
分片處理
分片和并行處理:分片借助stream實(shí)現(xiàn),然后通過CompletableFuture實(shí)現(xiàn)并行執(zhí)行,最后做數(shù)據(jù)聚合(其實(shí)也是stream的方法)
CompletableFuture并不提供單獨(dú)的分片api,但可以借助stream的分片聚合功能實(shí)現(xiàn)
舉個(gè)例子:
- //請(qǐng)求商品數(shù)量過多時(shí),做分批異步處理
- List<List<Long>> skuBaseIdsList = ListUtils.partition(skuIdList, 10);//分片
- //并行
- List<CompletableFuture<List<SkuSales>>> futureList = Lists.newArrayList();
- for (List<Long> skuId : skuBaseIdsList) {
- CompletableFuture<List<SkuSales>> tmpFuture = getSkuSales(skuId);
- futureList.add(tmpFuture);
- }
- //聚合
- futureList.stream().map(CompletalbleFuture::join).collent(Collectors.toList());
舉個(gè)例子
帶大家領(lǐng)略下CompletableFuture異步編程的優(yōu)勢(shì)
這里我們用CompletableFuture實(shí)現(xiàn)水泡茶程序
首先還是需要先完成分工方案,在下面的程序中,我們分了3個(gè)任務(wù):
- 任務(wù)1負(fù)責(zé)洗水壺、燒開水
- 任務(wù)2負(fù)責(zé)洗茶壺、洗茶杯和拿茶葉
- 任務(wù)3負(fù)責(zé)泡茶。其中任務(wù)3要等待任務(wù)1和任務(wù)2都完成后才能開始
下面是代碼實(shí)現(xiàn),你先略過runAsync()、supplyAsync()、thenCombine()這些不太熟悉的方法,從大局上看,你會(huì)發(fā)現(xiàn):
- 無需手工維護(hù)線程,沒有繁瑣的手工維護(hù)線程的工作,給任務(wù)分配線程的工作也不需要我們關(guān)注;
- 語義更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能夠清晰地表述任務(wù)3要等待任務(wù)1和任務(wù)2都完成后才能開始;
- 代碼更簡練并且專注于業(yè)務(wù)邏輯,幾乎所有代碼都是業(yè)務(wù)邏輯相關(guān)的
- //任務(wù)1:洗水壺->燒開水
- CompletableFuture f1 =
- CompletableFuture.runAsync(()->{
- System.out.println("T1:洗水壺...");
- sleep(1, TimeUnit.SECONDS);
- System.out.println("T1:燒開水...");
- sleep(15, TimeUnit.SECONDS);
- });
- //任務(wù)2:洗茶壺->洗茶杯->拿茶葉
- CompletableFuture f2 =
- CompletableFuture.supplyAsync(()->{
- System.out.println("T2:洗茶壺...");
- sleep(1, TimeUnit.SECONDS);
- System.out.println("T2:洗茶杯...");
- sleep(2, TimeUnit.SECONDS);
- System.out.println("T2:拿茶葉...");
- sleep(1, TimeUnit.SECONDS);
- return "龍井";
- });
- //任務(wù)3:任務(wù)1和任務(wù)2完成后執(zhí)行:泡茶
- CompletableFuture f3 =
- f1.thenCombine(f2, (__, tf)->{
- System.out.println("T1:拿到茶葉:" + tf);
- System.out.println("T1:泡茶...");
- return "上茶:" + tf;
- });
- //等待任務(wù)3執(zhí)行結(jié)果
- System.out.println(f3.join());
- void sleep(int t, TimeUnit u) {
- try {
- u.sleep(t);
- }catch(InterruptedException e){}
- }
注意事項(xiàng)
1.CompletableFuture默認(rèn)線程池是否滿足使用
前面提到創(chuàng)建CompletableFuture異步任務(wù)的靜態(tài)方法runAsync和supplyAsync等,可以指定使用的線程池,不指定則用CompletableFuture的默認(rèn)線程池
- private static final Executor asyncPool = useCommonPool ?
- ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
可以看到,CompletableFuture默認(rèn)線程池是調(diào)用ForkJoinPool的commonPool()方法創(chuàng)建,這個(gè)默認(rèn)線程池的核心線程數(shù)量根據(jù)CPU核數(shù)而定,公式為Runtime.getRuntime().availableProcessors() - 1,以4核雙槽CPU為例,核心線程數(shù)量就是4*2-1=7個(gè)
這樣的設(shè)置滿足CPU密集型的應(yīng)用,但對(duì)于業(yè)務(wù)都是IO密集型的應(yīng)用來說,是有風(fēng)險(xiǎn)的,當(dāng)qps較高時(shí),線程數(shù)量可能就設(shè)的太少了,會(huì)導(dǎo)致線上故障
所以可以根據(jù)業(yè)務(wù)情況自定義線程池使用
2.get設(shè)置超時(shí)時(shí)間不能串行g(shù)et,不然會(huì)導(dǎo)致接口延時(shí)線程數(shù)量*超時(shí)時(shí)間