自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

并發(fā)編程之ForkJoin框架原理分析

開發(fā) 前端
Java7 又提供了的一個(gè)用于并行執(zhí)行的任務(wù)的框架 Fork/Join ,是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。在介紹Fork/Join 框架之前我們先了解幾個(gè)概念:CPU密集型、IO密集型,再逐步深入去認(rèn)識(shí)Fork/Join 框架。

[[358064]]

 前言

前面我們介紹了線程池框架(ExecutorService)的兩個(gè)具體實(shí)現(xiàn):

  • ThreadPoolExecutor 默認(rèn)線程池
  • ScheduledThreadPoolExecutor定時(shí)線程池

線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對(duì)多個(gè)任務(wù)重用線程,線程創(chuàng)建的開銷被分?jǐn)偟蕉鄠€(gè)任務(wù)上。Java7 又提供了的一個(gè)用于并行執(zhí)行的任務(wù)的框架 Fork/Join ,是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。在介紹Fork/Join 框架之前我們先了解幾個(gè)概念:CPU密集型、IO密集型,再逐步深入去認(rèn)識(shí)Fork/Join 框架。

任務(wù)性質(zhì)類型

CPU密集型(CPU bound)

CPU密集型也叫計(jì)算密集型,指的是系統(tǒng)的硬盤、內(nèi)存性能相對(duì)于CPU要好很好多,此時(shí),系統(tǒng)運(yùn)作大部分的狀況是 CPU Loading 100%,CPU要讀/寫 I/O(硬盤/內(nèi)存),I/O在很短的時(shí)間就可以完成,而CPU還有許多運(yùn)算要處理,CPU Loading很高。

在多重程序系統(tǒng)中,大部分時(shí)間用來做計(jì)算、邏輯判斷等CPU動(dòng)作的程序稱之 CPU bound。例如一個(gè)計(jì)算圓周率至小數(shù)點(diǎn)一千位以下的程序,在執(zhí)行的過程當(dāng)中絕大部分時(shí)間在用三角函數(shù)和開根號(hào)的計(jì)算,便是屬于CPU bound的程序。

CPU bound的程序一般而言CPU占用率相當(dāng)高。這可能是因?yàn)槿蝿?wù)本身不太需要訪問I/O設(shè)備,也可能是因?yàn)槌绦蚴嵌嗑€程實(shí)現(xiàn)因此屏蔽了等待I/O的時(shí)間。

  • 線程數(shù)一般設(shè)置為:線程數(shù) = CPU核數(shù) + 1(現(xiàn)代CPU支持超線程)

IO密集型(I/O bound)

I/O密集型指的是系統(tǒng)的CPU性能相對(duì)硬盤、內(nèi)存要好很多,此時(shí),系統(tǒng)運(yùn)作,大部分的狀況是 CPU 在等 I/O(硬盤/內(nèi)存)的讀/寫操作,此時(shí) CPU Loading 并不高。

I/O bound的程序一般在達(dá)到性能極限時(shí),CPU占用率仍然較低。這可能是因?yàn)槿蝿?wù)本身需要大量I/O操作,而 pipeline 做的不是很好,沒有充分利用處理器能力。

  • 線程數(shù)一般設(shè)置為:線程數(shù) = ((線程等待時(shí)間 + 線程CPU時(shí)間) / 線程CPU時(shí)間) * CPU數(shù)目

CPU密集型 VS I/O密集型

我們可以把任務(wù)分為計(jì)算密集型和I/O密集型。

計(jì)算密集型任務(wù)的特點(diǎn)是要進(jìn)行大量的計(jì)算,消耗CPU資源,比如計(jì)算圓周率、對(duì)視頻進(jìn)行高清解碼等等,全靠CPU的運(yùn)算能力。這種計(jì)算密集型任務(wù)雖然也可以用多任務(wù)完成,但是任務(wù)越多,花在任務(wù)切換的時(shí)間就越多,CPU執(zhí)行任務(wù)的效率就越低,所以,要最高效地利用CPU,計(jì)算密集型任務(wù)同時(shí)進(jìn)行的數(shù)量應(yīng)當(dāng)?shù)扔贑PU的核心數(shù)。

計(jì)算密集型任務(wù)由于主要消耗CPU資源,因此,代碼運(yùn)行效率至關(guān)重要。Python這樣的腳本語言運(yùn)行效率很低,完全不適合計(jì)算密集型任務(wù)。對(duì)于計(jì)算密集型任務(wù),最好用C語言編寫。

第二種任務(wù)的類型是I/O密集型,涉及到網(wǎng)絡(luò)、磁盤I/O的任務(wù)都是I/O密集型任務(wù),這類任務(wù)的特點(diǎn)是CPU消耗很少,任務(wù)的大部分時(shí)間都在等待I/O操作完成(因?yàn)镮/O的速度遠(yuǎn)遠(yuǎn)低于CPU和內(nèi)存的速度)。對(duì)于I/O密集型任務(wù),任務(wù)越多,CPU效率越高,但也有一個(gè)限度。常見的大部分任務(wù)都是I/O密集型任務(wù),比如Web應(yīng)用。

I/O密集型任務(wù)執(zhí)行期間,99%的時(shí)間都花在I/O上,花在CPU上的時(shí)間很少,因此,用運(yùn)行速度極快的C語言替換用Python這樣運(yùn)行速度極低的腳本語言,完全無法提升運(yùn)行效率。對(duì)于I/O密集型任務(wù),最合適的語言就是開發(fā)效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。

