Java 8 異步編程 CompletableFuture 全解析
本文轉(zhuǎn)載自微信公眾號「KK架構(gòu)師」,作者wangkai 。轉(zhuǎn)載本文請聯(lián)系KK架構(gòu)師公眾號。
本文大綱速看
一、異步編程
通常來說,程序都是順序執(zhí)行,同一時刻只會發(fā)生一件事情。如果一個函數(shù)依賴于另一個函數(shù)的結(jié)果,它只能等待那個函數(shù)結(jié)束才能繼續(xù)執(zhí)行,從用戶角度來說,整個程序才算執(zhí)行完畢。但現(xiàn)在的計算機(jī)普遍擁有多核 CPU,在那里干等著毫無意義,完全可以在另一個處理器內(nèi)核上干其他工作,耗時長的任務(wù)結(jié)束之后會主動通知你。這就是異步編程的出發(fā)點(diǎn):充分使用多核 CPU 的優(yōu)勢,最大程度提高程序性能。一句話來說:所謂異步編程,就是實(shí)現(xiàn)一個無需等待被調(diào)用函數(shù)的返回值而讓操作繼續(xù)運(yùn)行的方法。
二、拋出一個問題:如何實(shí)現(xiàn)燒水泡茶的程序
最后我們會使用傳統(tǒng)方式和 Java8 異步編程方式分別實(shí)現(xiàn),來對比一下實(shí)現(xiàn)復(fù)雜度。
三、Java5 的 Future 實(shí)現(xiàn)的異步編程
Future 是 Java 5 添加的類,用來描述一個異步計算的結(jié)果。你可以使用 isDone() 方法檢查計算是否完成,或者使用 get() 方法阻塞住調(diào)用線程,直到計算完成返回結(jié)果,也可以使用 cancel() 方法停止任務(wù)的執(zhí)行。
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- ExecutorService es = Executors.newFixedThreadPool(5);
- Future<Integer> f = es.submit(() -> 100);
- System.out.println(f.get());
- es.shutdown();
- }
雖然 Future 提供了異步執(zhí)行任務(wù)的能力,但是對于結(jié)果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果。阻塞的方式顯然和我們異步編程的初衷相違背,輪詢的方式又會耗費(fèi)無謂的 CPU 資源,而且也不能及時的獲取結(jié)果。
當(dāng)然,很多其他的語言采用回調(diào)的方式來實(shí)現(xiàn)異步編程,比如 Node.js;Java 的一些框架,比如 Netty,Google Guava 也擴(kuò)展了 Future 接口,提供了很多回調(diào)的機(jī)制,封裝了工具類,輔助異步編程開發(fā)。
Java 作為老牌編程語言,自然也不會落伍。在 Java 8 中,新增了一個包含 50 多個方法的類:CompletableFuture,提供了非常強(qiáng)大的 Future 擴(kuò)展功能,可以幫助我們簡化異步編程的復(fù)雜性,提供函數(shù)式編程的能力。
四、CompletableFuture 類功能概覽
如下圖是 CompletableFuture 實(shí)現(xiàn)的接口:
它實(shí)現(xiàn)了 Future 接口,擁有 Future 所有的特性,比如可以使用 get() 方法獲取返回值等;還實(shí)現(xiàn)了 CompletionStage 接口,這個接口有超過 40 個方法,功能太豐富了,它主要是為了編排任務(wù)的工作流。
我們可以把工作流和工作流之間的關(guān)系分類為三種:串行關(guān)系,并行關(guān)系,匯聚關(guān)系。
串行關(guān)系
提供了如下的 api 來實(shí)現(xiàn)(先大致瀏覽一遍):
- CompletionStage<R> thenApply(fn);
- CompletionStage<R> thenApplyAsync(fn);
- CompletionStage<Void> thenAccept(consumer);
- CompletionStage<Void> thenAcceptAsync(consumer);
- CompletionStage<Void> thenRun(action);
- CompletionStage<Void> thenRunAsync(action);
- CompletionStage<R> thenCompose(fn);
- CompletionStage<R> thenComposeAsync(fn);
并行關(guān)系

