自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

ForkJoinPool:大任務(wù)拆分,讓并行嗨起來(lái)

開(kāi)發(fā) 前端
在使用ForkJoinPool時(shí),需要特別注意任務(wù)的類型是否為純函數(shù)計(jì)算類型,也就是這些任務(wù)不應(yīng)該關(guān)心狀態(tài)或者外界的變化,這樣才是最安全的做法。如果是阻塞類型任務(wù),那么你需要謹(jǐn)慎評(píng)估技術(shù)方案。

線程池ThreadPoolExecutor,它通過(guò)對(duì)任務(wù)隊(duì)列和線程的有效管理實(shí)現(xiàn)了對(duì)并發(fā)任務(wù)的處理。然而,ThreadPoolExecutor有兩個(gè)明顯的缺點(diǎn):一是無(wú)法對(duì)大任務(wù)進(jìn)行拆分,對(duì)于某個(gè)任務(wù)只能由單線程執(zhí)行;二是工作線程從隊(duì)列中獲取任務(wù)時(shí)存在競(jìng)爭(zhēng)情況。這兩個(gè)缺點(diǎn)都會(huì)影響任務(wù)的執(zhí)行效率,要知道高并發(fā)場(chǎng)景中的每一毫秒都彌足珍貴。

針對(duì)這兩個(gè)問(wèn)題,本文即將介紹的ForkJoinPool給出了可選的答案。在本文中,我們將首先從分治算法開(kāi)始介紹,接著體驗(yàn)ForkJoinPool中自定義任務(wù)的實(shí)現(xiàn),最后再深入到Java中去理解ForkJoinPool的原理和用法。

一、分治算法與Fork/Join模式

在并發(fā)計(jì)算中,F(xiàn)ork/Join模式往往用于對(duì)大任務(wù)的并行計(jì)算,它通過(guò)遞歸的方式對(duì)任務(wù)不斷地拆解,再將結(jié)果進(jìn)行合并。如果從其思想上看,F(xiàn)ork/Join并不復(fù)雜,其本質(zhì)是分治算法(Divide-and-Conquer) 的應(yīng)用。

分治算法的基本思想是將一個(gè)規(guī)模為N的問(wèn)題分解為K個(gè)規(guī)模較小的子問(wèn)題,這些子問(wèn)題相互獨(dú)立且與原問(wèn)題性質(zhì)相同。求出子問(wèn)題的解,就可得到原問(wèn)題的解。分治算法的步驟如下:

(1)分解:將要解決的問(wèn)題劃分成若干規(guī)模較小的同類問(wèn)題;

(2)求解:當(dāng)子問(wèn)題劃分得足夠小時(shí),用較簡(jiǎn)單的方法解決;

(3)合并:按原問(wèn)題的要求,將子問(wèn)題的解逐層合并構(gòu)成原問(wèn)題的解。

在并發(fā)計(jì)算中,F(xiàn)ork/Join模式往往用于對(duì)大任務(wù)的并行計(jì)算,它通過(guò)遞歸的方式對(duì)任務(wù)不斷地拆解,再將結(jié)果進(jìn)行合并。如果從其思想上看,F(xiàn)ork/Join并不復(fù)雜,其本質(zhì)是分治算法(Divide-and-Conquer) 的應(yīng)用。

分治算法的基本思想是將一個(gè)規(guī)模為N的問(wèn)題分解為K個(gè)規(guī)模較小的子問(wèn)題,這些子問(wèn)題相互獨(dú)立且與原問(wèn)題性質(zhì)相同。求出子問(wèn)題的解,就可得到原問(wèn)題的解。分治算法的步驟如下:

(1)分解:將要解決的問(wèn)題劃分成若干規(guī)模較小的同類問(wèn)題;

(2)求解:當(dāng)子問(wèn)題劃分得足夠小時(shí),用較簡(jiǎn)單的方法解決;

(3)合并:按原問(wèn)題的要求,將子問(wèn)題的解逐層合并構(gòu)成原問(wèn)題的解。

圖片圖片

Fork/Join對(duì)任務(wù)的拆分和對(duì)結(jié)果合并過(guò)程也是如此,可以用下面?zhèn)未a來(lái)表示:

solve(problem):
    if problem is small enough:
        // 如果任務(wù)足夠小,執(zhí)行任務(wù)
        solve problem directly (sequential algorithm)
    else:
        // 拆分任務(wù)
        for part in subdivide(problem)
            fork subtask to solve(part)
        // 合并結(jié)果
        join all subtasks spawned in previous loop
        return combined results

所以,理解Fork/Join模型和ForkJoinPool線程池,首先要理解其背后的算法的目的和思想,因?yàn)楹笪乃斒龅腇orkJoinPool不過(guò)只是這種算法的一種的實(shí)現(xiàn)和應(yīng)用。

