面試官問我知不知道異步編程的Future
荒腔走板
大家好,我是 why,歡迎來到我連續(xù)周更優(yōu)質(zhì)原創(chuàng)文章的第 60 篇。
前兩周寫的我眼中的成都那篇文章還在持續(xù)發(fā)酵,全網(wǎng)閱讀初步估計(jì)應(yīng)該是過 100w 了。誠(chéng)惶誠(chéng)恐,但是需要說明一下的是其實(shí)我是一個(gè)技術(shù)博主來的,偶爾荒腔走板的寫點(diǎn)生活相關(guān)的,所以這篇還是回到技術(shù)上。
老規(guī)矩,先來一個(gè)簡(jiǎn)短的荒腔走板,給冰冷的技術(shù)文注入一絲色彩。
上面這圖是我五年前,在學(xué)校宿舍拍的。
前幾天由于有點(diǎn)事情,打開了多年沒有打開的 QQ。然后突然推送了一個(gè)“那年今日”發(fā)送的動(dòng)態(tài)。
這張圖片就是那個(gè)動(dòng)態(tài)里面的。
2015 年 8 月的時(shí)候正是大三放暑假的時(shí)間,但是那個(gè)暑假我找了一個(gè)實(shí)習(xí),所以暑假期間住在學(xué)校里面。宿舍就我一個(gè)人。那個(gè)時(shí)候我完全沒有意識(shí)到,這是我程序猿生涯的一個(gè)真正的開端,也是我學(xué)生時(shí)代提前結(jié)束的宣告。
8 月 5 日凌晨,一只小貓突然躥到了宿舍里面,在宿舍里面旁若無人的,像宿管阿姨一樣審查著一切東西。甚至直接跳到桌子上,看著我敲代碼。完全不怕我的樣子。
于是我把它放到了我的自行車上,當(dāng)模特拍了幾張照片。
初見這只小貓時(shí)的那種驚喜我還記憶猶新,但是這波回憶殺給我的更大的沖擊是:原來,這件事已經(jīng)過去五年了。
如果沒有 QQ 的這個(gè)提醒,你讓我想這件事是發(fā)生在什么時(shí)候的,我的第一反應(yīng)肯定是好多年前的事情了吧,慢慢咂摸之后有可能才想起,原來是大三暑假的時(shí)候的事情,然后再仔細(xì)一算,原來是僅僅五年前的事情呀。
短短的五年怎么就發(fā)生了怎么多事情啊?把我這五年塞的滿滿當(dāng)當(dāng)?shù)摹?/p>
不知道為什么如果把人生求學(xué)、步入社會(huì)的各個(gè)階段分開來看,我每跨過一個(gè)階段,再次回頭望的時(shí)候都感覺這好像是別人的故事啊。
幸好我自己一年年的記錄了下來,幸好這真的是我自己的故事。
好了,說回文章。
你就是寫了個(gè)假異步
這是 rpc 的四種調(diào)用方式:
文本主要分享這個(gè) future 的調(diào)用方式,不講 Dubbo 框架,這里只是一個(gè)引子。
談到 future 的時(shí)候大家都會(huì)想到異步編程。但是你仔細(xì)看框起來這里:
客戶端線程調(diào)用 future.get() 方法的時(shí)候還是會(huì)阻塞當(dāng)前線程的。
我倒是覺得這充其量只能算一個(gè)閹割版的異步編程。
本文將帶你從閹割版的 future 聊到升級(jí)版的 Google Guava 的 future,最后談?wù)劶訌?qiáng)版的 future 。
先聊聊線程池的提交方式
談到 Future 的時(shí)候,我們基本上就會(huì)想到線程池,想到它的幾種提交方式。
先是最簡(jiǎn)單的,execute 方式提交,不關(guān)心返回值的,直接往線程池里面扔任務(wù)就完事:
- public class JDKThreadPoolExecutorTest {
- public static void main(String[] args) throws Exception {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
- //execute(Runnable command)方法。沒有返回值
- executor.execute(() -> {
- System.out.println("關(guān)注why技術(shù)");
- });
- Thread.currentThread().join();
- }
- }
可以看一下 execute 方法,接受一個(gè) Runnable 方法,返回類型是 void:
然后是 submit 方法。你知道線程池有幾種 submit 方法嗎?
雖然你經(jīng)常用,但是可能你從來沒有關(guān)心過人家。呸,渣男:

