小米面試:什么是線程池?工作原理是什么?線程池可以動(dòng)態(tài)修改嗎?
大家好,我是碼哥,《Redis 高手心法》暢銷(xiāo)書(shū)作者。
有讀者分享小米 Java 后端面試,其中有一個(gè)問(wèn)題,當(dāng)時(shí)沒(méi)有回答好:什么是線程池、工作原理是什么、線程池可以動(dòng)態(tài)修改嗎?
回答這個(gè)問(wèn)題之前,首先我們來(lái)了解下什么是線程池,它的工作原理是什么。
什么是線程池
線程池(Thread Pool)是一種基于池化思想管理線程的工具,它維護(hù)多個(gè)線程。在線程池中,總有幾個(gè)活躍線程。當(dāng)需要使用線程來(lái)執(zhí)行任務(wù)時(shí),可以從池子中隨便拿一個(gè)空閑線程來(lái)用,當(dāng)完成工作時(shí),該線程并不會(huì)死亡,而是再次返回線程池中成為空閑狀態(tài),等待執(zhí)行下一個(gè)任務(wù)。
這種做法,一方面避免了處理任務(wù)時(shí)創(chuàng)建銷(xiāo)毀線程開(kāi)銷(xiāo)的代價(jià),另一方面避免了線程數(shù)量膨脹導(dǎo)致的過(guò)分調(diào)度問(wèn)題,保證了對(duì)內(nèi)核的充分利用。
線程池狀態(tài)
然后,我們來(lái)看下線程池有哪些狀態(tài)呢?
線程池有五種狀態(tài):這五種狀態(tài)并不能任意轉(zhuǎn)換,只會(huì)有以下幾種轉(zhuǎn)換情況:線程池的五種狀態(tài)是如何流轉(zhuǎn)的?
- RUNNING:會(huì)接收新任務(wù)并且會(huì)處理隊(duì)列中的任務(wù)
- SHUTDOWN:不會(huì)接收新任務(wù)并且會(huì)處理隊(duì)列中的任務(wù)
- STOP:不會(huì)接收新任務(wù)并且不會(huì)處理隊(duì)列中的任務(wù),并且會(huì)中斷在處理的任務(wù)(注意:一個(gè)任務(wù)能不能被中斷得看任務(wù)本身)
- TIDYING:所有任務(wù)都終止了,線程池中也沒(méi)有線程了,這樣線程池的狀態(tài)就會(huì)轉(zhuǎn)為 TIDYING,一旦達(dá)到此狀態(tài),就會(huì)調(diào)用線程池的 terminated()
- TERMINATED:terminated()執(zhí)行完之后就會(huì)轉(zhuǎn)變?yōu)?TERMINATED
線程池工作原理
如何自定義一個(gè)線程池?
public ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
// 核心線程池大小,表示線程池常駐線程數(shù)量
30,
// 最大線程數(shù),表示線程池最多創(chuàng)建的線程數(shù)量
100,
// ?;顣r(shí)間,表示一個(gè)非核心線程多久沒(méi)有使用,會(huì)被回收
10,
TimeUnit.MINUTES,
// 阻塞隊(duì)列,表示隊(duì)列最多緩存多少任務(wù),如果隊(duì)列滿了,將觸發(fā) RejectedExecutionHandler
new ArrayBlockingQueue<>(1000),
// 線程工廠,創(chuàng)建線程時(shí)候用的,可以給線程命名等
new NamedThreadFactory("cust-task")
);
// 拒絕策略,當(dāng)阻塞隊(duì)列滿了之后,會(huì)觸發(fā)這里的handler
// 默認(rèn)是丟棄新任務(wù)
executor.setRejectedExecutionHandler((r, executor1) -> {
log.warn("thread pool is full");
});
}
線程池執(zhí)行流程圖
- 首先檢測(cè)線程池運(yùn)行狀態(tài),如果不是 RUNNING,則直接拒絕,線程池要保證在 RUNNING 的狀態(tài)下執(zhí)行任務(wù)。
- 如果當(dāng)前線程數(shù)未超過(guò)核心線程數(shù),則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù)。
- 如果當(dāng)前線程數(shù)超過(guò)核心線程數(shù),且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添加到該阻塞隊(duì)列中。
- 如果當(dāng)前線程數(shù)超過(guò)核心線程數(shù)且 線程池內(nèi)的阻塞隊(duì)列已滿,且未超過(guò)最大線程數(shù),則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù)。
- 如果已超過(guò)最大線程數(shù),并且線程池內(nèi)的阻塞隊(duì)列已滿, 則根據(jù)拒絕策略來(lái)處理該任務(wù), 默認(rèn)的處理方式是直接拋異常。
注意:提交一個(gè) Runnable 時(shí),不管當(dāng)前線程池中的線程是否空閑,只要數(shù)量小于核心線程數(shù)就會(huì)創(chuàng)建新線程。
線程池的拒絕策略
ThreadPoolExecutor 內(nèi)部有實(shí)現(xiàn) 4 個(gè)拒絕策略:
- CallerRunsPolicy,由調(diào)用 execute 方法提交任務(wù)的線程來(lái)執(zhí)行這個(gè)任務(wù)。
- AbortPolicy,拋出異常 RejectedExecutionException 拒絕提交任務(wù)。
- DiscardPolicy,直接拋棄任務(wù),不做任何處理。
- DiscardOldestPolicy,去除任務(wù)隊(duì)列中的第一個(gè)任務(wù)(最舊的),重新提。
如何監(jiān)控線程池?
好了,言歸正傳,再回歸到這個(gè)題目本身,在修改線程池之前,我們要如何監(jiān)控線程池的信息呢?
比如線程池的執(zhí)行任務(wù)前后總時(shí)間,當(dāng)前任務(wù)數(shù)等信息。
- 統(tǒng)計(jì)任務(wù)執(zhí)行時(shí)間可以通過(guò)實(shí)現(xiàn) beforeExecute 和 afterExecute 方法,計(jì)算出任務(wù)總耗時(shí)。
- 統(tǒng)計(jì)線程池的任務(wù)數(shù),線程數(shù)等信息,可定時(shí)上報(bào)到 kafka,展示到可視化的界面上比如 Grafana。
監(jiān)控核心代碼
@Slf4j
public class ThreadPoolMonitor {
private final ThreadPoolExecutor customThreadPool;
private final String poolName;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public ThreadPoolMonitor(ThreadPoolExecutor customThreadPool, String poolName) {
this.customThreadPool = customThreadPool;
this.poolName = poolName;
}
public void startMonitoring(long period, TimeUnit unit) {
scheduler.scheduleAtFixedRate(this::monitor, 0, period, unit);
}
private void monitor() {
//核心線程數(shù)
int corePoolSize = customThreadPool.getCorePoolSize();
//最大線程數(shù)
int maximumPoolSize = customThreadPool.getMaximumPoolSize();
//活躍線程數(shù)
int activeCount = customThreadPool.getActiveCount();
//隊(duì)列任務(wù)數(shù)
int queueSize = customThreadPool.getQueue().size();
//已執(zhí)行完成任務(wù)數(shù)
long completedTaskCount = customThreadPool.getCompletedTaskCount();
//隊(duì)列任務(wù)數(shù)峰值
int largestPoolSize = customThreadPool.getLargestPoolSize();
//上報(bào)監(jiān)控?cái)?shù)據(jù)
sendToKafka(corePoolSize,maximumPoolSize, activeCount, queueSize, completedTaskCount, largestPoolSize);
}
private void sendToKafka(int corePoolSize,int maximumPoolSize, int activeCount, int queueSize, long completedTaskCount, int largestPoolSize) {
// 自定義實(shí)現(xiàn)發(fā)送kafka邏輯或上報(bào)到prometheus邏輯
}
}
如何動(dòng)態(tài)調(diào)整線程池?
一般我們?cè)谠O(shè)置線程池的線程數(shù)時(shí),會(huì)參考實(shí)際業(yè)務(wù)場(chǎng)景。比較通用的公式是
- IO 密集型場(chǎng)景:線程數(shù)=CPU 核心數(shù)*2+1
- CPU 密集型場(chǎng)景線程數(shù)=CPU 核心數(shù)+1
但這只是比較簡(jiǎn)單粗暴的計(jì)算方式,在實(shí)際使用過(guò)程中,我們還是不可避免的需要調(diào)整線程池的一些參數(shù),以達(dá)到最佳性能。
那么我們通過(guò)會(huì)比較關(guān)注線程池以下的幾個(gè)參數(shù)
線程池參數(shù) | 說(shuō)明 |
corePoolSize | 核心線程數(shù) |
maximumPoolSize | 最大線程數(shù) |
queueCapacity | 等待隊(duì)列大小 |
keepAliveTime | 空閑時(shí)間 |
- corePoolSize、maximumPoolSize 和 keepAliveTime 可以通過(guò)調(diào)用 setCorePoolSize、setMaximumPoolSize、setKeepAliveTime 方法修改。
- queueCapacity 雖然不能直接修改,我們可以通過(guò)實(shí)現(xiàn)自定義一個(gè)阻塞隊(duì)列的方式去實(shí)現(xiàn) setQueueCapacity 方法來(lái)修改隊(duì)列大小的屬性。
最后可以通過(guò) Apollo、Nacos 配置中心實(shí)現(xiàn)動(dòng)態(tài)監(jiān)聽(tīng)的方法,達(dá)到實(shí)時(shí)更新線程池的效果。
擴(kuò)展 1:線程池核心線程數(shù)會(huì)被銷(xiāo)毀嗎?
擴(kuò)展 2:線程發(fā)生異常,會(huì)被移出線程池嗎?