深入線程池的問題連環(huán)炮
這一篇是看了這一篇文章之后用于個人的學習記錄,加入了一些個人的理解,其中一些圖片也是來源于這篇文章https://mp.weixin.qq.com/s/NDOx94yY06OnHjrYq2lVYw
1、為什么會有線程池
JVM中的一個線程即對應一個操作系統(tǒng)的線程,也就是JVM的線程是由操作系統(tǒng)創(chuàng)建而來,創(chuàng)建線程和銷毀線程這些都需要操作系統(tǒng)來分別賦予資源和釋放資源等
也就意味著創(chuàng)建線程變成了一個比較重的操作
我們可以利用多線程去進行不同的工作,更高效的利用CPU資源,但是這并不意味著線程數(shù)量越多越好
我們的時代已經由原來的單核時代變成現(xiàn)在的多核時代了,這個核指的就是CPU,在原來的單核時代,如果一個線程一直是運算的邏輯過程,也就不涉及到線程的切換,因為這個線程一直在占用CPU,也就是屬于計算密集型
但是如果這個線程屬于IO密集型,也就是這個線程很多的時間都是在等待IO操作和處理IO操作,這樣就浪費了CPU這個大腦的處理能力了
于是就有了多線程,一個線程等待IO操作,另一個線程可以頂上,充分利用了CPU的資源
隨著多核時代的到來,對于這個CPU高效利用也就變得更加迫切,CPU的核心越來越多,能同時運行的線程數(shù)越來越多了,也就意味著此時的多線程并不只是去提高單核的處理能力,更是為了充分利用這個多核的大腦
但 CPU 的核心數(shù)有限,同時能運行的線程數(shù)有限,所以需要根據(jù)調度算法切換執(zhí)行的線程,而線程的切換需要開銷,比如替換寄存器的內容、高速緩存的失效等等。
如果線程數(shù)太多,切換的頻率就變高,可能使得多線程帶來的好處抵不過線程切換帶來的開銷,得不償失。
因此線程的數(shù)量需要得以控制
2、什么是線程池
線程的數(shù)量太少無法充分利用CPU,線程數(shù)太多的話會導致頻繁切換線程,上下文切換消耗資源,我們需要根據(jù)系統(tǒng)資源和業(yè)務性能來決定線程數(shù)量
而線程的創(chuàng)建又是屬于一個比較重的操作,所以我們想到的就是緩存一批線程,這種思想大家都明白應該,就像是數(shù)據(jù)庫某張表需要經常查詢,造成DB壓力過大,我們就先把經常訪問訪問的數(shù)據(jù)放入到緩存中,用于緩解對于DB的訪問壓力
這個也是類似的道理,每次去新建和銷毀線程比較重,我們就可以通過緩存這些線程來減輕不必要的消耗
線程的數(shù)量我們需要根據(jù)硬件的資源和線程要執(zhí)行的任務這些等綜合來決定
高并發(fā)、任務執(zhí)行時間短的業(yè)務,線程池線程數(shù)可以設置為CPU核數(shù)+1,減少線程上下文的切換
并發(fā)不高、任務執(zhí)行時間長的業(yè)務要分情況來討論
假如是業(yè)務時間長集中在IO操作上,也就是IO密集型的任務,因為IO操作并不占用CPU,所以不要讓所有的CPU閑下來,可以加大線程池中的線程數(shù)目,讓CPU處理更多的業(yè)務
假如是業(yè)務時間長集中在計算操作上,也就是計算密集型任務,這個就沒辦法了,線程數(shù)設置為CPU核數(shù)+1,線程池中的線程數(shù)設置得少一些,減少線程上下文的切換
并發(fā)高、業(yè)務執(zhí)行時間長,解決這種類型任務的關鍵不在于線程池而在于整體架構的設計,看看這些業(yè)務里面某些數(shù)據(jù)是否能做緩存是第一步,增加服務器是第二步,至于線程池的設置,參考上面的設置即可。最后,業(yè)務執(zhí)行時間長的問題,也可能需要分析一下,看看能不能使用中間件對任務進行拆分和解耦。
大家應該都聽過對象池、連接池這些,池化的技術就是通過在池子里取出資源,然是使用完再放回到池子里,而線程池這一點稍微不太一樣,這里線程池相對來說更黑盒一些
不是我們從線程池中取線程使用,而是直接往線程池里扔任務,然后線程池幫我們去執(zhí)行
3、實現(xiàn)線程池
線程池內部也是一個典型的生產者-消費者模型
線程池內部有一個存放任務列表的隊列,而內部會不斷的有線程去隊列中取任務來執(zhí)行,用來消費
來看一個簡易版的線程池實現(xiàn),這段代碼同樣來源于上面的博文
首先線程池內需要定義兩個成員變量,分別是阻塞隊列和線程列表,然后自定義線程使它的任務就是不斷的從阻塞隊列中拿任務然后執(zhí)行。
- @Slf4j
- public class YesThreadPool {
- BlockingQueue<Runnable> taskQueue; //存放任務的阻塞隊列
- List<YesThread> threads; //線程列表
- YesThreadPool(BlockingQueue<Runnable> taskQueue, int threadSize) {
- this.taskQueue = taskQueue;
- threads = new ArrayList<>(threadSize);
- // 初始化線程,并定義名稱
- IntStream.rangeClosed(1, threadSize).forEach((i)-> {
- YesThread thread = new YesThread("yes-task-thread-" + i);
- thread.start();
- threads.add(thread);
- });
- }
- //提交任務只是往任務隊列里面塞任務
- public void execute(Runnable task) throws InterruptedException {
- taskQueue.put(task);
- }
- class YesThread extends Thread { //自定義一個線程
- public YesThread(String name) {
- super(name);
- }
- @Override
- public void run() {
- while (true) { //死循環(huán)
- Runnable task = null;
- try {
- task = taskQueue.take(); //不斷從任務隊列獲取任務
- } catch (InterruptedException e) {
- logger.error("記錄點東西.....", e);
- }
- task.run(); //執(zhí)行
- }
- }
- }
- }
當然,這只是個最簡易版的,也有很多可以優(yōu)化的點
4、線程池核心參數(shù)
第1個參數(shù):設置核心線程數(shù)。默認情況下核心線程會一直存活
第2個參數(shù):設置最大線程數(shù)。決定線程池最多可以創(chuàng)建的多少線程
第3個參數(shù)和第4個參數(shù):用來設置線程空閑時間,和空閑時間的單位,當線程閑置超過空閑時間就會被銷毀??梢酝ㄟ^AllowCoreThreadTimeOut方法來允許核心線程被回收
第5個參數(shù):設置緩沖隊列,圖中左下方的三個隊列是設置線程池時常使用的緩沖隊列
其中Array Blocking Queue是一個有界隊列,就是指隊列有最大容量限制。Linked Blocking Queue是無界隊列,就是隊列不限制容量。最后一個是Synchronous Queue,是一個同步隊列,內部沒有緩沖區(qū)
第6個參數(shù):設置線程池工廠方法,線程工廠用來創(chuàng)建新線程,可以用來對線程的一些屬性進行定制,例如線程的Group、線程名、優(yōu)先級等。一般使用默認工廠類即可
第7個參數(shù):設置線程池滿時的拒絕策略
ThreadPoolExecutor默認有四個拒絕策略:
- ThreadPoolExecutor.AbortPolicy() 直接拋出異常RejectedExecutionException,這個是默認的拒絕策略
- ThreadPoolExecutor.CallerRunsPolicy() 直接在提交失敗時,由提交任務的線程直接執(zhí)行提交的任務
- ThreadPoolExecutor.DiscardPolicy() 直接丟棄后來的任務
- ThreadPoolExecutor.DiscardOldestPolicy() 丟棄在隊列中最早提交的任務
5、線程池原理
我們向線程提交任務時可以使用Execute和Submit,區(qū)別就是Submit可以返回一個Future對象,通過Future對象可以了解任務執(zhí)行情況,可以取消任務的執(zhí)行,還可獲取執(zhí)行結果或執(zhí)行異常。Submit最終也是通過Execute執(zhí)行的
線程池提交任務時的執(zhí)行順序如下:
向線程池提交任務時,會首先判斷線程池中的線程數(shù)是否大于設置的核心線程數(shù),如果不大于,就創(chuàng)建一個核心線程來執(zhí)行任務。
如果大于核心線程數(shù),就會判斷緩沖隊列是否滿了,如果沒有滿,則放入隊列,等待線程空閑時執(zhí)行任務。
如果隊列已經滿了,則判斷是否達到了線程池設置的最大線程數(shù),如果沒有達到,就創(chuàng)建新線程來執(zhí)行任務。
如果已經達到了最大線程數(shù),則執(zhí)行指定的拒絕策略。這里需要注意隊列的判斷與最大線程數(shù)判斷的順序,不要搞反
線程池中的線程并不是一開始就將活躍線程直接拉滿的,而是隨著用的數(shù)量的增加,才會逐步增加線程的,這是一種懶加載思想
但是這里有一個靈魂問題,沒研究的小伙伴肯定是不知道的
6、當線程數(shù)小于活躍線程數(shù)的時候,并且線程數(shù)都處于空閑狀態(tài),現(xiàn)在提交一個任務,是新起一個線程還是用之前的線程來執(zhí)行該任務?
李老是這樣說的:
- If fewer than corePoolSize threads are running, try to start
- a new thread with the given command as its first task.
也就是無論其余線程是否空閑,只要此時線程數(shù)量小于核心線程數(shù)量,就會通過啟動一個線程來執(zhí)行該任務
線程池是懶加載的,但是這里又顯得很勤快
也就是線程池是想要快速擁有核心線程數(shù)量的線程,這個作為線程池的中堅力量
而最大線程數(shù)其實是為了應付突發(fā)狀況。
舉個裝修的例子,正常情況下施工隊只要 5 個人去干活,這 5 人其實就是核心線程,但是由于工頭接的活太多了,導致 5 個人在約定工期內干不完,所以工頭又去找了 2 個人來一起干,所以 5 是核心線程數(shù),7 是最大線程數(shù)。
平時就是 5 個人干活,特別忙的時候就找 7 個,等閑下來就會把多余的 2 個辭了
7、看到這里你可能會覺得核心線程在線程池里面會有特殊標記?
并沒有,不論是核心還是非核心線程,在線程池里面都是一視同仁,當淘汰的時候不會管是哪些線程,反正留下核心線程數(shù)個線程即可
8、你是怎么理解 KeepAliveTime 的?
線程池的重點是保留核心數(shù)量的線程,但是會預留一些線程來用于突發(fā)情況,當突發(fā)情況過去之后,還是只想保留核心線程,所以這個時候就通過這個時間來控制
當線程數(shù)量大于核心線程數(shù)量的時候,并且空閑時間超過KeepAliveTime的時候,就回收線程,直到線程數(shù)量和核心數(shù)量持平為止
看了上面的線程池的邏輯,不知道大家有沒有產生一個疑問
為什么要把任務先放在任務隊列里面,而不是把線程先拉滿到最大線程數(shù)?
這里我先說下我的個人理解
線程池的重點應該是核心線程池,而當線程數(shù)量不夠處理的時候,先放到隊列中也是屬于一種緩沖的思想,因為我們在設計核心線程數(shù)量的時候都是考慮的盡可能的最優(yōu)的數(shù)量,所以重點也就變成了盡力去維持核心線程的數(shù)量
而隊列是可以自定義數(shù)量的,我們可以通過控制隊列的長度,來控制我們可以接受的任務堆積的程度,只有當任務堆積無法忍受的時候,才會繼續(xù)去啟動新的線程來執(zhí)行這些任務
當我看了Yes大佬的看法之后,發(fā)現(xiàn)也是這樣理解的,但是解釋的更深一些,我來和大家解釋下
原生版線程池的實現(xiàn)可以認為是偏向 CPU 密集的,也就是當任務過多的時候不是先去創(chuàng)建更多的線程,而是先緩存任務,讓核心線程去消化,從上面的分析我們可以知道,當處理 CPU 密集型任務的時,線程太多反而會由于線程頻繁切換的開銷而得不償失,所以優(yōu)先堆積任務而不是創(chuàng)建新的線程。
而像 Tomcat 這種業(yè)務場景,大部分情況下是需要大量 I/O 處理的情況就做了一些定制,修改了原生線程池的實現(xiàn),使得在隊列沒滿的時候,可以創(chuàng)建線程至最大線程數(shù)。
9、如何修改原生線程池,使得可以先拉滿線程數(shù)再入任務隊列排隊?
這里的邏輯其實上面也說過了,大家看一下源碼就懂了,首先判斷的是工作線程是否小于核心線程,當工作線程小于核心線程時,直接增加線程數(shù)量來執(zhí)行任務
當達到核心線程數(shù)量的時候,則判斷線程池是否在運行中,在運行中即執(zhí)行入隊操作
接下來一起看看Tomcat實現(xiàn)線程池的邏輯
- public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor
可以看到先繼承了 JUC 的線程池,然后我們重點關注一下 execute 這個方法
這里可以看到增加了submittedCount作為任務數(shù)的統(tǒng)計,統(tǒng)計所有未完成的任務數(shù)量
首先調用原生過程,如果捕獲到拒絕的異常,則判斷隊列類型,不正確,丟棄該任務,任務數(shù)量減一。
然后執(zhí)行再次入隊列,試圖增加一次挽救的機會,入隊失敗,任務數(shù)量減一,最后處理捕獲異常,任務數(shù)量減一
然后我們再來看下代碼里出現(xiàn)的 TaskQueue,這個就是上面提到的定制關鍵點了。
可以看到這個任務隊列繼承了 LinkedBlockingQueue,并且有個 ThreadPoolExecutor 類型的成員變量 parent ,我們再來看下 offer 方法的實現(xiàn),這里就是修改原來線程池任務提交與線程創(chuàng)建邏輯的核心了。
這里就是對于offer邏輯進行了加強,我們看一下
先是如果沒有線程實例,則直接按照原方法執(zhí)行
接著判斷如果線程數(shù)量是最大線程數(shù)量,直接入隊
未完成的任務數(shù)小于線程數(shù),證明此時還有閑著摸魚的線程,直接入隊即可,會自動消費
到最后,也就意味著此時核心線程都在運行,此時判斷線程數(shù)量是否小于最大線程數(shù)量,如果小于,這里就直接返回false即可,這個false就映射了上面ThreadPoolExecutor中的execute方法中的offer,然后便會執(zhí)行相應的增加線程的操作,而不是先選擇入隊
10、原生線程池的核心線程一定要伴隨著任務慢慢創(chuàng)建嗎
既然這么問了,答案肯定是否定的,線程池中提供了
線程池提供了兩個方法:
- prestartCoreThread:啟動一個核心線程
- prestartAllCoreThreads :啟動所有核心線程
不要小看這個預創(chuàng)建方法,預熱很重要,不然剛重啟的一些服務有時是頂不住瞬時請求的,就立馬崩了,所以有預熱線程、緩存等等操作。
- /**
- * Starts a core thread, causing it to idly wait for work. This
- * overrides the default policy of starting core threads only when
- * new tasks are executed. This method will return {@code false}
- * if all core threads have already been started.
- * @return {@code true} if a thread was started
- */
- public boolean prestartCoreThread() {
- return workerCountOf(ctl.get()) < corePoolSize &&
- addWorker(null, true);
- }
- /**
- * Starts all core threads, causing them to idly wait for work. This
- * overrides the default policy of starting core threads only when
- * new tasks are executed.
- * @return the number of threads started
- */
- public int prestartAllCoreThreads() {
- int n = 0;
- while (addWorker(null, true))
- ++n;
- return n;
- }
11、線程池的核心線程在空閑的時候一定不會被回收嗎?
有個allowCoreThreadTimeOut方法,把它設置為true ,則所有線程都會超時,不會有核心數(shù)那條線的存在。
12、線程池的關閉方法shutdown和shutdownNow
關閉線程池的方法,一個是安全的關閉線程池,會等待任務都執(zhí)行完畢,一個是粗暴的直接咔嚓了所有線程,管你在不在運行,兩個方法分別調用的就是 interruptIdleWorkers() 和 interruptWorkers() 來中斷線程
- /**
- * Initiates an orderly shutdown in which previously submitted
- * tasks are executed, but no new tasks will be accepted.
- * Invocation has no additional effect if already shut down.
- * <p>This method does not wait for previously submitted tasks to
- * complete execution. Use {@link #awaitTermination awaitTermination}
- * to do that.
- * @throws SecurityException {@inheritDoc}
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();
- advanceRunState(SHUTDOWN);
- interruptIdleWorkers();
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- }
- **
- * Attempts to stop all actively executing tasks, halts the
- * processing of waiting tasks, and returns a list of the tasks
- * that were awaiting execution. These tasks are drained (removed)
- * from the task queue upon return from this method
- * <p>This method does not wait for actively executing tasks to
- * terminate. Use {@link #awaitTermination awaitTermination} to
- * do that.
- * <p>There are no guarantees beyond best-effort attempts to stop
- * processing actively executing tasks. This implementation
- * cancels tasks via {@link Thread#interrupt}, so any task that
- * fails to respond to interrupts may never terminate
- * @throws SecurityException {@inheritDoc}
- */
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();
- advanceRunState(STOP);
- interruptWorkers();
- tasks = drainQueue();
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- return tasks;
- }
這又可以引申出一個問題,shutdownNow 了之后還在任務隊列中的任務咋辦?眼尖的小伙伴應該已經看到了,線程池還算負責,把未執(zhí)行的任務拖拽到了一個列表中然后返回,至于怎么處理,就交給調用者了
13、你肯定知道線程池里的 ctl 是干嘛的咯?
其實看下注釋就很清楚了,ctl 是一個涵蓋了兩個概念的原子整數(shù)類,它將工作線程數(shù)和線程池狀態(tài)結合在一起維護,低 29 位存放 workerCount,高 3 位存放 runState
其實并發(fā)包中有很多實現(xiàn)都是一個字段存多個值的,比如讀寫鎖的高 16 位存放讀鎖,低 16 位存放寫鎖,這種一個字段存放多個值可以更容易的維護多個值之間的一致性,也算是極簡主義
14、線程池有幾種狀態(tài)嗎?
注解說的很明白,我再翻譯一下:
RUNNING:能接受新任務,并處理阻塞隊列中的任務
SHUTDOWN:不接受新任務,但是可以處理阻塞隊列中的任務
STOP:不接受新任務,并且不處理阻塞隊列中的任務,并且還打斷正在運行任務的線程,就是直接撂擔子不干了!
TIDYING:所有任務都終止,并且工作線程也為0,處于關閉之前的狀態(tài)
TERMINATED:已關閉
15、線程池的狀態(tài)是如何變遷的嗎?