并發(fā)編程之ForkJoin框架原理分析
前言
前面我們介紹了線程池框架(ExecutorService)的兩個(gè)具體實(shí)現(xiàn):
- ThreadPoolExecutor 默認(rèn)線程池
- ScheduledThreadPoolExecutor定時(shí)線程池
線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對(duì)多個(gè)任務(wù)重用線程,線程創(chuàng)建的開銷被分?jǐn)偟蕉鄠€(gè)任務(wù)上。Java7 又提供了的一個(gè)用于并行執(zhí)行的任務(wù)的框架 Fork/Join ,是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。在介紹Fork/Join 框架之前我們先了解幾個(gè)概念:CPU密集型、IO密集型,再逐步深入去認(rèn)識(shí)Fork/Join 框架。
任務(wù)性質(zhì)類型
CPU密集型(CPU bound)
CPU密集型也叫計(jì)算密集型,指的是系統(tǒng)的硬盤、內(nèi)存性能相對(duì)于CPU要好很好多,此時(shí),系統(tǒng)運(yùn)作大部分的狀況是 CPU Loading 100%,CPU要讀/寫 I/O(硬盤/內(nèi)存),I/O在很短的時(shí)間就可以完成,而CPU還有許多運(yùn)算要處理,CPU Loading很高。
在多重程序系統(tǒng)中,大部分時(shí)間用來做計(jì)算、邏輯判斷等CPU動(dòng)作的程序稱之 CPU bound。例如一個(gè)計(jì)算圓周率至小數(shù)點(diǎn)一千位以下的程序,在執(zhí)行的過程當(dāng)中絕大部分時(shí)間在用三角函數(shù)和開根號(hào)的計(jì)算,便是屬于CPU bound的程序。
CPU bound的程序一般而言CPU占用率相當(dāng)高。這可能是因?yàn)槿蝿?wù)本身不太需要訪問I/O設(shè)備,也可能是因?yàn)槌绦蚴嵌嗑€程實(shí)現(xiàn)因此屏蔽了等待I/O的時(shí)間。
- 線程數(shù)一般設(shè)置為:線程數(shù) = CPU核數(shù) + 1(現(xiàn)代CPU支持超線程)
IO密集型(I/O bound)
I/O密集型指的是系統(tǒng)的CPU性能相對(duì)硬盤、內(nèi)存要好很多,此時(shí),系統(tǒng)運(yùn)作,大部分的狀況是 CPU 在等 I/O(硬盤/內(nèi)存)的讀/寫操作,此時(shí) CPU Loading 并不高。
I/O bound的程序一般在達(dá)到性能極限時(shí),CPU占用率仍然較低。這可能是因?yàn)槿蝿?wù)本身需要大量I/O操作,而 pipeline 做的不是很好,沒有充分利用處理器能力。
- 線程數(shù)一般設(shè)置為:線程數(shù) = ((線程等待時(shí)間 + 線程CPU時(shí)間) / 線程CPU時(shí)間) * CPU數(shù)目
CPU密集型 VS I/O密集型
我們可以把任務(wù)分為計(jì)算密集型和I/O密集型。
計(jì)算密集型任務(wù)的特點(diǎn)是要進(jìn)行大量的計(jì)算,消耗CPU資源,比如計(jì)算圓周率、對(duì)視頻進(jìn)行高清解碼等等,全靠CPU的運(yùn)算能力。這種計(jì)算密集型任務(wù)雖然也可以用多任務(wù)完成,但是任務(wù)越多,花在任務(wù)切換的時(shí)間就越多,CPU執(zhí)行任務(wù)的效率就越低,所以,要最高效地利用CPU,計(jì)算密集型任務(wù)同時(shí)進(jìn)行的數(shù)量應(yīng)當(dāng)?shù)扔贑PU的核心數(shù)。
計(jì)算密集型任務(wù)由于主要消耗CPU資源,因此,代碼運(yùn)行效率至關(guān)重要。Python這樣的腳本語言運(yùn)行效率很低,完全不適合計(jì)算密集型任務(wù)。對(duì)于計(jì)算密集型任務(wù),最好用C語言編寫。
第二種任務(wù)的類型是I/O密集型,涉及到網(wǎng)絡(luò)、磁盤I/O的任務(wù)都是I/O密集型任務(wù),這類任務(wù)的特點(diǎn)是CPU消耗很少,任務(wù)的大部分時(shí)間都在等待I/O操作完成(因?yàn)镮/O的速度遠(yuǎn)遠(yuǎn)低于CPU和內(nèi)存的速度)。對(duì)于I/O密集型任務(wù),任務(wù)越多,CPU效率越高,但也有一個(gè)限度。常見的大部分任務(wù)都是I/O密集型任務(wù),比如Web應(yīng)用。
I/O密集型任務(wù)執(zhí)行期間,99%的時(shí)間都花在I/O上,花在CPU上的時(shí)間很少,因此,用運(yùn)行速度極快的C語言替換用Python這樣運(yùn)行速度極低的腳本語言,完全無法提升運(yùn)行效率。對(duì)于I/O密集型任務(wù),最合適的語言就是開發(fā)效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。
什么是 Fork/Join 框架?
Fork/Join 框架是 Java7 提供了的一個(gè)用于并行執(zhí)行的任務(wù)的框架,是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。
Fork 就是把一個(gè)大任務(wù)切分為若干個(gè)子任務(wù)并行的執(zhí)行,Join 就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到這個(gè)大任務(wù)的結(jié)果。比如計(jì)算 1+2+......+10000,可以分割成10個(gè)子任務(wù),每個(gè)子任務(wù)對(duì)1000個(gè)數(shù)進(jìn)行求和,最終匯總這10個(gè)子任務(wù)的結(jié)果。如下圖所示:

