自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

90%的人(包括我)都以為會(huì)用ThreadPoolExecutor了,看了這十張圖再說(shuō)吧!

開(kāi)發(fā) 前端
如果經(jīng)?;贓xecutors提供的工廠方法創(chuàng)建線程池,很容易忽略線程池內(nèi)部的實(shí)現(xiàn)。特別是拒絕策略,因使用Executors創(chuàng)建線程池時(shí)不會(huì)傳入這個(gè)參數(shù),直接采用默認(rèn)值,所以常常被忽略。

[[432670]]

在阿里巴巴手冊(cè)中有一條建議:

【強(qiáng)制】線程池不允許使用 Executors 去創(chuàng)建,而是通過(guò)ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。

如果經(jīng)?;贓xecutors提供的工廠方法創(chuàng)建線程池,很容易忽略線程池內(nèi)部的實(shí)現(xiàn)。特別是拒絕策略,因使用Executors創(chuàng)建線程池時(shí)不會(huì)傳入這個(gè)參數(shù),直接采用默認(rèn)值,所以常常被忽略。

下面我們就來(lái)了解一下線程池相關(guān)的實(shí)現(xiàn)原理、API以及實(shí)例。

線程池的作用

在實(shí)踐應(yīng)用中創(chuàng)建線程池主要是為了:

  • 減少資源開(kāi)銷:減少每次創(chuàng)建、銷毀線程的開(kāi)銷;
  • 提高響應(yīng)速度:請(qǐng)求到來(lái)時(shí),線程已創(chuàng)建好,可直接執(zhí)行,提高響應(yīng)速度;
  • 提高線程的可管理性:線程是稀缺資源,需根據(jù)情況加以限制,確保系統(tǒng)穩(wěn)定運(yùn)行;

ThreadPoolExecutor

ThreadPoolExecutor可以實(shí)現(xiàn)線程池的創(chuàng)建。ThreadPoolExecutor相關(guān)類圖如下:

類圖

從類圖可以看出,ThreadPoolExecutor最終實(shí)現(xiàn)了Executor接口,是線程池創(chuàng)建的真正實(shí)現(xiàn)者。

Executor兩級(jí)調(diào)度模型

Executor模型

在HotSpot虛擬機(jī)中,Java中的線程將會(huì)被一一映射為操作系統(tǒng)的線程。在Java虛擬機(jī)層面,用戶將多個(gè)任務(wù)提交給Executor框架,Executor負(fù)責(zé)分配線程執(zhí)行它們;在操作系統(tǒng)層面,操作系統(tǒng)再將這些線程分配給處理器執(zhí)行。

ThreadPoolExecutor的三個(gè)角色

任務(wù)

ThreadPoolExecutor接受兩種類型的任務(wù):Callable和Runnable。

  • Callable:該類任務(wù)有返回結(jié)果,可以拋出異常。通過(guò)submit方法提交,返回Future對(duì)象。通過(guò)get獲取執(zhí)行結(jié)果。
  • Runnable:該類任務(wù)只執(zhí)行,無(wú)法獲取返回結(jié)果,在執(zhí)行過(guò)程中無(wú)法拋異常。通過(guò)execute或submit方法提交。

任務(wù)執(zhí)行器

Executor框架最核心的接口是Executor,它表示任務(wù)的執(zhí)行器。

通過(guò)上面類圖可以看出,Executor的子接口為ExecutorService。再往底層有兩大實(shí)現(xiàn)類:ThreadPoolExecutor和ScheduledThreadPoolExecutor(集成自ThreadPoolExecutor)。

執(zhí)行結(jié)果

Future接口表示異步的執(zhí)行結(jié)果,它的實(shí)現(xiàn)類為FutureTask。

三個(gè)角色之間的處理邏輯圖如下:

FutureTask邏輯

線程池處理流程

線程池處理流程

