自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

并發(fā)編程中一種經(jīng)典的分而治之的思想?。?/h1>

開發(fā) 前端
作者個(gè)人研發(fā)的在高并發(fā)場(chǎng)景下,提供的簡(jiǎn)單、穩(wěn)定、可擴(kuò)展的延遲消息隊(duì)列框架,具有精準(zhǔn)的定時(shí)任務(wù)和延遲隊(duì)列處理功能。

[[357347]]

作者個(gè)人研發(fā)的在高并發(fā)場(chǎng)景下,提供的簡(jiǎn)單、穩(wěn)定、可擴(kuò)展的延遲消息隊(duì)列框架,具有精準(zhǔn)的定時(shí)任務(wù)和延遲隊(duì)列處理功能。自開源半年多以來,已成功為十幾家中小型企業(yè)提供了精準(zhǔn)定時(shí)調(diào)度方案,經(jīng)受住了生產(chǎn)環(huán)境的考驗(yàn)。為使更多童鞋受益,現(xiàn)給出開源框架地址:

https://github.com/sunshinelyz/mykit-delay

寫在前面

在JDK中,提供了這樣一種功能:它能夠?qū)?fù)雜的邏輯拆分成一個(gè)個(gè)簡(jiǎn)單的邏輯來并行執(zhí)行,待每個(gè)并行執(zhí)行的邏輯執(zhí)行完成后,再將各個(gè)結(jié)果進(jìn)行匯總,得出最終的結(jié)果數(shù)據(jù)。有點(diǎn)像Hadoop中的MapReduce。

ForkJoin是由JDK1.7之后提供的多線程并發(fā)處理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是將一個(gè)復(fù)雜的計(jì)算,按照設(shè)定的閾值分解成多個(gè)計(jì)算,然后將各個(gè)計(jì)算結(jié)果進(jìn)行匯總。相應(yīng)的,F(xiàn)orkJoin將復(fù)雜的計(jì)算當(dāng)做一個(gè)任務(wù),而分解的多個(gè)計(jì)算則是當(dāng)做一個(gè)個(gè)子任務(wù)來并行執(zhí)行。

Java并發(fā)編程的發(fā)展

對(duì)于Java語(yǔ)言來說,生來就支持多線程并發(fā)編程,在并發(fā)編程領(lǐng)域也是在不斷發(fā)展的。Java在其發(fā)展過程中對(duì)并發(fā)編程的支持越來越完善也正好印證了這一點(diǎn)。

  • Java 1 支持thread,synchronized。
  • Java 5 引入了 thread pools, blocking queues, concurrent collections,locks, condition queues。
  • Java 7 加入了fork-join庫(kù)。
  • Java 8 加入了 parallel streams。

并發(fā)與并行

并發(fā)和并行在本質(zhì)上還是有所區(qū)別的。

并發(fā)

并發(fā)指的是在同一時(shí)刻,只有一個(gè)線程能夠獲取到CPU執(zhí)行任務(wù),而多個(gè)線程被快速的輪換執(zhí)行,這就使得在宏觀上具有多個(gè)線程同時(shí)執(zhí)行的效果,并發(fā)不是真正的同時(shí)執(zhí)行,并發(fā)可以使用下圖表示。

并行

并行指的是無(wú)論何時(shí),多個(gè)線程都是在多個(gè)CPU核心上同時(shí)執(zhí)行的,是真正的同時(shí)執(zhí)行。

分治法

基本思想

把一個(gè)規(guī)模大的問題劃分為規(guī)模較小的子問題,然后分而治之,最后合并子問題的解得到原問題的解。

步驟

①分割原問題;

②求解子問題;

③合并子問題的解為原問題的解。

我們可以使用如下偽代碼來表示這個(gè)步驟。

  1. if(任務(wù)很?。﹞ 
  2.     直接計(jì)算得到結(jié)果 
  3. }else
  4.     分拆成N個(gè)子任務(wù) 
  5.     調(diào)用子任務(wù)的fork()進(jìn)行計(jì)算 
  6.     調(diào)用子任務(wù)的join()合并計(jì)算結(jié)果 

在分治法中,子問題一般是相互獨(dú)立的,因此,經(jīng)常通過遞歸調(diào)用算法來求解子問題。

典型應(yīng)用

  • 二分搜索
  • 大整數(shù)乘法
  • Strassen矩陣乘法
  • 棋盤覆蓋
  • 合并排序
  • 快速排序
  • 線性時(shí)間選擇
  • 漢諾塔

ForkJoin并行處理框架

ForkJoin框架概述

