自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java CompletableFuture詳解

開發(fā) 后端
在Java 8中, 新增加了一個(gè)包含50個(gè)方法左右的類: CompletableFuture,默認(rèn)依靠fork/join框架啟動(dòng)新的線程實(shí)現(xiàn)異步與并發(fā)的,提供了非常強(qiáng)大的Future的擴(kuò)展功能,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性,提供了函數(shù)式編程的能力,可以通過回調(diào)的方式處理計(jì)算結(jié)果,并且提供了轉(zhuǎn)換和組合CompletableFuture的方法。

 

 

Java CompletableFuture詳解

在Java 8中, 新增加了一個(gè)包含50個(gè)方法左右的類: CompletableFuture,默認(rèn)依靠fork/join框架啟動(dòng)新的線程實(shí)現(xiàn)異步與并發(fā)的,提供了非常強(qiáng)大的Future的擴(kuò)展功能,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性,提供了函數(shù)式編程的能力,可以通過回調(diào)的方式處理計(jì)算結(jié)果,并且提供了轉(zhuǎn)換和組合CompletableFuture的方法。

CompletableFuture類實(shí)現(xiàn)了CompletionStage和Future接口,所以可以像以前一樣通過阻塞或者輪詢的方式獲得結(jié)果,盡管這種方式不推薦使用。

創(chuàng)建CompletableFuture對(duì)象。

以下四個(gè)靜態(tài)方法用來為一段異步執(zhí)行的代碼創(chuàng)建CompletableFuture對(duì)象:

  1. public static CompletableFuture<Void>              runAsync(Runnable runnable) 
  2.  
  3. public static CompletableFuture<Void>              runAsync(Runnable runnable, Executor executor) 
  4.  
  5. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 
  6.  
  7. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) 

runAsync方法也好理解,它以Runnable函數(shù)式接口類型為參數(shù),所以CompletableFuture的計(jì)算結(jié)果為空。以Async結(jié)尾會(huì)使用其它的線程去執(zhí)行,沒有指定Executor的方法會(huì)使用ForkJoinPool.commonPool()作為它的線程池執(zhí)行異步代碼。

supplyAsync方法以Supplier<U>函數(shù)式接口類型為參數(shù),CompletableFuture的計(jì)算結(jié)果類型為U。

因?yàn)榉椒ǖ膮?shù)類型都是函數(shù)式接口,所以可以使用lambda表達(dá)式實(shí)現(xiàn)異步任務(wù),比如:

  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { 
  2.  
  3.     //長(zhǎng)時(shí)間的計(jì)算任務(wù) 
  4.  
  5.     return "hello world"
  6.  
  7. }); 

計(jì)算結(jié)果完成時(shí)的處理

