從Future到CompletableFuture,Java異步編程進(jìn)化史
一、引言:異步編程的重要性
在當(dāng)今數(shù)字化時(shí)代,電商購(gòu)物已成為人們生活中不可或缺的一部分。想象一下,你正在開(kāi)發(fā)一個(gè)電商系統(tǒng),當(dāng)用戶訪問(wèn)商品詳情頁(yè)時(shí),需要同時(shí)展示多個(gè)商品的信息,包括商品的基本描述、價(jià)格、庫(kù)存、用戶評(píng)價(jià)等。這些信息可能來(lái)自不同的數(shù)據(jù)源,如數(shù)據(jù)庫(kù)、緩存或者外部接口,獲取每個(gè)商品信息的操作都可能是耗時(shí)的 I/O 操作。如果采用傳統(tǒng)的同步編程方式,一個(gè)一個(gè)地獲取商品信息,那么整個(gè)頁(yè)面的加載速度將會(huì)變得非常緩慢,嚴(yán)重影響用戶體驗(yàn)。
在現(xiàn)代 Java 開(kāi)發(fā)中,類(lèi)似這樣需要處理大量并發(fā)任務(wù)和耗時(shí)操作的場(chǎng)景越來(lái)越多。而異步編程就像是一把神奇的鑰匙,能夠幫助我們高效地解決這些問(wèn)題,顯著提升程序的性能和響應(yīng)性。它允許程序在執(zhí)行一個(gè)耗時(shí)操作的同時(shí),不阻塞主線程,繼續(xù)執(zhí)行其他任務(wù),從而充分利用 CPU 資源,提高系統(tǒng)的并發(fā)處理能力。
今天,我們要深入探討的主角 ——CompletableFuture,正是 Java 異步編程領(lǐng)域中的一位強(qiáng)大利器。它在 Java 8 中被引入,極大地簡(jiǎn)化了異步編程的復(fù)雜性,為開(kāi)發(fā)者提供了更加便捷、靈活的異步操作方式,讓我們能夠更加優(yōu)雅地處理異步任務(wù)的創(chuàng)建、組合、回調(diào)和異常處理等。接下來(lái),就讓我們一起揭開(kāi) CompletableFuture 的神秘面紗,領(lǐng)略它的強(qiáng)大魅力吧!
二、Java 異步編程的發(fā)展歷程
在 Java 的發(fā)展長(zhǎng)河中,異步編程的演進(jìn)是一個(gè)不斷突破和創(chuàng)新的過(guò)程,它見(jiàn)證了 Java 語(yǔ)言在應(yīng)對(duì)復(fù)雜計(jì)算場(chǎng)景時(shí)的不斷進(jìn)化。
早期,Java 主要通過(guò) Thread 類(lèi)和 Runnable 接口來(lái)實(shí)現(xiàn)異步編程。這種方式為開(kāi)發(fā)者提供了基本的多線程能力,讓程序能夠在多個(gè)線程中并行執(zhí)行任務(wù)。例如,在開(kāi)發(fā)一個(gè)簡(jiǎn)單的文件處理程序時(shí),我們可以創(chuàng)建一個(gè) Thread 類(lèi)的子類(lèi),并重寫(xiě)其 run 方法來(lái)實(shí)現(xiàn)文件讀取和處理的邏輯。然后,通過(guò)創(chuàng)建該子類(lèi)的實(shí)例并調(diào)用 start 方法,就可以啟動(dòng)一個(gè)新的線程來(lái)執(zhí)行文件處理任務(wù),而主線程則可以繼續(xù)執(zhí)行其他操作。
public class FileProcessor extends Thread {private String filePath;public FileProcessor(String filePath) {this.filePath = filePath;}@Overridepublic void run() {// 模擬文件讀取和處理邏輯try {Thread.sleep(2000);System.out.println("文件 " + filePath + " 處理完成");} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {FileProcessor fileProcessor = new FileProcessor("example.txt");fileProcessor.start();System.out.println("主線程繼續(xù)執(zhí)行其他任務(wù)");}}
不過(guò),這種方式存在一些明顯的缺點(diǎn)。創(chuàng)建和管理線程的開(kāi)銷(xiāo)較大,頻繁地創(chuàng)建和銷(xiāo)毀線程會(huì)消耗大量的系統(tǒng)資源。當(dāng)需要處理多個(gè)線程之間的協(xié)作和同步時(shí),代碼會(huì)變得復(fù)雜,容易出現(xiàn)死鎖等問(wèn)題。而且,Thread 類(lèi)和 Runnable 接口缺乏對(duì)異步任務(wù)結(jié)果的有效管理機(jī)制,獲取任務(wù)執(zhí)行結(jié)果變得很不方便。
為了解決這些問(wèn)題,Java 5 引入了 Future 接口,這是 Java 異步編程發(fā)展中的一個(gè)重要里程碑。Future 接口為異步任務(wù)的管理和結(jié)果獲取提供了一種標(biāo)準(zhǔn)的方式。通過(guò) Future 接口,我們可以提交一個(gè)異步任務(wù),并在需要的時(shí)候獲取其執(zhí)行結(jié)果,還能檢查任務(wù)的狀態(tài),如是否完成、是否被取消等。比如,在一個(gè)電商系統(tǒng)中,計(jì)算商品的推薦列表可能是一個(gè)耗時(shí)的任務(wù),我們可以使用 Future 接口來(lái)異步執(zhí)行這個(gè)任務(wù)。
import java.util.concurrent.*;public class ProductRecommender {public static void main(String[] args) {ExecutorService executor = Executors.newSingleThreadExecutor();Future<List<String>> future = executor.submit(() -> {// 模擬計(jì)算商品推薦列表的耗時(shí)操作Thread.sleep(3000);List<String> recommendations = Arrays.asList("商品A", "商品B", "商品C");return recommendations;});// 主線程可以繼續(xù)執(zhí)行其他操作System.out.println("主線程繼續(xù)執(zhí)行其他任務(wù)");try {List<String> recommendations = future.get();System.out.println("商品推薦列表: " + recommendations);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();} finally {executor.shutdown();}}}
上述代碼中,首先創(chuàng)建了一個(gè)線程池 ExecutorService,并提交了一個(gè) Callable 任務(wù),該任務(wù)返回一個(gè) Future 對(duì)象。在主線程中,我們可以繼續(xù)執(zhí)行其他操作,然后通過(guò)調(diào)用 Future 對(duì)象的 get 方法來(lái)獲取異步任務(wù)的執(zhí)行結(jié)果。如果任務(wù)尚未完成,get 方法會(huì)阻塞當(dāng)前線程,直到任務(wù)完成并返回結(jié)果。
Future 接口的出現(xiàn),使得 Java 的異步編程更加規(guī)范和易于管理,極大地提高了程序的并發(fā)性能和響應(yīng)能力。但它也并非完美無(wú)缺,在處理復(fù)雜的異步任務(wù)組合和回調(diào)時(shí),F(xiàn)uture 接口的局限性逐漸顯現(xiàn),代碼可能會(huì)變得冗長(zhǎng)和難以維護(hù)。
三、CompletableFuture 全面解析
(一)CompletableFuture 的概述
CompletableFuture 是 Java 8 引入的一個(gè)強(qiáng)大的異步編程工具,它極大地簡(jiǎn)化了異步編程的復(fù)雜性。它實(shí)現(xiàn)了 Future 接口,同時(shí)還實(shí)現(xiàn)了 CompletionStage 接口,這使得它不僅具備了 Future 的基本功能,還提供了更豐富的異步操作和任務(wù)編排能力。
與傳統(tǒng)的 Future 相比,CompletableFuture 的最大優(yōu)勢(shì)在于它的非阻塞性和對(duì)異步任務(wù)結(jié)果的靈活處理。在傳統(tǒng)的 Future 中,我們獲取任務(wù)結(jié)果時(shí)通常需要調(diào)用get()方法,這個(gè)方法會(huì)阻塞當(dāng)前線程,直到任務(wù)完成并返回結(jié)果。而 CompletableFuture 則通過(guò)回調(diào)機(jī)制,允許我們?cè)谌蝿?wù)完成時(shí)自動(dòng)觸發(fā)后續(xù)操作,無(wú)需阻塞線程,從而提高了程序的并發(fā)性能和響應(yīng)速度。此外,CompletableFuture 還支持鏈?zhǔn)秸{(diào)用和函數(shù)式編程風(fēng)格,使得代碼更加簡(jiǎn)潔、易讀。
(二)核心特性
- 異步執(zhí)行任務(wù):CompletableFuture 提供了兩個(gè)靜態(tài)方法supplyAsync和runAsync來(lái)開(kāi)啟異步任務(wù)。supplyAsync方法接受一個(gè)Supplier接口的實(shí)現(xiàn),用于返回一個(gè)異步計(jì)算的結(jié)果;runAsync方法接受一個(gè)Runnable接口的實(shí)現(xiàn),用于執(zhí)行一個(gè)沒(méi)有返回值的異步任務(wù)。這兩個(gè)方法都有兩個(gè)重載版本,一個(gè)是使用默認(rèn)的線程池(ForkJoinPool.commonPool ()),另一個(gè)是可以指定自定義的線程池。
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 使用supplyAsync方法開(kāi)啟一個(gè)異步任務(wù),返回一個(gè)CompletableFuture對(duì)象CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {// 模擬耗時(shí)操作try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "任務(wù)1的結(jié)果";});// 使用runAsync方法開(kāi)啟一個(gè)異步任務(wù),返回一個(gè)CompletableFuture<Void>對(duì)象CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {// 模擬耗時(shí)操作try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任務(wù)2執(zhí)行完畢");});// 獲取future1的結(jié)果,如果任務(wù)未完成,get方法會(huì)阻塞當(dāng)前線程String result1 = future1.get();System.out.println("任務(wù)1的結(jié)果是: " + result1);// 等待future2完成,因?yàn)閒uture2沒(méi)有返回值,所以不需要調(diào)用get方法獲取結(jié)果future2.join();}}
在上述代碼中,supplyAsync方法創(chuàng)建了一個(gè)異步任務(wù),該任務(wù)在后臺(tái)線程中執(zhí)行,模擬了一個(gè)耗時(shí) 2 秒的操作,并返回一個(gè)結(jié)果。runAsync方法創(chuàng)建了另一個(gè)異步任務(wù),同樣模擬了一個(gè)耗時(shí) 1 秒的操作,但沒(méi)有返回值,只是在任務(wù)完成時(shí)打印一條消息。主線程在創(chuàng)建這兩個(gè)異步任務(wù)后,不會(huì)被阻塞,可以繼續(xù)執(zhí)行其他操作。當(dāng)需要獲取supplyAsync任務(wù)的結(jié)果時(shí),調(diào)用get方法,此時(shí)如果任務(wù)尚未完成,主線程會(huì)被阻塞,直到任務(wù)完成并返回結(jié)果。而對(duì)于runAsync任務(wù),由于不需要獲取其返回值,所以可以使用join方法等待任務(wù)完成,join方法和get方法類(lèi)似,但它不會(huì)拋出受檢異常。
- 鏈?zhǔn)秸{(diào)用:CompletableFuture 支持鏈?zhǔn)秸{(diào)用,通過(guò)thenApply、thenAccept、thenRun等方法,可以對(duì)異步任務(wù)的結(jié)果進(jìn)行處理,并將處理結(jié)果傳遞給下一個(gè)階段的任務(wù)。這種鏈?zhǔn)秸{(diào)用的方式使得代碼更加簡(jiǎn)潔、易讀,并且能夠清晰地表達(dá)任務(wù)之間的依賴關(guān)系。
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureChainDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)1執(zhí)行中...");return "任務(wù)1的結(jié)果";}).thenApply(result -> {System.out.println("任務(wù)2執(zhí)行中,處理任務(wù)1的結(jié)果: " + result);return result + " 經(jīng)過(guò)任務(wù)2的處理";}).thenAccept(finalResult -> {System.out.println("任務(wù)3執(zhí)行中,最終結(jié)果是: " + finalResult);});// 主線程需要等待一段時(shí)間,以便異步任務(wù)有足夠的時(shí)間完成Thread.sleep(2000);}}
在這段代碼中,首先使用supplyAsync方法創(chuàng)建了一個(gè)異步任務(wù),該任務(wù)返回一個(gè)結(jié)果。然后通過(guò)thenApply方法,接收上一個(gè)任務(wù)的結(jié)果,并對(duì)其進(jìn)行處理,返回一個(gè)新的結(jié)果。最后使用thenAccept方法,接收thenApply處理后的結(jié)果,并進(jìn)行消費(fèi),但不返回新的結(jié)果。整個(gè)過(guò)程通過(guò)鏈?zhǔn)秸{(diào)用的方式,將三個(gè)任務(wù)串聯(lián)起來(lái),形成了一個(gè)異步任務(wù)鏈。需要注意的是,由于這些異步任務(wù)是在后臺(tái)線程中執(zhí)行的,主線程在創(chuàng)建完任務(wù)鏈后,會(huì)繼續(xù)執(zhí)行后續(xù)代碼。為了讓異步任務(wù)有足夠的時(shí)間完成,這里使用Thread.sleep(2000)方法讓主線程睡眠 2 秒。
- 組合異步操作:在實(shí)際應(yīng)用中,我們常常需要將多個(gè)異步任務(wù)組合起來(lái),以實(shí)現(xiàn)更復(fù)雜的業(yè)務(wù)邏輯。CompletableFuture 提供了thenCombine、thenAcceptBoth等方法,用于組合多個(gè)異步任務(wù)。
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureCombineDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)A執(zhí)行中...");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "任務(wù)A的結(jié)果";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)B執(zhí)行中...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "任務(wù)B的結(jié)果";});CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {System.out.println("組合任務(wù)執(zhí)行中,合并任務(wù)A和任務(wù)B的結(jié)果");return result1 + " 和 " + result2;});String combinedResult = combinedFuture.get();System.out.println("最終組合結(jié)果是: " + combinedResult);}}
上述代碼中,首先創(chuàng)建了兩個(gè)異步任務(wù)future1和future2,分別模擬了耗時(shí) 2 秒和 1 秒的操作,并返回各自的結(jié)果。然后使用thenCombine方法將這兩個(gè)任務(wù)組合起來(lái),thenCombine方法接受另一個(gè)CompletableFuture對(duì)象和一個(gè)BiFunction函數(shù),當(dāng)future1和future2都完成時(shí),會(huì)將它們的結(jié)果作為參數(shù)傳遞給BiFunction函數(shù),該函數(shù)對(duì)兩個(gè)結(jié)果進(jìn)行合并,并返回一個(gè)新的結(jié)果,這個(gè)新的結(jié)果會(huì)被包裝成一個(gè)新的CompletableFuture對(duì)象combinedFuture。最后通過(guò)get方法獲取combinedFuture的結(jié)果,即兩個(gè)任務(wù)結(jié)果的組合。
- 異常處理:在異步編程中,異常處理是非常重要的一環(huán)。CompletableFuture 提供了exceptionally和handle方法,用于優(yōu)雅地處理異步任務(wù)中可能出現(xiàn)的異常。
import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionDemo {public static void main(String[] args) {CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)執(zhí)行中...");if (Math.random() > 0.5) {throw new RuntimeException("任務(wù)執(zhí)行出現(xiàn)異常");}return "任務(wù)正常結(jié)果";}).exceptionally(ex -> {System.out.println("捕獲到異常: " + ex.getMessage());return "默認(rèn)結(jié)果";}).thenAccept(result -> {System.out.println("最終結(jié)果是: " + result);});// 主線程需要等待一段時(shí)間,以便異步任務(wù)有足夠的時(shí)間完成try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}
在這段代碼中,supplyAsync方法創(chuàng)建的異步任務(wù)中,通過(guò)Math.random()方法隨機(jī)生成一個(gè)數(shù),如果這個(gè)數(shù)大于 0.5,則拋出一個(gè)運(yùn)行時(shí)異常,否則返回正常結(jié)果。exceptionally方法用于捕獲異步任務(wù)中拋出的異常,當(dāng)捕獲到異常時(shí),會(huì)執(zhí)行exceptionally方法中的處理邏輯,返回一個(gè)默認(rèn)結(jié)果。如果異步任務(wù)正常完成,exceptionally方法不會(huì)被執(zhí)行,而是直接執(zhí)行thenAccept方法,處理正常的結(jié)果。同樣,為了讓異步任務(wù)有足夠的時(shí)間完成,主線程通過(guò)Thread.sleep(2000)方法睡眠 2 秒 。
四、代碼示例與實(shí)踐
(一)創(chuàng)建異步任務(wù)
在使用 CompletableFuture 進(jìn)行異步編程時(shí),創(chuàng)建異步任務(wù)是基礎(chǔ)操作。我們可以使用supplyAsync和runAsync方法來(lái)創(chuàng)建異步任務(wù),這兩個(gè)方法都有使用默認(rèn)線程池和自定義線程池的重載版本。
使用默認(rèn)線程池創(chuàng)建帶有返回值的異步任務(wù),示例如下:
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureCreateTask {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {// 模擬異步任務(wù)中的耗時(shí)操作,比如查詢數(shù)據(jù)庫(kù)、調(diào)用遠(yuǎn)程接口等try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return 42;});// 獲取異步任務(wù)的執(zhí)行結(jié)果,如果任務(wù)尚未完成,get方法會(huì)阻塞當(dāng)前線程int result = future.get();System.out.println("任務(wù)的結(jié)果是: " + result);}}
在上述代碼中,CompletableFuture.supplyAsync方法接受一個(gè)Supplier接口的實(shí)現(xiàn),以 Lambda 表達(dá)式的形式定義了異步任務(wù)的邏輯。在這個(gè)任務(wù)中,通過(guò)Thread.sleep(2000)模擬了一個(gè)耗時(shí) 2 秒的操作,然后返回結(jié)果 42。主線程通過(guò)future.get()方法獲取異步任務(wù)的執(zhí)行結(jié)果,此時(shí)如果任務(wù)尚未完成,get方法會(huì)阻塞主線程,直到任務(wù)完成并返回結(jié)果。
接下來(lái),我們看一下使用自定義線程池創(chuàng)建異步任務(wù)的示例:
import java.util.concurrent.*;public class CompletableFutureCustomThreadPool {public static void main(String[] args) throws ExecutionException, InterruptedException {// 創(chuàng)建一個(gè)自定義線程池,設(shè)置核心線程數(shù)為5,最大線程數(shù)為10,線程空閑時(shí)間為60秒,任務(wù)隊(duì)列容量為100ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 模擬異步任務(wù)中的耗時(shí)操作,比如復(fù)雜的計(jì)算邏輯try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "自定義線程池執(zhí)行的任務(wù)結(jié)果";}, executor);// 獲取異步任務(wù)的執(zhí)行結(jié)果String result = future.get();System.out.println("任務(wù)的結(jié)果是: " + result);// 關(guān)閉線程池,釋放資源executor.shutdown();}}
在這個(gè)示例中,我們首先創(chuàng)建了一個(gè)ThreadPoolExecutor類(lèi)型的自定義線程池,設(shè)置了核心線程數(shù)、最大線程數(shù)、線程空閑時(shí)間和任務(wù)隊(duì)列容量等參數(shù)。然后,使用CompletableFuture.supplyAsync方法并傳入自定義線程池,定義了一個(gè)異步任務(wù)。在任務(wù)中同樣模擬了一個(gè)耗時(shí) 3 秒的操作,并返回結(jié)果。最后,在主線程中獲取任務(wù)結(jié)果,并在任務(wù)完成后關(guān)閉線程池,以釋放資源。
使用runAsync方法創(chuàng)建沒(méi)有返回值的異步任務(wù),使用默認(rèn)線程池的示例如下:
import java.util.concurrent.CompletableFuture;public class CompletableFutureRunAsyncDefault {public static void main(String[] args) {CompletableFuture.runAsync(() -> {// 模擬異步任務(wù)中的操作,比如打印日志、更新緩存等try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("沒(méi)有返回值的異步任務(wù)執(zhí)行完成");});System.out.println("主線程繼續(xù)執(zhí)行其他任務(wù)");}}
在這段代碼中,CompletableFuture.runAsync方法接受一個(gè)Runnable接口的實(shí)現(xiàn),定義了一個(gè)沒(méi)有返回值的異步任務(wù)。任務(wù)中通過(guò)Thread.sleep(1000)模擬了一個(gè)耗時(shí) 1 秒的操作,然后打印任務(wù)完成的消息。由于runAsync方法創(chuàng)建的任務(wù)沒(méi)有返回值,所以主線程不需要等待任務(wù)完成,可以直接繼續(xù)執(zhí)行后續(xù)操作。
使用自定義線程池創(chuàng)建沒(méi)有返回值的異步任務(wù)示例如下:
import java.util.concurrent.*;public class CompletableFutureRunAsyncCustom {public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(3,6,30,TimeUnit.SECONDS,new LinkedBlockingQueue<>(50));CompletableFuture.runAsync(() -> {// 模擬異步任務(wù)中的操作,比如發(fā)送郵件、調(diào)用第三方接口等try {Thread.sleep(1500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("使用自定義線程池的無(wú)返回值異步任務(wù)執(zhí)行完成");}, executor);System.out.println("主線程繼續(xù)執(zhí)行其他任務(wù)");// 關(guān)閉線程池,釋放資源executor.shutdown();}}
此示例中,我們創(chuàng)建了一個(gè)自定義線程池,并使用CompletableFuture.runAsync方法結(jié)合自定義線程池創(chuàng)建了一個(gè)沒(méi)有返回值的異步任務(wù)。任務(wù)執(zhí)行完畢后,打印相應(yīng)的消息,主線程繼續(xù)執(zhí)行其他操作,最后關(guān)閉線程池。
(二)鏈?zhǔn)秸{(diào)用與結(jié)果處理
CompletableFuture 的鏈?zhǔn)秸{(diào)用功能使得我們可以對(duì)異步任務(wù)的結(jié)果進(jìn)行一系列的處理,讓代碼更加簡(jiǎn)潔和易讀。通過(guò)thenApply、thenAccept、thenRun等方法,我們可以實(shí)現(xiàn)對(duì)異步任務(wù)結(jié)果的轉(zhuǎn)換、消費(fèi)和后續(xù)操作。
thenApply方法用于對(duì)異步任務(wù)的結(jié)果進(jìn)行轉(zhuǎn)換,它接收一個(gè)Function函數(shù),將前一個(gè)任務(wù)的結(jié)果作為參數(shù)傳入該函數(shù),并返回一個(gè)新的結(jié)果。示例代碼如下:
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureThenApply {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)1執(zhí)行中...");return 10;}).thenApply(result -> {System.out.println("任務(wù)2執(zhí)行中,處理任務(wù)1的結(jié)果: " + result);return result * 2;}).thenApply(finalResult -> {System.out.println("任務(wù)3執(zhí)行中,處理任務(wù)2的結(jié)果: " + finalResult);return finalResult + 5;}).thenAccept(result -> {System.out.println("最終結(jié)果是: " + result);});// 主線程需要等待一段時(shí)間,以便異步任務(wù)有足夠的時(shí)間完成Thread.sleep(2000);}}
在上述代碼中,首先使用supplyAsync方法創(chuàng)建了一個(gè)異步任務(wù),返回值為 10。然后通過(guò)thenApply方法,將任務(wù) 1 的結(jié)果乘以 2,得到新的結(jié)果 20。接著再次使用thenApply方法,將 20 加上 5,得到最終結(jié)果 25。最后通過(guò)thenAccept方法,消費(fèi)最終結(jié)果并打印輸出。整個(gè)過(guò)程通過(guò)鏈?zhǔn)秸{(diào)用,將多個(gè)異步任務(wù)串聯(lián)起來(lái),實(shí)現(xiàn)了對(duì)結(jié)果的逐步處理。
thenAccept方法用于消費(fèi)異步任務(wù)的結(jié)果,它接收一個(gè)Consumer函數(shù),將前一個(gè)任務(wù)的結(jié)果作為參數(shù)傳入該函數(shù),但不返回新的結(jié)果。示例如下:
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureThenAccept {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)1執(zhí)行中...");return "Hello";}).thenAccept(result -> {System.out.println("任務(wù)2執(zhí)行中,消費(fèi)任務(wù)1的結(jié)果: " + result);// 可以在這個(gè)方法中進(jìn)行一些基于結(jié)果的操作,比如打印日志、更新數(shù)據(jù)庫(kù)等});// 主線程需要等待一段時(shí)間,以便異步任務(wù)有足夠的時(shí)間完成Thread.sleep(1000);}}
在這個(gè)例子中,supplyAsync方法創(chuàng)建的異步任務(wù)返回字符串 "Hello"。thenAccept方法接收這個(gè)結(jié)果,并在其內(nèi)部消費(fèi)該結(jié)果,打印出消費(fèi)信息。在thenAccept方法中,我們可以根據(jù)業(yè)務(wù)需求進(jìn)行一些基于結(jié)果的操作,比如將結(jié)果寫(xiě)入日志文件、更新數(shù)據(jù)庫(kù)記錄等。
thenRun方法則是在異步任務(wù)完成后執(zhí)行一個(gè)無(wú)參數(shù)的操作,它不關(guān)心前一個(gè)任務(wù)的結(jié)果。示例代碼如下:
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureThenRun {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)1執(zhí)行中...");return 100;}).thenRun(() -> {System.out.println("任務(wù)2執(zhí)行中,不依賴任務(wù)1的結(jié)果");// 這里可以執(zhí)行一些與前一個(gè)任務(wù)結(jié)果無(wú)關(guān)的操作,比如發(fā)送通知等});// 主線程需要等待一段時(shí)間,以便異步任務(wù)有足夠的時(shí)間完成Thread.sleep(1000);}}
在這段代碼中,supplyAsync方法創(chuàng)建的異步任務(wù)返回值為 100,但thenRun方法并不關(guān)心這個(gè)結(jié)果,它只是在任務(wù) 1 完成后執(zhí)行一個(gè)獨(dú)立的操作,打印出相應(yīng)的消息。在實(shí)際應(yīng)用中,thenRun方法可以用于執(zhí)行一些與前一個(gè)任務(wù)結(jié)果無(wú)關(guān)的操作,比如發(fā)送郵件通知、更新系統(tǒng)狀態(tài)等。
(三)組合異步操作
在實(shí)際的開(kāi)發(fā)場(chǎng)景中,我們常常需要將多個(gè)異步任務(wù)組合起來(lái),以實(shí)現(xiàn)更復(fù)雜的業(yè)務(wù)邏輯。CompletableFuture 提供了thenCombine、thenAcceptBoth等方法,方便我們對(duì)多個(gè)異步任務(wù)進(jìn)行組合操作。
thenCombine方法用于將兩個(gè)異步任務(wù)的結(jié)果進(jìn)行合并,它接收另一個(gè)CompletableFuture對(duì)象和一個(gè)BiFunction函數(shù)。當(dāng)兩個(gè)異步任務(wù)都完成時(shí),BiFunction函數(shù)會(huì)將兩個(gè)任務(wù)的結(jié)果作為參數(shù)進(jìn)行處理,并返回一個(gè)新的結(jié)果。示例代碼如下:
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureThenCombine {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)A執(zhí)行中...");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)B執(zhí)行中...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return 20;});CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {System.out.println("組合任務(wù)執(zhí)行中,合并任務(wù)A和任務(wù)B的結(jié)果");return result1 + result2;});int combinedResult = combinedFuture.get();System.out.println("最終組合結(jié)果是: " + combinedResult);}}
在上述代碼中,首先創(chuàng)建了兩個(gè)異步任務(wù)future1和future2,分別模擬了耗時(shí) 2 秒和 1 秒的操作,并返回各自的結(jié)果 10 和 20。然后使用thenCombine方法將這兩個(gè)任務(wù)組合起來(lái),當(dāng)future1和future2都完成時(shí),BiFunction函數(shù)會(huì)將它們的結(jié)果相加,得到新的結(jié)果 30。最后通過(guò)get方法獲取組合任務(wù)的結(jié)果并打印輸出。
thenAcceptBoth方法則是在兩個(gè)異步任務(wù)都完成后,將它們的結(jié)果作為參數(shù)傳遞給一個(gè)BiConsumer函數(shù)進(jìn)行消費(fèi),但不返回新的結(jié)果。示例如下:
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureThenAcceptBoth {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)X執(zhí)行中...");try {Thread.sleep(1500);} catch (InterruptedException e) {e.printStackTrace();}return "Hello";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)Y執(zhí)行中...");try {Thread.sleep(2500);} catch (InterruptedException e) {e.printStackTrace();}return "World";});future1.thenAcceptBoth(future2, (result1, result2) -> {System.out.println("組合任務(wù)執(zhí)行中,消費(fèi)任務(wù)X和任務(wù)Y的結(jié)果");System.out.println("拼接后的結(jié)果是: " + result1 + " " + result2);});// 主線程需要等待一段時(shí)間,以便異步任務(wù)有足夠的時(shí)間完成Thread.sleep(3000);}}
在這個(gè)例子中,future1和future2分別是兩個(gè)異步任務(wù),完成后分別返回 "Hello" 和 "World"。thenAcceptBoth方法接收這兩個(gè)任務(wù)的結(jié)果,并通過(guò)BiConsumer函數(shù)將它們拼接起來(lái)并打印輸出。在thenAcceptBoth方法中,我們可以根據(jù)業(yè)務(wù)需求對(duì)兩個(gè)任務(wù)的結(jié)果進(jìn)行各種消費(fèi)操作,比如將結(jié)果寫(xiě)入文件、發(fā)送到消息隊(duì)列等。
(四)異常處理
在異步編程中,異常處理是至關(guān)重要的環(huán)節(jié)。CompletableFuture 提供了exceptionally和handle方法,幫助我們優(yōu)雅地處理異步任務(wù)中可能出現(xiàn)的異常。
exceptionally方法用于捕獲異步任務(wù)執(zhí)行過(guò)程中拋出的異常,并返回一個(gè)默認(rèn)值或處理后的結(jié)果。示例代碼如下:
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureExceptionally {public static void main(String[] args) {CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)執(zhí)行中...");if (Math.random() > 0.5) {throw new RuntimeException("任務(wù)執(zhí)行出現(xiàn)異常");}return "任務(wù)正常結(jié)果";}).exceptionally(ex -> {System.out.println("捕獲到異常: " + ex.getMessage());return "默認(rèn)結(jié)果";}).thenAccept(result -> {System.out.println("最終結(jié)果是: " + result);});// 主線程需要等待一段時(shí)間,以便異步任務(wù)有足夠的時(shí)間完成try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}
在上述代碼中,supplyAsync方法創(chuàng)建的異步任務(wù)中,通過(guò)Math.random()方法隨機(jī)生成一個(gè)數(shù),如果這個(gè)數(shù)大于 0.5,則拋出一個(gè)運(yùn)行時(shí)異常,否則返回正常結(jié)果。exceptionally方法用于捕獲異步任務(wù)中拋出的異常,當(dāng)捕獲到異常時(shí),會(huì)執(zhí)行exceptionally方法中的處理邏輯,打印異常信息并返回一個(gè)默認(rèn)結(jié)果。如果異步任務(wù)正常完成,exceptionally方法不會(huì)被執(zhí)行,而是直接執(zhí)行thenAccept方法,處理正常的結(jié)果。
handle方法則更加靈活,它不僅可以處理異常,還可以處理正常的計(jì)算結(jié)果。handle方法接收一個(gè)BiFunction函數(shù),該函數(shù)接收兩個(gè)參數(shù):異步任務(wù)的結(jié)果和可能出現(xiàn)的異常。根據(jù)這兩個(gè)參數(shù),我們可以返回不同的處理結(jié)果。示例如下:
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureHandle {public static void main(String[] args) {CompletableFuture.supplyAsync(() -> {System.out.println("任務(wù)執(zhí)行中...");if (Math.random() > 0.3) {throw new RuntimeException("任務(wù)執(zhí)行出現(xiàn)異常");}return "任務(wù)正常結(jié)果";}).handle((result, ex) -> {if (ex!= null) {System.out.println("捕獲到異常: " + ex.getMessage());return "異常處理后的結(jié)果";}System.out.println("任務(wù)正常完成,結(jié)果是: " + result);return result;}).thenAccept(finalResult -> {System.out.println("最終結(jié)果是: " + finalResult);});// 主線程需要等待一段時(shí)間,以便異步任務(wù)有足夠的時(shí)間完成try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}
在這個(gè)例子中,supplyAsync方法創(chuàng)建的異步任務(wù)同樣可能會(huì)拋出異常。handle方法中的BiFunction函數(shù)會(huì)根據(jù)任務(wù)的執(zhí)行情況進(jìn)行處理。如果出現(xiàn)異常,會(huì)打印異常信息并返回異常處理后的結(jié)果;如果任務(wù)正常完成,會(huì)打印正常結(jié)果并返回該結(jié)果。最后通過(guò)thenAccept方法處理最終的
五、與其他異步編程方式的對(duì)比
(一)與 Future 的對(duì)比
在 Java 的異步編程領(lǐng)域,F(xiàn)uture 接口是早期的重要工具,而 CompletableFuture 則是在其基礎(chǔ)上的進(jìn)一步發(fā)展和增強(qiáng)。下面我們從功能、性能、靈活性等方面來(lái)對(duì)比這兩者,看看 CompletableFuture 究竟有哪些獨(dú)特的優(yōu)勢(shì)。
從功能層面來(lái)看,F(xiàn)uture 主要用于表示異步計(jì)算的結(jié)果,它提供了isDone方法用于檢查計(jì)算是否完成,get方法用于獲取計(jì)算結(jié)果,以及cancel方法用于取消計(jì)算任務(wù)。然而,F(xiàn)uture 的功能相對(duì)較為單一,獲取結(jié)果時(shí),如果計(jì)算尚未完成,get方法會(huì)阻塞當(dāng)前線程,這在一定程度上限制了其在復(fù)雜異步場(chǎng)景中的應(yīng)用。例如,在一個(gè)需要同時(shí)處理多個(gè)異步任務(wù)并對(duì)結(jié)果進(jìn)行復(fù)雜處理的電商系統(tǒng)中,使用 Future 可能會(huì)導(dǎo)致主線程長(zhǎng)時(shí)間阻塞,影響系統(tǒng)的響應(yīng)性能。
相比之下,CompletableFuture 的功能要豐富得多。它不僅實(shí)現(xiàn)了 Future 接口,具備 Future 的基本功能,還提供了大量用于異步任務(wù)編排和結(jié)果處理的方法。通過(guò)thenApply、thenAccept、thenRun等方法,我們可以方便地對(duì)異步任務(wù)的結(jié)果進(jìn)行轉(zhuǎn)換、消費(fèi)和后續(xù)操作,實(shí)現(xiàn)鏈?zhǔn)秸{(diào)用,使代碼更加簡(jiǎn)潔和易讀。在處理多個(gè)異步任務(wù)時(shí),CompletableFuture 提供了thenCombine、thenAcceptBoth、allOf、anyOf等方法,能夠輕松地實(shí)現(xiàn)任務(wù)的組合和并行執(zhí)行,滿足各種復(fù)雜的業(yè)務(wù)需求。
在性能方面,由于 Future 的get方法會(huì)阻塞線程,當(dāng)有大量異步任務(wù)需要處理時(shí),可能會(huì)導(dǎo)致線程資源的浪費(fèi)和系統(tǒng)性能的下降。而 CompletableFuture 采用了非阻塞的方式來(lái)處理異步結(jié)果,通過(guò)回調(diào)機(jī)制,在任務(wù)完成時(shí)自動(dòng)觸發(fā)后續(xù)操作,無(wú)需阻塞線程,從而提高了系統(tǒng)的并發(fā)性能和響應(yīng)速度。例如,在一個(gè)高并發(fā)的網(wǎng)絡(luò)爬蟲(chóng)系統(tǒng)中,使用 CompletableFuture 可以充分利用線程資源,同時(shí)處理多個(gè)網(wǎng)頁(yè)的抓取和解析任務(wù),大大提高了爬蟲(chóng)的效率。
靈活性上,F(xiàn)uture 的操作相對(duì)較為固定,缺乏對(duì)多個(gè)異步操作的組合能力。而 CompletableFuture 則提供了強(qiáng)大的組合和處理能力,可以方便地處理復(fù)雜的異步任務(wù)。它支持在任務(wù)執(zhí)行過(guò)程中動(dòng)態(tài)地添加回調(diào)函數(shù),根據(jù)任務(wù)的執(zhí)行結(jié)果或異常情況進(jìn)行不同的處理,使得異步編程更加靈活和可控。在一個(gè)分布式系統(tǒng)中,可能需要調(diào)用多個(gè)不同的服務(wù)來(lái)完成一個(gè)業(yè)務(wù)流程,使用 CompletableFuture 可以輕松地將這些異步調(diào)用組合起來(lái),實(shí)現(xiàn)高效的分布式事務(wù)處理。
(二)與其他異步框架的對(duì)比
除了與 Future 進(jìn)行對(duì)比,我們也來(lái)簡(jiǎn)要提及一下 CompletableFuture 與其他異步框架的差異,比如 Guava 的 ListenableFuture。
Guava 的 ListenableFuture 是對(duì) Java 原生 Future 的擴(kuò)展,它提供了一種可以監(jiān)聽(tīng)異步任務(wù)完成的機(jī)制,通過(guò)添加回調(diào)函數(shù),在任務(wù)完成時(shí)自動(dòng)觸發(fā)回調(diào)操作,這在一定程度上彌補(bǔ)了 Future 的不足。例如,在一個(gè)需要實(shí)時(shí)處理消息的系統(tǒng)中,使用 ListenableFuture 可以在消息處理完成后立即觸發(fā)通知操作,提高系統(tǒng)的實(shí)時(shí)性。
然而,CompletableFuture 作為 Java 標(biāo)準(zhǔn)庫(kù)的一部分,在使用和集成上具有獨(dú)特的便利性。它無(wú)需額外引入第三方庫(kù),減少了項(xiàng)目的依賴,降低了維護(hù)成本。而且,CompletableFuture 與 Java 8 引入的 Lambda 表達(dá)式、Stream API 等新特性緊密結(jié)合,能夠更好地利用 Java 語(yǔ)言的新特性進(jìn)行異步編程,使得代碼更加簡(jiǎn)潔、高效。在使用 CompletableFuture 時(shí),我們可以直接使用 Lambda 表達(dá)式來(lái)定義異步任務(wù)和回調(diào)函數(shù),利用 Stream API 對(duì)多個(gè)異步任務(wù)進(jìn)行并行處理和結(jié)果匯總,大大提高了開(kāi)發(fā)效率。
在功能方面,CompletableFuture 同樣具有一定的優(yōu)勢(shì)。它提供了更豐富的方法來(lái)處理異步任務(wù)的組合、異常處理和超時(shí)控制等。在處理多個(gè)異步任務(wù)的組合時(shí),CompletableFuture 的thenCombine、allOf、anyOf等方法比 ListenableFuture 更加簡(jiǎn)潔和靈活,能夠更方便地實(shí)現(xiàn)復(fù)雜的業(yè)務(wù)邏輯。在異常處理方面,CompletableFuture 的exceptionally和handle方法提供了更強(qiáng)大的異常處理能力,能夠更優(yōu)雅地處理異步任務(wù)中可能出現(xiàn)的異常情況。
六、實(shí)際應(yīng)用場(chǎng)景
(一)電商系統(tǒng)中的商品比價(jià)
在電商領(lǐng)域,用戶在購(gòu)買(mǎi)商品時(shí)往往希望能夠快速了解不同平臺(tái)上同款商品的價(jià)格,以便做出最優(yōu)的購(gòu)買(mǎi)決策。為了實(shí)現(xiàn)這一功能,我們可以利用 CompletableFuture 來(lái)異步地從多個(gè)電商平臺(tái)獲取商品價(jià)格信息,并進(jìn)行比價(jià)展示。
假設(shè)我們有一個(gè)電商比價(jià)系統(tǒng),需要從淘寶、京東、拼多多等多個(gè)電商平臺(tái)獲取某款手機(jī)的價(jià)格。使用 CompletableFuture,我們可以將每個(gè)平臺(tái)的價(jià)格查詢操作封裝成一個(gè)異步任務(wù),然后并行地執(zhí)行這些任務(wù),最后將所有平臺(tái)的價(jià)格信息匯總展示給用戶。示例代碼如下:
import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.stream.Collectors;// 定義電商平臺(tái)類(lèi)class EcommercePlatform {private String platformName;public EcommercePlatform(String platformName) {this.platformName = platformName;}// 模擬從電商平臺(tái)獲取商品價(jià)格的方法public double getProductPrice(String product) {// 這里可以替換為實(shí)際的價(jià)格查詢邏輯,比如調(diào)用電商平臺(tái)的APItry {Thread.sleep(1000); // 模擬網(wǎng)絡(luò)延遲或數(shù)據(jù)查詢耗時(shí)} catch (InterruptedException e) {e.printStackTrace();}// 隨機(jī)生成一個(gè)價(jià)格作為示例return Math.random() * 1000 + 1000;}}public class PriceComparisonSystem {public static void main(String[] args) throws ExecutionException, InterruptedException {// 創(chuàng)建線程池ExecutorService executor = Executors.newFixedThreadPool(3);// 定義要查詢價(jià)格的商品String product = "iPhone 15";// 定義各個(gè)電商平臺(tái)EcommercePlatform taobao = new EcommercePlatform("淘寶");EcommercePlatform jd = new EcommercePlatform("京東");EcommercePlatform pinduoduo = new EcommercePlatform("拼多多");// 使用CompletableFuture異步獲取各個(gè)平臺(tái)的商品價(jià)格CompletableFuture<Double> taobaoPriceFuture = CompletableFuture.supplyAsync(() -> taobao.getProductPrice(product), executor);CompletableFuture<Double> jdPriceFuture = CompletableFuture.supplyAsync(() -> jd.getProductPrice(product), executor);CompletableFuture<Double> pinduoduoPriceFuture = CompletableFuture.supplyAsync(() -> pinduoduo.getProductPrice(product), executor);// 將所有的CompletableFuture放入一個(gè)列表中List<CompletableFuture<Double>> futureList = new ArrayList<>();futureList.add(taobaoPriceFuture);futureList.add(jdPriceFuture);futureList.add(pinduoduoPriceFuture);// 使用CompletableFuture.allOf等待所有異步任務(wù)完成CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();// 獲取各個(gè)平臺(tái)的商品價(jià)格List<Double> priceList = futureList.stream().map(CompletableFuture::join).collect(Collectors.toList());// 打印各個(gè)平臺(tái)的商品價(jià)格System.out.println("商品 " + product + " 在不同平臺(tái)的價(jià)格如下:");System.out.println("淘寶:" + priceList.get(0));System.out.println("京東:" + priceList.get(1));System.out.println("拼多多:" + priceList.get(2));// 關(guān)閉線程池executor.shutdown();}}
在上述代碼中,首先創(chuàng)建了一個(gè)固定大小的線程池,用于執(zhí)行異步任務(wù)。然后,定義了三個(gè)電商平臺(tái)對(duì)象,并使用 CompletableFuture 的supplyAsync方法將每個(gè)平臺(tái)的價(jià)格查詢操作封裝成異步任務(wù),這些任務(wù)將在后臺(tái)線程中并行執(zhí)行。通過(guò)CompletableFuture.allOf方法等待所有異步任務(wù)完成,最后獲取各個(gè)平臺(tái)的商品價(jià)格并打印輸出。使用 CompletableFuture 實(shí)現(xiàn)商品比價(jià)功能,大大提高了獲取價(jià)格信息的效率,避免了因順序查詢而導(dǎo)致的長(zhǎng)時(shí)間等待,為用戶提供了更快速、便捷的購(gòu)物體驗(yàn)。
(二)多數(shù)據(jù)源的數(shù)據(jù)獲取與整合
在企業(yè)級(jí)開(kāi)發(fā)中,經(jīng)常會(huì)遇到需要從多個(gè)數(shù)據(jù)源獲取數(shù)據(jù)并進(jìn)行整合處理的場(chǎng)景。例如,在一個(gè)大型企業(yè)的數(shù)據(jù)分析系統(tǒng)中,用戶信息可能存儲(chǔ)在關(guān)系型數(shù)據(jù)庫(kù)中,而用戶的行為數(shù)據(jù)則存儲(chǔ)在分布式緩存和日志文件中。為了生成全面的用戶分析報(bào)告,我們需要從這些不同的數(shù)據(jù)源獲取數(shù)據(jù),并進(jìn)行整合和分析。
假設(shè)我們要開(kāi)發(fā)一個(gè)用戶數(shù)據(jù)分析模塊,需要從數(shù)據(jù)庫(kù)中獲取用戶的基本信息,從緩存中獲取用戶的近期登錄記錄,從日志文件中獲取用戶的操作行為數(shù)據(jù)。使用 CompletableFuture,我們可以將這些數(shù)據(jù)獲取操作并行化,提高數(shù)據(jù)獲取的效率。示例代碼如下:
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;// 模擬數(shù)據(jù)庫(kù)操作類(lèi)class DatabaseService {public String getUserInfo(String userId) {try {Thread.sleep(1500); // 模擬數(shù)據(jù)庫(kù)查詢耗時(shí)} catch (InterruptedException e) {e.printStackTrace();}return "用戶基本信息:姓名:張三,年齡:25,性別:男";}}// 模擬緩存操作類(lèi)class CacheService {public String getLoginRecords(String userId) {try {Thread.sleep(1000); // 模擬緩存查詢耗時(shí)} catch (InterruptedException e) {e.printStackTrace();}return "近期登錄記錄:[2024-01-01 10:00:00,2024-01-02 15:30:00]";}}// 模擬日志文件操作類(lèi)class LogService {public String getOperationLogs(String userId) {try {Thread.sleep(2000); // 模擬日志文件讀取耗時(shí)} catch (InterruptedException e) {e.printStackTrace();}return "操作行為數(shù)據(jù):[查看商品詳情,添加購(gòu)物車(chē),提交訂單]";}}public class UserDataAnalysis {public static void main(String[] args) throws ExecutionException, InterruptedException {String userId = "123456";DatabaseService databaseService = new DatabaseService();CacheService cacheService = new CacheService();LogService logService = new LogService();// 使用CompletableFuture異步獲取用戶基本信息CompletableFuture<String> userInfoFuture = CompletableFuture.supplyAsync(() -> databaseService.getUserInfo(userId));// 使用CompletableFuture異步獲取用戶近期登錄記錄CompletableFuture<String> loginRecordsFuture = CompletableFuture.supplyAsync(() -> cacheService.getLoginRecords(userId));// 使用CompletableFuture異步獲取用戶操作行為數(shù)據(jù)CompletableFuture<String> operationLogsFuture = CompletableFuture.supplyAsync(() -> logService.getOperationLogs(userId));// 使用CompletableFuture.allOf等待所有異步任務(wù)完成CompletableFuture.allOf(userInfoFuture, loginRecordsFuture, operationLogsFuture).join();// 獲取并整合數(shù)據(jù)String userInfo = userInfoFuture.get();String loginRecords = loginRecordsFuture.get();String operationLogs = operationLogsFuture.get();String combinedData = "用戶ID:" + userId + "\n" + userInfo + "\n" + loginRecords + "\n" + operationLogs;System.out.println(combinedData);}}
在這個(gè)示例中,分別創(chuàng)建了模擬數(shù)據(jù)庫(kù)、緩存和日志文件操作的類(lèi),并定義了相應(yīng)的數(shù)據(jù)獲取方法。使用 CompletableFuture 的supplyAsync方法將從不同數(shù)據(jù)源獲取數(shù)據(jù)的操作封裝成異步任務(wù),這些任務(wù)會(huì)在后臺(tái)線程中并行執(zhí)行。通過(guò)CompletableFuture.allOf方法等待所有異步任務(wù)完成,然后獲取各個(gè)數(shù)據(jù)源的數(shù)據(jù)并進(jìn)行整合,最終輸出完整的用戶分析數(shù)據(jù)。這種方式充分利用了 CompletableFuture 的異步和并行處理能力,大大提高了數(shù)據(jù)獲取和整合的效率,為企業(yè)級(jí)應(yīng)用的高效運(yùn)行提供了有力支持 。
七、總結(jié)與展望
在 Java 異步編程的廣闊領(lǐng)域中,CompletableFuture 無(wú)疑占據(jù)著舉足輕重的地位。它的出現(xiàn),為開(kāi)發(fā)者們提供了一種高效、便捷且靈活的異步編程解決方案,極大地簡(jiǎn)化了異步任務(wù)的處理流程。
從基本的異步任務(wù)創(chuàng)建,到復(fù)雜的任務(wù)組合與結(jié)果處理,再到優(yōu)雅的異常處理機(jī)制,CompletableFuture 憑借其豐富的方法和強(qiáng)大的功能,滿足了各種場(chǎng)景下的異步編程需求。與傳統(tǒng)的異步編程方式相比,它的優(yōu)勢(shì)顯而易見(jiàn),無(wú)論是在提升代碼的可讀性、可維護(hù)性,還是在提高程序的性能和響應(yīng)性方面,都表現(xiàn)出色。
在實(shí)際項(xiàng)目中,我們已經(jīng)看到 CompletableFuture 在電商系統(tǒng)、企業(yè)級(jí)數(shù)據(jù)處理等諸多領(lǐng)域發(fā)揮了重要作用。它能夠幫助我們更高效地利用系統(tǒng)資源,提升用戶體驗(yàn),增強(qiáng)系統(tǒng)的競(jìng)爭(zhēng)力。
展望未來(lái),隨著 Java 技術(shù)的不斷發(fā)展和應(yīng)用場(chǎng)景的日益豐富,相信 CompletableFuture 還將不斷進(jìn)化和完善,為我們帶來(lái)更多的驚喜和便利。同時(shí),也希望廣大讀者能夠在日常的開(kāi)發(fā)工作中,大膽地使用 CompletableFuture,不斷探索它的更多可能性,讓我們的程序在異步編程的加持下,運(yùn)行得更加高效、流暢 。