Java 1.7 引入了一種新的并發(fā)框架—— Fork/Join Framework,主要用于實(shí)現(xiàn)“分而治之”的算法,特別是分治之后遞歸調(diào)用的函數(shù)。

ForkJoin框架的本質(zhì)是一個(gè)用于并行執(zhí)行任務(wù)的框架, 能夠把一個(gè)大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)的計(jì)算結(jié)果。在Java中,F(xiàn)orkJoin框架與ThreadPool共存,并不是要替換ThreadPool

其實(shí),在Java 8中引入的并行流計(jì)算,內(nèi)部就是采用的ForkJoinPool來實(shí)現(xiàn)的。例如,下面使用并行流實(shí)現(xiàn)打印數(shù)組元組的程序。

  1. public class SumArray { 
  2.     public static void main(String[] args){ 
  3.         List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9); 
  4.         numberList.parallelStream().forEach(System.out::println); 
  5.     } 

這段代碼的背后就使用到了ForkJoinPool。

說到這里,可能有讀者會(huì)問:可以使用線程池的ThreadPoolExecutor來實(shí)現(xiàn)啊?為什么要使用ForkJoinPool啊?ForkJoinPool是個(gè)什么鬼啊?! 接下來,我們就來回答這個(gè)問題。

ForkJoin框架原理

ForkJoin框架是從jdk1.7中引入的新特性,它同ThreadPoolExecutor一樣,也實(shí)現(xiàn)了Executor和ExecutorService接口。它使用了一個(gè)無(wú)限隊(duì)列來保存需要執(zhí)行的任務(wù),而線程的數(shù)量則是通過構(gòu)造函數(shù)傳入,如果沒有向構(gòu)造函數(shù)中傳入指定的線程數(shù)量,那么當(dāng)前計(jì)算機(jī)可用的CPU數(shù)量會(huì)被設(shè)置為線程數(shù)量作為默認(rèn)值。

ForkJoinPool主要使用 分治法(Divide-and-Conquer Algorithm) 來解決問題。典型的應(yīng)用比如快速排序算法。這里的要點(diǎn)在于,F(xiàn)orkJoinPool能夠使用相對(duì)較少的線程來處理大量的任務(wù)。比如要對(duì)1000萬(wàn)個(gè)數(shù)據(jù)進(jìn)行排序,那么會(huì)將這個(gè)任務(wù)分割成兩個(gè)500萬(wàn)的排序任務(wù)和一個(gè)針對(duì)這兩組500萬(wàn)數(shù)據(jù)的合并任務(wù)。以此類推,對(duì)于500萬(wàn)的數(shù)據(jù)也會(huì)做出同樣的分割處理,到最后會(huì)設(shè)置一個(gè)閾值來規(guī)定當(dāng)數(shù)據(jù)規(guī)模到多少時(shí),停止這樣的分割處理。比如,當(dāng)元素的數(shù)量小于10時(shí),會(huì)停止分割,轉(zhuǎn)而使用插入排序?qū)λ鼈冞M(jìn)行排序。那么到最后,所有的任務(wù)加起來會(huì)有大概200萬(wàn)+個(gè)。問題的關(guān)鍵在于,對(duì)于一個(gè)任務(wù)而言,只有當(dāng)它所有的子任務(wù)完成之后,它才能夠被執(zhí)行。

所以當(dāng)使用ThreadPoolExecutor時(shí),使用分治法會(huì)存在問題,因?yàn)門hreadPoolExecutor中的線程無(wú)法向任務(wù)隊(duì)列中再添加一個(gè)任務(wù)并在等待該任務(wù)完成之后再繼續(xù)執(zhí)行。而使用ForkJoinPool就能夠解決這個(gè)問題,它就能夠讓其中的線程創(chuàng)建新的任務(wù),并掛起當(dāng)前的任務(wù),此時(shí)線程就能夠從隊(duì)列中選擇子任務(wù)執(zhí)行。

那么使用ThreadPoolExecutor或者ForkJoinPool,性能上會(huì)有什么差異呢?

首先,使用ForkJoinPool能夠使用數(shù)量有限的線程來完成非常多的具有父子關(guān)系的任務(wù),比如使用4個(gè)線程來完成超過200萬(wàn)個(gè)任務(wù)。但是,使用ThreadPoolExecutor時(shí),是不可能完成的,因?yàn)門hreadPoolExecutor中的Thread無(wú)法選擇優(yōu)先執(zhí)行子任務(wù),需要完成200萬(wàn)個(gè)具有父子關(guān)系的任務(wù)時(shí),也需要200萬(wàn)個(gè)線程,很顯然這是不可行的,也是很不合理的!!

工作竊取算法

