背會(huì)了常見(jiàn)的幾個(gè)線程池用法,結(jié)果被問(wèn)翻
背景
這是張小帥失業(yè)之后的第三場(chǎng)面試。
面試官:“實(shí)際開(kāi)發(fā)中用過(guò)多線程吧,那聊聊線程池吧”。
“有CachedThreadPool:可緩存線程池,FixedThreadPool:定長(zhǎng)線程池.......balabala”。小帥暗暗竊喜,還好把這幾種線程池背下來(lái)了,看來(lái)這次可以上岸了。
面試官點(diǎn)點(diǎn)頭,繼續(xù)問(wèn)到“那線程池底層是如何實(shí)現(xiàn)復(fù)用的?”
“額,這個(gè)....”
寒風(fēng)中,那個(gè)男人的背影在暮色中顯得孤寂而凄涼,仿佛與世隔絕,獨(dú)自面對(duì)著無(wú)盡的寂寞......
概要
如果問(wèn)到線程池的話,不好好剖析過(guò)底層代碼,恐怕真的會(huì)像小帥那樣被問(wèn)翻吧。
那么在此我們就來(lái)好好剖析一下線程池的底層吧。我們大概從如下幾個(gè)方面著手:
概覽圖
什么是線程池
說(shuō)到線程池,其實(shí)我們要先聊到池化技術(shù)。
池化技術(shù):我們將資源或者任務(wù)放入池子,使用時(shí)從池中取,用完之后交給池子管理。通過(guò)優(yōu)化資源分配的效率,達(dá)到性能的調(diào)優(yōu)。
池化技術(shù)優(yōu)點(diǎn):
- 資源被重復(fù)使用,減少了資源在分配銷毀過(guò)程中的系統(tǒng)的調(diào)度消耗。比如,在IO密集型的服務(wù)器上,并發(fā)處理過(guò)程中的子線程或子進(jìn)程的創(chuàng)建和銷毀過(guò)程,帶來(lái)的系統(tǒng)開(kāi)銷將是難以接受的。所以在業(yè)務(wù)實(shí)現(xiàn)上,通常把一些資源預(yù)先分配好,如線程池,數(shù)據(jù)庫(kù)連接池,Redis連接池,HTTP連接池等,來(lái)減少系統(tǒng)消耗,提升系統(tǒng)性能。
- 池化技術(shù)分配資源,會(huì)集中分配,這樣有效避免了碎片化的問(wèn)題。
- 可以對(duì)資源的整體使用做限制,相關(guān)資源預(yù)分配且只在預(yù)分配后生成,后續(xù)不再動(dòng)態(tài)添加,從而限制了整個(gè)系統(tǒng)對(duì)資源的使用上限。
所以我們說(shuō)線程池是提升線程可重復(fù)利用率、可控性的池化技術(shù)的一種。
線程池的使用
1.多線程發(fā)送郵件案例
現(xiàn)在我們有這樣一個(gè)場(chǎng)景,上層有業(yè)務(wù)系統(tǒng)批量調(diào)用底層進(jìn)行發(fā)送郵件,廢話不多,直接上代碼:
demo
最終運(yùn)行輸出結(jié)果為:
由線程:pool-1-thread-1 發(fā)送第:0封郵件
由線程:pool-1-thread-2 發(fā)送第:1封郵件
由線程:pool-1-thread-1 發(fā)送第:2封郵件
由線程:pool-1-thread-2 發(fā)送第:3封郵件
由線程:pool-1-thread-1 發(fā)送第:4封郵件
由線程:pool-1-thread-1 發(fā)送第:6封郵件
由線程:pool-1-thread-2 發(fā)送第:5封郵件
由線程:pool-1-thread-1 發(fā)送第:7封郵件
由線程:pool-1-thread-2 發(fā)送第:8封郵件
由線程:pool-1-thread-1 發(fā)送第:9封郵件
上面的例子中從結(jié)果來(lái)看是10封郵件分別由兩條線程發(fā)送出去了,上圖可見(jiàn),我們給ThreadPoolExecutor這個(gè)執(zhí)行器分別指定了七個(gè)參數(shù)。那么參數(shù)的含義到底是什么呢?接下來(lái)咱們層層抽絲剝繭。
2.構(gòu)造函數(shù)說(shuō)明
大家估計(jì)會(huì)有疑問(wèn),線程池的種類那么多,案例中為什么要用TheadPoolExecutor類呢,其他的種類是由TheadPoolExecutor通過(guò)不同的入?yún)⒍x出來(lái)的,所以我們直接拿ThreadPoolExecutor來(lái)看。
我們先來(lái)看一下ThreadPoolExecutor的繼承關(guān)系,有個(gè)宏觀印象:
宏觀繼承
我們?cè)賮?lái)看一下ThreadPoolExecutor的構(gòu)造方法:
構(gòu)造方法
下面我們來(lái)解釋一下幾個(gè)參數(shù)的含義:
- corePoolSize:核心線程數(shù)。
- maximumPoolSize:最大線程數(shù)。
- keepAliveTime:線程池中線程的最大閑置生命周期。
- unit:針對(duì)keepAliveTime的時(shí)間單位。
- workQueue:阻塞隊(duì)列。
- threadFactory:創(chuàng)建線程的線程工廠。
- handler:拒絕策略。
大家對(duì)上述的含義初步有個(gè)概念。
3.工作流程概述
看了上面的構(gòu)造函數(shù)字段大家估計(jì)也還是優(yōu)點(diǎn)懵的,尤其是從來(lái)沒(méi)有接觸過(guò)商品池的小伙伴。所以老貓又?jǐn)]了一張商品池的大概的工作流程圖,方便大家把這些概念串起來(lái)。
大概流程
上圖中老貓標(biāo)記了四條線,簡(jiǎn)單介紹一下(當(dāng)然上圖若有問(wèn)題,也希望大家能夠指出來(lái))。
- 當(dāng)發(fā)起任務(wù)時(shí)候,會(huì)計(jì)算線程池中存在的線程數(shù)量與核心線程數(shù)量(corePoolSize)進(jìn)行比較,如果小于,則在線程池中創(chuàng)建線程,否則,進(jìn)行下一步判斷。
- 如果不滿足條件1,則會(huì)將任務(wù)添加到阻塞隊(duì)列中。等待線程池中的線程空閑下來(lái)后,獲取隊(duì)列中的任務(wù)進(jìn)行執(zhí)行。
- 但是條件2中如果阻塞隊(duì)列滿了之后,此時(shí)又會(huì)重新獲取當(dāng)前線程的數(shù)量和最大線程數(shù)(maximumPoolSize)進(jìn)行比較,如果發(fā)現(xiàn)小于最大線程數(shù),那么繼續(xù)添加到線程池中即可。
- 如果都不滿足上述條件,那么此時(shí)會(huì)放到拒絕策略中。
4.execute核心流程剖析
接下來(lái)我們來(lái)看一下執(zhí)行theadPoolExecutor.execute()的時(shí)候到底發(fā)生了什么。先來(lái)看一下源碼:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
(1) ctl變量
進(jìn)入執(zhí)行源碼之后我們首先看到的是ctl,只知道ctl中拿到了一個(gè)int數(shù)據(jù)至于這個(gè)數(shù)值有什么用,目前不知道,接著看涉及的相關(guān)代碼,老貓將相關(guān)的代碼解讀放到源碼中進(jìn)行注釋。
//通過(guò)ctl獲取線程池的狀態(tài)以及包含的線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // COUNT_BITS = 32-3 = 29
/**001左移29位
* 00100000 00000000 00000000 00000000
* 操作減1
* 00011111 11111111 11111111 11111111(表示初始化的時(shí)候線程情況,1表示均有空閑線程)
* 換成十進(jìn)制:COUNT_MASK = 536870911
*/
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
/**
* 運(yùn)行中狀態(tài)
* 1的原碼
* 00000000 00000000 00000000 00000001
* 取反+1
* 11111111 11111111 11111111 11111111
* 左移29位
* 11100000 00000000 00000000 00000000
**/
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; //運(yùn)行中狀態(tài) 11100000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS; //終止?fàn)顟B(tài) 00000000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS; //停止 00100000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS; // 01000000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS; // 01100000 00000000 00000000 00000000
//取高3位表示獲取運(yùn)行狀態(tài)
private static int runStateOf(int c) { return c & ~COUNT_MASK; } //~COUNT_MASK表示取反碼:11100000 00000000 00000000 00000000
//取出低位29位的值,當(dāng)前活躍的線程數(shù)
private static int workerCountOf(int c) { return c & COUNT_MASK; } //COUNT_MASK:00011111 11111111 11111111 11111111
//計(jì)算ctl的值,ctl=[3位]線程池狀態(tài) + [29位]線程池中線程數(shù)量。
private static int ctlOf(int rs, int wc) { return rs | wc; } //進(jìn)行或運(yùn)算
上面我們針對(duì)各個(gè)狀態(tài)以及那么多的二進(jìn)制表示符有點(diǎn)懵,當(dāng)然如果不會(huì)二進(jìn)制運(yùn)算的,大家可以先自己去了解一下二進(jìn)制的運(yùn)算邏輯。通過(guò)源碼中的英文,我們知道CTL的值其實(shí)分成兩部分組成,高三位是狀態(tài),其余均為當(dāng)前線程數(shù)。如下的圖:
線程池狀態(tài)
上面的圖的描述解釋,其實(shí)也都是英文注釋版的翻譯,我們?cè)賮?lái)看一下有了這些狀態(tài),這些狀態(tài)是怎么流轉(zhuǎn)的,英文注釋是這樣的:
/*** RUNNING -> SHUTDOWN
* On invocation of shutdown()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
* /
上面的描述不太直觀,老貓將流程串了起來(lái),得到了下面的狀態(tài)機(jī)流轉(zhuǎn)圖。如下圖:
狀態(tài)機(jī)流程
寫(xiě)到這里,其實(shí)ctl已經(jīng)很清楚了,ctl說(shuō)白了就是狀態(tài)位和活躍線程數(shù)的表示方式。通過(guò)ctl咱們可以知道當(dāng)前是什么狀態(tài)以及活躍線程數(shù)量是多少 (設(shè)計(jì)很巧妙,如果此處還有問(wèn)題,歡迎大家私聊老貓)。
(3) 線程池中的線程數(shù)小于核心線程數(shù)
讀完ctl之后,我們來(lái)看一下接下來(lái)的代碼。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; //添加新的線程
c = ctl.get(); //重新獲取當(dāng)前的狀態(tài)以及線程數(shù)量
}
繼上述的workerCountOf,我們知道這個(gè)方法可以獲取當(dāng)前活躍的線程數(shù)。如果當(dāng)前線程數(shù)小于配置的核心線程數(shù),則會(huì)調(diào)用addWorker進(jìn)行添加新的線程。如果添加失敗了,則重新獲取ctl的值。
(4) 任務(wù)添加到隊(duì)列的相關(guān)邏輯
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次check一下,當(dāng)前線程池是否是運(yùn)行狀態(tài),如果不是運(yùn)行時(shí)狀態(tài),則把剛剛添加到workQueue中的command移除掉
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
上述我們知道當(dāng)添加線程池失敗的時(shí)候,我們會(huì)重新獲取ctl的值。此時(shí)咱們的第一步就很清楚了:
- 通過(guò)isRunning方法來(lái)判斷線程池狀態(tài)是不是運(yùn)行中狀態(tài),如果是,則將command任務(wù)放到阻塞隊(duì)列workQueue中。
- 再次check一下,當(dāng)前線程池是否是運(yùn)行狀態(tài),如果不是運(yùn)行時(shí)狀態(tài),則把剛剛添加到workQueue中的command移除掉,并調(diào)用拒絕策略。否則,判斷如果當(dāng)前活動(dòng)的線程數(shù)如果為0,則表明只去創(chuàng)建線程,而此處,并不執(zhí)行任務(wù)(因?yàn)?,任?wù)已經(jīng)在上面的offer方法中被添加到了workQueue中了,等待線程池中的線程去消費(fèi)隊(duì)列中的任務(wù))
(5) 線程池中的線程數(shù)量小于最大線程數(shù)代碼邏輯以及拒絕策略的代碼邏輯
接下來(lái),我們看一下最后的一個(gè)步驟
/**
* 進(jìn)入第三步驟前提:
* 1.線程池不是運(yùn)行狀態(tài),所以isRunning(c)為false
* 2.workCount >= corePoolSize的時(shí)候于此同時(shí)并且添加到queue失敗的時(shí)候執(zhí)行
*/
else if (!addWorker(command, false))
reject(command);
}
由于調(diào)用addWorker的第二個(gè)參數(shù)是false,則表示對(duì)比的是最大線程數(shù),那么如果往線程池中創(chuàng)建線程依然失敗,即addWorker返回false,那么則進(jìn)入if語(yǔ)句中,直接調(diào)用reject方法調(diào)用拒絕策略了。
寫(xiě)到這里大家估計(jì)會(huì)對(duì)這個(gè)第二個(gè)參數(shù)是false為什么比較的是最大線程數(shù)有疑問(wèn)。其實(shí)這個(gè)是addWorker中的方法。我們可以大概看一下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
}
我們很明顯地看到當(dāng)core為flase的時(shí)候咱們獲取的是maximumPoolSize,也就是最大線程數(shù)。
寫(xiě)到這里,其實(shí)咱們的核心主流程大概就已經(jīng)結(jié)束了。這里其實(shí)老貓也只是寫(xiě)了一個(gè)算是比較入門(mén)的開(kāi)頭。當(dāng)然我們還可以再深入去理addWorker的源碼。這個(gè)其實(shí)就交給大家去細(xì)看了,篇幅過(guò)長(zhǎng),相信大家也會(huì)失去閱讀的興趣了,感興趣的可以自己研究一下,如果說(shuō)還是有問(wèn)題的,可以找老貓一起探討,老貓的公眾號(hào):"程序員老貓"。老貓覺(jué)得在上述的源碼中比較重要的其實(shí)就是ctl值的流轉(zhuǎn)順序以及計(jì)算方式,讀懂這個(gè)的話,后面一切的源碼只要順藤摸瓜即可理解。
5.Executors線程池模板
我們上述主要和大家分享了比較核心的theadPoolExecutor。除此之外,線程池Executors里面包含了很多其他的線程池模板。當(dāng)然這也是小貓直接面試的時(shí)候說(shuō)的那些,其實(shí)小貓也就僅僅只是背了線程池模板而已,并不知曉其工作原理。如下幾種:
- newCachedThreadPool 創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過(guò)處理需要,可靈活回收空閑線程,若無(wú)可回收,則新建線程。
- newFixedThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。
- newScheduledThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。
- newSingleThreadScheduleExecutor 創(chuàng)建一個(gè)單線程執(zhí)行程序,它可安排在給定延遲后運(yùn)行命令或者定期地執(zhí)行。(注意,如果因?yàn)樵陉P(guān)閉前的執(zhí)行期間出現(xiàn)失敗而終止了此單個(gè)線程,那么如果需要,一個(gè)新線程會(huì)代替它執(zhí)行后續(xù)的任務(wù))。可保證順序地執(zhí)行各個(gè)任務(wù),并且在任意給定的時(shí)間不會(huì)有多個(gè)線程是活動(dòng)的。與其他等效的 newScheduledThreadPool(1) 不同,可保證無(wú)需重新配置此方法所返回的執(zhí)行程序即可使用其他的線程。
- newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
6.多樣化的blockingQueue
- PriorityBlockingQueue 它是一個(gè)無(wú)界的并發(fā)隊(duì)列。無(wú)法向這個(gè)隊(duì)列中插入null值。所有插入到這個(gè)隊(duì)列中的元素必須實(shí)現(xiàn)Comparable接口。因此該隊(duì)列中元素的排序就取決于你自己的Comparable實(shí)現(xiàn)。
- SynchronousQueue 它是一個(gè)特殊的隊(duì)列,它的內(nèi)部同時(shí)只能夠容納單個(gè)元素。如果該隊(duì)列已有一個(gè)元素的話,那么試圖向隊(duì)列中插入一個(gè)新元素的線程將會(huì)阻塞,直到另一個(gè)新線程將該元素從隊(duì)列中抽走。同樣的,如果隊(duì)列為空,試圖向隊(duì)列中抽取一個(gè)元素的線程將會(huì)被阻塞,直到另一個(gè)線程向隊(duì)列中插入了一條新的元素。因此,它其實(shí)不太像是一個(gè)隊(duì)列,而更像是一個(gè)匯合點(diǎn)。
- ArrayBlockingQueue 它是一個(gè)有界的阻塞隊(duì)列,其內(nèi)部實(shí)現(xiàn)是將對(duì)象放到一個(gè)數(shù)組里。一但初始化,大小就無(wú)法修改
- LinkedBlockingQueue 它內(nèi)部以一個(gè)鏈?zhǔn)浇Y(jié)構(gòu)(鏈接節(jié)點(diǎn))對(duì)其元素進(jìn)行存儲(chǔ)??梢灾付ㄔ厣舷?,否則,上限則為Integer.MAX_VALUE。
- DelayQueue 它對(duì)元素進(jìn)行持有直到一個(gè)特定的延遲到期。注意:進(jìn)入其中的元素必須實(shí)現(xiàn)Delayed接口。
上述針對(duì)這些羅列了一下,其實(shí)很多官網(wǎng)上也有相關(guān)的介紹,當(dāng)然感興趣的小伙伴也可以再去刨一刨里面的源碼實(shí)現(xiàn)。
7.拒絕策略
- AbortPolicy 丟棄任務(wù)并拋出RejectedExecutionException異常。
- DiscardPolicy 丟棄任務(wù),但是不拋出異常。
- DiscardOldestPolicy 丟棄隊(duì)列中最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)。
- CallerRunsPolicy 由調(diào)用線程處理該任務(wù)。
總結(jié)
很多小伙伴在用一些線程池或者第三方中間件的時(shí)候可能只停留在如何使用上,一旦出了問(wèn)題或者被人深入問(wèn)到其實(shí)現(xiàn)原理的時(shí)候就比較頭大。所以在日常開(kāi)發(fā)的過(guò)程中,我們不僅僅需要知道如何去用,其實(shí)更應(yīng)該知道底層的原理是什么。這樣才能長(zhǎng)立于不敗之地。