自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

實(shí)現(xiàn)異步編程,這個(gè)工具類你得掌握!

開發(fā) 開發(fā)工具
在日常的Java8項(xiàng)目開發(fā)中,CompletableFuture是很強(qiáng)大的并行開發(fā)工具,其語法貼近java8的語法風(fēng)格,與stream一起使用也能大大增加代碼的簡潔性。

 [[393773]]

本文轉(zhuǎn)載自微信公眾號(hào)「月伴飛魚」,作者日常加油站。轉(zhuǎn)載本文請(qǐng)聯(lián)系月伴飛魚公眾號(hào)。   

前言

最近看公司代碼,多線程編程用的比較多,其中有對(duì)CompletableFuture的使用,所以想寫篇文章總結(jié)下

在日常的Java8項(xiàng)目開發(fā)中,CompletableFuture是很強(qiáng)大的并行開發(fā)工具,其語法貼近java8的語法風(fēng)格,與stream一起使用也能大大增加代碼的簡潔性

大家可以多應(yīng)用到工作中,提升接口性能,優(yōu)化代碼

基本介紹

CompletableFuture是Java 8新增的一個(gè)類,用于異步編程,繼承了Future和CompletionStage

這個(gè)Future主要具備對(duì)請(qǐng)求結(jié)果獨(dú)立處理的功能,CompletionStage用于實(shí)現(xiàn)流式處理,實(shí)現(xiàn)異步請(qǐng)求的各個(gè)階段組合或鏈?zhǔn)教幚?,因此completableFuture能實(shí)現(xiàn)整個(gè)異步調(diào)用接口的扁平化和流式處理,解決原有Future處理一系列鏈?zhǔn)疆惒秸?qǐng)求時(shí)的復(fù)雜編碼

Future的局限性

1、Future 的結(jié)果在非阻塞的情況下,不能執(zhí)行更進(jìn)一步的操作

我們知道,使用Future時(shí)只能通過isDone()方法判斷任務(wù)是否完成,或者通過get()方法阻塞線程等待結(jié)果返回,它不能非阻塞的情況下,執(zhí)行更進(jìn)一步的操作。

2、不能組合多個(gè)Future的結(jié)果

假設(shè)你有多個(gè)Future異步任務(wù),你希望最快的任務(wù)執(zhí)行完時(shí),或者所有任務(wù)都執(zhí)行完后,進(jìn)行一些其他操作

3、多個(gè)Future不能組成鏈?zhǔn)秸{(diào)用

當(dāng)異步任務(wù)之間有依賴關(guān)系時(shí),F(xiàn)uture不能將一個(gè)任務(wù)的結(jié)果傳給另一個(gè)異步任務(wù),多個(gè)Future無法創(chuàng)建鏈?zhǔn)降墓ぷ髁鳌?/p>

4、沒有異常處理

現(xiàn)在使用CompletableFuture能幫助我們完成上面的事情,讓我們編寫更強(qiáng)大、更優(yōu)雅的異步程序

基本使用

創(chuàng)建異步任務(wù)

通常可以使用下面幾個(gè)CompletableFuture的靜態(tài)方法創(chuàng)建一個(gè)異步任務(wù)

  1. public static CompletableFuture<Void> runAsync(Runnable runnable);              //創(chuàng)建無返回值的異步任務(wù) 
  2. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);     //無返回值,可指定線程池(默認(rèn)使用ForkJoinPool.commonPool) 
  3. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);           //創(chuàng)建有返回值的異步任務(wù) 
  4. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); //有返回值,可指定線程池 

使用示例:

  1. Executor executor = Executors.newFixedThreadPool(10); 
  2. CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { 
  3.     //do something 
  4. }, executor); 
  5. int poiId = 111; 
  6. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { 
  7.  PoiDTO poi = poiService.loadById(poiId); 
  8.   return poi.getName(); 
  9. }); 
  10. // Block and get the result of the Future 
  11. String poiName = future.get(); 

使用回調(diào)方法

通過future.get()方法獲取異步任務(wù)的結(jié)果,還是會(huì)阻塞的等待任務(wù)完成

CompletableFuture提供了幾個(gè)回調(diào)方法,可以不阻塞主線程,在異步任務(wù)完成后自動(dòng)執(zhí)行回調(diào)方法中的代碼

  1. public CompletableFuture<Void> thenRun(Runnable runnable);            //無參數(shù)、無返回值 
  2. public CompletableFuture<Void> thenAccept(Consumer<? super T> action);         //接受參數(shù),無返回值 
  3. public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); //接受參數(shù)T,有返回值U 