當(dāng)CompletableFuture的計(jì)算結(jié)果完成,或者拋出異常的時(shí)候,我們可以執(zhí)行特定的Action。主要是下面的方法:

  1. public CompletableFuture<T>  whenComplete(BiConsumer<? super T,? super Throwable> action
  2.  
  3. public CompletableFuture<T>  whenCompleteAsync(BiConsumer<? super T,? super Throwable> action
  4.  
  5. public CompletableFuture<T>  whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) 
  6.  
  7. public CompletableFuture<T>  exceptionally(Function<Throwable,? extends T> fn) 

可以看到Action的類型是BiConsumer<? super T,? super Throwable>,它可以處理正常的計(jì)算結(jié)果,或者異常情況。

注意這幾個(gè)方法都會(huì)返回CompletableFuture,當(dāng)Action執(zhí)行完畢后它的結(jié)果返回原始的CompletableFuture的計(jì)算結(jié)果或者返回異常。

  1. public class Main { 
  2.  
  3.     private static Random rand = new Random(); 
  4.  
  5.     private static long t = System.currentTimeMillis(); 
  6.  
  7.     static int getMoreData() { 
  8.  
  9.         System.out.println("begin to start compute"); 
  10.  
  11.         try { 
  12.  
  13.             Thread.sleep(10000); 
  14.  
  15.         } catch (InterruptedException e) { 
  16.  
  17.             throw new RuntimeException(e); 
  18.  
  19.         } 
  20.  
  21.         System.out.println("end to start compute. passed " + (System.currentTimeMillis() - t)/1000 + " seconds"); 
  22.  
  23.         return rand.nextInt(1000); 
  24.  
  25.     } 
  26.  
  27.     public static void main(String[] args) throws Exception { 
  28.  
  29.         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Main::getMoreData); 
  30.  
  31.         Future<Integer> f = future.whenComplete((v, e) -> { 
  32.  
  33.             System.out.println(v); 
  34.  
  35.             System.out.println(e); 
  36.  
  37.         }); 
  38.  
  39.         System.out.println(f.get()); 
  40.  
  41.         System.in.read(); 
  42.  
  43.     } 
  44.  

exceptionally方法返回一個(gè)新的CompletableFuture,當(dāng)原始的CompletableFuture拋出異常的時(shí)候,就會(huì)觸發(fā)這個(gè)CompletableFuture的計(jì)算,調(diào)用function計(jì)算值,否則如果原始的CompletableFuture正常計(jì)算完后,這個(gè)新的CompletableFuture也計(jì)算完成,它的值和原始的CompletableFuture的計(jì)算的值相同。也就是這個(gè)exceptionally方法用來處理異常的情況。

結(jié)果轉(zhuǎn)換

由于回調(diào)風(fēng)格的實(shí)現(xiàn),我們不必因?yàn)榈却粋€(gè)計(jì)算完成而阻塞著調(diào)用線程,而是告訴CompletableFuture當(dāng)計(jì)算完成的時(shí)候請(qǐng)執(zhí)行某個(gè)function。而且我們還可以將這些操作串聯(lián)起來,或者將CompletableFuture組合起來。

  1. public <U> CompletableFuture<U>  thenApply(Function<? super T,? extends U> fn) 
  2.  
  3. public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn) 
  4.  
  5. public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 

這一組函數(shù)的功能是當(dāng)原來的CompletableFuture計(jì)算完后,將結(jié)果傳遞給函數(shù)fn,將fn的結(jié)果作為新的CompletableFuture計(jì)算結(jié)果。因此它的功能相當(dāng)于將CompletableFuture<T>轉(zhuǎn)換成CompletableFuture<U>。

使用例子如下:

  1. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { 
  2.  
  3.     return 100; 
  4.  
  5. }); 
  6.  
  7. CompletableFuture<String> f =  future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString()); 
  8.  
  9. System.out.println(f.get()); //"1000" 

需要注意的是,這些轉(zhuǎn)換并不是馬上執(zhí)行的,也不會(huì)阻塞,而是在前一個(gè)stage完成后繼續(xù)執(zhí)行。

下面一組方法雖然也返回CompletableFuture對(duì)象,但是對(duì)象的值和原來的CompletableFuture計(jì)算的值不同。當(dāng)原先的CompletableFuture的值計(jì)算完成或者拋出異常的時(shí)候,會(huì)觸發(fā)這個(gè)CompletableFuture對(duì)象的計(jì)算,結(jié)果由BiFunction參數(shù)計(jì)算而得。因此這組方法兼有whenComplete和轉(zhuǎn)換的兩個(gè)功能。

  1. public <U> CompletableFuture<U>     handle(BiFunction<? super T,Throwable,? extends U> fn) 
  2.  
  3. public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn) 
  4.  
  5. public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor) 

它們與thenApply* 方法的區(qū)別在于handle*方法會(huì)處理正常計(jì)算值和異常,因此它可以屏蔽異常,避免異常繼續(xù)拋出。而thenApply*方法只是用來處理正常值,因此一旦有異常就會(huì)拋出。

純消費(fèi)結(jié)果

上面的方法是當(dāng)計(jì)算完成的時(shí)候,會(huì)生成新的計(jì)算結(jié)果(thenApply, handle),或者返回同樣的計(jì)算結(jié)果(whenComplete,CompletableFuture)。CompletableFuture提供了一種處理結(jié)果的方法,只對(duì)結(jié)果執(zhí)行Action,而不返回新的計(jì)算值,因此計(jì)算值為Void:

 

  1. public CompletableFuture<Void>  thenAccept(Consumer<? super T> action
  2.  
  3. public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action
  4.  
  5. public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor) 