什么是 Fork/Join 框架?

Fork/Join 框架是 Java7 提供了的一個(gè)用于并行執(zhí)行的任務(wù)的框架,是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。

Fork 就是把一個(gè)大任務(wù)切分為若干個(gè)子任務(wù)并行的執(zhí)行,Join 就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到這個(gè)大任務(wù)的結(jié)果。比如計(jì)算 1+2+......+10000,可以分割成10個(gè)子任務(wù),每個(gè)子任務(wù)對(duì)1000個(gè)數(shù)進(jìn)行求和,最終匯總這10個(gè)子任務(wù)的結(jié)果。如下圖所示:


Fork/Join的特性:

  1. ForkJoinPool 不是為了替代 ExecutorService,而是它的補(bǔ)充,在某些應(yīng)用場景下性能比 ExecutorService 更好。(見 Java Tip: When to use ForkJoinPool vs ExecutorService )
  2. ForkJoinPool 主要用于實(shí)現(xiàn)“分而治之”的算法,特別是分治之后遞歸調(diào)用的函數(shù),例如 quick sort 等;
  3. ForkJoinPool 最適合的是計(jì)算密集型的任務(wù),如果存在 I/O、線程間同步、sleep() 等會(huì)造成線程長時(shí)間阻塞的情況時(shí),最好配合 MangedBlocker。

關(guān)于“分而治之”的算法,可以查看《分治、回溯的實(shí)現(xiàn)和特性》

工作竊取算法

工作竊取(work-stealing)算法 是指某個(gè)線程從其他隊(duì)列里竊取任務(wù)來執(zhí)行。

我們需要做一個(gè)比較大的任務(wù),我們可以把這個(gè)任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競爭,于是把這些子任務(wù)分別放到不同的隊(duì)列里,并為每個(gè)隊(duì)列創(chuàng)建一個(gè)單獨(dú)的線程來執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對(duì)應(yīng),比如A線程負(fù)責(zé)處理A隊(duì)列里的任務(wù)。

但是有的線程會(huì)先把自己隊(duì)列里的任務(wù)干完,而其他線程對(duì)應(yīng)的隊(duì)列里還有任務(wù)等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來執(zhí)行。而在這時(shí)它們會(huì)訪問同一個(gè)隊(duì)列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭,通常會(huì)使用雙端隊(duì)列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行。


工作竊取算法的優(yōu)點(diǎn)是充分利用線程進(jìn)行并行計(jì)算,并減少了線程間的競爭,其缺點(diǎn)是在某些情況下還是存在競爭,比如雙端隊(duì)列里只有一個(gè)任務(wù)時(shí)。并且消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個(gè)線程和多個(gè)雙端隊(duì)列。


  1. ForkJoinPool 的每個(gè)工作線程都維護(hù)著一個(gè)工作隊(duì)列(WorkQueue),這是一個(gè)雙端隊(duì)列(Deque),里面存放的對(duì)象是任務(wù)(ForkJoinTask)。
  2. 每個(gè)工作線程在運(yùn)行中產(chǎn)生新的任務(wù)(通常是因?yàn)檎{(diào)用了 fork())時(shí),會(huì)放入工作隊(duì)列的隊(duì)尾,并且工作線程在處理自己的工作隊(duì)列時(shí),使用的是 LIFO 方式,也就是說每次從隊(duì)尾取出任務(wù)來執(zhí)行。
  3. 每個(gè)工作線程在處理自己的工作隊(duì)列同時(shí),會(huì)嘗試竊取一個(gè)任務(wù)(或是來自于剛剛提交到 pool 的任務(wù),或是來自于其他工作線程的工作隊(duì)列),竊取的任務(wù)位于其他線程的工作隊(duì)列的隊(duì)首,也就是說工作線程在竊取其他工作線程的任務(wù)時(shí),使用的是 FIFO 方式。
  4. 在遇到 join() 時(shí),如果需要 join 的任務(wù)尚未完成,則會(huì)先處理其他任務(wù),并等待其完成。
  5. 在既沒有自己的任務(wù),也沒有可以竊取的任務(wù)時(shí),進(jìn)入休眠。

Fork/Join的使用

使用場景示例