使用示例:

  1. CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello"
  2.                            .thenRun(() -> System.out.println("do other things. 比如異步打印日志或發(fā)送消息")); 
  3. //如果只想在一個(gè)CompletableFuture任務(wù)執(zhí)行完后,進(jìn)行一些后續(xù)的處理,不需要返回值,那么可以用thenRun回調(diào)方法來完成。 
  4. //如果主線程不依賴thenRun中的代碼執(zhí)行完成,也不需要使用get()方法阻塞主線程。 
  1. CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello"
  2.                            .thenAccept((s) -> System.out.println(s + " world")); 
  3. //輸出:Hello world 
  4. //回調(diào)方法希望使用異步任務(wù)的結(jié)果,并不需要返回值,那么可以使用thenAccept方法 
  1. CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> { 
  2.   PoiDTO poi = poiService.loadById(poiId); 
  3.   return poi.getMainCategory(); 
  4. }).thenApply((s) -> isMainPoi(s));   // boolean isMainPoi(int poiId); 
  5.  
  6. future.get(); 
  7. //希望將異步任務(wù)的結(jié)果做進(jìn)一步處理,并需要返回值,則使用thenApply方法。 
  8. //如果主線程要獲取回調(diào)方法的返回,還是要用get()方法阻塞得到 

組合兩個(gè)異步任務(wù)

  1. //thenCompose方法中的異步任務(wù)依賴調(diào)用該方法的異步任務(wù) 
  2. public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);  
  3. //用于兩個(gè)獨(dú)立的異步任務(wù)都完成的時(shí)候 
  4. public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,  
  5.                                               BiFunction<? super T,? super U,? extends V> fn);  

使用示例:

  1. CompletableFuture<List<Integer>> poiFuture = CompletableFuture.supplyAsync( 
  2.   () -> poiService.queryPoiIds(cityId, poiId) 
  3. ); 
  4. //第二個(gè)任務(wù)是返回CompletableFuture的異步方法 
  5. CompletableFuture<List<DealGroupDTO>> getDeal(List<Integer> poiIds){ 
  6.   return CompletableFuture.supplyAsync(() ->  poiService.queryPoiIds(poiIds)); 
  7. //thenCompose 
  8. CompletableFuture<List<DealGroupDTO>> resultFuture = poiFuture.thenCompose(poiIds -> getDeal(poiIds)); 
  9. resultFuture.get(); 

thenCompose和thenApply的功能類似,兩者區(qū)別在于thenCompose接受一個(gè)返回CompletableFuture的Function,當(dāng)想從回調(diào)方法返回的CompletableFuture中直接獲取結(jié)果U時(shí),就用thenCompose

如果使用thenApply,返回結(jié)果resultFuture的類型是CompletableFuture>>,而不是CompletableFuture>

  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello"
  2.   .thenCombine(CompletableFuture.supplyAsync(() -> "world"), (s1, s2) -> s1 + s2); 
  3. //future.get() 

組合多個(gè)CompletableFuture

當(dāng)需要多個(gè)異步任務(wù)都完成時(shí),再進(jìn)行后續(xù)處理,可以使用allOf方法

  1. CompletableFuture<Void> poiIDTOFuture = CompletableFuture 
  2.  .supplyAsync(() -> poiService.loadPoi(poiId)) 
  3.   .thenAccept(poi -> { 
  4.     model.setModelTitle(poi.getShopName()); 
  5.     //do more thing 
  6.   }); 
  7.  
  8. CompletableFuture<Void> productFuture = CompletableFuture 
  9.  .supplyAsync(() -> productService.findAllByPoiIdOrderByUpdateTimeDesc(poiId)) 
  10.   .thenAccept(list -> { 
  11.     model.setDefaultCount(list.size()); 
  12.     model.setMoreDesc("more"); 
  13.   }); 
  14. //future3等更多異步任務(wù),這里就不一一寫出來了 
  15.  
  16. CompletableFuture.allOf(poiIDTOFuture, productFuture, future3, ...).join();  //allOf組合所有異步任務(wù),并使用join獲取結(jié)果 

該方法挺適合C端的業(yè)務(wù),比如通過poiId異步的從多個(gè)服務(wù)拿門店信息,然后組裝成自己需要的模型,最后所有門店信息都填充完后返回

這里使用了join方法獲取結(jié)果,它和get方法一樣阻塞的等待任務(wù)完成

多個(gè)異步任務(wù)有任意一個(gè)完成時(shí)就返回結(jié)果,可以使用anyOf方法

  1. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { 
  2.     try { 
  3.         TimeUnit.SECONDS.sleep(2); 
  4.     } catch (InterruptedException e) { 
  5.        throw new IllegalStateException(e); 
  6.     } 
  7.     return "Result of Future 1"
  8. }); 
  9.  
  10. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { 
  11.     try { 
  12.         TimeUnit.SECONDS.sleep(1); 
  13.     } catch (InterruptedException e) { 
  14.        throw new IllegalStateException(e); 
  15.     } 
  16.     return "Result of Future 2"
  17. }); 
  18.  
  19. CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { 
  20.     try { 
  21.         TimeUnit.SECONDS.sleep(3); 
  22.     } catch (InterruptedException e) { 
  23.        throw new IllegalStateException(e); 
  24.       return "Result of Future 3"
  25. }); 
  26.  
  27. CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3); 
  28.  
  29. System.out.println(anyOfFuture.get()); // Result of Future 2 