Fork/Join的特性:
- ForkJoinPool 不是為了替代 ExecutorService,而是它的補(bǔ)充,在某些應(yīng)用場景下性能比 ExecutorService 更好。(見 Java Tip: When to use ForkJoinPool vs ExecutorService )
- ForkJoinPool 主要用于實(shí)現(xiàn)“分而治之”的算法,特別是分治之后遞歸調(diào)用的函數(shù),例如 quick sort 等;
- ForkJoinPool 最適合的是計(jì)算密集型的任務(wù),如果存在 I/O、線程間同步、sleep() 等會(huì)造成線程長時(shí)間阻塞的情況時(shí),最好配合 MangedBlocker。
關(guān)于“分而治之”的算法,可以查看《分治、回溯的實(shí)現(xiàn)和特性》
工作竊取算法
工作竊取(work-stealing)算法 是指某個(gè)線程從其他隊(duì)列里竊取任務(wù)來執(zhí)行。
我們需要做一個(gè)比較大的任務(wù),我們可以把這個(gè)任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競爭,于是把這些子任務(wù)分別放到不同的隊(duì)列里,并為每個(gè)隊(duì)列創(chuàng)建一個(gè)單獨(dú)的線程來執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對(duì)應(yīng),比如A線程負(fù)責(zé)處理A隊(duì)列里的任務(wù)。
但是有的線程會(huì)先把自己隊(duì)列里的任務(wù)干完,而其他線程對(duì)應(yīng)的隊(duì)列里還有任務(wù)等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來執(zhí)行。而在這時(shí)它們會(huì)訪問同一個(gè)隊(duì)列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭,通常會(huì)使用雙端隊(duì)列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行。

工作竊取算法的優(yōu)點(diǎn)是充分利用線程進(jìn)行并行計(jì)算,并減少了線程間的競爭,其缺點(diǎn)是在某些情況下還是存在競爭,比如雙端隊(duì)列里只有一個(gè)任務(wù)時(shí)。并且消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個(gè)線程和多個(gè)雙端隊(duì)列。

