Java線程進(jìn)階之ThreadPoolExecutor線程池執(zhí)行原理機(jī)制詳解
前言
線程池有很多優(yōu)點(diǎn):
降低資源消耗;
提高響應(yīng)速度;
提高線程的可管理性等等;
今天我們就來分析探討下原理實(shí)現(xiàn)
一、線程池接口簡(jiǎn)單分析
1、Executor接口
- public interface Executor {
- // 執(zhí)行一個(gè)任務(wù)。任務(wù)都被封裝成Runnable的實(shí)現(xiàn)
- void execute(Runnable command);
- }
2、 ExecutorService接口
- public interface ExecutorService extends Executor {
- // 啟動(dòng)有序的關(guān)閉,之前提交的任務(wù)將會(huì)被執(zhí)行,但不會(huì)接受新的任務(wù)。
- void shutdown();
- // 嘗試停止所有正在執(zhí)行的任務(wù),停止等待處理的任務(wù),病返回任務(wù)列表
- List<Runnable> shutdownNow();
- // 判斷線程池是否已經(jīng)關(guān)閉
- boolean isShutdown();
- // 如果關(guān)閉后所有任務(wù)都已完成。但是前提是必須先執(zhí)行:shutdown 或者 shutdownNow
- boolean isTerminated();
- // 在開啟shutdown之后,阻止所有的任務(wù)知道執(zhí)行完成
- boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException;
- // 提交任務(wù),帶返回結(jié)果的
- <T> Future<T> submit(Callable<T> task);
- // 提交任務(wù),封裝返回結(jié)果為T
- <T> Future<T> submit(Runnable task, T result);
- // 提交一個(gè)普通任務(wù),返回結(jié)果任意
- Future<?> submit(Runnable task);
- // 執(zhí)行一批任務(wù),返回結(jié)果為 List<Future<T>>
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException;
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException;
- <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException;
- <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
其具有5個(gè)核心的內(nèi)部類。其中4內(nèi)部類對(duì)應(yīng)的是拒絕策略。Worker是核心的執(zhí)行代碼;
3、 RejectedExecutionHandler
- public interface RejectedExecutionHandler {
- // 拒絕執(zhí)行策略
- void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
- }
4、 AbortPolicy 策略
Java線程池默認(rèn)的阻塞策略,不執(zhí)行此任務(wù),而且直接拋出一個(gè)運(yùn)行時(shí)異常
- public static class AbortPolicy implements RejectedExecutionHandler {
- public AbortPolicy() { }
- // 直接拋出異常,描述前線程的基本信息
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- throw new RejectedExecutionException("Task " + r.toString() +
- " rejected from " +
- e.toString());
- }
- }
5、DiscardPolicy策略
空方法,不做任何處理
- public static class DiscardPolicy implements RejectedExecutionHandler {
- public DiscardPolicy() { }
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- }
- }
6、DiscardOldestPolicy 策略
從隊(duì)列里面拋棄一個(gè)最老的任務(wù),并再次execute 此task
- public static class DiscardOldestPolicy implements RejectedExecutionHandler {
- public DiscardOldestPolicy() { }
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- // 從隊(duì)列里面取出最老的一個(gè)任務(wù)
- e.getQueue().poll();
- // 手動(dòng)調(diào)用execute方法執(zhí)行,將任務(wù)添加到隊(duì)列中
- e.execute(r);
- }
- }
- }
7、CallerRunsPolicy 策略
- public static class CallerRunsPolicy implements RejectedExecutionHandler {
- /**
- * Creates a {@code CallerRunsPolicy}.
- */
- public CallerRunsPolicy() { }
- // 如果當(dāng)前線程池沒有關(guān)閉,則調(diào)用線程的run方法
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- r.run();
- }
- }
- }
8、ThreadPoolExecutor
構(gòu)造函數(shù)詳解
- public class ThreadPoolExecutor extends AbstractExecutorService {
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- Executors.defaultThreadFactory(), defaultHandler);
- }
- }
構(gòu)造函數(shù)參數(shù)說明:
- corePoolSize:線程池中的核心線程數(shù),空閑時(shí)候線程也不會(huì)回收,除非把a(bǔ)llowCoreThreadTimeOut設(shè)置為 true,這時(shí)核心線程才會(huì)被回收;
- maximumPoolSize:線程池中可以創(chuàng)建的最大線程數(shù),限定為2^29-1;
- keepAliveTime:當(dāng)線程池中創(chuàng)建的線程超過了核心線程數(shù)的時(shí)候,在沒有新任務(wù)加入的等待時(shí)間;
- workQueue:存放任務(wù)的隊(duì)列,只有當(dāng)線程數(shù) > 核心線程數(shù),才會(huì)把其他的任務(wù)放入queue,一般常用的是queue就是ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue;
- threadFactory:創(chuàng)建線程的工廠類;
- handler:當(dāng)queue滿了和線程數(shù)達(dá)到最大限制,對(duì)于繼續(xù)到達(dá)的任務(wù)采取的策略。默認(rèn)采取AbortPolicy , 也就是拒絕策略,直接拋出異常;
9、核心成員變量分析
- 線程池中設(shè)計(jì)非常巧妙的一個(gè)地方是把線程池的狀態(tài)和運(yùn)行的線程數(shù)量用一個(gè)int類型進(jìn)行存儲(chǔ);這樣一來可以保持線程池狀態(tài)和線程池活躍線程數(shù)量的一致性。因?yàn)锳tomicInteger是線程安全的;
- workerCount:線程池中當(dāng)前活動(dòng)的線程數(shù)量,占據(jù)ctl的低29位;
- runState:線程池運(yùn)行狀態(tài),占據(jù)ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五種狀態(tài);
- 為了將線程池的狀態(tài)和線程池中的工作線程的數(shù)量放到一個(gè)int里面進(jìn)行管理。他們利用了二進(jìn)制數(shù)據(jù)進(jìn)行位運(yùn)算。其中int類型有4個(gè)字節(jié),一個(gè)字節(jié)8位??偣灿?2位。其中高的3位表示線程的狀態(tài)。低29位代表線程的數(shù)量;
其中32位中,高三位代表的是狀態(tài):
- 111 > RUNNING
- 000 > SHUTDOWN
- 001 > STOP
- 010 > TIDYING
- 110 > TERMINATED
低29位代表線程的數(shù)量。所以最大的線程數(shù)為 2^29 -1 = 536870911;
// 記錄線程池狀態(tài)和線程數(shù)量(總共32位,前三位表示線程池狀態(tài),后29位表示線程數(shù)量),保證線程安全性
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // int 字節(jié)32位,COUNT_BITS代表的是29位
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // 線程的最大容量:000 11111111111111111111111111111
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
- // 運(yùn)行狀態(tài):111 00000000000000000000000000000
- private static final int RUNNING = -1 << COUNT_BITS;
- // 關(guān)閉狀態(tài):000 00000000000000000000000000000
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- // 停止?fàn)顟B(tài):001 00000000000000000000000000000
- private static final int STOP = 1 << COUNT_BITS;
- // 整理狀態(tài):010 00000000000000000000000000000
- private static final int TIDYING = 2 << COUNT_BITS;
- // 終止?fàn)顟B(tài):011 00000000000000000000000000000
- private static final int TERMINATED = 3 << COUNT_BITS;
- /**
- * 是按位取反的意思,CAPACITY表示的是高位的3個(gè)0,和低位的29個(gè)1,而~CAPACITY則表示高位的3個(gè)1,2低位的9個(gè)0,
- * 然后再與入?yún)執(zhí)行按位與操作,即高3位保持原樣,低29位全部設(shè)置為0,也就獲取了線程池的運(yùn)行狀態(tài)runState
- */
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- /**
- * 返回當(dāng)前線程的數(shù)量。其中c代表線程池的狀態(tài),即是高三位。:
- * 而CAPACITY 代表的是線程的容量,即000 11111111111111111111111111111
- * c & CAPACITY ,只有當(dāng)都為1的時(shí)候,才為真,這樣直接舍棄高位
- */
- private static int workerCountOf(int c) { return c & CAPACITY; }
- /**
- * 傳入的rs表示線程池運(yùn)行狀態(tài)runState,其是高3位有值,低29位全部為0的int,
- * 而wc則代表線程池中有效線程的數(shù)量workerCount,其為高3位全部為0,而低29位有值得int,
- * 將runState和workerCount做或操作|處理,即用runState的高3位,workerCount的低29位填充的數(shù)字,而默認(rèn)傳入的
- */
- private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池的狀態(tài)轉(zhuǎn)換:
- // 調(diào)用了shutdown()方法
- RUNNING -> SHUTDOWN
- // 調(diào)用了shutdownNow()
- (RUNNING 或 SHUTDOWN) -> STOP
- // 當(dāng)隊(duì)列和線程池為空
- SHUTDOWN -> TIDYING
- // 當(dāng)線程池為空
- STOP -> TIDYING
- // 當(dāng)terminated()鉤子方法執(zhí)行完成
- TIDYING -> TERMINATED
二、線程池執(zhí)行流程源碼分析
1、程序入口:execute 方法
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- //獲取當(dāng)前線程池的狀態(tài)+線程個(gè)數(shù)變量
- int c = ctl.get();
- //當(dāng)前線程池線程個(gè)數(shù)是否小于corePoolSize,小于則開啟新線程運(yùn)行
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- //如果線程池處于RUNNING狀態(tài),則添加任務(wù)到阻塞隊(duì)列
- if (isRunning(c) && workQueue.offer(command)) {
- //二次檢查
- int recheck = ctl.get();
- //如果當(dāng)前線程池狀態(tài)不是RUNNING則從隊(duì)列刪除任務(wù),并執(zhí)行拒絕策略
- if (! isRunning(recheck) && remove(command))
- reject(command);
- //否者如果當(dāng)前線程池線程空,則添加一個(gè)線程
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- //如果隊(duì)列滿了,則新增線程,新增失敗則執(zhí)行拒絕策略
- else if (!addWorker(command, false))
- reject(command);
- }
- 如果當(dāng)前線程池線程個(gè)數(shù)小于corePoolSize則開啟新線程;
- 否則添加任務(wù)到任務(wù)隊(duì)列;
- 如果任務(wù)隊(duì)列滿了,則嘗試新開啟線程執(zhí)行任務(wù),如果線程個(gè)數(shù)>maximumPoolSize則執(zhí)行拒絕策略;
重點(diǎn)看addWorkder方法:
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- //1、 檢查隊(duì)列是否只在必要時(shí)為空.
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- //循環(huán)cas增加線程個(gè)數(shù)
- for (;;) {
- int wc = workerCountOf(c);
- //如果線程個(gè)數(shù)超限則返回false
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- //cas增加線程個(gè)數(shù),同時(shí)只有一個(gè)線程成功
- if (compareAndIncrementWorkerCount(c))
- break retry;
- //cas失敗了,則看線程池狀態(tài)是否變化了,變化則跳到外層循環(huán)重試重新獲取線程池狀態(tài),否者內(nèi)層循環(huán)重新cas。
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- }
- }
- //2、到這里說明cas成功了
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- //創(chuàng)建worker
- final ReentrantLock mainLock = this.mainLock;
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- //加獨(dú)占鎖,為了workers同步,因?yàn)榭赡芏鄠€(gè)線程調(diào)用了線程池的execute方法。
- mainLock.lock();
- try {
- //重新檢查線程池狀態(tài),為了避免在獲取鎖前調(diào)用了shutdown接口
- int c = ctl.get();
- int rs = runStateOf(c);
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- //添加任務(wù)
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- //添加成功則啟動(dòng)任務(wù)
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
- 第(1)雙重循環(huán)目的是通過cas增加線程池線程個(gè)數(shù);
- 第(2)主要是并發(fā)安全的把任務(wù)添加到workers里面,并且啟動(dòng)任務(wù)執(zhí)行;
- rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty())
也就是說下面幾種情況下會(huì)返回false:
- 當(dāng)前線程池狀態(tài)為STOP,TIDYING,TERMINATED;
- 當(dāng)前線程池狀態(tài)為SHUTDOWN并且已經(jīng)有了第一個(gè)任務(wù);
- 當(dāng)前線程池狀態(tài)為SHUTDOWN并且任務(wù)隊(duì)列為空;
- 內(nèi)層循環(huán)作用是使用cas增加線程個(gè)數(shù),如果線程個(gè)數(shù)超限則返回false,否者進(jìn)行cas,cas成功則退出雙循環(huán),否者cas失敗了,要看當(dāng)前線程池的狀態(tài)是否變化了,如果變了,則重新進(jìn)入外層循環(huán)重新獲取線程池狀態(tài),否者進(jìn)入內(nèi)層循環(huán)繼續(xù)進(jìn)行cas嘗試;
- 到了第(2)說明CAS成功了,也就是說線程個(gè)數(shù)加一了,但是現(xiàn)在任務(wù)還沒開始執(zhí)行,這里使用全局的獨(dú)占鎖來控制workers里面添加任務(wù),其實(shí)也可以使用并發(fā)安全的set,但是性能沒有獨(dú)占鎖好;
- 這里需要注意的是要在獲取鎖后重新檢查線程池的狀態(tài),這是因?yàn)槠渌€程可可能在本方法獲取鎖前改變了線程池的狀態(tài),比如調(diào)用了shutdown方法。添加成功則啟動(dòng)任務(wù)執(zhí)行;
2、 工作線程Worker
先看下構(gòu)造函數(shù):
- Worker(Runnable firstTask) {
- setState(-1); // 在調(diào)用runWorker前禁止中斷
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);//創(chuàng)建一個(gè)線程
- }
- 這里添加一個(gè)新狀態(tài)-1是為了避免當(dāng)前線程worker線程被中斷;
- 這里設(shè)置了-1所以條件不滿足就不會(huì)中斷該線程了;
- 運(yùn)行runWorker時(shí)候會(huì)調(diào)用unlock方法,該方法吧status變?yōu)榱?,所以這時(shí)候調(diào)用shutdownNow會(huì)中斷worker線程;
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // status設(shè)置為0,允許中斷
- boolean completedAbruptly = true;
- try {
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // 如果線程池當(dāng)前狀態(tài)至少是stop,則設(shè)置中斷標(biāo)志;
- // 如果線程池當(dāng)前狀態(tài)是RUNNININ,則重置中斷標(biāo)志,重置后需要重新
- //檢查下線程池狀態(tài),因?yàn)楫?dāng)重置中斷標(biāo)志時(shí)候,可能調(diào)用了線程池的shutdown方法
- //改變了線程池狀態(tài)。
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- //任務(wù)執(zhí)行前干一些事情
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();//執(zhí)行任務(wù)
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- //任務(wù)執(zhí)行完畢后干一些事情
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- //統(tǒng)計(jì)當(dāng)前worker完成了多少個(gè)任務(wù)
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- //執(zhí)行清了工作
- processWorkerExit(w, completedAbruptly);
- }
- }
- 如果當(dāng)前task為空,則直接執(zhí)行;
- 否者調(diào)用getTask從任務(wù)隊(duì)列獲取一個(gè)任務(wù)執(zhí)行,如果任務(wù)隊(duì)列為空,則worker退出;
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // 如果當(dāng)前線程池狀態(tài)>=STOP 或者線程池狀態(tài)為shutdown并且工作隊(duì)列為空則,減少工作線程個(gè)數(shù)
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- boolean timed; // Are workers subject to culling?
- for (;;) {
- int wc = workerCountOf(c);
- timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if (wc <= maximumPoolSize && ! (timedOut && timed))
- break;
- if (compareAndDecrementWorkerCount(c))
- return null;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- try {
- //根據(jù)timed選擇調(diào)用poll還是阻塞的take
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
- decrementWorkerCount();
- //統(tǒng)計(jì)整個(gè)線程池完成的任務(wù)個(gè)數(shù)
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- completedTaskCount += w.completedTasks;
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- //嘗試設(shè)置線程池狀態(tài)為TERMINATED,如果當(dāng)前是shutdonw狀態(tài)并且工作隊(duì)列為空
- //或者當(dāng)前是stop狀態(tài)當(dāng)前線程池里面沒有活動(dòng)線程
- tryTerminate();
- //如果當(dāng)前線程個(gè)數(shù)小于核心個(gè)數(shù),則增加
- int c = ctl.get();
- if (runStateLessThan(c, STOP)) {
- if (!completedAbruptly) {
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- if (workerCountOf(c) >= min)
- return; // replacement not needed
- }
- addWorker(null, false);
- }
- }
3、 shutdown操作
- 調(diào)用shutdown后,線程池就不會(huì)在接受新的任務(wù)了;
- 但是工作隊(duì)列里面的任務(wù)還是要執(zhí)行的,但是該方法立刻返回的,并不等待隊(duì)列任務(wù)完成在返回;
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- //權(quán)限檢查
- checkShutdownAccess();
- //設(shè)置當(dāng)前線程池狀態(tài)為SHUTDOWN,如果已經(jīng)是SHUTDOWN則直接返回
- advanceRunState(SHUTDOWN);
- //設(shè)置中斷標(biāo)志
- interruptIdleWorkers();
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- //嘗試狀態(tài)變?yōu)門ERMINATED
- tryTerminate();
- }
如果當(dāng)前狀態(tài)>=targetState則直接返回,否者設(shè)置當(dāng)前狀態(tài)為targetState;
- private void advanceRunState(int targetState) {
- for (;;) {
- int c = ctl.get();
- if (runStateAtLeast(c, targetState) ||
- ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
- break;
- }
- }
- private void interruptIdleWorkers() {
- interruptIdleWorkers(false);
- }
- 設(shè)置所有線程的中斷標(biāo)志,主要這里首先加了全局鎖;
- 同時(shí)只有一個(gè)線程可以調(diào)用shutdown時(shí)候設(shè)置中斷標(biāo)志,然后嘗試獲取worker自己的鎖,獲取成功則設(shè)置中斷標(biāo)示;
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- Thread t = w.thread;
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
4、shutdownNow操作
調(diào)用shutdown后,線程池就不會(huì)在接受新的任務(wù)了,并且丟棄工作隊(duì)列里面里面的任務(wù),正在執(zhí)行的任務(wù)會(huì)被中斷,但是該方法立刻返回的,并不等待激活的任務(wù)執(zhí)行完成在返回。返回隊(duì)列里面的任務(wù)列表;
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();//權(quán)限檢查
- advanceRunState(STOP);// 設(shè)置線程池狀態(tài)為stop
- interruptWorkers();//中斷線程
- tasks = drainQueue();//移動(dòng)隊(duì)列任務(wù)到tasks
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- return tasks;
- }
- 調(diào)用隊(duì)列的drainTo一次當(dāng)前隊(duì)列的元素到taskList;
- 可能失敗,如果調(diào)用drainTo后隊(duì)列海不為空,則循環(huán)刪除,并添加到taskList;
- private List<Runnable> drainQueue() {
- BlockingQueue<Runnable> q = workQueue;
- List<Runnable> taskList = new ArrayList<Runnable>();
- q.drainTo(taskList);
- if (!q.isEmpty()) {
- for (Runnable r : q.toArray(new Runnable[0])) {
- if (q.remove(r))
- taskList.add(r);
- }
- }
- return taskList;
- }
5、 awaitTermination操作
- 等待線程池狀態(tài)變?yōu)門ERMINATED則返回,或者時(shí)間超時(shí);
- 由于整個(gè)過程獨(dú)占鎖,所以一般調(diào)用shutdown或者shutdownNow后使用;
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (;;) {
- if (runStateAtLeast(ctl.get(), TERMINATED))
- return true;
- if (nanos <= 0)
- return false;
- nanos = termination.awaitNanos(nanos);
- }
- } finally {
- mainLock.unlock();
- }
- }
總結(jié)
當(dāng)往線程池中添加任務(wù)的時(shí)候,每次添加一個(gè)任務(wù)都回去新增一個(gè)線程。直到不滿足 wc < corePoolSize;
當(dāng)前線程池的大小已經(jīng)達(dá)到了corePoolSize的時(shí)候,每次添加任務(wù)會(huì)被存放到阻塞任務(wù)隊(duì)列中。等待執(zhí)行;
等等待任務(wù)隊(duì)列也滿的時(shí)候,且添加失敗。此時(shí)在來新的任務(wù),就會(huì)接著增加線程的個(gè)數(shù),直到滿足:wc >= maximumPoolSize ,添加線程失敗執(zhí)行拒絕策略;
線程池中,把線程的狀態(tài)和數(shù)量通過int類型進(jìn)行維護(hù),高三位表示狀態(tài),低29位表示線程數(shù)量。這樣可以保證線程的狀態(tài)和數(shù)量的一致性;
線程池巧妙的使用一個(gè)Integer類型原子變量來記錄線程池狀態(tài)和線程池線程個(gè)數(shù);