強(qiáng)大的異步任務(wù)處理類CompletableFuture使用詳解
環(huán)境:Java8
Future基本應(yīng)用
Future是從JDK1.5開(kāi)始有的,目的是獲取異步任務(wù)執(zhí)行的結(jié)果,通常情況會(huì)結(jié)合ExecutorService及Callable一起使用。
1. Future結(jié)合Callable使用
單任務(wù)執(zhí)行
private static class Task implements Callable<String> {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(3) ;
return "success";
}
}
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
Future<String> future = executor.submit(new Task()) ;
String result = future.get() ;
System.out.println("執(zhí)行結(jié)果:" + result) ;
}
當(dāng)執(zhí)行到future.get()方法的時(shí)候會(huì)阻塞,等待3s后繼續(xù)執(zhí)行。
多個(gè)任務(wù)同時(shí)執(zhí)行
private static class Task implements Callable<String> {
private int sleep ;
public Task(int sleep) {
this.sleep = sleep ;
}
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(this.sleep) ;
return "success";
}
}
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
Future<String> future1 = executor.submit(new Task(3)) ;
Future<String> future2 = executor.submit(new Task(2)) ;
Future<String> future3 = executor.submit(new Task(1)) ;
String result1 = future1.get() ;
String result2 = future2.get() ;
String result3 = future3.get() ;
System.out.println("result1:" + result1 + "\t" + "result2:" + result2 + "\t" + "result3:" + result3) ;
}
以上代碼執(zhí)行的3個(gè)任務(wù)分別用時(shí)3,2,1s。future1用時(shí)最長(zhǎng)。
從運(yùn)行的結(jié)果看到即便future2, future3執(zhí)行時(shí)間短也必須等待future1執(zhí)行完后才會(huì)繼續(xù),雖然你可以倒過(guò)來(lái)獲取結(jié)果,但是在實(shí)際項(xiàng)目中的應(yīng)用你應(yīng)該是不能確認(rèn)每個(gè)任務(wù)執(zhí)行需要多長(zhǎng)時(shí)間,誰(shuí)先執(zhí)行完就先獲取誰(shuí)。
雖然這種同步阻塞的方式在有些場(chǎng)景下還是很有必要的。但由于它的同步阻塞導(dǎo)致了當(dāng)前線程不能干其它的事必須一致等待。
CompletionService解決Future的缺點(diǎn)
CompletionService是一邊生產(chǎn)新的任務(wù),一邊處理已經(jīng)完成的任務(wù)。簡(jiǎn)單地說(shuō)就是CompletionService不管任務(wù)執(zhí)行先后順序,誰(shuí)先執(zhí)行完就處理誰(shuí)。
private static class Task implements Callable<String> {
private int time;
private String name ;
public Task(int time, String name) {
this.time = time ;
this.name = name ;
}
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(this.time) ;
return name ;
}
}
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
CompletionService<String> cs = new ExecutorCompletionService<>(pool) ;
cs.submit(new Task(3, "name" + 3)) ;
cs.submit(new Task(1, "name" + 1)) ;
cs.submit(new Task(2, "name" + 2)) ;
for (int i = 0; i < 3; i++) {
System.out.println(cs.take().get()) ;
}
}
通過(guò)執(zhí)行結(jié)果發(fā)現(xiàn),任務(wù)的結(jié)果獲取是以誰(shuí)先執(zhí)行完處理誰(shuí)與任務(wù)的執(zhí)行先后沒(méi)有關(guān)系。
2. CompletableFuture異步編程
CompletableFuture通過(guò)如下4個(gè)靜態(tài)方法來(lái)執(zhí)行異步任務(wù)
圖片
2.1 簡(jiǎn)單異步任務(wù)鏈?zhǔn)秸{(diào)用執(zhí)行
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3) ;
System.out.println(Thread.currentThread().getName() + ", 1 任務(wù)執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executor).thenRun(() -> {
try {
TimeUnit.SECONDS.sleep(2) ;
System.out.println(Thread.currentThread().getName() + ", 2 任務(wù)執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
}) ;
System.out.println("主線程:" + Thread.currentThread().getName()) ;
executor.shutdown() ;
執(zhí)行結(jié)果:
圖片
2.2 獲取上一步任務(wù)執(zhí)行結(jié)果及任務(wù)完成處理
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3) ;
System.out.println(Thread.currentThread().getName() + ", 1 任務(wù)執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return "1" ;
}, executor).thenApply(res -> {
System.out.println("獲取到上一步任務(wù)執(zhí)行結(jié)果:" + res) ;
try {
TimeUnit.SECONDS.sleep(2) ;
System.out.println(Thread.currentThread().getName() + ", 2 任務(wù)執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return "2" ;
}).whenComplete((res, tx) -> {
System.out.println("獲取到結(jié)果:" + res) ;
if (tx != null) {
System.err.println("發(fā)生錯(cuò)誤了:" + tx.getMessage()) ;
}
executor.shutdown();
}) ;
System.out.println("主線程:" + Thread.currentThread().getName()) ;
執(zhí)行結(jié)果:
圖片
這里如果任務(wù)執(zhí)行的時(shí)候發(fā)生了異常那么在whenComplete方法中的res 會(huì)為空,tx為發(fā)生異常的對(duì)象。沒(méi)有異常時(shí)res有執(zhí)行的機(jī)構(gòu),tx異常對(duì)象為空。
2.3 異步任務(wù)異常處理
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3) ;
System.out.println(Thread.currentThread().getName() + ", 1 任務(wù)執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return "1" ;
}, executor).thenApply(res -> {
System.out.println("獲取到上一步任務(wù)執(zhí)行結(jié)果:" + res) ;
try {
TimeUnit.SECONDS.sleep(2) ;
System.out.println(Thread.currentThread().getName() + ", 2 任務(wù)執(zhí)行完成") ;
System.out.println(1 / 0) ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return "2" ;
}).exceptionally(tx -> {
System.out.println(Thread.currentThread().getName() + ", 任務(wù)執(zhí)行發(fā)生了異常") ;
return "error" ;
}).whenComplete((res, tx) -> {
System.out.println("獲取到結(jié)果:" + res) ;
if (tx != null) {
System.err.println("發(fā)生錯(cuò)誤了:" + tx.getMessage()) ;
}
executor.shutdown();
}) ;
System.out.println("主線程:" + Thread.currentThread().getName()) ;
這里我們?nèi)藶榈闹圃飚惓?1 / 0 。
執(zhí)行結(jié)果:
圖片
根據(jù)執(zhí)行結(jié)果當(dāng)發(fā)生異常時(shí)進(jìn)入exceptionally方法,最終進(jìn)入whenComplete方法此時(shí) tx異常對(duì)象是發(fā)生異常的異常對(duì)象。
2.4 所有任務(wù)完成才算完成任務(wù)
CompletableFuture.allOf
CompletableFuture<Double> calc1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2) ;
System.out.println(Thread.currentThread().getName() + ", calc1任務(wù)執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10D ;
}, executor) ;
CompletableFuture<Double> calc2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(5) ;
System.out.println(Thread.currentThread().getName() + ", calc2任務(wù)執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20D ;
}, executor) ;
// 當(dāng)任何一個(gè)任務(wù)發(fā)生異常,這里的tx都不會(huì)為null
CompletableFuture.allOf(calc1, calc2).whenComplete((res, tx) -> {
System.out.println("獲取到結(jié)果:" + res + ", " + tx) ;
try {
System.out.println(calc1.get()) ;
System.out.println(calc2.get()) ;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}) ;
執(zhí)行結(jié)果:
圖片
在這里whenComplete中的res是沒(méi)有結(jié)果的,要獲取數(shù)據(jù)我們的分別調(diào)用get方法獲取。
2.5 handle方法對(duì)結(jié)果處理
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2) ;
System.out.println(Thread.currentThread().getName() + ", 1 任務(wù)執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return "0" ;
}, executor).handle((res, tx) -> {
// 處理結(jié)果數(shù)據(jù)
return res + "1" ;
}).whenComplete((res, tx) -> {
System.out.println("獲取到結(jié)果:" + res) ;
if (tx != null) {
System.err.println("發(fā)生錯(cuò)誤了:" + tx.getMessage()) ;
}
executor.shutdown();
}) ;
執(zhí)行結(jié)果:
正確
圖片
發(fā)生異常時(shí):
當(dāng)發(fā)生異常時(shí)handle方法中的res是沒(méi)有值的,tx異常對(duì)象為發(fā)生異常的異常對(duì)象。
2.6 合并異步任務(wù)
將兩個(gè)異步任務(wù)完成后合并處理
CompletableFuture.thenCombine
CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2) ;
System.out.println(Thread.currentThread().getName() + ", 任務(wù)1執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10d ;
}, executor) ;
CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2) ;
System.out.println(Thread.currentThread().getName() + ", 任務(wù)2執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20d ;
}, executor) ;
task1.thenCombine(task2, (t1, t2) -> {
System.out.println(Thread.currentThread().getName() + ", 合并任務(wù)完成") ;
return t1 + "," + t2 ;
}).whenComplete((res, tx) -> {
System.out.println("獲取到結(jié)果:" + res) ;
if (tx != null) {
System.err.println("發(fā)生錯(cuò)誤了:" + tx.getMessage()) ;
}
executor.shutdown();
}) ;
執(zhí)行結(jié)果:
圖片
2.7 異步任務(wù)誰(shuí)快誰(shuí)就進(jìn)入下一步的執(zhí)行
CompletableFuture.applyToEither
兩個(gè)異步任務(wù)誰(shuí)先執(zhí)行完誰(shuí)就繼續(xù)執(zhí)行后續(xù)的操作。
CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2) ;
System.out.println(Thread.currentThread().getName() + ", 任務(wù)1執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10d ;
}, executor) ;
CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2) ;
System.out.println(Thread.currentThread().getName() + ", 任務(wù)2執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20d ;
}, executor) ;
task1.applyToEither(task2, res -> {
return res ;
}).whenComplete((res, tx) -> {
System.out.println("獲取到結(jié)果:" + res) ;
if (tx != null) {
System.err.println("發(fā)生錯(cuò)誤了:" + tx.getMessage()) ;
}
executor.shutdown();
}) ;
執(zhí)行結(jié)果:
圖片
2.8 兩個(gè)異步任務(wù)都執(zhí)行完了才繼續(xù)執(zhí)行
只有兩個(gè)任務(wù)都執(zhí)行完成了后才會(huì)繼續(xù)。
CompletableFuture.runAfterBoth
CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2) ;
System.out.println(Thread.currentThread().getName() + ", 任務(wù)1執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10d ;
}, executor) ;
CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2) ;
System.out.println(Thread.currentThread().getName() + ", 任務(wù)2執(zhí)行完成") ;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20d ;
}, executor) ;
task1.runAfterBoth(task2, () -> {
System.out.println("任務(wù)都執(zhí)行完成了...") ;
}).whenComplete((res, tx) -> {
System.out.println("獲取到結(jié)果:" + res) ;
if (tx != null) {
System.err.println("發(fā)生錯(cuò)誤了:" + tx.getMessage()) ;
}
executor.shutdown();
}) ;
執(zhí)行結(jié)果:
圖片
2.9 任意一個(gè)任務(wù)執(zhí)行完成就算完成
CompletableFuture.anyOf
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
sleep(1000) ;
System.out.println("我是任務(wù)1") ;
return "Task1" ;
}, executor) ;
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
sleep(3000) ;
System.out.println("我是任務(wù)2") ;
System.out.println(1 / 0) ;
return "Task2" ;
}, executor) ;
// 任意一個(gè)任務(wù)執(zhí)行完成就算完成
// 當(dāng)任務(wù)執(zhí)行發(fā)生異常后,th才不會(huì)為null
CompletableFuture.anyOf(task1, task2).whenCompleteAsync((v, th) -> {
System.out.println("v = " + v) ;
System.out.println("th = " + th) ;
}, executor) ;
執(zhí)行結(jié)果:
圖片
2.10 接收上一個(gè)任務(wù)的執(zhí)行結(jié)果
CompletableFuture.supplyAsync(() -> {
sleep(2000) ;
System.out.println("第一個(gè)任務(wù)執(zhí)行完成...") ;
// System.out.println(1 / 0) ;
return new Random().nextInt(10000) ;
}, executor).thenAcceptAsync(res -> { // 接收上一個(gè)任務(wù)的執(zhí)行結(jié)果
System.out.println("任務(wù)執(zhí)行結(jié)果:" + res) ;
}, executor) ;
執(zhí)行結(jié)果:
圖片
以上是本篇文章的全部?jī)?nèi)容,希望對(duì)你有幫助。