并發(fā)編程中一種經(jīng)典的分而治之的思想?。?/h1>
作者個(gè)人研發(fā)的在高并發(fā)場(chǎng)景下,提供的簡(jiǎn)單、穩(wěn)定、可擴(kuò)展的延遲消息隊(duì)列框架,具有精準(zhǔn)的定時(shí)任務(wù)和延遲隊(duì)列處理功能。自開源半年多以來,已成功為十幾家中小型企業(yè)提供了精準(zhǔn)定時(shí)調(diào)度方案,經(jīng)受住了生產(chǎn)環(huán)境的考驗(yàn)。為使更多童鞋受益,現(xiàn)給出開源框架地址:
https://github.com/sunshinelyz/mykit-delay
寫在前面
在JDK中,提供了這樣一種功能:它能夠?qū)?fù)雜的邏輯拆分成一個(gè)個(gè)簡(jiǎn)單的邏輯來并行執(zhí)行,待每個(gè)并行執(zhí)行的邏輯執(zhí)行完成后,再將各個(gè)結(jié)果進(jìn)行匯總,得出最終的結(jié)果數(shù)據(jù)。有點(diǎn)像Hadoop中的MapReduce。
ForkJoin是由JDK1.7之后提供的多線程并發(fā)處理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是將一個(gè)復(fù)雜的計(jì)算,按照設(shè)定的閾值分解成多個(gè)計(jì)算,然后將各個(gè)計(jì)算結(jié)果進(jìn)行匯總。相應(yīng)的,F(xiàn)orkJoin將復(fù)雜的計(jì)算當(dāng)做一個(gè)任務(wù),而分解的多個(gè)計(jì)算則是當(dāng)做一個(gè)個(gè)子任務(wù)來并行執(zhí)行。
Java并發(fā)編程的發(fā)展
對(duì)于Java語(yǔ)言來說,生來就支持多線程并發(fā)編程,在并發(fā)編程領(lǐng)域也是在不斷發(fā)展的。Java在其發(fā)展過程中對(duì)并發(fā)編程的支持越來越完善也正好印證了這一點(diǎn)。
- Java 1 支持thread,synchronized。
- Java 5 引入了 thread pools, blocking queues, concurrent collections,locks, condition queues。
- Java 7 加入了fork-join庫(kù)。
- Java 8 加入了 parallel streams。
并發(fā)與并行
并發(fā)和并行在本質(zhì)上還是有所區(qū)別的。
并發(fā)
并發(fā)指的是在同一時(shí)刻,只有一個(gè)線程能夠獲取到CPU執(zhí)行任務(wù),而多個(gè)線程被快速的輪換執(zhí)行,這就使得在宏觀上具有多個(gè)線程同時(shí)執(zhí)行的效果,并發(fā)不是真正的同時(shí)執(zhí)行,并發(fā)可以使用下圖表示。
并行
并行指的是無(wú)論何時(shí),多個(gè)線程都是在多個(gè)CPU核心上同時(shí)執(zhí)行的,是真正的同時(shí)執(zhí)行。
分治法
基本思想
把一個(gè)規(guī)模大的問題劃分為規(guī)模較小的子問題,然后分而治之,最后合并子問題的解得到原問題的解。
步驟
①分割原問題;
②求解子問題;
③合并子問題的解為原問題的解。
我們可以使用如下偽代碼來表示這個(gè)步驟。
- if(任務(wù)很?。﹞
- 直接計(jì)算得到結(jié)果
- }else{
- 分拆成N個(gè)子任務(wù)
- 調(diào)用子任務(wù)的fork()進(jìn)行計(jì)算
- 調(diào)用子任務(wù)的join()合并計(jì)算結(jié)果
- }
在分治法中,子問題一般是相互獨(dú)立的,因此,經(jīng)常通過遞歸調(diào)用算法來求解子問題。
典型應(yīng)用
- 二分搜索
- 大整數(shù)乘法
- Strassen矩陣乘法
- 棋盤覆蓋
- 合并排序
- 快速排序
- 線性時(shí)間選擇
- 漢諾塔
ForkJoin并行處理框架
ForkJoin框架概述
Java 1.7 引入了一種新的并發(fā)框架—— Fork/Join Framework,主要用于實(shí)現(xiàn)“分而治之”的算法,特別是分治之后遞歸調(diào)用的函數(shù)。
ForkJoin框架的本質(zhì)是一個(gè)用于并行執(zhí)行任務(wù)的框架, 能夠把一個(gè)大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)的計(jì)算結(jié)果。在Java中,F(xiàn)orkJoin框架與ThreadPool共存,并不是要替換ThreadPool
其實(shí),在Java 8中引入的并行流計(jì)算,內(nèi)部就是采用的ForkJoinPool來實(shí)現(xiàn)的。例如,下面使用并行流實(shí)現(xiàn)打印數(shù)組元組的程序。
- public class SumArray {
- public static void main(String[] args){
- List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
- numberList.parallelStream().forEach(System.out::println);
- }
- }
這段代碼的背后就使用到了ForkJoinPool。
說到這里,可能有讀者會(huì)問:可以使用線程池的ThreadPoolExecutor來實(shí)現(xiàn)啊?為什么要使用ForkJoinPool啊?ForkJoinPool是個(gè)什么鬼啊?! 接下來,我們就來回答這個(gè)問題。
ForkJoin框架原理
ForkJoin框架是從jdk1.7中引入的新特性,它同ThreadPoolExecutor一樣,也實(shí)現(xiàn)了Executor和ExecutorService接口。它使用了一個(gè)無(wú)限隊(duì)列來保存需要執(zhí)行的任務(wù),而線程的數(shù)量則是通過構(gòu)造函數(shù)傳入,如果沒有向構(gòu)造函數(shù)中傳入指定的線程數(shù)量,那么當(dāng)前計(jì)算機(jī)可用的CPU數(shù)量會(huì)被設(shè)置為線程數(shù)量作為默認(rèn)值。
ForkJoinPool主要使用 分治法(Divide-and-Conquer Algorithm) 來解決問題。典型的應(yīng)用比如快速排序算法。這里的要點(diǎn)在于,F(xiàn)orkJoinPool能夠使用相對(duì)較少的線程來處理大量的任務(wù)。比如要對(duì)1000萬(wàn)個(gè)數(shù)據(jù)進(jìn)行排序,那么會(huì)將這個(gè)任務(wù)分割成兩個(gè)500萬(wàn)的排序任務(wù)和一個(gè)針對(duì)這兩組500萬(wàn)數(shù)據(jù)的合并任務(wù)。以此類推,對(duì)于500萬(wàn)的數(shù)據(jù)也會(huì)做出同樣的分割處理,到最后會(huì)設(shè)置一個(gè)閾值來規(guī)定當(dāng)數(shù)據(jù)規(guī)模到多少時(shí),停止這樣的分割處理。比如,當(dāng)元素的數(shù)量小于10時(shí),會(huì)停止分割,轉(zhuǎn)而使用插入排序?qū)λ鼈冞M(jìn)行排序。那么到最后,所有的任務(wù)加起來會(huì)有大概200萬(wàn)+個(gè)。問題的關(guān)鍵在于,對(duì)于一個(gè)任務(wù)而言,只有當(dāng)它所有的子任務(wù)完成之后,它才能夠被執(zhí)行。
所以當(dāng)使用ThreadPoolExecutor時(shí),使用分治法會(huì)存在問題,因?yàn)門hreadPoolExecutor中的線程無(wú)法向任務(wù)隊(duì)列中再添加一個(gè)任務(wù)并在等待該任務(wù)完成之后再繼續(xù)執(zhí)行。而使用ForkJoinPool就能夠解決這個(gè)問題,它就能夠讓其中的線程創(chuàng)建新的任務(wù),并掛起當(dāng)前的任務(wù),此時(shí)線程就能夠從隊(duì)列中選擇子任務(wù)執(zhí)行。
那么使用ThreadPoolExecutor或者ForkJoinPool,性能上會(huì)有什么差異呢?
首先,使用ForkJoinPool能夠使用數(shù)量有限的線程來完成非常多的具有父子關(guān)系的任務(wù),比如使用4個(gè)線程來完成超過200萬(wàn)個(gè)任務(wù)。但是,使用ThreadPoolExecutor時(shí),是不可能完成的,因?yàn)門hreadPoolExecutor中的Thread無(wú)法選擇優(yōu)先執(zhí)行子任務(wù),需要完成200萬(wàn)個(gè)具有父子關(guān)系的任務(wù)時(shí),也需要200萬(wàn)個(gè)線程,很顯然這是不可行的,也是很不合理的!!
工作竊取算法
假如我們需要做一個(gè)比較大的任務(wù),我們可以把這個(gè)任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競(jìng)爭(zhēng),于是把這些子任務(wù)分別放到不同的隊(duì)列里,并為每個(gè)隊(duì)列創(chuàng)建一個(gè)單獨(dú)的線程來執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對(duì)應(yīng),比如A線程負(fù)責(zé)處理A隊(duì)列里的任務(wù)。但是有的線程會(huì)先把自己隊(duì)列里的任務(wù)干完,而其他線程對(duì)應(yīng)的隊(duì)列里還有任務(wù)等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來執(zhí)行。而在這時(shí)它們會(huì)訪問同一個(gè)隊(duì)列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競(jìng)爭(zhēng),通常會(huì)使用雙端隊(duì)列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行。
工作竊取算法的優(yōu)點(diǎn):充分利用線程進(jìn)行并行計(jì)算,并減少了線程間的競(jìng)爭(zhēng)。
工作竊取算法的缺點(diǎn):在某些情況下還是存在競(jìng)爭(zhēng),比如雙端隊(duì)列里只有一個(gè)任務(wù)時(shí)。并且該算法會(huì)消耗更多的系統(tǒng)資源,比如創(chuàng)建多個(gè)線程和多個(gè)雙端隊(duì)列。
Fork/Join框架局限性:
對(duì)于Fork/Join框架而言,當(dāng)一個(gè)任務(wù)正在等待它使用Join操作創(chuàng)建的子任務(wù)結(jié)束時(shí),執(zhí)行這個(gè)任務(wù)的工作線程查找其他未被執(zhí)行的任務(wù),并開始執(zhí)行這些未被執(zhí)行的任務(wù),通過這種方式,線程充分利用它們的運(yùn)行時(shí)間來提高應(yīng)用程序的性能。為了實(shí)現(xiàn)這個(gè)目標(biāo),F(xiàn)ork/Join框架執(zhí)行的任務(wù)有一些局限性。
(1)任務(wù)只能使用Fork和Join操作來進(jìn)行同步機(jī)制,如果使用了其他同步機(jī)制,則在同步操作時(shí),工作線程就不能執(zhí)行其他任務(wù)了。比如,在Fork/Join框架中,使任務(wù)進(jìn)行了睡眠,那么,在睡眠期間內(nèi),正在執(zhí)行這個(gè)任務(wù)的工作線程將不會(huì)執(zhí)行其他任務(wù)了。(2)在Fork/Join框架中,所拆分的任務(wù)不應(yīng)該去執(zhí)行IO操作,比如:讀寫數(shù)據(jù)文件。(3)任務(wù)不能拋出檢查異常,必須通過必要的代碼來出來這些異常。
ForkJoin框架的實(shí)現(xiàn)
ForkJoin框架中一些重要的類如下所示。
ForkJoinPool 框架中涉及的主要類如下所示。
1.ForkJoinPool類
實(shí)現(xiàn)了ForkJoin框架中的線程池,由類圖可以看出,F(xiàn)orkJoinPool類實(shí)現(xiàn)了線程池的Executor接口。
我們也可以從下圖中看出ForkJoinPool的類圖關(guān)系。
其中,可以使用Executors.newWorkStealPool()方法創(chuàng)建ForkJoinPool。
ForkJoinPool中提供了如下提交任務(wù)的方法。
- public void execute(ForkJoinTask<?> task)
- public void execute(Runnable task)
- public <T> T invoke(ForkJoinTask<T> task)
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
- public <T> ForkJoinTask<T> submit(Callable<T> task)
- public <T> ForkJoinTask<T> submit(Runnable task, T result)
- public ForkJoinTask<?> submit(Runnable task)
2.ForkJoinWorkerThread類
實(shí)現(xiàn)ForkJoin框架中的線程。
3.ForkJoinTask類
ForkJoinTask封裝了數(shù)據(jù)及其相應(yīng)的計(jì)算,并且支持細(xì)粒度的數(shù)據(jù)并行。ForkJoinTask比線程要輕量,F(xiàn)orkJoinPool中少量工作線程能夠運(yùn)行大量的ForkJoinTask。
ForkJoinTask類中主要包括兩個(gè)方法fork()和join(),分別實(shí)現(xiàn)任務(wù)的分拆與合并。
fork()方法類似于Thread.start(),但是它并不立即執(zhí)行任務(wù),而是將任務(wù)放入工作隊(duì)列中。跟Thread.join()方法不同,F(xiàn)orkJoinTask的join()方法并不簡(jiǎn)單的阻塞線程,而是利用工作線程運(yùn)行其他任務(wù),當(dāng)一個(gè)工作線程中調(diào)用join(),它將處理其他任務(wù),直到注意到目標(biāo)子任務(wù)已經(jīng)完成。
我們可以使用下圖來表示這個(gè)過程。
ForkJoinTask有3個(gè)子類:
- RecursiveAction:無(wú)返回值的任務(wù)。
- RecursiveTask:有返回值的任務(wù)。
- CountedCompleter:完成任務(wù)后將觸發(fā)其他任務(wù)。
4.RecursiveTask類
有返回結(jié)果的ForkJoinTask實(shí)現(xiàn)Callable。
5.RecursiveAction類
無(wú)返回結(jié)果的ForkJoinTask實(shí)現(xiàn)Runnable。
6.CountedCompleter類
在任務(wù)完成執(zhí)行后會(huì)觸發(fā)執(zhí)行一個(gè)自定義的鉤子函數(shù)。
ForkJoin示例程序
- package io.binghe.concurrency.example.aqs;
- import lombok.extern.slf4j.Slf4j;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.Future;
- import java.util.concurrent.RecursiveTask;
- @Slf4j
- public class ForkJoinTaskExample extends RecursiveTask<Integer> {
- public static final int threshold = 2;
- private int start;
- private int end;
- public ForkJoinTaskExample(int start, int end) {
- this.start = start;
- this.end = end;
- }
- @Override
- protected Integer compute() {
- int sum = 0;
- //如果任務(wù)足夠小就計(jì)算任務(wù)
- boolean canCompute = (end - start) <= threshold;
- if (canCompute) {
- for (int i = start; i <= end; i++) {
- sum += i;
- }
- } else {
- // 如果任務(wù)大于閾值,就分裂成兩個(gè)子任務(wù)計(jì)算
- int middle = (start + end) / 2;
- ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
- ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
- // 執(zhí)行子任務(wù)
- leftTask.fork();
- rightTask.fork();
- // 等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果
- int leftResult = leftTask.join();
- int rightResult = rightTask.join();
- // 合并子任務(wù)
- sum = leftResult + rightResult;
- }
- return sum;
- }
- public static void main(String[] args) {
- ForkJoinPool forkjoinPool = new ForkJoinPool();
- //生成一個(gè)計(jì)算任務(wù),計(jì)算1+2+3+4
- ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
- //執(zhí)行一個(gè)任務(wù)
- Future<Integer> result = forkjoinPool.submit(task);
- try {
- log.info("result:{}", result.get());
- } catch (Exception e) {
- log.error("exception", e);
- }
- }
- }
本文轉(zhuǎn)載自微信公眾號(hào)「冰河技術(shù)」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系冰河技術(shù)公眾號(hào)。