定義fork/join任務(wù),如下示例,隨機(jī)生成2000w條數(shù)據(jù)在數(shù)組當(dāng)中,然后求和_

  1. package com.niuh.forkjoin.recursivetask; 
  2.  
  3. import java.util.concurrent.RecursiveTask; 
  4.  
  5. /** 
  6.  * RecursiveTask 并行計(jì)算,同步有返回值 
  7.  * ForkJoin框架處理的任務(wù)基本都能使用遞歸處理,比如求斐波那契數(shù)列等,但遞歸算法的缺陷是: 
  8.  * 一只會(huì)只用單線程處理, 
  9.  * 二是遞歸次數(shù)過多時(shí)會(huì)導(dǎo)致堆棧溢出; 
  10.  * ForkJoin解決了這兩個(gè)問題,使用多線程并發(fā)處理,充分利用計(jì)算資源來提高效率,同時(shí)避免堆棧溢出發(fā)生。 
  11.  * 當(dāng)然像求斐波那契數(shù)列這種小問題直接使用線性算法搞定可能更簡單,實(shí)際應(yīng)用中完全沒必要使用ForkJoin框架, 
  12.  * 所以ForkJoin是核彈,是用來對(duì)付大家伙的,比如超大數(shù)組排序。 
  13.  * 最佳應(yīng)用場景:多核、多內(nèi)存、可以分割計(jì)算再合并的計(jì)算密集型任務(wù) 
  14.  */ 
  15. class LongSum extends RecursiveTask<Long> { 
  16.     //任務(wù)拆分的最小閥值 
  17.     static final int SEQUENTIAL_THRESHOLD = 1000; 
  18.     static final long NPS = (1000L * 1000 * 1000); 
  19.     static final boolean extraWork = true; // change to add more than just a sum 
  20.  
  21.  
  22.     int low; 
  23.     int high; 
  24.     int[] array; 
  25.  
  26.     LongSum(int[] arr, int lo, int hi) { 
  27.         array = arr; 
  28.         low = lo; 
  29.         high = hi; 
  30.     } 
  31.  
  32.     /** 
  33.      * fork()方法:將任務(wù)放入隊(duì)列并安排異步執(zhí)行,一個(gè)任務(wù)應(yīng)該只調(diào)用一次fork()函數(shù),除非已經(jīng)執(zhí)行完畢并重新初始化。 
  34.      * tryUnfork()方法:嘗試把任務(wù)從隊(duì)列中拿出單獨(dú)處理,但不一定成功。 
  35.      * join()方法:等待計(jì)算完成并返回計(jì)算結(jié)果。 
  36.      * isCompletedAbnormally()方法:用于判斷任務(wù)計(jì)算是否發(fā)生異常。 
  37.      */ 
  38.     protected Long compute() { 
  39.  
  40.         if (high - low <= SEQUENTIAL_THRESHOLD) { 
  41.             long sum = 0; 
  42.             for (int i = low; i < high; ++i) { 
  43.                 sum += array[i]; 
  44.             } 
  45.             return sum
  46.  
  47.         } else { 
  48.             int mid = low + (high - low) / 2; 
  49.             LongSum left = new LongSum(array, low, mid); 
  50.             LongSum right = new LongSum(array, mid, high); 
  51.             left.fork(); 
  52.             right.fork(); 
  53.             long rightAns = right.join(); 
  54.             long leftAns = left.join(); 
  55.             return leftAns + rightAns; 
  56.         } 
  57.     } 

 執(zhí)行fork/join任務(wù)

  1. package com.niuh.forkjoin.recursivetask; 
  2.  
  3. import com.niuh.forkjoin.utils.Utils; 
  4.  
  5. import java.util.concurrent.ForkJoinPool; 
  6. import java.util.concurrent.ForkJoinTask; 
  7.  
  8. public class LongSumMain { 
  9.     //獲取邏輯處理器數(shù)量 
  10.     static final int NCPU = Runtime.getRuntime().availableProcessors(); 
  11.     /** 
  12.      * for time conversion 
  13.      */ 
  14.     static final long NPS = (1000L * 1000 * 1000); 
  15.  
  16.     static long calcSum; 
  17.  
  18.     static final boolean reportSteals = true
  19.  
  20.     public static void main(String[] args) throws Exception { 
  21.         int[] array = Utils.buildRandomIntArray(2000000); 
  22.         System.out.println("cpu-num:" + NCPU); 
  23.         //單線程下計(jì)算數(shù)組數(shù)據(jù)總和 
  24.         long start = System.currentTimeMillis(); 
  25.         calcSum = seqSum(array); 
  26.         System.out.println("seq sum=" + calcSum); 
  27.         System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start)); 
  28.  
  29.         start = System.currentTimeMillis(); 
  30.         //采用fork/join方式將數(shù)組求和任務(wù)進(jìn)行拆分執(zhí)行,最后合并結(jié)果 
  31.         LongSum ls = new LongSum(array, 0, array.length); 
  32.         ForkJoinPool fjp = new ForkJoinPool(NCPU); //使用的線程數(shù) 
  33.         ForkJoinTask<Long> task = fjp.submit(ls); 
  34.  
  35.         System.out.println("forkjoin sum=" + task.get()); 
  36.         System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start)); 
  37.         if (task.isCompletedAbnormally()) { 
  38.             System.out.println(task.getException()); 
  39.         } 
  40.  
  41.         fjp.shutdown(); 
  42.  
  43.     } 
  44.  
  45.  
  46.     static long seqSum(int[] array) { 
  47.         long sum = 0; 
  48.         for (int i = 0; i < array.length; ++i) { 
  49.             sum += array[i]; 
  50.         } 
  51.         return sum
  52.     } 

 Fork/Join框架原理

Fork/Join 其實(shí)就是指由ForkJoinPool作為線程池、ForkJoinTask(通常實(shí)現(xiàn)其三個(gè)抽象子類)為任務(wù)、ForkJoinWorkerThread作為執(zhí)行任務(wù)的具體線程實(shí)體這三者構(gòu)成的任務(wù)調(diào)度機(jī)制。


ForkJoinWorkerThread

ForkJoinWorkerThread 直接繼承了Thread,但是僅僅是為了增加一些額外的功能,并沒有對(duì)線程的調(diào)度執(zhí)行做任何更改。


