Java 異步編程的幾種方式
異步編程是讓程序并發(fā)運行的一種手段。它允許多個事情 同時發(fā)生
,當程序調(diào)用需要長時間運行的方法時,它不會阻塞當前的執(zhí)行流程,程序可以繼續(xù)運行,當方法執(zhí)行完成時通知給主線程根據(jù)需要獲取其執(zhí)行結(jié)果或者失敗異常的原因。使用異步編程可以大大提高我們程序的吞吐量,可以更好的面對更高的并發(fā)場景并更好的利用現(xiàn)有的系統(tǒng)資源,同時也會一定程度上減少用戶的等待時間等。本文我們一起來看看在 Java
語言中使用異步編程有哪些方式。
Thread 方式
在 Java
語言中最簡單使用異步編程的方式就是創(chuàng)建一個 Thread
來實現(xiàn),如果你使用的 JDK
版本是 8 以上的話,可以使用 Lambda 表達式 會更加簡潔。為了能更好的體現(xiàn)出異步的高效性,下面提供同步版本和異步版本的示例作為對照:
- /**
- * @author mghio
- * @since 2021-08-01
- */
- public class SyncWithAsyncDemo {
- public static void doOneThing() {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("doOneThing ---->>> success");
- }
- public static void doOtherThing() {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("doOtherThing ---->>> success");
- }
- public synchronized static void main(String[] args) throws InterruptedException {
- StopWatch stopWatch = new StopWatch("SyncWithAsyncDemo");
- stopWatch.start();
- // 同步調(diào)用版本
- // testSynchronize();
- // 異步調(diào)用版本
- testAsynchronize();
- stopWatch.stop();
- System.out.println(stopWatch);
- }
- private static void testAsynchronize() throws InterruptedException {
- System.out.println("-------------------- testAsynchronize --------------------");
- // 創(chuàng)建一個線程執(zhí)行 doOneThing
- Thread doOneThingThread = new Thread(SyncWithAsyncDemo::doOneThing, "doOneThing-Thread");
- doOneThingThread.start();
- doOtherThing();
- // 等待 doOneThing 線程執(zhí)行完成
- doOneThingThread.join();
- }
- private static void testSynchronize() {
- System.out.println("-------------------- testSynchronize --------------------");
- doOneThing();
- doOtherThing();
- }
- }
同步執(zhí)行的運行如下:
注釋掉同步調(diào)用版本的代碼,得到異步執(zhí)行的結(jié)果如下:
從兩次的運行結(jié)果可以看出,同步版本耗時 4002 ms
,異步版本執(zhí)行耗時 2064 ms
,異步執(zhí)行耗時減少將近一半,可以看出使用異步編程后可以大大縮短程序運行時間。
上面的示例的異步線程代碼在 main
方法內(nèi)開啟了一個線程 doOneThing-Thread
用來異步執(zhí)行 doOneThing
任務(wù),在這時該線程與 main
主線程并發(fā)運行,也就是任務(wù) doOneThing
與任務(wù) doOtherThing
并發(fā)運行,則等主線程運行完 doOtherThing
任務(wù)后同步等待線程 doOneThing
運行完畢,整體還是比較簡單的。
但是這個示例只能作為示例使用,如果用到了生產(chǎn)環(huán)境發(fā)生事故后果自負,使用上面這種 Thread
方式異步編程存在兩個明顯的問題。
- FutureTask
FutureTask 方式
自 JDK 1.5
開始,引入了 Future
接口和實現(xiàn) Future
接口的 FutureTask
類來表示異步計算結(jié)果。這個 FutureTask
類不僅實現(xiàn)了 Future
接口還實現(xiàn)了 Runnable
接口,表示一種可生成結(jié)果的 Runnable
。其可以處于這三種狀態(tài):
- 未啟動 當創(chuàng)建一個
FutureTask
沒有執(zhí)行FutureTask.run()
方法之前 - 已啟動 在
FutureTask.run()
方法執(zhí)行的過程中 - 已完成 在
FutureTask.run()
方法正常執(zhí)行結(jié)果或者調(diào)用了FutureTask.cancel(boolean mayInterruptIfRunning)
方法以及在調(diào)用FutureTask.run()
方法的過程中發(fā)生異常結(jié)束后
FutureTask
類實現(xiàn)了 Future
接口的開啟和取消任務(wù)、查詢?nèi)蝿?wù)是否完成、獲取計算結(jié)果方法。要獲取 FutureTask
任務(wù)的結(jié)果,我們只能通過調(diào)用 getXXX()
系列方法才能獲取,當結(jié)果還沒出來時候這些方法會被阻塞,同時這了任務(wù)可以是 Callable
類型(有返回結(jié)果),也可以是 Runnable
類型(無返回結(jié)果)。我們修改上面的示例把兩個任務(wù)方法修改為返回 String
類型,使用 FutureTask
的方法如下:
- private static void testFutureTask() throws ExecutionException, InterruptedException {
- System.out.println("-------------------- testFutureTask --------------------");
- // 創(chuàng)建一個 FutureTask(doOneThing 任務(wù))
- FutureTask<String> futureTask = new FutureTask<>(FutureTaskDemo::doOneThing);
- // 使用線程池執(zhí)行 doOneThing 任務(wù)
- ForkJoinPool.commonPool().execute(futureTask);
- // 執(zhí)行 doOtherThing 任務(wù)
- String doOtherThingResult = doOtherThing();
- // 同步等待線程執(zhí)行 doOneThing 任務(wù)結(jié)束
- String doOneThingResult = futureTask.get();
- // 任務(wù)執(zhí)行結(jié)果輸出
- System.out.println("doOneThingResult ---->>> " + doOneThingResult);
- System.out.println("doOtherThingResult ---->>> " + doOtherThingResult);
- }
使用 FutureTask
異步編程方式的耗時和上面的 Thread
方式是差不多的,其本質(zhì)都是另起一個線程去做 doOneThing
任務(wù)然后等待返回,運行結(jié)果如下:
這個示例中, doOneThing
和 doOtherThing
都是有返回值的任務(wù)(都返回 String
類型結(jié)果),我們在主線程 main
中創(chuàng)建一個異步任務(wù) FutureTask
來執(zhí)行 doOneThing
,然后使用 ForkJoinPool.commonPool()
創(chuàng)建線程池(有關(guān) ForkJoinPool
的介紹見這里),然后調(diào)用了線程池的 execute
方法把 futureTask
提交到線程池來執(zhí)行。
通過示例可以看到,雖然 FutureTask
提供了一些方法讓我們獲取任務(wù)的執(zhí)行結(jié)果、任務(wù)是否完成等,但是使用還是比較復(fù)雜,在一些較為復(fù)雜的場景(比如多個 FutureTask
之間的關(guān)系表示)的編碼還是比較繁瑣,還是當我們調(diào)用 getXXX()
系列方法時還是會在任務(wù)執(zhí)行完畢前阻塞調(diào)用線程,達不到異步編程的效果,基于這些問題,在 JDK 8
中引入了 CompletableFuture
類,下面來看看如何使用 CompletableFuture
來實現(xiàn)異步編程。
CompletableFuture 方式
JDK 8
中引入了 CompletableFuture
類,實現(xiàn)了 Future
和 CompletionStage
接口,為異步編程提供了一些列方法,如 supplyAsync
、 runAsync
和 thenApplyAsync
等,除此之外 CompletableFuture
還有一個重要的功能就是可以讓兩個或者多個 CompletableFuture
進行運算來產(chǎn)生結(jié)果。代碼如下:
- /**
- * @author mghio
- * @since 2021-08-01
- */
- public class CompletableFutureDemo {
- public static CompletableFuture<String> doOneThing() {
- return CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "doOneThing";
- });
- }
- public static CompletableFuture<String> doOtherThing(String parameter) {
- return CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return parameter + " " + "doOtherThing";
- });
- }
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- StopWatch stopWatch = new StopWatch("CompletableFutureDemo");
- stopWatch.start();
- // 異步執(zhí)行版本
- testCompletableFuture();
- stopWatch.stop();
- System.out.println(stopWatch);
- }
- private static void testCompletableFuture() throws InterruptedException, ExecutionException {
- // 先執(zhí)行 doOneThing 任務(wù),后執(zhí)行 doOtherThing 任務(wù)
- CompletableFuture<String> resultFuture = doOneThing().thenCompose(CompletableFutureDemo::doOtherThing);
- // 獲取任務(wù)結(jié)果
- String doOneThingResult = resultFuture.get();
- // 獲取執(zhí)行結(jié)果
- System.out.println("DoOneThing and DoOtherThing execute finished. result = " + doOneThingResult);
- }
- }
執(zhí)行結(jié)果如下:
在主線程 main
中首先調(diào)用了方法 doOneThing()
方法開啟了一個異步任務(wù),并返回了對應(yīng)的 CompletableFuture
對象,我們?nèi)∶麨?nbsp;doOneThingFuture
,然后在 doOneThingFuture
的基礎(chǔ)上使用 CompletableFuture
的 thenCompose()
方法,讓 doOneThingFuture
方法執(zhí)行完成后,使用其執(zhí)行結(jié)果作為 doOtherThing(String parameter)
方法的參數(shù)創(chuàng)建的異步任務(wù)返回。
我們不需要顯式使用 ExecutorService
,在 CompletableFuture
內(nèi)部使用的是 Fork/Join
框架異步處理任務(wù),因此,它使我們編寫的異步代碼更加簡潔。此外, CompletableFuture
類功能很強大其提供了和很多方便的方法,更多關(guān)于 CompletableFuture
的使用請見這篇。