如何手寫一個線程池?
手寫一個異步工具類
我是小識,新來了一個公司。這個公司呢,有個特點,就是很鼓勵大家封裝各種實用的工具類,提高開發(fā)效率。
于是我就到處看項目的源碼,看看有沒有什么能改進的?果然讓我發(fā)現(xiàn)了。項目中到處充斥著 new Thread 類來異步執(zhí)行代碼的邏輯。
new Thread(r).start();
我們可以封裝一個異步工具類啊!
第一版
說干就干,把上面的代碼簡單封裝一下,一個簡單的異步工具類就封裝好了。
public interface Executor {
void execute(Runnable r);
}
public class AsyncExecutorV1 implements Executor {
@Override
public void execute(Runnable r) {
new Thread(r).start();
}
}
于是開開心心的提交了 merge request。
第二版
正當我滿懷期待工具類代碼能被合并的時候,沒想代碼被組長杰哥打回來了。
「杰哥」:有心封裝工具類值得鼓勵,不過還可以改進一下。
「小識」:還能再改進?沒感覺我這個工具類還有改進的余地??!
「杰哥」:假如說有10000個異步任務,你這創(chuàng)建10000個線程,資源耗費太嚴重了!
「小識」:這樣啊,那我加個隊列,任務都放到隊列中,用一個線程從隊列中取任務執(zhí)行。
public class AsyncExecutorV2 implements Executor {
private BlockingQueue<Runnable> workQueue;
public AsyncExecutorV2(BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
WorkThread workThread = new WorkThread();
workThread.start();
}
@SneakyThrows
@Override
public void execute(Runnable r) {
workQueue.add(r);
}
class WorkThread extends Thread {
@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
}
第三版
「小識」:杰哥,快幫我看看,還有啥改進的沒?
「杰哥」:小伙子不錯啊,居然能想到用隊列來緩沖任務,不愧是我招進來的人!但是用一個異步線程執(zhí)行任務,你確定這個工具類比同步執(zhí)行的效率快?
「小識」:哈哈,又一個工具類翻車的案例,應該多開幾個異步線程來執(zhí)行任務,但是應該開多少呢?
「杰哥」:誰最清楚異步工具類應該用多少個線程來執(zhí)行呢?
「小識」:使用工具類的人。
「杰哥」:這不對了,你可以定義一個線程數(shù)量參數(shù),讓用戶來決定開多少線程?!噶硗饽氵@個工具類還個問題,隊列滿了會直接拋出異常!」
「小識」:那我增加一個拒絕策略類(RejectedExecutionHandler),當線程池滿了讓用戶決定執(zhí)行策略,比如直接拋異常,用當前線程同步執(zhí)行任務。
public class AsyncExecutorV3 implements Executor {
private BlockingQueue<Runnable> workQueue;
private List<WorkThread> workThreadList = new ArrayList<>();
private RejectedExecutionHandler handler;
public AsyncExecutorV3(int corePoolSize,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this.workQueue = workQueue;
this.handler = handler;
for (int i = 0; i < corePoolSize; i++) {
WorkThread workThread = new WorkThread();
workThread.start();
workThreadList.add(workThread);
}
}
@SneakyThrows
@Override
public void execute(Runnable r) {
if (!workQueue.offer(r)) {
// 隊列滿了,執(zhí)行拒絕策略
handler.rejectedExecution(r);
}
}
class WorkThread extends Thread {
@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
}
// 拒絕策略類
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r);
}
// 當線程池滿了之后直接拋出異常
public class AbortPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r) {
throw new RuntimeException("queue is full");
}
}
// 當線程池滿了之后直接拋出異常
public class AbortPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r) {
throw new RuntimeException("queue is full");
}
}
// 當線程池滿了之后,用提交任務的線程同步執(zhí)行任務
public class CallerRunsPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r) {
r.run();
}
}
再次提交 merge request,終于被合并了,別的團隊都開始使用我的工具類了!
過了幾天小亮急匆匆找到我。
「小亮」:小識,你的工具類挺好用的。但是我最近遇到了一個問題,我用了CountDownLatch批量執(zhí)行任務,但是我這個任務好像卡住了,我用jstack想看看線程的執(zhí)行情況,快告訴我你異步線程的名字設置的是啥?
「小識」:哎呀,我們沒設置線程的名字,應該用的是默認的線程名字 Thread-n。
「小亮」:你可得給工具類加個線程名字的參數(shù)啊,不然一個一個看線程的狀態(tài)太累了,而且效率也不高。
「小識」:我這就加。
第四版
趕緊加了一個線程名字的參數(shù),然后再次提交代碼。
「杰哥」:哎呀,沒想到我也疏忽了,沒發(fā)現(xiàn)這個問題,確實應該加個線程名字的參數(shù),代碼的可擴展性太重要了,改來改去可不行。
「小識」:是??!
「杰哥」:你覺得你只加一個線程名字參數(shù),可擴展性高嗎?如果有的團隊想修改異步線程的優(yōu)先級,你再加個優(yōu)先級參數(shù)?
「小識」:感覺不太行,那讓用戶把線程傳給我吧!
「杰哥」:哈哈,可以,你還可以用工廠模式優(yōu)化一下,用戶傳入線程工廠類,工具類用工廠類創(chuàng)建線程。
「小識」:不愧是杰哥,這樣一來代碼更清爽了!
public class AsyncExecutorV4 implements Executor {
private BlockingQueue<Runnable> workQueue;
private List<WorkThread> workThreadList = new ArrayList<>();
private RejectedExecutionHandler handler;
public AsyncExecutorV4(int corePoolSize,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler,
ThreadFactory threadFactory) {
this.workQueue = workQueue;
this.handler = handler;
for (int i = 0; i < corePoolSize; i++) {
// 用工廠類創(chuàng)建線程
WorkThread workThread = threadFactory.newThread();
workThread.start();
workThreadList.add(workThread);
}
}
@SneakyThrows
@Override
public void execute(Runnable r) {
if (!workQueue.offer(r)) {
handler.rejectedExecution(r);
}
}
// 異步線程
public class WorkThread extends Thread {
@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
// 異步線程工廠類
public interface ThreadFactory {
WorkThread newThread();
}
}
代碼提交之后,小亮給線程起了一個名字,async-thread,現(xiàn)在他通過名字很快就能知道線程池中的線程在干嘛了!
大家不斷的進行改進
隨著這個異步工具類在公司內(nèi)部使用的越來越多,大家也提交了很多改進的代碼。
- 按需創(chuàng)建線程,不要一開始就創(chuàng)建「corePoolSize」個線程,而是在調(diào)用者提交任務的過程中逐漸創(chuàng)建出來,最后創(chuàng)建了「corePoolSize」個就不再創(chuàng)建了。
- 提高工具的彈性,當任務突增時,隊列會被放滿,然后多余的任務有可能會被直接扔掉。當然我們可以把「corePoolSize」設的很大,但是這樣并不優(yōu)雅,因為大部分情況下是用不到這么多線程的。當任務突增時,我們可以適當增加線程,提高執(zhí)行速度,當然創(chuàng)建的總線程數(shù)還是要限制一下的,我們把能創(chuàng)建的總數(shù)定為「maximumPoolSize」。
- 及時關(guān)閉不需要的線程,當任務突增時,線程數(shù)可能增加「maximumPoolSize」,但是大多數(shù)時間「corePoolSize」個線程就足夠用了,因此可以定義一個超時時間,當一個線程在「keepAliveTime」時間內(nèi)沒有執(zhí)行任務,就把它給關(guān)掉。
異步工具類執(zhí)行流程圖
經(jīng)過大家的不斷改進之后,構(gòu)造函數(shù)中的參數(shù)也越來越多了,杰哥讓我寫個文檔吧,把這個異步工具類的構(gòu)造函數(shù)和執(zhí)行流程總結(jié)一下,不然新來的小伙伴看到這個工具類一臉懵可不行!
這個工具類的構(gòu)造函數(shù)目前有如下7個參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
參數(shù) | 含義 |
corePoolSize | 核心線程數(shù) |
maximumPoolSize | 最大線程數(shù) |
keepAliveTime | 非核心線程的空閑時間 |
TimeUnit | 空閑時間的單位 |
BlockingQueue<Runnable> | 任務隊列 |
ThreadFactory | 線程工廠 |
RejectedExecutionHandler | 拒絕策略 |
「執(zhí)行流程圖如下」:
對了,最后大家給這個異步工具類起了一個牛的名字,「線程池」。