ForkJoinWorkerThread 是被ForkJoinPool管理的工作線程,在創(chuàng)建出來之后都被設(shè)置成為了守護(hù)線程,由它來執(zhí)行ForkJoinTasks。該類主要為了維護(hù)創(chuàng)建線程實(shí)例時(shí)通過ForkJoinPool為其創(chuàng)建的任務(wù)隊(duì)列,與其他兩個(gè)線程池整個(gè)線程池只有一個(gè)任務(wù)隊(duì)列不同,F(xiàn)orkJoinPool管理的所有工作線程都擁有自己的工作隊(duì)列,為了實(shí)現(xiàn)任務(wù)竊取機(jī)制,該隊(duì)列被設(shè)計(jì)成一個(gè)雙端隊(duì)列,而ForkJoinWorkerThread的首要任務(wù)就是執(zhí)行自己的這個(gè)雙端任務(wù)隊(duì)列中的任務(wù),其次是竊取其他線程的工作隊(duì)列,以下是其代碼片段:

  1. public class ForkJoinWorkerThread extends Thread { 
  2.  // 這個(gè)線程工作的ForkJoinPool池 
  3.     final ForkJoinPool pool;     
  4.     // 這個(gè)線程擁有的工作竊取機(jī)制的工作隊(duì)列 
  5.     final ForkJoinPool.WorkQueue workQueue;  
  6.  
  7.     //創(chuàng)建在給定ForkJoinPool池中執(zhí)行的ForkJoinWorkerThread。 
  8.     protected ForkJoinWorkerThread(ForkJoinPool pool) { 
  9.         // Use a placeholder until a useful name can be set in registerWorker 
  10.         super("aForkJoinWorkerThread"); 
  11.         this.pool = pool; 
  12.         //向ForkJoinPool執(zhí)行池注冊(cè)當(dāng)前工作線程,F(xiàn)orkJoinPool為其分配一個(gè)工作隊(duì)列 
  13.         this.workQueue = pool.registerWorker(this);  
  14.     } 
  15.  
  16.     //該工作線程的執(zhí)行內(nèi)容就是執(zhí)行工作隊(duì)列中的任務(wù) 
  17.     public void run() { 
  18.         if (workQueue.array == null) { // only run once 
  19.             Throwable exception = null
  20.             try { 
  21.                 onStart(); 
  22.                 pool.runWorker(workQueue); //執(zhí)行工作隊(duì)列中的任務(wù) 
  23.             } catch (Throwable ex) { 
  24.                 exception = ex; //記錄異常 
  25.             } finally { 
  26.                 try { 
  27.                     onTermination(exception); 
  28.                 } catch (Throwable ex) { 
  29.                     if (exception == null
  30.                         exception = ex; 
  31.                 } finally { 
  32.                     pool.deregisterWorker(this, exception); //撤銷工作 
  33.                 } 
  34.             } 
  35.         } 
  36.     } 
  37.  
  38.     ..... 

 ForkJoinTask

ForkJoinTask :與FutureTask一樣, ForkJoinTask也是Future的子類,不過它是一個(gè)抽象類。


ForkJoinTask :我們要使用 ForkJoin 框架,必須首先創(chuàng)建一個(gè) ForkJoin 任務(wù)。它提供在任務(wù)中執(zhí)行 fork() 和 join() 操作的機(jī)制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,F(xiàn)ork/Join框架提供類以下幾個(gè)子類:

  • RecursiveAction:用于沒有返回結(jié)果的任務(wù)。(比如寫數(shù)據(jù)到磁盤,然后就退出。一個(gè) RecursiveAvtion 可以把直接的工作分割成更小的幾塊,這樣它們可以由獨(dú)立的線程或者 CPU 執(zhí)行。我們可以通過繼承來實(shí)現(xiàn)一個(gè) RecusiveAction)
  • RescursiveTask:用于有返回結(jié)果的任務(wù)。(可以將自己的工作分割為若干更小任務(wù),并將這些子任務(wù)的執(zhí)行合并到一個(gè)集體結(jié)果??梢杂袔讉€(gè)水平的分割和合并)
  • CountedCompleter :在任務(wù)完成執(zhí)行后會(huì)觸發(fā)執(zhí)行一個(gè)自定義的鉤子函數(shù)。

常量介紹

ForkJoinTask 有一個(gè)int類型的status字段:

  • 其高16位存儲(chǔ)任務(wù)執(zhí)行狀態(tài)例如NORMAL、CANCELLED或EXCEPTIONAL
  • 低16位預(yù)留用于用戶自定義的標(biāo)記。

任務(wù)未完成之前status大于等于0,完成之后就是NORMAL、CANCELLED或EXCEPTIONAL這幾個(gè)小于0的值,這幾個(gè)值也是按大小順序的:0(初始狀態(tài)) > NORMAL > CANCELLED > EXCEPTIONAL.

  1. public abstract class ForkJoinTask<V> implements Future<V>, Serializable { 
  2.  
  3.     /** 該任務(wù)的執(zhí)行狀態(tài) */ 
  4.     volatile int status; // accessed directly by pool and workers 
  5.     static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits 
  6.     static final int NORMAL      = 0xf0000000;  // must be negative 
  7.     static final int CANCELLED   = 0xc0000000;  // must be < NORMAL 
  8.     static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED 
  9.     static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16 
  10.     static final int SMASK       = 0x0000ffff;  // short bits for tags 
  11.  
  12.     // 異常哈希表 
  13.  
  14.     //被任務(wù)拋出的異常數(shù)組,為了報(bào)告給調(diào)用者。因?yàn)楫惓:苌僖?,所以我們不直接將它們保存在task對(duì)象中,而是使用弱引用數(shù)組。注意,取消異常不會(huì)出現(xiàn)在數(shù)組,而是記錄在statue字段中 
  15.     //注意這些都是 static 類屬性,所有的ForkJoinTask共用的。 
  16.     private static final ExceptionNode[] exceptionTable;        //異常哈希鏈表數(shù)組 
  17.     private static final ReentrantLock exceptionTableLock; 
  18.     private static final ReferenceQueue<Object> exceptionTableRefQueue; //在ForkJoinTask被GC回收之后,相應(yīng)的異常節(jié)點(diǎn)對(duì)象的引用隊(duì)列 
  19.  
  20.     /** 
  21.     * 固定容量的exceptionTable. 
  22.     */ 
  23.     private static final int EXCEPTION_MAP_CAPACITY = 32; 
  24.  
  25.  
  26.     //異常數(shù)組的鍵值對(duì)節(jié)點(diǎn)。 
  27.     //該哈希鏈表數(shù)組使用線程id進(jìn)行比較,該數(shù)組具有固定的容量,因?yàn)樗痪S護(hù)任務(wù)異常足夠長,以便參與者訪問它們,所以在持續(xù)的時(shí)間內(nèi)不應(yīng)該變得非常大。但是,由于我們不知道最后一個(gè)joiner何時(shí)完成,我們必須使用弱引用并刪除它們。我們對(duì)每個(gè)操作都這樣做(因此完全鎖定)。此外,任何ForkJoinPool池中的一些線程在其池變?yōu)閕sQuiescent時(shí)都會(huì)調(diào)用helpExpungeStaleExceptions 
  28.     static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> { 
  29.         final Throwable ex; 
  30.         ExceptionNode next
  31.         final long thrower;  // 拋出異常的線程id 
  32.         final int hashCode;  // 在弱引用消失之前存儲(chǔ)hashCode 
  33.         ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) { 
  34.             super(task, exceptionTableRefQueue); //在ForkJoinTask被GC回收之后,會(huì)將該節(jié)點(diǎn)加入隊(duì)列exceptionTableRefQueue 
  35.             this.ex = ex; 
  36.             this.next = next
  37.             this.thrower = Thread.currentThread().getId(); 
  38.             this.hashCode = System.identityHashCode(task); 
  39.         } 
  40.     } 
  41.  
  42.     ................. 

 除了status記錄任務(wù)的執(zhí)行狀態(tài)之外,其他字段主要是為了對(duì)任務(wù)執(zhí)行的異常的處理,F(xiàn)orkJoinTask采用了哈希數(shù)組 + 鏈表的數(shù)據(jù)結(jié)構(gòu)(JDK8以前的HashMap實(shí)現(xiàn)方法)存放所有(因?yàn)檫@些字段是static)的ForkJoinTask任務(wù)的執(zhí)行異常。

fork 方法(安排任務(wù)異步執(zhí)行)

fork() 做的工作只有一件事,既是把任務(wù)推入當(dāng)前工作線程的工作隊(duì)列里(安排任務(wù)異步執(zhí)行)??梢詤⒖匆韵碌脑创a:

  1. public final ForkJoinTask<V> fork() { 
  2.     Thread t; 
  3.     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 
  4.         ((ForkJoinWorkerThread)t).workQueue.push(this); 
  5.     else 
  6.         ForkJoinPool.common.externalPush(this); 
  7.     return this; 

 該方法其實(shí)就是將任務(wù)通過push方法加入到當(dāng)前工作線程的工作隊(duì)列或者提交隊(duì)列(外部非ForkJoinWorkerThread線程通過submit、execute方法提交的任務(wù)),等待被線程池調(diào)度執(zhí)行,這是一個(gè)非阻塞的立即返回方法。

  • 這里需要知道,F(xiàn)orkJoinPool線程池通過哈希數(shù)組+雙端隊(duì)列的方式將所有的工作線程擁有的任務(wù)隊(duì)列和從外部提交的任務(wù)分別映射到哈希數(shù)組的不同槽位上。

join 方法(等待執(zhí)行結(jié)果)

join() 的工作則復(fù)雜得多,也是 join() 可以使得線程免于被阻塞的原因——不像同名的 Thread.join()。

  1. 檢查調(diào)用 join() 的線程是否是 ForkJoinThread 線程。如果不是(例如 main 線程),則阻塞當(dāng)前線程,等待任務(wù)完成。如果是,則不阻塞。
  2. 查看任務(wù)的完成狀態(tài),如果已經(jīng)完成,直接返回結(jié)果。
  3. 如果任務(wù)尚未完成,但處于自己的工作隊(duì)列內(nèi),則完成它。
  4. 如果任務(wù)已經(jīng)被其他的工作線程偷走,則竊取這個(gè)小偷的工作隊(duì)列內(nèi)的任務(wù)(以 FIFO 方式),執(zhí)行,以期幫助它早日完成 join 的任務(wù)。
  5. 如果偷走任務(wù)的小偷也已經(jīng)把自己的任務(wù)全部做完,正在等待需要 join 的任務(wù)時(shí),則找到小偷的小偷,幫助它完成它的任務(wù)。
  6. 遞歸地執(zhí)行第5步。

將上述流程畫成序列圖的話就是這個(gè)樣子:

 由于文章篇幅有限,源碼分析請(qǐng)查看文章末尾的“了解更多”

小結(jié)

通常ForkJoinTask只適用于非循環(huán)依賴的純函數(shù)的計(jì)算或孤立對(duì)象的操作,否則,執(zhí)行可能會(huì)遇到某種形式的死鎖,因?yàn)槿蝿?wù)循環(huán)地等待彼此。但是,這個(gè)框架支持其他方法和技術(shù)(例如使用Phaser、helpQuiesce和complete),這些方法和技術(shù)可用于構(gòu)造解決這種依賴任務(wù)的ForkJoinTask子類,為了支持這些用法,可以使用setForkJoinTaskTag或compareAndSetForkJoinTaskTag原子性地標(biāo)記一個(gè)short類型的值,并使用getForkJoinTaskTag進(jìn)行檢查。ForkJoinTask實(shí)現(xiàn)沒有將這些受保護(hù)的方法或標(biāo)記用于任何目的,但是它們可以用于構(gòu)造專門的子類,由此可以使用提供的方法來避免重新訪問已經(jīng)處理過的節(jié)點(diǎn)/任務(wù)。