異常處理

  1. Integer age = -1; 
  2.  
  3. CompletableFuture<Void> maturityFuture = CompletableFuture.supplyAsync(() -> { 
  4.   if(age < 0) { 
  5.     throw new IllegalArgumentException("Age can not be negative"); 
  6.   } 
  7.   if(age > 18) { 
  8.     return "Adult"
  9.   } else { 
  10.     return "Child"
  11.   } 
  12. }).exceptionally(ex -> { 
  13.   System.out.println("Oops! We have an exception - " + ex.getMessage()); 
  14.   return "Unknown!"
  15. }).thenAccept(s -> System.out.print(s)); 
  16. //Unkown! 

exceptionally方法可以處理異步任務(wù)的異常,在出現(xiàn)異常時(shí),給異步任務(wù)鏈一個(gè)從錯(cuò)誤中恢復(fù)的機(jī)會(huì),可以在這里記錄異?;蚍祷匾粋€(gè)默認(rèn)值

使用handler方法也可以處理異常,并且無論是否發(fā)生異常它都會(huì)被調(diào)用

  1. Integer age = -1; 
  2.  
  3. CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> { 
  4.     if(age < 0) { 
  5.         throw new IllegalArgumentException("Age can not be negative"); 
  6.     } 
  7.     if(age > 18) { 
  8.         return "Adult"
  9.     } else { 
  10.         return "Child"
  11.     } 
  12. }).handle((res, ex) -> { 
  13.     if(ex != null) { 
  14.         System.out.println("Oops! We have an exception - " + ex.getMessage()); 
  15.         return "Unknown!"
  16.     } 
  17.     return res; 
  18. }); 

分片處理

分片和并行處理:分片借助stream實(shí)現(xiàn),然后通過CompletableFuture實(shí)現(xiàn)并行執(zhí)行,最后做數(shù)據(jù)聚合(其實(shí)也是stream的方法)

CompletableFuture并不提供單獨(dú)的分片api,但可以借助stream的分片聚合功能實(shí)現(xiàn)