- ForkJoinPool 的每個(gè)工作線程都維護(hù)著一個(gè)工作隊(duì)列(WorkQueue),這是一個(gè)雙端隊(duì)列(Deque),里面存放的對(duì)象是任務(wù)(ForkJoinTask)。
- 每個(gè)工作線程在運(yùn)行中產(chǎn)生新的任務(wù)(通常是因?yàn)檎{(diào)用了 fork())時(shí),會(huì)放入工作隊(duì)列的隊(duì)尾,并且工作線程在處理自己的工作隊(duì)列時(shí),使用的是 LIFO 方式,也就是說每次從隊(duì)尾取出任務(wù)來執(zhí)行。
- 每個(gè)工作線程在處理自己的工作隊(duì)列同時(shí),會(huì)嘗試竊取一個(gè)任務(wù)(或是來自于剛剛提交到 pool 的任務(wù),或是來自于其他工作線程的工作隊(duì)列),竊取的任務(wù)位于其他線程的工作隊(duì)列的隊(duì)首,也就是說工作線程在竊取其他工作線程的任務(wù)時(shí),使用的是 FIFO 方式。
- 在遇到 join() 時(shí),如果需要 join 的任務(wù)尚未完成,則會(huì)先處理其他任務(wù),并等待其完成。
- 在既沒有自己的任務(wù),也沒有可以竊取的任務(wù)時(shí),進(jìn)入休眠。
Fork/Join的使用
使用場景示例
定義fork/join任務(wù),如下示例,隨機(jī)生成2000w條數(shù)據(jù)在數(shù)組當(dāng)中,然后求和_
- package com.niuh.forkjoin.recursivetask;
- import java.util.concurrent.RecursiveTask;
- /**
- * RecursiveTask 并行計(jì)算,同步有返回值
- * ForkJoin框架處理的任務(wù)基本都能使用遞歸處理,比如求斐波那契數(shù)列等,但遞歸算法的缺陷是:
- * 一只會(huì)只用單線程處理,
- * 二是遞歸次數(shù)過多時(shí)會(huì)導(dǎo)致堆棧溢出;
- * ForkJoin解決了這兩個(gè)問題,使用多線程并發(fā)處理,充分利用計(jì)算資源來提高效率,同時(shí)避免堆棧溢出發(fā)生。
- * 當(dāng)然像求斐波那契數(shù)列這種小問題直接使用線性算法搞定可能更簡單,實(shí)際應(yīng)用中完全沒必要使用ForkJoin框架,
- * 所以ForkJoin是核彈,是用來對(duì)付大家伙的,比如超大數(shù)組排序。
- * 最佳應(yīng)用場景:多核、多內(nèi)存、可以分割計(jì)算再合并的計(jì)算密集型任務(wù)
- */
- class LongSum extends RecursiveTask<Long> {
- //任務(wù)拆分的最小閥值
- static final int SEQUENTIAL_THRESHOLD = 1000;
- static final long NPS = (1000L * 1000 * 1000);
- static final boolean extraWork = true; // change to add more than just a sum
- int low;
- int high;
- int[] array;
- LongSum(int[] arr, int lo, int hi) {
- array = arr;
- low = lo;
- high = hi;
- }
- /**
- * fork()方法:將任務(wù)放入隊(duì)列并安排異步執(zhí)行,一個(gè)任務(wù)應(yīng)該只調(diào)用一次fork()函數(shù),除非已經(jīng)執(zhí)行完畢并重新初始化。
- * tryUnfork()方法:嘗試把任務(wù)從隊(duì)列中拿出單獨(dú)處理,但不一定成功。
- * join()方法:等待計(jì)算完成并返回計(jì)算結(jié)果。
- * isCompletedAbnormally()方法:用于判斷任務(wù)計(jì)算是否發(fā)生異常。
- */
- protected Long compute() {
- if (high - low <= SEQUENTIAL_THRESHOLD) {
- long sum = 0;
- for (int i = low; i < high; ++i) {
- sum += array[i];
- }
- return sum;
- } else {
- int mid = low + (high - low) / 2;
- LongSum left = new LongSum(array, low, mid);
- LongSum right = new LongSum(array, mid, high);
- left.fork();
- right.fork();
- long rightAns = right.join();
- long leftAns = left.join();
- return leftAns + rightAns;
- }
- }
- }
執(zhí)行fork/join任務(wù)
- package com.niuh.forkjoin.recursivetask;
- import com.niuh.forkjoin.utils.Utils;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.ForkJoinTask;
- public class LongSumMain {
- //獲取邏輯處理器數(shù)量
- static final int NCPU = Runtime.getRuntime().availableProcessors();
- /**
- * for time conversion
- */
- static final long NPS = (1000L * 1000 * 1000);
- static long calcSum;
- static final boolean reportSteals = true;
- public static void main(String[] args) throws Exception {
- int[] array = Utils.buildRandomIntArray(2000000);
- System.out.println("cpu-num:" + NCPU);
- //單線程下計(jì)算數(shù)組數(shù)據(jù)總和
- long start = System.currentTimeMillis();
- calcSum = seqSum(array);
- System.out.println("seq sum=" + calcSum);
- System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));
- start = System.currentTimeMillis();
- //采用fork/join方式將數(shù)組求和任務(wù)進(jìn)行拆分執(zhí)行,最后合并結(jié)果
- LongSum ls = new LongSum(array, 0, array.length);
- ForkJoinPool fjp = new ForkJoinPool(NCPU); //使用的線程數(shù)
- ForkJoinTask<Long> task = fjp.submit(ls);
- System.out.println("forkjoin sum=" + task.get());
- System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));
- if (task.isCompletedAbnormally()) {
- System.out.println(task.getException());
- }
- fjp.shutdown();
- }
- static long seqSum(int[] array) {
- long sum = 0;
- for (int i = 0; i < array.length; ++i) {
- sum += array[i];
- }
- return sum;
- }
- }
Fork/Join框架原理
Fork/Join 其實(shí)就是指由ForkJoinPool作為線程池、ForkJoinTask(通常實(shí)現(xiàn)其三個(gè)抽象子類)為任務(wù)、ForkJoinWorkerThread作為執(zhí)行任務(wù)的具體線程實(shí)體這三者構(gòu)成的任務(wù)調(diào)度機(jī)制。

ForkJoinWorkerThread
ForkJoinWorkerThread 直接繼承了Thread,但是僅僅是為了增加一些額外的功能,并沒有對(duì)線程的調(diào)度執(zhí)行做任何更改。