一個(gè)線程從被提交(submit)到執(zhí)行共經(jīng)歷以下流程:

  • 線程池判斷核心線程池里是的線程是否都在執(zhí)行任務(wù),如果不是,則創(chuàng)建一個(gè)新的工作線程來(lái)執(zhí)行任務(wù)。如果核心線程池里的線程都在執(zhí)行任務(wù),則進(jìn)入下一個(gè)流程;
  • 線程池判斷工作隊(duì)列是否已滿。如果工作隊(duì)列沒(méi)有滿,則將新提交的任務(wù)儲(chǔ)存在這個(gè)工作隊(duì)列里。如果工作隊(duì)列滿了,則進(jìn)入下一個(gè)流程;
  • 線程池判斷其內(nèi)部線程是否都處于工作狀態(tài)。如果沒(méi)有,則創(chuàng)建一個(gè)新的工作線程來(lái)執(zhí)行任務(wù)。如果已滿了,則交給飽和策略來(lái)處理這個(gè)任務(wù)。

線程池在執(zhí)行execute方法時(shí),主要有以下四種情況:

線程池執(zhí)行excute方法

  • 如果當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)(需要獲得全局鎖);
  • 如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue;
  • 如果無(wú)法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來(lái)處理任務(wù)(需要獲得全局鎖);
  • 如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maxiumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。

線程池采取上述的流程進(jìn)行設(shè)計(jì)是為了減少獲取全局鎖的次數(shù)。在線程池完成預(yù)熱(當(dāng)前運(yùn)行的線程數(shù)大于或等于corePoolSize)之后,幾乎所有的excute方法調(diào)用都執(zhí)行步驟二。

線程的狀態(tài)流轉(zhuǎn)

順便再回顧一下線程的狀態(tài)的轉(zhuǎn)換,在JDK中Thread類中提供了一個(gè)枚舉類,例舉了線程的各個(gè)狀態(tài):

  1. public enum State { 
  2.  
  3.        NEW, 
  4.  
  5.        RUNNABLE, 
  6.  
  7.        BLOCKED, 
  8.  
  9.        WAITING, 
  10.  
  11.        TIMED_WAITING, 
  12.  
  13.        TERMINATED; 
  14.    } 

一共定義了6個(gè)枚舉值,其實(shí)代表的是5種類型的線程狀態(tài):

  • NEW:新建;
  • RUNNABLE:運(yùn)行狀態(tài);
  • BLOCKED:阻塞狀態(tài);
  • WAITING:等待狀態(tài),WAITING和TIMED_WAITING可以歸為一類,都屬于等待狀態(tài),只是后者可以設(shè)置等待時(shí)間,即等待多久;
  • TERMINATED:終止?fàn)顟B(tài);

線程關(guān)系轉(zhuǎn)換圖:

線程狀態(tài)轉(zhuǎn)換

當(dāng)new Thread()說(shuō)明這個(gè)線程處于NEW(新建狀態(tài));調(diào)用Thread.start()方法表示這個(gè)線程處于RUNNABLE(運(yùn)行狀態(tài));但是RUNNABLE狀態(tài)中又包含了兩種狀態(tài):READY(就緒狀態(tài))和RUNNING(運(yùn)行中)。調(diào)用start()方法,線程不一定獲得了CPU時(shí)間片,這時(shí)就處于READY,等待CPU時(shí)間片,當(dāng)獲得了CPU時(shí)間片,就處于RUNNING狀態(tài)。

在運(yùn)行中調(diào)用synchronized同步的代碼塊,沒(méi)有獲取到鎖,這時(shí)會(huì)處于BLOCKED(阻塞狀態(tài)),當(dāng)重新獲取到鎖時(shí),又會(huì)變?yōu)镽UNNING狀態(tài)。在代碼執(zhí)行的過(guò)程中可能會(huì)碰到Object.wait()等一些等待方法,線程的狀態(tài)又會(huì)轉(zhuǎn)變?yōu)閃AITING(等待狀態(tài)),等待被喚醒,當(dāng)調(diào)用了Object.notifyAll()喚醒了之后線程執(zhí)行完就會(huì)變?yōu)門ERMINATED(終止?fàn)顟B(tài))。

線程池的狀態(tài)

