并發(fā)編程之定時(shí)任務(wù)&定時(shí)線程池原理解析
前言
線程池的具體實(shí)現(xiàn)有兩種,分別是ThreadPoolExecutor 默認(rèn)線程池和ScheduledThreadPoolExecutor 定時(shí)線程池,上一篇已經(jīng)分析過(guò)ThreadPoolExecutor原理與使用了,本篇我們來(lái)重點(diǎn)分析下ScheduledThreadPoolExecutor的原理與使用。
《并發(fā)編程之Executor線程池原理與源碼解讀》
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 與 ThreadPoolExecutor 線程池的概念有些區(qū)別,它是一個(gè)支持任務(wù)周期性調(diào)度的線程池。
ScheduledThreadPoolExecutor 繼承 ThreadPoolExecutor,同時(shí)通過(guò)實(shí)現(xiàn) ScheduledExecutorSerivce 來(lái)擴(kuò)展基礎(chǔ)線程池的功能,使其擁有了調(diào)度能力。其整個(gè)調(diào)度的核心在于內(nèi)部類 DelayedWorkQueue ,一個(gè)有序的延時(shí)隊(duì)列。
定時(shí)線程池類的類結(jié)構(gòu)圖如下:
ScheduledThreadPoolExecutor 的出現(xiàn),很好的彌補(bǔ)了傳統(tǒng) Timer 的不足,具體對(duì)比看下表:
TimerScheduledThreadPoolExecutor線程單線程多線程多任務(wù)任務(wù)之間相互影響任務(wù)之間不影響調(diào)度時(shí)間絕對(duì)時(shí)間相對(duì)時(shí)間異常單任務(wù)異常,后續(xù)任務(wù)受影響無(wú)影響
工作原理
它用來(lái)處理延時(shí)任務(wù)或定時(shí)任務(wù)
它接收SchduledFutureTask類型的任務(wù),是線程池調(diào)度任務(wù)的最小單位,有三種提交任務(wù)的方式:
- schedule,特定時(shí)間延時(shí)后執(zhí)行一次任務(wù)
- scheduledAtFixedRate,固定周期執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間無(wú)關(guān),周期是固定的)
- scheduledWithFixedDelay,固定延時(shí)執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間有關(guān),延時(shí)從上一次任務(wù)完成后開(kāi)始)
它采用 DelayedWorkQueue 存儲(chǔ)等待的任務(wù)
- DelayedWorkQueue 內(nèi)部封裝了一個(gè) PriorityQueue ,它會(huì)根據(jù) time 的先后時(shí)間排序,若 time 相同則根據(jù) sequenceNumber 排序;
- DelayedWorkQueue 也是一個(gè)無(wú)界隊(duì)列;
因?yàn)榍懊嬷v阻塞隊(duì)列實(shí)現(xiàn)的時(shí)候,已經(jīng)對(duì)DelayedWorkQueue進(jìn)行了說(shuō)明,更多內(nèi)容請(qǐng)查看《阻塞隊(duì)列 — DelayedWorkQueue源碼分析》
工作線程的執(zhí)行過(guò)程:
- 工作線程會(huì)從DelayedWorkerQueue取已經(jīng)到期的任務(wù)去執(zhí)行;
- 執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時(shí)間,再次放回DelayedWorkerQueue。
take方法是什么時(shí)候調(diào)用的呢? 在ThreadPoolExecutor中,getTask方法,工作線程會(huì)循環(huán)地從workQueue中取任務(wù)。但定時(shí)任務(wù)卻不同,因?yàn)槿绻坏ゞetTask方法取出了任務(wù)就開(kāi)始執(zhí)行了,而這時(shí)可能還沒(méi)有到執(zhí)行的時(shí)間,所以在take方法中,要保證只有在到指定的執(zhí)行時(shí)間的時(shí)候任務(wù)才可以被取走。
PS:對(duì)于以上原理的理解,可以通過(guò)下面的源碼分析加深印象。
源碼分析
構(gòu)造方法
ScheduledThreadPoolExecutor有四個(gè)構(gòu)造形式:
- public ScheduledThreadPoolExecutor(int corePoolSize) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
- }
- public ScheduledThreadPoolExecutor(int corePoolSize,
- ThreadFactory threadFactory) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue(), threadFactory);
- }
- public ScheduledThreadPoolExecutor(int corePoolSize,
- RejectedExecutionHandler handler) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue(), handler);
- }
- public ScheduledThreadPoolExecutor(int corePoolSize,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue(), threadFactory, handler);
- }
當(dāng)然我們也可以使用工具類Executors的newScheduledThreadPool的方法,快速創(chuàng)建。注意這里使用的DelayedWorkQueue。
ScheduledThreadPoolExecutor沒(méi)有提供帶有最大線程數(shù)的構(gòu)造函數(shù)的,默認(rèn)是Integer.MAX_VALUE,說(shuō)明其可以無(wú)限制的開(kāi)啟任意線程執(zhí)行任務(wù),在大量任務(wù)系統(tǒng),應(yīng)注意這一點(diǎn),避免內(nèi)存溢出。
核心方法
核心方法主要介紹ScheduledThreadPoolExecutor的調(diào)度方法,其他方法與 ThreadPoolExecutor 一致。調(diào)度方法均由 ScheduledExecutorService 接口定義:
- public interface ScheduledExecutorService extends ExecutorService {
- // 特定時(shí)間延時(shí)后執(zhí)行一次Runnable
- public ScheduledFuture<?> schedule(Runnable command,
- long delay, TimeUnit unit);
- // 特定時(shí)間延時(shí)后執(zhí)行一次Callable
- public <V> ScheduledFuture<V> schedule(Callable<V> callable,
- long delay, TimeUnit unit);
- // 固定周期執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間無(wú)關(guān),周期是固定的)
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
- long initialDelay,
- long period,
- TimeUnit unit);
- // 固定延時(shí)執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間有關(guān),延時(shí)從上一次任務(wù)完成后開(kāi)始)
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
- long initialDelay,
- long delay,
- TimeUnit unit);
- }
我們?cè)賮?lái)看一下接口的實(shí)現(xiàn),具體是怎么來(lái)實(shí)現(xiàn)線程池任務(wù)的提交。因?yàn)樽罱K都回調(diào)用 delayedExecute 提交任務(wù)。所以,我們這里只分析schedule方法,該方法是指任務(wù)在指定延遲時(shí)間到達(dá)后觸發(fā),只會(huì)執(zhí)行一次。源代碼如下:
- public ScheduledFuture<?> schedule(Runnable command,
- long delay,
- TimeUnit unit) {
- //參數(shù)校驗(yàn)
- if (command == null || unit == null)
- throw new NullPointerException();
- //這里是一個(gè)嵌套結(jié)構(gòu),首先把用戶提交的任務(wù)包裝成ScheduledFutureTask
- //然后在調(diào)用decorateTask進(jìn)行包裝,該方法是留給用戶去擴(kuò)展的,默認(rèn)是個(gè)空方法
- RunnableScheduledFuture<?> t = decorateTask(command,
- new ScheduledFutureTask<Void>(command, null,
- triggerTime(delay, unit)));
- //包裝好任務(wù)以后,就進(jìn)行提交了
- delayedExecute(t);
- return t;
- }
delayedExecute 任務(wù)提交方法:
- private void delayedExecute(RunnableScheduledFuture<?> task) {
- //如果線程池已經(jīng)關(guān)閉,則使用拒絕策略把提交任務(wù)拒絕掉
- if (isShutdown())
- reject(task);
- else {
- //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊(duì)列
- super.getQueue().add(task);//使用用的DelayedWorkQueue
- //如果當(dāng)前狀態(tài)無(wú)法執(zhí)行任務(wù),則取消
- if (isShutdown() &&
- !canRunInCurrentRunState(task.isPeriodic()) &&
- remove(task))
- task.cancel(false);
- else
- //這里是增加一個(gè)worker線程,避免提交的任務(wù)沒(méi)有worker去執(zhí)行
- //原因就是該類沒(méi)有像ThreadPoolExecutor一樣,woker滿了才放入隊(duì)列
- ensurePrestart();
- }
- }
我們可以看到提交到線程池的任務(wù)都包裝成了 ScheduledFutureTask,繼續(xù)往下我們?cè)賮?lái)研究下。
ScheduledFutureTask
從ScheduledFutureTask類的定義可以看出,ScheduledFutureTask類是ScheduledThreadPoolExecutor類的私有內(nèi)部類,繼承了FutureTask類,并實(shí)現(xiàn)了RunnableScheduledFuture接口。也就是說(shuō),ScheduledFutureTask具有FutureTask類的所有功能,并實(shí)現(xiàn)了RunnableScheduledFuture接口的所有方法。ScheduledFutureTask類的定義如下所示:
- private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>
ScheduledFutureTask類繼承圖如下:

