萬字圖解線程池ThreadPoolExecutor、ForkJoinPool、定時調(diào)度 STPE 使用場景和原理
J.U.C 提供的線程池:ThreadPoolExecutor 類,幫助開發(fā)人員管理線程并方便地執(zhí)行并行任務(wù)。
了解并合理使用線程池,是一個開發(fā)人員必修的基本功。
本文會圍繞以下幾點展開:
- 什么是線程池?
- 如何使用線程池?
- 線程池的使用場景和實現(xiàn)原理:
ThreadPoolExecutor
ScheduledThreadPoolExecutor
ForkJoinPool
線程池是什么?
線程池(Thread Pool)是一種基于池化思想管理線程的工具,經(jīng)常出現(xiàn)在多線程服務(wù)器中,如 MySQL。
線程過多會帶來額外的開銷,其中包括創(chuàng)建銷毀線程的開銷、調(diào)度線程的開銷等等,同時也降低了計算機的整體性能。
線程池維護多個線程,等待監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。
這種做法,一方面避免了處理任務(wù)時創(chuàng)建銷毀線程開銷的代價,另一方面避免了線程數(shù)量膨脹導(dǎo)致的過分調(diào)度問題,保證了對內(nèi)核的充分利用。
Chaya:“線程池有什么好處呢?”
通過使用線程池,可以帶來了許多好處:
- 資源管理: 線程池能夠有效地管理系統(tǒng)資源,通過限制并發(fā)任務(wù)的數(shù)量和重用線程,減少了線程創(chuàng)建和銷毀的開銷,提高了系統(tǒng)資源利用率。
- 性能提升: 通過合理地配置線程池大小和任務(wù)隊列,可以優(yōu)化任務(wù)執(zhí)行流程,降低了線程的上下文切換成本,提高了任務(wù)的執(zhí)行效率和系統(tǒng)的吞吐量。
- 避免資源耗盡: 線程池可以控制并發(fā)任務(wù)的數(shù)量,防止系統(tǒng)因創(chuàng)建過多線程而導(dǎo)致資源耗盡,從而提高了系統(tǒng)的穩(wěn)定性和可靠性。
- 任務(wù)排隊: 線程池通過任務(wù)隊列來暫存尚未執(zhí)行的任務(wù),保證了任務(wù)的順序執(zhí)行,并且能夠靈活地處理突發(fā)任務(wù)量,避免了系統(tǒng)的過載。
- 簡化并發(fā)編程: 使用線程池可以簡化并發(fā)編程的復(fù)雜性,開發(fā)人員無需手動管理線程的生命周期和任務(wù)的調(diào)度,只需將任務(wù)提交給線程池即可,從而降低了編程的復(fù)雜度和出錯的可能性。
使用場景
不聊原理,先說下如何使用。
Web 應(yīng)用的并發(fā)請求處理
Web 應(yīng)用通常需要同時處理多個用戶的請求。為了不每個請求都創(chuàng)建一個新線程,可以使用線程池來復(fù)用一定數(shù)量的線程:
public class WebServer {
// 創(chuàng)建固定大小的線程池??以處理用戶請求
privatestaticfinal ExecutorService executor = Executors.newFixedThreadPool(100);
public static void handleRequest(HttpRequest request) {
CompletableFuture.runAsync(() -> {
// 處理請求
processRequest(request);
}, executor);
}
private static void processRequest(HttpRequest request) {
// 處理請求的實現(xiàn)
}
}
后臺任務(wù)和定時任務(wù)
應(yīng)用程序可能需要定期執(zhí)行一些后臺任務(wù),如數(shù)據(jù)庫的清理工作。
可以使用ScheduledThreadPoolExecutor來安排這些任務(wù):
public class BackgroundJobScheduler {
privatestaticfinal ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
public static void startCleanupJob() {
// 這里執(zhí)行清理任務(wù)
scheduler.scheduleAtFixedRate(BackgroundJobScheduler::performCleanup
, 0, 1, TimeUnit.HOURS);
}
private static void performCleanup() {
// 清理工作的實現(xiàn)
}
}
異步操作
用戶下單后可能需要進行一系列后臺操作,比如發(fā)送確認(rèn)郵件、通知倉庫出貨等。
public class ECommerceApplication {
privatestaticfinal ExecutorService pool = Executors.newCachedThreadPool();
public static void completeOrder(Order order) {
// 異步發(fā)送確認(rèn)郵件
pool.execute(() -> sendConfirmationEmail(order));
// 異步通知倉庫
pool.execute(() -> notifyWarehouse(order));
}
private static void sendConfirmationEmail(Order order) {
// 郵件發(fā)送邏輯
}
private static void notifyWarehouse(Order order) {
// 倉庫通知邏輯
}
}
Executor 線程池核心設(shè)計
線程池的頂層接口是 Executor,它提供了一種思想,將任務(wù)提交和任務(wù)執(zhí)行進行解耦。
用戶無需關(guān)注如何創(chuàng)建線程,如何調(diào)度線程來執(zhí)行任務(wù),用戶只需提供 Runnable 對象,將任務(wù)的運行邏輯提交到執(zhí)行器(Executor)中,由 Executor 框架完成線程的調(diào)配和任務(wù)的執(zhí)行部分。
public interface Executor {
void execute(Runnable command);
}
我們首先來看一下 ThreadPoolExecutor、ScheduledThreadPoolExecutor、FutureTask、ForkJoinPool的 UML 類圖,全局上了解下線程池的繼承關(guān)系。
圖片
ExecutorService接口增加了一些能力:
圖片
- 擴充執(zhí)行任務(wù)的能力:可以為一個或一批異步任務(wù)生成 Future 的方法;
- 提供了管控線程池的方法,比如停止線程池的運行。
AbstractExecutorService則是上層的抽象類,實現(xiàn)了 ExecutorService,模板方法模式的運用,將執(zhí)行任務(wù)的流程串聯(lián)了起來,由子類繼承,將變化點交給子類實現(xiàn),保證下層的實現(xiàn)只需關(guān)注一個執(zhí)行任務(wù)的方法即可。
圖片
ScheduledThreadPoolExecutor 線程池的特性是定時調(diào)度,專門設(shè)計了一個接口 ScheduledExecutorService 并繼承接口 ExecutorService,這就是單一職責(zé)的體現(xiàn)了,家人們!
圖片
該接口主要定義了定時調(diào)度的方法。
最下層就分別有 ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool。
ThreadPoolExecutor 原理
Chaya:ThreadPoolExecutor 運行機制是什么?如何同時維護線程狀態(tài)和執(zhí)行任務(wù)的呢?
核心組件關(guān)系如下圖所示:
圖片
- Worker:對線程的抽象。
- RejectedExecutionHandler:當(dāng)線程池的任務(wù)緩存隊列已滿,并且線程池中的線程數(shù)目達到 maximumPoolSize 時,就需要拒絕掉該任務(wù),采取任務(wù)拒絕策略,保護線程池。
- HashSet workers:持有線程的引用,這樣可以通過添加引用、移除引用這樣的操作來控制線程的生命周期。
- WorkQueue:阻塞隊列。
任務(wù)運行機制
Chaya:“從全局視角說下線程池的運行機制把?!?/p>
線程池的工作機制可以看作是一種生產(chǎn)者-消費者模型的應(yīng)用,將線程和任務(wù)兩者解耦,并不直接關(guān)聯(lián),從而良好的緩沖任務(wù),復(fù)用線程。
在這個模型中,任務(wù)(生產(chǎn)者)被提交到線程池,然后線程池中的線程(消費者)從任務(wù)隊列中取出任務(wù)并執(zhí)行,當(dāng)線程執(zhí)行完任務(wù)后則會繼續(xù)獲取新的任務(wù)去執(zhí)行,最終當(dāng)線程獲取不到任務(wù)的時候,線程就會被回收。
其運行機制如下圖所示:
圖片
- 開發(fā)人員使用 ThreadPoolExecutor 的 submit() 方法提交任務(wù)。
- 檢測線程池運行狀態(tài),如果不是 RUNNING,則直接拒絕,線程池要保證在 RUNNING 的狀態(tài)下執(zhí)行任務(wù)
- 提交的任務(wù)(通常實現(xiàn)了 Callable 或 Runnable 接口)會被封裝成一個 FutureTask 對象,該對象實現(xiàn)了 Future 接口,允許獲取任務(wù)執(zhí)行的結(jié)果。
- 如果線程池中的核心線程數(shù)小于核心線程池大小(corePoolSize),則嘗試創(chuàng)建新的核心線程來執(zhí)行任務(wù)。
- 如果當(dāng)前核心線程數(shù)已經(jīng)達到 corePoolSize,則將任務(wù)放入任務(wù)隊列中,等待工作線程獲取任務(wù)執(zhí)行。
- 如果隊列已滿,而且當(dāng)前線程池中的線程數(shù)量小于最大線程池大?。╩aximumPoolSize),則嘗試創(chuàng)建新的非核心線程來執(zhí)行任務(wù)。
- 如果當(dāng)前線程池中的線程數(shù)量已經(jīng)達到最大線程池大小,則根據(jù)拒絕策略進行處理。
- 任務(wù)執(zhí)行完成后,線程池將返回一個 Future 對象,通過這個對象可以獲取任務(wù)執(zhí)行的結(jié)果。
流程圖如下:
圖片
為了讓你更容易理解,再畫一個時序圖。
圖片
- Executor:線程池任務(wù)調(diào)度入口;
- Queue:阻塞隊列
- Worker:實現(xiàn) Runnable 并繼承 AbstractQueuedSynchronizer,線程池中的任務(wù)線程抽象。
- RejectedHandler:拒絕策略。
接下來我們分別分析線程池核心組件的作用和實現(xiàn)原理。
狀態(tài)控制(ctl 變量)
如何維護線程池運行狀態(tài)和工作線程呢?
使用 32 位整型 AtomicInteger 同時維護線程池的運行狀態(tài)和工作線程:
- 高 3 位:線程池狀態(tài)(RUNNING, SHUTDOWN, STOP 等)
- 低 29 位:工作線程數(shù)量
Java 中的線程池具有不同的狀態(tài),這些狀態(tài)反映了線程池在其生命周期中的不同階段和行為。主要的線程池狀態(tài)有以下幾種:
狀態(tài) | 描述 |
(運行中) | 表示線程池正在正常運行,并且可以接受新的任務(wù)提交。在這種狀態(tài)下,線程池可以執(zhí)行任務(wù),并且可以創(chuàng)建新的線程來處理任務(wù)。 |
(關(guān)閉中) | 表示線程池正在關(guān)閉中。在這種狀態(tài)下,線程池不再接受新的任務(wù)提交,但會繼續(xù)執(zhí)行已提交的任務(wù),直到所有任務(wù)執(zhí)行完成。 |
(停止) | 表示線程池已經(jīng)停止,不再接受新的任務(wù)提交,并且嘗試中斷正在執(zhí)行的任務(wù)。 |
(終止) | 表示線程池已經(jīng)終止,不再接受新的任務(wù)提交,并且所有任務(wù)已經(jīng)執(zhí)行完成。在這種狀態(tài)下,線程池中的所有線程都已經(jīng)被銷毀。 |
通過 ctl 字段,ThreadPoolExecutor 類能夠高效地維護線程池的狀態(tài)和線程數(shù)量信息,從而實現(xiàn)了對線程池的有效管理和控制。
Chaya:“愛一個人會變,線程池狀態(tài)又如何變化呢?”
線程池的狀態(tài)不是直接設(shè)置的,而是通過調(diào)用 shutdown()、shutdownNow() 等方法觸發(fā)狀態(tài)的轉(zhuǎn)換。
例如,調(diào)用 shutdown() 方法會將線程池的狀態(tài)從 RUNNING 轉(zhuǎn)換為 SHUTDOWN。
其生命周期轉(zhuǎn)換如下入所示:
圖片
阻塞隊列——任務(wù)緩沖
線程池中是通過阻塞隊列實現(xiàn)生產(chǎn)者-消費者模式。阻塞隊列緩存任務(wù),工作線程從阻塞隊列中獲取任務(wù)。
圖片
使用不同的隊列可以實現(xiàn)不一樣的任務(wù)存取策略。Java 并發(fā)編程中不同阻塞隊列的特點和實現(xiàn)原理詳見之前的文章《1.8w 字圖解 Java 并發(fā)容器: CHM、ConcurrentLinkedQueue、7 種阻塞隊列的使用場景和原理》。
圖片
拒絕策略
Chaya:“李老師,如果無止盡的海量任務(wù)丟給線程池,線程池處理不過來了?”
這時候就要設(shè)計一個拒絕策略了,線程池有一個最大的容量,當(dāng)線程池的任務(wù)緩存隊列已滿,并且線程池中的線程數(shù)目達到 maximumPoolSize 時,就需要拒絕掉該任務(wù),采取任務(wù)拒絕策略,保護線程池。
ThreadPoolExecutor 內(nèi)部有實現(xiàn) 4 個拒絕策略:
- CallerRunsPolicy,由調(diào)用 execute 方法提交任務(wù)的線程來執(zhí)行這個任務(wù)。
- AbortPolicy,拋出異常 RejectedExecutionException 拒絕提交任務(wù)。
- DiscardPolicy,直接拋棄任務(wù),不做任何處理。
- DiscardOldestPolicy,去除任務(wù)隊列中的第一個任務(wù)(最舊的),重新提。
除了上述標(biāo)準(zhǔn)拒絕策略之外,您還可以實現(xiàn) RejectedExecutionHandler 接口來定義自定義的拒絕策略。
這樣你就可以根據(jù)應(yīng)用程序的需求實現(xiàn)更復(fù)雜的拒絕邏輯。關(guān)注公眾號「碼哥跳動」,更多硬核文章等你來探索。
RejectedExecutionHandler 接口:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Worker 線程管理
Chaya:“線程池如何管理線程的狀態(tài)和生命周期呢?”
設(shè)計了一個工作線程 Worker 來管理。
private finalclass Worker
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // 禁止中斷直到runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// 省略AQS方法實現(xiàn)...
}
Worker 這個工作線程,實現(xiàn)了 Runnable 接口,并持有一個線程 thread、一個初始化的任務(wù) firstTask。
- thread 是在調(diào)用構(gòu)造方法時通過 ThreadFactory 來創(chuàng)建的線程,可以用來執(zhí)行任務(wù);
- firstTask 用它來保存?zhèn)魅氲牡谝粋€任務(wù),這個任務(wù)可以有也可以為 null。
如果這個值是非空的,那么線程就會在啟動初期立即執(zhí)行這個任務(wù),也就對應(yīng)核心線程創(chuàng)建時的情況;
如果這個值是 null,那么就需要創(chuàng)建一個線程去執(zhí)行任務(wù)列表(workQueue)中的任務(wù),也就是非核心線程的創(chuàng)建。
Worker 執(zhí)行任務(wù)的模型如下圖所示:
圖片
線程池如何管理線程生命周期?
線程池使用一張 HashSet 表去持有線程的引用,這樣可以通過添加引用、移除引用這樣的操作來控制線程的生命周期。
Worker 線程復(fù)用
通過繼承 AQS,使用 AQS 來實現(xiàn)獨占鎖 Worker 線程的復(fù)用功能。
- lock 方法一旦獲取了獨占鎖,表示當(dāng)前線程正在執(zhí)行任務(wù)中。
- 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程。
- 如果該線程現(xiàn)在不是獨占鎖的狀態(tài),也就是空閑的狀態(tài),說明它沒有在處理任務(wù),這時可以對該線程進行中斷回收。
- 線程池在執(zhí)行 shutdown 方法或 tryTerminate 方法時會調(diào)用 interruptIdleWorkers 方法來中斷空閑的線程,interruptIdleWorkers 方法會使用 tryLock 方法來判斷線程池中的線程是否是空閑狀態(tài);如果線程是空閑狀態(tài)則可以安全回收。
線程池線程回收復(fù)用過程如圖所示:
Worker 線程增加
增加線程是通過線程池中的 addWorker 方法,addWorker 方法有兩個參數(shù):firstTask、core。
- firstTask 參數(shù)用于指定新增的線程執(zhí)行的第一個任務(wù),該參數(shù)可以為空;
- core 參數(shù)為 true 表示在新增線程時會判斷當(dāng)前活動線程數(shù)是否少于 corePoolSize,false 表示新增線程前需要判斷當(dāng)前活動線程數(shù)是否少于 maximumPoolSize。
執(zhí)行流程如下圖所示:
圖片
Worker 線程垃圾回收
線程池中線程的銷毀依賴 JVM 自動的回收,線程池做的工作是根據(jù)當(dāng)前線程池的狀態(tài)維護一定數(shù)量的線程引用,防止這部分線程被 JVM 回收,當(dāng)線程池決定哪些線程需要回收時,只需要將其引用消除即可。
Worker 被創(chuàng)建出來后,就會不斷地進行輪詢,然后獲取任務(wù)去執(zhí)行,核心線程可以無限等待獲取任務(wù),非核心線程要限時獲取任務(wù)。
當(dāng) Worker 無法獲取到任務(wù),也就是獲取的任務(wù)為空時,循環(huán)會結(jié)束,Worker 會主動消除自身在線程池內(nèi)的引用。
圖10 線程銷毀流程
線程回收的工作是在 processWorkerExit 方法完成的。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 異常終止才需要補償
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); // 從集合中移除
} finally {
mainLock.unlock();
}
tryTerminate(); // 嘗試終止線程池
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 計算最小保留線程數(shù)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false); // 補充新Worker
}
}
回收場景對比表:
場景類型 | 觸發(fā)條件 | 處理方式 |
正常退出 | getTask()返回 null | 檢查是否需要補充新 Worker |
異常退出 | 任務(wù)執(zhí)行拋出未捕獲異常 | 立即補充新 Worker |
配置變更 | 核心線程數(shù)調(diào)整 | 動態(tài)調(diào)整存活 Worker 數(shù)量 |
線程池關(guān)閉 | shutdown/shutdownNow 調(diào)用 | 中斷所有 Worker 并清空隊列 |
Worker 線程執(zhí)行任務(wù)
Worker 類中的 run 方法調(diào)用了 runWorker 方法來執(zhí)行任務(wù),runWorker 方法的執(zhí)行過程如下:
- while 循環(huán)不斷地通過 getTask()方法獲取任務(wù)。
- getTask()方法從阻塞隊列中取任務(wù)。
- 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),否則要保證當(dāng)前線程不是中斷狀態(tài)。
- 執(zhí)行任務(wù)。
- 如果 getTask 結(jié)果為 null 則跳出循環(huán),執(zhí)行 processWorkerExit()方法,銷毀線程。
任務(wù)執(zhí)行主流程如下:
圖片
源碼分析:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock(); // 獲取Worker鎖
// 檢查線程池狀態(tài)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 實際執(zhí)行任務(wù)
} catch (Throwable x) {
thrown = x;
throw x;
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
設(shè)計亮點分析
- 無鎖化設(shè)計:通過 CAS 操作修改 ctl 狀態(tài),避免全局鎖競爭
- 線程復(fù)用:Worker 循環(huán)從隊列獲取任務(wù),減少線程創(chuàng)建開銷
- 彈性擴容:corePoolSize 維持常駐線程,maximumPoolSize 應(yīng)對突發(fā)流量
- 優(yōu)雅降級:隊列緩沖+拒絕策略防止資源耗盡
ScheduledThreadPoolExecutor
有了前文的線程池實現(xiàn)原理做鋪墊,掌握 ScheduledThreadPoolExecutor 就輕松多了。
Chaya:“ScheduledThreadPoolExecutor 是什么?”
ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor,為任務(wù)提供延遲或周期執(zhí)行,屬于線程池的一種。和 ThreadPoolExecutor 相比,它還具有以下幾種特性:
- 使用專門的任務(wù)類型—ScheduledFutureTask 來執(zhí)行周期任務(wù),也可以接收不需要時間調(diào)度的任務(wù)(這些任務(wù)通過 ExecutorService 來執(zhí)行)。
- 使用專門的存儲隊列—DelayedWorkQueue 來存儲任務(wù),DelayedWorkQueue 是無界延遲隊列 DelayQueue 的一種。相比 ThreadPoolExecutor 也簡化了執(zhí)行機制(delayedExecute 方法,后面單獨分析)。
- 支持可選的 run-after-shutdown參數(shù),在池被關(guān)閉(shutdown)之后支持可選的邏輯來決定是否繼續(xù)運行周期或延遲任務(wù)。并且當(dāng)任務(wù)(重新)提交操作與 shutdown 操作重疊時,復(fù)查邏輯也不相同。
使用場景
光說不練假把式,想要學(xué)習(xí)一個框架的原理,第一步先要理解使用場景,并把它跑起來。
1. 定時任務(wù)調(diào)度
image-20250426171745139
代碼案例如下:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
// 系統(tǒng)啟動后5秒執(zhí)行初始化
scheduler.schedule(() -> initConfig(), 5, TimeUnit.SECONDS);
// 每天9:30執(zhí)行數(shù)據(jù)歸檔(需計算初始延遲)
long initialDelay = calculateDelay(9, 30);
scheduler.scheduleAtFixedRate(() -> archiveData(),
initialDelay, 24 * 60 * 60, TimeUnit.SECONDS);
2. 心跳監(jiān)測機制
圖片
scheduler.scheduleWithFixedDelay(() -> {
try {
HeartbeatResponse res = httpClient.checkHealth();
if (res.isHealthy()) {
resetFailureCount();
}
} catch (TimeoutException e) {
if (incrementAndGetFailureCount() > 3) {
alertSystem.sendCriticalAlert();
}
}
}, 0, 30, TimeUnit.SECONDS); // 立即開始,間隔30秒
最佳防御式編程示例
scheduler.scheduleAtFixedRate(() -> {
try {
// 業(yè)務(wù)代碼
processBusinessLogic();
// 添加健康檢查點
if (!systemStatus.isHealthy()) {
thrownew ServiceUnavailableException();
}
} catch (BusinessException e) {
// 業(yè)務(wù)可恢復(fù)異常
logger.warn("業(yè)務(wù)處理警告", e);
} catch (Throwable t) {
// 不可恢復(fù)異常處理
logger.error("致命錯誤觸發(fā)任務(wù)終止", t);
emergencyRepair();
}
}, 0, 1, TimeUnit.MINUTES);
優(yōu)雅關(guān)閉
public void gracefulShutdown(ScheduledThreadPoolExecutor executor) {
executor.shutdown(); // 禁止新任務(wù)提交
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
List<Runnable> dropped = executor.shutdownNow();
logger.warn("強制關(guān)閉,丟棄{}個任務(wù)", dropped.size());
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
logger.error("線程池未完全關(guān)閉");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
executor.shutdownNow();
}
}
實現(xiàn)原理
看下類圖,可以發(fā)現(xiàn) ScheduledThreadPoolExecutor 繼承 ThreadPoolExecutor,并實現(xiàn)了 ScheduledExecutorService接口。
圖片
ScheduledThreadPoolExecutor 內(nèi)部構(gòu)造了兩個內(nèi)部類 ScheduledFutureTask 和 DelayedWorkQueue:
- ScheduledFutureTask: 繼承了 FutureTask,說明是一個異步運算任務(wù);最上層分別實現(xiàn)了 Runnable、Future、Delayed 接口,說明它是一個可以延遲執(zhí)行的異步運算任務(wù)。
- DelayedWorkQueue: 這是 ScheduledThreadPoolExecutor 為存儲周期或延遲任務(wù)專門定義的一個延遲隊列,繼承了 AbstractQueue,為了契合 ThreadPoolExecutor 也實現(xiàn)了 BlockingQueue 接口。它內(nèi)部只允許存儲 RunnableScheduledFuture 類型的任務(wù)。與 DelayQueue 的不同之處就是它只允許存放 RunnableScheduledFuture 對象,并且自己實現(xiàn)了二叉堆(DelayQueue 是利用了 PriorityQueue 的二叉堆結(jié)構(gòu))。
線程池 ThreadPoolExecutor 在之前介紹過了,相信大家都還有印象,接下來我們來看看 ScheduledExecutorService 接口。
public interface ScheduledExecutorService extends ExecutorService {
/**
* 安排一個Runnable任務(wù)在給定的延遲后執(zhí)行。
*
* @param command 需要執(zhí)行的任務(wù)
* @param delay 延遲時間
* @param unit 時間單位
* @return 可用于提取結(jié)果或取消的ScheduledFuture
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
* 安排一個Callable任務(wù)在給定的延遲后執(zhí)行。
*
* @param callable 需要執(zhí)行的任務(wù)
* @param delay 延遲時間
* @param unit 時間單位
* @return 可用于提取結(jié)果或取消的ScheduledFuture
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
/**
* 安排一個Runnable任務(wù)在給定的初始延遲后首次執(zhí)行,隨后每個period時間間隔執(zhí)行一次。
*
* @param command 需要執(zhí)行的任務(wù)
* @param initialDelay 首次執(zhí)行的初始延遲
* @param period 連續(xù)執(zhí)行之間的時間間隔
* @param unit 時間單位
* @return 可用于提取結(jié)果或取消的ScheduledFuture
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* 安排一個Runnable任務(wù)在給定的初始延遲后首次執(zhí)行,隨后每次完成任務(wù)后等待指定的延遲再次執(zhí)行。
*
* @param command 需要執(zhí)行的任務(wù)
* @param initialDelay 首次執(zhí)行的初始延遲
* @param delay 每次執(zhí)行結(jié)束后的延遲時間
* @param unit 時間單位
* @return 可用于提取結(jié)果或取消的ScheduledFuture
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
ScheduledExecutorService 接口繼承了 ExecutorService 接口,并增加了幾個定時相關(guān)的接口方法。前兩個方法用于單次調(diào)度執(zhí)行任務(wù),區(qū)別是有沒有返回值。關(guān)注公眾號「碼哥跳動」,更多硬核文章等你來探索。
核心結(jié)構(gòu)
圖片
關(guān)鍵字段說明:
- sequenceNumber:原子遞增序列,解決相同觸發(fā)時間的任務(wù)排序問題。
- time:基于System.nanoTime()的絕對時間戳。
- period:
<0:固定延遲(scheduleWithFixedDelay)。
0:固定速率(scheduleAtFixedRate)。
Chaya:如何保存時間最小的任務(wù)調(diào)度執(zhí)行呢?
ScheduledThreadPoolExecutor使用了DelayedWorkQueue 來保存等待的任務(wù)。
該等待隊列的隊首應(yīng)該保存的是最近將要執(zhí)行的任務(wù),所以worker只關(guān)心隊首任務(wù),如果隊首任務(wù)的開始執(zhí)行時間還未到,worker 也應(yīng)該繼續(xù)等待。
DelayedWorkQueue 是一個無界優(yōu)先隊列,使用數(shù)組存儲,底層使用堆結(jié)構(gòu)來實現(xiàn)優(yōu)先隊列的功能。
圖片
可以轉(zhuǎn)換成如下的數(shù)組:
圖片
在這種結(jié)構(gòu)中,可以發(fā)現(xiàn)有如下特性。假設(shè),索引值從 0 開始,子節(jié)點的索引值為 k,父節(jié)點的索引值為 p,則:
- 一個節(jié)點的左子節(jié)點的索引為:k = p * 2 + 1;
- 一個節(jié)點的右子節(jié)點的索引為:k = (p + 1) * 2;
- 一個節(jié)點的父節(jié)點的索引為:p = (k - 1) / 2。
圖片
任務(wù)調(diào)度執(zhí)行全流程。
圖片
我們先來看下 ScheduledThreadPoolExecutor 的構(gòu)造方法,其實在 executors 框架概述中講 Executors 時已經(jīng)接觸過了。關(guān)注公眾號「碼哥跳動」,更多硬核文章等你來探索。
Executors 使用 newScheduledThreadPool 工廠方法創(chuàng)建 ScheduledThreadPoolExecutor:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
我們來看下 ScheduledThreadPoolExecutor 的構(gòu)造器,內(nèi)部其實都是調(diào)用了父類 ThreadPoolExecutor 的構(gòu)造器,這里最需要注意的就是任務(wù)隊列的選擇——DelayedWorkQueue.
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
執(zhí)行過程
ScheduledThreadPoolExecutor 的核心調(diào)度方法是 schedule、scheduleAtFixedRate、scheduleWithFixedDelay,我們通過 schedule 方法來看下整個調(diào)度流程:
// delay時長后執(zhí)行任務(wù)command,該任務(wù)只執(zhí)行一次
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 這里的decorateTask方法僅僅返回第二個參數(shù)
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null, triggerTime(delay,unit)));
// 延時或者周期執(zhí)行任務(wù)的主要方法,稍后統(tǒng)一說明
delayedExecute(t);
return t;
}
上述的 decorateTask 方法把 Runnable 任務(wù)包裝成 ScheduledFutureTask,用戶可以根據(jù)自己的需要覆寫該方法。
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
/**
* 任務(wù)序號, 自增唯一
*/
privatefinallong sequenceNumber;
/**
* 首次執(zhí)行的時間點
*/
privatelong time;
/**
* 0: 非周期任務(wù)
* >0: fixed-rate任務(wù)
* <0: fixed-delay任務(wù)
*/
privatefinallong period;
/**
* 在堆中的索引
*/
int heapIndex;
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// ...
}
ScheduledThreadPoolExecutor 中的任務(wù)隊列——DelayedWorkQueue,保存的元素就是 ScheduledFutureTask。DelayedWorkQueue 是一種堆結(jié)構(gòu),time 最小的任務(wù)會排在堆頂(表示最早過期),每次出隊都是取堆頂元素,這樣最快到期的任務(wù)就會被先執(zhí)行。
如果兩個 ScheduledFutureTask 的 time 相同,就比較它們的序號——sequenceNumber,序號小的代表先被提交,所以就會先執(zhí)行。
schedule 的核心是其中的 delayedExecute 方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) // 線程池已關(guān)閉
reject(task); // 任務(wù)拒絕策略
else {
super.getQueue().add(task); // 將任務(wù)入隊
// 如果線程池已關(guān)閉且該任務(wù)是非周期任務(wù), 則將其從隊列移除
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false); // 取消任務(wù)
else
ensurePrestart(); // 添加一個工作線程
}
}
ScheduledThreadPoolExecutor 的整個任務(wù)調(diào)度流程大致如下圖:
圖片
我們來分析這個過程:
- 首先,任務(wù)被提交到線程池后,會判斷線程池的狀態(tài),如果不是 RUNNING 狀態(tài)會執(zhí)行拒絕策略。
- 然后,將任務(wù)添加到阻塞隊列中。(注意,由于 DelayedWorkQueue 是無界隊列,所以一定會 add 成功)
- 然后,會創(chuàng)建一個工作線程,加入到核心線程池或者非核心線程池:
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
通過 ensurePrestart 可以看到,如果核心線程池未滿,則新建的工作線程會被放到核心線程池中。如果核心線程池已經(jīng)滿了,ScheduledThreadPoolExecutor 不會像 ThreadPoolExecutor 那樣再去創(chuàng)建歸屬于非核心線程池的工作線程,而是直接返回。也就是說,在 ScheduledThreadPoolExecutor 中,一旦核心線程池滿了,就不會再去創(chuàng)建工作線程。
關(guān)注公眾號「碼哥跳動」,更多硬核文章等你來探索。
最后,線程池中的工作線程會去任務(wù)隊列獲取任務(wù)并執(zhí)行,當(dāng)任務(wù)被執(zhí)行完成后,如果該任務(wù)是周期任務(wù),則會重置 time 字段,并重新插入隊列中,等待下次執(zhí)行。這里注意從隊列中獲取元素的方法:
- 對于核心線程池中的工作線程來說,如果沒有超時設(shè)置(allowCoreThreadTimeOut == false),則會使用阻塞方法 take 獲取任務(wù)(因為沒有超時限制,所以會一直等待直到隊列中有任務(wù));如果設(shè)置了超時,則會使用 poll 方法(方法入?yún)⑿枰瑫r時間),超時還沒拿到任務(wù)的話,該工作線程就會被回收。
- 對于非工作線程來說,都是調(diào)用 poll 獲取隊列元素,超時取不到任務(wù)就會被回收。
上述就是 ScheduledThreadPoolExecutor 的核心調(diào)度流程,通過我們的分析可以看出,相比 ThreadPoolExecutor,ScheduledThreadPoolExecutor 主要有以下幾點不同:
- 總體的調(diào)度控制流程略有區(qū)別;
- 任務(wù)的執(zhí)行方式有所區(qū)別;
- 任務(wù)隊列的選擇不同。
ForkJoinPool
ForkJoinPool是自Java7開始,提供的一個用于并行執(zhí)行的任務(wù)框架。廣泛用在java8的parallelStream和CompletableFuture中。
其主旨是將大任務(wù)分成若干小任務(wù),之后再并行對這些小任務(wù)進行計算,最終匯總這些任務(wù)的結(jié)果,得到最終的結(jié)果。
這個描述實際上比較接近于單機版的map-reduce,都是采用了分治算法。區(qū)別就在于ForkJoin機制可能只能在單個jvm上運行,而map-reduce則是在集群上執(zhí)行。
此外,F(xiàn)orkJoinPool采取工作竊取算法,以避免工作線程由于拆分了任務(wù)之后的join等待過程。
這樣處于空閑的工作線程將從其他工作線程的隊列中主動去竊取任務(wù)來執(zhí)行。
這里涉及到的兩個基本知識點是分治法和工作竊取。
分治任務(wù)模型
分治任務(wù)模型可分為兩個階段:
- 一個階段是 任務(wù)分解,就是迭代地將任務(wù)分解為子任務(wù),直到子任務(wù)可以直接計算出結(jié)果;
- 另一個階段是 結(jié)果合并,即逐層合并子任務(wù)的執(zhí)行結(jié)果,直到獲得最終結(jié)果。
下圖是一個簡化的分治任務(wù)模型圖,你可以對照著理解。
圖片
在這個分治任務(wù)模型里,任務(wù)和分解后的子任務(wù)具有相似性,這種相似性往往體現(xiàn)在任務(wù)和子任務(wù)的算法是相同的,但是計算的數(shù)據(jù)規(guī)模是不同的。
分治是一種解決復(fù)雜問題的思維方法和模式;
具體而言,它將一個復(fù)雜的問題分解成多個相似的子問題,然后再將這些子問題進一步分解成更小的子問題,直到每個子問題變得足夠簡單從而可以直接求解。
例如,在算法領(lǐng)域,我們經(jīng)常使用分治算法來解決問題(如歸并排序和快速排序都屬于分治算法,二分查找也是一種分治算法)。
在大數(shù)據(jù)領(lǐng)域,MapReduce 計算框架背后的思想也是基于分治。
這只是一個簡化版本的Fork-Join,實際上我們在日常工作中的應(yīng)用可能比這要復(fù)雜很多。但是基本原理類似。
這樣就將一個大的任務(wù),通過fork方法不斷拆解,直到能夠計算為止,之后,再將這些結(jié)果用join合并。
這樣逐次遞歸,就得到了我們想要的結(jié)果。這就是再ForkJoinPool中的分治法。關(guān)注公眾號「碼哥跳動」,更多硬核文章等你來探索。
工作竊取
從上述Fork/Join 框架的描述可以看出,我們需要一些線程來執(zhí)行 Fork 出的任務(wù),在實際中,如果每次都創(chuàng)建新的線程執(zhí)行任務(wù),對系統(tǒng)資源的開銷會很大,所以 Fork/Join框架利用了線程池來調(diào)度任務(wù)。
既然由線程池調(diào)度,根據(jù)我們之前學(xué)習(xí)線程池的經(jīng)驗,必然存在兩個要素:
- 工作線程
- 任務(wù)隊列
一般的線程池只有一個任務(wù)隊列,但是對于 Fork/Join 框架來說,由于 Fork 出的各個子任務(wù)其實是平行關(guān)系,為了提高效率,減少線程競爭,應(yīng)該將這些平行的任務(wù)放到不同的隊列中去。
圖片
Chaya:有的線程執(zhí)行比較快,如何提高效率讓閑著去搶任務(wù)呢?
由于線程處理不同任務(wù)的速度不同,這樣就可能存在某個線程先執(zhí)行完了自己隊列中的任務(wù)的情況,這時為了提升效率,我們可以讓該線程去“竊取”其它任務(wù)隊列中的任務(wù),這就是所謂的工作竊取算法。
當(dāng)工作線程空閑時,它可以從其他工作線程的任務(wù)隊列中"竊取"任務(wù)。
以充分利用工作線程的計算能力,減少線程由于獲取不到任務(wù)而造成的空閑浪費。
在ForkJoinPool中,工作任務(wù)的隊列都采用雙端隊列Deque容器。
在通常使用隊列的過程中,我們都在隊尾插入,而在隊頭消費以實現(xiàn)FIFO。
而為了實現(xiàn)工作竊取。一般我們會改成工作線程在工作隊列上LIFO,而竊取其他線程的任務(wù)的時候,從隊列頭部取獲取。示意圖如下:
圖片
工作線程worker1、worker2以及worker3都從taskQueue的尾部popping獲取task,而任務(wù)也從尾部Pushing,當(dāng)worker3隊列中沒有任務(wù)的時候,就會從其他線程的隊列中取stealing,這樣就使得worker3不會由于沒有任務(wù)而空閑。
時序圖如下:
圖片
案例
圖片
- 大數(shù)據(jù)處理:數(shù)組排序、矩陣運算(參考案例:百萬級數(shù)據(jù)求和效率提升 3 倍)
- 分治算法:快速排序、歸并排序、斐波那契數(shù)列計算。
- 并行流基礎(chǔ):Java 8 的parallelStream()底層實現(xiàn)。
大數(shù)據(jù)集并行處理
處理百萬級數(shù)據(jù)聚合、矩陣運算等可拆分任務(wù)。
假設(shè):我們要計算 1 到 1 億的和,為了加快計算的速度,我們自然想到算法中的分治原理,將 1 億個數(shù)字分成 1 萬個任務(wù),每個任務(wù)計算 1 萬個數(shù)值的綜合,利用 CPU 的并發(fā)計算性能縮短計算時間。
定義一個 Calculator 接口,表示計算數(shù)字總和這個動作,如下所示。
public interface Calculator {
/**
* 把傳進來的所有numbers 做求和處理
*
* @param numbers
* @return 總和
*/
long sumUp(long[] numbers);
}
ForkJoinCalculator 實現(xiàn) Calculator 接口,內(nèi)部類 SumTask 繼承 RecursiveTask 抽象類,并在 compute 方法中定義拆分邏輯及計算。
最后在 sumUp 方法中調(diào)用 pool 方法進行計算。
public class ForkJoinCalculator implements Calculator {
private ForkJoinPool pool;
// 1. 定義計算邏輯
privatestaticclass SumTask extends RecursiveTask<Long> {
privatelong[] numbers;
privateint from;
privateint to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
//此方法為ForkJoin的核心方法:對任務(wù)進行拆分 拆分的好壞決定了效率的高低
@Override
protected Long compute() {
// 當(dāng)需要計算的數(shù)字個數(shù)小于6時,直接采用for loop方式計算結(jié)果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
} else {
// 否則,把任務(wù)一分為二,遞歸拆分(注意此處有遞歸)到底拆分成多少分 需要根據(jù)具體情況而定
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
public ForkJoinCalculator() {
// 也可以使用公用的線程池 ForkJoinPool.commonPool():
// pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}
@Override
public long sumUp(long[] numbers) {
Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
pool.shutdown();
return result;
}
}
實現(xiàn)原理
ForkJoinPool 作為 Executors 框架的一員,從外部看與其它線程池并沒有什么區(qū)別,僅僅是 ExecutorService 的一個實現(xiàn)類。
圖片
在JUC中,實現(xiàn)Fork-join框架有兩個類,分別是ForkJoinPool以及提交的任務(wù)抽象類ForkJoinTask。
通常情況下我們都是直接繼承ForkJoinTask的子類,F(xiàn)ork/Join框架提供了兩個子類:
- RecursiveAction:一個遞歸無結(jié)果的ForkJoinTask(沒有返回值)任務(wù)
- RecursiveTask:一個遞歸有結(jié)果的ForkJoinTask(有返回值)任務(wù)
ForkJoinPool 的主要工作如下:
- 接受外部任務(wù)的提交(外部調(diào)用 ForkJoinPool 的invoke/execute/submit方法提交任務(wù));
- 接受 ForkJoinTask 自身fork出的子任務(wù)的提交;
- 任務(wù)隊列數(shù)組(WorkQueue[])的初始化和管理;
- 工作線程(Worker)的創(chuàng)建/管理。
核心類關(guān)系圖。關(guān)注公眾號「碼哥跳動」,更多硬核文章等你來探索。
圖片
- ForkJoinPool:ExecutorService 的實現(xiàn)類,負(fù)責(zé)工作線程的管理、任務(wù)隊列的維護,以及控制整個任務(wù)調(diào)度流程;
- ForkJoinTask:Future 接口的實現(xiàn)類,fork 是其核心方法,用于分解任務(wù)并異步執(zhí)行;而 join 方法在任務(wù)結(jié)果計算完畢之后才會運行,用來合并或返回計算結(jié)果;
- ForkJoinWorkerThread:Thread 的子類,作為線程池中的工作線程(Worker)執(zhí)行任務(wù);
- WorkQueue:任務(wù)隊列,用于保存任務(wù);
ForkJoinPool 提供了 3 類外部提交任務(wù)的方法:invoke、execute、submit,它們的主要區(qū)別在于任務(wù)的執(zhí)行方式上。
- 通過invoke方法提交的任務(wù),調(diào)用線程直到任務(wù)執(zhí)行完成才會返回,也就是說這是一個同步方法,且有返回結(jié)果;
- 通過execute方法提交的任務(wù),調(diào)用線程會立即返回,也就是說這是一個異步方法,且沒有返回結(jié)果;
- 通過submit方法提交的任務(wù),調(diào)用線程會立即返回,也就是說這是一個異步方法,且有返回結(jié)果(返回 Future 實現(xiàn)類,可以通過 get 獲取結(jié)果)。
ForkJoinPool 對象的構(gòu)建有兩種方式:
- 通過 3 種構(gòu)造器的任意一種進行構(gòu)造;
- 通過ForkJoinPool.commonPool()靜態(tài)方法構(gòu)造。
JDK8 以后,F(xiàn)orkJoinPool 又提供了一個靜態(tài)方法 commonPool(),這個方法返回一個 ForkJoinPool 內(nèi)部聲明的靜態(tài) ForkJoinPool 實例,主要是為了簡化線程池的構(gòu)建,這個 ForkJoinPool 實例可以滿足大多數(shù)的使用場景。
public static ForkJoinPool commonPool() {
// assert common != null : "static init error";
return common;
}
ForkJoinTask
從 Fork/Join 框架的描述上來看,“任務(wù)”必須要滿足一定的條件:
- 支持 Fork,即任務(wù)自身的分解
- 支持 Join,即任務(wù)結(jié)果的合并
因此,J.U.C 提供了一個抽象類——ForkJoinTask,來作為該類 Fork/Join 任務(wù)的抽象定義:
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
ForkJoinTask 實現(xiàn)了 Future 接口,是一個異步任務(wù),我們在使用 Fork/Join 框架時,一般需要使用線程池來調(diào)度任務(wù),線程池內(nèi)部調(diào)度的其實都是 ForkJoinTask 任務(wù)(即使提交的是一個 Runnable 或 Callable 任務(wù),也會被適配成 ForkJoinTask)。
除了 ForkJoinTask,F(xiàn)ork/Join 框架還提供了兩個它的抽象實現(xiàn),我們在自定義 ForkJoin 任務(wù)時,一般繼承這兩個類:
- RecursiveAction:表示具有返回結(jié)果的 ForkJoin 任務(wù)
- RecursiveTask:表示沒有返回結(jié)果的 ForkJoin 任務(wù)
public abstractclass RecursiveAction extends ForkJoinTask<Void> {
/**
* 該任務(wù)的執(zhí)行,子類覆寫該方法
*/
protected abstract void compute();
public final Void getRawResult() { returnnull; }
protected final void setRawResult(Void mustBeNull) { }
protected final boolean exec() {
compute();
returntrue;
}
}
public abstractclass RecursiveTask<V> extends ForkJoinTask<V> {
/**
* 該任務(wù)的執(zhí)行結(jié)果.
*/
V result;
/**
* 該任務(wù)的執(zhí)行,子類覆寫該方法
*/
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
protected final boolean exec() {
result = compute();
returntrue;
}
}
ForkJoinWorkerThread
Fork/Join 框架中,每個工作線程(Worker)都有一個自己的任務(wù)隊列(WorkerQueue), 所以需要對一般的 Thread 做些特性化處理,J.U.C 提供了ForkJoinWorkerThread類作為 ForkJoinPool 中的工作線程:
public class ForkJoinWorkerThread extends Thread {
final ForkJoinPool pool; // 該工作線程歸屬的線程池
final ForkJoinPool.WorkQueue workQueue; // 對應(yīng)的任務(wù)隊列
protected ForkJoinWorkerThread(ForkJoinPool pool) {
super("aForkJoinWorkerThread"); // 指定工作線程名稱
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
// ...
}
ForkJoinWorkerThread 在構(gòu)造過程中,會保存所屬線程池信息和與自己綁定的任務(wù)隊列信息。同時,它會通過 ForkJoinPool 的registerWorker方法將自己注冊到線程池中。
線程池中的每個工作線程(ForkJoinWorkerThread)都有一個自己的任務(wù)隊列(WorkQueue),工作線程優(yōu)先處理自身隊列中的任務(wù)(LIFO 或 FIFO 順序,由線程池構(gòu)造時的參數(shù) mode 決定),自身隊列為空時,以 FIFO 的順序隨機竊取其它隊列中的任務(wù)。
WorkQueue
任務(wù)隊列(WorkQueue)是 ForkJoinPool 與其它線程池區(qū)別最大的地方,在 ForkJoinPool 內(nèi)部,維護著一個WorkQueue[]數(shù)組.
WorkQueue作為 ForkJoinPool 的內(nèi)部類,表示一個雙端隊列。雙端隊列既可以作為棧使用(LIFO),也可以作為隊列使用(FIFO)。
ForkJoinPool 的“工作竊取”正是利用了這個特點,當(dāng)工作線程從自己的隊列中獲取任務(wù)時,默認(rèn)總是以棧操作(LIFO)的方式從棧頂取任務(wù);當(dāng)工作線程嘗試竊取其它任務(wù)隊列中的任務(wù)時,則是 FIFO 的方式。
任務(wù)調(diào)度流程
Fork/Join 框架的任務(wù)調(diào)度流程是什么樣的?
圖片
任務(wù)提交
任務(wù)提交是整個調(diào)度流程的第一步,有兩種:
- 外部提交:通過ForkJoinPool的execute/submit/invoke方法提交的任務(wù),或者非工作線程(ForkJoinWorkerThread)直接調(diào)用ForkJoinTask的fork/invoke方法提交的任務(wù)。
- 工作線程 fork 任務(wù):由 ForkJoinPool 所維護的工作線程(ForkJoinWorkerThread)從自身任務(wù)隊列中獲取任務(wù)(或從其它任務(wù)隊列竊?。?,然后執(zhí)行任務(wù)。
創(chuàng)建工作線程
任務(wù)提交完成后,F(xiàn)orkJoinPool 會根據(jù)情況創(chuàng)建或喚醒工作線程,以便執(zhí)行任務(wù)。
ForkJoinPool 并不會為每個任務(wù)都創(chuàng)建工作線程,而是根據(jù)實際情況(構(gòu)造線程池時的參數(shù))確定是喚醒已有空閑工作線程,還是新建工作線程。
個過程還是涉及任務(wù)隊列的綁定、工作線程的注銷等過程:
- ForkJoinPool.signalWork
- ForkJoinPool.tryAddWorker
- ForkJoinPool.createWorker
- ForkJoinWorkerThread.registerWorker
- ForkJoinPool.deregisterWorker
任務(wù)執(zhí)行
任務(wù)入隊后,由工作線程開始執(zhí)行,這個過程涉及任務(wù)竊取、工作線程等待等過程:
- ForkJoinWorkerThread.run
- ForkJoinPool.runWorker
- ForkJoinPool.scan
- ForkJoinPool.runTask
- ForkJoinTask.doExec
- ForkJoinPool.execLocalTasks
- ForkJoinPool.awaitWork
任務(wù)結(jié)果獲取
任務(wù)結(jié)果一般通過ForkJoinTask的join方法獲得,其主要流程如下圖:
圖片
任務(wù)結(jié)果獲取的核心涉及兩點:
- 互助竊?。篎orkJoinPool.helpStealer
- 算力補償:ForkJoinPool.tryCompensate
可以總結(jié)為以下幾點:
- 每個 Worker 線程利用它自己的任務(wù)隊列維護可執(zhí)行任務(wù);
- 任務(wù)隊列是一種雙端隊列,支持 LIFO 的push和pop操作,也支持 FIFO 的take操作;
- 任務(wù) fork 的子任務(wù),只會 push 到它所在線程(調(diào)用 fork 方法的線程)的隊列;
- 工作線程既可以使用 LIFO 通過 pop 處理自己隊列中的任務(wù),也可以 FIFO 通過 poll 處理自己隊列中的任務(wù),具體取決于構(gòu)造線程池時的 asyncMode 參數(shù);
- 當(dāng)工作線程自己隊列中沒有待處理任務(wù)時,它嘗試去隨機讀?。ǜ`?。┢渌蝿?wù)隊列的 base 端的任務(wù);
- 當(dāng)線程進入 join 操作,它也會去處理其它工作線程的隊列中的任務(wù)(自己的已經(jīng)處理完了),直到目標(biāo)任務(wù)完成(通過 isDone 方法);
- 當(dāng)一個工作線程沒有任務(wù)了,并且嘗試從其它隊列竊取也失敗了,它讓出資源(通過使用 yields, sleeps 或者其它優(yōu)先級調(diào)整)并且隨后會再次激活,直到所有工作線程都空閑了——此時,它們都阻塞在等待另一個頂層線程的調(diào)用。