二、Fork/Join應(yīng)用場(chǎng)景與體驗(yàn)

按照思想->實(shí)現(xiàn)->源碼的思路,在了解了Fork/Join思想之后,我們先通過(guò)一個(gè)場(chǎng)景手工實(shí)現(xiàn)一個(gè)RecursiveTask,這樣可以更好地體驗(yàn)Fork/Join的用法。

場(chǎng)景:給定兩個(gè)自然數(shù),計(jì)算兩個(gè)數(shù)之間的總和。比如1~n之間的和:1+2+3+…+n

為了解決這個(gè)問(wèn)題,我們創(chuàng)建了TheKingRecursiveSumTask這個(gè)核心類,它繼承于RecursiveTask。RecursiveTask是ForkJoinPool中的一種任務(wù)類型,你暫且不必深入了解它,后文會(huì)有詳細(xì)描述。TheKingRecursiveSumTask中定義了任務(wù)計(jì)算的起止范圍(sumBegin和sumEnd)和拆分閾值(threshold),以及核心計(jì)算邏輯compute()。

public class TheKingRecursiveSumTask extends RecursiveTask<Long> {
    private static final AtomicInteger taskCount = new AtomicInteger();
    private final int sumBegin;
    private final int sumEnd;
    /**
     * 任務(wù)拆分閾值,當(dāng)任務(wù)尺寸大于該值時(shí),進(jìn)行拆分
     */
    private final int threshold;

    public TheKingRecursiveSumTask(int sumBegin, int sumEnd, int threshold) {
        this.sumBegin = sumBegin;
        this.sumEnd = sumEnd;
        this.threshold = threshold;
    }

    @Override
    protected Long compute() {
        if ((sumEnd - sumBegin) > threshold) {
            // 兩個(gè)數(shù)之間的差值大于閾值,拆分任務(wù)
            TheKingRecursiveSumTask subTask1 = new TheKingRecursiveSumTask(sumBegin, (sumBegin + sumEnd) / 2, threshold);
            TheKingRecursiveSumTask subTask2 = new TheKingRecursiveSumTask((sumBegin + sumEnd) / 2, sumEnd, threshold);
            subTask1.fork();
            subTask2.fork();
            taskCount.incrementAndGet();
            return subTask1.join() + subTask2.join();
        }
        // 直接執(zhí)行結(jié)果
        long result = 0L;
        for (int i = sumBegin; i < sumEnd; i++) {
            result += i;
        }
        return result;
    }

    public static AtomicInteger getTaskCount() {
        return taskCount;
    }
}

在下面的代碼中,我們?cè)O(shè)置的計(jì)算區(qū)間值0~10000000,當(dāng)計(jì)算的個(gè)數(shù)超過(guò)100時(shí),將對(duì)任務(wù)進(jìn)行拆分,最大并發(fā)數(shù)設(shè)置為16.

public static void main(String[] args) {
     int sumBegin = 0, sumEnd = 10000000;
     computeByForkJoin(sumBegin, sumEnd);
     computeBySingleThread(sumBegin, sumEnd);
 }

 private static void computeByForkJoin(int sumBegin, int sumEnd) {
     ForkJoinPool forkJoinPool = new ForkJoinPool(16);
     long forkJoinStartTime = System.nanoTime();
     TheKingRecursiveSumTask theKingRecursiveSumTask = new TheKingRecursiveSumTask(sumBegin, sumEnd, 100);
     long forkJoinResult = forkJoinPool.invoke(theKingRecursiveSumTask);
     System.out.println("======");
     System.out.println("ForkJoin任務(wù)拆分:" + TheKingRecursiveSumTask.getTaskCount());
     System.out.println("ForkJoin計(jì)算結(jié)果:" + forkJoinResult);
     System.out.println("ForkJoin計(jì)算耗時(shí):" + (System.nanoTime() - forkJoinStartTime) / 1000000);
 }

 private static void computeBySingleThread(int sumBegin, int sumEnd) {
     long computeResult = 0 L;
     long startTime = System.nanoTime();
     for (int i = sumBegin; i < sumEnd; i++) {
         computeResult += i;
     }
     System.out.println("======");
     System.out.println("單線程計(jì)算結(jié)果:" + computeResult);
     System.out.println("單線程計(jì)算耗時(shí):" + (System.nanoTime() - startTime) / 1000000);
 }

運(yùn)行結(jié)果如下:

======
ForkJoin任務(wù)拆分:131071
ForkJoin計(jì)算結(jié)果:49999995000000
ForkJoin計(jì)算耗時(shí):207
======
單線程計(jì)算結(jié)果:49999995000000
單線程計(jì)算耗時(shí):40

Process finished with exit code 0