成員變量
SchduledFutureTask接收的參數(shù)(成員變量):
- // 任務(wù)開(kāi)始的時(shí)間
- private long time;
- // 任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號(hào)
- private final long sequenceNumber;
- // 任務(wù)執(zhí)行的時(shí)間間隔
- private final long period;
- //ScheduledFutureTask對(duì)象,實(shí)際指向當(dāng)前對(duì)象本身
- RunnableScheduledFuture outerTask = this;
- //當(dāng)前任務(wù)在延遲隊(duì)列中的索引,能夠更加方便的取消當(dāng)前任務(wù)
- int heapIndex;
解析:
- sequenceNumber:任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號(hào),可以根據(jù)這個(gè)序列號(hào)確定唯一的一個(gè)任務(wù),如果在定時(shí)任務(wù)中,如果一個(gè)任務(wù)是周期性執(zhí)行的,但是它們的sequenceNumber的值相同,則被視為是同一個(gè)任務(wù)。
- time:下一次執(zhí)行任務(wù)的時(shí)間。
- period:任務(wù)的執(zhí)行周期。
- outerTask:ScheduledFutureTask對(duì)象,實(shí)際指向當(dāng)前對(duì)象本身。此對(duì)象的引用會(huì)被傳入到周期性執(zhí)行任務(wù)的ScheduledThreadPoolExecutor類的reExecutePeriodic方法中。
- heapIndex:當(dāng)前任務(wù)在延遲隊(duì)列中的索引,這個(gè)索引能夠更加方便的取消當(dāng)前任務(wù)。
構(gòu)造方法
ScheduledFutureTask類繼承了FutureTask類,并實(shí)現(xiàn)了RunnableScheduledFuture接口。在ScheduledFutureTask類中提供了如下構(gòu)造方法。
- ScheduledFutureTask(Runnable r, V result, long ns) {
- super(r, result);
- this.time = ns;
- this.period = 0;
- this.sequenceNumber = sequencer.getAndIncrement();
- }
- ScheduledFutureTask(Runnable r, V result, long ns, long period) {
- super(r, result);
- this.time = ns;
- this.period = period;
- this.sequenceNumber = sequencer.getAndIncrement();
- }
- ScheduledFutureTask(Callable<V> callable, long ns) {
- super(callable);
- this.time = ns;
- this.period = 0;
- this.sequenceNumber = sequencer.getAndIncrement();
- }
FutureTask的構(gòu)造方法如下:
- public FutureTask(Runnable runnable, V result) {
- this.callable = Executors.callable(runnable, result);
- this.state = NEW; // ensure visibility of callable
- }
通過(guò)源碼可以看到,在ScheduledFutureTask類的構(gòu)造方法中,首先會(huì)調(diào)用FutureTask類的構(gòu)造方法為FutureTask類的callable和state成員變量賦值,接下來(lái)為ScheduledFutureTask類的time、period和sequenceNumber成員變量賦值。理解起來(lái)比較簡(jiǎn)單。
getDelay方法
我們先來(lái)看getDelay方法的源碼,如下所示:
- //獲取下次執(zhí)行任務(wù)的時(shí)間距離當(dāng)前時(shí)間的納秒數(shù)
- public long getDelay(TimeUnit unit) {
- return unit.convert(time - now(), NANOSECONDS);
- }
getDelay方法比較簡(jiǎn)單,主要用來(lái)獲取下次執(zhí)行任務(wù)的時(shí)間距離當(dāng)前系統(tǒng)時(shí)間的納秒數(shù)。
compareTo方法
ScheduledFutureTask類在類的結(jié)構(gòu)上實(shí)現(xiàn)了Comparable接口,compareTo方法主要是對(duì)Comparable接口定義的compareTo方法的實(shí)現(xiàn)。源碼如下所示:
- public int compareTo(Delayed other) {
- if (other == this)
- return 0;
- if (other instanceof ScheduledFutureTask) {
- ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
- long diff = time - x.time;
- if (diff < 0)
- return -1;
- else if (diff > 0)
- return 1;
- else if (sequenceNumber < x.sequenceNumber)
- return -1;
- else
- return 1;
- }
- long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
- return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
- }
這段代碼看上去好像是對(duì)各種數(shù)值類型數(shù)據(jù)的比較,本質(zhì)上是對(duì)延遲隊(duì)列中的任務(wù)進(jìn)行排序。排序規(guī)則為:
- 首先比較延遲隊(duì)列中每個(gè)任務(wù)下次執(zhí)行的時(shí)間,下次執(zhí)行時(shí)間距離當(dāng)前時(shí)間短的任務(wù)會(huì)排在前面;
- 如果下次執(zhí)行任務(wù)的時(shí)間相同,則會(huì)比較任務(wù)的sequenceNumber值,sequenceNumber值小的任務(wù)會(huì)排在前面。
isPeriodic方法
isPeriodic方法的源代碼如下所示:
- //判斷是否是周期性任務(wù)
- public boolean isPeriodic() {
- return period != 0;
- }
這個(gè)方法主要是用來(lái)判斷當(dāng)前任務(wù)是否是周期性任務(wù)。這里只要判斷運(yùn)行任務(wù)的執(zhí)行周期不等于0就能確定為周期性任務(wù)了。因?yàn)闊o(wú)論period的值是大于0還是小于0,當(dāng)前任務(wù)都是周期性任務(wù)。
setNextRunTime方法
setNextRunTime方法的作用主要是設(shè)置當(dāng)前任務(wù)下次執(zhí)行的時(shí)間,源碼如下所示:
- private void setNextRunTime() {
- long p = period;
- //固定頻率,上次執(zhí)行任務(wù)的時(shí)間加上任務(wù)的執(zhí)行周期
- if (p > 0)
- time += p;
- //相對(duì)固定的延遲執(zhí)行,當(dāng)前系統(tǒng)時(shí)間加上任務(wù)的執(zhí)行周期
- else
- time = triggerTime(-p);
- }
這里再一次證明了使用isPeriodic方法判斷當(dāng)前任務(wù)是否為周期性任務(wù)時(shí),只要判斷period的值是否不等于0就可以了。
- 因?yàn)槿绻?dāng)前任務(wù)時(shí)固定頻率執(zhí)行的周期性任務(wù),會(huì)將周期period當(dāng)作正數(shù)來(lái)處理;
- 如果是相對(duì)固定的延遲執(zhí)行當(dāng)前任務(wù),則會(huì)將周期period當(dāng)作負(fù)數(shù)來(lái)處理。
這里我們看到在setNextRunTime方法中,調(diào)用了ScheduledThreadPoolExecutor類的triggerTime方法。接下來(lái)我們看下triggerTime方法的源碼。
ScheduledThreadPoolExecutor類的triggerTime方法
triggerTime方法用于獲取延遲隊(duì)列中的任務(wù)下一次執(zhí)行的具體時(shí)間。源碼如下所示。
- private long triggerTime(long delay, TimeUnit unit) {
- return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
- }
- long triggerTime(long delay) {
- return now() +
- ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
- }
這兩個(gè)triggerTime方法的代碼比較簡(jiǎn)單,就是獲取下一次執(zhí)行任務(wù)的具體時(shí)間。有一點(diǎn)需要注意的是:delay < (Long.MAX_VALUE >> 1判斷delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,則直接返回delay,否則需要處理溢出的情況。
我們看到在triggerTime方法中處理防止溢出的邏輯使用了ScheduledThreadPoolExecutor類的overflowFree方法,接下來(lái),我們就看看ScheduledThreadPoolExecutor類的overflowFree方法的實(shí)現(xiàn)。
ScheduledThreadPoolExecutor類的overflowFree方法
overflowFree方法的源代碼如下所示:
- private long overflowFree(long delay) {
- //獲取隊(duì)列中的節(jié)點(diǎn)
- Delayed head = (Delayed) super.getQueue().peek();
- //獲取的節(jié)點(diǎn)不為空,則進(jìn)行后續(xù)處理
- if (head != null) {
- //從隊(duì)列節(jié)點(diǎn)中獲取延遲時(shí)間
- long headDelay = head.getDelay(NANOSECONDS);
- //如果從隊(duì)列中獲取的延遲時(shí)間小于0,并且傳遞的delay
- //值減去從隊(duì)列節(jié)點(diǎn)中獲取延遲時(shí)間小于0
- if (headDelay < 0 && (delay - headDelay < 0))
- //將delay的值設(shè)置為L(zhǎng)ong.MAX_VALUE + headDelay
- delay = Long.MAX_VALUE + headDelay;
- }
- //返回延遲時(shí)間
- return delay;
- }
通過(guò)對(duì)overflowFree方法的源碼分析,可以看出overflowFree方法本質(zhì)上就是為了限制隊(duì)列中的所有節(jié)點(diǎn)的延遲時(shí)間在Long.MAX_VALUE值之內(nèi),防止在compareTo方法中溢出。
cancel方法
cancel方法的作用主要是取消當(dāng)前任務(wù)的執(zhí)行,源碼如下所示:
- public boolean cancel(boolean mayInterruptIfRunning) {
- //取消任務(wù),返回任務(wù)是否取消的標(biāo)識(shí)
- boolean cancelled = super.cancel(mayInterruptIfRunning);
- //如果任務(wù)已經(jīng)取消
- //并且需要將任務(wù)從延遲隊(duì)列中刪除
- //并且任務(wù)在延遲隊(duì)列中的索引大于或者等于0
- if (cancelled && removeOnCancel && heapIndex >= 0)
- //將當(dāng)前任務(wù)從延遲隊(duì)列中刪除
- remove(this);
- //返回是否成功取消任務(wù)的標(biāo)識(shí)
- return cancelled;
- }
這段代碼理解起來(lái)相對(duì)比較簡(jiǎn)單,首先調(diào)用取消任務(wù)的方法,并返回任務(wù)是否已經(jīng)取消的標(biāo)識(shí)。如果任務(wù)已經(jīng)取消,并且需要移除任務(wù),同時(shí),任務(wù)在延遲隊(duì)列中的索引大于或者等于0,則將當(dāng)前任務(wù)從延遲隊(duì)列中移除。最后返回任務(wù)是否成功取消的標(biāo)識(shí)。
run方法
run方法可以說(shuō)是ScheduledFutureTask類的核心方法,是對(duì)Runnable接口的實(shí)現(xiàn),源碼如下所示:
- public void run() {
- //當(dāng)前任務(wù)是否是周期性任務(wù)
- boolean periodic = isPeriodic();
- //線程池當(dāng)前運(yùn)行狀態(tài)下不能執(zhí)行周期性任務(wù)
- if (!canRunInCurrentRunState(periodic))
- //取消任務(wù)的執(zhí)行
- cancel(false);
- //如果不是周期性任務(wù)
- else if (!periodic)
- //則直接調(diào)用FutureTask類的run方法執(zhí)行任務(wù)
- ScheduledFutureTask.super.run();
- //如果是周期性任務(wù),則調(diào)用FutureTask類的runAndReset方法執(zhí)行任務(wù)
- //如果任務(wù)執(zhí)行成功
- else if (ScheduledFutureTask.super.runAndReset()) {
- //設(shè)置下次執(zhí)行任務(wù)的時(shí)間
- setNextRunTime();
- //重復(fù)執(zhí)行任務(wù)
- reExecutePeriodic(outerTask);
- }
- }
整理一下方法的邏輯:
- 首先判斷當(dāng)前任務(wù)是否是周期性任務(wù)。如果線程池當(dāng)前運(yùn)行狀態(tài)下不能執(zhí)行周期性任務(wù),則取消任務(wù)的執(zhí)行,否則執(zhí)行步驟2;
- 如果當(dāng)前任務(wù)不是周期性任務(wù),則直接調(diào)用FutureTask類的run方法執(zhí)行任務(wù),會(huì)設(shè)置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟3;
- 如果當(dāng)前任務(wù)是周期性任務(wù),則調(diào)用FutureTask類的runAndReset方法執(zhí)行任務(wù),不會(huì)設(shè)置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟4;
- 如果任務(wù)執(zhí)行成功,則設(shè)置下次執(zhí)行任務(wù)的時(shí)間,同時(shí),將任務(wù)設(shè)置為重復(fù)執(zhí)行。
這里,調(diào)用了FutureTask類的run方法和runAndReset方法,并且調(diào)用了ScheduledThreadPoolExecutor類的reExecutePeriodic方法。接下來(lái),我們分別看下這些方法的實(shí)現(xiàn)。
FutureTask類的run方法
FutureTask類的run方法源碼如下所示:
- public void run() {
- //狀態(tài)如果不是NEW,說(shuō)明任務(wù)或者已經(jīng)執(zhí)行過(guò),或者已經(jīng)被取消,直接返回
- //狀態(tài)如果是NEW,則嘗試把當(dāng)前執(zhí)行線程保存在runner字段中
- //如果賦值失敗則直接返回
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
- return;
- try {
- Callable<V> c = callable;
- if (c != null && state == NEW) {
- V result;
- boolean ran;
- try {
- //執(zhí)行任務(wù)
- result = c.call();
- ran = true;
- } catch (Throwable ex) {
- result = null;
- ran = false;
- //任務(wù)異常
- setException(ex);
- }
- if (ran)
- //任務(wù)正常執(zhí)行完畢
- set(result);
- }
- } finally {
- runner = null;
- int s = state;
- //如果任務(wù)被中斷,執(zhí)行中斷處理
- if (s >= INTERRUPTING)
- handlePossibleCancellationInterrupt(s);
- }
- }
代碼的整體邏輯為:
- 判斷當(dāng)前任務(wù)的state是否等于NEW,如果不為NEW則說(shuō)明任務(wù)或者已經(jīng)執(zhí)行過(guò),或者已經(jīng)被取消,直接返回;
- 如果狀態(tài)為NEW則接著會(huì)通過(guò)unsafe類把任務(wù)執(zhí)行線程引用CAS的保存在runner字段中,如果保存失敗,則直接返回;
- 執(zhí)行任務(wù);如果任務(wù)執(zhí)行發(fā)生異常,則調(diào)用setException()方法保存異常信息。
FutureTask類的runAndReset方法
方法的源碼如下所示:
- protected boolean runAndReset() {
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset,
- null, Thread.currentThread()))
- return false;
- boolean ran = false;
- int s = state;
- try {
- Callable<V> c = callable;
- if (c != null && s == NEW) {
- try {
- c.call(); // don't set result
- ran = true;
- } catch (Throwable ex) {
- setException(ex);
- }
- }
- } finally {
- // runner must be non-null until state is settled to
- // prevent concurrent calls to run()
- runner = null;
- // state must be re-read after nulling runner to prevent
- // leaked interrupts
- s = state;
- if (s >= INTERRUPTING)
- handlePossibleCancellationInterrupt(s);
- }
- return ran && s == NEW;
- }
FutureTask類的runAndReset方法與run方法的邏輯基本相同,只是runAndReset方法會(huì)重置當(dāng)前任務(wù)的執(zhí)行狀態(tài)。
ScheduledThreadPoolExecutor類的reExecutePeriodic方法
reExecutePeriodic重復(fù)執(zhí)行任務(wù)方法,源代碼如下所示:
- void reExecutePeriodic(RunnableScheduledFuture<?> task) {
- //線程池當(dāng)前狀態(tài)下能夠執(zhí)行任務(wù)
- if (canRunInCurrentRunState(true)) {
- //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊(duì)列
- super.getQueue().add(task);//使用用的DelayedWorkQueue
- //線程池當(dāng)前狀態(tài)下不能執(zhí)行任務(wù),并且成功移除任務(wù)
- if (!canRunInCurrentRunState(true) && remove(task))
- //取消任務(wù)
- task.cancel(false);
- else
- //這里是增加一個(gè)worker線程,避免提交的任務(wù)沒(méi)有worker去執(zhí)行
- //原因就是該類沒(méi)有像ThreadPoolExecutor一樣,woker滿了才放入隊(duì)列
- ensurePrestart();
- }
- }
總體來(lái)說(shuō)reExecutePeriodic方法的邏輯比較簡(jiǎn)單,需要注意的是:調(diào)用reExecutePeriodic方法的時(shí)候已經(jīng)執(zhí)行過(guò)一次任務(wù),所以,并不會(huì)觸發(fā)線程池的拒絕策略;傳入reExecutePeriodic方法的任務(wù)一定是周期性的任務(wù)。
DelayedWorkQueue
ScheduledThreadPoolExecutor之所以要自己實(shí)現(xiàn)阻塞的工作隊(duì)列,是因?yàn)?ScheduleThreadPoolExecutor 要求的工作隊(duì)列有些特殊。
DelayedWorkQueue是一個(gè)基于堆的數(shù)據(jù)結(jié)構(gòu),類似于DelayQueue和PriorityQueue。在執(zhí)行定時(shí)任務(wù)的時(shí)候,每個(gè)任務(wù)的執(zhí)行時(shí)間都不同,所以DelayedWorkQueue的工作就是按照?qǐng)?zhí)行時(shí)間的升序來(lái)排列,執(zhí)行時(shí)間距離當(dāng)前時(shí)間越近的任務(wù)在隊(duì)列的前面(注意:這里的順序并不是絕對(duì)的,堆中的排序只保證了子節(jié)點(diǎn)的下次執(zhí)行時(shí)間要比父節(jié)點(diǎn)的下次執(zhí)行時(shí)間要大,而葉子節(jié)點(diǎn)之間并不一定是順序的)。
堆結(jié)構(gòu)如下圖:
可見(jiàn),DelayedWorkQueue是一個(gè)基于最小堆結(jié)構(gòu)的隊(duì)列。堆結(jié)構(gòu)可以使用數(shù)組表示,可以轉(zhuǎn)換成如下的數(shù)組:

