自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java 8 異步編程 CompletableFuture 全解析

開發(fā) 后端
Future 是 Java 5 添加的類,用來描述一個異步計算的結(jié)果。你可以使用 isDone() 方法檢查計算是否完成,或者使用 get() 方法阻塞住調(diào)用線程,直到計算完成返回結(jié)果,也可以使用 cancel() 方法停止任務(wù)的執(zhí)行。

[[382705]]

本文轉(zhuǎn)載自微信公眾號「KK架構(gòu)師」,作者wangkai 。轉(zhuǎn)載本文請聯(lián)系KK架構(gòu)師公眾號。   

本文大綱速看

 

一、異步編程

通常來說,程序都是順序執(zhí)行,同一時刻只會發(fā)生一件事情。如果一個函數(shù)依賴于另一個函數(shù)的結(jié)果,它只能等待那個函數(shù)結(jié)束才能繼續(xù)執(zhí)行,從用戶角度來說,整個程序才算執(zhí)行完畢。但現(xiàn)在的計算機(jī)普遍擁有多核 CPU,在那里干等著毫無意義,完全可以在另一個處理器內(nèi)核上干其他工作,耗時長的任務(wù)結(jié)束之后會主動通知你。這就是異步編程的出發(fā)點(diǎn):充分使用多核 CPU 的優(yōu)勢,最大程度提高程序性能。一句話來說:所謂異步編程,就是實(shí)現(xiàn)一個無需等待被調(diào)用函數(shù)的返回值而讓操作繼續(xù)運(yùn)行的方法。

二、拋出一個問題:如何實(shí)現(xiàn)燒水泡茶的程序

 

最后我們會使用傳統(tǒng)方式和 Java8 異步編程方式分別實(shí)現(xiàn),來對比一下實(shí)現(xiàn)復(fù)雜度。

三、Java5 的 Future 實(shí)現(xiàn)的異步編程

Future 是 Java 5 添加的類,用來描述一個異步計算的結(jié)果。你可以使用 isDone() 方法檢查計算是否完成,或者使用 get() 方法阻塞住調(diào)用線程,直到計算完成返回結(jié)果,也可以使用 cancel() 方法停止任務(wù)的執(zhí)行。

  1. public static void main(String[] args) throws InterruptedException, ExecutionException { 
  2.         ExecutorService es = Executors.newFixedThreadPool(5); 
  3.         Future<Integer> f = es.submit(() -> 100); 
  4.         System.out.println(f.get()); 
  5.         es.shutdown(); 
  6.     } 

雖然 Future 提供了異步執(zhí)行任務(wù)的能力,但是對于結(jié)果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果。阻塞的方式顯然和我們異步編程的初衷相違背,輪詢的方式又會耗費(fèi)無謂的 CPU 資源,而且也不能及時的獲取結(jié)果。

當(dāng)然,很多其他的語言采用回調(diào)的方式來實(shí)現(xiàn)異步編程,比如 Node.js;Java 的一些框架,比如 Netty,Google Guava 也擴(kuò)展了 Future 接口,提供了很多回調(diào)的機(jī)制,封裝了工具類,輔助異步編程開發(fā)。

Java 作為老牌編程語言,自然也不會落伍。在 Java 8 中,新增了一個包含 50 多個方法的類:CompletableFuture,提供了非常強(qiáng)大的 Future 擴(kuò)展功能,可以幫助我們簡化異步編程的復(fù)雜性,提供函數(shù)式編程的能力。

四、CompletableFuture 類功能概覽

如下圖是 CompletableFuture 實(shí)現(xiàn)的接口:

 

 

它實(shí)現(xiàn)了 Future 接口,擁有 Future 所有的特性,比如可以使用 get() 方法獲取返回值等;還實(shí)現(xiàn)了 CompletionStage 接口,這個接口有超過 40 個方法,功能太豐富了,它主要是為了編排任務(wù)的工作流。

我們可以把工作流和工作流之間的關(guān)系分類為三種:串行關(guān)系,并行關(guān)系,匯聚關(guān)系。

串行關(guān)系

 

提供了如下的 api 來實(shí)現(xiàn)(先大致瀏覽一遍):

  1. CompletionStage<R> thenApply(fn); 
  2. CompletionStage<R> thenApplyAsync(fn); 
  3. CompletionStage<Void> thenAccept(consumer); 
  4. CompletionStage<Void> thenAcceptAsync(consumer); 
  5. CompletionStage<Void> thenRun(action); 
  6. CompletionStage<Void> thenRunAsync(action); 
  7. CompletionStage<R> thenCompose(fn); 
  8. CompletionStage<R> thenComposeAsync(fn); 

