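A Brief Look at Java's Fork/Join Concurrency Framework
1. What is Fork/Join
Oracle's official definition: the Fork/Join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It splits a large task into several smaller tasks that run concurrently, making full use of the available resources and thereby improving the application's performance.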
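Because Fork/Join implements ExecutorService, its tasks also run in a thread pool. What sets it apart is the work-stealing algorithm: idle threads can steal tasks from busy threads and help execute them. (My own understanding of work stealing is roughly this: every thread in the pool has its own task queue, and the threads do not interfere with one another. Each thread takes tasks from the head of its own queue. When one thread's queue becomes empty while other threads' queues still hold tasks, that thread takes a task from another thread's queue, conventionally from the opposite end, and runs it, as if it had stolen someone else's work.)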
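The core of the Fork/Join framework is the ForkJoinPool class, which extends AbstractExecutorService. It implements the work-stealing algorithm and executes ForkJoinTask instances.
The original text from Oracle's documentation: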
The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.
As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.
The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.
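To make the work-stealing idea above more concrete, here is a toy, single-threaded model. It is only a sketch under stated assumptions: the `Worker` class, the task names, and the turn-taking loop are all hypothetical and are not how ForkJoinPool is implemented internally. It only shows the shape of the idea: a worker takes tasks from one end of its own deque, and a worker with nothing to do takes from the opposite end of someone else's.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy, single-threaded model of work stealing (illustration only). */
class WorkStealingSketch {

    /** A hypothetical worker that owns a double-ended queue of task names. */
    static final class Worker {
        final Deque<String> deque = new ArrayDeque<>();

        /** The owner pushes at the front of its own deque. */
        void push(String task) {
            deque.addFirst(task);
        }

        /** Take own work first; if none is left, steal from the back of the victim's deque. */
        String nextTask(Worker victim) {
            String own = deque.pollFirst();
            return own != null ? own : victim.deque.pollLast();
        }
    }

    /** Lets a loaded worker and an empty worker take turns and records who ran what. */
    static List<String> simulate() {
        Worker busy = new Worker();
        Worker idle = new Worker();
        for (String t : new String[] {"t1", "t2", "t3", "t4"}) {
            busy.push(t);
        }
        List<String> log = new ArrayList<>();
        while (true) {
            String a = busy.nextTask(idle);
            if (a != null) {
                log.add("busy runs " + a);
            }
            String b = idle.nextTask(busy);
            if (b != null) {
                log.add("idle steals " + b);
            }
            if (a == null && b == null) {
                break; // both deques are empty
            }
        }
        return log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

In this model the owner and the thief work at opposite ends of the deque, so they rarely compete for the same task. A real pool does the same thing with concurrent data structures and real threads.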
2. Basic usage of Fork/Join
(1) The Fork/Join task class
As mentioned above, Fork/Join splits a large task into several smaller tasks, so the first step is of course to split the task, roughly as follows:
- if (the task is small enough) {
-     do the work directly
- } else {
-     split the task into two smaller parts
-     run both parts and wait for their results
- }
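To implement a ForkJoinTask we need a class that extends RecursiveTask or RecursiveAction, with the logic above placed in its compute method and adapted to our own business needs. RecursiveTask and RecursiveAction both extend ForkJoinTask; the difference between them is that RecursiveTask returns a result and RecursiveAction does not. Below is a demo I wrote that selects the elements containing "a" from a list of strings: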
- @Override
- protected List<String> compute() {
-     // Once the range between start and end is below the threshold, do the actual filtering
-     if (end - this.start < threshold) {
-         List<String> temp = list.subList(this.start, end);
-         return temp.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList());
-     } else {
-         // Otherwise split the large task into two smaller tasks
-         int middle = (this.start + end) / 2;
-         ForkJoinTest left = new ForkJoinTest(list, this.start, middle, threshold);
-         ForkJoinTest right = new ForkJoinTest(list, middle, end, threshold);
-         // Run the left half asynchronously and compute the right half in the current thread
-         left.fork();
-         List<String> rightResult = right.compute();
-         // Merge the results of the two subtasks into a new, mutable list
-         List<String> join = new ArrayList<>(left.join());
-         join.addAll(rightResult);
-         return join;
-     }
- }
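For comparison, the RecursiveAction variant (no return value) might look like the following minimal sketch. The class name `DoubleInPlace`, the threshold of 4, and the `run` helper are all assumptions made up for illustration; the task simply doubles every element of an array in place.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/** Hypothetical example: doubles every element of an int array in place. */
class DoubleInPlace extends RecursiveAction {
    private static final int THRESHOLD = 4; // arbitrary cut-off chosen for illustration

    private final int[] data;
    private final int start; // inclusive
    private final int end;   // exclusive

    DoubleInPlace(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: do the work directly
            for (int i = start; i < end; i++) {
                data[i] *= 2;
            }
        } else {
            // Split into two halves; invokeAll forks them and waits for both
            int middle = (start + end) >>> 1;
            invokeAll(new DoubleInPlace(data, start, middle),
                      new DoubleInPlace(data, middle, end));
        }
    }

    /** Convenience helper: runs the action on the common pool and returns the same array. */
    static int[] run(int[] data) {
        ForkJoinPool.commonPool().invoke(new DoubleInPlace(data, 0, data.length));
        return data;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})));
    }
}
```

Because there is nothing to merge, the else branch only has to wait for both halves, which is what the static `invokeAll` method of ForkJoinTask does.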
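(2) The runner class
With the task class in place, we can call it. First we need a Fork/Join thread pool, ForkJoinPool; then we submit a ForkJoinTask to the pool and obtain the result. ForkJoinPool's submit method takes a ForkJoinTask and also returns a ForkJoinTask, whose get method returns the result of the computation.
The code is as follows: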
- ForkJoinPool pool = new ForkJoinPool();
- // Submit the splittable ForkJoinTask
- ForkJoinTask<List<String>> future = pool.submit(forkJoinService);
- System.out.println(future.get());
- // Shut down the thread pool
- pool.shutdown();
And with that we have a simple Fork/Join program.
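Besides submit followed by get, ForkJoinPool also offers invoke, which blocks and returns the result directly, and a shared pool available through ForkJoinPool.commonPool(). The sketch below shows both call styles side by side. The `RangeSum` task, its threshold, and the helper method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/** Hypothetical task: sums the integers in the half-open range [from, to). */
class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary cut-off for illustration

    private final int from; // inclusive
    private final int to;   // exclusive

    RangeSum(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        int middle = (from + to) >>> 1;
        RangeSum left = new RangeSum(from, middle);
        RangeSum right = new RangeSum(middle, to);
        left.fork();                        // run the left half asynchronously
        long rightResult = right.compute(); // compute the right half in this thread
        return left.join() + rightResult;
    }

    /** Style 1: a dedicated pool, submit, then get (as in the article). */
    static long viaSubmit(int n) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            ForkJoinTask<Long> future = pool.submit(new RangeSum(0, n));
            return future.get();
        } finally {
            pool.shutdown();
        }
    }

    /** Style 2: the shared common pool and a blocking invoke. */
    static long viaInvoke(int n) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(0, n));
    }

    public static void main(String[] args) throws Exception {
        System.out.println(viaSubmit(10_000));
        System.out.println(viaInvoke(10_000));
    }
}
```

The common pool does not need to be shut down by the caller, which is convenient for short examples. A dedicated pool is still the natural choice when the parallelism level has to be controlled separately.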
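Tip: in Java 8, the parallelSort() methods of java.util.Arrays and the methods in the java.util.stream package are also built on Fork/Join. (Attentive readers may have noticed that I use a stream inside the Fork/Join task. Strictly speaking that makes this Fork/Join task redundant, because parallel streams already run on Fork/Join. But this is only a demo with no practical use, so it does not matter.)
The official text: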
One such implementation, introduced in Java SE 8, is used by the java.util.Arrays class for its parallelSort() methods. These methods are similar to sort(), but leverage concurrency via the fork/join framework. Parallel sorting of large arrays is faster than sequential sorting when run on multiprocessor systems.
Another implementation of the fork/join framework is used by methods in the java.util.streams package, which is part of Project Lambda scheduled for the Java SE 8 release.
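As a rough illustration of these two built-in facilities, the sketch below performs the same "contains a" filter with a parallel stream and sorts an array with Arrays.parallelSort. The class and method names (`BuiltInParallelism`, `containingA`, `sortedCopy`) are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper class showing the JDK's built-in Fork/Join based APIs. */
class BuiltInParallelism {

    /** Same filter as the demo task, expressed as a parallel stream. */
    static List<String> containingA(List<String> words) {
        return words.parallelStream()
                .filter(s -> s.contains("a"))
                .collect(Collectors.toList());
    }

    /** Sorts a copy of the array with Arrays.parallelSort, leaving the input untouched. */
    static int[] sortedCopy(int[] values) {
        int[] copy = Arrays.copyOf(values, values.length);
        Arrays.parallelSort(copy);
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(containingA(Arrays.asList("a", "ah", "b", "ba", "sd", "oa")));
        System.out.println(Arrays.toString(sortedCopy(new int[] {5, 3, 9, 1, 7})));
    }
}
```

For a list source, collecting a parallel stream keeps the encounter order, so the filtered result comes back in the same order as a sequential filter would produce.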
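The complete code is attached for future reference:
1. Define an abstract class (intended for extension; it serves no real purpose in this example and can be omitted):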
- import java.util.concurrent.RecursiveTask;
- /**
-  * Description: ForkJoin abstract base class
-  * Designer: jack
-  * Date: 2017/8/3
-  * Version: 1.0.0
-  */
- public abstract class ForkJoinService<T> extends RecursiveTask<T> {
-     @Override
-     protected abstract T compute();
- }
2. Define the task class
- import java.util.ArrayList;
- import java.util.List;
- import java.util.stream.Collectors;
- /**
-  * Description: ForkJoin task class
-  * Designer: jack
-  * Date: 2017/8/3
-  * Version: 1.0.0
-  */
- public class ForkJoinTest extends ForkJoinService<List<String>> {
-     private final int threshold; // threshold
-     private final List<String> list; // list to be split
-     private ForkJoinTest(List<String> list, int threshold) {
-         this.list = list;
-         this.threshold = threshold;
-     }
-     @Override
-     protected List<String> compute() {
-         // When the list is smaller than the threshold, do the actual filtering
-         if (list.size() < threshold) {
-             return list.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList());
-         } else {
-             // Otherwise split the large task into two smaller tasks
-             int middle = list.size() / 2;
-             List<String> leftList = list.subList(0, middle);
-             List<String> rightList = list.subList(middle, list.size());
-             ForkJoinTest left = new ForkJoinTest(leftList, threshold);
-             ForkJoinTest right = new ForkJoinTest(rightList, threshold);
-             // Run the left half asynchronously and compute the right half in the current thread
-             left.fork();
-             List<String> rightResult = right.compute();
-             // Merge the results of the two subtasks into a new, mutable list
-             List<String> join = new ArrayList<>(left.join());
-             join.addAll(rightResult);
-             return join;
-         }
-     }
-     /**
-      * Creates a ForkJoinTest instance.
-      * @param list the list to process
-      * @param threshold the threshold
-      * @return a ForkJoinTest instance
-      */
-     public static ForkJoinService<List<String>> getInstance(List<String> list, int threshold) {
-         // Return a new task per call: a cached singleton would ignore later
-         // arguments and hand back a task that has already completed
-         return new ForkJoinTest(list, threshold);
-     }
- }
3. The runner class
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.ForkJoinTask;
- /**
-  * Description: Fork/Join runner class
-  * Designer: jack
-  * Date: 2017/8/3
-  * Version: 1.0.0
-  */
- public class Test {
-     public static void main(String[] args) throws ExecutionException, InterruptedException {
-         String[] strings = {"a", "ah", "b", "ba", "ab", "ac", "sd", "fd", "ar", "te", "se", "te",
-                 "sdr", "gdf", "df", "fg", "gh", "oa", "ah", "qwe", "re", "ty", "ui"};
-         List<String> stringList = new ArrayList<>(Arrays.asList(strings));
-         ForkJoinPool pool = new ForkJoinPool();
-         ForkJoinService<List<String>> forkJoinService = ForkJoinTest.getInstance(stringList, 20);
-         // Submit the splittable ForkJoinTask
-         ForkJoinTask<List<String>> future = pool.submit(forkJoinService);
-         System.out.println(future.get());
-         // Shut down the thread pool
-         pool.shutdown();
-     }
- }