在這種結(jié)構(gòu)中,可以發(fā)現(xiàn)有如下特性: 假設(shè)“第一個(gè)元素” 在數(shù)組中的索引為 0 的話,則父結(jié)點(diǎn)和子結(jié)點(diǎn)的位置關(guān)系如下:
- 索引為 的左孩子的索引是 (2∗i+1);
- 索引為 的右孩子的索引是 (2∗i+2);
- 索引為 的父結(jié)點(diǎn)的索引是 floor((i−1)/2);
為什么要使用DelayedWorkQueue呢?
- 定時(shí)任務(wù)執(zhí)行時(shí)需要取出最近要執(zhí)行的任務(wù),所以任務(wù)在隊(duì)列中每次出隊(duì)時(shí)一定要是當(dāng)前隊(duì)列中執(zhí)行時(shí)間最靠前的,所以自然要使用優(yōu)先級(jí)隊(duì)列。
- DelayedWorkQueue是一個(gè)優(yōu)先級(jí)隊(duì)列,它可以保證每次出隊(duì)的任務(wù)都是當(dāng)前隊(duì)列中執(zhí)行時(shí)間最靠前的,由于它是基于堆結(jié)構(gòu)的隊(duì)列,堆結(jié)構(gòu)在執(zhí)行插入和刪除操作時(shí)的最壞時(shí)間復(fù)雜度是 O(logN)。
因?yàn)榍懊嬷v阻塞隊(duì)列實(shí)現(xiàn)的時(shí)候,已經(jīng)對(duì)DelayedWorkQueue進(jìn)行了說(shuō)明,更多內(nèi)容請(qǐng)查看《阻塞隊(duì)列 — DelayedWorkQueue源碼分析》
總結(jié)
- 與Timer執(zhí)行定時(shí)任務(wù)比較,相比Timer,ScheduledThreadPoolExecutor有說(shuō)明優(yōu)點(diǎn)?(文章前面分析過(guò))
- ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以它也是一個(gè)線程池,也有 coorPoolSize 和 workQueue,但是 ScheduledThreadPoolExecutor特殊的地方在于,自己實(shí)現(xiàn)了優(yōu)先工作隊(duì)列 DelayedWorkQueue ;
- ScheduledThreadPoolExecutor 實(shí)現(xiàn)了 ScheduledExecutorService,所以就有了任務(wù)調(diào)度的方法,如 schedule 、 scheduleAtFixedRate 、 scheduleWithFixedDelay ,同時(shí)注意他們之間的區(qū)別;
- 內(nèi)部類 ScheduledFutureTask 繼承者FutureTask,實(shí)現(xiàn)了任務(wù)的異步執(zhí)行并且可以獲取返回結(jié)果。同時(shí)實(shí)現(xiàn)了Delayed接口,可以通過(guò)getDelay方法獲取將要執(zhí)行的時(shí)間間隔;
- 周期任務(wù)的執(zhí)行其實(shí)是調(diào)用了FutureTask的 runAndReset 方法,每次執(zhí)行完不設(shè)置結(jié)果和狀態(tài)。
- DelayedWorkQueue的數(shù)據(jù)結(jié)構(gòu),它是一個(gè)基于最小堆結(jié)構(gòu)的優(yōu)先隊(duì)列,并且每次出隊(duì)時(shí)能夠保證取出的任務(wù)是當(dāng)前隊(duì)列中下次執(zhí)行時(shí)間最小的任務(wù)。同時(shí)注意一下優(yōu)先隊(duì)列中堆的順序,堆中的順序并不是絕對(duì)的,但要保證子節(jié)點(diǎn)的值要比父節(jié)點(diǎn)的值要大,這樣就不會(huì)影響出隊(duì)的順序。
總體來(lái)說(shuō),ScheduedThreadPoolExecutor的重點(diǎn)是要理解下次執(zhí)行時(shí)間的計(jì)算,以及優(yōu)先隊(duì)列的出隊(duì)、入隊(duì)和刪除的過(guò)程,這兩個(gè)是理解ScheduedThreadPoolExecutor的關(guān)鍵。
PS:以上代碼提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git