ScheduledThreadPool線程池設(shè)計/場景案例/性能調(diào)優(yōu)/場景適配(架構(gòu)篇)
ScheduledThreadPool是一個強大的工具,它擴展了線程池的功能,允許任務(wù)的定時執(zhí)行以及周期性重復(fù)執(zhí)行。這種線程池特別適用于需要在未來某個時間點或者按照固定頻率執(zhí)行任務(wù)的場景,如調(diào)度作業(yè)、定時報告生成、周期性數(shù)據(jù)刷新等。 ScheduledThreadPool通過提供一個可伸縮的線程池,使得開發(fā)者能夠輕松安排任務(wù)的延遲執(zhí)行,同時保持線程資源的高效利用。對于需要精確控制任務(wù)執(zhí)行時間的應(yīng)用程序, ScheduledThreadPool提供了一種簡潔而強大的解決方案,使得任務(wù)調(diào)度變得簡單而可靠。掌握 ScheduledThreadPool的使用方法和最佳實踐,對于開發(fā)高效、可靠的并發(fā)應(yīng)用程序至關(guān)重要。
1、ScheduledThreadPool制造背景
ScheduledThreadPoolExecutor 是 Java 并發(fā)包中一個非常實用的工具,它允許按照預(yù)定的計劃執(zhí)行命令或任務(wù)。以下是它的設(shè)計因素:
- 定時任務(wù)執(zhí)行:
在許多應(yīng)用場景中,如電商平臺的促銷活動、系統(tǒng)維護任務(wù)或定期的數(shù)據(jù)備份等,需要在特定時間執(zhí)行任務(wù)。 ScheduledThreadPoolExecutor 提供了靈活的API來支持這些需求。
- 多線程執(zhí)行任務(wù):
與 Java 中的 Timer 類相比, ScheduledThreadPoolExecutor 使用多線程執(zhí)行任務(wù),避免了任務(wù)執(zhí)行時間過長導(dǎo)致的任務(wù)相互阻塞的問題。
- 資源優(yōu)化:
ScheduledThreadPoolExecutor 能夠高效地管理和復(fù)用線程資源,避免了大量線程的創(chuàng)建和銷毀開銷,從而提升了系統(tǒng)性能。
- 靈活的任務(wù)調(diào)度:
它支持延遲執(zhí)行和固定頻率執(zhí)行,滿足了各種復(fù)雜場景下的需求,如每隔一段時間自動檢查未支付的訂單并自動取消。
- 周期性和延遲任務(wù):
ScheduledThreadPoolExecutor 內(nèi)部構(gòu)造了兩個內(nèi)部類 ScheduledFutureTask 和 DelayedWorkQueue,分別用于執(zhí)行周期任務(wù)和存儲周期或延遲任務(wù)。
- 線程池功能:
繼承自 ThreadPoolExecutor, ScheduledThreadPoolExecutor 重用了線程池的功能,為任務(wù)提供延遲或周期執(zhí)行。
- 異常處理:
如果任務(wù)執(zhí)行過程中線程失活, ScheduledThreadPoolExecutor 會新建線程執(zhí)行任務(wù),確保任務(wù)的連續(xù)性。
- 運行參數(shù)控制:
支持可選的 run-after-shutdown 參數(shù),在池被關(guān)閉后支持可選的邏輯來決定是否繼續(xù)運行周期或延遲任務(wù)。
2、ScheduledThreadPool設(shè)計結(jié)構(gòu)
用于延遲執(zhí)行或定期執(zhí)行任務(wù)的線程池。
圖片
- ScheduledThreadPoolExecutor:這是調(diào)度線程池,負(fù)責(zé)管理線程和任務(wù)的執(zhí)行。
- 核心線程數(shù):線程池中固定的核心線程數(shù)量。
- 最大線程數(shù):線程池中允許的最大線程數(shù)量。
- 空閑線程存活時間:空閑線程在終止前等待新任務(wù)的最長時間。
- 任務(wù)隊列(DelayedWorkQueue) :用于存儲待執(zhí)行任務(wù)的延遲隊列。
- 線程工廠:用于創(chuàng)建新線程的工廠。
- 拒絕策略處理器:當(dāng)任務(wù)隊列滿且所有線程都忙碌時,用于處理新提交任務(wù)的策略。
- 任務(wù)提交:任務(wù)提交到線程池執(zhí)行。
- ScheduledFutureTask:表示可以延遲執(zhí)行的異步運算任務(wù)。
- 執(zhí)行任務(wù):線程從任務(wù)隊列中取出任務(wù)并執(zhí)行。
- 重新調(diào)度:對于周期性任務(wù),執(zhí)行完畢后重新調(diào)度下一次執(zhí)行。
- 線程空閑或銷毀:任務(wù)執(zhí)行完畢后,線程可能變?yōu)榭臻e狀態(tài),等待新任務(wù),或者在線程池關(guān)閉時被銷毀。
- 線程池終止:當(dāng)線程池關(guān)閉時,所有線程將停止執(zhí)行任務(wù),并等待已提交的任務(wù)完成。
3、ScheduledThreadPool運行流程
圖片
ScheduledThreadPool 的運行流程:
- 創(chuàng)建 ScheduledThreadPoolExecutor 實例:根據(jù)指定的核心線程數(shù)創(chuàng)建 ScheduledThreadPoolExecutor。
- 提交任務(wù):使用 schedule、 scheduleWithFixedDelay 或 scheduleAtFixedRate 方法提交任務(wù)。
- 任務(wù)封裝為 ScheduledFutureTask:提交的任務(wù)被封裝為 ScheduledFutureTask 對象。
- 任務(wù)存儲于 DelayedWorkQueue: ScheduledFutureTask 對象被存儲在 DelayedWorkQueue 隊列中,根據(jù)預(yù)定執(zhí)行時間排序。
- 到達(dá)預(yù)定時間:等待直到任務(wù)的預(yù)定執(zhí)行時間到達(dá)。
- 任務(wù)執(zhí)行:線程池中的線程執(zhí)行任務(wù)。
- 是否周期性任務(wù):檢查任務(wù)是否需要周期性執(zhí)行。
- 重新調(diào)度任務(wù):如果是周期性任務(wù),重新調(diào)度下一次執(zhí)行。
- 任務(wù)完成:非周期性任務(wù)執(zhí)行完畢后,任務(wù)完成。
- 關(guān)閉線程池:當(dāng)不再需要線程池時,調(diào)用 shutdown 方法關(guān)閉線程池。
- 等待任務(wù)完成:調(diào)用 awaitTermination 方法等待所有已提交的任務(wù)完成。
4、ScheduledThreadPool業(yè)務(wù)實戰(zhàn)
4.1. 定時任務(wù)執(zhí)行
ScheduledThreadPoolExecutor 最常見的應(yīng)用場景就是實現(xiàn)調(diào)度任務(wù)。例如,可以用于執(zhí)行定時的數(shù)據(jù)庫清理任務(wù),確保數(shù)據(jù)庫性能和數(shù)據(jù)準(zhǔn)確性。
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
// 數(shù)據(jù)庫清理邏輯
}, 0, 24, TimeUnit.HOURS); // 每天執(zhí)行一次
4.2. 周期性任務(wù)執(zhí)行
ScheduledThreadPoolExecutor 可以用于執(zhí)行周期性任務(wù),如定時發(fā)送郵件通知或定時檢查系統(tǒng)狀態(tài)。
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
service.scheduleAtFixedRate(() -> {
// 發(fā)送郵件通知邏輯
}, 0, 8, TimeUnit.HOURS); // 每8小時執(zhí)行一次
4.3. 延遲任務(wù)執(zhí)行
在需要延遲執(zhí)行任務(wù)的場景下, ScheduledThreadPoolExecutor 提供了延遲執(zhí)行的能力,例如,延遲發(fā)送用戶注冊后的歡迎郵件。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.schedule(() -> {
// 發(fā)送歡迎郵件邏輯
}, 10, TimeUnit.MINUTES); // 10分鐘后執(zhí)行
4.4. 固定頻率任務(wù)執(zhí)行
對于需要以固定頻率執(zhí)行的任務(wù),如每5分鐘檢查一次訂單狀態(tài), ScheduledThreadPoolExecutor 可以滿足這一需求。
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
scheduledExecutor.scheduleAtFixedRate(() -> {
// 檢查訂單狀態(tài)邏輯
}, 0, 5, TimeUnit.MINUTES); // 每5分鐘執(zhí)行一次
4.5. 綜合案例:每周四定時執(zhí)行任務(wù)
通過 ScheduledThreadPoolExecutor 實現(xiàn)每周四 18:00:00 定時執(zhí)行任務(wù),例如,定期生成周報。
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
LocalDateTime now = LocalDateTime.now();
LocalDateTime time = now.with(TemporalAdjusters.nextOrSame(DayOfWeek.THURSDAY))
.withHour(18).withMinute(0).withSecond(0);
long initialDelay = ChronoUnit.MILLIS.between(now, time);
long period = 7 * 24 * 60 * 60 * 1000; // 一周的毫秒數(shù)
pool.scheduleAtFixedRate(() -> {
// 執(zhí)行周報生成邏輯
}, initialDelay, period, TimeUnit.MILLISECONDS);
5、ScheduledThreadPool調(diào)優(yōu)策略
針對 ScheduledThreadPoolExecutor 的調(diào)優(yōu)策略,以下是一些關(guān)鍵點和最佳實踐:
- 合理配置核心線程數(shù):
核心線程數(shù)( corePoolSize)應(yīng)根據(jù)任務(wù)的性質(zhì)和系統(tǒng)的負(fù)載情況來設(shè)置。如果任務(wù)是計算密集型或IO密集型,可能需要不同的配置。通常,對于IO密集型任務(wù),核心線程數(shù)可以設(shè)置為CPU核心數(shù)的兩倍加一。
- 設(shè)置最大線程數(shù):
最大線程數(shù)( maximumPoolSize)應(yīng)該考慮到系統(tǒng)資源的限制,以避免創(chuàng)建過多的線程導(dǎo)致資源耗盡。
- 選擇合適的工作隊列:
ScheduledThreadPoolExecutor 使用 DelayedWorkQueue 作為其工作隊列,這是一個無界隊列,可以容納任意數(shù)量的任務(wù)。如果任務(wù)提交速度超過處理速度,應(yīng)考慮使用有界隊列以避免內(nèi)存溢出。
- 處理線程空閑超時:
keepAliveTime 參數(shù)定義了非核心線程空閑時在終止前的等待時間。合理設(shè)置這個值可以減少資源浪費。
- 優(yōu)雅關(guān)閉線程池:
使用 shutdown() 方法來優(yōu)雅地關(guān)閉線程池,確保所有已提交的任務(wù)都能執(zhí)行完畢。如果需要立即停止,可以使用 shutdownNow(),但這可能會導(dǎo)致正在執(zhí)行的任務(wù)被中斷。
- 監(jiān)控線程池狀態(tài):
監(jiān)控線程池的活動線程數(shù)、任務(wù)隊列長度等指標(biāo),可以幫助及時發(fā)現(xiàn)性能瓶頸和異常情況,并進行相應(yīng)的調(diào)優(yōu)。
- 自定義線程工廠:
通過自定義線程工廠( ThreadFactory),可以為線程設(shè)置有意義的名稱,這有助于在出現(xiàn)問題時快速定位問題線程。
- 合理配置拒絕策略:
當(dāng)任務(wù)隊列滿且達(dá)到最大線程數(shù)時, RejectedExecutionHandler 會介入??梢愿鶕?jù)業(yè)務(wù)需求選擇合適的拒絕策略,如 AbortPolicy、 CallerRunsPolicy 等。
- 周期性任務(wù)的精確度:
對于需要精確執(zhí)行周期性任務(wù)的場景,應(yīng)考慮任務(wù)執(zhí)行時間和系統(tǒng)負(fù)載對調(diào)度精度的影響。 scheduleAtFixedRate 和 scheduleWithFixedDelay 提供了不同的周期性執(zhí)行策略,應(yīng)根據(jù)具體需求選擇。
6、ScheduledThreadPool適應(yīng)場景
ScheduledThreadPoolExecutor 適用于以下場景:
- 定時任務(wù)調(diào)度:
需要在未來的某個時刻執(zhí)行一次性任務(wù),例如,定時清理日志文件、定時備份數(shù)據(jù)庫等。 ScheduledThreadPoolExecutor 提供了 schedule 方法來實現(xiàn)這種需求。
- 周期性任務(wù)執(zhí)行:
對于需要定期執(zhí)行的任務(wù),如每小時統(tǒng)計數(shù)據(jù)、每天發(fā)送報告等,可以使用 scheduleAtFixedRate 或 scheduleWithFixedDelay 方法來安排周期性任務(wù)。
- 后臺服務(wù)任務(wù):
對于需要在后臺定期執(zhí)行的服務(wù)任務(wù),如心跳檢測、狀態(tài)監(jiān)控等, ScheduledThreadPoolExecutor 可以保證這些任務(wù)按照預(yù)定的時間間隔執(zhí)行。
- 資源管理需求:
當(dāng)需要限制后臺線程數(shù)量以管理資源時, ScheduledThreadPoolExecutor 允許自定義核心線程數(shù),從而控制資源消耗。
- 任務(wù)執(zhí)行監(jiān)控:
ScheduledThreadPoolExecutor 支持對任務(wù)執(zhí)行情況進行監(jiān)控,例如,可以監(jiān)控任務(wù)的延遲執(zhí)行情況、執(zhí)行頻率等,這對于性能調(diào)優(yōu)和故障排查非常有用。
- 復(fù)雜的調(diào)度需求:
對于復(fù)雜的調(diào)度需求,如根據(jù)特定條件觸發(fā)任務(wù)執(zhí)行, ScheduledThreadPoolExecutor 提供了靈活的 API 來滿足這些需求。
- 優(yōu)化系統(tǒng)性能:
通過合理配置 ScheduledThreadPoolExecutor,可以減少系統(tǒng)資源的浪費,提高系統(tǒng)的性能和響應(yīng)速度。
- 保持任務(wù)順序執(zhí)行:
在需要保證任務(wù)順序執(zhí)行的場景下, ScheduledThreadPoolExecutor 可以確保任務(wù)按照特定的順序執(zhí)行。
- 處理長時間運行的任務(wù):
對于可能長時間運行的任務(wù), ScheduledThreadPoolExecutor 可以避免任務(wù)執(zhí)行時間過長而影響其他任務(wù)的執(zhí)行。
- 提高系統(tǒng)的穩(wěn)定性和可靠性:
通過使用 ScheduledThreadPoolExecutor,可以提高系統(tǒng)的穩(wěn)定性和可靠性,尤其是在需要處理大量并發(fā)任務(wù)時。