線程池中狀態(tài)通過(guò)2個(gè)二進(jìn)制位(bit)來(lái)表示線程池的5個(gè)狀態(tài):RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED:

  • RUNNING:線程池正常工作的狀態(tài),在 RUNNING 狀態(tài)下線程池接受新的任務(wù)并處理任務(wù)隊(duì)列中的任務(wù);
  • SHUTDOWN:調(diào)用shutdown()方法會(huì)進(jìn)入 SHUTDOWN 狀態(tài)。在 SHUTDOWN 狀態(tài)下,線程池不接受新的任務(wù),但是會(huì)繼續(xù)執(zhí)行任務(wù)隊(duì)列中已有的任務(wù);
  • STOP:調(diào)用shutdownNow()會(huì)進(jìn)入 STOP 狀態(tài)。在 STOP 狀態(tài)下線程池既不接受新的任務(wù),也不處理已經(jīng)在隊(duì)列中的任務(wù)。對(duì)于還在執(zhí)行任務(wù)的工作線程,線程池會(huì)發(fā)起中斷請(qǐng)求來(lái)中斷正在執(zhí)行的任務(wù),同時(shí)會(huì)清空任務(wù)隊(duì)列中還未被執(zhí)行的任務(wù);
  • TIDYING:當(dāng)線程池中的所有執(zhí)行任務(wù)的工作線程都已經(jīng)終止,并且工作線程集合為空的時(shí)候,進(jìn)入 TIDYING 狀態(tài);
  • TERMINATED:當(dāng)線程池執(zhí)行完terminated()鉤子方法以后,線程池進(jìn)入終態(tài) TERMINATED;

ThreadPoolExecutor API

ThreadPoolExecutor創(chuàng)建線程池API:

  1. public ThreadPoolExecutor(int corePoolSize, 
  2.                           int maximumPoolSize, 
  3.                           long keepAliveTime, 
  4.                           TimeUnit unit, 
  5.                           BlockingQueue<Runnable> workQueue, 
  6.                           ThreadFactory threadFactory, 
  7.                           RejectedExecutionHandler handler)  

參數(shù)解釋:

  • corePoolSize :線程池常駐核心線程數(shù)。創(chuàng)建線程池時(shí),線程池中并沒(méi)有任何線程,當(dāng)有任務(wù)來(lái)時(shí)才去創(chuàng)建線程,執(zhí)行任務(wù)。提交一個(gè)任務(wù),創(chuàng)建一個(gè)線程,直到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小,則不再創(chuàng)建。當(dāng)創(chuàng)建的線程數(shù)等于corePoolSize 時(shí),會(huì)加入設(shè)置的阻塞隊(duì)列。
  • maximumPoolSize :線程池允許創(chuàng)建的最大線程數(shù)。當(dāng)隊(duì)列滿時(shí),會(huì)創(chuàng)建線程執(zhí)行任務(wù)直到線程池中的數(shù)量等于maximumPoolSize。
  • keepAliveTime :當(dāng)線程數(shù)大于核心時(shí),此為終止前多余的空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間。
  • unit :keepAliveTime的時(shí)間單位,可選項(xiàng):天(DAYS)、小時(shí)(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微妙)。
  • workQueue :用來(lái)儲(chǔ)存等待執(zhí)行任務(wù)的隊(duì)列。
  • threadFactory :線程工廠,用來(lái)生產(chǎn)一組相同任務(wù)的線程。主要用于設(shè)置生成的線程名詞前綴、是否為守護(hù)線程以及優(yōu)先級(jí)等。設(shè)置有意義的名稱前綴有利于在進(jìn)行虛擬機(jī)分析時(shí),知道線程是由哪個(gè)線程工廠創(chuàng)建的。
  • handler :執(zhí)行拒絕策略對(duì)象。當(dāng)達(dá)到任務(wù)緩存上限時(shí)(即超過(guò)workQueue參數(shù)能存儲(chǔ)的任務(wù)數(shù)),執(zhí)行拒接策略。也就是當(dāng)任務(wù)處理不過(guò)來(lái)的時(shí)候,線程池開(kāi)始執(zhí)行拒絕策略。JDK 1.5提供了四種飽和策略:
  • AbortPolicy:默認(rèn),直接拋異常;
  • 只用調(diào)用者所在的線程執(zhí)行任務(wù),重試添加當(dāng)前的任務(wù),它會(huì)自動(dòng)重復(fù)調(diào)用execute()方法;
  • DiscardOldestPolicy:丟棄任務(wù)隊(duì)列中最久的任務(wù);
  • DiscardPolicy:丟棄當(dāng)前任務(wù);