多線程異步執(zhí)行就是并行關(guān)系
匯聚關(guān)系
匯聚關(guān)系,又分為 AND 匯聚關(guān)系和 OR 匯聚關(guān)系:
AND 匯聚關(guān)系,就是所有依賴的任務(wù)都完成之后再執(zhí)行;OR 匯聚關(guān)系,就是依賴的任務(wù)中有一個執(zhí)行完成,就開始執(zhí)行。
AND 匯聚關(guān)系由這些接口表達(dá):
- CompletionStage<R> thenCombine(other, fn);
- CompletionStage<R> thenCombineAsync(other, fn);
- CompletionStage<Void> thenAcceptBoth(other, consumer);
- CompletionStage<Void> thenAcceptBothAsync(other, consumer);
- CompletionStage<Void> runAfterBoth(other, action);
- CompletionStage<Void> runAfterBothAsync(other, action);
OR 匯聚關(guān)系由這些接口來表達(dá):
- CompletionStage applyToEither(other, fn);
- CompletionStage applyToEitherAsync(other, fn);
- CompletionStage acceptEither(other, consumer);
- CompletionStage acceptEitherAsync(other, consumer);
- CompletionStage runAfterEither(other, action);
- CompletionStage runAfterEitherAsync(other, action);
五、CompletableFuture 接口精講
1、提交執(zhí)行的靜態(tài)方法
方法名描述
方法名 | 描述 |
---|---|
runAsync(Runnable runnable) | 執(zhí)行異步代碼,使用 ForkJoinPool.commonPool() 作為它的線程池 |
runAsync(Runnable runnable, Executor executor) | 執(zhí)行異步代碼,使用指定的線程池 |
supplyAsync(Supplier<U> supplier) | 異步執(zhí)行代碼,有返回值,使用 ForkJoinPool.commonPool() 作為它的線程池 |
supplyAsync(Supplier<U> supplier, Executor executor) | 異步執(zhí)行代碼,有返回值,使用指定的線程池執(zhí)行 |
上述四個方法,都是提交任務(wù)的,runAsync 方法需要傳入一個實(shí)現(xiàn)了 Runnable 接口的方法,supplyAsync 需要傳入一個實(shí)現(xiàn)了 Supplier 接口的方法,實(shí)現(xiàn) get 方法,返回一個值。
(1)run 和 supply 的區(qū)別
run 就是執(zhí)行一個方法,沒有返回值,supply 執(zhí)行一個方法,有返回值。
(2)一個參數(shù)和兩個參數(shù)的區(qū)別
第二個參數(shù)是線程池,如果沒有傳,則使用自帶的 ForkJoinPool.commonPool() 作為線程池,這個線程池默認(rèn)創(chuàng)建的線程數(shù)是 CPU 的核數(shù)(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設(shè)置 ForkJoinPool 線程池的線程數(shù))
2、串行關(guān)系 api
這些 api 之間主要是能否獲得前一個任務(wù)的返回值與自己是否有返回值的區(qū)別。
api | 是否可獲得前一個任務(wù)的返回值 | 是否有返回值 |
---|---|---|
thenApply | 能 | 有 |
thenAccept | 能 | 無 |
thenRun | 不能 | 無 |
thenCompose | 能 | 有 |
(1) thenApply 和 thenApplyAsync 使用
thenApply 和 thenApplyAsync 把兩個并行的任務(wù)串行化,另一個任務(wù)在獲得上一個任務(wù)的返回值之后,做一些加工和轉(zhuǎn)換。它也是有返回值的。
- public class BasicFuture4 {
- @Data
- @AllArgsConstructor
- @ToString
- static class Student {
- private String name;
- }
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture<Student> future = CompletableFuture.supplyAsync(() -> "Jack")
- .thenApply(s -> s + " Smith")
- .thenApply(String::toUpperCase)
- .thenApplyAsync(Student::new);
- System.out.println(future.get());
- }
- }
結(jié)果可以看到,輸入是一個字符串,拼接了一個字符串,轉(zhuǎn)換成大寫,new 了一個 Student 對象返回。
- BasicFuture4.Student(name=JACK SMITH)
和 thenApply 一起的還有 thenAccept 和 thenRun,thenAccept 能獲得到前一個任務(wù)的返回值,但是自身沒有返回值;thenRun 不能獲得前一個任務(wù)的返回值,自身也沒有返回值。
(2)thenApply 和 thenApplyAsync 的區(qū)別
這兩個方法的區(qū)別,在于誰去執(zhí)行任務(wù)。如果使用 thenApplyAsync,那么執(zhí)行的線程是從 ForkJoinPool.commonPool() 或者自己定義的線程池中取線程去執(zhí)行。如果使用 thenApply,又分兩種情況,如果 supplyAsync 方法執(zhí)行速度特別快,那么 thenApply 任務(wù)就使用主線程執(zhí)行,如果 supplyAsync 執(zhí)行速度特別慢,就是和 supplyAsync 執(zhí)行線程一樣。
可以使用下面的例子演示一下:
- package com.dsj361.future;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- /**
- * @Author wangkai
- */
- public class BasicFuture8 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- System.out.println("----------supplyAsync 執(zhí)行很快");
- CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getName());
- return "1";
- }).thenApply(s -> {
- System.out.println(Thread.currentThread().getName());
- return "2";
- });
- System.out.println(future1.get());
- System.out.println("----------supplyAsync 執(zhí)行很慢");
- CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- System.out.println(Thread.currentThread().getName());
- return "1";
- }).thenApply(s -> {
- System.out.println(Thread.currentThread().getName());
- return "2";
- });
- System.out.println(future2.get());
- }
- }
執(zhí)行結(jié)果:
- ----------supplyAsync 執(zhí)行很快
- ForkJoinPool.commonPool-worker-1
- main
- 2
- ----------supplyAsync 執(zhí)行很慢
- ForkJoinPool.commonPool-worker-1
- ForkJoinPool.commonPool-worker-1
- 2
(3)thenCompose 的使用
假設(shè)有兩個異步任務(wù),第二個任務(wù)想要獲取第一個任務(wù)的返回值,并且做運(yùn)算,我們可以用 thenCompose。此時使用 thenApply 也可以實(shí)現(xiàn),看一段代碼發(fā)現(xiàn)他們的區(qū)別:
- public class BasicFuture9 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture<String> future = getLastOne().thenCompose(BasicFuture9::getLastTwo);
- System.out.println(future.get());
- CompletableFuture<CompletableFuture<String>> future2 = getLastOne().thenApply(s -> getLastTwo(s));
- System.out.println(future2.get().get());
- }
- public static CompletableFuture<String> getLastOne(){
- return CompletableFuture.supplyAsync(()-> "topOne");
- }
- public static CompletableFuture<String> getLastTwo(String s){
- return CompletableFuture.supplyAsync(()-> s + " topTwo");
- }
- }
可以看到使用 thenApply 的時候,需要使用兩個 get() 方法才能獲取到最終的返回值,使用 thenCompose 只要一個即可。
3、And 匯聚關(guān)系 Api
(1)thenCombine 的使用
加入我們要計算兩個異步方法返回值的和,就必須要等到兩個異步任務(wù)都計算完才能求和,此時可以用 thenCombine 來完成。
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
- CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
- CompletableFuture<Integer> thenComposeCount = thenComposeOne
- .thenCombine(thenComposeTwo, (s, y) -> s + y);
- thenComposeOne.thenAcceptBoth(thenComposeTwo,(s,y)-> System.out.println("thenAcceptBoth"));
- thenComposeOne.runAfterBoth(thenComposeTwo, () -> System.out.println("runAfterBoth"));
- System.out.println(thenComposeCount.get());
- }
可以看到 thenCombine 第二個參數(shù)是一個 Function 函數(shù),前面兩個異步任務(wù)都完成之后,使用這個函數(shù)來完成一些運(yùn)算。
(2)thenAcceptBoth
接收前面兩個異步任務(wù)的結(jié)果,執(zhí)行一個回調(diào)函數(shù),但是這個回調(diào)函數(shù)沒有返回值。
(3)runAfterBoth
接收前面兩個異步任務(wù)的結(jié)果,但是回調(diào)函數(shù),不接收參數(shù),也不返回值。
4、Or 匯聚關(guān)系 Api
- public class BasicFuture11 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
- CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
- CompletableFuture<Integer> thenComposeCount = thenComposeOne
- .applyToEither(thenComposeTwo, s -> s + 1);
- thenComposeOne.acceptEither(thenComposeTwo,s -> {});
- thenComposeOne.runAfterEither(thenComposeTwo,()->{});
- System.out.println(thenComposeCount.get());
- }
- }
(1)applyToEither
任何一個執(zhí)行完就執(zhí)行回調(diào)方法,回調(diào)方法接收一個參數(shù),有返回值
(2)acceptEither
任何一個執(zhí)行完就執(zhí)行回調(diào)方法,回調(diào)方法接收一個參數(shù),無返回值
(3)runAfterEither
任何一個執(zhí)行完就執(zhí)行回調(diào)方法,回調(diào)方法不接收參數(shù),也無返回值
5、處理異常
上面我們講了如何把幾個異步任務(wù)編排起來,執(zhí)行一些串行或者匯聚操作。還有一個重要的地方,就是異常的處理。
先看下面的例子:
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture.supplyAsync(() -> {
- System.out.println("execute one ");
- return 100;
- })
- .thenApply(s -> 10 / 0)
- .thenRun(() -> System.out.println("thenRun"))
- .thenAccept(s -> System.out.println("thenAccept"));
- CompletableFuture.runAsync(() -> System.out.println("other"));
- }
結(jié)果:
- execute one
- other
可以發(fā)現(xiàn),只要鏈條上有一個任務(wù)發(fā)生了異常,這個鏈條下面的任務(wù)都不再執(zhí)行了。
但是 main 方法上的接下來的代碼還是會執(zhí)行的。
所以這個時候,需要合理的去處理異常來完成一些收尾的工作。
- public class BasicFuture12 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture.supplyAsync(() -> {
- System.out.println("execute one ");
- return 100;
- })
- .thenApply(s -> 10 / 0)
- .thenRun(() -> System.out.println("thenRun"))
- .thenAccept(s -> System.out.println("thenAccept"))
- .exceptionally(s -> {
- System.out.println("異常處理");
- return null;
- });
- CompletableFuture.runAsync(() -> System.out.println("other"));
- }
- }
可以使用 exceptionally 來處理異常。
使用 handle() 方法也可以處理異常。但是 handle() 方法的不同之處在于,即使沒有發(fā)生異常,也會執(zhí)行。
六、燒水泡茶程序的實(shí)現(xiàn)
1、使用 Thread 多線程和 CountDownLatch 來實(shí)現(xiàn)
- public class MakeTee {
- private static CountDownLatch countDownLatch = new CountDownLatch(2);
- static class HeatUpWater implements Runnable {
- private CountDownLatch countDownLatch;
- public HeatUpWater(CountDownLatch countDownLatch) {
- this.countDownLatch = countDownLatch;
- }
- @Override
- public void run() {
- try {
- System.out.println("洗水壺");
- Thread.sleep(1000);
- System.out.println("燒開水");
- Thread.sleep(5000);
- countDownLatch.countDown();
- } catch (InterruptedException e) {
- }
- }
- }
- static class PrepareTee implements Runnable {
- private CountDownLatch countDownLatch;
- public PrepareTee(CountDownLatch countDownLatch) {
- this.countDownLatch = countDownLatch;
- }
- @Override
- public void run() {
- try {
- System.out.println("洗茶壺");
- Thread.sleep(1000);
- System.out.println("洗茶杯");
- Thread.sleep(1000);
- System.out.println("拿茶葉");
- Thread.sleep(1000);
- countDownLatch.countDown();
- } catch (InterruptedException e) {
- }
- }
- }
- public static void main(String[] args) throws InterruptedException {
- new Thread(new HeatUpWater(countDownLatch) ).start();
- new Thread(new PrepareTee(countDownLatch)).start();
- countDownLatch.await();
- System.out.println("準(zhǔn)備就緒,開始泡茶");
- }
- }
這里我們使用兩個線程,分別執(zhí)行燒水和泡茶的程序,使用 CountDownLatch 來協(xié)調(diào)兩個線程的進(jìn)度,等到他們都執(zhí)行完成之后,再執(zhí)行泡茶的動作。
可以看到這種方法,多了很多不必要的代碼,new Thread,人工維護(hù) CountDownLatch 的進(jìn)度。
2、使用 CompletableFuture 來實(shí)現(xiàn)
- public class MakeTeeFuture {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
- try {
- System.out.println("洗水壺");
- Thread.sleep(1000);
- System.out.println("燒開水");
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
- try {
- System.out.println("洗茶壺");
- Thread.sleep(1000);
- System.out.println("洗茶杯");
- Thread.sleep(1000);
- System.out.println("拿茶葉");
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- CompletableFuture<Void> finish = future1.runAfterBoth(future2, () -> {
- System.out.println("準(zhǔn)備完畢,開始泡茶");
- });
- System.out.println(finish.get());
- }
- }
這個程序極度簡單,無需手工維護(hù)線程,給任務(wù)分配線程的工作也不需要關(guān)注。
同時語義也更加清晰,future1.runAfterBoth(future2,......) 能夠清晰的表述“任務(wù) 3 要等到任務(wù) 1 和任務(wù) 2 都完成之后才能繼續(xù)開始”
然后代碼更加簡練并且專注于業(yè)務(wù)邏輯,幾乎所有的代碼都是業(yè)務(wù)邏輯相關(guān)的。
七、總結(jié)
本文介紹了異步編程的概念,以及 Java8 的 CompletableFuture 是如何優(yōu)雅的處理多個異步任務(wù)之間的協(xié)調(diào)工作的。CompletableFuture 能夠極大簡化我們對于異步任務(wù)編排的工作,F(xiàn)link 在提交任務(wù)時,也是使用這種異步任務(wù)的方式,去編排提交時和提交后對于任務(wù)狀態(tài)處理的一些工作的。