線程池 ThreadPoolExecutor 為了提供擴(kuò)展,提供了兩個(gè)方法 beforeExecute 和 afterExecute,每個(gè)任務(wù)執(zhí)行前后都會調(diào)用這兩個(gè)方法,相當(dāng)于對線程任務(wù)的執(zhí)行做了一個(gè)切面。
?監(jiān)控線程池:執(zhí)行超時(shí)、等待超時(shí);執(zhí)行超時(shí)數(shù)量、等待超時(shí)數(shù)量;
擴(kuò)展線程池 ThreadPoolExecutor 的兩個(gè)方法 beforeExecute 和 afterExecute
自定義Runnable 記錄關(guān)鍵節(jié)點(diǎn)時(shí)間
關(guān)鍵時(shí)間節(jié)點(diǎn)參數(shù):
- 任務(wù)創(chuàng)建(提交)時(shí)間:submitTime
- 任務(wù)開始執(zhí)行時(shí)間:startExeTime
- 任務(wù)結(jié)束執(zhí)行時(shí)間:endExeTime
- 任務(wù)在隊(duì)列等待時(shí)間:任務(wù)開始執(zhí)行時(shí)間 - 任務(wù)創(chuàng)建(提交)時(shí)間
- 任務(wù)執(zhí)行總時(shí)間:任務(wù)結(jié)束執(zhí)行時(shí)間 - 任務(wù)開始執(zhí)行時(shí)間
源碼分析
線程池 ThreadPoolExecutor 為了提供擴(kuò)展,提供了兩個(gè)方法 beforeExecute 和 afterExecute,每個(gè)任務(wù)執(zhí)行前后都會調(diào)用這兩個(gè)方法,相當(dāng)于對線程任務(wù)的執(zhí)行做了一個(gè)切面。
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* @param t 執(zhí)行任務(wù)的線程
* @param
protected void beforeExecute(Thread t, Runnable r){ }
/**
* @param r 將要被執(zhí)行的任務(wù)
* @param
protected void afterExecute(Runnable r, Throwable t){ }
}
源碼執(zhí)行邏輯:

線程池?cái)U(kuò)展代碼:
public class ThreadPoolExpandTest {
// 定義線程池
public static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
2,
4,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
new ThreadPoolExecutor.DiscardOldestPolicy()
){
@Override
/**
* @param t 執(zhí)行任務(wù)的線程
* @param
protected void beforeExecute(Thread t, Runnable r){
System.out.println("beforeExecute將要被執(zhí)行");
}
/**
* @param r 將要被執(zhí)行的任務(wù)
* @param
@Override
protected void afterExecute(Runnable r, Throwable t){
System.out.println("afterExecute已經(jīng)執(zhí)行完畢");
}
};
public static void main(String[] args){
poolExecutor.execute(()->{
System.out.println("任務(wù)執(zhí)行");
});
}
}
運(yùn)行結(jié)果:
beforeExecute執(zhí)行
任務(wù)執(zhí)行
afterExecute執(zhí)行
總結(jié):從測試代碼可以看出,通過擴(kuò)展線程池參數(shù)可以進(jìn)行任務(wù)執(zhí)行的監(jiān)控。
自定義Runnable
通過自定義Runnable,記錄任務(wù)執(zhí)行的一些時(shí)間:
- 任務(wù)創(chuàng)建(提交)時(shí)間
- 任務(wù)開始執(zhí)行時(shí)間
public class DynamicRunnable implements Runnable{
/**
* runnable
*/
private final Runnable runnable;
/**
* 任務(wù)創(chuàng)建(提交)時(shí)間
*/
private final Long submitTime;
/**
* 任務(wù)開始執(zhí)行時(shí)間
*/
private Long startExeTime;
public DynamicRunnable(Runnable runnable){
this.runnable = runnable;
submitTime = System.currentTimeMillis();
}
@Override
public void run(){
runnable.run();
}
public Long getSubmitTime(){
return submitTime;
}
public void setStartExeTime(Long startExeTime){
this.startExeTime = startExeTime;
}
public Long getStartExeTime(){
return startExeTime;
}
}
繼承線程池+自定義Runnable
核心參數(shù):
/**
* 執(zhí)行超時(shí),單位(毫秒)
*/
private long runTimeout;
/**
* 等待超時(shí),單位(毫秒)
*/
private long queueTimeout;
/**
* 執(zhí)行超時(shí)數(shù)量
*/
private final AtomicInteger runTimeoutCount = new AtomicInteger();
/**
* 等待超時(shí)數(shù)量
*/
private final AtomicInteger queueTimeoutCount = new AtomicInteger();
重寫ThreadPoolExecutor方法:
@Override
public void execute(Runnable command){
if (runTimeout > 0 || queueTimeout > 0) {
// 記錄任務(wù)提交時(shí)間
command = new DynamicRunnable(command);
}
super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r){
if (!(r instanceof DynamicRunnable)) {
super.beforeExecute(t, r);
return;
}
DynamicRunnable runnable = (DynamicRunnable) r;
long currTime = System.currentTimeMillis();
if (runTimeout > 0) {
// 記錄任務(wù)開始執(zhí)行時(shí)間
runnable.setStartExeTime(currTime);
}
if (queueTimeout > 0) {
// 任務(wù)開始執(zhí)行時(shí)間 - 任務(wù)創(chuàng)建(提交)時(shí)間
long waitTime = currTime - runnable.getSubmitTime();
if (waitTime > queueTimeout) {
log.error("{} execute queue timeout waitTime: {}ms", this.getThreadPoolName(),waitTime);
}
}
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t){
if (runTimeout > 0) {
DynamicRunnable runnable = (DynamicRunnable) r;
// 任務(wù)執(zhí)行總時(shí)間:任務(wù)結(jié)束執(zhí)行時(shí)間 - 任務(wù)開始執(zhí)行時(shí)間
long runTime = System.currentTimeMillis() - runnable.getStartExeTime();
if (runTime > runTimeout) {
runTimeoutCount.incrementAndGet();
log.error("{} execute, run timeout runTime: {}ms", this.getThreadPoolName(), runTime);
}
}
super.afterExecute(r, t);
}