看它的參數(shù)類型也就明白了,它們是消費(fèi)型函數(shù)式接口Consumer,這個(gè)接口只有輸入,沒有返回值。

 

  1. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { 
  2.  
  3.     return 100; 
  4.  
  5. }); 
  6.  
  7. CompletableFuture<Void> f =  future.thenAccept(System.out::println); 
  8.  
  9. System.out.println(f.get()); 
  10.  
  11. public <U> CompletableFuture<Void>                thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action
  12.  
  13. public <U> CompletableFuture<Void>                thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action
  14.  
  15. public <U> CompletableFuture<Void>                thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) 
  16.  
  17. public CompletableFuture<Void>  runAfterBoth(CompletionStage<?> other,  Runnable action

thenAcceptBoth以及相關(guān)方法提供了類似的功能,當(dāng)兩個(gè)CompletionStage都正常完成計(jì)算的時(shí)候,就會(huì)執(zhí)行提供的action,它用來組合另外一個(gè)異步的結(jié)果。

runAfterBoth是當(dāng)兩個(gè)CompletionStage都正常完成計(jì)算的時(shí)候,執(zhí)行一個(gè)Runnable,這個(gè)Runnable并不使用計(jì)算的結(jié)果。

例子如下:

  1. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { 
  2.  
  3.     return 100; 
  4.  
  5. }); 
  6.  
  7. CompletableFuture<Void> f =  future.thenAcceptBoth(CompletableFuture.completedFuture(10), (x, y) -> System.out.println(x * y)); 
  8.  
  9. System.out.println(f.get()); 

更徹底地,下面一組方法當(dāng)計(jì)算完成的時(shí)候會(huì)執(zhí)行一個(gè)Runnable,與thenAccept不同,Runnable并不使用CompletableFuture計(jì)算的結(jié)果。

  1. public CompletableFuture<Void>  thenRun(Runnable action
  2.  
  3. public CompletableFuture<Void>  thenRunAsync(Runnable action
  4.  
  5. public CompletableFuture<Void>  thenRunAsync(Runnable action,  Executor executor) 

因此先前的CompletableFuture計(jì)算的結(jié)果被忽略了,這個(gè)方法返回CompletableFuture<Void>類型的對(duì)象。

  1. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { 
  2.  
  3.     return 100; 
  4.  
  5. }); 
  6.  
  7. CompletableFuture<Void> f =  future.thenRun(() -> System.out.println("finished")); 
  8.  
  9. System.out.println(f.get()); 

因此,你可以根據(jù)方法的參數(shù)的類型來加速你的記憶。Runnable類型的參數(shù)會(huì)忽略計(jì)算的結(jié)果,Consumer是純消費(fèi)計(jì)算結(jié)果,BiConsumer會(huì)組合另外一個(gè)CompletionStage純消費(fèi),F(xiàn)unction會(huì)對(duì)計(jì)算結(jié)果做轉(zhuǎn)換,BiFunction會(huì)組合另外一個(gè)CompletionStage的計(jì)算結(jié)果做轉(zhuǎn)換。

組合

有時(shí),你需要在一個(gè)future結(jié)構(gòu)運(yùn)行某個(gè)函數(shù),但是這個(gè)函數(shù)也是返回某種future,也就是說是兩個(gè)future彼此依賴串聯(lián)在一起,它類似于flatMap。

  1. public <U> CompletableFuture<U>  thenCompose(Function<? super T,? extends CompletionStage<U>> fn) 
  2.  
  3. public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn) 
  4.  
  5. public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn,  Executor executor) 

這一組方法接受一個(gè)Function作為參數(shù),這個(gè)Function的輸入是當(dāng)前的CompletableFuture的計(jì)算值,返回結(jié)果將是一個(gè)新的CompletableFuture,這個(gè)新的CompletableFuture會(huì)組合原來的CompletableFuture和函數(shù)返回的CompletableFuture。因此它的功能類似:

A +--> B +---> C

記住,thenCompose返回的對(duì)象并不一是函數(shù)fn返回的對(duì)象,如果原來的CompletableFuture還沒有計(jì)算出來,它就會(huì)生成一個(gè)新的組合后的CompletableFuture。

例子:

  1. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { 
  2.  
  3.     return 100; 
  4.  
  5. }); 
  6.  
  7. CompletableFuture<String> f =  future.thenCompose( i -> { 
  8.  
  9.     return CompletableFuture.supplyAsync(() -> { 
  10.  
  11.         return (i * 10) + ""
  12.  
  13.     }); 
  14.  
  15. }); 
  16.  
  17. System.out.println(f.get()); //1000 

 