適當(dāng)?shù)淖枞?duì)列

當(dāng)創(chuàng)建的線程數(shù)等于corePoolSize,會(huì)將任務(wù)加入阻塞隊(duì)列(BlockingQueue),維護(hù)著等待執(zhí)行的Runnable對(duì)象。

阻塞隊(duì)列通常有如下類型:

  • ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。可以限定隊(duì)列的長(zhǎng)度,接收到任務(wù)時(shí),如果沒(méi)有達(dá)到corePoolSize的值,則新建線程(核心線程)執(zhí)行任務(wù),如果達(dá)到了,則入隊(duì)等候,如果隊(duì)列已滿,則新建線程(非核心線程)執(zhí)行任務(wù),又如果總線程數(shù)到了maximumPoolSize,并且隊(duì)列也滿了,則發(fā)生錯(cuò)誤。
  • LinkedBlockingQueue :一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。這個(gè)隊(duì)列在接收到任務(wù)時(shí),如果當(dāng)前線程數(shù)小于核心線程數(shù),則新建線程(核心線程)處理任務(wù);如果當(dāng)前線程數(shù)等于核心線程數(shù),則進(jìn)入隊(duì)列等待。由于這個(gè)隊(duì)列沒(méi)有最大值限制,即所有超過(guò)核心線程數(shù)的任務(wù)都將被添加到隊(duì)列中,這也就導(dǎo)致了maximumPoolSize的設(shè)定失效,因?yàn)榭偩€程數(shù)永遠(yuǎn)不會(huì)超過(guò)corePoolSize。
  • PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。
  • DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列。隊(duì)列內(nèi)元素必須實(shí)現(xiàn)Delayed接口,這就意味著傳入的任務(wù)必須先實(shí)現(xiàn)Delayed接口。這個(gè)隊(duì)列在接收到任務(wù)時(shí),首先先入隊(duì),只有達(dá)到了指定的延時(shí)時(shí)間,才會(huì)執(zhí)行任務(wù)。
  • SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。這個(gè)隊(duì)列在接收到任務(wù)時(shí),會(huì)直接提交給線程處理,而不保留它,如果所有線程都在工作就新建一個(gè)線程來(lái)處理這個(gè)任務(wù)。所以為了保證不出現(xiàn)【線程數(shù)達(dá)到了maximumPoolSize而不能新建線程】的錯(cuò)誤,使用這個(gè)類型隊(duì)列時(shí),maximumPoolSize一般指定成Integer.MAX_VALUE,即無(wú)限大。
  • LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。
  • LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。

明確的拒絕策略

當(dāng)任務(wù)處理不過(guò)來(lái)時(shí),線程池開(kāi)始執(zhí)行拒絕策略。

支持的拒絕策略:

  • ThreadPoolExecutor.AbortPolicy: 丟棄任務(wù)并拋出RejectedExecutionException異常。(默認(rèn))
  • ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)。(重復(fù)此過(guò)程)
  • ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)。

線程池關(guān)閉

  • shutdown:將線程池狀態(tài)置為SHUTDOWN,并不會(huì)立即停止。停止接收外部submit的任務(wù),內(nèi)部正在跑的任務(wù)和隊(duì)列里等待的任務(wù),會(huì)執(zhí)行完后,才真正停止。
  • shutdownNow:將線程池狀態(tài)置為STOP。企圖立即停止,事實(shí)上不一定,跟shutdown()一樣,先停止接收外部提交的任務(wù),忽略隊(duì)列里等待的任務(wù),嘗試將正在跑的任務(wù)interrupt中斷(如果線程未處于sleep、wait、condition、定時(shí)鎖狀態(tài),interrupt無(wú)法中斷當(dāng)前線程),返回未執(zhí)行的任務(wù)列表。
  • awaitTermination(long timeOut, TimeUnit unit)當(dāng)前線程阻塞,直到等所有已提交的任務(wù)(包括正在跑的和隊(duì)列中等待的)執(zhí)行完或者等超時(shí)時(shí)間到或者線程被中斷,拋出InterruptedException,然后返回true(shutdown請(qǐng)求后所有任務(wù)執(zhí)行完畢)或false(已超時(shí))。