ForkJoinWorkerThread 是被ForkJoinPool管理的工作線程,在創(chuàng)建出來之后都被設(shè)置成為了守護(hù)線程,由它來執(zhí)行ForkJoinTasks。該類主要為了維護(hù)創(chuàng)建線程實(shí)例時(shí)通過ForkJoinPool為其創(chuàng)建的任務(wù)隊(duì)列,與其他兩個(gè)線程池整個(gè)線程池只有一個(gè)任務(wù)隊(duì)列不同,F(xiàn)orkJoinPool管理的所有工作線程都擁有自己的工作隊(duì)列,為了實(shí)現(xiàn)任務(wù)竊取機(jī)制,該隊(duì)列被設(shè)計(jì)成一個(gè)雙端隊(duì)列,而ForkJoinWorkerThread的首要任務(wù)就是執(zhí)行自己的這個(gè)雙端任務(wù)隊(duì)列中的任務(wù),其次是竊取其他線程的工作隊(duì)列,以下是其代碼片段:
- public class ForkJoinWorkerThread extends Thread {
- // 這個(gè)線程工作的ForkJoinPool池
- final ForkJoinPool pool;
- // 這個(gè)線程擁有的工作竊取機(jī)制的工作隊(duì)列
- final ForkJoinPool.WorkQueue workQueue;
- //創(chuàng)建在給定ForkJoinPool池中執(zhí)行的ForkJoinWorkerThread。
- protected ForkJoinWorkerThread(ForkJoinPool pool) {
- // Use a placeholder until a useful name can be set in registerWorker
- super("aForkJoinWorkerThread");
- this.pool = pool;
- //向ForkJoinPool執(zhí)行池注冊(cè)當(dāng)前工作線程,F(xiàn)orkJoinPool為其分配一個(gè)工作隊(duì)列
- this.workQueue = pool.registerWorker(this);
- }
- //該工作線程的執(zhí)行內(nèi)容就是執(zhí)行工作隊(duì)列中的任務(wù)
- public void run() {
- if (workQueue.array == null) { // only run once
- Throwable exception = null;
- try {
- onStart();
- pool.runWorker(workQueue); //執(zhí)行工作隊(duì)列中的任務(wù)
- } catch (Throwable ex) {
- exception = ex; //記錄異常
- } finally {
- try {
- onTermination(exception);
- } catch (Throwable ex) {
- if (exception == null)
- exception = ex;
- } finally {
- pool.deregisterWorker(this, exception); //撤銷工作
- }
- }
- }
- }
- .....
- }
ForkJoinTask
ForkJoinTask :與FutureTask一樣, ForkJoinTask也是Future的子類,不過它是一個(gè)抽象類。