并行關(guān)系

 

 

多線程異步執(zhí)行就是并行關(guān)系

匯聚關(guān)系

 

匯聚關(guān)系,又分為 AND 匯聚關(guān)系和 OR 匯聚關(guān)系:

AND 匯聚關(guān)系,就是所有依賴的任務(wù)都完成之后再執(zhí)行;OR 匯聚關(guān)系,就是依賴的任務(wù)中有一個執(zhí)行完成,就開始執(zhí)行。

AND 匯聚關(guān)系由這些接口表達(dá):

  1. CompletionStage<R> thenCombine(other, fn); 
  2. CompletionStage<R> thenCombineAsync(other, fn); 
  3. CompletionStage<Void> thenAcceptBoth(other, consumer); 
  4. CompletionStage<Void> thenAcceptBothAsync(other, consumer); 
  5. CompletionStage<Void> runAfterBoth(other, action); 
  6. CompletionStage<Void> runAfterBothAsync(other, action); 

OR 匯聚關(guān)系由這些接口來表達(dá):

  1. CompletionStage applyToEither(other, fn); 
  2. CompletionStage applyToEitherAsync(other, fn); 
  3. CompletionStage acceptEither(other, consumer); 
  4. CompletionStage acceptEitherAsync(other, consumer); 
  5. CompletionStage runAfterEither(other, action); 
  6. CompletionStage runAfterEitherAsync(other, action); 

五、CompletableFuture 接口精講

1、提交執(zhí)行的靜態(tài)方法

方法名描述

方法名 描述
runAsync(Runnable runnable) 執(zhí)行異步代碼,使用 ForkJoinPool.commonPool() 作為它的線程池
runAsync(Runnable runnable, Executor executor) 執(zhí)行異步代碼,使用指定的線程池
supplyAsync(Supplier<U> supplier) 異步執(zhí)行代碼,有返回值,使用 ForkJoinPool.commonPool() 作為它的線程池
supplyAsync(Supplier<U> supplier, Executor executor) 異步執(zhí)行代碼,有返回值,使用指定的線程池執(zhí)行

上述四個方法,都是提交任務(wù)的,runAsync 方法需要傳入一個實(shí)現(xiàn)了 Runnable 接口的方法,supplyAsync 需要傳入一個實(shí)現(xiàn)了 Supplier 接口的方法,實(shí)現(xiàn) get 方法,返回一個值。

(1)run 和 supply 的區(qū)別

run 就是執(zhí)行一個方法,沒有返回值,supply 執(zhí)行一個方法,有返回值。

(2)一個參數(shù)和兩個參數(shù)的區(qū)別

第二個參數(shù)是線程池,如果沒有傳,則使用自帶的 ForkJoinPool.commonPool() 作為線程池,這個線程池默認(rèn)創(chuàng)建的線程數(shù)是 CPU 的核數(shù)(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設(shè)置 ForkJoinPool 線程池的線程數(shù))

2、串行關(guān)系 api

這些 api 之間主要是能否獲得前一個任務(wù)的返回值與自己是否有返回值的區(qū)別。

api 是否可獲得前一個任務(wù)的返回值 是否有返回值
thenApply
thenAccept
thenRun 不能
thenCompose

(1) thenApply 和 thenApplyAsync 使用

