并發(fā)編程/6種線程池設(shè)計(jì)圖/1大線程池標(biāo)準(zhǔn)設(shè)計(jì)與執(zhí)行規(guī)范/2種線程池管理設(shè)計(jì)(全面篇)
在現(xiàn)代多核處理器時(shí)代,線程池成為了并發(fā)編程中不可或缺的工具,它不僅提高了程序性能,還簡(jiǎn)化了線程管理。線程池允許我們重用有限數(shù)量的線程來(lái)執(zhí)行大量任務(wù),從而減少了線程創(chuàng)建和銷(xiāo)毀的開(kāi)銷(xiāo)。Java中的 ExecutorService接口及其實(shí)現(xiàn)類(lèi),如 FixedThreadPool、 SingleThreadExecutor、 CachedThreadPool和 ScheduledThreadPool,提供了強(qiáng)大的線程池管理功能。這些線程池通過(guò)智能地調(diào)度任務(wù)和復(fù)用線程,幫助我們優(yōu)化資源利用,提高響應(yīng)速度,并處理復(fù)雜的并發(fā)場(chǎng)景。對(duì)于Java開(kāi)發(fā)者而言,理解線程池的工作原理和正確選擇適當(dāng)?shù)木€程池類(lèi)型對(duì)于構(gòu)建高效、可伸縮的并發(fā)應(yīng)用至關(guān)重要。
1、線程池工作流程
圖片
- ExecutorService:這是線程池的管理接口,負(fù)責(zé)提交任務(wù)和管理工作線程。
- 任務(wù)隊(duì)列(Task Queue) :這是一個(gè)先進(jìn)先出(FIFO)的隊(duì)列,用于存儲(chǔ)待執(zhí)行的任務(wù)。
- 線程池(Thread Pool) :這是一組工作線程的集合,它們從任務(wù)隊(duì)列中取出任務(wù)并執(zhí)行。
- 工作線程(Worker Thread) :線程池中的每個(gè)線程都會(huì)循環(huán)地從任務(wù)隊(duì)列中取出任務(wù)并執(zhí)行。
- 任務(wù)(Task) :這是需要執(zhí)行的具體任務(wù),可以是 Runnable 或 Callable 對(duì)象。
- 返回結(jié)果(Return Result) :任務(wù)執(zhí)行完成后,會(huì)返回結(jié)果或異常信息。
2、 ExecutorService 設(shè)計(jì)本質(zhì)
- 線程生命周期管理:
在Java 5之前,開(kāi)發(fā)者需要手動(dòng)管理線程的創(chuàng)建和銷(xiāo)毀,這不僅增加了編程復(fù)雜性,還可能導(dǎo)致資源浪費(fèi)和系統(tǒng)開(kāi)銷(xiāo)。 ExecutorService 通過(guò)提供線程池管理功能,簡(jiǎn)化了線程的生命周期管理。
- 系統(tǒng)開(kāi)銷(xiāo)降低:
頻繁地創(chuàng)建和銷(xiāo)毀線程會(huì)導(dǎo)致性能問(wèn)題和資源消耗。 ExecutorService 允許線程池重用線程,從而降低了系統(tǒng)開(kāi)銷(xiāo)。
資源利用率提升:
通過(guò)線程池復(fù)用線程, ExecutorService 提高了資源利用率和程序響應(yīng)速度,使得多線程編程更加靈活和高效。
豐富的任務(wù)調(diào)度和并發(fā)控制:
ExecutorService 提供了豐富的任務(wù)調(diào)度和并發(fā)控制能力,使得多線程編程更加靈活和高效。
硬件發(fā)展推動(dòng):
隨著多核架構(gòu)的出現(xiàn),Java的設(shè)計(jì)者們決定重新修訂Java的內(nèi)存模型,并在JDK1.5中引入了 java.util.concurrent包,其中就包括了 ExecutorService接口,以支持更高效的并行計(jì)算。
簡(jiǎn)化并發(fā)編程:
ExecutorService 作為Java并發(fā)編程的重要工具,簡(jiǎn)化了并發(fā)編程的復(fù)雜性,使得開(kāi)發(fā)者可以更容易地實(shí)現(xiàn)并行處理和任務(wù)調(diào)度。
提高程序性能:
ExecutorService 通過(guò)減少線程創(chuàng)建和銷(xiāo)毀的開(kāi)銷(xiāo),提高了程序的性能和可伸縮性。
線程池的易用性:
Executors 類(lèi)提供了便捷的工廠方法來(lái)創(chuàng)建不同類(lèi)型的線程池,使得開(kāi)發(fā)者無(wú)需手動(dòng)實(shí)現(xiàn)復(fù)雜的線程池邏輯,就可以方便地使用線程池。
3、線程池類(lèi)設(shè)計(jì)
圖片
在這個(gè)類(lèi)設(shè)計(jì)圖中,我們有以下組件:
- ExecutorService:這是一個(gè)接口,定義了線程池管理的方法,如 submit、 invokeAll、 invokeAny、 shutdown 等。
- ThreadPoolExecutor:這是 ExecutorService 的一個(gè)具體實(shí)現(xiàn),提供了線程池的詳細(xì)控制,如 execute、 submit、 shutdown 等。
- ScheduledExecutorService:這是 ExecutorService 的一個(gè)子接口,用于延遲執(zhí)行或定期執(zhí)行任務(wù)。
- FutureTask:這是 Future 接口的一個(gè)實(shí)現(xiàn)類(lèi),用于封裝異步任務(wù),并提供方法如 run、 get、 isDone 等。
4、線程池功能范圍設(shè)計(jì)
4.1. 接口定義
- ExecutorService 擴(kuò)展了 Executor 接口,增加了提交任務(wù)后返回 Future 對(duì)象的方法,這些方法允許任務(wù)異步執(zhí)行,并提供了獲取任務(wù)結(jié)果的機(jī)制。
4.2. 任務(wù)提交
- submit(Callable<T>task): 提交一個(gè)返回結(jié)果的任務(wù),并返回一個(gè) Future 對(duì)象。
- submit(Runnabletask): 提交一個(gè)不返回結(jié)果的任務(wù),并返回一個(gè) Future 對(duì)象。
- submit(Runnabletask,T result): 提交一個(gè)不返回結(jié)果的任務(wù),并返回一個(gè)已經(jīng)設(shè)置好結(jié)果的 Future 對(duì)象。
4.3. 批量任務(wù)執(zhí)行
- invokeAll(Collection<?extendsCallable<T>>tasks): 提交一個(gè)任務(wù)集合,等待所有任務(wù)完成,并返回每個(gè)任務(wù)結(jié)果的列表。
- invokeAny(Collection<?extendsCallable<T>>tasks): 提交一個(gè)任務(wù)集合,等待任意一個(gè)任務(wù)完成,并返回該任務(wù)的結(jié)果。
4.4. 線程池管理
- shutdown(): 啟動(dòng)一次有序的關(guān)閉,執(zhí)行已提交的任務(wù),不接受新任務(wù)。
- shutdownNow(): 嘗試停止所有正在執(zhí)行的任務(wù),并返回未執(zhí)行任務(wù)的列表。
- awaitTermination(longtimeout,TimeUnitunit): 等待直到所有任務(wù)完成或超時(shí)。
4.5. 線程生命周期
ExecutorService 允許線程的復(fù)用,減少了線程創(chuàng)建和銷(xiāo)毀的開(kāi)銷(xiāo)。線程池可以根據(jù)需要?jiǎng)?chuàng)建新線程或重用空閑線程。
4.6. 線程池的可擴(kuò)展性
ExecutorService 可以與不同的線程池實(shí)現(xiàn)一起工作,如 FixedThreadPool、 CachedThreadPool、 ScheduledThreadPool 等,提供了高度的可擴(kuò)展性和靈活性。
4.7. 異常處理
ExecutorService 提交的任務(wù)如果拋出異常,可以通過(guò) Future 對(duì)象的 get 方法捕獲這些異常。
4.8. 結(jié)果處理
Future 對(duì)象提供了 get 方法來(lái)獲取任務(wù)結(jié)果,如果任務(wù)尚未完成, get 方法會(huì)阻塞直到任務(wù)完成。
4.9. 任務(wù)取消
Future 對(duì)象提供了 cancel 方法來(lái)取消任務(wù),可以傳入一個(gè)布爾值參數(shù)來(lái)決定是否中斷正在執(zhí)行的任務(wù)。
4.10. 線程工廠和拒絕策略
ExecutorService 可以使用自定義的線程工廠來(lái)創(chuàng)建線程,以及自定義的拒絕策略來(lái)處理任務(wù)提交過(guò)多時(shí)的情況。 ExecutorService 的設(shè)計(jì)提供了一個(gè)強(qiáng)大的框架,用于構(gòu)建并發(fā)應(yīng)用程序,它簡(jiǎn)化了并發(fā)編程的復(fù)雜性,同時(shí)提供了豐富的控制和靈活的配置選項(xiàng)。通過(guò) ExecutorService,開(kāi)發(fā)者可以更容易地實(shí)現(xiàn)線程安全的異步任務(wù)執(zhí)行。
5、線程池的種類(lèi)
- FixedThreadPool:
擁有固定數(shù)量線程的線程池,適用于負(fù)載較重的服務(wù)器。
圖片
- SingleThreadExecutor:
只有一個(gè)線程的線程池,用于順序執(zhí)行任務(wù)。
- CachedThreadPool:
根據(jù)需要?jiǎng)?chuàng)建新線程的線程池,對(duì)于短生命周期的異步任務(wù)非常合適。
- ScheduledThreadPool:
用于延遲執(zhí)行或定期執(zhí)行任務(wù)的線程池。
- SingleThreadScheduledExecutor:
單個(gè)線程的變體,用于延遲或定時(shí)執(zhí)行任務(wù)。
- WorkStealingPool:
基于工作竊取算法的線程池,適用于并行計(jì)算。
圖片
這些線程池都是通過(guò) Executors工具類(lèi)提供的工廠方法來(lái)創(chuàng)建的。除了這些,開(kāi)發(fā)者還可以通過(guò)直接實(shí)例化 ThreadPoolExecutor類(lèi)來(lái)創(chuàng)建自定義配置的線程池。
ExecutorService接口本身并不定義線程池的具體實(shí)現(xiàn),而是提供了一組通用的接口,用于管理和執(zhí)行異步任務(wù)。不同的線程池實(shí)現(xiàn)提供了不同的功能和性能特性,以適應(yīng)不同的并發(fā)場(chǎng)景。
6、 ThreadPoolExecutor 線程池設(shè)計(jì)
簡(jiǎn)化版
圖片
- 核心參數(shù)初始化:包括核心線程數(shù)、最大線程數(shù)、任務(wù)隊(duì)列、空閑線程存活時(shí)間和線程工廠等參數(shù)的初始化。
- 任務(wù)提交到線程池:當(dāng)任務(wù)被提交到線程池時(shí),線程池會(huì)根據(jù)當(dāng)前的狀態(tài)和參數(shù)來(lái)決定如何處理這個(gè)任務(wù)。
- 線程獲取任務(wù)并執(zhí)行:如果有空閑的核心線程,它會(huì)直接執(zhí)行任務(wù);如果沒(méi)有空閑的核心線程但任務(wù)隊(duì)列未滿,任務(wù)會(huì)被添加到任務(wù)隊(duì)列中。
- 創(chuàng)建非核心線程執(zhí)行任務(wù):如果任務(wù)隊(duì)列已滿且當(dāng)前線程數(shù)小于最大線程數(shù),會(huì)創(chuàng)建一個(gè)新的非核心線程來(lái)執(zhí)行任務(wù)。
- 拒絕策略處理任務(wù):如果任務(wù)隊(duì)列已滿且線程數(shù)達(dá)到最大線程數(shù),任務(wù)將被拒絕,線程池會(huì)根據(jù)拒絕策略處理器來(lái)處理這個(gè)任務(wù)。
- 線程池狀態(tài)管理: ThreadPoolExecutor 維護(hù)一個(gè) ctl 變量,用于控制線程池的狀態(tài),包括 RUNNING、SHUTDOWN、STOP、TIDYING 和 TERMINATED 等狀態(tài)。
詳細(xì)版
圖片
- 創(chuàng)建 ThreadPoolExecutor:創(chuàng)建一個(gè) ThreadPoolExecutor 實(shí)例,開(kāi)始線程池的初始化過(guò)程。
- 核心參數(shù)初始化:初始化線程池的核心參數(shù),包括核心線程數(shù)、最大線程數(shù)、任務(wù)隊(duì)列、空閑線程存活時(shí)間和線程工廠。
- 任務(wù)提交到線程池:當(dāng)任務(wù)被提交到線程池時(shí),線程池會(huì)根據(jù)當(dāng)前的狀態(tài)和參數(shù)來(lái)決定如何處理這個(gè)任務(wù)。
- 線程獲取任務(wù)并執(zhí)行:如果有空閑的核心線程,它會(huì)直接執(zhí)行任務(wù);如果沒(méi)有空閑的核心線程但任務(wù)隊(duì)列未滿,任務(wù)會(huì)被添加到任務(wù)隊(duì)列中。
- 創(chuàng)建非核心線程執(zhí)行任務(wù):如果任務(wù)隊(duì)列已滿且當(dāng)前線程數(shù)小于最大線程數(shù),會(huì)創(chuàng)建一個(gè)新的非核心線程來(lái)執(zhí)行任務(wù)。
- 拒絕策略處理任務(wù):如果任務(wù)隊(duì)列已滿且線程數(shù)達(dá)到最大線程數(shù),任務(wù)將被拒絕,線程池會(huì)根據(jù)拒絕策略處理器來(lái)處理這個(gè)任務(wù)。
- 線程嘗試獲取新任務(wù):任務(wù)執(zhí)行完畢后,線程會(huì)嘗試從任務(wù)隊(duì)列中獲取新的任務(wù)。
- 線程銷(xiāo)毀或等待新任務(wù):如果任務(wù)隊(duì)列空,線程會(huì)進(jìn)入空閑狀態(tài),如果達(dá)到空閑線程存活時(shí)間,線程將被銷(xiāo)毀。
- 線程池狀態(tài)檢查:線程池會(huì)根據(jù)其狀態(tài)來(lái)決定是否停止接收新任務(wù),是否中斷運(yùn)行中的任務(wù),以及是否等待線程終止。
6.1 ThreadPoolExecutor應(yīng)用
6.1.1. 服務(wù)器端處理請(qǐng)求
在服務(wù)器應(yīng)用中, ThreadPoolExecutor 可以用來(lái)處理客戶端的請(qǐng)求。服務(wù)器可以創(chuàng)建一個(gè)固定大小的線程池來(lái)同時(shí)處理多個(gè)請(qǐng)求,提高響應(yīng)速度和吞吐量。
int corePoolSize = 10; // 核心線程數(shù)
int maximumPoolSize = 50; // 最大線程數(shù)
long keepAliveTime = 120; // 空閑線程存活時(shí)間
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100); // 任務(wù)隊(duì)列
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue
);
// 提交任務(wù)到線程池
executor.execute(new ClientRequestHandler());
6.1.2. 批量數(shù)據(jù)處理
在處理批量數(shù)據(jù)時(shí),如文件處理或數(shù)據(jù)庫(kù)批量操作, ThreadPoolExecutor 可以用來(lái)并行處理數(shù)據(jù),提高處理速度。
List<Data> dataList = ...; // 待處理的數(shù)據(jù)列表
int threadCount = Runtime.getRuntime().availableProcessors(); // 使用可用處理器數(shù)量作為線程數(shù)
ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadCount,
threadCount,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()
);
dataList.forEach(data -> executor.execute(new DataProcessorTask(data)));
6.1.3. 異步任務(wù)執(zhí)行
在需要異步執(zhí)行任務(wù)的場(chǎng)景中, ThreadPoolExecutor 可以用來(lái)提交任務(wù)并在未來(lái)某個(gè)時(shí)刻獲取結(jié)果。
Future<String> future = executor.submit(() -> {
// 異步執(zhí)行的任務(wù)
return "任務(wù)結(jié)果";
});
// 獲取異步任務(wù)的結(jié)果
String result = future.get();
6.1.4. 定時(shí)和周期性任務(wù)
ThreadPoolExecutor 可以與 ScheduledExecutorService 結(jié)合使用,來(lái)執(zhí)行定時(shí)和周期性任務(wù)。
ScheduledExecutorService scheduledExecutor = new ScheduledThreadPoolExecutor(1);
scheduledExecutor.scheduleAtFixedRate(() -> {
// 定時(shí)執(zhí)行的任務(wù)
}, 0, 10, TimeUnit.SECONDS);
6.1.5. 資源受限環(huán)境下的任務(wù)處理
在資源受限的環(huán)境中,如移動(dòng)設(shè)備或嵌入式系統(tǒng), ThreadPoolExecutor 可以用來(lái)合理分配有限的計(jì)算資源。
int maxThreads = 4; // 根據(jù)設(shè)備性能設(shè)置最大線程數(shù)
ThreadPoolExecutor executor = new ThreadPoolExecutor(
maxThreads,
maxThreads,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>()
);
7、 ScheduledExecutorService 線程池設(shè)計(jì)
圖片
- ScheduledExecutorService:這是線程池的管理接口,負(fù)責(zé)提交和管理任務(wù)。
- 任務(wù)隊(duì)列(Task Queue) :這是一個(gè)延遲隊(duì)列,用于存儲(chǔ)待執(zhí)行的任務(wù),按照預(yù)定的執(zhí)行時(shí)間排序。
- 核心線程池(Core Thread Pool) :線程池中的核心線程會(huì)不斷地從任務(wù)隊(duì)列中取出任務(wù)并執(zhí)行。
- 工作線程(Worker Thread) :線程池中的線程負(fù)責(zé)執(zhí)行任務(wù)。
- 執(zhí)行周期性任務(wù)(scheduleAtFixedRate) :用于安排任務(wù)以固定頻率執(zhí)行。
- 執(zhí)行延遲任務(wù)(scheduleWithFixedDelay) :用于安排任務(wù)在每次執(zhí)行完畢后按照固定延遲執(zhí)行。
- 執(zhí)行單次任務(wù)(schedule) :用于安排任務(wù)在指定延遲后執(zhí)行一次。
- 任務(wù)完成:任務(wù)執(zhí)行完畢后,如果是周期性任務(wù),會(huì)重新調(diào)度下一次執(zhí)行。
- 線程池關(guān)閉(Thread Pool Shutdown) :當(dāng)不再需要線程池時(shí),可以關(guān)閉線程池,等待所有任務(wù)完成或嘗試立即停止所有任務(wù)。
7.1 ScheduledExecutorService應(yīng)用案例
7.1.1. 一次性延遲執(zhí)行任務(wù)
在這個(gè)例子中,我們創(chuàng)建了一個(gè) ScheduledExecutorService 實(shí)例,并安排了一個(gè)任務(wù)在延遲一定時(shí)間后執(zhí)行。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DelayedTaskExample {
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("任務(wù)在延遲后執(zhí)行:" + System.currentTimeMillis());
scheduler.schedule(task, 5, TimeUnit.SECONDS); // 5秒后執(zhí)行
scheduler.shutdown(); // 執(zhí)行完畢后關(guān)閉調(diào)度器
}
}
7.1.2. 固定速率周期執(zhí)行任務(wù)
在這個(gè)例子中,我們安排了一個(gè)任務(wù)以固定的速率周期性執(zhí)行。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class FixedRateTaskExample {
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("定期任務(wù)執(zhí)行時(shí)間: " + System.currentTimeMillis());
scheduler.scheduleAtFixedRate(task, 0, 10, TimeUnit.SECONDS); // 每10秒執(zhí)行一次
scheduler.shutdown(); // 執(zhí)行完畢后關(guān)閉調(diào)度器
}
}
7.1.3. 固定延遲周期執(zhí)行任務(wù)
在這個(gè)例子中,我們安排了一個(gè)任務(wù)在每次執(zhí)行完畢后,等待固定的延遲時(shí)間再執(zhí)行下一次。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class FixedDelayTaskExample {
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("帶有固定延遲的任務(wù)執(zhí)行時(shí)間: " + System.currentTimeMillis());
scheduler.scheduleWithFixedDelay(task, 0, 15, TimeUnit.SECONDS); // 每次執(zhí)行完畢后等待15秒再執(zhí)行
scheduler.shutdown(); // 執(zhí)行完畢后關(guān)閉調(diào)度器
}
}