美團(tuán)面試:如何實(shí)現(xiàn)線(xiàn)程任務(wù)編排?
線(xiàn)程任務(wù)編排指的是對(duì)多個(gè)線(xiàn)程任務(wù)按照一定的邏輯順序或條件進(jìn)行組織和安排,以實(shí)現(xiàn)協(xié)同工作、順序執(zhí)行或并行執(zhí)行的一種機(jī)制。
1.線(xiàn)程任務(wù)編排 VS 線(xiàn)程通訊
有同學(xué)可能會(huì)想:那線(xiàn)程的任務(wù)編排是不是問(wèn)的就是線(xiàn)程間通訊啊?
線(xiàn)程間通訊我知道了,它的實(shí)現(xiàn)方式總共有以下幾種方式:
- Object 類(lèi)下的 wait()、notify() 和 notifyAll() 方法;
- Condition 類(lèi)下的 await()、signal() 和 signalAll() 方法;
- LockSupport 類(lèi)下的 park() 和 unpark() 方法。
但是,線(xiàn)程通訊和線(xiàn)程的任務(wù)編排是不同的兩個(gè)概念,它們的區(qū)別如下:
- 線(xiàn)程任務(wù)編排主要關(guān)注的是如何組織和管理線(xiàn)程執(zhí)行的任務(wù)序列,確保任務(wù)按照預(yù)定的邏輯和順序執(zhí)行,包括任務(wù)的啟動(dòng)、停止、依賴(lài)管理、執(zhí)行策略(如并行、串行)以及錯(cuò)誤處理等。它是關(guān)于如何有效地規(guī)劃線(xiàn)程的工作流程,以達(dá)成高效和正確的程序執(zhí)行目標(biāo)。
- 線(xiàn)程通訊則是指在多線(xiàn)程環(huán)境中,線(xiàn)程之間傳遞信息和協(xié)調(diào)工作的機(jī)制。當(dāng)多個(gè)線(xiàn)程需要共享數(shù)據(jù)或協(xié)同完成某項(xiàng)任務(wù)時(shí),它們需要通過(guò)某種方式進(jìn)行溝通,以確保數(shù)據(jù)的正確性和任務(wù)的同步執(zhí)行。它的重點(diǎn)在于解決線(xiàn)程間的同步問(wèn)題和數(shù)據(jù)一致性問(wèn)題。
簡(jiǎn)而言之,線(xiàn)程任務(wù)編排側(cè)重于高層次的執(zhí)行計(jì)劃和流程控制,而線(xiàn)程通訊則專(zhuān)注于底層的數(shù)據(jù)交互和同步細(xì)節(jié)。在實(shí)際應(yīng)用中,有效的線(xiàn)程任務(wù)編排往往離不開(kāi)合理的線(xiàn)程通訊機(jī)制,兩者相輔相成,共同支撐起復(fù)雜多線(xiàn)程程序的正確執(zhí)行。
2.線(xiàn)程任務(wù)編排
線(xiàn)程的任務(wù)編排的實(shí)現(xiàn)方式主要有以下兩種:
- FutureTask:誕生于 JDK 1.5,它實(shí)現(xiàn)了 Future 接口和 Runnable 接口,設(shè)計(jì)初衷是為了支持可取消的異步計(jì)算。它既可以承載 Runnable 任務(wù)(通過(guò)包裝成 RunnableAdapter),也可以承載 Callable 任務(wù),從而能夠返回計(jì)算結(jié)果,使用它可以實(shí)現(xiàn)簡(jiǎn)單的異步任務(wù)執(zhí)行和結(jié)果的等待。
- CompletableFuture:誕生于 JDK 8,它不僅實(shí)現(xiàn)了 Future 接口,還實(shí)現(xiàn)了 CompletionStage 接口。CompletionStage 是對(duì) Future 的擴(kuò)展,提供了豐富的鏈?zhǔn)疆惒骄幊棠P?,支持函?shù)式編程風(fēng)格,可以更加靈活地處理異步操作的組合和依賴(lài)回調(diào)等。
(1)FutureTask 使用
FutureTask 使用示例如下:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FutureTaskDemo {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)Callable任務(wù)
Callable<Integer> task = () -> {
Thread.sleep(2000); // 模擬任務(wù)耗時(shí)操作
return 10; // 返回任務(wù)結(jié)果
};
// 創(chuàng)建FutureTask,并將Callable任務(wù)包裝起來(lái)
FutureTask<Integer> futureTask = new FutureTask<>(task);
// 創(chuàng)建線(xiàn)程池
ExecutorService executor = Executors.newCachedThreadPool();
// 提交FutureTask給線(xiàn)程池執(zhí)行
executor.submit(futureTask);
try {
// 獲取任務(wù)結(jié)果,get()方法會(huì)阻塞直到任務(wù)完成并返回結(jié)果
int result = futureTask.get();
System.out.println("任務(wù)結(jié)果:" + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在上述示例中,通過(guò)創(chuàng)建一個(gè) Callable 任務(wù)來(lái)模擬耗時(shí)操作,并使用 FutureTask 包裝該任務(wù)。然后將 FutureTask 提交給線(xiàn)程池執(zhí)行,最后通過(guò) get() 方法獲取任務(wù)的執(zhí)行結(jié)果,之后才會(huì)執(zhí)行后續(xù)流程。我們可以通過(guò) get() 方法阻塞等待程序執(zhí)行結(jié)果,從而完成線(xiàn)程任務(wù)的簡(jiǎn)單編排。
(2)CompletableFuture 使用
從上面 FutureTask 實(shí)現(xiàn)代碼可以看出,它不但寫(xiě)法麻煩,而且需要使用 get() 方法阻塞等待線(xiàn)程的執(zhí)行結(jié)果,對(duì)于異步任務(wù)的執(zhí)行來(lái)說(shuō),不夠靈活且效率也會(huì)受影響,然而 CompletableFutrue 的出現(xiàn),則彌補(bǔ)了 FutureTask 的這些缺陷。
CompletableFutrue 提供的方法有很多,但最常用和最實(shí)用的核心方法只有以下幾個(gè):
例如,我們現(xiàn)在實(shí)現(xiàn)一個(gè)這樣的場(chǎng)景:
任務(wù)描述:任務(wù)一執(zhí)行完之后執(zhí)行任務(wù)二,任務(wù)三和任務(wù)一和任務(wù)二一起執(zhí)行,所有任務(wù)都有返回值,等任務(wù)二和任務(wù)三執(zhí)行完成之后,再執(zhí)行任務(wù)四,它的實(shí)現(xiàn)代碼如下:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) {
// 任務(wù)一:返回 "Task 1 result"
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
// 模擬耗時(shí)操作
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Task 1 result";
});
// 任務(wù)二:依賴(lài)任務(wù)一,返回 "Task 2 result" + 任務(wù)一的結(jié)果
CompletableFuture<String> task2 = task1.handle((result1, throwable) -> {
try {
// 模擬耗時(shí)操作
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Task 2 result " + result1;
});
// 任務(wù)三:和任務(wù)一、任務(wù)二并行執(zhí)行,返回 "Task 3 result"
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
try {
// 模擬耗時(shí)操作
Thread.sleep(800); // 任務(wù)三可能比任務(wù)二先完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Task 3 result";
});
// 任務(wù)四:依賴(lài)任務(wù)二和任務(wù)三,等待它們都完成后執(zhí)行,返回 "Task 4 result" + 任務(wù)二和任務(wù)三的結(jié)果
CompletableFuture<String> task4 = CompletableFuture.allOf(task2, task3).handle((res, throwable) -> {
try {
// 這里不需要顯式等待,因?yàn)?allOf 已經(jīng)保證了它們完成
return "Task 4 result with " + task2.get() + " and " + task3.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
// 獲取任務(wù)四的結(jié)果并打印
String finalResult = task4.join();
System.out.println(finalResult);
}
}
課后思考
使用 CompletableFuture 需要配合線(xiàn)程池一起使用嗎?為什么?CompletableFuture 默認(rèn)的線(xiàn)程池是如何實(shí)現(xiàn)的?