有三種 submit。這三種按照提交任務(wù)的類型來算分為兩個(gè)類型。
- 提交執(zhí)行 Runnable 類型的任務(wù)。
- 提交執(zhí)行 Callable 類型的任務(wù)。
但是返回值都是 Future,這才是我們關(guān)心的東西。
也許你知道線程池有三種 submit 方法,但是也許你根本不知道里面的任務(wù)分為兩種類型,你就只知道往線程池里面扔,也不管扔的是什么類型的任務(wù)。
我們先看一下 Callable 類型的任務(wù)是怎么執(zhí)行的:
- public class JDKThreadPoolExecutorTest {
- public static void main(String[] args) throws Exception {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
- Future<String> future = executor.submit(() -> {
- System.out.println("關(guān)注why技術(shù)");
- return "這次一定!";
- });
- System.out.println("future的內(nèi)容:" + future.get());
- Thread.currentThread().join();
- }
- }
這里利用 lambda 表達(dá)式,直接在任務(wù)體里面帶上一個(gè)返回值,這時(shí)你看調(diào)用的方法就變成了這個(gè):
運(yùn)行結(jié)果也能拿到任務(wù)體里面的返回了。輸出結(jié)果如下:
好,接下來再說說 submit 的任務(wù)為 Runable 類型的情況。
這個(gè)時(shí)候有兩個(gè)重載的形式:
標(biāo)號(hào)為 ① 的方法扔進(jìn)去一個(gè) Runable 的任務(wù),返回一個(gè) Future,而這個(gè)返回的 Future ,相當(dāng)于是返回了一個(gè)寂寞。下面我會(huì)說到原因。
標(biāo)號(hào)為 ② 的方法扔進(jìn)去一個(gè) Runable 的任務(wù)的同時(shí),再扔進(jìn)去一個(gè)泛型 T ,而巧好返回的 Future 里面的泛型也是 T,那么我們大膽的猜測(cè)一下這就是同一個(gè)對(duì)象。如果是同一個(gè)對(duì)象,說明我們可以一個(gè)對(duì)象傳到任務(wù)體里面去一頓操作,然后通過 Future 再次拿到這個(gè)對(duì)象的。一會(huì)就去驗(yàn)證。
來,先驗(yàn)證標(biāo)號(hào)為 ① 的方法,我為啥說它返回了一個(gè)寂寞。
首先,還是先把測(cè)試案例放在這里:
- public class JDKThreadPoolExecutorTest {
- public static void main(String[] args) throws Exception {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
- Future<?> future = executor.submit(() -> {
- System.out.println("關(guān)注why技術(shù)");
- });
- System.out.println("future的內(nèi)容:" + future.get());
- Thread.currentThread().join();
- }
- }
可以看到,確實(shí)是調(diào)用的標(biāo)號(hào)為 ① 的方法:
同時(shí),我們也可以看到 future.get() 方法的返回值為 null。
你說,這不是返回了一個(gè)寂寞是干啥?
當(dāng)你想用標(biāo)號(hào)為 ① 的方法時(shí),我勸你直接用 execute 方式提交任務(wù)。還不需要構(gòu)建一個(gè)寂寞的返回值,徒增無用對(duì)象。
接下來,我們看看標(biāo)號(hào)為 ② 的方法是怎么用的:
- public class JDKThreadPoolExecutorTest {
- public static void main(String[] args) throws Exception {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
- AtomicInteger atomicInteger = new AtomicInteger();
- Future<AtomicInteger> future = executor.submit(() -> {
- System.out.println("關(guān)注why技術(shù)");
- //在這里進(jìn)行計(jì)算邏輯
- atomicInteger.set(5201314);
- }, atomicInteger);
- System.out.println("future的內(nèi)容:" + future.get());
- Thread.currentThread().join();
- }
- }
可以看到改造之后,確實(shí)是調(diào)用了標(biāo)號(hào)為 ② 的方法:
future.get() 方法的輸出值也是異步任務(wù)中我們經(jīng)過計(jì)算后得出的 5201314。
你看,渣男就是這樣,明明不懂你,還非得用甜言蜜語來轟炸你。呸。
好了。綜上,線程池的提交方式一共有四種:一種 execute,無返回值。三種 submit,有返回值。
submit 中按照提交任務(wù)的類型又分為兩種:一個(gè)是 Callable,一個(gè)是 Runable。
submit 中 Runable 的任務(wù)類型又有兩個(gè)重載方法:一個(gè)返回了個(gè)寂寞,一個(gè)返回了個(gè)渣男。哦,不。一個(gè)返回了個(gè)寂寞,一個(gè)返回了個(gè)對(duì)象。
這個(gè)時(shí)候就有人要站出來說:你說的不對(duì),你就是瞎說,明明就只有 execute 這一種提交方式。
是的,“只有 execute 這一種提交方式”這一種說法也是沒錯(cuò)的。
請(qǐng)看源碼:
三種 submit 方法里面調(diào)用的都是 execute 方法。
能把前面這些方法娓娓道來,從表面談到內(nèi)在的這種人,才是好人。
只有愛你,才會(huì)把你研究透。
當(dāng)然,還有這幾種提交方式,用的不多,就不展開說了:
寫到這里我不禁想起了我的第三篇文章,真是奇怪的時(shí)間線開始收縮了的感覺,這篇文章里面聊到了不同提交方式,對(duì)于異常的不同處理方式。
我就問你:一個(gè)線程池中的線程異常了,那么線程池會(huì)怎么處理這個(gè)線程?
你要是不知道,可以去看看這篇文章,畢竟,有可能在面試的時(shí)候遇到的:
好,上面這些東西捋清楚了之后。我們?cè)倬劢沟椒祷刂?Future 上:
從上面的代碼我們可以看出,當(dāng)我們想要返回值的時(shí)候,都需要調(diào)用下面的這個(gè) get() 方法:
而從這個(gè)方法的描述可以看出,這是一個(gè)阻塞方法。拿不到值就在那里等著。當(dāng)然,還有一個(gè)帶超時(shí)時(shí)間的 get 方法,等指定時(shí)間后就不等了。
呸,渣男。沒耐心,這點(diǎn)時(shí)間都舍不得等。
總之就是有可能要等的。只要等,那么就是阻塞。只要是阻塞,就是一個(gè)假異步。
所以總結(jié)一下這種場(chǎng)景下返回的 Future 的不足之處:
- 只有主動(dòng)調(diào)用 get 方法去獲取值,但是有可能值還沒準(zhǔn)備好,就阻塞等待。
- 任務(wù)處理過程中出現(xiàn)異常會(huì)把異常隱藏,封裝到 Future 里面去,只有調(diào)用 get 方法的時(shí)候才知道異常了。
寫到這里的時(shí)候我不禁想起一個(gè)形象的例子,我給你舉一個(gè)。
假設(shè)你想約你的女神一起去吃飯。女神嘛,肯定是要先畫個(gè)美美的妝才會(huì)出去逛街的。而女神化妝就可以類比為我們提交的一個(gè)異步任務(wù)。
假設(shè)你是一個(gè)小屌絲,那么女神就會(huì)對(duì)你說:我已經(jīng)開始化妝了,你到樓下了就給我打電話。
然后你就收拾行頭準(zhǔn)備出發(fā),這就是你提交異步任務(wù)后還可以做一些自己的事情。
你花了一小時(shí)到了女神樓下,打電話給她:女神你好,我到你樓下了。
女神說:你先等著吧,我的妝還沒畫好呢。
于是你開始等待,無盡的等待。這就是不帶超時(shí)時(shí)間的 future.get() 方法。
也有可能你硬氣一點(diǎn),對(duì)女神說:我最多再等 24 小時(shí)哈,超過 24 小時(shí)不下樓,我就走了。
這就是帶超時(shí)時(shí)間的 future.get(timeout,unit) 方法:
結(jié)果 24 小時(shí)之后,女神還沒下來,你就走了。
當(dāng)然,還有一種情況就是你到樓下給女神打電話,女神說:哎,今天我男神約我出去看電影,就不和你去吃飯了哈。本來我想提前給你說的,但是我又記不起你電話,只有你打過來我才能告訴你。就這樣,你自己玩去吧。
這就相當(dāng)于異步任務(wù)執(zhí)行過程中拋出了異常,而你只有在調(diào)用了 get 方法(打電話操作)之后才知道原來異常了。
而真正的異步是你不用等我,我好了我就叫你。
就像女神接到男神的電話時(shí)說的:我需要一點(diǎn)時(shí)間準(zhǔn)備一下,你先玩自己的吧,我一會(huì)好了給你打電話。
這讓我想起了好萊塢原則:Don't Call Us,We'll Call you!
接下來,讓我們見識(shí)一下真正的異步。
什么叫真正的:“你先玩自己的,我一會(huì)好了叫你。”
Guava 的 Future
女神說的:“好了叫你”。
就是一種回調(diào)機(jī)制。說到回調(diào),那么我們就需要在異步任務(wù)提交之后,注冊(cè)一個(gè)回調(diào)函數(shù)就行。
Google 提供的 Guava 包里面對(duì) JDK 的 Future 進(jìn)行了擴(kuò)展:
新增了一個(gè) addListenter 方法,入?yún)⑹且粋€(gè) Runnable 的任務(wù)類型和一個(gè)線程池。
使用方法,先看代碼:
- public class JDKThreadPoolExecutorTest {
- public static void main(String[] args) throws Exception {
- ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
- ListenableFuture<String> listenableFuture = executor.submit(() -> {
- System.out.println(Thread.currentThread().getName()+"-女神:我開始化妝了,好了我叫你。");
- TimeUnit.SECONDS.sleep(5);
- return "化妝完畢了。";
- });
- listenableFuture.addListener(() -> {
- try {
- System.out.println(Thread.currentThread().getName()+"-future的內(nèi)容:" + listenableFuture.get());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }, executor);
- System.out.println(Thread.currentThread().getName()+"-等女神化妝的時(shí)候可以干點(diǎn)自己的事情。");
- Thread.currentThread().join();
- }
- }
首先創(chuàng)建線程池的方式變了,需要用 Guava 里面的 MoreExecutors 方法裝飾一下:
- ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
然后用裝飾后的 executor 調(diào)用 submit 方法(任意一種),就會(huì)返回 ListenableFuture ,拿到這個(gè) ListenableFuture 之后,我們就可以在上面注冊(cè)監(jiān)聽:
所以,上面的程序我們調(diào)用的是入?yún)?callable 類型的接口:
從運(yùn)行結(jié)果可以看出來:獲取運(yùn)行結(jié)果是在另外的線程里面執(zhí)行的,完全沒有阻塞主線程。
和之前的“假異步”還是有很大區(qū)別的。
除了上面的 addListener 方法外,其實(shí)我更喜歡用 FutureCallback 的方式。
可以看一下代碼,非常的直觀:
- public class JDKThreadPoolExecutorTest {
- public static void main(String[] args) throws Exception {
- ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
- ListenableFuture<String> listenableFuture = executor.submit(() -> {
- System.out.println(Thread.currentThread().getName()+"-女神:我開始化妝了,好了我叫你。");
- TimeUnit.SECONDS.sleep(5);
- return "化妝完畢了。";
- });
- Futures.addCallback(listenableFuture, new FutureCallback<String>() {
- @Override
- public void onSuccess(@Nullable String result) {
- System.out.println(Thread.currentThread().getName()+"-future的內(nèi)容:" + result);
- }
- @Override
- public void onFailure(Throwable t) {
- System.out.println(Thread.currentThread().getName()+"-女神放你鴿子了。");
- t.printStackTrace();
- }
- });
- System.out.println(Thread.currentThread().getName()+"-等女神化妝的時(shí)候可以干點(diǎn)自己的事情。");
- Thread.currentThread().join();
- }
- }
有 onSuccess 方法和 onFailure 方法。
上面的程序輸出結(jié)果為:
如果異步任務(wù)執(zhí)行的時(shí)候拋出了異常,比如女神被她的男神約走了,異步任務(wù)改成這樣:
- ListenableFuture<String> listenableFuture = executor.submit(() -> {
- System.out.println(Thread.currentThread().getName() + "-女神:我開始化妝了,好了我叫你。");
- TimeUnit.SECONDS.sleep(5);
- throw new Exception("男神約我看電影,就不和你吃飯了。");
- });
最終的運(yùn)行結(jié)果就是這樣:
是的,女神去看電影了。她一定只是不想吃飯而已。
加強(qiáng)版的Future - CompletableFuture
第一小節(jié)講的 Future 是 JDK 1.5 時(shí)代的產(chǎn)物:
經(jīng)過了這么多年的發(fā)展,Doug Lea 在 JDK 1.8 里面引入了新的 CompletableFuture :
到了 JDK 1.8 時(shí)代,這才是真正的異步編程。
CompletableFuture 實(shí)現(xiàn)了兩個(gè)接口,一個(gè)是我們熟悉的 Future ,一個(gè)是 CompletionStage。
CompletionStage接口,你看這個(gè)接口的名稱中有一個(gè) Stage :
可以把這個(gè)接口理解為一個(gè)任務(wù)的某個(gè)階段。所以多個(gè) CompletionStage 鏈接在一起就是一個(gè)任務(wù)鏈。前一個(gè)任務(wù)完成后,下一個(gè)任務(wù)就會(huì)自動(dòng)觸發(fā)。
CompletableFuture 里面的方法非常的多。
由于篇幅原因,我就只演示一個(gè)方法:
- public class JDKThreadPoolExecutorTest {
- public static void main(String[] args) throws Exception {
- CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getName() + "-女神:我開始化妝了,好了我叫你。");
- try {
- TimeUnit.SECONDS.sleep(5);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "化妝完畢了。";
- });
- completableFuture.whenComplete((returnStr, exception) -> {
- if (exception == null) {
- System.out.println(Thread.currentThread().getName() + returnStr);
- } else {
- System.out.println(Thread.currentThread().getName() + "女神放你鴿子了。");
- exception.printStackTrace();
- }
- });
- System.out.println(Thread.currentThread().getName() + "-等女神化妝的時(shí)候可以干點(diǎn)自己的事情。");
- Thread.currentThread().join();
- }
- }
該方法的執(zhí)行結(jié)果如下:
我們執(zhí)行的時(shí)候并沒有指定用什么線程池,但是從結(jié)果可以看到也是異步的執(zhí)行。
從輸出日志中是可以看出端倪的,F(xiàn)orkJoinPool.commonPool() 是其默認(rèn)使用的線程池。
當(dāng)然,我們也可以自己指定。
這個(gè)方法在很多開源框架里面使用的還是非常的多的。
接下來主要看看 CompletableFuture 對(duì)于異常的處理。我覺得非常的優(yōu)雅。
不需要 try-catch 代碼塊包裹,也不需要調(diào)用 Future.get() 才知道異常了,它提供了一個(gè) handle 方法,可以處理上游異步任務(wù)中出現(xiàn)的異常:
- public class JDKThreadPoolExecutorTest {
- public static void main(String[] args) throws Exception {
- CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getName() + "-女神:我開始化妝了,好了我叫你。");
- throw new RuntimeException("男神約我看電影了,我們下次再約吧,你是個(gè)好人。");
- }).handleAsync((result, exception) -> {
- if (exception != null) {
- System.out.println(Thread.currentThread().getName() + "-女神放你鴿子了!");
- return exception.getCause();
- } else {
- return result;
- }
- }).thenApplyAsync((returnStr) -> {
- System.out.println(Thread.currentThread().getName() + "-" + returnStr);
- return returnStr;
- });
- System.out.println(Thread.currentThread().getName() + "-等女神化妝的時(shí)候可以干點(diǎn)自己的事情。");
- Thread.currentThread().join();
- }
- }
由于女神在化妝的時(shí)候,接到男神的電話約她看電影,就只能放你鴿子了。
所以,上面程序的輸出結(jié)果如下:
如果,你順利把女神約出來了,是這樣的:
好了,女神都約出來了,文章就到這里了。去干正事吧。
本文轉(zhuǎn)載自微信公眾號(hào)「why技術(shù)」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系why技術(shù)公眾號(hào)。