Executors

Executors是一個(gè)幫助類,提供了創(chuàng)建幾種預(yù)配置線程池實(shí)例的方法:newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool等。

如果查看源碼就會(huì)發(fā)現(xiàn),Executors本質(zhì)上就是實(shí)現(xiàn)了幾類默認(rèn)的ThreadPoolExecutor。而阿里巴巴開(kāi)發(fā)手冊(cè),不建議采用Executors默認(rèn)的,讓使用者直接通過(guò)ThreadPoolExecutor來(lái)創(chuàng)建。

Executors.newSingleThreadExecutor()

創(chuàng)建一個(gè)單線程的線程池。這個(gè)線程池只有一個(gè)線程在工作,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)。如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束,那么會(huì)有一個(gè)新的線程來(lái)替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。

  1. new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) 

該類型線程池的結(jié)構(gòu)圖:

newSingleThreadExecutor

該線程池的特點(diǎn):

  • 只會(huì)創(chuàng)建一條工作線程處理任務(wù);
  • 采用的阻塞隊(duì)列為L(zhǎng)inkedBlockingQueue;

Executors.newFixedThreadPool()

創(chuàng)建固定大小的線程池。每次提交一個(gè)任務(wù)就創(chuàng)建一個(gè)線程,直到線程達(dá)到線程池的最大大小。線程池的大小一旦達(dá)到最大值就會(huì)保持不變,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新線程。

  1. new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 

該類型線程池的結(jié)構(gòu)圖:

newFixedThreadPool

該線程池的特點(diǎn):

  • 固定大小;
  • corePoolSize和maximunPoolSize都為用戶設(shè)定的線程數(shù)量nThreads;
  • keepAliveTime為0,意味著一旦有多余的空閑線程,就會(huì)被立即停止掉;但這里keepAliveTime無(wú)效;
  • 阻塞隊(duì)列采用了LinkedBlockingQueue,一個(gè)無(wú)界隊(duì)列;
  • 由于阻塞隊(duì)列是一個(gè)無(wú)界隊(duì)列,因此永遠(yuǎn)不可能拒絕任務(wù);
  • 由于采用了無(wú)界隊(duì)列,實(shí)際線程數(shù)量將永遠(yuǎn)維持在nThreads,因此maximumPoolSize和keepAliveTime將無(wú)效。

Executors.newCachedThreadPool()

創(chuàng)建一個(gè)可緩存的線程池。如果線程池的大小超過(guò)了處理任務(wù)所需要的線程,那么就會(huì)回收部分空閑(60秒不執(zhí)行任務(wù))的線程,當(dāng)任務(wù)數(shù)增加時(shí),此線程池又可以智能的添加新線程來(lái)處理任務(wù)。此線程池不會(huì)對(duì)線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說(shuō)JVM)能夠創(chuàng)建的最大線程大小。

  1. new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); 

該類型線程池的結(jié)構(gòu)圖:

newCachedThreadPool

  • 該線程池的特點(diǎn):
  • 可以無(wú)限擴(kuò)大;
  • 比較適合處理執(zhí)行時(shí)間比較小的任務(wù);
  • corePoolSize為0,maximumPoolSize為無(wú)限大,意味著線程數(shù)量可以無(wú)限大;
  • keepAliveTime為60S,意味著線程空閑時(shí)間超過(guò)60s就會(huì)被殺死;
  • 采用SynchronousQueue裝等待的任務(wù),這個(gè)阻塞隊(duì)列沒(méi)有存儲(chǔ)空間,這意味著只要有請(qǐng)求到來(lái),就必須要找到一條工作線程處理它,如果當(dāng)前沒(méi)有空閑的線程,那么就會(huì)再創(chuàng)建一條新的線程。