ForkJoinTask應(yīng)該執(zhí)行相對(duì)較少的計(jì)算,并且應(yīng)該避免不確定的循環(huán)。大任務(wù)應(yīng)該被分解成更小的子任務(wù),通常通過遞歸分解。如果任務(wù)太大,那么并行性就不能提高吞吐量。如果太小,那么內(nèi)存和內(nèi)部任務(wù)維護(hù)開銷可能會(huì)超過處理開銷。

ForkJoinTask是可序列化的,這使它們能夠在諸如遠(yuǎn)程執(zhí)行框架之類的擴(kuò)展中使用。只在執(zhí)行之前或之后序列化任務(wù)才是明智的,而不是在執(zhí)行期間。

ForkJoinPool

ForkJoinPool:ForkJoinTask 需要通過 ForkJoinPool 來執(zhí)行,任務(wù)分割出的子任務(wù)會(huì)添加到當(dāng)前工作線程所維護(hù)的雙端隊(duì)列中,進(jìn)入隊(duì)列的頭部。當(dāng)一個(gè)工作線程的隊(duì)列里暫時(shí)沒有任務(wù)時(shí),它會(huì)隨機(jī)從其他工作線程的隊(duì)列的尾部獲取一個(gè)任務(wù)。


常量介紹

ForkJoinPool 與 內(nèi)部類 WorkQueue 共享的一些常量

  1. // Constants shared across ForkJoinPool and WorkQueue 
  2.  
  3. // 限定參數(shù) 
  4. static final int SMASK = 0xffff;        //  低位掩碼,也是最大索引位 
  5. static final int MAX_CAP = 0x7fff;      //  工作線程最大容量 
  6. static final int EVENMASK = 0xfffe;     //  偶數(shù)低位掩碼 
  7. static final int SQMASK = 0x007e;       //  workQueues 數(shù)組最多64個(gè)槽位 
  8.  
  9. // ctl 子域和 WorkQueue.scanState 的掩碼和標(biāo)志位 
  10. static final int SCANNING = 1;          // 標(biāo)記是否正在運(yùn)行任務(wù) 
  11. static final int INACTIVE = 1 << 31;    // 失活狀態(tài)  負(fù)數(shù) 
  12. static final int SS_SEQ = 1 << 16;      // 版本戳,防止ABA問題 
  13.  
  14. // ForkJoinPool.config 和 WorkQueue.config 的配置信息標(biāo)記 
  15. static final int MODE_MASK = 0xffff << 16;  // 模式掩碼 
  16. static final int LIFO_QUEUE = 0;    // LIFO隊(duì)列 
  17. static final int FIFO_QUEUE = 1 << 16;  // FIFO隊(duì)列 
  18. static final int SHARED_QUEUE = 1 << 31;    // 共享模式隊(duì)列,負(fù)數(shù) ForkJoinPool 中的相關(guān)常量和實(shí)例字段: 

 ForkJoinPool 中的相關(guān)常量和實(shí)例字段

  1. // 低位和高位掩碼 
  2. private static final long SP_MASK = 0xffffffffL; 
  3. private static final long UC_MASK = ~SP_MASK; 
  4.  
  5. // 活躍線程數(shù) 
  6. private static final int AC_SHIFT = 48; 
  7. private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數(shù)增量 
  8. private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數(shù)掩碼 
  9.  
  10. // 工作線程數(shù) 
  11. private static final int TC_SHIFT = 32; 
  12. private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作線程數(shù)增量 
  13. private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼 
  14. private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);  // 創(chuàng)建工作線程標(biāo)志 
  15.  
  16. // 池狀態(tài) 
  17. private static final int RSLOCK = 1; 
  18. private static final int RSIGNAL = 1 << 1; 
  19. private static final int STARTED = 1 << 2; 
  20. private static final int STOP = 1 << 29; 
  21. private static final int TERMINATED = 1 << 30; 
  22. private static final int SHUTDOWN = 1 << 31; 
  23.  
  24. // 實(shí)例字段 
  25. volatile long ctl;                   // 主控制參數(shù) 
  26. volatile int runState;               // 運(yùn)行狀態(tài)鎖 
  27. final int config;                    // 并行度|模式 
  28. int indexSeed;                       // 用于生成工作線程索引 
  29. volatile WorkQueue[] workQueues;     // 主對(duì)象注冊(cè)信息,workQueue 
  30. final ForkJoinWorkerThreadFactory factory;// 線程工廠 
  31. final UncaughtExceptionHandler ueh;  // 每個(gè)工作線程的異常信息 
  32. final String workerNamePrefix;       // 用于創(chuàng)建工作線程的名稱 
  33. volatile AtomicLong stealCounter;    // 偷取任務(wù)總數(shù),也可作為同步監(jiān)視器 
  34.  
  35. /** 靜態(tài)初始化字段 */ 
  36. //線程工廠 
  37. public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; 
  38. //啟動(dòng)或殺死線程的方法調(diào)用者的權(quán)限 
  39. private static final RuntimePermission modifyThreadPermission; 
  40. // 公共靜態(tài)pool 
  41. static final ForkJoinPool common; 
  42. //并行度,對(duì)應(yīng)內(nèi)部common池 
  43. static final int commonParallelism; 
  44. //備用線程數(shù),在tryCompensate中使用 
  45. private static int commonMaxSpares; 
  46. //創(chuàng)建workerNamePrefix(工作線程名稱前綴)時(shí)的序號(hào) 
  47. private static int poolNumberSequence; 
  48. //線程阻塞等待新的任務(wù)的超時(shí)值(以納秒為單位),默認(rèn)2秒 
  49. private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec 
  50. //空閑超時(shí)時(shí)間,防止timer未命中 
  51. private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms 
  52. //默認(rèn)備用線程數(shù) 
  53. private static final int DEFAULT_COMMON_MAX_SPARES = 256; 
  54. //阻塞前自旋的次數(shù),用在在awaitRunStateLock和awaitWork中 
  55. private static final int SPINS  = 0; 
  56. //indexSeed的增量 
  57. private static final int SEED_INCREMENT = 0x9e3779b9; 

 ForkJoinPool 的內(nèi)部狀態(tài)都是通過一個(gè)64位的 long 型 變量ctl來存儲(chǔ),它由四個(gè)16位的子域組成:

  • AC: 正在運(yùn)行工作線程數(shù)減去目標(biāo)并行度,高16位
  • TC: 總工作線程數(shù)減去目標(biāo)并行度,中高16位
  • SS: 棧頂?shù)却€程的版本計(jì)數(shù)和狀態(tài),中低16位
  • ID: 棧頂 WorkQueue 在池中的索引(poolIndex),低16位