從計(jì)算結(jié)果中可以看到,F(xiàn)orkJoinPool總共進(jìn)行了131071次的任務(wù)拆分,最終的計(jì)算結(jié)果是49999995000000,耗時(shí)207毫秒。不過(guò),細(xì)心的你可能已經(jīng)發(fā)現(xiàn)了,F(xiàn)orkJoin的并行計(jì)算的耗時(shí)竟然比單程程還慢?并且足足慢了近5倍!先別慌,關(guān)于ForkJoin的性能問(wèn)題,我們會(huì)在后文有講解。

三、ForkJoinPool設(shè)計(jì)與源碼分析

在Java中,F(xiàn)orkJoinPool是Fork/Join模型的實(shí)現(xiàn),于Java7引入并在Java8中廣泛應(yīng)用。ForkJoinPool允許其他線程向它提交任務(wù),并根據(jù)設(shè)定將這些任務(wù)拆分為粒度更細(xì)的子任務(wù),這些子任務(wù)將由ForkJoinPool內(nèi)部的工作線程來(lái)并行執(zhí)行,并且工作線程之間可以竊取彼此之間的任務(wù)。

在接口實(shí)現(xiàn)和繼承關(guān)系上,F(xiàn)orkJoinPool和ThreadPoolExecutor類似,都實(shí)現(xiàn)了Executor和ExecutorService接口,并繼承了AbstractExecutorService抽類。而在任務(wù)類型上,F(xiàn)orkJoinPool主要有兩種任務(wù)類型:RecursiveAction和RecursiveTask,它們繼承于ForkJoinTask. 相關(guān)關(guān)系如下圖所示:

圖片圖片

解讀ForkJoinPool的源碼并不容易,雖然它的思想較為簡(jiǎn)單,但在實(shí)現(xiàn)上要考慮的顯然更多,加上部分代碼可讀性一般,所以講解它的全部源碼是不現(xiàn)實(shí)的,當(dāng)然也是沒(méi)必要的。在下文中,我們將主要介紹其核心的任務(wù)提交和執(zhí)行相關(guān)的部分源碼,其他源碼有興趣的可以自行閱讀。

1. 構(gòu)造ForkJoinPool的幾種不同方式

ForkJoinPool中有四個(gè)核心參數(shù),用于控制線程池的并行數(shù)、工作線程的創(chuàng)建、異常處理和模式指定等。各參數(shù)解釋如下:

  • int parallelism:指定并行級(jí)別(parallelism level)。ForkJoinPool將根據(jù)這個(gè)設(shè)定,決定工作線程的數(shù)量。如果未設(shè)置的話,將使用Runtime.getRuntime().availableProcessors()來(lái)設(shè)置并行級(jí)別;
  • ForkJoinWorkerThreadFactory factory:ForkJoinPool在創(chuàng)建線程時(shí),會(huì)通過(guò)factory來(lái)創(chuàng)建。注意,這里需要實(shí)現(xiàn)的是ForkJoinWorkerThreadFactory,而不是ThreadFactory. 如果你不指定factory,那么將由默認(rèn)的DefaultForkJoinWorkerThreadFactory負(fù)責(zé)線程的創(chuàng)建工作;
  • UncaughtExceptionHandler handler:指定異常處理器,當(dāng)任務(wù)在運(yùn)行中出錯(cuò)時(shí),將由設(shè)定的處理器處理;
  • boolean asyncMode:從名字上看,你可能會(huì)覺(jué)得它是異步模式設(shè)置,但其實(shí)是設(shè)置隊(duì)列的工作模式:asyncMode ? FIFO_QUEUE : LIFO_QUEUE. 當(dāng)asyncMode為true時(shí),將使用先進(jìn)先出隊(duì)列,而為false時(shí)則使用后進(jìn)先出的模式。

圍繞上面的四個(gè)核心參數(shù),F(xiàn)orkJoinPool提供了三種構(gòu)造方式,使用時(shí)你可以根據(jù)需要選擇其中的一種。

方式一:默認(rèn)無(wú)參構(gòu)造

在該構(gòu)造方式中,你無(wú)需設(shè)定任何參數(shù)。ForkJoinPool將根據(jù)當(dāng)前處理器數(shù)量來(lái)設(shè)置并行數(shù)量,并使用默認(rèn)的線程構(gòu)造工廠。不推薦。

public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
 }

方式二:通過(guò)并行數(shù)構(gòu)造

在該構(gòu)造方式中,你可以指定并行數(shù)量,以更有效地平衡處理器數(shù)量和負(fù)載。建議在設(shè)置時(shí),并行級(jí)別應(yīng)低于當(dāng)前處理器的數(shù)量。

public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
 }

方式三:自定義全部參數(shù)構(gòu)造

以上兩種構(gòu)造方式都是基于這種構(gòu)造,它允許你配置所有的核心參數(shù)。為了更有效地管理ForkJoinPool,建議你使用這種構(gòu)造方式。

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
 }

