JDK 7中的 Fork/Join模式
介 紹
隨著多核芯片逐漸成為主流,大多數(shù)軟件開發(fā)人員不可避免地需要了解并行編程的知識。而同時(shí),主流程序語言正在將越來越多的并行特性合并到標(biāo)準(zhǔn)庫或者語言本身之中。我們可以看到,JDK 在這方面同樣走在潮流的前方。在 JDK 標(biāo)準(zhǔn)版 5 中,由 Doug Lea 提供的并行框架成為了標(biāo)準(zhǔn)庫的一部分(JSR-166)。隨后,在 JDK 6 中,一些新的并行特性,例如并行 collection 框架,合并到了標(biāo)準(zhǔn)庫中(JSR-166x)。直到今天,盡管 Java SE 7 還沒有正式發(fā)布,一些并行相關(guān)的新特性已經(jīng)出現(xiàn)在 JSR-166y 中:
1.Fork/Join 模式;
2.TransferQueue,它繼承自 BlockingQueue 并能在隊(duì)列滿時(shí)阻塞“生產(chǎn)者”;
3.ArrayTasks/ListTasks,用于并行執(zhí)行某些數(shù)組/列表相關(guān)任務(wù)的類;
4.IntTasks/LongTasks/DoubleTasks,用于并行處理數(shù)字類型數(shù)組的工具類,提供了排序、查找、求和、求最小值、求最大值等功能;
其中,對 Fork/Join 模式的支持可能是對開發(fā)并行軟件來說最通用的新特性。在 JSR-166y 中,Doug Lea 實(shí)現(xiàn)ArrayTasks/ListTasks/IntTasks/LongTasks/DoubleTasks 時(shí)就大量的用到了 Fork/Join 模式。讀者還需要注意一點(diǎn),因?yàn)?JDK 7 還沒有正式發(fā)布,因此本文涉及到的功能和發(fā)布版本有可能不一樣。
Fork/Join 模式有自己的適用范圍。如果一個(gè)應(yīng)用能被分解成多個(gè)子任務(wù),并且組合多個(gè)子任務(wù)的結(jié)果就能夠獲得最終的答案,那么這個(gè)應(yīng)用就適合用 Fork/Join 模式來解決。圖 1 給出了一個(gè) Fork/Join 模式的示意圖,位于圖上部的 Task 依賴于位于其下的 Task 的執(zhí)行,只有當(dāng)所有的子任務(wù)都完成之后,調(diào)用者才能獲得 Task 0 的返回結(jié)果。
圖 1. Fork/Join 模式示意圖
可以說,F(xiàn)ork/Join 模式能夠解決很多種類的并行問題。通過使用 Doug Lea 提供的 Fork/Join 框架,軟件開發(fā)人員只需要關(guān)注任務(wù)的劃分和中間結(jié)果的組合就能充分利用并行平臺的優(yōu)良性能。其他和并行相關(guān)的諸多難于處理的問題,例如負(fù)載平衡、同步等,都可以由框架采用統(tǒng)一的方式解決。這樣,我們就能夠輕松地獲得并行的好處而避免了并行編程的困難且容易出錯(cuò)的缺點(diǎn)。
使用 Fork/Join 模式
在開始嘗試 Fork/Join 模式之前,我們需要從 Doug Lea 主持的 Concurrency JSR-166 Interest Site 上下載 JSR-166y 的源代碼,并且我們還需要安裝最新版本的 JDK 6(下載網(wǎng)址請參閱 參考資源)。Fork/Join 模式的使用方式非常直觀。首先,我們需要編寫一個(gè) ForkJoinTask 來完成子任務(wù)的分割、中間結(jié)果的合并等工作。隨后,我們將這個(gè) ForkJoinTask 交給 ForkJoinPool 來完成應(yīng)用的執(zhí)行。
通常我們并不直接繼承 ForkJoinTask,它包含了太多的抽象方法。針對特定的問題,我們可以選擇 ForkJoinTask 的不同子類來完成任務(wù)。RecursiveAction 是 ForkJoinTask 的一個(gè)子類,它代表了一類最簡單的 ForkJoinTask:不需要返回值,當(dāng)子任務(wù)都執(zhí)行完畢之后,不需要進(jìn)行中間結(jié)果的組合。如果我們從 RecursiveAction 開始繼承,那么我們只需要重載 protected void compute() 方法。下面,我們來看看怎么為快速排序算法建立一個(gè) ForkJoinTask 的子類:
清單 1. ForkJoinTask 的子類
- classSortTaskextendsRecursiveAction{
- finallong[]array;
- finalintlo;
- finalinthi;
- privateintTHRESHOLD=30;
- publicSortTask(long[]array){
- this.array=array;
- this.lo=0;
- this.hi=array.length-1;
- }
- publicSortTask(long[]array,intlo,inthi){
- this.array=array;
- this.lo=lo;
- this.hi=hi;
- }
- protectedvoidcompute(){
- if(hi-lo<THRESHOLD)
- sequentiallySort(array,lo,hi);
- else{
- intpivot=partition(array,lo,hi);
- coInvoke(newSortTask(array,lo,pivot-1),newSortTask(array,
- pivot+1,hi));
- }
- }
- privateintpartition(long[]array,intlo,inthi){
- longx=array[hi];
- inti=lo-1;
- for(intj=lo;j<hi;j++){
- if(array[j]<=x){
- i++;
- swap(array,i,j);
- }
- }
- swap(array,i+1,hi);
- returni+1;
- }
- privatevoidswap(long[]array,inti,intj){
- if(i!=j){
- longtemp=array[i];
- array[i]=array[j];
- array[j]=temp;
- }
- }
- privatevoidsequentiallySort(long[]array,intlo,inthi){
- Arrays.sort(array,lo,hi+1);
- }
- }
在清單1中,SortTask 首先通過 partition() 方法將數(shù)組分成兩個(gè)部分。隨后,兩個(gè)子任務(wù)將被生成并分別排序數(shù)組的兩個(gè)部分。當(dāng)子任務(wù)足夠小時(shí),再將其分割為更小的任務(wù)反而引起性能的降低。因此,這里我們使用一個(gè) THRESHOLD,限定在子任務(wù)規(guī)模較小時(shí),使用直接排序,而不是再將其分割成為更小的任務(wù)。其中,我們用到了 RecursiveAction 提供的方法 coInvoke()。它表示:啟動所有的任務(wù),并在所有任務(wù)都正常結(jié)束后返回。如果其中一個(gè)任務(wù)出現(xiàn)異常,則其它所有的任務(wù)都取消。coInvoke() 的參數(shù)還可以是任務(wù)的數(shù)組。
現(xiàn)在剩下的工作就是將 SortTask 提交到 ForkJoinPool 了。ForkJoinPool() 默認(rèn)建立具有與 CPU 可使用線程數(shù)相等線程個(gè)數(shù)的線程池。我們在一個(gè) JUnit 的 test 方法中將 SortTask 提交給一個(gè)新建的 ForkJoinPool:
清單 2. 新建的 ForkJoinPool
- @Test
- publicvoidtestSort()throwsException{
- ForkJoinTasksort=newSortTask(array);
- ForkJoinPoolfjpool=newForkJoinPool();
- fjpool.submit(sort);
- fjpool.shutdown();
- fjpool.awaitTermination(30,TimeUnit.SECONDS);
- assertTrue(checkSorted(array));
- }
在上面的代碼中,我們用到了 ForkJoinPool 提供的如下函數(shù):
1. submit():將 ForkJoinTask 類的對象提交給 ForkJoinPool,F(xiàn)orkJoinPool 將立刻開始執(zhí)行 ForkJoinTask。
2. shutdown():執(zhí)行此方法之后,F(xiàn)orkJoinPool 不再接受新的任務(wù),但是已經(jīng)提交的任務(wù)可以繼續(xù)執(zhí)行。如果希望立刻停止所有的任務(wù),可以嘗試 shutdownNow() 方法。
3. awaitTermination():阻塞當(dāng)前線程直到 ForkJoinPool 中所有的任務(wù)都執(zhí)行結(jié)束。
并行快速排序的完整代碼如下所示:
清單 3. 并行快速排序的完整代碼
- packagetests;
- importstaticorg.junit.Assert.*;
- importjava.util.Arrays;
- importjava.util.Random;
- importjava.util.concurrent.TimeUnit;
- importjsr166y.forkjoin.ForkJoinPool;
- importjsr166y.forkjoin.ForkJoinTask;
- importjsr166y.forkjoin.RecursiveAction;
- importorg.junit.Before;
- importorg.junit.Test;
- classSortTaskextendsRecursiveAction{
- finallong[]array;
- finalintlo;
- finalinthi;
- privateintTHRESHOLD=0;//Fordemoonly
- publicSortTask(long[]array){
- this.array=array;
- this.lo=0;
- this.hi=array.length-1;
- }
- publicSortTask(long[]array,intlo,inthi){
- this.array=array;
- this.lo=lo;
- this.hi=hi;
- }
- protectedvoidcompute(){
- if(hi-lo<THRESHOLD)
- sequentiallySort(array,lo,hi);
- else{
- intpivot=partition(array,lo,hi);
- System.out.println(" pivot="+pivot+",low="+lo+",high="+hi);
- System.out.println("array"+Arrays.toString(array));
- coInvoke(newSortTask(array,lo,pivot-1),newSortTask(array,
- pivot+1,hi));
- }
- }
- privateintpartition(long[]array,intlo,inthi){
- longx=array[hi];
- inti=lo-1;
- for(intj=lo;j<hi;j++){
- if(array[j]<=x){
- i++;
- swap(array,i,j);
- }
- }
- swap(array,i+1,hi);
- returni+1;
- }
- privatevoidswap(long[]array,inti,intj){
- if(i!=j){
- longtemp=array[i];
- array[i]=array[j];
- array[j]=temp;
- }
- }
- privatevoidsequentiallySort(long[]array,intlo,inthi){
- Arrays.sort(array,lo,hi+1);
- }
- }
- publicclassTestForkJoinSimple{
- privatestaticfinalintNARRAY=16;//Fordemoonly
- long[]array=newlong[NARRAY];
- Randomrand=newRandom();
- @Before
- publicvoidsetUp(){
- for(inti=0;i<array.length;i++){
- array[i]=rand.nextLong()%100;//Fordemoonly
- }
- System.out.println("InitialArray:"+Arrays.toString(array));
- }
- @Test
- publicvoidtestSort()throwsException{
- ForkJoinTasksort=newSortTask(array);
- ForkJoinPoolfjpool=newForkJoinPool();
- fjpool.submit(sort);
- fjpool.shutdown();
- fjpool.awaitTermination(30,TimeUnit.SECONDS);
- assertTrue(checkSorted(array));
- }
- booleancheckSorted(long[]a){
- for(inti=0;i<a.length-1;i++){
- if(a[i]>(a[i+1])){
- returnfalse;
- }
- }
- returntrue;
- }
- }
運(yùn)行以上代碼,我們可以得到以下結(jié)果:
- InitialArray:[46,-12,74,-67,76,-13,-91,-96]
- pivot=0,low=0,high=7
- array[-96,-12,74,-67,76,-13,-91,46]
- pivot=5,low=1,high=7
- array[-96,-12,-67,-13,-91,46,76,74]
- pivot=1,low=1,high=4
- array[-96,-91,-67,-13,-12,46,74,76]
- pivot=4,low=2,high=4
- array[-96,-91,-67,-13,-12,46,74,76]
- pivot=3,low=2,high=3
- array[-96,-91,-67,-13,-12,46,74,76]
- pivot=2,low=2,high=2
- array[-96,-91,-67,-13,-12,46,74,76]
- pivot=6,low=6,high=7
- array[-96,-91,-67,-13,-12,46,74,76]
- pivot=7,low=7,high=7
- array[-96,-91,-67,-13,-12,46,74,76]
#p#
Fork/Join 模式高級特性
使用 RecursiveTask
除了 RecursiveAction,F(xiàn)ork/Join 框架還提供了其他 ForkJoinTask 子類:帶有返回值的 RecursiveTask,使用 finish() 方法顯式中止的 AsyncAction 和 LinkedAsyncAction,以及可使用 TaskBarrier 為每個(gè)任務(wù)設(shè)置不同中止條件的 CyclicAction。
從 RecursiveTask 繼承的子類同樣需要重載 protected void compute() 方法。與 RecursiveAction 稍有不同的是,它可使用泛型指定一個(gè)返回值的類型。下面,我們來看看如何使用 RecursiveTask 的子類。
清單 4. RecursiveTask 的子類
- classFibonacciextendsRecursiveTask<Integer>{
- finalintn;
- Fibonacci(intn){
- this.n=n;
- }
- privateintcompute(intsmall){
- finalint[]results={1,1,2,3,5,8,13,21,34,55,89};
- returnresults[small];
- }
- publicIntegercompute(){
- if(n<=10){
- returncompute(n);
- }
- Fibonaccif1=newFibonacci(n-1);
- Fibonaccif2=newFibonacci(n-2);
- f1.fork();
- f2.fork();
- returnf1.join()+f2.join();
- }
- }
在清單4 中,F(xiàn)ibonacci 的返回值為 Integer 類型。其 compute() 函數(shù)首先建立兩個(gè)子任務(wù),啟動子任務(wù)執(zhí)行,阻塞以等待子任務(wù)的結(jié)果返回,相加后得到最終結(jié)果。同樣,當(dāng)子任務(wù)足夠小時(shí),通過查表得到其結(jié)果,以減小因過多地分割任務(wù)引起的性能降低。其中,我們用到了 RecursiveTask 提供的方法 fork() 和 join()。它們分別表示:子任務(wù)的異步執(zhí)行和阻塞等待結(jié)果完成。
現(xiàn)在剩下的工作就是將 Fibonacci 提交到 ForkJoinPool 了,我們在一個(gè) JUnit 的 test 方法中作了如下處理:
清單 5. 將 Fibonacci 提交到 ForkJoinPool
- @Test
- publicvoidtestFibonacci()throwsInterruptedException,ExecutionException{
- ForkJoinTask<Integer>fjt=newFibonacci(45);
- ForkJoinPoolfjpool=newForkJoinPool();
- Future<Integer>result=fjpool.submit(fjt);
- //dosomething
- System.out.println(result.get());
- }
使用 CyclicAction 來處理循環(huán)任務(wù)
CyclicAction 的用法稍微復(fù)雜一些。如果一個(gè)復(fù)雜任務(wù)需要幾個(gè)線程協(xié)作完成,并且線程之間需要在某個(gè)點(diǎn)等待所有其他線程到達(dá),那么我們就能方便的用 CyclicAction 和 TaskBarrier 來完成。圖 2 描述了使用 CyclicAction 和 TaskBarrier 的一個(gè)典型場景。
圖 2. 使用 CyclicAction 和 TaskBarrier 執(zhí)行多線程任務(wù)
繼承自 CyclicAction 的子類需要 TaskBarrier 為每個(gè)任務(wù)設(shè)置不同的中止條件。從 CyclicAction 繼承的子類需要重載 protected void compute() 方法,定義在 barrier 的每個(gè)步驟需要執(zhí)行的動作。compute() 方法將被反復(fù)執(zhí)行直到 barrier 的 isTerminated() 方法返回 True。TaskBarrier 的行為類似于 CyclicBarrier。下面,我們來看看如何使用 CyclicAction 的子類。
清單 6. 使用 CyclicAction 的子類
- classConcurrentPrintextendsRecursiveAction{
- protectedvoidcompute(){
- TaskBarrierb=newTaskBarrier(){
- protectedbooleanterminate(intcycle,intregisteredParties){
- System.out.println("Cycleis"+cycle+";"
- +registeredParties+"parties");
- returncycle>=10;
- }
- };
- intn=3;
- CyclicAction[]actions=newCyclicAction[n];
- for(inti=0;i<n;++i){
- finalintindex=i;
- actions[i]=newCyclicAction(b){
- protectedvoidcompute(){
- System.out.println("I'mworking"+getCycle()+""
- +index);
- try{
- Thread.sleep(500);
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }
- }
- };
- }
- for(inti=0;i<n;++i)
- actions[i].fork();
- for(inti=0;i<n;++i)
- actions[i].join();
- }
- }
在清單6中,CyclicAction[] 數(shù)組建立了三個(gè)任務(wù),打印各自的工作次數(shù)和序號。而在 b.terminate() 方法中,我們設(shè)置的中止條件表示重復(fù) 10 次計(jì)算后中止?,F(xiàn)在剩下的工作就是將 ConcurrentPrint 提交到 ForkJoinPool 了。我們可以在 ForkJoinPool 的構(gòu)造函數(shù)中指定需要的線程數(shù)目,例如 ForkJoinPool(4) 就表明線程池包含 4 個(gè)線程。我們在一個(gè) JUnit 的 test 方法中運(yùn)行 ConcurrentPrint 的這個(gè)循環(huán)任務(wù):
清單 7. 運(yùn)行 ConcurrentPrint 循環(huán)任務(wù)
- @Test
- publicvoidtestBarrier()throwsInterruptedException,ExecutionException{
- ForkJoinTaskfjt=newConcurrentPrint();
- ForkJoinPoolfjpool=newForkJoinPool(4);
- fjpool.submit(fjt);
- fjpool.shutdown();
- }
RecursiveTask 和 CyclicAction 兩個(gè)例子的完整代碼如下所示:
清單 8. RecursiveTask 和 CyclicAction 兩個(gè)例子的完整代碼
- packagetests;
- importjava.util.concurrent.ExecutionException;
- importjava.util.concurrent.Future;
- importjsr166y.forkjoin.CyclicAction;
- importjsr166y.forkjoin.ForkJoinPool;
- importjsr166y.forkjoin.ForkJoinTask;
- importjsr166y.forkjoin.RecursiveAction;
- importjsr166y.forkjoin.RecursiveTask;
- importjsr166y.forkjoin.TaskBarrier;
- importorg.junit.Test;
- classFibonacciextendsRecursiveTask<Integer>{
- finalintn;
- Fibonacci(intn){
- this.n=n;
- }
- privateintcompute(intsmall){
- finalint[]results={1,1,2,3,5,8,13,21,34,55,89};
- returnresults[small];
- }
- publicIntegercompute(){
- if(n<=10){
- returncompute(n);
- }
- Fibonaccif1=newFibonacci(n-1);
- Fibonaccif2=newFibonacci(n-2);
- System.out.println("forknewthreadfor"+(n-1));
- f1.fork();
- System.out.println("forknewthreadfor"+(n-2));
- f2.fork();
- returnf1.join()+f2.join();
- }
- }
- classConcurrentPrintextendsRecursiveAction{
- protectedvoidcompute(){
- TaskBarrierb=newTaskBarrier(){
- protectedbooleanterminate(intcycle,intregisteredParties){
- System.out.println("Cycleis"+cycle+";"
- +registeredParties+"parties");
- returncycle>=10;
- }
- };
- intn=3;
- CyclicAction[]actions=newCyclicAction[n];
- for(inti=0;i<n;++i){
- finalintindex=i;
- actions[i]=newCyclicAction(b){
- protectedvoidcompute(){
- System.out.println("I'mworking"+getCycle()+""
- +index);
- try{
- Thread.sleep(500);
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }
- }
- };
- }
- for(inti=0;i<n;++i)
- actions[i].fork();
- for(inti=0;i<n;++i)
- actions[i].join();
- }
- }
- publicclassTestForkJoin{
- @Test
- publicvoidtestBarrier()throwsInterruptedException,ExecutionException{
- System.out.println(" testingTaskBarrier...");
- ForkJoinTaskfjt=newConcurrentPrint();
- ForkJoinPoolfjpool=newForkJoinPool(4);
- fjpool.submit(fjt);
- fjpool.shutdown();
- }
- @Test
- publicvoidtestFibonacci()throwsInterruptedException,ExecutionException{
- System.out.println(" testingFibonacci...");
- finalintnum=14;//Fordemoonly
- ForkJoinTask<Integer>fjt=newFibonacci(num);
- ForkJoinPoolfjpool=newForkJoinPool();
- Future<Integer>result=fjpool.submit(fjt);
- //dosomething
- System.out.println("Fibonacci("+num+")="+result.get());
- }
- }
運(yùn)行以上代碼,我們可以得到以下結(jié)果:
- testingTaskBarrier...
- I'mworking02
- I'mworking00
- I'mworking01
- Cycleis0;3parties
- I'mworking12
- I'mworking10
- I'mworking11
- Cycleis1;3parties
- I'mworking20
- I'mworking21
- I'mworking22
- Cycleis2;3parties
- I'mworking30
- I'mworking32
- I'mworking31
- Cycleis3;3parties
- I'mworking42
- I'mworking40
- I'mworking41
- Cycleis4;3parties
- I'mworking51
- I'mworking50
- I'mworking52
- Cycleis5;3parties
- I'mworking60
- I'mworking62
- I'mworking61
- Cycleis6;3parties
- I'mworking72
- I'mworking70
- I'mworking71
- Cycleis7;3parties
- I'mworking81
- I'mworking80
- I'mworking82
- Cycleis8;3parties
- I'mworking90
- I'mworking92
- testingFibonacci...
- forknewthreadfor13
- forknewthreadfor12
- forknewthreadfor11
- forknewthreadfor10
- forknewthreadfor12
- forknewthreadfor11
- forknewthreadfor10
- forknewthreadfor9
- forknewthreadfor10
- forknewthreadfor9
- forknewthreadfor11
- forknewthreadfor10
- forknewthreadfor10
- forknewthreadfor9
- Fibonacci(14)=610
結(jié) 論
從以上的例子中可以看到,通過使用 Fork/Join 模式,軟件開發(fā)人員能夠方便地利用多核平臺的計(jì)算能力。盡管還沒有做到對軟件開發(fā)人員完全透明,F(xiàn)ork/Join 模式已經(jīng)極大地簡化了編寫并發(fā)程序的瑣碎工作。對于符合 Fork/Join 模式的應(yīng)用,軟件開發(fā)人員不再需要處理各種并行相關(guān)事務(wù),例如同步、通信等,以難以調(diào)試而聞名的死鎖和 data race 等錯(cuò)誤也就不會出現(xiàn),提升了思考問題的層次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,僅僅關(guān)注如何劃分任務(wù)和組合中間結(jié)果,將剩下的事情丟給 Fork/Join 框架。
在實(shí)際工作中利用 Fork/Join 模式,可以充分享受多核平臺為應(yīng)用帶來的免費(fèi)午餐。
參考資料
◆ 閱讀文章“The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software”:了解為什么從現(xiàn)在開始每個(gè)嚴(yán)肅的軟件工作者都應(yīng)該了解并行編程方法。
◆ 閱讀 Doug Lea 的文章“A Java Fork/Join Framework”:了解 Fork/Join 模式的實(shí)現(xiàn)機(jī)制和執(zhí)行性能。
◆ 閱讀 developerWorks 文章“馴服 Tiger:并發(fā)集合”:了解如何使用并行 Collection 庫。
◆ 閱讀 developerWorks 文章“Java 理論與實(shí)踐:非阻塞算法簡介”:介紹了 JDK 5 在并行方面的重要增強(qiáng)以及在 JDK5 平臺上如何實(shí)現(xiàn)非阻塞算法的一般介紹。
◆ 書籍“Java Concurrency in Practice”:介紹了大量的并行編程技巧、反模式、可行的解決方案等,它對于 JDK 5 中的新特性也有詳盡的介紹。
獲得產(chǎn)品和技術(shù)
◆ 訪問 Doug Lea 的 JSR 166 站點(diǎn)獲得最新的源代碼。
◆ 從 Sun 公司 網(wǎng)站下載 Java SE 6。
原文鏈接:http://zhangziyangup.iteye.com/blog/1324592
【編輯推薦】
- 利用JavaMail API 解析MIME
- 詳細(xì)解析Java中抽象類和接口的區(qū)別
- 解讀Java環(huán)境變量配置
- Java精確截取字符串
- Cinch和Sysmon發(fā)布 Java輔助開發(fā)工具