90%的人(包括我)都以為會(huì)用ThreadPoolExecutor了,看了這十張圖再說(shuō)吧!
在阿里巴巴手冊(cè)中有一條建議:
【強(qiáng)制】線程池不允許使用 Executors 去創(chuàng)建,而是通過(guò)ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。
如果經(jīng)?;贓xecutors提供的工廠方法創(chuàng)建線程池,很容易忽略線程池內(nèi)部的實(shí)現(xiàn)。特別是拒絕策略,因使用Executors創(chuàng)建線程池時(shí)不會(huì)傳入這個(gè)參數(shù),直接采用默認(rèn)值,所以常常被忽略。
下面我們就來(lái)了解一下線程池相關(guān)的實(shí)現(xiàn)原理、API以及實(shí)例。
線程池的作用
在實(shí)踐應(yīng)用中創(chuàng)建線程池主要是為了:
- 減少資源開(kāi)銷:減少每次創(chuàng)建、銷毀線程的開(kāi)銷;
- 提高響應(yīng)速度:請(qǐng)求到來(lái)時(shí),線程已創(chuàng)建好,可直接執(zhí)行,提高響應(yīng)速度;
- 提高線程的可管理性:線程是稀缺資源,需根據(jù)情況加以限制,確保系統(tǒng)穩(wěn)定運(yùn)行;
ThreadPoolExecutor
ThreadPoolExecutor可以實(shí)現(xiàn)線程池的創(chuàng)建。ThreadPoolExecutor相關(guān)類圖如下:
類圖
從類圖可以看出,ThreadPoolExecutor最終實(shí)現(xiàn)了Executor接口,是線程池創(chuàng)建的真正實(shí)現(xiàn)者。
Executor兩級(jí)調(diào)度模型
Executor模型
在HotSpot虛擬機(jī)中,Java中的線程將會(huì)被一一映射為操作系統(tǒng)的線程。在Java虛擬機(jī)層面,用戶將多個(gè)任務(wù)提交給Executor框架,Executor負(fù)責(zé)分配線程執(zhí)行它們;在操作系統(tǒng)層面,操作系統(tǒng)再將這些線程分配給處理器執(zhí)行。
ThreadPoolExecutor的三個(gè)角色
任務(wù)
ThreadPoolExecutor接受兩種類型的任務(wù):Callable和Runnable。
- Callable:該類任務(wù)有返回結(jié)果,可以拋出異常。通過(guò)submit方法提交,返回Future對(duì)象。通過(guò)get獲取執(zhí)行結(jié)果。
- Runnable:該類任務(wù)只執(zhí)行,無(wú)法獲取返回結(jié)果,在執(zhí)行過(guò)程中無(wú)法拋異常。通過(guò)execute或submit方法提交。
任務(wù)執(zhí)行器
Executor框架最核心的接口是Executor,它表示任務(wù)的執(zhí)行器。
通過(guò)上面類圖可以看出,Executor的子接口為ExecutorService。再往底層有兩大實(shí)現(xiàn)類:ThreadPoolExecutor和ScheduledThreadPoolExecutor(集成自ThreadPoolExecutor)。
執(zhí)行結(jié)果
Future接口表示異步的執(zhí)行結(jié)果,它的實(shí)現(xiàn)類為FutureTask。
三個(gè)角色之間的處理邏輯圖如下:
FutureTask邏輯
線程池處理流程
線程池處理流程
一個(gè)線程從被提交(submit)到執(zhí)行共經(jīng)歷以下流程:
- 線程池判斷核心線程池里是的線程是否都在執(zhí)行任務(wù),如果不是,則創(chuàng)建一個(gè)新的工作線程來(lái)執(zhí)行任務(wù)。如果核心線程池里的線程都在執(zhí)行任務(wù),則進(jìn)入下一個(gè)流程;
- 線程池判斷工作隊(duì)列是否已滿。如果工作隊(duì)列沒(méi)有滿,則將新提交的任務(wù)儲(chǔ)存在這個(gè)工作隊(duì)列里。如果工作隊(duì)列滿了,則進(jìn)入下一個(gè)流程;
- 線程池判斷其內(nèi)部線程是否都處于工作狀態(tài)。如果沒(méi)有,則創(chuàng)建一個(gè)新的工作線程來(lái)執(zhí)行任務(wù)。如果已滿了,則交給飽和策略來(lái)處理這個(gè)任務(wù)。
線程池在執(zhí)行execute方法時(shí),主要有以下四種情況:
線程池執(zhí)行excute方法
- 如果當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)(需要獲得全局鎖);
- 如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue;
- 如果無(wú)法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來(lái)處理任務(wù)(需要獲得全局鎖);
- 如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maxiumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。
線程池采取上述的流程進(jìn)行設(shè)計(jì)是為了減少獲取全局鎖的次數(shù)。在線程池完成預(yù)熱(當(dāng)前運(yùn)行的線程數(shù)大于或等于corePoolSize)之后,幾乎所有的excute方法調(diào)用都執(zhí)行步驟二。
線程的狀態(tài)流轉(zhuǎn)
順便再回顧一下線程的狀態(tài)的轉(zhuǎn)換,在JDK中Thread類中提供了一個(gè)枚舉類,例舉了線程的各個(gè)狀態(tài):
- public enum State {
- NEW,
- RUNNABLE,
- BLOCKED,
- WAITING,
- TIMED_WAITING,
- TERMINATED;
- }
一共定義了6個(gè)枚舉值,其實(shí)代表的是5種類型的線程狀態(tài):
- NEW:新建;
- RUNNABLE:運(yùn)行狀態(tài);
- BLOCKED:阻塞狀態(tài);
- WAITING:等待狀態(tài),WAITING和TIMED_WAITING可以歸為一類,都屬于等待狀態(tài),只是后者可以設(shè)置等待時(shí)間,即等待多久;
- TERMINATED:終止?fàn)顟B(tài);
線程關(guān)系轉(zhuǎn)換圖:
線程狀態(tài)轉(zhuǎn)換
當(dāng)new Thread()說(shuō)明這個(gè)線程處于NEW(新建狀態(tài));調(diào)用Thread.start()方法表示這個(gè)線程處于RUNNABLE(運(yùn)行狀態(tài));但是RUNNABLE狀態(tài)中又包含了兩種狀態(tài):READY(就緒狀態(tài))和RUNNING(運(yùn)行中)。調(diào)用start()方法,線程不一定獲得了CPU時(shí)間片,這時(shí)就處于READY,等待CPU時(shí)間片,當(dāng)獲得了CPU時(shí)間片,就處于RUNNING狀態(tài)。
在運(yùn)行中調(diào)用synchronized同步的代碼塊,沒(méi)有獲取到鎖,這時(shí)會(huì)處于BLOCKED(阻塞狀態(tài)),當(dāng)重新獲取到鎖時(shí),又會(huì)變?yōu)镽UNNING狀態(tài)。在代碼執(zhí)行的過(guò)程中可能會(huì)碰到Object.wait()等一些等待方法,線程的狀態(tài)又會(huì)轉(zhuǎn)變?yōu)閃AITING(等待狀態(tài)),等待被喚醒,當(dāng)調(diào)用了Object.notifyAll()喚醒了之后線程執(zhí)行完就會(huì)變?yōu)門ERMINATED(終止?fàn)顟B(tài))。
線程池的狀態(tài)
線程池中狀態(tài)通過(guò)2個(gè)二進(jìn)制位(bit)來(lái)表示線程池的5個(gè)狀態(tài):RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED:
- RUNNING:線程池正常工作的狀態(tài),在 RUNNING 狀態(tài)下線程池接受新的任務(wù)并處理任務(wù)隊(duì)列中的任務(wù);
- SHUTDOWN:調(diào)用shutdown()方法會(huì)進(jìn)入 SHUTDOWN 狀態(tài)。在 SHUTDOWN 狀態(tài)下,線程池不接受新的任務(wù),但是會(huì)繼續(xù)執(zhí)行任務(wù)隊(duì)列中已有的任務(wù);
- STOP:調(diào)用shutdownNow()會(huì)進(jìn)入 STOP 狀態(tài)。在 STOP 狀態(tài)下線程池既不接受新的任務(wù),也不處理已經(jīng)在隊(duì)列中的任務(wù)。對(duì)于還在執(zhí)行任務(wù)的工作線程,線程池會(huì)發(fā)起中斷請(qǐng)求來(lái)中斷正在執(zhí)行的任務(wù),同時(shí)會(huì)清空任務(wù)隊(duì)列中還未被執(zhí)行的任務(wù);
- TIDYING:當(dāng)線程池中的所有執(zhí)行任務(wù)的工作線程都已經(jīng)終止,并且工作線程集合為空的時(shí)候,進(jìn)入 TIDYING 狀態(tài);
- TERMINATED:當(dāng)線程池執(zhí)行完terminated()鉤子方法以后,線程池進(jìn)入終態(tài) TERMINATED;
ThreadPoolExecutor API
ThreadPoolExecutor創(chuàng)建線程池API:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
參數(shù)解釋:
- corePoolSize :線程池常駐核心線程數(shù)。創(chuàng)建線程池時(shí),線程池中并沒(méi)有任何線程,當(dāng)有任務(wù)來(lái)時(shí)才去創(chuàng)建線程,執(zhí)行任務(wù)。提交一個(gè)任務(wù),創(chuàng)建一個(gè)線程,直到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小,則不再創(chuàng)建。當(dāng)創(chuàng)建的線程數(shù)等于corePoolSize 時(shí),會(huì)加入設(shè)置的阻塞隊(duì)列。
- maximumPoolSize :線程池允許創(chuàng)建的最大線程數(shù)。當(dāng)隊(duì)列滿時(shí),會(huì)創(chuàng)建線程執(zhí)行任務(wù)直到線程池中的數(shù)量等于maximumPoolSize。
- keepAliveTime :當(dāng)線程數(shù)大于核心時(shí),此為終止前多余的空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間。
- unit :keepAliveTime的時(shí)間單位,可選項(xiàng):天(DAYS)、小時(shí)(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微妙)。
- workQueue :用來(lái)儲(chǔ)存等待執(zhí)行任務(wù)的隊(duì)列。
- threadFactory :線程工廠,用來(lái)生產(chǎn)一組相同任務(wù)的線程。主要用于設(shè)置生成的線程名詞前綴、是否為守護(hù)線程以及優(yōu)先級(jí)等。設(shè)置有意義的名稱前綴有利于在進(jìn)行虛擬機(jī)分析時(shí),知道線程是由哪個(gè)線程工廠創(chuàng)建的。
- handler :執(zhí)行拒絕策略對(duì)象。當(dāng)達(dá)到任務(wù)緩存上限時(shí)(即超過(guò)workQueue參數(shù)能存儲(chǔ)的任務(wù)數(shù)),執(zhí)行拒接策略。也就是當(dāng)任務(wù)處理不過(guò)來(lái)的時(shí)候,線程池開(kāi)始執(zhí)行拒絕策略。JDK 1.5提供了四種飽和策略:
- AbortPolicy:默認(rèn),直接拋異常;
- 只用調(diào)用者所在的線程執(zhí)行任務(wù),重試添加當(dāng)前的任務(wù),它會(huì)自動(dòng)重復(fù)調(diào)用execute()方法;
- DiscardOldestPolicy:丟棄任務(wù)隊(duì)列中最久的任務(wù);
- DiscardPolicy:丟棄當(dāng)前任務(wù);
適當(dāng)?shù)淖枞?duì)列
當(dāng)創(chuàng)建的線程數(shù)等于corePoolSize,會(huì)將任務(wù)加入阻塞隊(duì)列(BlockingQueue),維護(hù)著等待執(zhí)行的Runnable對(duì)象。
阻塞隊(duì)列通常有如下類型:
- ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。可以限定隊(duì)列的長(zhǎng)度,接收到任務(wù)時(shí),如果沒(méi)有達(dá)到corePoolSize的值,則新建線程(核心線程)執(zhí)行任務(wù),如果達(dá)到了,則入隊(duì)等候,如果隊(duì)列已滿,則新建線程(非核心線程)執(zhí)行任務(wù),又如果總線程數(shù)到了maximumPoolSize,并且隊(duì)列也滿了,則發(fā)生錯(cuò)誤。
- LinkedBlockingQueue :一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。這個(gè)隊(duì)列在接收到任務(wù)時(shí),如果當(dāng)前線程數(shù)小于核心線程數(shù),則新建線程(核心線程)處理任務(wù);如果當(dāng)前線程數(shù)等于核心線程數(shù),則進(jìn)入隊(duì)列等待。由于這個(gè)隊(duì)列沒(méi)有最大值限制,即所有超過(guò)核心線程數(shù)的任務(wù)都將被添加到隊(duì)列中,這也就導(dǎo)致了maximumPoolSize的設(shè)定失效,因?yàn)榭偩€程數(shù)永遠(yuǎn)不會(huì)超過(guò)corePoolSize。
- PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。
- DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列。隊(duì)列內(nèi)元素必須實(shí)現(xiàn)Delayed接口,這就意味著傳入的任務(wù)必須先實(shí)現(xiàn)Delayed接口。這個(gè)隊(duì)列在接收到任務(wù)時(shí),首先先入隊(duì),只有達(dá)到了指定的延時(shí)時(shí)間,才會(huì)執(zhí)行任務(wù)。
- SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。這個(gè)隊(duì)列在接收到任務(wù)時(shí),會(huì)直接提交給線程處理,而不保留它,如果所有線程都在工作就新建一個(gè)線程來(lái)處理這個(gè)任務(wù)。所以為了保證不出現(xiàn)【線程數(shù)達(dá)到了maximumPoolSize而不能新建線程】的錯(cuò)誤,使用這個(gè)類型隊(duì)列時(shí),maximumPoolSize一般指定成Integer.MAX_VALUE,即無(wú)限大。
- LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列。
- LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
明確的拒絕策略
當(dāng)任務(wù)處理不過(guò)來(lái)時(shí),線程池開(kāi)始執(zhí)行拒絕策略。
支持的拒絕策略:
- ThreadPoolExecutor.AbortPolicy: 丟棄任務(wù)并拋出RejectedExecutionException異常。(默認(rèn))
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)。(重復(fù)此過(guò)程)
- ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)。
線程池關(guān)閉
- shutdown:將線程池狀態(tài)置為SHUTDOWN,并不會(huì)立即停止。停止接收外部submit的任務(wù),內(nèi)部正在跑的任務(wù)和隊(duì)列里等待的任務(wù),會(huì)執(zhí)行完后,才真正停止。
- shutdownNow:將線程池狀態(tài)置為STOP。企圖立即停止,事實(shí)上不一定,跟shutdown()一樣,先停止接收外部提交的任務(wù),忽略隊(duì)列里等待的任務(wù),嘗試將正在跑的任務(wù)interrupt中斷(如果線程未處于sleep、wait、condition、定時(shí)鎖狀態(tài),interrupt無(wú)法中斷當(dāng)前線程),返回未執(zhí)行的任務(wù)列表。
- awaitTermination(long timeOut, TimeUnit unit)當(dāng)前線程阻塞,直到等所有已提交的任務(wù)(包括正在跑的和隊(duì)列中等待的)執(zhí)行完或者等超時(shí)時(shí)間到或者線程被中斷,拋出InterruptedException,然后返回true(shutdown請(qǐng)求后所有任務(wù)執(zhí)行完畢)或false(已超時(shí))。
Executors
Executors是一個(gè)幫助類,提供了創(chuàng)建幾種預(yù)配置線程池實(shí)例的方法:newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool等。
如果查看源碼就會(huì)發(fā)現(xiàn),Executors本質(zhì)上就是實(shí)現(xiàn)了幾類默認(rèn)的ThreadPoolExecutor。而阿里巴巴開(kāi)發(fā)手冊(cè),不建議采用Executors默認(rèn)的,讓使用者直接通過(guò)ThreadPoolExecutor來(lái)創(chuàng)建。
Executors.newSingleThreadExecutor()
創(chuàng)建一個(gè)單線程的線程池。這個(gè)線程池只有一個(gè)線程在工作,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)。如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束,那么會(huì)有一個(gè)新的線程來(lái)替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。
- new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
該類型線程池的結(jié)構(gòu)圖:
newSingleThreadExecutor
該線程池的特點(diǎn):
- 只會(huì)創(chuàng)建一條工作線程處理任務(wù);
- 采用的阻塞隊(duì)列為L(zhǎng)inkedBlockingQueue;
Executors.newFixedThreadPool()
創(chuàng)建固定大小的線程池。每次提交一個(gè)任務(wù)就創(chuàng)建一個(gè)線程,直到線程達(dá)到線程池的最大大小。線程池的大小一旦達(dá)到最大值就會(huì)保持不變,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新線程。
- new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
該類型線程池的結(jié)構(gòu)圖:
newFixedThreadPool
該線程池的特點(diǎn):
- 固定大小;
- corePoolSize和maximunPoolSize都為用戶設(shè)定的線程數(shù)量nThreads;
- keepAliveTime為0,意味著一旦有多余的空閑線程,就會(huì)被立即停止掉;但這里keepAliveTime無(wú)效;
- 阻塞隊(duì)列采用了LinkedBlockingQueue,一個(gè)無(wú)界隊(duì)列;
- 由于阻塞隊(duì)列是一個(gè)無(wú)界隊(duì)列,因此永遠(yuǎn)不可能拒絕任務(wù);
- 由于采用了無(wú)界隊(duì)列,實(shí)際線程數(shù)量將永遠(yuǎn)維持在nThreads,因此maximumPoolSize和keepAliveTime將無(wú)效。
Executors.newCachedThreadPool()
創(chuàng)建一個(gè)可緩存的線程池。如果線程池的大小超過(guò)了處理任務(wù)所需要的線程,那么就會(huì)回收部分空閑(60秒不執(zhí)行任務(wù))的線程,當(dāng)任務(wù)數(shù)增加時(shí),此線程池又可以智能的添加新線程來(lái)處理任務(wù)。此線程池不會(huì)對(duì)線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說(shuō)JVM)能夠創(chuàng)建的最大線程大小。
- new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
該類型線程池的結(jié)構(gòu)圖:
newCachedThreadPool
- 該線程池的特點(diǎn):
- 可以無(wú)限擴(kuò)大;
- 比較適合處理執(zhí)行時(shí)間比較小的任務(wù);
- corePoolSize為0,maximumPoolSize為無(wú)限大,意味著線程數(shù)量可以無(wú)限大;
- keepAliveTime為60S,意味著線程空閑時(shí)間超過(guò)60s就會(huì)被殺死;
- 采用SynchronousQueue裝等待的任務(wù),這個(gè)阻塞隊(duì)列沒(méi)有存儲(chǔ)空間,這意味著只要有請(qǐng)求到來(lái),就必須要找到一條工作線程處理它,如果當(dāng)前沒(méi)有空閑的線程,那么就會(huì)再創(chuàng)建一條新的線程。
Executors.newScheduledThreadPool()
創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。
- new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
該線程池類圖:
newScheduledThreadPool
該線程池的特點(diǎn):
- 接收SchduledFutureTask類型的任務(wù),有兩種提交任務(wù)的方式:scheduledAtFixedRate和scheduledWithFixedDelay。SchduledFutureTask接收的參數(shù):
- time:任務(wù)開(kāi)始的時(shí)間
- sequenceNumber:任務(wù)的序號(hào)
- period:任務(wù)執(zhí)行的時(shí)間間隔
- 采用DelayQueue存儲(chǔ)等待的任務(wù);
- DelayQueue內(nèi)部封裝了一個(gè)PriorityQueue,它會(huì)根據(jù)time的先后時(shí)間排序,若time相同則根據(jù)sequenceNumber排序;
- DelayQueue也是一個(gè)無(wú)界隊(duì)列;
- 工作線程執(zhí)行時(shí),工作線程會(huì)從DelayQueue取已經(jīng)到期的任務(wù)去執(zhí)行;執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時(shí)間,再次放回DelayQueue;
Executors.newWorkStealingPool()
JDK8引入,創(chuàng)建持有足夠線程的線程池支持給定的并行度,并通過(guò)使用多個(gè)隊(duì)列減少競(jìng)爭(zhēng)。
- public static ExecutorService newWorkStealingPool() {
- return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
- ForkJoinPool.defaultForkJoinWorkerThreadFactory,
- null, true);
- }
Executors方法的弊端
1)newFixedThreadPool 和 newSingleThreadExecutor:允許的請(qǐng)求隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致 OOM。2)newCachedThreadPool 和 newScheduledThreadPool:允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,從而導(dǎo)致 OOM。
合理配置線程池大小
合理配置線程池,需要先分析任務(wù)特性,可以從以下角度來(lái)進(jìn)行分析:
- 任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)。
- 任務(wù)的優(yōu)先級(jí):高,中和低。
- 任務(wù)的執(zhí)行時(shí)間:長(zhǎng),中和短。
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接。
另外,還需要查看系統(tǒng)的內(nèi)核數(shù):
- Runtime.getRuntime().availableProcessors());
根據(jù)任務(wù)所需要的CPU和IO資源可以分為:
- CPU密集型任務(wù): 主要是執(zhí)行計(jì)算任務(wù),響應(yīng)時(shí)間很快,CPU一直在運(yùn)行。一般公式:線程數(shù) = CPU核數(shù) + 1。只有在真正的多核CPU上才能得到加速,優(yōu)點(diǎn)是不存在線程切換開(kāi)銷,提高了CPU的利用率并減少了線程切換的效能損耗。
- IO密集型任務(wù):主要是進(jìn)行IO操作,CPU并不是一直在執(zhí)行任務(wù),IO操作(CPU空閑狀態(tài))的時(shí)間較長(zhǎng),應(yīng)配置盡可能多的線程,其中的線程在IO操作時(shí),其他線程可以繼續(xù)利用CPU,從而提高CPU的利用率。一般公式:線程數(shù) = CPU核數(shù) * 2。
使用實(shí)例
任務(wù)實(shí)現(xiàn)類:
- /**
- * 任務(wù)實(shí)現(xiàn)線程
- * @author sec
- * @version 1.0
- * @date 2021/10/30
- **/
- public class MyThread implements Runnable{
- private final Integer number;
- public MyThread(int number){
- this.number = number;
- }
- public Integer getNumber() {
- return number;
- }
- @Override
- public void run() {
- try {
- // 業(yè)務(wù)處理
- TimeUnit.SECONDS.sleep(1);
- System.out.println("Hello! ThreadPoolExecutor - " + getNumber());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
自定義阻塞提交的ThreadLocalExcutor:
- /**
- * 自定義阻塞提交的ThreadPoolExecutor
- * @author sec
- * @version 1.0
- * @date 2021/10/30
- **/
- public class CustomBlockThreadPoolExecutor {
- private ThreadPoolExecutor pool = null;
- /**
- * 線程池初始化方法
- */
- public void init() {
- // 核心線程池大小
- int poolSize = 2;
- // 最大線程池大小
- int maxPoolSize = 4;
- // 線程池中超過(guò)corePoolSize數(shù)目的空閑線程最大存活時(shí)間:30+單位TimeUnit
- long keepAliveTime = 30L;
- // ArrayBlockingQueue<Runnable> 阻塞隊(duì)列容量30
- int arrayBlockingQueueSize = 30;
- pool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime,
- TimeUnit.SECONDS, new ArrayBlockingQueue<>(arrayBlockingQueueSize), new CustomThreadFactory(),
- new CustomRejectedExecutionHandler());
- }
- /**
- * 關(guān)閉線程池方法
- */
- public void destroy() {
- if (pool != null) {
- pool.shutdownNow();
- }
- }
- public ExecutorService getCustomThreadPoolExecutor() {
- return this.pool;
- }
- /**
- * 自定義線程工廠類,
- * 生成的線程名詞前綴、是否為守護(hù)線程以及優(yōu)先級(jí)等
- */
- private static class CustomThreadFactory implements ThreadFactory {
- private final AtomicInteger count = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- String threadName = CustomBlockThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
- t.setName(threadName);
- return t;
- }
- }
- /**
- * 自定義拒絕策略對(duì)象
- */
- private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- // 核心改造點(diǎn),將blockingqueue的offer改成put阻塞提交
- try {
- executor.getQueue().put(r);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 當(dāng)提交任務(wù)被拒絕時(shí),進(jìn)入拒絕機(jī)制,實(shí)現(xiàn)拒絕方法,把任務(wù)重新用阻塞提交方法put提交,實(shí)現(xiàn)阻塞提交任務(wù)功能,防止隊(duì)列過(guò)大,OOM
- */
- public static void main(String[] args) {
- CustomBlockThreadPoolExecutor executor = new CustomBlockThreadPoolExecutor();
- // 初始化
- executor.init();
- ExecutorService pool = executor.getCustomThreadPoolExecutor();
- for (int i = 1; i < 51; i++) {
- MyThread myThread = new MyThread(i);
- System.out.println("提交第" + i + "個(gè)任務(wù)");
- pool.execute(myThread);
- }
- pool.shutdown();
- try {
- // 阻塞,超時(shí)時(shí)間到或者線程被中斷
- if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
- // 立即關(guān)閉
- executor.destroy();
- }
- } catch (InterruptedException e) {
- executor.destroy();
- }
- }
- }
小結(jié)
看似簡(jiǎn)單的線程池創(chuàng)建,其中卻蘊(yùn)含著各類知識(shí),融合貫通,根據(jù)具體場(chǎng)景采用具體的參數(shù)進(jìn)行設(shè)置才能夠達(dá)到最優(yōu)的效果。
總結(jié)一下就是:
- 用ThreadPoolExecutor自定義線程池,要看線程的用途。如果任務(wù)量不大,可以用無(wú)界隊(duì)列,如果任務(wù)量非常大,要用有界隊(duì)列,防止OOM;
- 如果任務(wù)量很大,且要求每個(gè)任務(wù)都處理成功,要對(duì)提交的任務(wù)進(jìn)行阻塞提交,重寫拒絕機(jī)制,改為阻塞提交。保證不拋棄一個(gè)任務(wù);
- 最大線程數(shù)一般設(shè)為2N+1最好,N是CPU核數(shù);
- 核心線程數(shù),要根據(jù)任務(wù)是CPU密集型,還是IO密集型。同時(shí),如果任務(wù)是一天跑一次,設(shè)置為0合適,因?yàn)榕芡昃屯5袅?
- 如果要獲取任務(wù)執(zhí)行結(jié)果,用CompletionService,但是注意,獲取任務(wù)的結(jié)果要重新開(kāi)一個(gè)線程獲取,如果在主線程獲取,就要等任務(wù)都提交后才獲取,就會(huì)阻塞大量任務(wù)結(jié)果,隊(duì)列過(guò)大OOM,所以最好異步開(kāi)個(gè)線程獲取結(jié)果。
參考文章:
[1]https://www.jianshu.com/p/94852bd1a283
[2]https://blog.csdn.net/jek123456/article/details/90601351
[3]https://blog.csdn.net/z_s_z2016/article/details/81674893
[4]https://zhuanlan.zhihu.com/p/33264000
[5]https://www.cnblogs.com/semi-sub/p/13021786.html