2. 按類型提交不同任務(wù)

任務(wù)提交是ForkJoinPool的核心能力之一,在提交任務(wù)時(shí)你有三種選擇,如下面表格所示:


從非fork/join線程調(diào)用

從fork/join調(diào)用

提交異步執(zhí)行

execute(ForkJoinTask)

ForkJoinTask.fork()

等待并獲取結(jié)果

invoke(ForkJoinTask)

ForkJoinTask.invoke()

提交執(zhí)行獲取Future結(jié)果

submit(ForkJoinTask)

ForkJoinTask.fork() (ForkJoinTasks are Futures)

第一類核心方法:invoke

invoke類型的方法接受ForkJoinTask類型的任務(wù),并在任務(wù)執(zhí)行結(jié)束后,返回泛型結(jié)果。如果提交的任務(wù)是null,將拋出空指針異常。

public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task.join();
 }

第二類核心方法:execute

execute類型的方法在提交任務(wù)后,不會(huì)返回結(jié)果。另外要注意的是,F(xiàn)orkJoinPool不僅允許提交ForkJoinTask類型任務(wù),還允許提交Callable或Runnable任務(wù),因此你可以像使用現(xiàn)有Executors一樣使用ForkJoinPool。

當(dāng)然,Callable或Runnable類型任務(wù)時(shí),將會(huì)轉(zhuǎn)換為ForkJoinTask類型,具體可以查看任務(wù)提交的相關(guān)源碼。那么,這類任務(wù)和直接提交ForkJoinTask任務(wù)有什么區(qū)別呢?還是有的。區(qū)別在于,由于任務(wù)是不可切分的,所以這類任務(wù)無(wú)法獲得任務(wù)拆分這方面的效益,不過(guò)仍然可以獲得任務(wù)竊取帶來(lái)的好處和性能提升。

public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
 }

public void execute(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<?> job;
        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
            job = (ForkJoinTask<?>) task;
        else
            job = new ForkJoinTask.RunnableExecuteAction(task);
        externalPush(job);
 }

第三類核心方法:submit

submit類型的方法支持三種類型的任務(wù)提交:ForkJoinTask類型、Callable類型和Runnable類型。在提交任務(wù)后,將返回ForkJoinTask類型的結(jié)果。如果提交的任務(wù)是null,將拋出空指針異常,并且當(dāng)任務(wù)不能按計(jì)劃執(zhí)行的話,將拋出任務(wù)拒絕異常。

public < T > ForkJoinTask < T > submit(ForkJoinTask < T > task) {
       if (task == null)
           throw new NullPointerException();
       externalPush(task);
       return task;
   }

   public < T > ForkJoinTask < T > submit(Callable < T > task) {
       ForkJoinTask < T > job = new ForkJoinTask.AdaptedCallable < T > (task);
       externalPush(job);
       return job;
   }

   public < T > ForkJoinTask < T > submit(Runnable task, T result) {
       ForkJoinTask < T > job = new ForkJoinTask.AdaptedRunnable < T > (task, result);
       externalPush(job);
       return job;
   }

   public ForkJoinTask < ? > submit(Runnable task) {
       if (task == null)
           throw new NullPointerException();
       ForkJoinTask < ? > job;
       if (task instanceof ForkJoinTask < ? > ) // avoid re-wrap
           job = (ForkJoinTask < ? > ) task;
       else
           job = new ForkJoinTask.AdaptedRunnableAction(task);
       externalPush(job);
       return job;
   }

3. ForkJoinTask

ForkJoinTask是ForkJoinPool的核心之一,它是任務(wù)的實(shí)際載體,定義了任務(wù)執(zhí)行時(shí)的具體邏輯和拆分邏輯,本文前面的示例代碼就是通過(guò)繼承它實(shí)現(xiàn)。作為一個(gè)抽象類,F(xiàn)orkJoinTask的行為有點(diǎn)類似于線程,但它更為輕量,因?yàn)樗痪S護(hù)自己的運(yùn)行時(shí)堆棧或程序計(jì)數(shù)器等。

在類的設(shè)計(jì)上,F(xiàn)orkJoinTask繼承了Future接口,所以也可以將其看作是輕量級(jí)的Future,它們之間的關(guān)系如下圖所示。

(1)fork與join

fork()/join()是ForkJoinTask甚至是ForkJoinPool的核心方法,承載著主要的任務(wù)協(xié)調(diào)作用,一個(gè)用于任務(wù)提交,一個(gè)用于結(jié)果獲取。

fork-提交任務(wù)

fork()方法用于向當(dāng)前任務(wù)所運(yùn)行的線程池中提交任務(wù),比如上文示例代碼中的subTask1.fork(). 注意,不同于其他線程池的寫(xiě)法,任務(wù)提交由任務(wù)自己通過(guò)調(diào)用fork()完成,對(duì)此不要感覺(jué)詫異,fork()內(nèi)部會(huì)將任務(wù)與當(dāng)前線程進(jìn)行關(guān)聯(lián)。