thenApply 和 thenApplyAsync 把兩個并行的任務(wù)串行化,另一個任務(wù)在獲得上一個任務(wù)的返回值之后,做一些加工和轉(zhuǎn)換。它也是有返回值的。

  1. public class BasicFuture4 { 
  2.  
  3.     @Data 
  4.     @AllArgsConstructor 
  5.     @ToString 
  6.     static class Student { 
  7.         private String name
  8.     } 
  9.      
  10.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  11.         CompletableFuture<Student> future = CompletableFuture.supplyAsync(() -> "Jack"
  12.                 .thenApply(s -> s + " Smith"
  13.                 .thenApply(String::toUpperCase) 
  14.                 .thenApplyAsync(Student::new); 
  15.         System.out.println(future.get()); 
  16.     } 
  17.  

結(jié)果可以看到,輸入是一個字符串,拼接了一個字符串,轉(zhuǎn)換成大寫,new 了一個 Student 對象返回。

  1. BasicFuture4.Student(name=JACK SMITH) 

和 thenApply 一起的還有 thenAccept 和 thenRun,thenAccept 能獲得到前一個任務(wù)的返回值,但是自身沒有返回值;thenRun 不能獲得前一個任務(wù)的返回值,自身也沒有返回值。

(2)thenApply 和 thenApplyAsync 的區(qū)別

這兩個方法的區(qū)別,在于誰去執(zhí)行任務(wù)。如果使用 thenApplyAsync,那么執(zhí)行的線程是從 ForkJoinPool.commonPool() 或者自己定義的線程池中取線程去執(zhí)行。如果使用 thenApply,又分兩種情況,如果 supplyAsync 方法執(zhí)行速度特別快,那么 thenApply 任務(wù)就使用主線程執(zhí)行,如果 supplyAsync 執(zhí)行速度特別慢,就是和 supplyAsync 執(zhí)行線程一樣。

可以使用下面的例子演示一下:

  1. package com.dsj361.future; 
  2.  
  3. import java.util.concurrent.CompletableFuture; 
  4. import java.util.concurrent.ExecutionException; 
  5.  
  6. /** 
  7.  * @Author wangkai 
  8.  */ 
  9. public class BasicFuture8 { 
  10.  
  11.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  12.         System.out.println("----------supplyAsync 執(zhí)行很快"); 
  13.         CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { 
  14.             System.out.println(Thread.currentThread().getName()); 
  15.             return "1"
  16.         }).thenApply(s -> { 
  17.             System.out.println(Thread.currentThread().getName()); 
  18.             return "2"
  19.         }); 
  20.         System.out.println(future1.get()); 
  21.  
  22.         System.out.println("----------supplyAsync 執(zhí)行很慢"); 
  23.         CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { 
  24.             try { 
  25.                 Thread.sleep(1000); 
  26.             } catch (InterruptedException e) { 
  27.             } 
  28.             System.out.println(Thread.currentThread().getName()); 
  29.             return "1"
  30.         }).thenApply(s -> { 
  31.             System.out.println(Thread.currentThread().getName()); 
  32.             return "2"
  33.         }); 
  34.         System.out.println(future2.get()); 
  35.     } 

執(zhí)行結(jié)果:

  1. ----------supplyAsync 執(zhí)行很快 
  2. ForkJoinPool.commonPool-worker-1 
  3. main 
  4. ----------supplyAsync 執(zhí)行很慢 
  5. ForkJoinPool.commonPool-worker-1 
  6. ForkJoinPool.commonPool-worker-1 

(3)thenCompose 的使用

假設(shè)有兩個異步任務(wù),第二個任務(wù)想要獲取第一個任務(wù)的返回值,并且做運(yùn)算,我們可以用 thenCompose。此時使用 thenApply 也可以實(shí)現(xiàn),看一段代碼發(fā)現(xiàn)他們的區(qū)別:

  1. public class BasicFuture9 { 
  2.  
  3.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  4.         CompletableFuture<String> future = getLastOne().thenCompose(BasicFuture9::getLastTwo); 
  5.         System.out.println(future.get()); 
  6.  
  7.         CompletableFuture<CompletableFuture<String>> future2 = getLastOne().thenApply(s -> getLastTwo(s)); 
  8.         System.out.println(future2.get().get()); 
  9.     } 
  10.  
  11.     public static CompletableFuture<String> getLastOne(){ 
  12.         return CompletableFuture.supplyAsync(()-> "topOne"); 
  13.     } 
  14.  
  15.     public static CompletableFuture<String> getLastTwo(String s){ 
  16.         return CompletableFuture.supplyAsync(()-> s + "  topTwo"); 
  17.     } 

可以看到使用 thenApply 的時候,需要使用兩個 get() 方法才能獲取到最終的返回值,使用 thenCompose 只要一個即可。

3、And 匯聚關(guān)系 Api

(1)thenCombine 的使用

加入我們要計算兩個異步方法返回值的和,就必須要等到兩個異步任務(wù)都計算完才能求和,此時可以用 thenCombine 來完成。

  1. public static void main(String[] args) throws ExecutionException, InterruptedException { 
  2.     CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192); 
  3.     CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196); 
  4.     CompletableFuture<Integer> thenComposeCount = thenComposeOne 
  5.         .thenCombine(thenComposeTwo, (s, y) -> s + y); 
  6.  
  7.     thenComposeOne.thenAcceptBoth(thenComposeTwo,(s,y)-> System.out.println("thenAcceptBoth")); 
  8.     thenComposeOne.runAfterBoth(thenComposeTwo, () -> System.out.println("runAfterBoth")); 
  9.  
  10.     System.out.println(thenComposeCount.get()); 

可以看到 thenCombine 第二個參數(shù)是一個 Function 函數(shù),前面兩個異步任務(wù)都完成之后,使用這個函數(shù)來完成一些運(yùn)算。

(2)thenAcceptBoth