而下面的一組方法thenCombine用來復(fù)合另外一個(gè)CompletionStage的結(jié)果。它的功能類似:

A +

  |

  +------> C

  +------^

B +

兩個(gè)CompletionStage是并行執(zhí)行的,它們之間并沒有先后依賴順序,other并不會(huì)等待先前的CompletableFuture執(zhí)行完畢后再執(zhí)行。

  1. public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 
  2.  
  3. public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) 
  4.  
  5. public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) 

其實(shí)從功能上來講,它們的功能更類似thenAcceptBoth,只不過thenAcceptBoth是純消費(fèi),它的函數(shù)參數(shù)沒有返回值,而thenCombine的函數(shù)參數(shù)fn有返回值。

 

  1. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { 
  2.  
  3.     return 100; 
  4.  
  5. }); 
  6.  
  7. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { 
  8.  
  9.     return "abc"
  10.  
  11. }); 
  12.  
  13. CompletableFuture<String> f =  future.thenCombine(future2, (x,y) -> y + "-" + x); 
  14.  
  15. System.out.println(f.get()); //abc-100 

Either

thenAcceptBoth和runAfterBoth是當(dāng)兩個(gè)CompletableFuture都計(jì)算完成,而我們下面要了解的方法是當(dāng)任意一個(gè)CompletableFuture計(jì)算完成的時(shí)候就會(huì)執(zhí)行。

  1. public CompletableFuture<Void>        acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action
  2.  
  3. public CompletableFuture<Void>        acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action
  4.  
  5. public CompletableFuture<Void>        acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) 
  6.  
  7. public <U> CompletableFuture<U>     applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) 
  8.  
  9. public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) 
  10.  
  11. public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor) 

acceptEither方法是當(dāng)任意一個(gè)CompletionStage完成的時(shí)候,action這個(gè)消費(fèi)者就會(huì)被執(zhí)行。這個(gè)方法返回CompletableFuture<Void>

applyToEither方法是當(dāng)任意一個(gè)CompletionStage完成的時(shí)候,fn會(huì)被執(zhí)行,它的返回值會(huì)當(dāng)作新的CompletableFuture<U>的計(jì)算結(jié)果。

下面這個(gè)例子有時(shí)會(huì)輸出100,有時(shí)候會(huì)輸出200,哪個(gè)Future先完成就會(huì)根據(jù)它的結(jié)果計(jì)算。

  1. Random rand = new Random(); 
  2.  
  3. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { 
  4.  
  5.     try { 
  6.  
  7.         Thread.sleep(10000 + rand.nextInt(1000)); 
  8.  
  9.     } catch (InterruptedException e) { 
  10.  
  11.         e.printStackTrace(); 
  12.  
  13.     } 
  14.  
  15.     return 100; 
  16.  
  17. }); 
  18.  
  19. CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { 
  20.  
  21.     try { 
  22.  
  23.         Thread.sleep(10000 + rand.nextInt(1000)); 
  24.  
  25.     } catch (InterruptedException e) { 
  26.  
  27.         e.printStackTrace(); 
  28.  
  29.     } 
  30.  
  31.     return 200; 
  32.  
  33. }); 
  34.  
  35. CompletableFuture<String> f =  future.applyToEither(future2,i -> i.toString());  

輔助方法 allOf 和 anyOf

前面我們已經(jīng)介紹了幾個(gè)靜態(tài)方法:completedFuture、runAsync、supplyAsync,下面介紹的這兩個(gè)方法用來組合多個(gè)CompletableFuture。

  1. public static CompletableFuture<Void>  allOf(CompletableFuture<?>... cfs) 
  2.  
  3. public static CompletableFuture<Object>  anyOf(CompletableFuture<?>... cfs) 

allOf方法是當(dāng)所有的CompletableFuture都執(zhí)行完后執(zhí)行計(jì)算。

anyOf方法是當(dāng)任意一個(gè)CompletableFuture執(zhí)行完后就會(huì)執(zhí)行計(jì)算,計(jì)算的結(jié)果相同。

