一文帶你徹底弄懂線程池設(shè)計機制
一、背景介紹
雖然 Java 對線程的創(chuàng)建、中斷、等待、通知、銷毀、同步等功能提供了很多的支持,但是從操作系統(tǒng)角度來說,頻繁的創(chuàng)建線程和銷毀線程,其實是需要大量的時間和資源的。
例如,當有多個任務(wù)同時需要處理的時候,一個任務(wù)對應(yīng)一個線程來執(zhí)行,以此來提升任務(wù)的執(zhí)行效率,模型圖如下:
圖片
如果任務(wù)數(shù)非常少,這種模式倒問題不大,但是如果任務(wù)數(shù)非常的多,可能就會存在很大的問題:
1.線程數(shù)不可控:隨著任務(wù)數(shù)的增多,線程數(shù)也會增多,這些線程都沒辦法進行統(tǒng)一管理
2.系統(tǒng)的開銷很大:創(chuàng)建線程對系統(tǒng)來說開銷很高,隨著線程數(shù)也會增多,可能會出現(xiàn)系統(tǒng)資源緊張的問題,嚴重的情況系統(tǒng)可能直接死機
假如把很多任務(wù)讓一組線程來執(zhí)行,而不是一個任務(wù)對應(yīng)一個新線程,這種通過接受任務(wù)并進行分發(fā)處理的就是線程池。
圖片
線程池內(nèi)部維護了若干個線程,當沒有任務(wù)的時候,這些線程都處于等待狀態(tài);當有新的任務(wù)進來時,就分配一個空閑線程執(zhí)行;當所有線程都處于忙碌狀態(tài)時,新任務(wù)要么放入隊列中等待,要么增加一個新線程進行處理,要么直接拒絕。
很顯然,這種通過線程池來執(zhí)行多任務(wù)的思路,優(yōu)勢明顯:
1.資源更加可控:能有效的控制線程數(shù),防止線程數(shù)過多,導(dǎo)致系統(tǒng)資源緊張
2.資源消耗更低:因為線程可以復(fù)用,可以有效的降低創(chuàng)建和銷毀線程的時間和資源
3.執(zhí)行效率更高:當新的任務(wù)進來時,可以不需要等待線程的創(chuàng)建立即執(zhí)行
關(guān)于這一點,我們可以看一個簡單的對比示例。
/**
* 使用一個任務(wù)對應(yīng)一個線程來執(zhí)行
* @param args
*/
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final Random random = new Random();
List<Integer> list = new CopyOnWriteArrayList<>();
// 一個任務(wù)對應(yīng)一個線程,使用20000個線程執(zhí)行任務(wù)
for (int i = 0; i < 20000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
list.add(random.nextInt(100));
}
}).start();
}
// 等待任務(wù)執(zhí)行完畢
while (true){
if(list.size() >= 20000){
break;
}
}
System.out.println("一個任務(wù)對應(yīng)一個線程,執(zhí)行耗時:" + (System.currentTimeMillis() - startTime) + "ms");
}
/**
* 使用線程池進行執(zhí)行任務(wù)
* @param args
*/
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final Random random = new Random();
List<Integer> list = new CopyOnWriteArrayList<>();
// 使用線程池進行執(zhí)行任務(wù),默認4個線程
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(20000));
for (int i = 0; i < 20000; i++) {
// 提交任務(wù)
executor.submit(new Runnable() {
@Override
public void run() {
list.add(random.nextInt(100));
}
});
}
// 等待任務(wù)執(zhí)行完畢
while (true){
if(list.size() >= 20000){
break;
}
}
System.out.println("使用線程池,執(zhí)行耗時:" + (System.currentTimeMillis() - startTime) + "ms");
// 關(guān)閉線程池
executor.shutdown();
}
兩者執(zhí)行耗時情況對比,如下:
一個任務(wù)對應(yīng)一個線程,執(zhí)行耗時:3073ms
---------------------------
使用線程池,執(zhí)行耗時:578ms
從結(jié)果上可以看出,同樣的任務(wù)數(shù),采用線程池和不采用線程池,執(zhí)行耗時差距非常明顯,一個任務(wù)對應(yīng)一個新的線程來執(zhí)行,反而效率不如采用 4 個線程的線程池執(zhí)行的快。
為什么會產(chǎn)生這種現(xiàn)象,下面我們就一起來聊聊線程池。
二、線程池概述
站在專業(yè)的角度講,線程池其實是一種利用池化思想來實現(xiàn)線程管理的技術(shù),它將線程的創(chuàng)建和任務(wù)的執(zhí)行進行解耦,同時復(fù)用已經(jīng)創(chuàng)建的線程來降低頻繁創(chuàng)建和銷毀線程所帶來的資源消耗。通過合理的參數(shù)設(shè)置,可以實現(xiàn)更低的系統(tǒng)資源使用率、更高的任務(wù)并發(fā)執(zhí)行效率。
在 Java 中,線程池最頂級的接口是Executor,名下的實現(xiàn)類關(guān)系圖如下:
圖片
關(guān)鍵接口和實現(xiàn)類,相關(guān)的描述如下:
1.Executor是最頂級的接口,它的作用是將任務(wù)的執(zhí)行和線程的創(chuàng)建進行抽象解藕
2.ExecutorService接口繼承了Executor接口,在Executor的基礎(chǔ)上,增加了一些關(guān)于管理線程池的一些方法,比如查看任務(wù)的狀態(tài)、獲取線程池的狀態(tài)、終止線程池等標準方法
3.ThreadPoolExecutor是一個線程池的核心實現(xiàn)類,完整的封裝了線程池相關(guān)的操作方法,通過它可以創(chuàng)建線程池
4.ScheduledThreadPoolExecutor是一個使用線程池的定時調(diào)度實現(xiàn)類,完整的封裝了定時調(diào)度相關(guān)的操作方法,通過它可以創(chuàng)建周期性線程池
整個關(guān)系圖中,其中ThreadPoolExecutor是線程池最核心的實現(xiàn)類,開發(fā)者可以使用它來創(chuàng)建線程池。
2.1、ThreadPoolExecutor 構(gòu)造方法
ThreadPoolExecutor類的完整構(gòu)造方法一共有七個參數(shù),理解這些參數(shù)的配置對使用好線程池至關(guān)重要,完整的構(gòu)造方法核心源碼如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
各個參數(shù)的解讀如下:
- corePoolSize:核心線程數(shù)量,用于執(zhí)行任務(wù)的核心線程數(shù)。
- maximumPoolSize:最大線程數(shù)量,線程池中允許創(chuàng)建線程的最大數(shù)量
- keepAliveTime:空閑線程存活的時間。只有當線程池中的線程數(shù)大于 corePoolSize 時,這個參數(shù)才會起作用
- unit:空閑線程存活的時間單位
- workQueue:任務(wù)隊列,用于存儲還沒來得及執(zhí)行的任務(wù)
- threadFactory:線程工廠。用于執(zhí)行任務(wù)時創(chuàng)建新線程的工廠
- handler:拒絕策略,當線程池和和隊列容量處于飽滿,使用某種策略來拒絕任務(wù)提交
2.2、ThreadPoolExecutor 執(zhí)行流程
創(chuàng)建完線程池之后就可以提交任務(wù)了,當有新的任務(wù)進來時,線程池就會工作并分配線程去執(zhí)行任務(wù)。
ThreadPoolExecutor的典型用法如下:
// 創(chuàng)建固定大小的線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
// 提交任務(wù)
executor.execute(task1);
executor.execute(task2);
executor.execute(task3);
...
針對任務(wù)的提交方式,ThreadPoolExecutor還提供了兩種方法。
- execute()方法:一種無返回值的方法,也是最核心的任務(wù)提交方法
- submit()方法:支持有返回值,通過FutureTask對象來獲取任務(wù)執(zhí)行完后的返回值,底層依然調(diào)用的是execute()方法
ThreadPoolExecutor執(zhí)行提交的任務(wù)流程雖然比較復(fù)雜,但是通過對源碼的分析,大致的任務(wù)執(zhí)行流程,可以用如下圖來概括。
圖片
整個執(zhí)行流程,大體步驟如下:
1.初始化完線程池之后,默認情況下,線程數(shù)為0,當有任務(wù)到來后才會創(chuàng)建新線程去執(zhí)行任務(wù)
2.每次收到提交的任務(wù)之后,會先檢查核心線程數(shù)是否已滿,如果沒有,就會繼續(xù)創(chuàng)建新線程來執(zhí)行任務(wù),直到核心線程數(shù)達到設(shè)定值
3.當核心線程數(shù)已滿,會檢查任務(wù)隊列是否已滿,如果沒有,就會將任務(wù)存儲到阻塞任務(wù)隊列中
4.當任務(wù)隊列已滿,會再次檢查線程池中的線程數(shù)是否達到最大值,如果沒有,就會創(chuàng)建新的線程來執(zhí)行任務(wù)
5.如果任務(wù)隊列已滿、線程數(shù)已達到最大值,此時線程池已經(jīng)無法再接受新的任務(wù),當收到任務(wù)之后,會執(zhí)行拒絕策略
我們再回頭來看上文提到的ThreadPoolExecutor構(gòu)造方法中的七個參數(shù),這些參數(shù)會直接影響線程的執(zhí)行情況,各個參數(shù)的變化情況,可以用如下幾點來概括:
1.當線程池中的線程數(shù)小于 corePoolSize 時,新任務(wù)都不排隊而是直接創(chuàng)新新線程來執(zhí)行
2.當線程池中的線程數(shù)大于等于 corePoolSize,workQueue 未滿時,將新任務(wù)添加到 workQueue 中而不是創(chuàng)建新線程來執(zhí)行
3.當線程池中的線程數(shù)大于等于 corePoolSize,workQueue 已滿,但是線程數(shù)小于 maximumPoolSize 時,此時會創(chuàng)建新的線程來處理被添加的任務(wù)
4.當線程池中的線程數(shù)大于等于 maximumPoolSize,并且 workQueue 已滿,新任務(wù)會被拒絕,使用 handler 執(zhí)行被拒絕的任務(wù)
ThreadPoolExecutor執(zhí)行任務(wù)的部分核心源碼如下!
2.2.1、execute 提交任務(wù)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 工作線程數(shù)量 < corePoolSize,直接創(chuàng)建線程執(zhí)行任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 工作線程數(shù)量 >= corePoolSize,將任務(wù)添加至阻塞隊列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 往阻塞隊列中添加任務(wù)的時候,如果線程池非運行狀態(tài),將任務(wù)remove,并執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 阻塞隊列已滿,嘗試添加新的線程去執(zhí)行,如果工作線程數(shù)量 >= maximumPoolSize,執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
2.2.2、addWorker 創(chuàng)建線程加入線程池
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 線程池狀態(tài)處于非 RUNNING 狀態(tài),添加worker失敗
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 判斷線程池中線程數(shù)量大于等于該線程池允許的最大線程數(shù)量,如果大于則worker失敗,反之cas更新線程池中的線程數(shù)
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建工作線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果線程池處于 RUNNING 狀態(tài)并且線程已經(jīng)啟動,則拋出線程異常啟動
if (t.isAlive())
throw new IllegalThreadStateException();
// 將線程加入已創(chuàng)建的工作線程集合,更新用于追蹤線程池中線程數(shù)量 largestPoolSize 字段
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動線程執(zhí)行任務(wù)
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2.2.3、runWorker 執(zhí)行任務(wù)
final void runWorker(Worker w) {
// 獲取執(zhí)行任務(wù)線程
Thread wt = Thread.currentThread();
// 獲取執(zhí)行任務(wù)
Runnable task = w.firstTask;
// 將worker中的任務(wù)置空
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 從當前工作線程種獲取任務(wù),或者循環(huán)從阻塞任務(wù)隊列中獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// 雙重檢查線程池是否正在停止,如果線程池停止,并且當前線程能夠中斷,則中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 前置執(zhí)行任務(wù)鉤子函數(shù)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行當前任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 后置執(zhí)行任務(wù)鉤子函數(shù)
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 回收線程
processWorkerExit(w, completedAbruptly);
}
}
2.2.4、reject 執(zhí)行拒絕策略
final void reject(Runnable command) {
// 執(zhí)行拒絕策略
handler.rejectedExecution(command, this);
}
當線程池中的線程數(shù)大于等于 maximumPoolSize,并且 workQueue 已滿,新任務(wù)會被拒絕,使用RejectedExecutionHandler接口的rejectedExecution()方法來處理被拒絕的任務(wù)。
線程池提供了四種拒絕策略實現(xiàn)類來拒絕任務(wù),具體如下:
類 | 描述 |
AbortPolicy | 直接拋出一個RejectedExecutionException,這也是JDK默認的拒絕策略 |
DiscardPolicy | 什么也不做,直接丟棄任務(wù) |
DiscardOldestPolicy | 將阻塞隊列中的任務(wù)移除出來,然后執(zhí)行當前任務(wù) |
CallerRunsPolicy | 嘗試直接運行被拒絕的任務(wù),如果線程池已經(jīng)被關(guān)閉了,任務(wù)就被丟棄了 |
2.3、ThreadPoolExecutor 線程池狀態(tài)
我們知道 Java 種的線程一共 6 種狀態(tài),其實線程池也有狀態(tài)。
因為線程池也是異步執(zhí)行的,有的任務(wù)正在執(zhí)行,有的任務(wù)存儲在任務(wù)隊列中,有的線程處于工作狀態(tài),有的線程處于空閑狀態(tài)等待回收,為了更加精細化的管理線程池,線程池也設(shè)計了 5 中狀態(tài),部分核心源碼如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
// 線程池線程數(shù)的bit數(shù)
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程池狀態(tài)
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
}
其中的狀態(tài)流程,可以用如下圖來描述!
圖片
這幾個狀態(tài)的轉(zhuǎn)化關(guān)系,可以用如下幾個步驟來概括:
- 1.線程池創(chuàng)建完之后,默認就進入了可執(zhí)行狀態(tài)RUNNING,此時線程數(shù)為 0,當有任務(wù)進來時,再創(chuàng)建新線程來執(zhí)行,可以看成是一個慢啟動的過程
- 2.當線程池處于運行狀態(tài)時,可以通過shutdown()或者shutdownNow()方法來改變運行狀態(tài)。shutdown()是一個平穩(wěn)的關(guān)閉操作,線程池停止接受新的任務(wù),同時等待已經(jīng)提交的任務(wù)執(zhí)行完畢,包括那些進入隊列還沒有開始的任務(wù),這時候線程池處于 SHUTDOWN 狀態(tài);shutdownNow()是一個立即關(guān)閉的操作,線程池立刻停止接受新的任務(wù),同時線程池取消所有執(zhí)行的任務(wù)和已經(jīng)進入隊列但是還沒有執(zhí)行的任務(wù),這時候線程池處于 STOP 狀態(tài)
- 3.當任務(wù)隊列和線程池均為空的時候,SHUTDOWN 或者 STOP 狀態(tài),就會進入 TIDYING 狀態(tài),等待被終止
- 4.當terminated()方法被調(diào)用完成之后,線程池會從 TIDYING 狀態(tài)進入 TERMINATED 狀態(tài),此時線程池就結(jié)束了
三、線程池應(yīng)用
正如文章的開頭所介紹的,使用線程池的方式,通??梢杂萌缦聨讉€步驟來概括:
// 1.創(chuàng)建固定大小為4的線程數(shù)、空閑線程的存活時間為15秒、阻塞任務(wù)隊列的上限為1000的線程池完整示例
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 15, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
// 2.提交任務(wù)
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
...
// 3.使用完畢之后,可以手動關(guān)閉線程池
executor.shutdown();
正如上文所說,其中execute()和submit()方法都可以用來提交任務(wù),稍有不同的是:submit()方法同時還支持獲取任務(wù)執(zhí)行完畢的返回結(jié)果。
針對線程池的使用,Java 還提供了Executors工具類,開發(fā)者可以通過此工具,快速創(chuàng)建不同類型的線程池。
下面我們一起來看下Executors為用戶提供的幾種創(chuàng)建線程池的方法。
3.1、newSingleThreadExecutor
newSingleThreadExecutor()方法表示創(chuàng)建一個單線程的線程池,核心源碼如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
從構(gòu)造參數(shù)上可以很清晰的看到,線程池中的線程數(shù)為 1,不會被線程池自動回收,workQueue 選擇的是無界的LinkedBlockingQueue阻塞隊列,不管來多少任務(wù)存入阻塞隊列中,前面一個任務(wù)執(zhí)行完畢,再執(zhí)行隊列中的剩余任務(wù)。
簡單應(yīng)用示例如下:
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final Random random = new Random();
List<Integer> list = new CopyOnWriteArrayList<>();
// 創(chuàng)建一個單線程線程池
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
list.add(random.nextInt(100));
System.out.println("thread name:" + Thread.currentThread().getName());
}
});
}
while (true){
if(list.size() >= 10){
break;
}
}
System.out.println("執(zhí)行耗時:" + (System.currentTimeMillis() - startTime) + "ms");
// 關(guān)閉線程池
executor.shutdown();
}
運行結(jié)果如下:
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
執(zhí)行耗時:13ms
3.2、newFixedThreadPool
newFixedThreadPool()方法表示創(chuàng)建一個固定大小線程數(shù)的線程池,核心源碼如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
固定大小的線程池和單線程的線程池有異曲同工之處,無非是讓線程池中能運行的線程數(shù)量支持手動指定。
簡單應(yīng)用示例如下:
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final Random random = new Random();
List<Integer> list = new CopyOnWriteArrayList<>();
// 創(chuàng)建固定大小線程數(shù)為3的線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
list.add(random.nextInt(100));
System.out.println("thread name:" + Thread.currentThread().getName());
}
});
}
while (true){
if(list.size() >= 10){
break;
}
}
System.out.println("執(zhí)行耗時:" + (System.currentTimeMillis() - startTime) + "ms");
// 關(guān)閉線程池
executor.shutdown();
}
運行結(jié)果如下:
thread name:pool-1-thread-2
thread name:pool-1-thread-1
thread name:pool-1-thread-3
thread name:pool-1-thread-3
thread name:pool-1-thread-3
thread name:pool-1-thread-1
thread name:pool-1-thread-3
thread name:pool-1-thread-2
thread name:pool-1-thread-2
thread name:pool-1-thread-1
執(zhí)行耗時:10ms
3.3、newCachedThreadPool
newCachedThreadPool()方法表示創(chuàng)建一個可緩存的無界線程池,核心源碼如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
從構(gòu)造參數(shù)上可以看出,線程池中的最大線程數(shù)為Integer.MAX_VALUE,也就是Integer的最大值,workQueue 選擇的是SynchronousQueue阻塞隊列,這個阻塞隊列不像LinkedBlockingQueue,它沒有容量,只負責做臨時任務(wù)緩存,如果有任務(wù)進來立刻會被執(zhí)行。
也就是說,只要添加進去了任務(wù),線程就會立刻去執(zhí)行,當任務(wù)超過線程池的線程數(shù)則創(chuàng)建新的線程去執(zhí)行,線程數(shù)量的最大上線為Integer.MAX_VALUE,當線程池中的線程空閑時間超過 60s,則會自動回收該線程。
簡單應(yīng)用示例如下:
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final Random random = new Random();
List<Integer> list = new CopyOnWriteArrayList<>();
// 創(chuàng)建可緩存的無界線程池
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
list.add(random.nextInt(100));
System.out.println("thread name:" + Thread.currentThread().getName());
}
});
}
while (true){
if(list.size() >= 10){
break;
}
}
System.out.println("執(zhí)行耗時:" + (System.currentTimeMillis() - startTime) + "ms");
// 關(guān)閉線程池
executor.shutdown();
}
運行結(jié)果如下:
thread name:pool-1-thread-1
thread name:pool-1-thread-2
thread name:pool-1-thread-3
thread name:pool-1-thread-4
thread name:pool-1-thread-3
thread name:pool-1-thread-2
thread name:pool-1-thread-1
thread name:pool-1-thread-4
thread name:pool-1-thread-4
thread name:pool-1-thread-4
執(zhí)行耗時:13ms
3.4、newScheduledThreadPool
newScheduledThreadPool()方法表示創(chuàng)建周期性的線程池,可以指定線程池中的核心線程數(shù),支持定時及周期性任務(wù)的執(zhí)行,核心源碼如下:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
從構(gòu)造參數(shù)上可以看出,線程池支持指定核心線程數(shù),最大線程數(shù)為Integer.MAX_VALUE,workQueue 選擇的是DelayedWorkQueue延遲阻塞隊列,這個阻塞隊列支持任務(wù)延遲消費,新加入的任務(wù)不會立刻被執(zhí)行,只有時間到期之后才會被取出;當非核心線程處于空閑狀態(tài)時,會立刻進行收回。
ScheduledExecutorService支持三種類型的定時調(diào)度方法,分別如下:
- schedule:支持指定多久執(zhí)行一次任務(wù)
- scheduleAtFixedRate:支持周期性間隔多久的執(zhí)行任務(wù)
- scheduleWithFixedDelay:同樣也是指周期性的執(zhí)行任務(wù),不過它指的是上一個任務(wù)執(zhí)行完之后,延遲多久執(zhí)行下一個任務(wù)
下面我們一起來看看它們的應(yīng)用方式。
3.4.1、schedule 方法使用示例
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
// 創(chuàng)建線程數(shù)量為2的定時調(diào)度線程池
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) + " 準備啟動");
// 定時執(zhí)行一次的任務(wù),延遲1s后執(zhí)行
executor.schedule(new Runnable() {
@Override
public void run() {
System.out.println(sdf.format(new Date()) + " thread name:" + Thread.currentThread().getName() + ", schedule");
}
}, 1, TimeUnit.SECONDS);
輸出結(jié)果:
2023-11-17 01:41:12 準備啟動
2023-11-17 01:41:13 thread name:pool-1-thread-1, schedule
3.4.2、scheduleAtFixedRate 方法使用示例
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
// 創(chuàng)建線程數(shù)量為2的定時調(diào)度線程池
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) + " 準備啟動");
// 周期性地執(zhí)行任務(wù),第一個任務(wù)延遲1s后執(zhí)行,之后每隔2s周期性執(zhí)行任務(wù),需要等待上一次的任務(wù)執(zhí)行完畢才執(zhí)行下一個
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(sdf.format(new Date()) + " thread name:" + Thread.currentThread().getName() + " begin");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sdf.format(new Date()) + " thread name:" + Thread.currentThread().getName() + " end");
}
}, 1, 2, TimeUnit.SECONDS);
輸出結(jié)果:
2023-11-17 02:00:44 準備啟動
2023-11-17 02:00:45 thread name:pool-1-thread-1 begin
2023-11-17 02:00:48 thread name:pool-1-thread-1 end
2023-11-17 02:00:48 thread name:pool-1-thread-1 begin
2023-11-17 02:00:51 thread name:pool-1-thread-1 end
2023-11-17 02:00:51 thread name:pool-1-thread-1 begin
2023-11-17 02:00:54 thread name:pool-1-thread-1 end
3.4.3、scheduleWithFixedDelay 方法使用示例
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
// 創(chuàng)建線程數(shù)量為2的定時調(diào)度線程池
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) + " 準備啟動");
// 周期性地執(zhí)行任務(wù),第一個任務(wù)延遲1s后執(zhí)行,之后上一個任務(wù)執(zhí)行完畢之后,延遲2秒再執(zhí)行下一個任務(wù)
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println(sdf.format(new Date()) + " thread name:" + Thread.currentThread().getName() + " begin");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sdf.format(new Date()) + " thread name:" + Thread.currentThread().getName() + " end");
}
}, 1, 2, TimeUnit.SECONDS);
輸出結(jié)果:
2023-11-17 01:53:26 準備啟動
2023-11-17 01:53:27 thread name:pool-1-thread-1 begin
2023-11-17 01:53:30 thread name:pool-1-thread-1 end
2023-11-17 01:53:32 thread name:pool-1-thread-1 begin
2023-11-17 01:53:35 thread name:pool-1-thread-1 end
2023-11-17 01:53:37 thread name:pool-1-thread-1 begin
2023-11-17 01:53:40 thread name:pool-1-thread-1 end
3.5、工廠方法小結(jié)
從以上的介紹中,我們可以對這四種線程池的參數(shù)做一個匯總,內(nèi)容如下表:
工廠方法 | corePoolSize | maximumPoolSize | keepAliveTime | workQueue |
newSingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue |
newFixedThreadPool | nThreads | nThreads | 0 | LinkedBlockingQueue |
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60s | SynchronousQueue |
newScheduledThreadPool | corePoolSize | Integer.MAX_VALUE | 0 | DelayedWorkQueue |
這四個線程池,主要的區(qū)別在于:corePoolSize、maximumPoolSize、keepAliveTime、workQueue 這四個參數(shù),其中線程工廠為默認類DefaultThreadFactory,線程飽和的拒絕策略為默認類AbortPolicy。
04、小結(jié)
結(jié)合以上的分析,最后我們再來總結(jié)一下。
對于線程池的使用,不太建議采用Executors工具去創(chuàng)建,盡量通過ThreadPoolExecutor的構(gòu)造方法來創(chuàng)建,原因在于:有利于規(guī)避資源耗盡的風險;同時建議開發(fā)者手動設(shè)定任務(wù)隊列的上限,防止服務(wù)出現(xiàn) OOM。
雖然Executors工具提供了四種創(chuàng)建線程池的方法,能幫助開發(fā)者省去繁瑣的參數(shù)配置,但是newSingleThreadExecutor和newFixedThreadPool方法創(chuàng)建的線程池,任務(wù)隊列上限為Integer.MAX_VALUE,這意味著可以無限提交任務(wù),這在高并發(fā)的環(huán)境下,系統(tǒng)可能會出現(xiàn) OOM,導(dǎo)致整個線程池不可用;其次newCachedThreadPool方法也存在同樣的問題,無限的創(chuàng)建線程可能會給系統(tǒng)帶來更多的資源消耗。
其次,創(chuàng)建線程池的時候應(yīng)該盡量給線程定義一個具體的業(yè)務(wù)名字前綴,方便定位問題,不同類型的業(yè)務(wù)盡量使用不同的線程池來實現(xiàn)。
例如可以使用guava包,創(chuàng)建自定義的線程工廠。
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.setDaemon(true).build();
當然,你也可以自行實現(xiàn)一個線程工廠,需要繼承ThreadFactory接口,案例如下:
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 線程工廠,它設(shè)置線程名稱,有利于我們定位問題。
*/
public final class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final ThreadFactory delegate;
private final String name;
/**
* 創(chuàng)建一個帶名字的線程池生產(chǎn)工廠
*/
public NamingThreadFactory(ThreadFactory delegate, String name) {
this.delegate = delegate;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(name + "-" + threadNum.incrementAndGet());
return t;
}
}
創(chuàng)建一個線程名稱以order開頭的線程工廠。
NamingThreadFactory threadFactory = new NamingThreadFactory(Executors.defaultThreadFactory(), "order");
最后,再來說說關(guān)于線程池中線程數(shù),如何合理設(shè)定的問題?
- 對于需要消耗 CPU 資源的密集型任務(wù),可以將線程數(shù)設(shè)置為 N(CPU 核心數(shù))+1,比 CPU 核心數(shù)多出來的一個線程是為了防止線程偶發(fā)的缺頁中斷,或者其它原因?qū)е碌娜蝿?wù)暫停而帶來的影響
- 對于需要消耗 I/O 資源的密集型任務(wù),可以將線程數(shù)設(shè)置為 2N,原因在于:線程在處理 I/O 的時間段內(nèi)不會占用 CPU 資源,這時就可以將 CPU 交出給其它線程使用,因此可以多配置一些線程數(shù)
那如何判斷當前是 CPU 密集型任務(wù)還是 I/O 密集型任務(wù)呢?
最簡單的方法就是:如果當前任務(wù)涉及到網(wǎng)絡(luò)讀取,文件讀取等,這類都是 IO 密集型任務(wù),除此之外,可以看成是 CPU 密集型任務(wù)。
本文篇幅比較長,難免有描述不對的地方,歡迎大家留言指出!