聊聊 JDK8 的 CompletableFuture ,你明白了嗎?
前段時間,阿粉已經(jīng)說過一次CompletableFuture了,但是還是有讀者說,感覺不是很清晰,有點亂的樣子,今天阿粉就再來說一下這個CompletableFuture的一些API的方法。
CompletableFuture
CompletableFuture是java.util.concurrent庫在java 8中新增的主要工具,同傳統(tǒng)的Future相比,其支持流式計算、函數(shù)式編程、完成通知、自定義異常處理等很多新的特性。
supplyAsync方法
通過該函數(shù)創(chuàng)建的CompletableFuture實例會異步執(zhí)行當(dāng)前傳入的計算任務(wù)。在調(diào)用端,則可以通過get或join獲取最終計算結(jié)果。
這個有兩個不同的實現(xiàn)方式,一種是我們傳入我們自己創(chuàng)建的線程池,然后使用我們創(chuàng)建的線程池進行操作,還有一種就是不傳線程池,讓程序是使用默認的線程池進行操作。
//使用默認線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
//使用自定義線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
第一種只需傳入一個Supplier實例(一般使用lamda表達式),此時框架會默認使用ForkJoin的線程池來執(zhí)行被提交的任務(wù)。
我們來自定義一個代碼看一下:
ExecutorService executors = Executors.newFixedThreadPool(5);
CompletableFuture<List<Order>> bFuture = CompletableFuture.supplyAsync(() -> {
//執(zhí)行查詢
QueryWrapper<Order> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("userId",params.getUserId());
queryWrapper.eq("SHOP_ID",params.getShopId());
List<Order> list = orderService.list(queryWrapper);
return list;
}, executors);
當(dāng)我們執(zhí)行查詢的時候,這時候?qū)嶋H上就屬于異步的查詢的,我們可以寫多個查詢,比如,上面的代碼我們查詢的是訂單,下面我們可以查詢用戶的信息,還是使用同樣的線程池。
CompletableFuture<List<User>> aFuture = CompletableFuture.supplyAsync(() -> {
//執(zhí)行查詢
QueryWrapper<User> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("SHOP_ID",params.getShopId());
List<User> list = userService.list(queryWrapper);
return list;
}, executors);
這時候,我們就可以獲取一下這個返回的結(jié)果,那么獲取返回結(jié)果我們需要調(diào)用什么方法呢?
可以通過get或join獲取最終計算結(jié)果。這時候我們可以打印一下,獲取結(jié)果的長度。
List<User> list = aFuture.get();
List<Order> list = bFuture.get();
這樣,我們就可以使用異步查詢來完成我們對結(jié)果集的查詢。
這就有小伙伴想問,如果我想要用第一個的結(jié)果,去在第二個數(shù)據(jù)中去查詢,我們應(yīng)該怎么做呢?
thenApply
thenApply提交的任務(wù)類型需遵從Function簽名,也就是有入?yún)⒑头祷刂?,其中入?yún)榍爸萌蝿?wù)的結(jié)果。
什么意思呢?其實很簡單,我們這里直接看代碼:
CompletableFuture<List<Order>> cFuture = bFuture.thenApply((list) -> {
List<String> collect = list.stream().map(User::getUserId).collect(Collectors.toList());
...
return orderList;
});
這實際上,就是我們根據(jù)查詢出的所有用戶的集合,直接獲取到他的userId,然后我們根據(jù)UserId,把這些用戶下的訂單數(shù)據(jù)都提取出來,當(dāng)然,在實際使用中,我們理論上可以無限連接后續(xù)計算任務(wù),從而實現(xiàn)鏈條更長的流式計算。
但是這種鏈式也不是都非常的好用,畢竟要控制住線程池,大家記得在使用完成之后,可以把自己創(chuàng)建的線程池小回調(diào),調(diào)用shutDown方法就可以了。我們再接著往下說。
thenAccept
實際上thenAccept的效果,和thenApply 的效果等同,但是thenAccept提交的任務(wù)類型需遵從Consumer簽名,也就是有入?yún)⒌菦]有返回值,其中入?yún)榍爸萌蝿?wù)的結(jié)果。
實際上調(diào)用的是和之前一樣的,但是就是沒有返回值了。
CompletableFuture<void> cFuture = bFuture.thenAccept((list) -> {
List<String> collect = list.stream().map(User::getUserId).collect(Collectors.toList());
...
});
這就是區(qū)別,不需要進行返回了,這種一般用的地方比較特殊,就比如說我們執(zhí)行了很多操作,但是在其中需要夾雜一些比如說給某個服務(wù)進行通知,但是我們并不需要這個服務(wù)給我們返回值的結(jié)果的時候,實際上完全可以使用這種方式。方便還快捷。當(dāng)然,你想直接調(diào)用,阿粉也是沒有意見的。
whenComplete
whenComplete主要用于注入任務(wù)完成時的回調(diào)通知邏輯。這個解決了傳統(tǒng)future在任務(wù)完成時,無法主動發(fā)起通知的問題。前置任務(wù)會將計算結(jié)果或者拋出的異常作為入?yún)鬟f給回調(diào)通知函數(shù)。
這個方法就是相當(dāng)于是把前一個任務(wù)中的結(jié)果,通過第二個方法獲取結(jié)果,也并不會影響第二個的邏輯。
示例代碼如下:
CompletableFuture<List<Order>> listCompletableFuture = bFuture.whenComplete((r, e) -> {
if (e != null) {
System.out.println("compute failed!");
} else {
System.out.println("received result is " + r);
}
});
實際上最后答應(yīng)的就是接受者的信息。大家學(xué)會了么?