接收前面兩個異步任務(wù)的結(jié)果,執(zhí)行一個回調(diào)函數(shù),但是這個回調(diào)函數(shù)沒有返回值。

(3)runAfterBoth

接收前面兩個異步任務(wù)的結(jié)果,但是回調(diào)函數(shù),不接收參數(shù),也不返回值。

4、Or 匯聚關(guān)系 Api

  1. public class BasicFuture11 { 
  2.  
  3.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  4.         CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192); 
  5.         CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196); 
  6.         CompletableFuture<Integer> thenComposeCount = thenComposeOne 
  7.                 .applyToEither(thenComposeTwo, s -> s + 1); 
  8.  
  9.         thenComposeOne.acceptEither(thenComposeTwo,s -> {}); 
  10.          
  11.         thenComposeOne.runAfterEither(thenComposeTwo,()->{}); 
  12.  
  13.         System.out.println(thenComposeCount.get()); 
  14.     } 

(1)applyToEither

任何一個執(zhí)行完就執(zhí)行回調(diào)方法,回調(diào)方法接收一個參數(shù),有返回值

(2)acceptEither

任何一個執(zhí)行完就執(zhí)行回調(diào)方法,回調(diào)方法接收一個參數(shù),無返回值

(3)runAfterEither

任何一個執(zhí)行完就執(zhí)行回調(diào)方法,回調(diào)方法不接收參數(shù),也無返回值

5、處理異常

上面我們講了如何把幾個異步任務(wù)編排起來,執(zhí)行一些串行或者匯聚操作。還有一個重要的地方,就是異常的處理。

先看下面的例子:

  1. public static void main(String[] args) throws ExecutionException, InterruptedException { 
  2.     CompletableFuture.supplyAsync(() -> { 
  3.         System.out.println("execute one "); 
  4.         return 100; 
  5.     }) 
  6.         .thenApply(s -> 10 / 0) 
  7.         .thenRun(() -> System.out.println("thenRun")) 
  8.         .thenAccept(s -> System.out.println("thenAccept")); 
  9.  
  10.     CompletableFuture.runAsync(() -> System.out.println("other")); 

結(jié)果:

  1. execute one  
  2. other 

可以發(fā)現(xiàn),只要鏈條上有一個任務(wù)發(fā)生了異常,這個鏈條下面的任務(wù)都不再執(zhí)行了。

但是 main 方法上的接下來的代碼還是會執(zhí)行的。

所以這個時候,需要合理的去處理異常來完成一些收尾的工作。

  1. public class BasicFuture12 { 
  2.  
  3.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  4.         CompletableFuture.supplyAsync(() -> { 
  5.             System.out.println("execute one "); 
  6.             return 100; 
  7.         }) 
  8.                 .thenApply(s -> 10 / 0) 
  9.                 .thenRun(() -> System.out.println("thenRun")) 
  10.                 .thenAccept(s -> System.out.println("thenAccept")) 
  11.                 .exceptionally(s -> { 
  12.                     System.out.println("異常處理"); 
  13.                     return null
  14.                 }); 
  15.  
  16.         CompletableFuture.runAsync(() -> System.out.println("other")); 
  17.     } 

可以使用 exceptionally 來處理異常。

使用 handle() 方法也可以處理異常。但是 handle() 方法的不同之處在于,即使沒有發(fā)生異常,也會執(zhí)行。

六、燒水泡茶程序的實(shí)現(xiàn)

1、使用 Thread 多線程和 CountDownLatch 來實(shí)現(xiàn)

  1. public class MakeTee { 
  2.  
  3.     private static CountDownLatch countDownLatch = new CountDownLatch(2); 
  4.  
  5.     static class HeatUpWater implements Runnable { 
  6.  
  7.         private CountDownLatch countDownLatch; 
  8.  
  9.         public HeatUpWater(CountDownLatch countDownLatch) { 
  10.             this.countDownLatch = countDownLatch; 
  11.         } 
  12.         @Override 
  13.         public void run() { 
  14.             try { 
  15.                 System.out.println("洗水壺"); 
  16.                 Thread.sleep(1000); 
  17.                 System.out.println("燒開水"); 
  18.                 Thread.sleep(5000); 
  19.                 countDownLatch.countDown(); 
  20.             } catch (InterruptedException e) { 
  21.             } 
  22.  
  23.         } 
  24.     } 
  25.  
  26.     static class PrepareTee implements Runnable { 
  27.         private CountDownLatch countDownLatch; 
  28.  
  29.         public PrepareTee(CountDownLatch countDownLatch) { 
  30.             this.countDownLatch = countDownLatch; 
  31.         } 
  32.  
  33.         @Override 
  34.         public void run() { 
  35.             try { 
  36.                 System.out.println("洗茶壺"); 
  37.                 Thread.sleep(1000); 
  38.                 System.out.println("洗茶杯"); 
  39.                 Thread.sleep(1000); 
  40.                 System.out.println("拿茶葉"); 
  41.                 Thread.sleep(1000); 
  42.                 countDownLatch.countDown(); 
  43.             } catch (InterruptedException e) { 
  44.             } 
  45.         } 
  46.     } 
  47.     public static void main(String[] args) throws InterruptedException { 
  48.         new Thread(new HeatUpWater(countDownLatch) ).start(); 
  49.         new Thread(new PrepareTee(countDownLatch)).start(); 
  50.         countDownLatch.await(); 
  51.         System.out.println("準(zhǔn)備就緒,開始泡茶"); 
  52.     } 

這里我們使用兩個線程,分別執(zhí)行燒水和泡茶的程序,使用 CountDownLatch 來協(xié)調(diào)兩個線程的進(jìn)度,等到他們都執(zhí)行完成之后,再執(zhí)行泡茶的動作。

可以看到這種方法,多了很多不必要的代碼,new Thread,人工維護(hù) CountDownLatch 的進(jìn)度。

2、使用 CompletableFuture 來實(shí)現(xiàn)

  1. public class MakeTeeFuture { 
  2.  
  3.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  4.         CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { 
  5.             try { 
  6.                 System.out.println("洗水壺"); 
  7.                 Thread.sleep(1000); 
  8.                 System.out.println("燒開水"); 
  9.                 Thread.sleep(5000); 
  10.             } catch (InterruptedException e) { 
  11.                 e.printStackTrace(); 
  12.             } 
  13.         }); 
  14.         CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { 
  15.             try { 
  16.                 System.out.println("洗茶壺"); 
  17.                 Thread.sleep(1000); 
  18.                 System.out.println("洗茶杯"); 
  19.                 Thread.sleep(1000); 
  20.                 System.out.println("拿茶葉"); 
  21.                 Thread.sleep(1000); 
  22.             } catch (InterruptedException e) { 
  23.                 e.printStackTrace(); 
  24.             } 
  25.         }); 
  26.         CompletableFuture<Void> finish = future1.runAfterBoth(future2, () -> { 
  27.             System.out.println("準(zhǔn)備完畢,開始泡茶"); 
  28.         }); 
  29.         System.out.println(finish.get()); 
  30.     } 