假如我們需要做一個(gè)比較大的任務(wù),我們可以把這個(gè)任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競(jìng)爭(zhēng),于是把這些子任務(wù)分別放到不同的隊(duì)列里,并為每個(gè)隊(duì)列創(chuàng)建一個(gè)單獨(dú)的線程來執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對(duì)應(yīng),比如A線程負(fù)責(zé)處理A隊(duì)列里的任務(wù)。但是有的線程會(huì)先把自己隊(duì)列里的任務(wù)干完,而其他線程對(duì)應(yīng)的隊(duì)列里還有任務(wù)等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來執(zhí)行。而在這時(shí)它們會(huì)訪問同一個(gè)隊(duì)列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競(jìng)爭(zhēng),通常會(huì)使用雙端隊(duì)列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行。

工作竊取算法的優(yōu)點(diǎn):充分利用線程進(jìn)行并行計(jì)算,并減少了線程間的競(jìng)爭(zhēng)。

工作竊取算法的缺點(diǎn):在某些情況下還是存在競(jìng)爭(zhēng),比如雙端隊(duì)列里只有一個(gè)任務(wù)時(shí)。并且該算法會(huì)消耗更多的系統(tǒng)資源,比如創(chuàng)建多個(gè)線程和多個(gè)雙端隊(duì)列。

Fork/Join框架局限性:

對(duì)于Fork/Join框架而言,當(dāng)一個(gè)任務(wù)正在等待它使用Join操作創(chuàng)建的子任務(wù)結(jié)束時(shí),執(zhí)行這個(gè)任務(wù)的工作線程查找其他未被執(zhí)行的任務(wù),并開始執(zhí)行這些未被執(zhí)行的任務(wù),通過這種方式,線程充分利用它們的運(yùn)行時(shí)間來提高應(yīng)用程序的性能。為了實(shí)現(xiàn)這個(gè)目標(biāo),F(xiàn)ork/Join框架執(zhí)行的任務(wù)有一些局限性。

(1)任務(wù)只能使用Fork和Join操作來進(jìn)行同步機(jī)制,如果使用了其他同步機(jī)制,則在同步操作時(shí),工作線程就不能執(zhí)行其他任務(wù)了。比如,在Fork/Join框架中,使任務(wù)進(jìn)行了睡眠,那么,在睡眠期間內(nèi),正在執(zhí)行這個(gè)任務(wù)的工作線程將不會(huì)執(zhí)行其他任務(wù)了。(2)在Fork/Join框架中,所拆分的任務(wù)不應(yīng)該去執(zhí)行IO操作,比如:讀寫數(shù)據(jù)文件。(3)任務(wù)不能拋出檢查異常,必須通過必要的代碼來出來這些異常。

ForkJoin框架的實(shí)現(xiàn)

ForkJoin框架中一些重要的類如下所示。

ForkJoinPool 框架中涉及的主要類如下所示。

1.ForkJoinPool類

實(shí)現(xiàn)了ForkJoin框架中的線程池,由類圖可以看出,F(xiàn)orkJoinPool類實(shí)現(xiàn)了線程池的Executor接口。

我們也可以從下圖中看出ForkJoinPool的類圖關(guān)系。

其中,可以使用Executors.newWorkStealPool()方法創(chuàng)建ForkJoinPool。

ForkJoinPool中提供了如下提交任務(wù)的方法。

  1. public void execute(ForkJoinTask<?> task) 
  2. public void execute(Runnable task) 
  3. public <T> T invoke(ForkJoinTask<T> task) 
  4. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  
  5. public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) 
  6. public <T> ForkJoinTask<T> submit(Callable<T> task) 
  7. public <T> ForkJoinTask<T> submit(Runnable task, T result) 
  8. public ForkJoinTask<?> submit(Runnable task) 

2.ForkJoinWorkerThread類

實(shí)現(xiàn)ForkJoin框架中的線程。

3.ForkJoinTask類

ForkJoinTask封裝了數(shù)據(jù)及其相應(yīng)的計(jì)算,并且支持細(xì)粒度的數(shù)據(jù)并行。ForkJoinTask比線程要輕量,F(xiàn)orkJoinPool中少量工作線程能夠運(yùn)行大量的ForkJoinTask。

ForkJoinTask類中主要包括兩個(gè)方法fork()和join(),分別實(shí)現(xiàn)任務(wù)的分拆與合并。

fork()方法類似于Thread.start(),但是它并不立即執(zhí)行任務(wù),而是將任務(wù)放入工作隊(duì)列中。跟Thread.join()方法不同,F(xiàn)orkJoinTask的join()方法并不簡(jiǎn)單的阻塞線程,而是利用工作線程運(yùn)行其他任務(wù),當(dāng)一個(gè)工作線程中調(diào)用join(),它將處理其他任務(wù),直到注意到目標(biāo)子任務(wù)已經(jīng)完成。