ForkJoinPool.WorkQueue 中的相關(guān)屬性:

  1. //初始隊(duì)列容量,2的冪 
  2. static final int INITIAL_QUEUE_CAPACITY = 1 << 13; 
  3. //最大隊(duì)列容量 
  4. static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M 
  5.  
  6. // 實(shí)例字段 
  7. volatile int scanState;    // Woker狀態(tài), <0: inactive; odd:scanning 
  8. int stackPred;             // 記錄前一個(gè)棧頂?shù)腸tl 
  9. int nsteals;               // 偷取任務(wù)數(shù) 
  10. int hint;                  // 記錄偷取者索引,初始為隨機(jī)索引 
  11. int config;                // 池索引和模式 
  12. volatile int qlock;        // 1: locked, < 0: terminate; else 0 
  13. volatile int base;         // 下一個(gè)poll操作的索引(棧底/隊(duì)列頭) 
  14. int top;                   // 一個(gè)push操作的索引(棧頂/隊(duì)列尾) 
  15. ForkJoinTask<?>[] array;   // 任務(wù)數(shù)組 
  16. final ForkJoinPool pool;   // the containing pool (may be null
  17. final ForkJoinWorkerThread owner; // 當(dāng)前工作隊(duì)列的工作線程,共享模式下為null 
  18. volatile Thread parker;    // 調(diào)用park阻塞期間為owner,其他情況為null 
  19. volatile ForkJoinTask<?> currentJoin;  // 記錄被join過來的任務(wù) 
  20. volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作隊(duì)列偷取過來的任務(wù) 

 內(nèi)部數(shù)據(jù)結(jié)構(gòu)

