招行一面:Java 線程池的拒絕策略有哪些?如何選擇?
Java線程池中的拒絕策略是線程池框架提供的一種機制,用于處理當線程池中的任務隊列已滿且沒有空閑線程可用來執(zhí)行新任務時的情況。這篇文章,我們來一起了解這些拒絕策略的原理、源碼實現(xiàn)及其適用場景。
Java的線程池類ThreadPoolExecutor位于java.util.concurrent 包中,它是一個靈活且廣泛使用的線程池實現(xiàn)。線程池通過重用線程來減少線程創(chuàng)建和銷毀的開銷,提高應用程序的性能,線程池的基本組成如下:
- 核心線程數(shù) (corePoolSize): 核心線程數(shù)是線程池在空閑時仍保留的線程數(shù)。
- 最大線程數(shù) (maximumPoolSize): 線程池中允許的最大線程數(shù)。
- 任務隊列 (workQueue): 用于保存等待執(zhí)行任務的阻塞隊列。
- 線程工廠 (ThreadFactory): 用于創(chuàng)建新線程的工廠。
- 拒絕策略 (RejectedExecutionHandler): 當任務無法提交到線程池時,如何處理任務的策略。
拒絕策略的類型
ThreadPoolExecutor 提供了四種內(nèi)置的拒絕策略:
- AbortPolicy: 默認策略。直接拋出 RejectedExecutionException,阻止系統(tǒng)正常工作。
- CallerRunsPolicy: 提交任務的線程自己執(zhí)行該任務。
- DiscardPolicy: 直接丟棄任務,不予任何處理。
- DiscardOldestPolicy: 丟棄隊列中最舊的任務,然后嘗試重新提交當前任務。
AbortPolicy
AbortPolicy策略是直接拋出 RejectedExecutionException,不執(zhí)行任務。適合在需要明確知道任務被拒絕時使用。
下面是AbortPolicy的源碼實現(xiàn):
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
使用場景:
- 在高可靠性系統(tǒng)中,AbortPolicy 可用于快速發(fā)現(xiàn)問題并進行處理。
- 當任務提交失敗后需要立即采取補救措施時。
CallerRunsPolicy
CallerRunsPolicy策略由提交任務的線程(通常是主線程)來執(zhí)行該任務,通過降低任務提交速率來緩解壓力。
下面是CallerRunsPolicy的源碼實現(xiàn):
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
使用場景:
- 適用于不希望丟棄任務且能接受任務執(zhí)行延遲的場景。
- 可用于削峰填谷,防止任務過快提交。
DiscardPolicy
DiscardPolicy策略是指直接丟棄無法執(zhí)行的任務,不拋異常,也就是不對被丟棄的任務進行任何處理。
下面是DiscardPolicy的源碼實現(xiàn):
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// Do nothing
}
}
使用場景:
- 適用于不關心單個任務被丟棄的場景。
- 在負載極高且系統(tǒng)能容忍數(shù)據(jù)丟失的情況下。
DiscardOldestPolicy
DiscardOldestPolicy策略會丟棄隊列中最舊的任務,然后嘗試重新提交當前任務,這種策略通常用于保證新任務有機會被執(zhí)行。
下面是DiscardOldestPolicy的源碼實現(xiàn):
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll(); // discard oldest
e.execute(r); // retry
}
}
}
使用場景:
- 適用于需要保證最新任務的優(yōu)先級高于舊任務的場景。
- 在新任務更重要的實時系統(tǒng)中。
自定義拒絕策略
除了內(nèi)置策略,開發(fā)者可以實現(xiàn) RejectedExecutionHandler 接口來定義自己的拒絕策略,通過這種方式,開發(fā)者可以根據(jù)具體需求來處理被拒絕的任務。下面是實現(xiàn)自定義策略的步驟:
- 實現(xiàn)RejectedExecutionHandler接口。
- 覆蓋rejectedExecution方法,定義拒絕策略。
- 在ThreadPoolExecutor的構造函數(shù)中傳入自定義策略。
代碼示例如下:
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 自定義拒絕邏輯,例如日志記錄或重新嘗試
log.warn("This is custom rejected: " + r.toString());
// 可以選擇重新提交任務或其他處理
}
}
最后,我們再通過代碼來展示如何創(chuàng)建一個線程池以及如何使用拒絕策略:
import java.util.concurrent.*;
public class ThreadPoolExample {
public static void main(String[] args) {
// 定義線程池的參數(shù)
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
// 創(chuàng)建線程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
new ThreadPoolExecutor.AbortPolicy() // 默認策略
// new ThreadPoolExecutor.CallerRunsPolicy()
// new ThreadPoolExecutor.DiscardPolicy()
// new ThreadPoolExecutor.DiscardOldestPolicy()
// new CustomRejectedExecutionHandler()
);
// 關閉線程池
threadPool.shutdown();
}
}
使用場景分析
不同的拒絕策略適合不同場景,下面是選擇拒絕策略的一些參考因素:
- 實時性要求高: 如果系統(tǒng)不能接受任務被長時間阻塞或丟棄,可以選擇 CallerRunsPolicy 或自定義策略,以確保任務被及時處理。
- 任務重要性不同: 對于有些場景,新任務比舊任務更重要,可以選擇 DiscardOldestPolicy。
- 任務丟失可接受: 在任務丟失對系統(tǒng)影響較小的情況下,可以選擇 DiscardPolicy,以保證系統(tǒng)整體的吞吐量。
- 系統(tǒng)可靠性: 在系統(tǒng)需要對任務被拒絕進行明確處理時,AbortPolicy 可以幫助快速發(fā)現(xiàn)和響應。
總結
本文,我們通過源碼分析了Java 線程池提供的拒絕策略,整體來說拒絕策略是比較簡單的一個知識點,如果業(yè)務代碼中使用了線程池,拒絕策略是必須掌握的一個知識點,開發(fā)者可以根據(jù)具體的場景選擇合適的策略,甚至可以設計自定義策略來滿足特定需求,避免因過載導致的系統(tǒng)崩潰。