Executors.newScheduledThreadPool()

創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。

  1. new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
  2.               new DelayedWorkQueue()); 

該線程池類圖:

newScheduledThreadPool

該線程池的特點(diǎn):

  • 接收SchduledFutureTask類型的任務(wù),有兩種提交任務(wù)的方式:scheduledAtFixedRate和scheduledWithFixedDelay。SchduledFutureTask接收的參數(shù):
  • time:任務(wù)開(kāi)始的時(shí)間
  • sequenceNumber:任務(wù)的序號(hào)
  • period:任務(wù)執(zhí)行的時(shí)間間隔
  • 采用DelayQueue存儲(chǔ)等待的任務(wù);
  • DelayQueue內(nèi)部封裝了一個(gè)PriorityQueue,它會(huì)根據(jù)time的先后時(shí)間排序,若time相同則根據(jù)sequenceNumber排序;
  • DelayQueue也是一個(gè)無(wú)界隊(duì)列;
  • 工作線程執(zhí)行時(shí),工作線程會(huì)從DelayQueue取已經(jīng)到期的任務(wù)去執(zhí)行;執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時(shí)間,再次放回DelayQueue;

Executors.newWorkStealingPool()

JDK8引入,創(chuàng)建持有足夠線程的線程池支持給定的并行度,并通過(guò)使用多個(gè)隊(duì)列減少競(jìng)爭(zhēng)。

  1. public static ExecutorService newWorkStealingPool() { 
  2.     return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), 
  3.         ForkJoinPool.defaultForkJoinWorkerThreadFactory, 
  4.         nulltrue); 

Executors方法的弊端

1)newFixedThreadPool 和 newSingleThreadExecutor:允許的請(qǐng)求隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致 OOM。2)newCachedThreadPool 和 newScheduledThreadPool:允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,從而導(dǎo)致 OOM。

合理配置線程池大小

合理配置線程池,需要先分析任務(wù)特性,可以從以下角度來(lái)進(jìn)行分析:

  • 任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)。
  • 任務(wù)的優(yōu)先級(jí):高,中和低。
  • 任務(wù)的執(zhí)行時(shí)間:長(zhǎng),中和短。
  • 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接。

另外,還需要查看系統(tǒng)的內(nèi)核數(shù):

  1. Runtime.getRuntime().availableProcessors()); 

根據(jù)任務(wù)所需要的CPU和IO資源可以分為:

  • CPU密集型任務(wù): 主要是執(zhí)行計(jì)算任務(wù),響應(yīng)時(shí)間很快,CPU一直在運(yùn)行。一般公式:線程數(shù) = CPU核數(shù) + 1。只有在真正的多核CPU上才能得到加速,優(yōu)點(diǎn)是不存在線程切換開(kāi)銷,提高了CPU的利用率并減少了線程切換的效能損耗。
  • IO密集型任務(wù):主要是進(jìn)行IO操作,CPU并不是一直在執(zhí)行任務(wù),IO操作(CPU空閑狀態(tài))的時(shí)間較長(zhǎng),應(yīng)配置盡可能多的線程,其中的線程在IO操作時(shí),其他線程可以繼續(xù)利用CPU,從而提高CPU的利用率。一般公式:線程數(shù) = CPU核數(shù) * 2。

使用實(shí)例