ForkJoinPool采用了哈希數(shù)組 + 雙端隊(duì)列的方式存放任務(wù),但這里的任務(wù)分為兩類:

  • 一類是通過execute、submit 提交的外部任務(wù)
  • 另一類是ForkJoinWorkerThread工作線程通過fork/join分解出來的工作任務(wù)

ForkJoinPool并沒有把這兩種任務(wù)混在一個(gè)任務(wù)隊(duì)列中,對(duì)于外部任務(wù),會(huì)利用Thread內(nèi)部的隨機(jī)probe值映射到哈希數(shù)組的偶數(shù)槽位中的提交隊(duì)列中,這種提交隊(duì)列是一種數(shù)組實(shí)現(xiàn)的雙端隊(duì)列稱之為Submission Queue,專門存放外部提交的任務(wù)。

對(duì)于ForkJoinWorkerThread工作線程,每一個(gè)工作線程都分配了一個(gè)工作隊(duì)列,這也是一個(gè)雙端隊(duì)列,稱之為Work Queue,這種隊(duì)列都會(huì)被映射到哈希數(shù)組的奇數(shù)槽位,每一個(gè)工作線程fork/join分解的任務(wù)都會(huì)被添加到自己擁有的那個(gè)工作隊(duì)列中。

在ForkJoinPool中的屬性 WorkQueue[] workQueues 就是我們所說的哈希數(shù)組,其元素就是內(nèi)部類WorkQueue實(shí)現(xiàn)的基于數(shù)組的雙端隊(duì)列。該哈希數(shù)組的長度為2的冪,并且支持?jǐn)U容。如下就是該哈希數(shù)組的示意結(jié)構(gòu)圖:

