線程池中線程是如何保活和回收的
面試時經(jīng)常被問的一個問題,線程池中線程是如何保活的?
這個問題對于看過線程池源碼的同學應該已經(jīng)知道答案了,沒有看過源碼的也不要慌,現(xiàn)在我們就一起看一下線程池線程的?;畈呗浴?/p>
一、線程池中在哪執(zhí)行任務
首先進入ThreadPoolExecutor的execute方法。
- 首先檢查當前工作線程數(shù)量,workerCountOf(c) 是否小于核心線程數(shù)量corePoolSize,如果小于就創(chuàng)建一個新線程addWorker()執(zhí)行這個任務。
- 如果線程池在運行且任務加入隊列成功,但是當前工作線程數(shù)量為0,也會創(chuàng)建一個新線程addWorker()。
- 如果任務不能被加入任務隊列(workQueue.offer(command)返回false), 也會創(chuàng)建一個新線程addWorker()。
在execute方法內(nèi)部有三處調(diào)用addWorker()方法的位置,進入到addWorker() 方法中,可以看到有這樣一行代碼new Worker(firstTask) 。
而 Worker類又實現(xiàn)了 Runnable,所以線程池中的線程也就是Worker,而執(zhí)行就在run()方法內(nèi)部,我們只需要找到Worker類中的run方法即可。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
run 方法內(nèi)部調(diào)用了 runWorker,秘密就在runWorker方法內(nèi)部。
所以線程池中線程最終就在runWorker方法中執(zhí)行的。
二、getTask 獲取任務方法
在上面ThreadPoolExecutor的execute方法中,有一步是往阻塞隊列中放入任務(workQueue.offer(command))。
上面我們找到了線程池中線程執(zhí)行任務的地方,那么我們看看是從哪讀取的任務?
runWorker方法中有一行代碼task = getTask(),此處就是從阻塞隊列中獲取任務的代碼,讓我們看一下 getTask() 的內(nèi)部實現(xiàn)。
核心代碼就是
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
根據(jù)timed 的狀態(tài)判斷,當工作線程數(shù)量大于核心線程數(shù)量時調(diào)用poll方法獲取任務,當工作線程數(shù)量小于和核心線程數(shù)量時調(diào)用take方法。
對于阻塞隊列的介紹如下:
- poll 方法如果隊列不為空,返回頭部元素。如果隊列為空會將線程阻塞在此處,阻塞時間是keepAliveTime。當時間到了獲取不到任務時返回null。
- take方法如果隊列不為空返回頭部元素。如果隊列為空會將線程阻塞在此處,直到隊列中有元素可供使用。
對于非核心線程,當線程池中的線程數(shù)量超過核心線程數(shù)量且空閑時間超過keepAliveTime時,非核心線程會被回收。
通過這種方式,線程在沒有任務時就阻塞在隊列上,從而實現(xiàn)?;?。
上面我們知道了非核心線程的?;畈呗裕敲磳τ诤诵木€程又是如何?;畹哪??
三、核心線程如何實現(xiàn)的?;?/h4>
在線程池中核心線程默認是不會被回收的。
不過我們可以通過設置allowCoreThreadTimeOut來實現(xiàn),getTask方法中timed的校驗。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
所以當allowCoreThreadTimeOut 設置為true時,核心線程在沒有任務可以執(zhí)行時會使用take方法進行阻塞,直到獲取到任務位置,而不是因為沒有任務就被銷毀,從而實現(xiàn)的線程保活。
任務執(zhí)行的過程是無法保證不出問題的,不管是核心線程還是非核心線程,當線程中出現(xiàn)異常之后,線程池是如何處理的呢?
四、線程異常之后如何?;?/h4>
繼續(xù)回到runWorker方法中,其中task.run()是真正執(zhí)行任務的地方,當此處發(fā)生異常之后,有try catch。
所以當任務執(zhí)行異常之后,會依次執(zhí)三個finally塊中的代碼。
再看processWorkerExit() 方法之前,我們先看一個參數(shù)completedAbruptly。
completedAbruptly 代表是否是異常退出的,默認是true代表異常退出。
在runWorker方法中,如果線程任務是正常執(zhí)行完成的,會在最后修改該值。
由于我們是異常線程,所以代碼肯定不會走到這,直接走到finally中的processWorkerExit方法。
在processWorkerExit方法中,它會根據(jù)completedAbruptly的值來調(diào)整線程池中的工作線程數(shù)量,從工作線程集合中移除該線程,并根據(jù)線程池的狀態(tài)和工作線程數(shù)量決定是否需要添加新線程。
!completedAbruptly 判斷工作線程是不是異常退出的,如果不是異常退出的計算最小線程數(shù)量。
如果允許核心線程回收allowCoreThreadTimeOut=true,min為0。
如果min為0 且工作隊列不為空! workQueue.isEmpty(),min為1。
如果當前工作線程workerCountOf(c)大于等于這個最小的線程數(shù)量min,直接返回。
如果小于這個最小的工作線程數(shù)量min,調(diào)用addWorker。
此處addWorker 的觸發(fā)條件就是當線程池的狀態(tài)小于STOP 也就是線程池還在運行runStateLessThan(c, STOP)時且不滿足上述不需要添加新線程的判斷。
當上述條件滿足的時候,則調(diào)用addWorker(null,false)添加一個新的工作線程,因為傳入的參數(shù)Runnable為null,所以這個新線程會從任務隊列中繼續(xù)讀取任務來執(zhí)行。
最后總結一下,當線程異常之后,按照正常情況來說線程就直接消失了,但是通過processWorkerExit方法的補救,增加了一個新的線程,保證線程池的運行。
五、總結
線程池中的線程分為核心線程與非核心線程。
核心線程默認不回收,可以通過設置allowCoreThreadTimeOut為true 來回收。
非核心線程在獲取任務為空且空閑時間超過一定時間之后進行回收。
線程池的?;畈呗酝ㄟ^阻塞隊列的阻塞特性實現(xiàn),poll 方法實現(xiàn)可以指定超時時間的阻塞,take 方法實現(xiàn)阻塞直到獲取到任務。
當線程異常之后,通過新增線程的方式實現(xiàn)線程的補救,保證線程池的運行。