ForkJoinTask :我們要使用 ForkJoin 框架,必須首先創(chuàng)建一個(gè) ForkJoin 任務(wù)。它提供在任務(wù)中執(zhí)行 fork() 和 join() 操作的機(jī)制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,F(xiàn)ork/Join框架提供類以下幾個(gè)子類:
- RecursiveAction:用于沒有返回結(jié)果的任務(wù)。(比如寫數(shù)據(jù)到磁盤,然后就退出。一個(gè) RecursiveAvtion 可以把直接的工作分割成更小的幾塊,這樣它們可以由獨(dú)立的線程或者 CPU 執(zhí)行。我們可以通過繼承來實(shí)現(xiàn)一個(gè) RecusiveAction)
- RescursiveTask:用于有返回結(jié)果的任務(wù)。(可以將自己的工作分割為若干更小任務(wù),并將這些子任務(wù)的執(zhí)行合并到一個(gè)集體結(jié)果??梢杂袔讉€(gè)水平的分割和合并)
- CountedCompleter :在任務(wù)完成執(zhí)行后會(huì)觸發(fā)執(zhí)行一個(gè)自定義的鉤子函數(shù)。
常量介紹
ForkJoinTask 有一個(gè)int類型的status字段:
- 其高16位存儲(chǔ)任務(wù)執(zhí)行狀態(tài)例如NORMAL、CANCELLED或EXCEPTIONAL
- 低16位預(yù)留用于用戶自定義的標(biāo)記。
任務(wù)未完成之前status大于等于0,完成之后就是NORMAL、CANCELLED或EXCEPTIONAL這幾個(gè)小于0的值,這幾個(gè)值也是按大小順序的:0(初始狀態(tài)) > NORMAL > CANCELLED > EXCEPTIONAL.
- public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
- /** 該任務(wù)的執(zhí)行狀態(tài) */
- volatile int status; // accessed directly by pool and workers
- static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
- static final int NORMAL = 0xf0000000; // must be negative
- static final int CANCELLED = 0xc0000000; // must be < NORMAL
- static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
- static final int SIGNAL = 0x00010000; // must be >= 1 << 16
- static final int SMASK = 0x0000ffff; // short bits for tags
- // 異常哈希表
- //被任務(wù)拋出的異常數(shù)組,為了報(bào)告給調(diào)用者。因?yàn)楫惓:苌僖?,所以我們不直接將它們保存在task對(duì)象中,而是使用弱引用數(shù)組。注意,取消異常不會(huì)出現(xiàn)在數(shù)組,而是記錄在statue字段中
- //注意這些都是 static 類屬性,所有的ForkJoinTask共用的。
- private static final ExceptionNode[] exceptionTable; //異常哈希鏈表數(shù)組
- private static final ReentrantLock exceptionTableLock;
- private static final ReferenceQueue<Object> exceptionTableRefQueue; //在ForkJoinTask被GC回收之后,相應(yīng)的異常節(jié)點(diǎn)對(duì)象的引用隊(duì)列
- /**
- * 固定容量的exceptionTable.
- */
- private static final int EXCEPTION_MAP_CAPACITY = 32;
- //異常數(shù)組的鍵值對(duì)節(jié)點(diǎn)。
- //該哈希鏈表數(shù)組使用線程id進(jìn)行比較,該數(shù)組具有固定的容量,因?yàn)樗痪S護(hù)任務(wù)異常足夠長,以便參與者訪問它們,所以在持續(xù)的時(shí)間內(nèi)不應(yīng)該變得非常大。但是,由于我們不知道最后一個(gè)joiner何時(shí)完成,我們必須使用弱引用并刪除它們。我們對(duì)每個(gè)操作都這樣做(因此完全鎖定)。此外,任何ForkJoinPool池中的一些線程在其池變?yōu)閕sQuiescent時(shí)都會(huì)調(diào)用helpExpungeStaleExceptions
- static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
- final Throwable ex;
- ExceptionNode next;
- final long thrower; // 拋出異常的線程id
- final int hashCode; // 在弱引用消失之前存儲(chǔ)hashCode
- ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
- super(task, exceptionTableRefQueue); //在ForkJoinTask被GC回收之后,會(huì)將該節(jié)點(diǎn)加入隊(duì)列exceptionTableRefQueue
- this.ex = ex;
- this.next = next;
- this.thrower = Thread.currentThread().getId();
- this.hashCode = System.identityHashCode(task);
- }
- }
- .................
- }
除了status記錄任務(wù)的執(zhí)行狀態(tài)之外,其他字段主要是為了對(duì)任務(wù)執(zhí)行的異常的處理,F(xiàn)orkJoinTask采用了哈希數(shù)組 + 鏈表的數(shù)據(jù)結(jié)構(gòu)(JDK8以前的HashMap實(shí)現(xiàn)方法)存放所有(因?yàn)檫@些字段是static)的ForkJoinTask任務(wù)的執(zhí)行異常。
fork 方法(安排任務(wù)異步執(zhí)行)
fork() 做的工作只有一件事,既是把任務(wù)推入當(dāng)前工作線程的工作隊(duì)列里(安排任務(wù)異步執(zhí)行)??梢詤⒖匆韵碌脑创a:
- public final ForkJoinTask<V> fork() {
- Thread t;
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
- ((ForkJoinWorkerThread)t).workQueue.push(this);
- else
- ForkJoinPool.common.externalPush(this);
- return this;
- }
該方法其實(shí)就是將任務(wù)通過push方法加入到當(dāng)前工作線程的工作隊(duì)列或者提交隊(duì)列(外部非ForkJoinWorkerThread線程通過submit、execute方法提交的任務(wù)),等待被線程池調(diào)度執(zhí)行,這是一個(gè)非阻塞的立即返回方法。
- 這里需要知道,F(xiàn)orkJoinPool線程池通過哈希數(shù)組+雙端隊(duì)列的方式將所有的工作線程擁有的任務(wù)隊(duì)列和從外部提交的任務(wù)分別映射到哈希數(shù)組的不同槽位上。
join 方法(等待執(zhí)行結(jié)果)
join() 的工作則復(fù)雜得多,也是 join() 可以使得線程免于被阻塞的原因——不像同名的 Thread.join()。
- 檢查調(diào)用 join() 的線程是否是 ForkJoinThread 線程。如果不是(例如 main 線程),則阻塞當(dāng)前線程,等待任務(wù)完成。如果是,則不阻塞。
- 查看任務(wù)的完成狀態(tài),如果已經(jīng)完成,直接返回結(jié)果。
- 如果任務(wù)尚未完成,但處于自己的工作隊(duì)列內(nèi),則完成它。
- 如果任務(wù)已經(jīng)被其他的工作線程偷走,則竊取這個(gè)小偷的工作隊(duì)列內(nèi)的任務(wù)(以 FIFO 方式),執(zhí)行,以期幫助它早日完成 join 的任務(wù)。
- 如果偷走任務(wù)的小偷也已經(jīng)把自己的任務(wù)全部做完,正在等待需要 join 的任務(wù)時(shí),則找到小偷的小偷,幫助它完成它的任務(wù)。
- 遞歸地執(zhí)行第5步。
將上述流程畫成序列圖的話就是這個(gè)樣子:
由于文章篇幅有限,源碼分析請(qǐng)查看文章末尾的“了解更多”
小結(jié)
通常ForkJoinTask只適用于非循環(huán)依賴的純函數(shù)的計(jì)算或孤立對(duì)象的操作,否則,執(zhí)行可能會(huì)遇到某種形式的死鎖,因?yàn)槿蝿?wù)循環(huán)地等待彼此。但是,這個(gè)框架支持其他方法和技術(shù)(例如使用Phaser、helpQuiesce和complete),這些方法和技術(shù)可用于構(gòu)造解決這種依賴任務(wù)的ForkJoinTask子類,為了支持這些用法,可以使用setForkJoinTaskTag或compareAndSetForkJoinTaskTag原子性地標(biāo)記一個(gè)short類型的值,并使用getForkJoinTaskTag進(jìn)行檢查。ForkJoinTask實(shí)現(xiàn)沒有將這些受保護(hù)的方法或標(biāo)記用于任何目的,但是它們可以用于構(gòu)造專門的子類,由此可以使用提供的方法來避免重新訪問已經(jīng)處理過的節(jié)點(diǎn)/任務(wù)。
ForkJoinTask應(yīng)該執(zhí)行相對(duì)較少的計(jì)算,并且應(yīng)該避免不確定的循環(huán)。大任務(wù)應(yīng)該被分解成更小的子任務(wù),通常通過遞歸分解。如果任務(wù)太大,那么并行性就不能提高吞吐量。如果太小,那么內(nèi)存和內(nèi)部任務(wù)維護(hù)開銷可能會(huì)超過處理開銷。
ForkJoinTask是可序列化的,這使它們能夠在諸如遠(yuǎn)程執(zhí)行框架之類的擴(kuò)展中使用。只在執(zhí)行之前或之后序列化任務(wù)才是明智的,而不是在執(zhí)行期間。
ForkJoinPool
ForkJoinPool:ForkJoinTask 需要通過 ForkJoinPool 來執(zhí)行,任務(wù)分割出的子任務(wù)會(huì)添加到當(dāng)前工作線程所維護(hù)的雙端隊(duì)列中,進(jìn)入隊(duì)列的頭部。當(dāng)一個(gè)工作線程的隊(duì)列里暫時(shí)沒有任務(wù)時(shí),它會(huì)隨機(jī)從其他工作線程的隊(duì)列的尾部獲取一個(gè)任務(wù)。