任務(wù)實(shí)現(xiàn)類:

  1. /** 
  2.  * 任務(wù)實(shí)現(xiàn)線程 
  3.  * @author sec 
  4.  * @version 1.0 
  5.  * @date 2021/10/30 
  6.  **/ 
  7. public class MyThread implements Runnable{ 
  8.  
  9.    private final Integer number; 
  10.  
  11.    public MyThread(int number){ 
  12.       this.number = number; 
  13.    } 
  14.  
  15.    public Integer getNumber() { 
  16.       return number; 
  17.    } 
  18.  
  19.    @Override 
  20.    public void run() { 
  21.       try { 
  22.          // 業(yè)務(wù)處理 
  23.          TimeUnit.SECONDS.sleep(1); 
  24.          System.out.println("Hello! ThreadPoolExecutor - " + getNumber()); 
  25.       } catch (InterruptedException e) { 
  26.          e.printStackTrace(); 
  27.       } 
  28.    } 

自定義阻塞提交的ThreadLocalExcutor:

  1. /** 
  2.  * 自定義阻塞提交的ThreadPoolExecutor 
  3.  * @author sec 
  4.  * @version 1.0 
  5.  * @date 2021/10/30 
  6.  **/ 
  7. public class CustomBlockThreadPoolExecutor { 
  8.  
  9.    private ThreadPoolExecutor pool = null
  10.  
  11.    /** 
  12.     * 線程池初始化方法 
  13.     */ 
  14.    public void init() { 
  15.       // 核心線程池大小 
  16.       int poolSize = 2; 
  17.       // 最大線程池大小 
  18.       int maxPoolSize = 4; 
  19.       // 線程池中超過(guò)corePoolSize數(shù)目的空閑線程最大存活時(shí)間:30+單位TimeUnit 
  20.       long keepAliveTime = 30L; 
  21.       // ArrayBlockingQueue<Runnable> 阻塞隊(duì)列容量30 
  22.       int arrayBlockingQueueSize = 30; 
  23.       pool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime, 
  24.             TimeUnit.SECONDS, new ArrayBlockingQueue<>(arrayBlockingQueueSize), new CustomThreadFactory(), 
  25.             new CustomRejectedExecutionHandler()); 
  26.    } 
  27.  
  28.    /** 
  29.     * 關(guān)閉線程池方法 
  30.     */ 
  31.    public void destroy() { 
  32.       if (pool != null) { 
  33.          pool.shutdownNow(); 
  34.       } 
  35.    } 
  36.  
  37.    public ExecutorService getCustomThreadPoolExecutor() { 
  38.       return this.pool; 
  39.    } 
  40.  
  41.    /** 
  42.     * 自定義線程工廠類, 
  43.     * 生成的線程名詞前綴、是否為守護(hù)線程以及優(yōu)先級(jí)等 
  44.     */ 
  45.    private static class CustomThreadFactory implements ThreadFactory { 
  46.  
  47.       private final AtomicInteger count = new AtomicInteger(0); 
  48.  
  49.       @Override 
  50.       public Thread newThread(Runnable r) { 
  51.          Thread t = new Thread(r); 
  52.          String threadName = CustomBlockThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1); 
  53.          t.setName(threadName); 
  54.          return t; 
  55.       } 
  56.    } 
  57.  
  58.  
  59.    /** 
  60.     * 自定義拒絕策略對(duì)象 
  61.     */ 
  62.    private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler { 
  63.       @Override 
  64.       public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
  65.          // 核心改造點(diǎn),將blockingqueue的offer改成put阻塞提交 
  66.          try { 
  67.             executor.getQueue().put(r); 
  68.          } catch (InterruptedException e) { 
  69.             e.printStackTrace(); 
  70.          } 
  71.       } 
  72.    } 
  73.  
  74.    /** 
  75.     * 當(dāng)提交任務(wù)被拒絕時(shí),進(jìn)入拒絕機(jī)制,實(shí)現(xiàn)拒絕方法,把任務(wù)重新用阻塞提交方法put提交,實(shí)現(xiàn)阻塞提交任務(wù)功能,防止隊(duì)列過(guò)大,OOM 
  76.     */ 
  77.    public static void main(String[] args) { 
  78.  
  79.       CustomBlockThreadPoolExecutor executor = new CustomBlockThreadPoolExecutor(); 
  80.  
  81.       // 初始化 
  82.       executor.init(); 
  83.       ExecutorService pool = executor.getCustomThreadPoolExecutor(); 
  84.       for (int i = 1; i < 51; i++) { 
  85.          MyThread myThread = new MyThread(i); 
  86.          System.out.println("提交第" + i + "個(gè)任務(wù)"); 
  87.          pool.execute(myThread); 
  88.       } 
  89.  
  90.       pool.shutdown(); 
  91.       try { 
  92.          // 阻塞,超時(shí)時(shí)間到或者線程被中斷 
  93.          if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { 
  94.             // 立即關(guān)閉 
  95.             executor.destroy(); 
  96.          } 
  97.       } catch (InterruptedException e) { 
  98.          executor.destroy(); 
  99.       } 
  100.    } 