下面的代碼運(yùn)行結(jié)果有時(shí)是100,有時(shí)是"abc"。但是anyOf和applyToEither不同。anyOf接受任意多的CompletableFuture,但是applyToEither只是判斷兩個(gè)CompletableFuture。anyOf返回值的計(jì)算結(jié)果是參數(shù)中其中一個(gè)CompletableFuture的計(jì)算結(jié)果,applyToEither返回值的計(jì)算結(jié)果卻是要經(jīng)過fn處理的。當(dāng)然還有靜態(tài)方法的區(qū)別,線程池的選擇等。

  1. Random rand = new Random(); 
  2.  
  3. CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { 
  4.  
  5.     try { 
  6.  
  7.         Thread.sleep(10000 + rand.nextInt(1000)); 
  8.  
  9.     } catch (InterruptedException e) { 
  10.  
  11.         e.printStackTrace(); 
  12.  
  13.     } 
  14.  
  15.     return 100; 
  16.  
  17. }); 
  18.  
  19. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { 
  20.  
  21.     try { 
  22.  
  23.         Thread.sleep(10000 + rand.nextInt(1000)); 
  24.  
  25.     } catch (InterruptedException e) { 
  26.  
  27.         e.printStackTrace(); 
  28.  
  29.     } 
  30.  
  31.     return "abc"
  32.  
  33. }); 
  34.  
  35. //CompletableFuture<Void> f =  CompletableFuture.allOf(future1,future2); 
  36.  
  37. CompletableFuture<Object> f =  CompletableFuture.anyOf(future1,future2); 
  38.  
  39. System.out.println(f.get()); 

更進(jìn)一步

Guava的Future類,它的Futures輔助類提供了很多便利方法,用來處理多個(gè)Future,而不像Java的CompletableFuture,只提供了allOf、anyOf兩個(gè)方法。 比如有這樣一個(gè)需求,將多個(gè)CompletableFuture組合成一個(gè)CompletableFuture,這個(gè)組合后的CompletableFuture的計(jì)算結(jié)果是個(gè)List,它包含前面所有的CompletableFuture的計(jì)算結(jié)果,guava的Futures.allAsList可以實(shí)現(xiàn)這樣的功能,但是對(duì)于java CompletableFuture,我們需要一些輔助方法:

  1. public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) { 
  2.  
  3.        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); 
  4.  
  5.        return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList())); 
  6.  
  7.    } 
  8.  
  9. public static <T> CompletableFuture<Stream<T>> sequence(Stream<CompletableFuture<T>> futures) { 
  10.  
  11.        List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList()); 
  12.  
  13.        return sequence(futureList); 
  14.  
  15.    } 

或者Java Future轉(zhuǎn)CompletableFuture:

  1.  public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) { 
  2.     return CompletableFuture.supplyAsync(() -> { 
  3.  
  4.         try { 
  5.  
  6.             return future.get(); 
  7.  
  8.         } catch (InterruptedException | ExecutionException e) { 
  9.  
  10.             throw new RuntimeException(e); 
  11.  
  12.         } 
  13.  
  14.     }, executor); 
  15.  

 

 

責(zé)任編輯:龐桂玉 來源: 田心雙木的博客
相關(guān)推薦

2025-02-28 09:20:00

Future開發(fā)代碼

2021-06-06 16:56:49

異步編程Completable

2024-03-06 08:13:33

FutureJDKCallable

2024-12-26 12:59:39

2024-01-11 12:14:31

Async線程池任務(wù)

2015-06-16 11:06:42

JavaCompletable

2021-02-21 14:35:29

Java 8異步編程

2024-04-18 08:20:27

Java 8編程工具

2020-05-29 07:20:00

Java8異步編程源碼解讀

2021-08-30 19:00:46

靜態(tài)CompletableCountDownLa

2025-02-06 16:51:30

2023-04-13 07:33:31

Java 8編程工具

2024-08-06 09:43:54

Java 8工具編程

2024-10-28 13:31:33

性能@Async應(yīng)用

2021-09-27 13:01:52

線程阻塞排查

2021-03-16 15:12:57

CompletableFuture機(jī)制java

2023-07-19 08:03:05

Future異步JDK

2024-08-30 09:53:17

Java 8編程集成

2022-07-08 14:14:04

并發(fā)編程異步編程

2021-03-18 10:12:54

JavaCompletable字符串
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)