常量介紹
ForkJoinPool 與 內(nèi)部類 WorkQueue 共享的一些常量
- // Constants shared across ForkJoinPool and WorkQueue
- // 限定參數(shù)
- static final int SMASK = 0xffff; // 低位掩碼,也是最大索引位
- static final int MAX_CAP = 0x7fff; // 工作線程最大容量
- static final int EVENMASK = 0xfffe; // 偶數(shù)低位掩碼
- static final int SQMASK = 0x007e; // workQueues 數(shù)組最多64個(gè)槽位
- // ctl 子域和 WorkQueue.scanState 的掩碼和標(biāo)志位
- static final int SCANNING = 1; // 標(biāo)記是否正在運(yùn)行任務(wù)
- static final int INACTIVE = 1 << 31; // 失活狀態(tài) 負(fù)數(shù)
- static final int SS_SEQ = 1 << 16; // 版本戳,防止ABA問題
- // ForkJoinPool.config 和 WorkQueue.config 的配置信息標(biāo)記
- static final int MODE_MASK = 0xffff << 16; // 模式掩碼
- static final int LIFO_QUEUE = 0; // LIFO隊(duì)列
- static final int FIFO_QUEUE = 1 << 16; // FIFO隊(duì)列
- static final int SHARED_QUEUE = 1 << 31; // 共享模式隊(duì)列,負(fù)數(shù) ForkJoinPool 中的相關(guān)常量和實(shí)例字段:
ForkJoinPool 中的相關(guān)常量和實(shí)例字段
- // 低位和高位掩碼
- private static final long SP_MASK = 0xffffffffL;
- private static final long UC_MASK = ~SP_MASK;
- // 活躍線程數(shù)
- private static final int AC_SHIFT = 48;
- private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數(shù)增量
- private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數(shù)掩碼
- // 工作線程數(shù)
- private static final int TC_SHIFT = 32;
- private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作線程數(shù)增量
- private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼
- private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // 創(chuàng)建工作線程標(biāo)志
- // 池狀態(tài)
- private static final int RSLOCK = 1;
- private static final int RSIGNAL = 1 << 1;
- private static final int STARTED = 1 << 2;
- private static final int STOP = 1 << 29;
- private static final int TERMINATED = 1 << 30;
- private static final int SHUTDOWN = 1 << 31;
- // 實(shí)例字段
- volatile long ctl; // 主控制參數(shù)
- volatile int runState; // 運(yùn)行狀態(tài)鎖
- final int config; // 并行度|模式
- int indexSeed; // 用于生成工作線程索引
- volatile WorkQueue[] workQueues; // 主對(duì)象注冊(cè)信息,workQueue
- final ForkJoinWorkerThreadFactory factory;// 線程工廠
- final UncaughtExceptionHandler ueh; // 每個(gè)工作線程的異常信息
- final String workerNamePrefix; // 用于創(chuàng)建工作線程的名稱
- volatile AtomicLong stealCounter; // 偷取任務(wù)總數(shù),也可作為同步監(jiān)視器
- /** 靜態(tài)初始化字段 */
- //線程工廠
- public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
- //啟動(dòng)或殺死線程的方法調(diào)用者的權(quán)限
- private static final RuntimePermission modifyThreadPermission;
- // 公共靜態(tài)pool
- static final ForkJoinPool common;
- //并行度,對(duì)應(yīng)內(nèi)部common池
- static final int commonParallelism;
- //備用線程數(shù),在tryCompensate中使用
- private static int commonMaxSpares;
- //創(chuàng)建workerNamePrefix(工作線程名稱前綴)時(shí)的序號(hào)
- private static int poolNumberSequence;
- //線程阻塞等待新的任務(wù)的超時(shí)值(以納秒為單位),默認(rèn)2秒
- private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
- //空閑超時(shí)時(shí)間,防止timer未命中
- private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms
- //默認(rèn)備用線程數(shù)
- private static final int DEFAULT_COMMON_MAX_SPARES = 256;
- //阻塞前自旋的次數(shù),用在在awaitRunStateLock和awaitWork中
- private static final int SPINS = 0;
- //indexSeed的增量
- private static final int SEED_INCREMENT = 0x9e3779b9;
ForkJoinPool 的內(nèi)部狀態(tài)都是通過一個(gè)64位的 long 型 變量ctl來存儲(chǔ),它由四個(gè)16位的子域組成:
- AC: 正在運(yùn)行工作線程數(shù)減去目標(biāo)并行度,高16位
- TC: 總工作線程數(shù)減去目標(biāo)并行度,中高16位
- SS: 棧頂?shù)却€程的版本計(jì)數(shù)和狀態(tài),中低16位
- ID: 棧頂 WorkQueue 在池中的索引(poolIndex),低16位
ForkJoinPool.WorkQueue 中的相關(guān)屬性:
- //初始隊(duì)列容量,2的冪
- static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
- //最大隊(duì)列容量
- static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
- // 實(shí)例字段
- volatile int scanState; // Woker狀態(tài), <0: inactive; odd:scanning
- int stackPred; // 記錄前一個(gè)棧頂?shù)腸tl
- int nsteals; // 偷取任務(wù)數(shù)
- int hint; // 記錄偷取者索引,初始為隨機(jī)索引
- int config; // 池索引和模式
- volatile int qlock; // 1: locked, < 0: terminate; else 0
- volatile int base; // 下一個(gè)poll操作的索引(棧底/隊(duì)列頭)
- int top; // 一個(gè)push操作的索引(棧頂/隊(duì)列尾)
- ForkJoinTask<?>[] array; // 任務(wù)數(shù)組
- final ForkJoinPool pool; // the containing pool (may be null)
- final ForkJoinWorkerThread owner; // 當(dāng)前工作隊(duì)列的工作線程,共享模式下為null
- volatile Thread parker; // 調(diào)用park阻塞期間為owner,其他情況為null
- volatile ForkJoinTask<?> currentJoin; // 記錄被join過來的任務(wù)
- volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作隊(duì)列偷取過來的任務(wù)
內(nèi)部數(shù)據(jù)結(jié)構(gòu)
ForkJoinPool采用了哈希數(shù)組 + 雙端隊(duì)列的方式存放任務(wù),但這里的任務(wù)分為兩類:
- 一類是通過execute、submit 提交的外部任務(wù)
- 另一類是ForkJoinWorkerThread工作線程通過fork/join分解出來的工作任務(wù)
ForkJoinPool并沒有把這兩種任務(wù)混在一個(gè)任務(wù)隊(duì)列中,對(duì)于外部任務(wù),會(huì)利用Thread內(nèi)部的隨機(jī)probe值映射到哈希數(shù)組的偶數(shù)槽位中的提交隊(duì)列中,這種提交隊(duì)列是一種數(shù)組實(shí)現(xiàn)的雙端隊(duì)列稱之為Submission Queue,專門存放外部提交的任務(wù)。
對(duì)于ForkJoinWorkerThread工作線程,每一個(gè)工作線程都分配了一個(gè)工作隊(duì)列,這也是一個(gè)雙端隊(duì)列,稱之為Work Queue,這種隊(duì)列都會(huì)被映射到哈希數(shù)組的奇數(shù)槽位,每一個(gè)工作線程fork/join分解的任務(wù)都會(huì)被添加到自己擁有的那個(gè)工作隊(duì)列中。
在ForkJoinPool中的屬性 WorkQueue[] workQueues 就是我們所說的哈希數(shù)組,其元素就是內(nèi)部類WorkQueue實(shí)現(xiàn)的基于數(shù)組的雙端隊(duì)列。該哈希數(shù)組的長度為2的冪,并且支持?jǐn)U容。如下就是該哈希數(shù)組的示意結(jié)構(gòu)圖:
如圖,提交隊(duì)列位于哈希數(shù)組workQueue的奇數(shù)索引槽位,工作線程的工作隊(duì)列位于偶數(shù)槽位。
- 默認(rèn)情況下,asyncMode為false時(shí):因此工作線程把工作隊(duì)列當(dāng)著棧一樣使用(后進(jìn)先出),將分解的子任務(wù)推入工作隊(duì)列的top端,取任務(wù)的時(shí)候也從top端取(凡是雙端隊(duì)列都會(huì)有兩個(gè)分別指向隊(duì)列兩端的指針,這里就是圖上畫出的base和top);而當(dāng)某些工作線程的任務(wù)為空的時(shí)候,就會(huì)從其他隊(duì)列(不限于workQueue,也會(huì)是提交隊(duì)列)竊取(steal)任務(wù),如圖示擁有workQueue2的工作線程從workQueue1中竊取了一個(gè)任務(wù),竊取任務(wù)的時(shí)候采用的是先進(jìn)先出FIFO的策略(即從base端竊取任務(wù)),這樣不但可以避免在取任務(wù)的時(shí)候與擁有其隊(duì)列的工作線程發(fā)生沖突,從而減小競爭,還可以輔助其完成比較大的任務(wù)。
- asyncMode為true的話,擁有該工作隊(duì)列的工作線程將按照先進(jìn)先出的策略從base端取任務(wù),這一般只用于不需要返回結(jié)果的任務(wù),或者事件消息傳遞框架。
ForkJoinPool構(gòu)造函數(shù)
其完整構(gòu)造方法如下
- private ForkJoinPool(int parallelism,
- ForkJoinWorkerThreadFactory factory,
- UncaughtExceptionHandler handler,
- int mode,
- String workerNamePrefix) {
- this.workerNamePrefix = workerNamePrefix;
- this.factory = factory;
- this.ueh = handler;
- this.config = (parallelism & SMASK) | mode;
- long np = (long)(-parallelism); // offset ctl counts
- this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
- }
重要參數(shù)解釋
- parallelism:并行度( the parallelism level),默認(rèn)情況下跟我們機(jī)器的cpu個(gè)數(shù)保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我們機(jī)器運(yùn)行時(shí)可用的CPU個(gè)數(shù)。
- factory:創(chuàng)建新線程的工廠( the factory for creating new threads)。默認(rèn)情況下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。
- handler:線程異常情況下的處理器(Thread.UncaughtExceptionHandler handler),該處理器在線程執(zhí)行任務(wù)時(shí)由于某些無法預(yù)料到的錯(cuò)誤而導(dǎo)致任務(wù)線程中斷時(shí)進(jìn)行一些處理,默認(rèn)情況為null。
- asyncMode:這個(gè)參數(shù)要注意,在ForkJoinPool中,每一個(gè)工作線程都有一個(gè)獨(dú)立的任務(wù)隊(duì)列
- asyncMode表示工作線程內(nèi)的任務(wù)隊(duì)列是采用何種方式進(jìn)行調(diào)度,可以是先進(jìn)先出FIFO,也可以是后進(jìn)先出LIFO。如果為true,則線程池中的工作線程則使用先進(jìn)先出方式進(jìn)行任務(wù)調(diào)度,默認(rèn)情況下是false。
ForkJoinPool.submit 方法
- public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
- if (task == null)
- throw new NullPointerException();
- //提交到工作隊(duì)列
- externalPush(task);
- return task;
- }
ForkJoinPool 自身擁有工作隊(duì)列,這些工作隊(duì)列的作用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務(wù),而這些工作隊(duì)列被稱為 submitting queue 。 submit() 和 fork() 其實(shí)沒有本質(zhì)區(qū)別,只是提交對(duì)象變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對(duì)象,因此當(dāng)其中的任務(wù)被一個(gè)工作線程成功竊取時(shí),就意味著提交的任務(wù)真正開始進(jìn)入執(zhí)行階段。
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git