從源碼中看,如果當(dāng)前線程是ForkJoinWorkerThread類型,將會(huì)放入該線程的任務(wù)隊(duì)列,否則放入common線程池的任務(wù)隊(duì)列中。關(guān)于common線程池,后續(xù)會(huì)有介紹。

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

join-獲取任務(wù)執(zhí)行結(jié)果

前面,你已經(jīng)知道可以通過(guò)fork()提交任務(wù)。那么現(xiàn)在,你則可以通過(guò)join()方法獲取任務(wù)的執(zhí)行結(jié)果。調(diào)用join()時(shí),將阻塞當(dāng)前線程直到對(duì)應(yīng)的子任務(wù)完成運(yùn)行并返回結(jié)果。從源碼看,join()的核心邏輯由doJoin()負(fù)責(zé)。doJoin()雖然很短,但可讀性較差,閱讀時(shí)稍微忍一下。

public final V join() {
    int s;
    // 如果調(diào)用doJoin返回的非NORMAL狀態(tài),將報(bào)告異常
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    // 正常執(zhí)行結(jié)束,返回原始結(jié)果
    return getRawResult();
}

private int doJoin() {
    int s;
    Thread t;
    ForkJoinWorkerThread wt;
    ForkJoinPool.WorkQueue w;
    //如果已完成,返回狀態(tài)
    return (s = status) < 0 ? s :
     //如果未完成且當(dāng)前線程是ForkJoinWorkerThread,則從該線程中取出workQueue,并嘗試將當(dāng)前task取出執(zhí)行。如果執(zhí)行的結(jié)果是完成,則返回狀態(tài);否則,使用當(dāng)前線程池awaitJoin方法進(jìn)行等待
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread) t).workQueue).
    tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0 L):
     //當(dāng)前線程非ForkJoinWorkerThread,調(diào)用externalAwaitDone方法等待
        externalAwaitDone();
}

final int doExec() {
    int s;
    boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        // 執(zhí)行完成后,將狀態(tài)設(shè)置為NORMAL
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

(2)RecursiveAction與RecursiveTask

圖片圖片

在ForkJoinPool中,常用的有兩種任務(wù)類型:返回結(jié)果的和不返回結(jié)果的,這方面和ThreadPoolExecutor等線程池是一致的,對(duì)應(yīng)的兩個(gè)類分別是:RecursiveAction和RecursiveTask. 從類圖中可以看到,它們均繼承于ForkJoinTask.

RecursiveAction:無(wú)結(jié)果返回

RecursiveAction用于遞歸執(zhí)行但不需要返回結(jié)果的任務(wù),比如下面的排序就是它的典型應(yīng)用場(chǎng)景。在使用RecursiveAction時(shí),你需要繼承并實(shí)現(xiàn)它的核心方法compute().

static class SortTask extends RecursiveAction {
    final long[] array;
    final int lo, hi;
    SortTask(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }
    SortTask(long[] array) {
        this(array, 0, array.length);
    }
    // 核心計(jì)算方法
    protected void compute() {
        if (hi - lo < THRESHOLD)
            // 直接執(zhí)行
            sortSequentially(lo, hi);
        else {
            // 拆分任務(wù)
            int mid = (lo + hi) >>> 1;
            invokeAll(new SortTask(array, lo, mid),
                new SortTask(array, mid, hi));
            merge(lo, mid, hi);
        }
    }
    // implementation details follow:
    static final int THRESHOLD = 1000;
    void sortSequentially(int lo, int hi) {
        Arrays.sort(array, lo, hi);
    }
    void merge(int lo, int mid, int hi) {
        long[] buf = Arrays.copyOfRange(array, lo, mid);
        for (int i = 0, j = lo, k = mid; i < buf.length; j++)
            array[j] = (k == hi || buf[i] < array[k]) ?
            buf[i++] : array[k++];
    }
}

RecursiveTask:返回結(jié)果

RecursiveTask用于遞歸執(zhí)行需要返回結(jié)果的任務(wù),比如前面示例代碼中的求和或下面這段求斐波拉契數(shù)列求和都是它的典型應(yīng)用場(chǎng)景。在使用RecursiveTask時(shí),你也需要繼承并實(shí)現(xiàn)它的核心方法compute().

class Fibonacci extends RecursiveTask<Integer> {
   final int n;
   Fibonacci(int n) { this.n = n; }
   Integer compute() {
     if (n <= 1)
       return n;
     Fibonacci f1 = new Fibonacci(n - 1);
     f1.fork();
     Fibonacci f2 = new Fibonacci(n - 2);
     return f2.compute() + f1.join();
   }
 }

(3)ForkJoinTask使用限制

雖然在某些場(chǎng)景下,F(xiàn)orkJoinTask可以通過(guò)任務(wù)拆解的方式提高執(zhí)行效率,但是需要注意的是它并非適合所有的場(chǎng)景。ForkJoinTask在使用時(shí)需要謹(jǐn)記一些限制,違背這些限制可能會(huì)適得其反甚至引來(lái)災(zāi)難。

為什么這么說(shuō)呢?

這是因?yàn)?,F(xiàn)orkJoinTask最適合用于純粹的計(jì)算任務(wù),也就是純函數(shù)計(jì)算,計(jì)算過(guò)程中的對(duì)象都是獨(dú)立的,對(duì)外部沒(méi)有依賴。你可以想象,如果大量的任務(wù)或被拆分的子任務(wù)之間彼此依賴或?qū)ν獠看嬖趪?yán)重阻塞依賴,那將是怎樣的畫(huà)面...用千絲萬(wàn)縷來(lái)形容也不為過(guò),外部依賴會(huì)帶來(lái)任務(wù)執(zhí)行和問(wèn)題排查方面的雙重不確定性。