如圖,提交隊(duì)列位于哈希數(shù)組workQueue的奇數(shù)索引槽位,工作線程的工作隊(duì)列位于偶數(shù)槽位。

  • 默認(rèn)情況下,asyncMode為false時(shí):因此工作線程把工作隊(duì)列當(dāng)著棧一樣使用(后進(jìn)先出),將分解的子任務(wù)推入工作隊(duì)列的top端,取任務(wù)的時(shí)候也從top端取(凡是雙端隊(duì)列都會(huì)有兩個(gè)分別指向隊(duì)列兩端的指針,這里就是圖上畫出的base和top);而當(dāng)某些工作線程的任務(wù)為空的時(shí)候,就會(huì)從其他隊(duì)列(不限于workQueue,也會(huì)是提交隊(duì)列)竊取(steal)任務(wù),如圖示擁有workQueue2的工作線程從workQueue1中竊取了一個(gè)任務(wù),竊取任務(wù)的時(shí)候采用的是先進(jìn)先出FIFO的策略(即從base端竊取任務(wù)),這樣不但可以避免在取任務(wù)的時(shí)候與擁有其隊(duì)列的工作線程發(fā)生沖突,從而減小競爭,還可以輔助其完成比較大的任務(wù)。
  • asyncMode為true的話,擁有該工作隊(duì)列的工作線程將按照先進(jìn)先出的策略從base端取任務(wù),這一般只用于不需要返回結(jié)果的任務(wù),或者事件消息傳遞框架。

ForkJoinPool構(gòu)造函數(shù)

其完整構(gòu)造方法如下

  1. private ForkJoinPool(int parallelism, 
  2.                      ForkJoinWorkerThreadFactory factory, 
  3.                      UncaughtExceptionHandler handler, 
  4.                      int mode, 
  5.                      String workerNamePrefix) { 
  6.     this.workerNamePrefix = workerNamePrefix; 
  7.     this.factory = factory; 
  8.     this.ueh = handler; 
  9.     this.config = (parallelism & SMASK) | mode; 
  10.     long np = (long)(-parallelism); // offset ctl counts 
  11.     this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); 

 重要參數(shù)解釋

  1. parallelism:并行度( the parallelism level),默認(rèn)情況下跟我們機(jī)器的cpu個(gè)數(shù)保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我們機(jī)器運(yùn)行時(shí)可用的CPU個(gè)數(shù)。
  2. factory:創(chuàng)建新線程的工廠( the factory for creating new threads)。默認(rèn)情況下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。
  3. handler:線程異常情況下的處理器(Thread.UncaughtExceptionHandler handler),該處理器在線程執(zhí)行任務(wù)時(shí)由于某些無法預(yù)料到的錯(cuò)誤而導(dǎo)致任務(wù)線程中斷時(shí)進(jìn)行一些處理,默認(rèn)情況為null。
  4. asyncMode:這個(gè)參數(shù)要注意,在ForkJoinPool中,每一個(gè)工作線程都有一個(gè)獨(dú)立的任務(wù)隊(duì)列
  • asyncMode表示工作線程內(nèi)的任務(wù)隊(duì)列是采用何種方式進(jìn)行調(diào)度,可以是先進(jìn)先出FIFO,也可以是后進(jìn)先出LIFO。如果為true,則線程池中的工作線程則使用先進(jìn)先出方式進(jìn)行任務(wù)調(diào)度,默認(rèn)情況下是false。

ForkJoinPool.submit 方法

  1. public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 
  2.     if (task == null
  3.         throw new NullPointerException(); 
  4.     //提交到工作隊(duì)列 
  5.     externalPush(task); 
  6.     return task; 

ForkJoinPool 自身擁有工作隊(duì)列,這些工作隊(duì)列的作用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務(wù),而這些工作隊(duì)列被稱為 submitting queue 。 submit() 和 fork() 其實(shí)沒有本質(zhì)區(qū)別,只是提交對(duì)象變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對(duì)象,因此當(dāng)其中的任務(wù)被一個(gè)工作線程成功竊取時(shí),就意味著提交的任務(wù)真正開始進(jìn)入執(zhí)行階段。

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

 

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2020-12-09 08:21:47

編程Exchanger工具

2020-11-30 16:01:03

Semaphore

2020-12-04 19:28:53

CountDownLaPhaserCyclicBarri

2020-12-03 11:15:21

CyclicBarri

2020-12-08 08:53:53

編程ThreadPoolE線程池

2017-09-19 14:53:37

Java并發(fā)編程并發(fā)代碼設(shè)計(jì)

2022-11-09 09:01:08

并發(fā)編程線程池

2020-12-10 07:00:38

編程線程池定時(shí)任務(wù)

2012-03-09 10:44:11

Java

2020-12-11 07:32:45

編程ThreadLocalJava

2020-11-13 08:42:24

Synchronize

2022-04-13 08:23:31

Golang并發(fā)

2017-01-10 13:39:57

Python線程池進(jìn)程池

2020-12-07 09:40:19

Future&Futu編程Java

2019-11-07 09:20:29

Java線程操作系統(tǒng)

2021-03-10 15:59:39

JavaSynchronize并發(fā)編程

2016-10-21 11:04:07

JavaScript異步編程原理解析

2020-07-06 08:03:32

Java悲觀鎖樂觀鎖

2025-03-20 06:48:55

性能優(yōu)化JDK

2020-11-16 08:11:32

ReentrantLo
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)