我們可以使用下圖來表示這個(gè)過程。

ForkJoinTask有3個(gè)子類:

  • RecursiveAction:無(wú)返回值的任務(wù)。
  • RecursiveTask:有返回值的任務(wù)。
  • CountedCompleter:完成任務(wù)后將觸發(fā)其他任務(wù)。

4.RecursiveTask類

有返回結(jié)果的ForkJoinTask實(shí)現(xiàn)Callable。

5.RecursiveAction類

無(wú)返回結(jié)果的ForkJoinTask實(shí)現(xiàn)Runnable。

6.CountedCompleter類

在任務(wù)完成執(zhí)行后會(huì)觸發(fā)執(zhí)行一個(gè)自定義的鉤子函數(shù)。

ForkJoin示例程序

  1. package io.binghe.concurrency.example.aqs; 
  2.   
  3. import lombok.extern.slf4j.Slf4j; 
  4. import java.util.concurrent.ForkJoinPool; 
  5. import java.util.concurrent.Future; 
  6. import java.util.concurrent.RecursiveTask; 
  7. @Slf4j 
  8. public class ForkJoinTaskExample extends RecursiveTask<Integer> { 
  9.     public static final int threshold = 2; 
  10.     private int start; 
  11.     private int end
  12.     public ForkJoinTaskExample(int start, int end) { 
  13.         this.start = start; 
  14.         this.end = end
  15.     } 
  16.     @Override 
  17.     protected Integer compute() { 
  18.         int sum = 0; 
  19.         //如果任務(wù)足夠小就計(jì)算任務(wù) 
  20.         boolean canCompute = (end - start) <= threshold; 
  21.         if (canCompute) { 
  22.             for (int i = start; i <= end; i++) { 
  23.                 sum += i; 
  24.             } 
  25.         } else { 
  26.             // 如果任務(wù)大于閾值,就分裂成兩個(gè)子任務(wù)計(jì)算 
  27.             int middle = (start + end) / 2; 
  28.             ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); 
  29.             ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end); 
  30.   
  31.             // 執(zhí)行子任務(wù) 
  32.             leftTask.fork(); 
  33.             rightTask.fork(); 
  34.   
  35.             // 等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果 
  36.             int leftResult = leftTask.join(); 
  37.             int rightResult = rightTask.join(); 
  38.   
  39.             // 合并子任務(wù) 
  40.             sum = leftResult + rightResult; 
  41.         } 
  42.         return sum
  43.     } 
  44.     public static void main(String[] args) { 
  45.         ForkJoinPool forkjoinPool = new ForkJoinPool(); 
  46.   
  47.         //生成一個(gè)計(jì)算任務(wù),計(jì)算1+2+3+4 
  48.         ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); 
  49.   
  50.         //執(zhí)行一個(gè)任務(wù) 
  51.         Future<Integer> result = forkjoinPool.submit(task); 
  52.   
  53.         try { 
  54.             log.info("result:{}", result.get()); 
  55.         } catch (Exception e) { 
  56.             log.error("exception", e); 
  57.         } 
  58.     } 

本文轉(zhuǎn)載自微信公眾號(hào)「冰河技術(shù)」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系冰河技術(shù)公眾號(hào)。 

 

責(zé)任編輯:武曉燕 來源: 冰河技術(shù)
相關(guān)推薦

2018-06-21 14:20:24

Linuxfindfd

2025-01-15 12:00:00

Java線程編程

2023-10-24 09:03:05

C++編程

2023-07-18 18:10:04

2013-07-05 14:33:19

IoCDIP

2010-01-25 15:09:17

C++語(yǔ)言

2024-02-27 09:39:07

C語(yǔ)言cJSON開發(fā)

2013-03-12 14:07:06

Java編程

2012-02-01 10:18:23

編程

2023-12-04 08:21:18

虛擬線程Tomcat

2015-03-13 11:23:21

編程編程超能力編程能力

2012-07-30 09:58:53

2012-11-01 13:41:25

編程語(yǔ)言BasicPerl

2020-12-09 10:15:34

Pythonweb代碼

2020-12-23 10:10:23

Pythonweb代碼

2022-06-22 09:44:41

Python文件代碼

2022-07-07 10:33:27

Python姿勢(shì)代碼

2020-07-10 10:48:51

編程語(yǔ)言JavaPython

2022-11-22 11:18:38

Java虛擬線程

2012-03-14 11:46:30

ibmdw
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)