所以,在理想情況下,提交到ForkJoinPool中的任務(wù)應(yīng)避免執(zhí)行阻塞I/O,以免出現(xiàn)不可控的意外情況。當(dāng)然,這也并非是絕對(duì)的,在必要時(shí)你也可以定義和使用可阻塞的ForkJoinTask,只不過(guò)你需要付出更多的代價(jià)和考慮,使用時(shí)應(yīng)當(dāng)慎之又慎,本文對(duì)此不作敘述。

4. 工作隊(duì)列與任務(wù)竊取

前面已經(jīng)說(shuō)到,F(xiàn)orkJoinPool與ThreadPoolExecutor有個(gè)很大的不同之處在于,F(xiàn)orkJoinPool存在引入了任務(wù)竊取設(shè)計(jì),它是其性能保證的關(guān)鍵之一。

關(guān)于任務(wù)竊取,簡(jiǎn)單來(lái)說(shuō),就是允許空閑線程從繁忙線程的雙端隊(duì)列中竊取任務(wù)。默認(rèn)情況下,工作線程從它自己的雙端隊(duì)列的頭部獲取任務(wù)。但是,當(dāng)自己的任務(wù)為空時(shí),線程會(huì)從其他繁忙線程雙端隊(duì)列的尾部中獲取任務(wù)。這種方法,最大限度地減少了線程競(jìng)爭(zhēng)任務(wù)的可能性。

ForkJoinPool的大部分操作都發(fā)生在工作竊取隊(duì)列(work-stealing queues ) 中,該隊(duì)列由內(nèi)部類WorkQueue實(shí)現(xiàn)。其實(shí),這個(gè)隊(duì)列也不是什么神奇之物,它是Deques的特殊形式,但僅支持三種操作方式:push、pop和poll(也稱為竊?。.?dāng)然,在ForkJoinPool中,隊(duì)列的讀取有著嚴(yán)格的約束,push和pop僅能從其所屬線程調(diào)用,而poll則可以從其他線程調(diào)用。換句話說(shuō),前兩個(gè)方法是留給自己用的,而第三種方法則是為了方便別人來(lái)竊取任務(wù)用的。任務(wù)竊取的相關(guān)過(guò)程,可以用下面這幅圖來(lái)表示,這幅圖建議你收藏:

圖片圖片

看到這里,不知你是否會(huì)有疑問(wèn):為什么工作線程總是從自己的頭部獲取任務(wù)?為什么要這樣設(shè)計(jì)?首先處理隊(duì)列中等待時(shí)間較長(zhǎng)的任務(wù)難道不是更有意義嗎?

答案當(dāng)然不會(huì)是“更有意義”。這樣做的主要原因是為了提高性能,通過(guò)始終選擇最近提交的任務(wù),可以增加資源仍分配在CPU緩存中的機(jī)會(huì),這樣CPU處理起來(lái)要快一些。而竊取者之所以從尾部獲取任務(wù),則是為了降低線程之間的競(jìng)爭(zhēng)可能,畢竟大家都從一個(gè)部分拿任務(wù),競(jìng)爭(zhēng)的可能要大很多。

此外,這樣的設(shè)計(jì)還有一種考慮。由于任務(wù)是可分割的,那隊(duì)列中較舊的任務(wù)最有可能粒度較大,因?yàn)樗鼈兛赡苓€沒(méi)有被分割,而空閑的線程則相對(duì)更有“精力”來(lái)完成這些粒度較大的任務(wù)。

5. ForkJoinPool監(jiān)控