這個程序極度簡單,無需手工維護(hù)線程,給任務(wù)分配線程的工作也不需要關(guān)注。

同時語義也更加清晰,future1.runAfterBoth(future2,......) 能夠清晰的表述“任務(wù) 3 要等到任務(wù) 1 和任務(wù) 2 都完成之后才能繼續(xù)開始”

然后代碼更加簡練并且專注于業(yè)務(wù)邏輯,幾乎所有的代碼都是業(yè)務(wù)邏輯相關(guān)的。

七、總結(jié)

本文介紹了異步編程的概念,以及 Java8 的 CompletableFuture 是如何優(yōu)雅的處理多個異步任務(wù)之間的協(xié)調(diào)工作的。CompletableFuture 能夠極大簡化我們對于異步任務(wù)編排的工作,F(xiàn)link 在提交任務(wù)時,也是使用這種異步任務(wù)的方式,去編排提交時和提交后對于任務(wù)狀態(tài)處理的一些工作的。

責(zé)任編輯:武曉燕 來源: KK架構(gòu)師
相關(guān)推薦

2024-04-18 08:20:27

Java 8編程工具

2020-05-29 07:20:00

Java8異步編程源碼解讀

2021-06-06 16:56:49

異步編程Completable

2023-07-19 08:03:05

Future異步JDK

2022-07-08 14:14:04

并發(fā)編程異步編程

2025-02-06 16:51:30

2024-12-26 12:59:39

2015-06-16 11:06:42

JavaCompletable

2024-01-11 12:14:31

Async線程池任務(wù)

2021-03-18 10:12:54

JavaCompletable字符串

2024-10-14 08:29:14

異步編程任務(wù)

2024-08-06 09:43:54

Java 8工具編程

2023-04-13 07:33:31

Java 8編程工具

2010-03-05 13:46:12

Android編程學(xué)習(xí)

2025-04-07 08:20:00

ORMPython代碼

2022-03-30 07:32:10

JDK8異步編程

2013-04-01 15:38:54

異步編程異步編程模型

2016-10-21 11:04:07

JavaScript異步編程原理解析

2017-12-21 15:48:11

JavaCompletable

2024-03-06 08:13:33

FutureJDKCallable
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號