京東一面:Java 提供哪幾種線程池,什么場(chǎng)景用
前言
大家好,我是田螺。
我們來(lái)看一道京東一面面試題:Java 提供哪幾種線程池,什么場(chǎng)景使用?
- newFixedThreadPool
- newCachedThreadPool
- newSingleThreadExecutor
- newScheduledThreadPool
1. newFixedThreadPool
newFixedThreadPool的構(gòu)造函數(shù):
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
1.1 線程池特點(diǎn):
- 核心線程數(shù)和最大線程數(shù)大小一樣
- 沒(méi)有所謂的非空閑時(shí)間,即keepAliveTime為0
- 阻塞隊(duì)列為無(wú)界隊(duì)列LinkedBlockingQueue
1.2 newFixedThreadPool工作機(jī)制
圖片
- 提交任務(wù)
- 如果線程數(shù)少于核心線程,創(chuàng)建核心線程執(zhí)行任務(wù)
- 如果線程數(shù)已經(jīng)等于核心線程,把任務(wù)添加到LinkedBlockingQueue阻塞隊(duì)列
- 如果線程執(zhí)行完任務(wù),去阻塞隊(duì)列取任務(wù),繼續(xù)執(zhí)行。
- 如果持續(xù)無(wú)限添加任務(wù),可能會(huì)導(dǎo)致OOM,因?yàn)樗菬o(wú)界隊(duì)列。
1.3 無(wú)界隊(duì)列OOM的實(shí)例代碼
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
});
}
為了驗(yàn)證OOM,IDE指定JVM參數(shù):-Xmx8m -Xms8m
運(yùn)行結(jié)果:
圖片
newFixedThreadPool使用了無(wú)界的阻塞隊(duì)列LinkedBlockingQueue,如果線程獲取一個(gè)任務(wù)后,任務(wù)的執(zhí)行時(shí)間比較長(zhǎng)(比如,上面demo設(shè)置了10秒),會(huì)導(dǎo)致隊(duì)列的任務(wù)越積越多,導(dǎo)致機(jī)器內(nèi)存使用不停飆升, 最終導(dǎo)致OOM:
圖片
大家有興趣可以看看源碼哈:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
...
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
...
}
1.4 使用場(chǎng)景
- 定時(shí)任務(wù)調(diào)度
對(duì)于需要定時(shí)執(zhí)行的任務(wù),如每天的報(bào)表生成、數(shù)據(jù)備份或清理任務(wù),F(xiàn)ixedThreadPool 可以保持固定數(shù)量的線程來(lái)按時(shí)執(zhí)行這些任務(wù),確保系統(tǒng)在高峰期也能穩(wěn)定運(yùn)行。
- 一些后臺(tái)服務(wù)中,比如郵件發(fā)送、短信通知等
在一些后臺(tái)服務(wù)中,比如郵件發(fā)送、短信通知等,使用 FixedThreadPool 可以確保有足夠的線程來(lái)處理發(fā)送請(qǐng)求,而不會(huì)因?yàn)橥话l(fā)的高并發(fā)請(qǐng)求導(dǎo)致系統(tǒng)崩潰。例如,在一個(gè)活動(dòng)結(jié)束后,用戶會(huì)收到活動(dòng)總結(jié)郵件,固定線程池可以有效管理郵件發(fā)送任務(wù),確保每封郵件都能及時(shí)發(fā)送。
- 適用于處理CPU密集型的任務(wù)
CPU密集型任務(wù)是指那些主要依賴于CPU計(jì)算能力的任務(wù)。這類任務(wù)通常需要大量的計(jì)算資源,且其執(zhí)行時(shí)間與CPU的處理能力密切相關(guān)。與之相對(duì)的是I/O密集型任務(wù),后者主要受限于輸入/輸出操作(如磁盤讀寫、網(wǎng)絡(luò)請(qǐng)求等)。
2. newCachedThreadPool
newCachedThreadPool 的構(gòu)造函數(shù)。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
2.1 線程池特點(diǎn)
- 核心線程數(shù)為0
- 最大線程數(shù)為Integer.MAX_VALUE
- 阻塞隊(duì)列是SynchronousQueue
- 非核心線程空閑存活時(shí)間為60秒
當(dāng)提交任務(wù)的速度大于處理任務(wù)的速度時(shí),每次提交一個(gè)任務(wù),就必然會(huì)創(chuàng)建一個(gè)線程。極端情況下會(huì)創(chuàng)建過(guò)多的線程,耗盡 CPU 和內(nèi)存資源。由于空閑 60 秒的線程會(huì)被終止,長(zhǎng)時(shí)間保持空閑的 CachedThreadPool 不會(huì)占用任何資源。
2.2 newCachedThreadPool工作機(jī)制
圖片
- 提交任務(wù)
- 因?yàn)闆](méi)有核心線程,所以任務(wù)直接加到SynchronousQueue隊(duì)列。
- 判斷是否有空閑線程,如果有,就去取出任務(wù)執(zhí)行。
- 如果沒(méi)有空閑線程,就新建一個(gè)線程執(zhí)行。
- 執(zhí)行完任務(wù)的線程,還可以存活60秒,如果在這期間,接到任務(wù),可以繼續(xù)活下去;否則,被銷毀。
2.3 無(wú)界隊(duì)列OOM的實(shí)例代碼
newCachedThreadPool 使用不當(dāng),也是會(huì)導(dǎo)致OOM的,比如以下這個(gè)demo:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolOOMExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)無(wú)界線程池
ExecutorService executorService = Executors.newCachedThreadPool();
try {
// 不斷提交任務(wù),模擬內(nèi)存消耗
while (true) {
executorService.submit(() -> {
// 模擬一個(gè)長(zhǎng)時(shí)間運(yùn)行的任務(wù)
try {
// 創(chuàng)建一個(gè)大的對(duì)象來(lái)消耗內(nèi)存
int[] largeArray = new int[1000000]; // 1,000,000 integers
// 模擬一些計(jì)算
for (int i = 0; i < largeArray.length; i++) {
largeArray[i] = i;
}
// 讓線程稍微休眠一下
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
2.4 使用場(chǎng)景
使用用于用于并發(fā)執(zhí)行大量短期的小任務(wù)。比如一些網(wǎng)絡(luò)爬蟲(chóng)、Web服務(wù)器處理請(qǐng)求。
3. newSingleThreadExecutor
newSingleThreadExecutor的構(gòu)造函數(shù):
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
3.1 線程池特點(diǎn)
- 核心線程數(shù)為1
- 最大線程數(shù)也為1
- 阻塞隊(duì)列是LinkedBlockingQueue
- keepAliveTime為0
3.2 newSingleThreadExecutor的工作機(jī)制
圖片
- 提交任務(wù)
- 線程池是否有一條線程在,如果沒(méi)有,新建線程執(zhí)行任務(wù)
- 如果有,講任務(wù)加到阻塞隊(duì)列
- 當(dāng)前的唯一線程,從隊(duì)列取任務(wù),執(zhí)行完一個(gè),再繼續(xù)取,一個(gè)人(一條線程)夜以繼日地干活。
3.3 newSingleThreadExecutor的實(shí)例代碼
newSingleThreadExecutor 使用的也是無(wú)界隊(duì)列。如果任務(wù)提交速率過(guò)高,可能會(huì)導(dǎo)致系統(tǒng)資源耗盡(如內(nèi)存溢出)。我們來(lái)看一個(gè)簡(jiǎn)單使用demo:
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName()+"正在執(zhí)行");
});
}
運(yùn)行結(jié)果:
圖片
3.4 使用場(chǎng)景
適用于串行執(zhí)行任務(wù)的場(chǎng)景,一個(gè)任務(wù)一個(gè)任務(wù)地執(zhí)行。比如任務(wù)調(diào)度
在某些業(yè)務(wù)場(chǎng)景中,任務(wù)之間存在依賴關(guān)系,即一個(gè)任務(wù)的輸出是另一個(gè)任務(wù)的輸入。在這種情況下,使用單線程執(zhí)行器可以確保任務(wù)按預(yù)期的順序執(zhí)行。
4. newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
4.1 線程池特點(diǎn)
- 最大線程數(shù)為Integer.MAX_VALUE
- 阻塞隊(duì)列是DelayedWorkQueue
- keepAliveTime為0
- scheduleAtFixedRate() :按某種速率周期執(zhí)行
- scheduleWithFixedDelay():在某個(gè)延遲后執(zhí)行
4.2 工作機(jī)制
- 添加一個(gè)任務(wù)
- 線程池中的線程從 DelayQueue 中取任務(wù)
- 線程從 DelayQueue 中獲取 time 大于等于當(dāng)前時(shí)間的task
- 執(zhí)行完后修改這個(gè) task 的 time 為下次被執(zhí)行的時(shí)間
- 這個(gè) task 放回DelayQueue隊(duì)列中
4.3 實(shí)例代碼
/**
創(chuàng)建一個(gè)給定初始延遲的間隔性的任務(wù),之后的下次執(zhí)行時(shí)間是上一次任務(wù)從執(zhí)行到結(jié)束所需要的時(shí)間+* 給定的間隔時(shí)間
*/
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleWithFixedDelay(()->{
System.out.println("current Time" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+"正在執(zhí)行");
}, 1, 3, TimeUnit.SECONDS);
運(yùn)行結(jié)果:圖片
/**
創(chuàng)建一個(gè)給定初始延遲的間隔性的任務(wù),之后的每次任務(wù)執(zhí)行時(shí)間為 初始延遲 + N * delay(間隔)
*/
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(()->{
System.out.println("current Time" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+"正在執(zhí)行");
}, 1, 3, TimeUnit.SECONDS);;
周期性執(zhí)行任務(wù)的場(chǎng)景,需要限制線程數(shù)量的場(chǎng)景。比如定時(shí)清理任務(wù):
在某些應(yīng)用程序中,可能會(huì)產(chǎn)生臨時(shí)文件或日志記錄。為了保持系統(tǒng)的整潔和性能,需要定期清理這些臨時(shí)文件或日志??梢允褂?newScheduledThreadPool 來(lái)安排清理任務(wù),例如每小時(shí)或每天清理一次。