對(duì)于一個(gè)復(fù)雜框架來(lái)說(shuō),實(shí)時(shí)地了解ForkJoinPool的內(nèi)部狀態(tài)是十分必要的。因此,F(xiàn)orkJoinPool提供了一些常用方法。通過(guò)這些方法,你可以了解當(dāng)前的工作線程、任務(wù)處理等情況。

(1)獲取運(yùn)行狀態(tài)的線程總數(shù)

public int getRunningThreadCount() {
    int rc = 0;
    WorkQueue[] ws;
    WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 1; i < ws.length; i += 2) {
            if ((w = ws[i]) != null && w.isApparentlyUnblocked())
                ++rc;
        }
    }
    return rc;
}

(2)獲取活躍線程數(shù)量

public int getActiveThreadCount() {
    int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
    return (r <= 0) ? 0 : r; // suppress momentarily negative values
}

(3)判斷ForkJoinPool是否空閑

public boolean isQuiescent() {
    return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
}

(4)獲取任務(wù)竊取數(shù)量

public long getStealCount() {
    AtomicLong sc = stealCounter;
    long count = (sc == null) ? 0 L : sc.get();
    WorkQueue[] ws;
    WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 1; i < ws.length; i += 2) {
            if ((w = ws[i]) != null)
                count += w.nsteals;
        }
    }
    return count;
}

(5)獲取隊(duì)列中的任務(wù)數(shù)量

public long getQueuedTaskCount() {
    long count = 0;
    WorkQueue[] ws;
    WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 1; i < ws.length; i += 2) {
            if ((w = ws[i]) != null)
                count += w.queueSize();
        }
    }
    return count;
}

(6)獲取已提交的任務(wù)數(shù)量

public int getQueuedSubmissionCount() {
    int count = 0;
    WorkQueue[] ws;
    WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 0; i < ws.length; i += 2) {
            if ((w = ws[i]) != null)
                count += w.queueSize();
        }
    }
    return count;
}

四、警惕ForkJoinPool#commonPool

在上文中所示的源碼中,你可能已經(jīng)在多處注意到commonPool的存在。在ForkJoinPool中,commonPool是一個(gè)共享的、靜態(tài)的線程池,并且在實(shí)際使用時(shí)才會(huì)進(jìn)行懶加載,Java8中的CompletableFuture和并行流(Parallel Streams)用的就是它。不過(guò),使用CompletableFuture時(shí)你可以指定自己的線程池,但是并行流在使用時(shí)卻不可以,這也是我們要警惕的地方。

為什么這么說(shuō)呢?ForkJoinPool中的commonPool設(shè)計(jì)初衷是為了降低線程池的重復(fù)創(chuàng)建,讓一些任務(wù)共用同一個(gè)線程池,畢竟創(chuàng)建線程池和創(chuàng)建線程都是昂貴的。然而,凡事都有兩面性,commonPool在某些場(chǎng)景下確實(shí)可以達(dá)到線程池復(fù)用的目的,但是,如果你決定與別人分享自己空間,那么當(dāng)你想使用它的時(shí)候,它可能不再完全屬于你。也就是說(shuō),當(dāng)你想用commonPool時(shí),它可能已經(jīng)其他任務(wù)填滿了。

提交到ForkJoinPool中的任務(wù)一般有兩類:計(jì)算類型和阻塞類型。考慮一個(gè)場(chǎng)景,應(yīng)用中多處都在使用這個(gè)共享線程池,有人在某處做了個(gè)不當(dāng)操作,比如往池子里丟入了阻塞型任務(wù),那么結(jié)果會(huì)怎樣?結(jié)果當(dāng)然是,整個(gè)線程池都有可能被阻塞!如此,整個(gè)應(yīng)用都面臨著被拖垮的風(fēng)險(xiǎn)??吹竭@里,對(duì)于Java8中的并行流的使用,你就應(yīng)該高度警惕了。

那怎么避免這種情況發(fā)生呢?答案是盡量避免使用commonPool,并且在需要運(yùn)行阻塞任務(wù)時(shí),應(yīng)當(dāng)創(chuàng)建獨(dú)立的線程池,和系統(tǒng)的其他部分保持隔離,以免風(fēng)險(xiǎn)擴(kuò)散。

五、ForkJoinPool性能評(píng)估

為了測(cè)試ForkJoinPool的性能,我做了一組簡(jiǎn)單的、非正式實(shí)驗(yàn)。實(shí)驗(yàn)分三組進(jìn)行,為了盡可能讓每組的數(shù)據(jù)客觀,每組實(shí)驗(yàn)均運(yùn)行5次,取最后的平均數(shù)。

  • 實(shí)驗(yàn)代碼:本文第一部分的示例代碼;
  • 實(shí)驗(yàn)環(huán)境:Mac;
  • JDK版本:8;
  • 任務(wù)分隔閾值:100

實(shí)驗(yàn)結(jié)果如下方表格所示:

