面試官:如何實(shí)現(xiàn)線程池任務(wù)編排?
任務(wù)編排(Task Orchestration)是指管理和控制多個(gè)任務(wù)的執(zhí)行流程,確保它們按照預(yù)定的順序正確執(zhí)行。
1.為什么需要任務(wù)編排?
在復(fù)雜的業(yè)務(wù)場景中,任務(wù)間通常存在依賴關(guān)系,也就是某個(gè)任務(wù)會(huì)依賴另一個(gè)任務(wù)的執(zhí)行結(jié)果,在這種情況下,我們需要通過任務(wù)編排,來確保任務(wù)按照正確的順序進(jìn)行執(zhí)行。
例如,以下任務(wù)的執(zhí)行順序:
其中,任務(wù)二要等任務(wù)一執(zhí)行完才能執(zhí)行,而任務(wù)四要等任務(wù)二和任務(wù)三全部執(zhí)行完才能執(zhí)行。
2.任務(wù)編排實(shí)現(xiàn)
任務(wù)編排和控制的主要手段有以下:
- Future
- CompletableFuture
- CountDownLatch
- Semaphore
- CyclicBarrier
但如果是全局線程池,想要實(shí)現(xiàn)精準(zhǔn)的任務(wù)編排,只能使用 Future 或 CompletableFuture。
(1)Future 任務(wù)編排
使用 Future 實(shí)現(xiàn)上述 4 個(gè)任務(wù)的編排(任務(wù)二要等任務(wù)一執(zhí)行完才能執(zhí)行,而任務(wù)四要等任務(wù)二和任務(wù)三全部執(zhí)行完才能執(zhí)行):
import java.util.concurrent.*;
import java.util.Arrays;
public class TaskOrchestrator {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)線程池來執(zhí)行任務(wù)
ExecutorService executor = Executors.newFixedThreadPool(5);
// 定義任務(wù)一
Future<String> taskOneResult = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000); // 模擬耗時(shí)操作
return "Task One Result";
}
});
// 定義任務(wù)二,依賴任務(wù)一
Future<String> taskTwoResult = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
String result = taskOneResult.get(); // 阻塞等待任務(wù)一完成
Thread.sleep(1000); // 模擬耗時(shí)操作
return "Task Two Result, got: " + result;
}
});
// 定義任務(wù)三
Future<String> taskThreeResult = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1500); // 模擬耗時(shí)操作
return "Task Three Result";
}
});
// 定義任務(wù)四,依賴任務(wù)二和任務(wù)三
Future<String> taskFourResult = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
String taskTwoOutput = taskTwoResult.get(); // 阻塞等待任務(wù)二完成
String taskThreeOutput = taskThreeResult.get(); // 阻塞等待任務(wù)三完成
Thread.sleep(500); // 模擬耗時(shí)操作
return "Task Four Result, got: " + taskTwoOutput + " and " + taskThreeOutput;
}
});
// 打印最終結(jié)果
try {
System.out.println("Final Result: " + taskFourResult.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
(2)CompletableFuture 任務(wù)編排
CompletableFutrue 提供的方法有很多,但最常用和最實(shí)用的核心方法只有以下幾個(gè):
接下來,使用 CompletableFuture 實(shí)現(xiàn)上述 4 個(gè)任務(wù)的編排(任務(wù)二要等任務(wù)一執(zhí)行完才能執(zhí)行,而任務(wù)四要等任務(wù)二和任務(wù)三全部執(zhí)行完才能執(zhí)行):
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) {
// 任務(wù)一:返回 "Task 1 result"
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
// 模擬耗時(shí)操作
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Task 1 result";
});
// 任務(wù)二:依賴任務(wù)一,返回 "Task 2 result" + 任務(wù)一的結(jié)果
CompletableFuture<String> task2 = task1.handle((result1, throwable) -> {
try {
// 模擬耗時(shí)操作
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Task 2 result " + result1;
});
// 任務(wù)三:和任務(wù)一、任務(wù)二并行執(zhí)行,返回 "Task 3 result"
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
try {
// 模擬耗時(shí)操作
Thread.sleep(800); // 任務(wù)三可能比任務(wù)二先完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Task 3 result";
});
// 任務(wù)四:依賴任務(wù)二和任務(wù)三,等待它們都完成后執(zhí)行,返回 "Task 4 result" + 任務(wù)二和任務(wù)三的結(jié)果
CompletableFuture<String> task4 = CompletableFuture.allOf(task2, task3).handle((res, throwable) -> {
try {
// 這里不需要顯式等待,因?yàn)?allOf 已經(jīng)保證了它們完成
return "Task 4 result with " + task2.get() + " and " + task3.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
// 獲取任務(wù)四的結(jié)果并打印
String finalResult = task4.join();
System.out.println(finalResult);
}
}