舉個(gè)例子:

 

  1. //請(qǐng)求商品數(shù)量過多時(shí),做分批異步處理 
  2. List<List<Long>> skuBaseIdsList = ListUtils.partition(skuIdList, 10);//分片 
  3. //并行 
  4. List<CompletableFuture<List<SkuSales>>> futureList = Lists.newArrayList(); 
  5. for (List<Long> skuId : skuBaseIdsList) { 
  6.   CompletableFuture<List<SkuSales>> tmpFuture = getSkuSales(skuId); 
  7.   futureList.add(tmpFuture); 
  8. //聚合 
  9. futureList.stream().map(CompletalbleFuture::join).collent(Collectors.toList()); 

舉個(gè)例子

帶大家領(lǐng)略下CompletableFuture異步編程的優(yōu)勢(shì)

這里我們用CompletableFuture實(shí)現(xiàn)水泡茶程序

首先還是需要先完成分工方案,在下面的程序中,我們分了3個(gè)任務(wù):

  • 任務(wù)1負(fù)責(zé)洗水壺、燒開水
  • 任務(wù)2負(fù)責(zé)洗茶壺、洗茶杯和拿茶葉
  • 任務(wù)3負(fù)責(zé)泡茶。其中任務(wù)3要等待任務(wù)1和任務(wù)2都完成后才能開始

下面是代碼實(shí)現(xiàn),你先略過runAsync()、supplyAsync()、thenCombine()這些不太熟悉的方法,從大局上看,你會(huì)發(fā)現(xiàn):

  • 無需手工維護(hù)線程,沒有繁瑣的手工維護(hù)線程的工作,給任務(wù)分配線程的工作也不需要我們關(guān)注;
  • 語義更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能夠清晰地表述任務(wù)3要等待任務(wù)1和任務(wù)2都完成后才能開始;
  • 代碼更簡練并且專注于業(yè)務(wù)邏輯,幾乎所有代碼都是業(yè)務(wù)邏輯相關(guān)的
  1. //任務(wù)1:洗水壺->燒開水 
  2. CompletableFuture f1 =  
  3.   CompletableFuture.runAsync(()->{ 
  4.   System.out.println("T1:洗水壺..."); 
  5.   sleep(1, TimeUnit.SECONDS); 
  6.  
  7.   System.out.println("T1:燒開水..."); 
  8.   sleep(15, TimeUnit.SECONDS); 
  9. }); 
  10. //任務(wù)2:洗茶壺->洗茶杯->拿茶葉 
  11. CompletableFuture f2 =  
  12.   CompletableFuture.supplyAsync(()->{ 
  13.   System.out.println("T2:洗茶壺..."); 
  14.   sleep(1, TimeUnit.SECONDS); 
  15.  
  16.   System.out.println("T2:洗茶杯..."); 
  17.   sleep(2, TimeUnit.SECONDS); 
  18.  
  19.   System.out.println("T2:拿茶葉..."); 
  20.   sleep(1, TimeUnit.SECONDS); 
  21.   return "龍井"
  22. }); 
  23. //任務(wù)3:任務(wù)1和任務(wù)2完成后執(zhí)行:泡茶 
  24. CompletableFuture f3 =  
  25.   f1.thenCombine(f2, (__, tf)->{ 
  26.     System.out.println("T1:拿到茶葉:" + tf); 
  27.     System.out.println("T1:泡茶..."); 
  28.     return "上茶:" + tf; 
  29.   }); 
  30. //等待任務(wù)3執(zhí)行結(jié)果 
  31. System.out.println(f3.join()); 
  32.  
  33. void sleep(int t, TimeUnit u) { 
  34.   try { 
  35.     u.sleep(t); 
  36.   }catch(InterruptedException e){} 

注意事項(xiàng)

1.CompletableFuture默認(rèn)線程池是否滿足使用

前面提到創(chuàng)建CompletableFuture異步任務(wù)的靜態(tài)方法runAsync和supplyAsync等,可以指定使用的線程池,不指定則用CompletableFuture的默認(rèn)線程池

  1. private static final Executor asyncPool = useCommonPool ? 
  2.         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 

可以看到,CompletableFuture默認(rèn)線程池是調(diào)用ForkJoinPool的commonPool()方法創(chuàng)建,這個(gè)默認(rèn)線程池的核心線程數(shù)量根據(jù)CPU核數(shù)而定,公式為Runtime.getRuntime().availableProcessors() - 1,以4核雙槽CPU為例,核心線程數(shù)量就是4*2-1=7個(gè)

這樣的設(shè)置滿足CPU密集型的應(yīng)用,但對(duì)于業(yè)務(wù)都是IO密集型的應(yīng)用來說,是有風(fēng)險(xiǎn)的,當(dāng)qps較高時(shí),線程數(shù)量可能就設(shè)的太少了,會(huì)導(dǎo)致線上故障

所以可以根據(jù)業(yè)務(wù)情況自定義線程池使用

 

2.get設(shè)置超時(shí)時(shí)間不能串行g(shù)et,不然會(huì)導(dǎo)致接口延時(shí)線程數(shù)量*超時(shí)時(shí)間

 

責(zé)任編輯:武曉燕 來源: 月伴飛魚
相關(guān)推薦

2022-03-03 08:30:41

GeneratorES6函數(shù)

2019-12-31 14:10:58

Excel文章SQL

2022-10-24 07:31:53

Python編程裝飾器

2024-04-30 11:11:33

aiohttp模塊編程

2013-04-01 15:38:54

異步編程異步編程模型

2024-04-01 09:45:50

TAP模式.NET異步編程

2024-11-08 09:48:38

異步編程I/O密集

2021-05-07 16:19:36

異步編程Java線程

2024-03-15 08:23:26

異步編程函數(shù)

2023-07-06 08:31:50

Python對(duì)象編程

2023-09-18 07:46:28

2011-02-24 12:53:51

.NET異步傳統(tǒng)

2023-11-24 16:13:05

C++編程

2020-03-29 08:27:05

Promise異步編程前端

2020-10-27 10:58:07

Linux內(nèi)核操作系統(tǒng)

2019-04-30 15:10:42

Python調(diào)試工具編程語言

2023-08-02 08:03:08

Python線程池

2013-04-01 15:25:41

異步編程異步EMP

2020-10-15 13:29:57

javascript

2022-01-25 12:41:31

ChromeResponse接口
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)