Java8異步編程之CompletableFuture源碼解讀
原創(chuàng)【51CTO.com原創(chuàng)稿件】
一、引言
一說到異步任務(wù),很多人上來咔咔新建個線程池。為了防止線程數(shù)量肆虐,一般還會考慮使用單例模式創(chuàng)建線程池,具體使用方法大都如下面的代碼所示:
- @Test
- publicvoiddemo1() throwsExecutionException, InterruptedException {
- ExecutorServiceexecutorService=Executors.newFixedThreadPool(5);
- Future<Object>future1=executorService.submit(newCallable<Object>() {
- @Override
- publicObjectcall() throwsException {
- returnThread.currentThread().getName();
}- });
- System.out.println(future1.get());
- executorService.execute(newRunnable() {
- @Overridepublicvoidrun() {
- System.out.println(Thread.currentThread().getName());
- }
- });
- }
經(jīng)常使用 JavaScript 的同學相信對于異步回調(diào)的用法相當熟悉了,畢竟 JavaScript 擁有“回調(diào)地獄”的美譽。
我們大 Java 又開啟了新一輪模仿之旅。
java.util.concurrent 包新增了 CompletableFuture 類可以實現(xiàn)類似 JavaScript 的連續(xù)回調(diào)。
二、兩種基本用法
先來看下 CompletableFuture 的兩種基本⽤法,代碼如下:
- @Test
- public void index1() throws ExecutionException, InterruptedException {
- CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> Thread.currentThread().getName());
- CompletableFuture completableFuture2 = CompletableFuture.runAsync(() -> Thread.currentThread().getName());
- System.out.println(completableFuture1.get()); System.out.println(completableFuture2.get());
- }
打印輸出:
- ForkJoinPool.commonPool-worker-1
- null
初看代碼,第一反應(yīng)是代碼簡潔。直接調(diào)用 CompletableFuture 類的靜態(tài)方法,提交任務(wù)方法就完事了。但是,隨之而來的疑問就是,異步任務(wù)執(zhí)行的背后是一套什么邏輯呢?是一對一使用newThread()還是依賴線程池去執(zhí)行的呢。
三、探索線程池原理
翻閱 CompletableFuture 類的源碼,我們找到答案。關(guān)鍵代碼如下:
- private static final boolean useCommonPool =
- (ForkJoinPool.getCommonPoolParallelism() > 1);
- /**
- * Default executor -- ForkJoinPool.commonPool() unless it cannot
- * support parallelism.
- */
- private static final Executor asyncPool = useCommonPool ?
- ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
可以看到 CompletableFuture 類默認使⽤的是 ForkJoinPool.commonPool() ⽅法返回的線程池。當 然啦,前提是 ForkJoinPool 線程池的數(shù)量⼤于 1 。否則,則使⽤ CompletableFuture 類⾃定義的 ThreadPerTaskExecutor 線程池。 ThreadPerTaskExecutor 線程池的實現(xiàn)邏輯⾮常簡單,⼀⾏代碼簡單實現(xiàn)了 Executor 接⼝,內(nèi)部執(zhí)⾏ 邏輯是⼀條任務(wù)對應(yīng)⼀條線程。代碼如下:
- /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
- static final class ThreadPerTaskExecutor implements Executor {
- public void execute(Runnable r) { new Thread(r).start(); }
- }
四、兩種異步接⼝
之前我們使⽤線程池執(zhí)⾏異步任務(wù)時,當不需要任務(wù)執(zhí)⾏完畢后返回結(jié)果的,我們都是實現(xiàn) Runnable 接⼝。⽽當需要實現(xiàn)返回值時,我們使⽤的則是 Callable 接⼝。 同理,使⽤ CompletableFuture 類的靜態(tài)⽅法執(zhí)⾏異步任務(wù)時,不需要返回結(jié)果的也是實現(xiàn) Runnable 接⼝。⽽當需要實現(xiàn)返回值時,我們使⽤的則是 Supplier 接⼝。其實,Callable 接⼝和 Supplier 接⼝ 并沒有什么區(qū)別。 接下來,我們來分析⼀下 CompletableFuture 是如何實現(xiàn)異步任務(wù)執(zhí)⾏的。
runAsync
CompletableFuture 執(zhí)⾏⽆返回值任務(wù)的是 runAsync() ⽅法。該⽅法的關(guān)鍵執(zhí)⾏代碼如下:
- static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
- if (f == null) throw new NullPointerException();
- CompletableFuture<Void> d = new CompletableFuture<Void>();
- e.execute(new AsyncRun(d, f));
- return d;
- }
可以看到,該⽅法將 Runnable 實例作為參數(shù)封裝⾄ AsyncRun 類。實際上, AsyncRun 類是對 Runnable 接⼝的進⼀步封裝。實際上,AsyncRun 類也是實現(xiàn)了 Runnable 接⼝。觀察下⽅ AsyncRun 類的源碼,可以看到 AsyncRun 類的 run() ⽅法中調(diào)⽤了 Runnable 參數(shù)的 run() ⽅法。
- public void run() {
- CompletableFuture<Void> d; Runnable f;
- if ((d = dep) != null && (f = fn) != null) {
- dep = null; fn = null;
- if (d.result == null) {
- try {
- f.run();
- d.completeNull();
- } catch (Throwable ex) {
- d.completeThrowable(ex);
- }
- }
- d.postComplete();
- }
- }
當提交的任務(wù)執(zhí)⾏完畢后,即 f.run() ⽅法執(zhí)⾏完畢。調(diào)⽤ d.completeNull() ⽅法設(shè)置任務(wù)執(zhí)⾏結(jié) 果為空。代碼如下:
- /** The encoding of the null value. */
- static final AltResult NIL = new AltResult(null);
- /** Completes with the null value, unless already completed. */
- final boolean completeNull() {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- NIL);
- }
可以看到,對于任務(wù)返回值為 null 的執(zhí)⾏結(jié)果,被封裝為 new AltResult(null) 對象。⽽且,還是 調(diào)⽤的 CAS 本地⽅法實現(xiàn)了原⼦操作。 為什么需要對 null 值進⾏單獨封裝呢?觀察 get() ⽅法的源碼:
- public T get() throws InterruptedException, ExecutionException {
- Object r;
- return reportGet((r = result) == null ? waitingGet(true) : r);
- }
原來原因是便于使⽤ null 值區(qū)分異步任務(wù)是否執(zhí)⾏完畢。 如果你對 CAS 不太了解的話,可以查閱 compareAndSwapObject ⽅法的四個參數(shù)的含義。該⽅法的參 數(shù) RESULT 是什么呢?查看代碼如下:
- RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
原來,RESULT 是獲取 CompletableFuture 對象中 result 字段的偏移地址。這個 result 字段⼜是啥 呢?就是任務(wù)執(zhí)⾏完畢后的結(jié)果值。代碼如下:
- // Either the result or boxed AltResult
- volatile Object result;
supplyAsync
CompletableFuture 執(zhí)⾏有返回值任務(wù)的是 supplyAsync() ⽅法。該⽅法的關(guān)鍵執(zhí)⾏代碼如下:
- static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
- Supplier<U> f) {
- if (f == null) throw new NullPointerException();
- CompletableFuture<U> d = new CompletableFuture<U>();
- e.execute(new AsyncSupply<U>(d, f));
- return d;
- }
與 AsyncRun 類對 Runnable 接⼝的封裝相同的是,AsyncSupply 類也是對 Runnable 接⼝的 run() ⽅ 法進⾏了⼀層封裝。代碼如下:
- public void run() {
- CompletableFuture<T> d; Supplier<T> f;
- if ((d = dep) != null && (f = fn) != null) {
- dep = null; fn = null;
- if (d.result == null) {
- try {
- d.completeValue(f.get());
- } catch (Throwable ex) {
- d.completeThrowable(ex);
- }
- }
- d.postComplete();
- }
- }
當異步任務(wù)執(zhí)⾏完畢后,返回結(jié)果會經(jīng) d.completeValue() ⽅法進⾏封裝。與 d.completeNull() ⽅ 法不同的是,該⽅法具有⼀個參數(shù)。代碼如下:
- /** Completes with a non-exceptional result, unless already completed. */
- final boolean completeValue(T t) {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- (t == null) ? NIL : t);
- }
⽆論是類 AsyncRun 還是類 AsyncSupply ,run() ⽅法都會在執(zhí)⾏結(jié)束之際調(diào)⽤ CompletableFuture 對象的 postComplete() ⽅法。顧名思義,該⽅法將通知后續(xù)回調(diào)函數(shù)的執(zhí)⾏。
五、探究回調(diào)函數(shù)原理
前⾯我們提到了 CompletableFuture 具有連續(xù)回調(diào)的特性。舉個例⼦:
- @Test
- public void demo2() throws ExecutionException, InterruptedException {
- CompletableFuture<ArrayList> completableFuture =
- CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getName());
- return new ArrayList();
- })
- .whenCompleteAsync((list, throwable) -> {
- System.out.println(Thread.currentThread().getName());
- list.add(1);
- })
- .whenCompleteAsync((list, throwable) -> {
- System.out.println(Thread.currentThread().getName());
- list.add(2);
- })
- .whenCompleteAsync((list, throwable) -> {
- System.out.println(Thread.currentThread().getName());
- list.add(3);
- });
- System.out.println(completableFuture.get());
- }
打印輸出:
- ForkJoinPool.commonPool-worker-1
- ForkJoinPool.commonPool-worker-1
- ForkJoinPool.commonPool-worker-1
- ForkJoinPool.commonPool-worker-1
- [1, 2, 3]
上⾯的測試⽅法中,通過 supplyAsync ⽅法提交異步任務(wù),當異步任務(wù)運⾏結(jié)束,對結(jié)果值添加三個回 調(diào)函數(shù)進⼀步處理。 觀察打印輸出,可以初步得出如下結(jié)論:
- 異步任務(wù)與回調(diào)函數(shù)均運⾏在同⼀個線程中。
- 回調(diào)函數(shù)的調(diào)⽤順序與添加回調(diào)函數(shù)的順序⼀致。
那么問題來了,CompletableFuture 內(nèi)部是如何處理連續(xù)回調(diào)函數(shù)的呢?
AsyncSupply
當我們提交異步任務(wù)時,等價于向線程池提交 AsyncSupply 對象或者 AsyncRun 對象。觀察這兩個類 的唯⼀構(gòu)造⽅法都是相同的,代碼如下:
- AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
- this.dep = dep; this.fn = fn;
- }
這就將 AsyncSupply 異步任務(wù)與返回給⽤戶的 CompletableFuture 對象進⾏綁定,⽤于在執(zhí)⾏結(jié)束后 回填結(jié)果到 CompletableFuture 對象,以及通知后續(xù)回調(diào)函數(shù)的運⾏。
Completion
回調(diào)函數(shù)均是 Completion 類的⼦類,抽取 Completion 類與⼦類的關(guān)鍵代碼:
- Completion next;
- CompletableFuture<V> dep;
- CompletableFuture<T> src;
- Function fn;
Completion 類含有 next 字段,很明顯是⼀個鏈表。 Completion 的⼦類含有兩個 CompletableFuture 類型的參數(shù),dep 是新建的、⽤于下⼀步的 CompletableFuture 對象,src 則是引⽤它的 CompletableFuture 對象。
當 Completion 執(zhí)⾏完回調(diào)⽅法后,⼀般會返回 dep 對象,⽤于迭代遍歷。
CompletableFuture
觀察源碼,CompletableFuture 主要包含下⾯兩個參數(shù):
- volatile Object result; //結(jié)果
- volatile Completion stack; //回調(diào)⽅法棧
Completion 類型封裝了回調(diào)⽅法,但為什么要起名為 stack (棧)呢? 因為 CompletableFuture 借助 Completion 的鏈表結(jié)構(gòu)實現(xiàn)了棧。每當調(diào)⽤ CompletableFuture 對 象的 whenCompleteAsync() 或其它回調(diào)⽅法時,都會新建⼀個 Completion 對象,并壓到棧頂。代碼 如下:
- final boolean tryPushStack(Completion c) {
- Completion h = stack;
- lazySetNext(c, h);
- return UNSAFE.compareAndSwapObject(this, STACK, h, c);
- }
postComplete
回顧上⾯兩種異步任務(wù)類的實現(xiàn),當異步任務(wù)執(zhí)⾏完畢之后,都會調(diào)⽤ postComplete() ⽅法通知回調(diào) ⽅法的執(zhí)⾏。代碼如下:
- final void postComplete() {
- CompletableFuture<?> f = this; Completion h;
- while ((h = f.stack) != null ||
- (f != this && (h = (f = this).stack) != null)) {
- CompletableFuture<?> d; Completion t;
- if (f.casStack(h, t = h.next)) {
- if (t != null) {
- if (f != this) {
- pushStack(h);
- continue;
- }
- h.next = null; // detach
- }
- f = (d = h.tryFire(NESTED)) == null ? this : d;
- }
- }
- }
這段代碼是本⽂的核⼼部分,⼤致邏輯如下:
當異步任務(wù)執(zhí)⾏結(jié)束后,CompletableFuture 會查看⾃身是否含有回調(diào)⽅法棧,如果含有,會通過 casStack() ⽅法拿出棧頂元素 h ,此時的棧頂是原來棧的第⼆位元素 t。如果 t 等于 null,那么直接 執(zhí)⾏回調(diào)⽅法 h,并返回下⼀個 CompletableFuture 對象。然后⼀直迭代這個過程。 簡化上述思路,我更想稱其為通過 Completion 對象實現(xiàn)橋接的 CompletableFuture 鏈表,流程圖如 下:
上⾯的過程是屬于正常情況下的,也就是⼀個 CompletableFuture 對象只提交⼀個回調(diào)⽅法的情況。 如果我們使⽤同⼀個 CompletableFuture 對象連續(xù)調(diào)⽤多次回調(diào)⽅法,那么就會形成 Completion 棧。
你以為 Completion 棧內(nèi)元素會依次調(diào)⽤,不會的。從代碼中來看,當回調(diào)⽅法 t 不等于 null,有兩種 情況:
情況 1:如果當前迭代到的 CompletableFuture 對象是 this (也就是 CompletableFuture 鏈表頭), 會令 h.next = null ,因為 h.next 也就是 t 通過 CAS 的⽅式壓到了 this 對象的 stack 棧頂。
情況 2:如果當前迭代到的 CompletableFuture 對象 f 不是 this (不是鏈表頭)的話,會將回調(diào)函數(shù) h 壓⼊ this (鏈表頭)的 stack 中。然后從鏈表頭再次迭代遍歷。這樣下去,對象 f 中的回調(diào)⽅法棧假設(shè) 為 3-2-1,從 f 的棧頂推出再壓⼊ this 的棧頂,順序就變?yōu)榱?1-2-3。這時候,情況就變成了第 1 種。
這樣,當回調(diào)⽅法 t = h.next 等于 null 或者 f 等于 this 時,都會對棧頂?shù)幕卣{(diào)⽅法進⾏調(diào)⽤。
簡單來說,就是將擁有多個回調(diào)⽅法的 CompletableFuture 對象的多余的回調(diào)⽅法移到到 this 對象的 棧內(nèi)。
回調(diào)⽅法執(zhí)⾏結(jié)束要么返回下⼀個 CompletableFuture 對象,要么返回 null 然后⼿動設(shè)置為 f = this, 再次從頭遍歷。
Async
回調(diào)函數(shù)的執(zhí)⾏其實分為兩種,區(qū)別在于帶不帶 Async 后綴。例如:
- @Test
- public void demo3() throws ExecutionException, InterruptedException {
- CompletableFuture<ArrayList> completableFuture =
- CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getName());
- return new ArrayList();
- })
- .whenComplete((arrayList, throwable) -> {
- System.out.println(Thread.currentThread().getName());
- arrayList.add(1);
- }).whenCompleteAsync((arrayList, throwable) -> {
- System.out.println(Thread.currentThread().getName());
- arrayList.add(2);
- });
- System.out.println(completableFuture.get());
- }
打印輸出:
- ForkJoinPool.commonPool-worker-1
- main
- ForkJoinPool.commonPool-worker-1
- [1, 2]
whenComplete() 和 whenCompleteAsync() ⽅法的區(qū)別在于是否在⽴即執(zhí)⾏。源碼如下:
- private CompletableFuture<T> uniWhenCompleteStage(
- Executor e, BiConsumer<? super T, ? super Throwable> f) {
- if (f == null) throw new NullPointerException();
- CompletableFuture<T> d = new CompletableFuture<T>();
- if (e != null || !d.uniWhenComplete(this, f, null)) {
- UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
- push(c);
- c.tryFire(SYNC);
- }
- return d;
- }
兩個⽅法都是調(diào)⽤的 uniWhenCompleteStage() ,區(qū)別在于參數(shù) Executor e 是否為 null。從⽽控制是 否調(diào)⽤ d.uniWhenComplete() ⽅法,該⽅法會判斷 result 是否為 null,從⽽嘗試是否⽴即執(zhí)⾏該回調(diào) ⽅法。若是 supplyAsync() ⽅法提交的異步任務(wù)耗時相對⻓⼀些,那么就不建議使⽤ whenComplete() ⽅法了。此時由 whenComplete() 和 whenCompleteAsync() ⽅法提交的異步任務(wù)都會由線程池執(zhí)⾏。
本章小結(jié)
通過本章節(jié)的源碼分析,我們明白了 Completion 之所以將自身設(shè)置為鏈表結(jié)構(gòu),是因為 CompletableFuture 需要借助 Completion 的鏈表結(jié)構(gòu)實現(xiàn)棧。也明白了同一個 CompletableFuture 對象如果多次調(diào)用回調(diào)方法時執(zhí)行順序會與調(diào)用的順序不符合。換言之,一個 CompletableFuture 對象只調(diào)用一個回調(diào)方法才是 CompletableFuture 設(shè)計的初衷,我們在編程中也可以利用這一特性來保證回調(diào)方法的調(diào)用順序。
因篇幅有限,本文并沒有分析更多的 CompletableFuture 源碼,感興趣的小伙伴可以自行查看。
六、用法集錦
異常處理
方法:
- public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
示例:
- @Test
- public void index2() throws ExecutionException, InterruptedException {
- CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2 / 0)
- .exceptionally((e) -> {
- System.out.println(e.getMessage());
- return 0;
- });
- System.out.println(completableFuture.get());
- }
輸出:
- java.lang.ArithmeticException: / by zero
- 0
任務(wù)完成后對結(jié)果的處理
方法:
- public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
- public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
- public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
示例:
- @Test
- public void index3() throws ExecutionException, InterruptedException {
- CompletableFuture<HashMap> completableFuture = CompletableFuture.supplyAsync(() -> new HashMap())
- .whenComplete((map, throwable) -> {
- map.put("key1", "value1");
- });
- System.out.println(completableFuture.get());
- }
輸出:
- {key=value}
方法:
- public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
- public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
- public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
示例:
- @Test
- public void index4() throws ExecutionException, InterruptedException {
- CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2)
- .thenApply((r) -> r + 1);
- System.out.println(completableFuture.get());
- }
輸出:
- 3
方法:
- public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
- public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
- public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
示例:
- @Test
- public void index5() throws ExecutionException, InterruptedException {
- CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 2)
- .thenAccept(System.out::println);
- System.out.println(completableFuture.get());
- }
輸出:
- 2
- null
任務(wù)的組合(需等待上一個任務(wù)完成)
方法:
- public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
- public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
- public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
示例:
- @Test
- public void index6() throws ExecutionException, InterruptedException {
- CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2)
- .thenCompose(integer -> CompletableFuture.supplyAsync(() -> integer + 1));
- System.out.println(completableFuture.get());
- }
輸出:
- 3
任務(wù)的組合(不需等待上一步完成)
- public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
- public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
- public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
示例:
- @Test
- public void index7() throws ExecutionException, InterruptedException {
- CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2)
- .thenCombine(CompletableFuture.supplyAsync(() -> 1), (x, y) -> x + y);
- System.out.println(completableFuture.get());
- }
輸出:
- 3
消費最先執(zhí)行完畢的其中一個任務(wù),不返回結(jié)果
方法:
- public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
- public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
- public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
示例:
- @Test
- public void index8() throws ExecutionException, InterruptedException {
- CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 2;
- })
- .acceptEither(CompletableFuture.supplyAsync(() -> 1), System.out::println);
- System.out.println(completableFuture.get());
- }
輸出:
- 1
- null
消費最先執(zhí)行完畢的其中一個任務(wù),并返回結(jié)果
方法:
- public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
- public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
- public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
示例:
- @Test
- public void index9() throws ExecutionException, InterruptedException {
- CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 2;
- })
- .applyToEither(CompletableFuture.supplyAsync(() -> 1), x -> x + 10);
- System.out.println(completableFuture.get());
- }
輸出:
- 11
等待所有任務(wù)完成
方法:
- public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
示例:
- @Test
- public void index10() throws ExecutionException, InterruptedException {
- CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 1;
- });
- CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 2);
- CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2);
- System.out.println("waiting all task finish..");
- System.out.println(completableFuture.get());
- System.out.println("all task finish");
- }
輸出:
- waiting all task finish..
- null
- all task finish
返回最先完成的任務(wù)結(jié)果
方法:
- public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
示例:
- @Test
- public void index11() throws ExecutionException, InterruptedException {
- CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 1;
- });
- CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 2);
- CompletableFuture<Object> completableFuture = CompletableFuture.anyOf(completableFuture1, completableFuture2);
- System.out.println(completableFuture.get());
- }
輸出:
- 2
作者簡介:
薛勤,公眾號“代碼藝術(shù)”的作者,就職于阿里巴巴,熱衷于探索計算機世界的底層原理,個人在 Github@Ystcode 上擁有多個開源項目。
【51CTO原創(chuàng)稿件,合作站點轉(zhuǎn)載請注明原文作者和出處為51CTO.com】