詳解 CompletableFuture 實踐
CompletableFuture繼承了CompletionStage接口和Future接口,在原有Future的基礎(chǔ)上增加了異步回調(diào)、流式處理以及任務(wù)組合,成為JDK8多任務(wù)協(xié)同場景下一個有效利器。所以筆者今天就以此文演示一下CompletableFuture基礎(chǔ)實踐案例。
CompletableFuture基本設(shè)計
因為本文著重講解CompletableFuture的使用,所以這里我們就簡單的從類的繼承關(guān)系了解一下CompletableFuture的基本理念,結(jié)合CompletableFuture源碼注釋的說法,它是一個針對異步流程化的工具類,即它支持某個異步Future任務(wù)完成之后按照指定編排的順序觸發(fā)下一個依賴動作:
A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion. When two or more threads attempt to complete, completeExceptionally, or cancel a CompletableFuture, only one of them succeeds.
舉個例子,例如我現(xiàn)在在瀏覽器上某個網(wǎng)站的商家的產(chǎn)品,我希望瀏覽器能夠做到以下幾點:
- 我針對性點選擇3個商家。
- 3個門店在我勾選點擊查看信息后,分別開始查詢各自的產(chǎn)品數(shù)據(jù)。
- 3家數(shù)據(jù)都完成數(shù)據(jù)加載后,網(wǎng)站歸并這些數(shù)據(jù),通過一個網(wǎng)頁渲染給我查看。
通過CompletableFuture,我們就可以完成通過CompletableFuture將上述商家的查詢?nèi)蝿?wù)進(jìn)行異步提交:
對此我們也可以從CompletableFuture的繼承關(guān)系了解其設(shè)計理念:
- 它繼承Future接口使之具備阻塞獲取異步回調(diào)的能力。
- 繼承CompletionStage接口,它永遠(yuǎn)thenApply等方法,通過這個繼承關(guān)系,使CompletableFuture具備異步任務(wù)順序編排的能力,即當(dāng)前異步任務(wù)一處理完成就執(zhí)行thenApply給定的異步邏輯,使得我們可以清晰明了的編排異步任務(wù):
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
//......
}
提交有返回值的異步任務(wù)
通過supplyAsync提交我們的異步任務(wù),然后通過get方法等待異步任務(wù)完成并獲取返回結(jié)果。
public static void main(String[] args) throws Exception {
//提交一個CompletableFuture任務(wù)
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
long start = System.currentTimeMillis();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("work complete! cost:" +(System.currentTimeMillis() - start) + " ms");
return 1;
});
System.out.println("main thread working");
//通過get方法阻塞獲取任務(wù)執(zhí)行結(jié)果
System.out.println("supplyAsync result: " + task.get());
System.out.println("main thread finish");
}
輸出結(jié)果如下,可以看出CompletableFuture的get方法會阻塞主線程工作,直到得到返回值為止。
main thread working
work complete! cost:1001 ms
supplyAsync result: 1
main thread finish
對此我們不妨來看看get方法是如何做到阻塞主線程并等待異步線程任務(wù)執(zhí)行完成的。從下面這段源碼我們可以看到get方法的執(zhí)行步驟:
- 調(diào)用reportGet查看異步任務(wù)是否將結(jié)果賦值給result。
- 如果不為null直接返回。
- 若為null則調(diào)用waitingGet等待任務(wù)返回。
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
查看reportGet方法可以看到邏輯也很簡單,如果r為空則直接拋中斷異常,如果r存在異常則直接將異常拋出,如果有結(jié)果則將結(jié)果返回。
private static <T> T reportGet(Object r)
throws InterruptedException, ExecutionException {
//如果結(jié)果為null直接拋出終端異常
if (r == null) // by convention below, null means interrupted
throw new InterruptedException();
//如果結(jié)果有異常則將異常拋出
if (r instanceof AltResult) {
Throwable x, cause;
if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
throw (CancellationException)x;
if ((x instanceof CompletionException) &&
(cause = x.getCause()) != null)
x = cause;
throw new ExecutionException(x);
}
//如果r正常則直接將結(jié)果返回出去
@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
waitingGet源碼相對復(fù)雜一些,整體步驟我們可以拆解為while循環(huán)內(nèi)部和while循環(huán)外部,我們先來看看while循環(huán)內(nèi)部的執(zhí)行流程:
- while循環(huán)從任務(wù)中獲取result,如果result為空,則進(jìn)入循環(huán)。
- 如果spins小于0,說明剛剛進(jìn)入循環(huán)內(nèi)部,可以自旋等待一下任務(wù)的獲取,設(shè)置好spins(spins的值從SPINS來,如果多核的情況下值為256),進(jìn)入下一次循環(huán)。
- 進(jìn)入循環(huán)發(fā)現(xiàn)spins大于0,則隨機(jī)生成一個數(shù),如果這個數(shù)大于等于0則--spins,進(jìn)入下次循環(huán)。
- 不斷執(zhí)行步驟3的操作,知道spins等于0。
- 此時判斷來到q==null,說明任務(wù)自旋等待了一段時間還是沒有結(jié)果,我們需要將其掛起,首先將線程封裝成一個Signaller,進(jìn)入下一次循環(huán)。
- 循環(huán)會判斷if (!queued),將要阻塞的任務(wù)放到棧中,進(jìn)入下一次循環(huán)。
- 循環(huán)下一次會來到if (q.thread != null && result == null),說明q線程不為空且沒有結(jié)果,我們需要將其打斷,調(diào)用ForkJoinPool.managedBlock(q)將其打斷,直至有結(jié)果后才結(jié)束循環(huán)。
while循環(huán)外操作就簡單了,來到循環(huán)尾部時,result已經(jīng)有值了,代碼執(zhí)行postComplete完成任務(wù),并將結(jié)果返回。
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
//如果result為空則進(jìn)入循環(huán)
while ((r = result) == null) {
//如果spins小于0,說明剛剛進(jìn)入循環(huán)內(nèi)部,可以自旋等待一下任務(wù)的獲取,設(shè)置好spins(spins的值從SPINS來,如果多核的情況下值為256),自此,第一次循環(huán)步驟結(jié)束
if (spins < 0)
spins = SPINS;
//這一步的操作是自旋等待任務(wù)結(jié)果,所以代碼進(jìn)入循環(huán)發(fā)現(xiàn)spins大于0,則隨機(jī)生成一個數(shù),如果這個數(shù)大于等于0則--spins,進(jìn)入下次循環(huán),直到循環(huán)spins變?yōu)?
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
//此時判斷來到q==null,說明任務(wù)自旋等待了一段時間還是沒有結(jié)果,我們需要將其掛起,首先將線程封裝成一個Signaller,結(jié)束本次循環(huán)
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
//上一步我們將任務(wù)封裝成Signaller,這里就將其存入棧中,然后結(jié)束循環(huán)
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
//循環(huán)來到這說明q線程不為空且沒有結(jié)果,我們需要將其打斷,調(diào)用`ForkJoinPool.managedBlock(q)`將其打斷,直至有結(jié)果后才結(jié)束循環(huán)
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
//結(jié)束循環(huán),調(diào)用postComplete結(jié)束任務(wù)并返回結(jié)果r
postComplete();
return r;
}
提交無返回值的異步任務(wù)
通過runAsync提交一個無返回值的異步任務(wù),這里我們?yōu)榱藢崿F(xiàn)任務(wù)執(zhí)行完成再關(guān)閉主線程用了個get阻塞等待任務(wù)完成。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> supplyAsync = CompletableFuture.runAsync(() -> {
long start = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + "開始工作了,執(zhí)行時間:" + start);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "結(jié)束工作了,總執(zhí)行時間:" + (System.currentTimeMillis() - start));
});
System.out.println("主線程開始運行");
//get阻塞主線程等待任務(wù)結(jié)束
supplyAsync.get();
System.out.println("主線程運行結(jié)束");
}
輸出結(jié)果:
主線程開始運行
ForkJoinPool.commonPool-worker-1開始工作了,執(zhí)行時間:1651251489755
ForkJoinPool.commonPool-worker-1結(jié)束工作了,總執(zhí)行時間:1010
主線程運行結(jié)束
將異步任務(wù)提交給自己的線程池處理
查看supplyAsync方法的源碼我們發(fā)現(xiàn),我們提交的任務(wù)默認(rèn)情況下會交給asyncPool這個線程池處理。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
查看asyncPool 我們可以看到如果服務(wù)器是多核的情況下返回的是一個commonPool,commonPool默認(rèn)線程池數(shù)為CPU核心數(shù)。
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
所以如果某些情況下我們希望將任務(wù)提交到我們自己的線程池中,就建議通過supplyAsync的第二個參數(shù)告知CompletableFuture自己要用自定義線程池。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
//使用第二個參數(shù)告知CompletableFuture使用的線程池
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
long start = System.currentTimeMillis();
System.out.println(Thread.currentThread() + "開始工作了,執(zhí)行時間:" + start);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//打印當(dāng)前執(zhí)行任務(wù)的線程
System.out.println(Thread.currentThread() + "結(jié)束工作了,總執(zhí)行時間:" + (System.currentTimeMillis() - start));
return 1;
}, executorService);
System.out.println("主線程開始運行");
System.out.println("輸出結(jié)果 " + supplyAsync.get());
System.out.println("主線程運行結(jié)束");
executorService.shutdown();
while (executorService.isTerminated()) {
}
}
從輸出結(jié)果也可以看出這里使用的線程池是我們自定義的線程池:
主線程開始運行
Thread[pool-1-thread-1,5,main]開始工作了,執(zhí)行時間:1651251851358
Thread[pool-1-thread-1,5,main]結(jié)束工作了,總執(zhí)行時間:2005
輸出結(jié)果 1
主線程運行結(jié)束
thenApply和thenApplyAsync
thenApply 適用那些需要順序執(zhí)行的異步任務(wù),例如我們希望將第一個任務(wù)的返回值交給第二個異步任務(wù),就可以使用thenApply將兩個任務(wù)組合起來。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + "開始工作了");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "結(jié)束工作了");
return 100;
}, executorService);
//將兩個任務(wù)組合起來
CompletableFuture<String> task2 = task1.thenApply((data) -> {
System.out.println("第二個線程:" + Thread.currentThread() + "開始工作了");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "第一個線程的結(jié)果為 " + data;
});
System.out.println("獲取組合任務(wù)結(jié)果");
System.out.println("組合任務(wù)處理結(jié)果為: " + task2.get());
System.out.println("獲取組合任務(wù)結(jié)果結(jié)束");
executorService.shutdown();
while (executorService.isTerminated()) {
}
}
輸出結(jié)果可以看到,任務(wù)1執(zhí)行完成后任務(wù)2接著執(zhí)行了。
Thread[pool-1-thread-1,5,main]開始工作了
獲取組合任務(wù)結(jié)果
Thread[pool-1-thread-1,5,main]結(jié)束工作了
第二個線程:Thread[pool-1-thread-1,5,main]開始工作了
組合任務(wù)處理結(jié)果為: 第一個線程的結(jié)果為 100
獲取組合任務(wù)結(jié)果結(jié)束
thenApplyAsync與thenApply不同的是,在第一個異步任務(wù)有指定線程池的情況下,第二個異步任務(wù)會被提交到其他線程池中,所以這里我們可以說明一個規(guī)律,帶有Async關(guān)鍵字的方法支持組合任務(wù)時,將任務(wù)提交到不同的線程池中。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread()+"開始工作了");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread()+"結(jié)束工作了");
return 100;
},executorService);
CompletableFuture<String> task2 = task1.thenApplyAsync((data) -> {
System.out.println("第二個線程:" + Thread.currentThread() + "開始工作了");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "第一個線程的結(jié)果為 " + data;
});
System.out.println("獲取任務(wù)結(jié)果開始");
System.out.println("任務(wù)的結(jié)果 "+task2.get());
System.out.println("獲取任務(wù)結(jié)果結(jié)束");
executorService.shutdown();
while (executorService.isTerminated()){
}
}
輸出結(jié)果:
Thread[pool-1-thread-1,5,main]開始工作了
獲取任務(wù)結(jié)果開始
Thread[pool-1-thread-1,5,main]結(jié)束工作了
第二個線程:Thread[ForkJoinPool.commonPool-worker-9,5,main]開始工作了
任務(wù)的結(jié)果 第一個線程的結(jié)果為 100
獲取任務(wù)結(jié)果結(jié)束
thenAccept和thenRun
thenAccept和thenRun都會在上一個任務(wù)執(zhí)行結(jié)束后才會繼續(xù)執(zhí)行。兩者唯一區(qū)別時:
- thenAccept在上一個任務(wù)執(zhí)行結(jié)束后,將上一個任務(wù)返回結(jié)果作為入?yún)?,但無返回值。
- thenRun會在上一個任務(wù)執(zhí)行結(jié)束后才開始處理,既沒有入?yún)⒁矝]有返回值。
以下便是筆者的使用示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
System.out.println("task線程:" + Thread.currentThread().getName() + "開始工作了");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task線程:" + Thread.currentThread().getName() + "結(jié)束工作了");
return 200;
}, executorService);
CompletableFuture<Integer> task2 = task.thenApply((data) -> {
System.out.println("task2線程:" + Thread.currentThread().getName() + "開始工作了");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task2線程:" + Thread.currentThread().getName() + "執(zhí)行結(jié)束");
return data;
});
//thenAccept 收上一個任務(wù)的入?yún)?,但無返回值
CompletableFuture<Void> task3 = task2.thenAccept((data) -> {
System.out.println("task3線程:" + Thread.currentThread().getName() + ",該任務(wù)接收上一個任務(wù)的結(jié)果,但無返回值,收到上一個任務(wù)的結(jié)果值為 " + data);
});
//thenRun在上一個任務(wù)結(jié)束后執(zhí)行,既無入?yún)⒁矡o出參
CompletableFuture<Void> task4 = task3.thenRun(() -> {
System.out.println("task4在上一個任務(wù)結(jié)束后繼續(xù)執(zhí)行,無入?yún)?也無返回值");
});
System.out.println("嘗試獲取最終執(zhí)行結(jié)果");
task4.get();
System.out.println("執(zhí)行任務(wù)直至task4 ");
System.out.println("任務(wù)全部執(zhí)行結(jié)束");
executorService.shutdown();
while (executorService.isTerminated()) {
}
}
輸出結(jié)果:
task線程:pool-1-thread-1開始工作了
嘗試獲取最終執(zhí)行結(jié)果
task線程:pool-1-thread-1結(jié)束工作了
task2線程:pool-1-thread-1開始工作了
task2線程:pool-1-thread-1執(zhí)行結(jié)束
task3線程:pool-1-thread-1,該任務(wù)接收上一個任務(wù)的結(jié)果,但無返回值,收到上一個任務(wù)的結(jié)果值為 200
task4在上一個任務(wù)結(jié)束后繼續(xù)執(zhí)行,無入?yún)?也無返回值
執(zhí)行任務(wù)直至task4
任務(wù)全部執(zhí)行結(jié)束
exceptionally
假如我們的任務(wù)1執(zhí)行過程中可能報錯,我們希望能夠從邏輯的角度處理掉,那么我們就可以在任務(wù)1后面接一個exceptionally方法,然后再接上任務(wù)2。這樣一來,任務(wù)1執(zhí)行報錯就會走到exceptionally,反之就會走到任務(wù)2的代碼段:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println("task1 開始工作了");
//隨機(jī)生成被除數(shù),為0會拋出算術(shù)異常
int num = RandomUtil.randomInt(0, 2);
int result = 10 / num;
System.out.println("task1 結(jié)束工作");
return 200;
});
//假如task1報錯,任務(wù)會走到這個任務(wù)上
CompletableFuture<Integer> exceptionally = task1.exceptionally((e) -> {
System.out.println("上一個任務(wù)報錯了,錯誤信息" + e.getMessage());
return -1;
});
CompletableFuture task2 = task1.thenAccept((param) -> {
System.out.println("走到正常的結(jié)束分支了,task1執(zhí)行結(jié)果:" + param);
});
System.out.println("主線程開始運行");
// 調(diào)用錯誤捕獲的任務(wù)執(zhí)行結(jié)束也會自動走到正常結(jié)束的分支
System.out.println("輸出結(jié)果 " + exceptionally.get());
System.out.println("主線程運行結(jié)束");
}
執(zhí)行正常的輸出結(jié)果:
task1 開始工作了
主線程開始運行
task1 結(jié)束工作
走到正常的結(jié)束分支了:200
輸出結(jié)果 200
主線程運行結(jié)束
執(zhí)行異常的輸出結(jié)果:
task1 開始工作了
主線程開始運行
上一個任務(wù)報錯了,錯誤信息java.lang.ArithmeticException: / by zero
輸出結(jié)果 -1
主線程運行結(jié)束
whenComplete
對于上面的例子,我們完全可以用whenComplete來簡化,whenComplete會接收兩個入?yún)?
- 入?yún)?為上一個任務(wù)的返回值。
- 入?yún)?比較特殊,如果上一個任務(wù)拋出異常,則第2個入?yún)⒉粸榭铡?/li>
所以上一個例子的代碼我們可以簡化成這樣,需要注意的是whenComplete返回結(jié)果是上一個任務(wù)的執(zhí)行結(jié)果,我們無法返回任務(wù)2的執(zhí)行結(jié)果。
public static void main(String[] args) {
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)1開始工作");
int num = RandomUtil.randomInt(0, 2);
int result = 10 / num;
System.out.println("任務(wù)1執(zhí)行結(jié)束,執(zhí)行結(jié)果:" + result);
return result;
});
CompletableFuture<Integer> task2 = task.whenComplete((result, err) -> {
System.out.println("任務(wù)2開始工作");
if (err != null) {
System.out.println("任務(wù)1執(zhí)行報錯,報錯原因:" + err.getMessage());
return;
}
System.out.println("任務(wù)1正常結(jié)束,執(zhí)行結(jié)果:" + result);
});
try {
System.out.println("task2拿到最終執(zhí)行結(jié)果 " + task2.get());
} catch (Exception e) {
}
System.out.println("全流程結(jié)束");
}
錯誤的輸出結(jié)果:
任務(wù)1開始工作
任務(wù)2開始工作
任務(wù)1執(zhí)行報錯,報錯原因:java.lang.ArithmeticException: / by zero
全流程結(jié)束
正確執(zhí)行的輸出結(jié)果:
任務(wù)1開始工作
任務(wù)1執(zhí)行結(jié)束,執(zhí)行結(jié)果:10
任務(wù)2開始工作
任務(wù)1正常結(jié)束,執(zhí)行結(jié)果:10
task2拿到最終執(zhí)行結(jié)果 10
全流程結(jié)束
handle
handle使用和whenComplete差不多,唯一的區(qū)別就是whenComplete返回的是上一個任務(wù)的結(jié)果,而handle可以返回自己的結(jié)果。
代碼如下所示:
public static void execute1() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + "開始工作了");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Random random = new java.util.Random();
int num = random.nextInt(10);
if (num < 5) {
throw new RuntimeException("報錯了 num:" + num);
}
System.out.println(Thread.currentThread() + "結(jié)束工作了");
return num;
});
CompletableFuture<String> future2 = future.handle((result, err) -> {
System.out.println("第二個線程:" + Thread.currentThread() + "開始工作了");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (err != null) {
System.out.println(err.getMessage());
;return "fail";
}
return "sucdess";
});
System.out.println("拿第1個任務(wù)的結(jié)果");
System.out.println("第1個任務(wù)的結(jié)果 " + future2.get());
System.out.println("第1個任務(wù)結(jié)果結(jié)束");
/**
* 輸出結(jié)果
* Thread[pool-1-thread-1,5,main]開始工作了
* 拿第一個任務(wù)的結(jié)果
* Thread[pool-1-thread-1,5,main]結(jié)束工作了
* 第二個線程:Thread[pool-1-thread-1,5,main]開始工作了
* 100
* 第一個任務(wù)結(jié)果結(jié)束
* 拿第2個任務(wù)的結(jié)果
* 第二個任務(wù)的結(jié)果 第一個線程的結(jié)果為 100
* 第2個任務(wù)結(jié)果結(jié)束
*/
}
thenCombine / thenAcceptBoth / runAfterBoth
這幾個方法都是將兩個任務(wù)組合起來執(zhí)行的,只有兩個任務(wù)都順利完成了,才會執(zhí)行之后的方法,唯一的區(qū)別是:
(1) thenCombine 接收兩個任務(wù)的返回值,并返回自己的返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println("task開始工作");
int num = RandomUtil.randomInt(0, 100);
System.out.println("task結(jié)束工作");
return num;
});
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("task2開始工作");
int num = RandomUtil.randomInt(0, 100);
System.out.println("task2結(jié)束工作");
return num;
});
//通過thenCombine將兩個任務(wù)組合起來
CompletableFuture<Integer> completableFuture = task1.thenCombine(task2, (result1, result2) -> {
System.out.println("task1返回結(jié)果:" + result1 + " task2返回結(jié)果:" + result2);
return result1 + result2;
});
System.out.println(completableFuture.get());
}
輸出結(jié)果如下:
task開始工作
task2開始工作
task結(jié)束工作
task2結(jié)束工作
task1返回結(jié)果:30 task2返回結(jié)果:1
31
(2) thenAcceptBoth 接收兩個參數(shù)返回值,但沒有返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println("task開始工作");
int num = RandomUtil.randomInt(0, 100);
System.out.println("task結(jié)束工作");
return num;
});
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("task2開始工作");
int num = RandomUtil.randomInt(0, 100);
System.out.println("task2結(jié)束工作");
return num;
});
//通過 thenAcceptBoth 將兩個任務(wù)組合起來,獲取前兩個任務(wù)處理結(jié)果,但自己不返回結(jié)果
CompletableFuture<Void> completableFuture = task1.thenAcceptBoth(task2, (result1, result2) -> {
System.out.println("task1返回結(jié)果:" + result1 + " task2返回結(jié)果:" + result2);
});
completableFuture.get();
}
輸出結(jié)果:
task開始工作
task2開始工作
task結(jié)束工作
task2結(jié)束工作
task1返回結(jié)果:66 task2返回結(jié)果:10
(3) runAfterBoth 既不能接收入?yún)?,也無返回值,待前兩個任務(wù)執(zhí)行完成后才能執(zhí)行。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println("task開始工作");
int num = RandomUtil.randomInt(0, 100);
System.out.println("task結(jié)束工作");
return num;
});
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("task2開始工作");
int num = RandomUtil.randomInt(0, 100);
System.out.println("task2結(jié)束工作");
return num;
});
//通過 runAfterBoth 將兩個任務(wù)組合起來,待前兩個組合任務(wù)完成后執(zhí)行,無入?yún)ⅰo出參
CompletableFuture<Void> completableFuture = task1.runAfterBoth(task2,()-> {
System.out.println("task1、task2處理完成" );
});
completableFuture.get();
}
輸出結(jié)果:
task開始工作
task2開始工作
task結(jié)束工作
task2結(jié)束工作
task1、task2處理完成
applyToEither / acceptEither / runAfterEither
這種組合模式只要有一個異步任務(wù)成功,就會觸發(fā)后續(xù)的方法,比如我們組合任務(wù)1和任務(wù)2,如果任務(wù)1執(zhí)行完成就直接執(zhí)行任務(wù)3,無視任務(wù)2。反之任務(wù)2先完成直接執(zhí)行任務(wù)3,無視任務(wù)1。
和上一個組合模式一樣,依次規(guī)律也是:
(1) 接收入?yún)?,含返回值?/p>
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<String> completableFuture = task.applyToEither(task2, (result) -> {
if (result == 1) {
System.out.println("task1先完成任務(wù)");
return "task1";
}
System.out.println("task2先完成任務(wù)");
return "task2";
});
System.out.println("最先完成任務(wù)的是:" + completableFuture.get());
}
(2) 接收入?yún)?,無返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Void> completableFuture = task.acceptEither(task2, (result) -> {
System.out.println("result:" + result);
if (result == 1) {
System.out.println("task1先完成任務(wù)");
return;
}
System.out.println("task2先完成任務(wù)");
});
completableFuture.get();
}
(3) 無入?yún)?,無返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
System.out.println("task1開始工作");
try {
TimeUnit.SECONDS.sleep(RandomUtil.randomInt(0,2));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task1結(jié)束工作");
return 1;
});
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync( () -> {
System.out.println("task2 開始工作");
try {
TimeUnit.SECONDS.sleep(RandomUtil.randomInt(0,2));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task2 結(jié)束工作");
return 2;
});
CompletableFuture<Void> completableFuture = task.runAfterEither(task2, () -> {
System.out.println("有一個任務(wù)完成了");
});
completableFuture.get();
}
輸出結(jié)果:
task1開始工作
task2 開始工作
task1結(jié)束工作
有一個任務(wù)完成了
thenCompose
thenCompose方法會在某個任務(wù)執(zhí)行完成后,將該任務(wù)的執(zhí)行結(jié)果作為方法入?yún)⑷缓髨?zhí)行指定的方法,該方法會返回一個新的CompletableFuture實例,例如我們希望任務(wù)1執(zhí)行完成后執(zhí)行任務(wù)2,任務(wù)2執(zhí)行完成后返回執(zhí)行任務(wù)3,最終結(jié)果是從任務(wù)3中獲取。
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 創(chuàng)建異步執(zhí)行任務(wù):
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
System.out.println("task1開始工作");
int num=RandomUtil.randomInt(0,5);
try {
TimeUnit.SECONDS.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task1結(jié)束工作,處理結(jié)果:"+num);
return num;
});
CompletableFuture<String> task2= task1.thenCompose((r)->{
System.out.println("task2 開始工作");
int num=RandomUtil.randomInt(0,5);
try {
TimeUnit.SECONDS.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task2 結(jié)束工作");
return CompletableFuture.supplyAsync(()->{
System.out.println("task3 開始工作,收到任務(wù)1的執(zhí)行結(jié)果:"+r);
return "task3 finished";
});
});
System.out.println("執(zhí)行結(jié)果->"+task2.get());
}
輸出結(jié)果:
task1開始工作
task1結(jié)束工作,處理結(jié)果:1
task2 開始工作
task2 結(jié)束工作
task3 開始工作,收到任務(wù)1的執(zhí)行結(jié)果:1
執(zhí)行結(jié)果->task3 finished
allOf / anyOf
allOf返回的CompletableFuture是所有任務(wù)都執(zhí)行完成后才會執(zhí)行,只要有一個任務(wù)執(zhí)行異常,則返回的CompletableFuture執(zhí)行g(shù)et方法時會拋出異常。
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 模擬異步任務(wù)1
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 模擬異步任務(wù)2
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "World";
});
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
allFutures.thenRun(() -> {
// 所有異步任務(wù)完成后打印它們的結(jié)果
String result1 = future1.join();
String result2 = future2.join();
System.out.println(result1 + " " + result2);
});
// 等待所有異步任務(wù)完成
allFutures.join();
}
輸出結(jié)果:
Hello World
而anyOf則是只要有一個任務(wù)完成就可以觸發(fā)后續(xù)方法,并且可以返回先完成任務(wù)的返回值,這一點和上述applyToEither 例子差不多。
public class Main {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 模擬異步任務(wù)1
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 模擬異步任務(wù)2
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "World";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
anyFuture.thenAccept(result -> {
// 任何一個異步任務(wù)完成后打印它的結(jié)果
System.out.println(result);
});
// 等待任何一個異步任務(wù)完成
anyFuture.join();
}
}