從實(shí)驗(yàn)結(jié)果(0表示不到1毫秒)來(lái)看,F(xiàn)orkJoinPool的性能竟然不如單線程的效率高!這樣的結(jié)果,似乎很驚喜、很意外...然而,為什么會(huì)這樣?

不要驚訝,之所以會(huì)出現(xiàn)這個(gè)令你匪夷所思的結(jié)果,其原因在于任務(wù)拆分的粒度過(guò)小!在上面的測(cè)試中,任務(wù)拆分閾值僅為100,導(dǎo)致Fork/Join在計(jì)算時(shí)出現(xiàn)大量的任務(wù)拆分動(dòng)作,也就是任務(wù)分的太細(xì),大量的任務(wù)拆分和管理也是需要額外成本的。

以0~1000000求和為例,當(dāng)把閾值從100調(diào)整為100000時(shí),其結(jié)果結(jié)果如下??梢钥吹剑現(xiàn)ork/Join的優(yōu)勢(shì)就體現(xiàn)出來(lái)了。

======
ForkJoin任務(wù)拆分:16383
ForkJoin計(jì)算結(jié)果:499999999500000000
ForkJoin計(jì)算耗時(shí):143
======
單線程計(jì)算結(jié)果:499999999500000000
單線程計(jì)算耗時(shí):410

那么,問(wèn)題又來(lái)了,哪些因素會(huì)影響Fork/Join的性能呢?

根據(jù)經(jīng)驗(yàn)和實(shí)驗(yàn),任務(wù)總數(shù)、單任務(wù)執(zhí)行耗時(shí)以及并行數(shù)都會(huì)影響到性能。所以,當(dāng)你使用Fork/Join框架時(shí),你需要謹(jǐn)慎評(píng)估這三個(gè)指標(biāo),最好能通過(guò)模擬對(duì)比評(píng)估,不要憑感覺(jué)冒然在生產(chǎn)環(huán)境使用。

小結(jié)

以上就是關(guān)于ForkJoinPool的全部?jī)?nèi)容。Fork/Join是一種基于分治算法的模型,在并發(fā)處理計(jì)算型任務(wù)時(shí)有著顯著的優(yōu)勢(shì)。其效率的提升主要得益于兩個(gè)方面:

  • 任務(wù)切分:將大的任務(wù)分割成更小粒度的小任務(wù),讓更多的線程參與執(zhí)行;
  • 任務(wù)竊?。和ㄟ^(guò)任務(wù)竊取,充分地利用空閑線程,并減少競(jìng)爭(zhēng)。

在使用ForkJoinPool時(shí),需要特別注意任務(wù)的類型是否為純函數(shù)計(jì)算類型,也就是這些任務(wù)不應(yīng)該關(guān)心狀態(tài)或者外界的變化,這樣才是最安全的做法。如果是阻塞類型任務(wù),那么你需要謹(jǐn)慎評(píng)估技術(shù)方案。雖然ForkJoinPool也能處理阻塞類型任務(wù),但可能會(huì)帶來(lái)復(fù)雜的管理成本。

而在性能方面,要認(rèn)識(shí)到Fork/Join的性能并不是開(kāi)箱即來(lái),而是需要你去評(píng)估和驗(yàn)證一些重要指標(biāo),通過(guò)數(shù)據(jù)對(duì)比得出最佳結(jié)論。

此外,F(xiàn)orkJoinPool雖然提供了commonPool,但出于潛在的風(fēng)險(xiǎn)考慮,不推薦使用或謹(jǐn)慎使用。

責(zé)任編輯:武曉燕 來(lái)源: 一安未來(lái)
相關(guān)推薦

2015-04-21 15:05:32

海霖

2024-01-25 08:40:12

線程雙異步Java8

2019-11-25 10:12:59

Python技巧工具

2014-11-28 09:42:23

程序員

2019-12-19 14:07:33

IT運(yùn)營(yíng)CIO安全

2013-06-18 10:21:43

云計(jì)算云服務(wù)公共云服務(wù)

2013-12-09 10:38:08

程序員任務(wù)

2016-06-27 15:55:15

移動(dòng)

2012-04-06 10:31:44

Java

2011-05-24 15:29:05

程序CC++

2012-07-01 03:23:31

JBuilder

2014-02-12 13:43:50

代碼并行任務(wù)

2009-07-22 14:56:50

ERPVPNVPN加速

2009-10-20 11:12:26

綜合布線系統(tǒng)

2024-10-31 16:42:41

2009-03-13 10:54:18

SQL Server并行查詢數(shù)據(jù)庫(kù)管理

2009-03-10 19:03:11

Linux圖形環(huán)境桌面

2017-04-13 19:20:18

Python代碼并行任務(wù)

2018-05-11 11:00:11

LinuxWindows系統(tǒng)調(diào)用

2023-12-05 07:54:18

Java 7ThreadPool
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)