Java 并發(fā)之線程池
本文轉(zhuǎn)載自微信公眾號「懷夢追碼」,作者水目沾 。轉(zhuǎn)載本文請聯(lián)系懷夢追碼公眾號。
線程池的作用
池化技術(shù)是一種很常見的計算機技術(shù),主要是為了復用和提高性能,如內(nèi)存池、連接池、對象池等。線程池也不例外,他的主要作用如下:
- 提高性能:線程的頻繁創(chuàng)建和銷毀會產(chǎn)生的很大的系統(tǒng)開銷,線程池中的線程復用可以大幅度的減少這種不必要的開銷。
- 復用和管理:方便對池子中的線程進行管理和復用,避免在生產(chǎn)環(huán)境中大量的創(chuàng)建線程。
- 解耦:只暴露提交任務的接口,將線程池的創(chuàng)建、銷毀等工作與業(yè)務解耦。
JDK 在并發(fā)包中為我們定義了一套 Executor 框架,幫助開發(fā)人員有效地進行線程控制,有基礎(chǔ)的線程池類、有線程池工廠,但是最最重要還是 ThreaPoolExecutor,也是面試中最常問的知識點。本文重點介紹 ThreaPoolExecutor 的原理。
線程池的參數(shù)說明
- ThreaPoolExecutor(
- int corePoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
- )
ThreaPoolExecutor 參數(shù)的含義如下
- corePoolSize: 線程池中核心線程的數(shù)量。
- maximumPoolSize: 線程池中的最大線程數(shù)量。
- keepAliveTime: 當線程池數(shù)量超過 corePoolSize 時,多余的空閑線程的存活時間。即超過 coolPoolSize 的空閑線程在 多長時間內(nèi),會被銷毀。
- unit: keepAliveTime 的單位,可以為 時、分、秒等多種值。
- workQueue: 任務隊列,存放被提交但尚未被執(zhí)行的任務。
- threadFactory: 線程工廠,用于創(chuàng)建線程,一般用默認即可。
- handler: 拒絕策略,當線程池處理不過來任務時,如何拒絕任務。
以上參數(shù)中 workQueue、threadFactory、handler 相對復雜,需要單獨介紹,下面主要介紹下 ThreadFactory 和 RejectedExecutionHandler
1. 線程工廠:ThreadFactory
線程池中的線程都由 TrheadFactory 定義的線程工廠來創(chuàng)建,它是一個接口只有 Thread newThread(Runnable r) 方法,用來創(chuàng)建線程。雖然創(chuàng)建 ThreadPoolExecutor 的時候可以不指定該參數(shù),但是阿里巴巴編碼規(guī)約建議最好指定該參數(shù),有以下幾個好處:
- 跟蹤線程池在何時、創(chuàng)建了多少線程。
- 可以自定義線程池的名稱、組以及優(yōu)先級等信息。
- 設(shè)置線程的其他狀態(tài)等,如守護進程。
2. 拒絕策略:RejectedExecutionHandler
當線程池線程數(shù)量達到 maxPoolSize 大小時,再提交新的任務會執(zhí)行拒絕策略,JDK 定義了四種拒絕策略:
- AbortPolicy 該策略直接拋出異常
- CallerRunsPolicy 調(diào)用者線程處理任務,該策略并不是真正的丟棄任務,會讓當前線程來執(zhí)行被拋棄的任務,由于只有一個線程,所有的任務會被串行執(zhí)行。
- DiscardOldestPolicy 丟棄最老的一個請求,即隊列頭部的即將被執(zhí)行的任務,并嘗試再次提交當前任務。
- DiscardPolicy 該策略默默丟棄無法處理的任務。
以上四種拒絕策略都繼承了接口 RejectedExecutionHandler 并實現(xiàn)該接口的 rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法。如果以上四種拒絕策略都滿足不了你的需求,可以自定義拒絕策略,繼承接口 RejectedExecutionHandler 并實現(xiàn)方法即可。
線程池的調(diào)度邏輯
ThreaPoolExecutor 對提交的任務處理邏輯如下圖,
1. 提交任務時:
- 如果線程池中的線程數(shù)小于 corePoolSize (無論是否有空閑線程),創(chuàng)建新的線程(謂之核心線程)來處理。
- 如果線程池中的線程數(shù)已經(jīng)大于或者 corePoolSize ,新提交的任務將被放置到等候隊列中,等待調(diào)度。
- 如果等待隊列已滿,并且線程池中的線程數(shù)量小于 maxPoolSize,將繼續(xù)創(chuàng)建新線程處理任務。
- 如果隊列已滿且線程數(shù)量也達到了上限,將使用拒絕策略來處理。
2. 任務進行中時:
當隊列中的任務已經(jīng)執(zhí)行完,部分線程開始空閑,非核心線程會在空閑后的 keepAliveTime 的時間內(nèi)自行銷毀。
而空閑核心線程是否退出取決于線程池的另一個參數(shù) allowCoreThreadTimeOut 。當配置為 true 的時候,即使是核心線程,超時也會退出。
線程池的生命周期
線程池同線程一樣也有自己的生命周期,包括 RUNNING、SHUTDOWN、STOP、TIDYING 和 TERMINATED 五種狀態(tài),他們的轉(zhuǎn)換關(guān)系如下圖,并且這些轉(zhuǎn)換時不可逆的。
1. RUNNING
該狀態(tài)是線程池的工作狀態(tài),能夠接受新任務以及對接受的任務進行處理。線程池的初始狀態(tài),即線程創(chuàng)建成功后就處理此狀態(tài)。
2. SHUTDOWN
關(guān)閉狀態(tài),線程池不再接受新的任務,但是能繼續(xù)處理提交到線程池中的任務。線程狀態(tài) RUNNING 的情況下調(diào)用 shutdown() 方法進入該狀態(tài)。
3. STOP
停止狀態(tài),線程池不接受新的任務,也不處理阻塞隊列中的任務,同時會中斷正在執(zhí)行任務的線程。在線程處于 RUNNING 或者 SHUTDOWN 狀態(tài)下調(diào)用 shutdownNow() 方法進入該狀態(tài)。
4. TIDYING
所有任務都銷毀了,workCount 為 0,會自動從 RUNNING 或者 STOP 狀態(tài)轉(zhuǎn)化為 TIDYING 狀態(tài)。在轉(zhuǎn)換過程中會調(diào)用 terminated() 方法,ThreadPoolExecutor 類的 ternimated() 方法為空,如果想在線程池變成 TIDYING 的時候有所處理,可以重載該方法。
線程池在 SHUTDOWN 狀態(tài)下,阻塞隊列為空并且執(zhí)行任務為空時轉(zhuǎn)換為 TIDYING 狀態(tài);線程池在 STOP 狀態(tài)下,執(zhí)行的任務為空時轉(zhuǎn)換為 TIDYING 狀態(tài)。
5. TERMINATED
結(jié)束狀態(tài),線程池的最終狀態(tài),該狀態(tài)的線程池不會再有任何操作。線程池執(zhí)行 terminated() 方法后處于該狀態(tài)。
JDK 四種線程池
了解 ThreaPoolExecutor 的基本原理后再來看看 JDK 在 Executors 中為開發(fā)人員定義的四個線程池工廠方法,其實它們內(nèi)部調(diào)用的是 ThreaPoolExecutor,只是使用了不同的參數(shù),下面來了解下它們的特性。
newFixedThreadPool() 方法:該方法返回一個固定線程池數(shù)量的線程,提交任務時如果線程池中有空閑線程,則立即執(zhí)行,沒有則新的任務會被緩存在一個任務隊列中,它創(chuàng)建線程池的代碼如下:
- public static ExecutorService newFixedThreadPool(int nthread) {
- return new ThreadPoolExecutor(nthread,
- nthread,
- 0L,
- TimeUnit.MILLSECCONDS,
- new LikedBlockingQueue<Runnable>())
- }
newSingleThreadExecutor() 方法:該方法返回只有一個線程的線程池,如果多余的任務提交到線程池,則被提交到任務隊列中。它創(chuàng)建線程池代碼如下:
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1,
- 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- }
newCachedThreadPool() 方法:該方法返回一個可根據(jù)實際情況調(diào)整線程數(shù)的線程池,它的核心線程數(shù)為 0 ,線程總數(shù)為 Integer.MAX_VALUE ,隊列采用的是 SynchronousQueue,這樣即使線程滿,任務也不能提交到隊列中。
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
newScheduledThreadPool():該方法一個固定長度的線程池,并且以延遲或者定時的方式去執(zhí)行任務。它的隊列使用 DelayedWorkQueue,所以任務必須繼承 Delay 接口。
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
- return new ScheduledThreadPoolExecutor(corePoolSize);
- }
- public ScheduledThreadPoolExecutor(int corePoolSize) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
- }