小結(jié)

看似簡(jiǎn)單的線程池創(chuàng)建,其中卻蘊(yùn)含著各類知識(shí),融合貫通,根據(jù)具體場(chǎng)景采用具體的參數(shù)進(jìn)行設(shè)置才能夠達(dá)到最優(yōu)的效果。

總結(jié)一下就是:

  • 用ThreadPoolExecutor自定義線程池,要看線程的用途。如果任務(wù)量不大,可以用無(wú)界隊(duì)列,如果任務(wù)量非常大,要用有界隊(duì)列,防止OOM;
  • 如果任務(wù)量很大,且要求每個(gè)任務(wù)都處理成功,要對(duì)提交的任務(wù)進(jìn)行阻塞提交,重寫拒絕機(jī)制,改為阻塞提交。保證不拋棄一個(gè)任務(wù);
  • 最大線程數(shù)一般設(shè)為2N+1最好,N是CPU核數(shù);
  • 核心線程數(shù),要根據(jù)任務(wù)是CPU密集型,還是IO密集型。同時(shí),如果任務(wù)是一天跑一次,設(shè)置為0合適,因?yàn)榕芡昃屯5袅?
  • 如果要獲取任務(wù)執(zhí)行結(jié)果,用CompletionService,但是注意,獲取任務(wù)的結(jié)果要重新開(kāi)一個(gè)線程獲取,如果在主線程獲取,就要等任務(wù)都提交后才獲取,就會(huì)阻塞大量任務(wù)結(jié)果,隊(duì)列過(guò)大OOM,所以最好異步開(kāi)個(gè)線程獲取結(jié)果。

參考文章:

[1]https://www.jianshu.com/p/94852bd1a283

[2]https://blog.csdn.net/jek123456/article/details/90601351

[3]https://blog.csdn.net/z_s_z2016/article/details/81674893

[4]https://zhuanlan.zhihu.com/p/33264000 

[5]https://www.cnblogs.com/semi-sub/p/13021786.html

 

責(zé)任編輯:武曉燕 來(lái)源: 程序新視界
相關(guān)推薦

2021-05-07 17:11:19

負(fù)載均衡運(yùn)維服務(wù)

2022-03-07 17:43:30

注冊(cè)微服務(wù)架構(gòu)

2022-09-26 11:32:14

用戶分層服務(wù)業(yè)務(wù)

2021-03-18 12:16:44

用戶分層業(yè)務(wù)

2022-04-20 11:03:28

Linux內(nèi)存管理

2021-10-22 09:28:15

開(kāi)發(fā)技能代碼

2021-01-28 11:39:01

數(shù)據(jù)分析銷售

2022-07-05 11:18:50

數(shù)據(jù)分析銷售業(yè)績(jī)

2018-05-28 21:17:57

大數(shù)據(jù)分析軟件

2020-09-03 09:38:06

ElasticsearES Lucene

2015-04-01 13:57:49

2017-10-13 12:51:36

語(yǔ)言PHP、Python、差異比較

2014-07-28 16:13:19

Gitlinux開(kāi)源

2022-04-11 11:55:34

架構(gòu)技術(shù)調(diào)優(yōu)

2020-04-16 14:52:50

AI Reddit人工智能

2019-08-13 09:29:14

Kafka運(yùn)營(yíng)數(shù)據(jù)

2014-08-25 15:01:03

Google

2020-12-18 09:45:33

DockerLinux命令

2023-10-10 08:16:07

Spring依賴注入SpEL表達(dá)式

2018-10-24 09:25:03

數(shù)據(jù)中心現(xiàn)狀差異
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)