自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

并發(fā)編程之定時(shí)任務(wù)&定時(shí)線程池原理解析

開(kāi)發(fā) 前端
線程池的具體實(shí)現(xiàn)有兩種,分別是ThreadPoolExecutor 默認(rèn)線程池和ScheduledThreadPoolExecutor 定時(shí)線程池,上一篇已經(jīng)分析過(guò)ThreadPoolExecutor原理與使用了,本篇我們來(lái)重點(diǎn)分析下ScheduledThreadPoolExecutor的原理與使用。

[[356766]]

 前言

線程池的具體實(shí)現(xiàn)有兩種,分別是ThreadPoolExecutor 默認(rèn)線程池和ScheduledThreadPoolExecutor 定時(shí)線程池,上一篇已經(jīng)分析過(guò)ThreadPoolExecutor原理與使用了,本篇我們來(lái)重點(diǎn)分析下ScheduledThreadPoolExecutor的原理與使用。

《并發(fā)編程之Executor線程池原理與源碼解讀》

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 與 ThreadPoolExecutor 線程池的概念有些區(qū)別,它是一個(gè)支持任務(wù)周期性調(diào)度的線程池。

ScheduledThreadPoolExecutor 繼承 ThreadPoolExecutor,同時(shí)通過(guò)實(shí)現(xiàn) ScheduledExecutorSerivce 來(lái)擴(kuò)展基礎(chǔ)線程池的功能,使其擁有了調(diào)度能力。其整個(gè)調(diào)度的核心在于內(nèi)部類 DelayedWorkQueue ,一個(gè)有序的延時(shí)隊(duì)列。

定時(shí)線程池類的類結(jié)構(gòu)圖如下:

ScheduledThreadPoolExecutor 的出現(xiàn),很好的彌補(bǔ)了傳統(tǒng) Timer 的不足,具體對(duì)比看下表:

TimerScheduledThreadPoolExecutor線程單線程多線程多任務(wù)任務(wù)之間相互影響任務(wù)之間不影響調(diào)度時(shí)間絕對(duì)時(shí)間相對(duì)時(shí)間異常單任務(wù)異常,后續(xù)任務(wù)受影響無(wú)影響

工作原理

它用來(lái)處理延時(shí)任務(wù)或定時(shí)任務(wù)

它接收SchduledFutureTask類型的任務(wù),是線程池調(diào)度任務(wù)的最小單位,有三種提交任務(wù)的方式:

  1. schedule,特定時(shí)間延時(shí)后執(zhí)行一次任務(wù)
  2. scheduledAtFixedRate,固定周期執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間無(wú)關(guān),周期是固定的)
  3. scheduledWithFixedDelay,固定延時(shí)執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間有關(guān),延時(shí)從上一次任務(wù)完成后開(kāi)始)

它采用 DelayedWorkQueue 存儲(chǔ)等待的任務(wù)

  1. DelayedWorkQueue 內(nèi)部封裝了一個(gè) PriorityQueue ,它會(huì)根據(jù) time 的先后時(shí)間排序,若 time 相同則根據(jù) sequenceNumber 排序;
  2. DelayedWorkQueue 也是一個(gè)無(wú)界隊(duì)列;

因?yàn)榍懊嬷v阻塞隊(duì)列實(shí)現(xiàn)的時(shí)候,已經(jīng)對(duì)DelayedWorkQueue進(jìn)行了說(shuō)明,更多內(nèi)容請(qǐng)查看《阻塞隊(duì)列 — DelayedWorkQueue源碼分析》

工作線程的執(zhí)行過(guò)程:

  • 工作線程會(huì)從DelayedWorkerQueue取已經(jīng)到期的任務(wù)去執(zhí)行;
  • 執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時(shí)間,再次放回DelayedWorkerQueue。

take方法是什么時(shí)候調(diào)用的呢? 在ThreadPoolExecutor中,getTask方法,工作線程會(huì)循環(huán)地從workQueue中取任務(wù)。但定時(shí)任務(wù)卻不同,因?yàn)槿绻坏ゞetTask方法取出了任務(wù)就開(kāi)始執(zhí)行了,而這時(shí)可能還沒(méi)有到執(zhí)行的時(shí)間,所以在take方法中,要保證只有在到指定的執(zhí)行時(shí)間的時(shí)候任務(wù)才可以被取走。

PS:對(duì)于以上原理的理解,可以通過(guò)下面的源碼分析加深印象。

源碼分析

構(gòu)造方法

ScheduledThreadPoolExecutor有四個(gè)構(gòu)造形式:

  1. public ScheduledThreadPoolExecutor(int corePoolSize) { 
  2.  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
  3.   new DelayedWorkQueue()); 
  4.  
  5. public ScheduledThreadPoolExecutor(int corePoolSize, 
  6.                                        ThreadFactory threadFactory) { 
  7.  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
  8.      new DelayedWorkQueue(), threadFactory); 
  9.  
  10. public ScheduledThreadPoolExecutor(int corePoolSize, 
  11.                                        RejectedExecutionHandler handler) { 
  12.  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
  13.   new DelayedWorkQueue(), handler); 
  14.  
  15. public ScheduledThreadPoolExecutor(int corePoolSize, 
  16.                                        ThreadFactory threadFactory, 
  17.                                        RejectedExecutionHandler handler) { 
  18.  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
  19.      new DelayedWorkQueue(), threadFactory, handler); 

 當(dāng)然我們也可以使用工具類Executors的newScheduledThreadPool的方法,快速創(chuàng)建。注意這里使用的DelayedWorkQueue。

ScheduledThreadPoolExecutor沒(méi)有提供帶有最大線程數(shù)的構(gòu)造函數(shù)的,默認(rèn)是Integer.MAX_VALUE,說(shuō)明其可以無(wú)限制的開(kāi)啟任意線程執(zhí)行任務(wù),在大量任務(wù)系統(tǒng),應(yīng)注意這一點(diǎn),避免內(nèi)存溢出。

核心方法

核心方法主要介紹ScheduledThreadPoolExecutor的調(diào)度方法,其他方法與 ThreadPoolExecutor 一致。調(diào)度方法均由 ScheduledExecutorService 接口定義:

  1. public interface ScheduledExecutorService extends ExecutorService { 
  2.     // 特定時(shí)間延時(shí)后執(zhí)行一次Runnable 
  3.     public ScheduledFuture<?> schedule(Runnable command, 
  4.                                        long delay, TimeUnit unit); 
  5.     // 特定時(shí)間延時(shí)后執(zhí)行一次Callable 
  6.     public <V> ScheduledFuture<V> schedule(Callable<V> callable, 
  7.                                            long delay, TimeUnit unit); 
  8.     // 固定周期執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間無(wú)關(guān),周期是固定的) 
  9.     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 
  10.                                                   long initialDelay, 
  11.                                                   long period, 
  12.                                                   TimeUnit unit); 
  13.      // 固定延時(shí)執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間有關(guān),延時(shí)從上一次任務(wù)完成后開(kāi)始) 
  14.     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 
  15.                                                      long initialDelay, 
  16.                                                      long delay, 
  17.                                                      TimeUnit unit); 

 我們?cè)賮?lái)看一下接口的實(shí)現(xiàn),具體是怎么來(lái)實(shí)現(xiàn)線程池任務(wù)的提交。因?yàn)樽罱K都回調(diào)用 delayedExecute 提交任務(wù)。所以,我們這里只分析schedule方法,該方法是指任務(wù)在指定延遲時(shí)間到達(dá)后觸發(fā),只會(huì)執(zhí)行一次。源代碼如下:

  1. public ScheduledFuture<?> schedule(Runnable command, 
  2.                                    long delay, 
  3.                                    TimeUnit unit) { 
  4.     //參數(shù)校驗(yàn) 
  5.     if (command == null || unit == null
  6.         throw new NullPointerException(); 
  7.     //這里是一個(gè)嵌套結(jié)構(gòu),首先把用戶提交的任務(wù)包裝成ScheduledFutureTask 
  8.     //然后在調(diào)用decorateTask進(jìn)行包裝,該方法是留給用戶去擴(kuò)展的,默認(rèn)是個(gè)空方法 
  9.     RunnableScheduledFuture<?> t = decorateTask(command, 
  10.         new ScheduledFutureTask<Void>(command, null
  11.                                       triggerTime(delay, unit))); 
  12.    //包裝好任務(wù)以后,就進(jìn)行提交了 
  13.  delayedExecute(t); 
  14.     return t; 

 delayedExecute 任務(wù)提交方法:

  1. private void delayedExecute(RunnableScheduledFuture<?> task) { 
  2.     //如果線程池已經(jīng)關(guān)閉,則使用拒絕策略把提交任務(wù)拒絕掉 
  3.  if (isShutdown()) 
  4.         reject(task); 
  5.     else { 
  6.   //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊(duì)列 
  7.         super.getQueue().add(task);//使用用的DelayedWorkQueue 
  8.   //如果當(dāng)前狀態(tài)無(wú)法執(zhí)行任務(wù),則取消 
  9.         if (isShutdown() && 
  10.             !canRunInCurrentRunState(task.isPeriodic()) && 
  11.             remove(task)) 
  12.             task.cancel(false); 
  13.         else 
  14.          //這里是增加一個(gè)worker線程,避免提交的任務(wù)沒(méi)有worker去執(zhí)行 
  15.          //原因就是該類沒(méi)有像ThreadPoolExecutor一樣,woker滿了才放入隊(duì)列 
  16.            ensurePrestart(); 
  17.     } 

 我們可以看到提交到線程池的任務(wù)都包裝成了 ScheduledFutureTask,繼續(xù)往下我們?cè)賮?lái)研究下。

ScheduledFutureTask

從ScheduledFutureTask類的定義可以看出,ScheduledFutureTask類是ScheduledThreadPoolExecutor類的私有內(nèi)部類,繼承了FutureTask類,并實(shí)現(xiàn)了RunnableScheduledFuture接口。也就是說(shuō),ScheduledFutureTask具有FutureTask類的所有功能,并實(shí)現(xiàn)了RunnableScheduledFuture接口的所有方法。ScheduledFutureTask類的定義如下所示:

  1. private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> 

ScheduledFutureTask類繼承圖如下:


成員變量

SchduledFutureTask接收的參數(shù)(成員變量):

  1. // 任務(wù)開(kāi)始的時(shí)間 
  2.  
  3. private long time
  4.  
  5. // 任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號(hào) 
  6.  
  7. private final long sequenceNumber; 
  8.  
  9. // 任務(wù)執(zhí)行的時(shí)間間隔 
  10.  
  11. private final long period; 
  12.  
  13. //ScheduledFutureTask對(duì)象,實(shí)際指向當(dāng)前對(duì)象本身 
  14.  
  15. RunnableScheduledFuture outerTask = this; 
  16.  
  17. //當(dāng)前任務(wù)在延遲隊(duì)列中的索引,能夠更加方便的取消當(dāng)前任務(wù) 
  18.  
  19. int heapIndex; 

 解析:

  • sequenceNumber:任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號(hào),可以根據(jù)這個(gè)序列號(hào)確定唯一的一個(gè)任務(wù),如果在定時(shí)任務(wù)中,如果一個(gè)任務(wù)是周期性執(zhí)行的,但是它們的sequenceNumber的值相同,則被視為是同一個(gè)任務(wù)。
  • time:下一次執(zhí)行任務(wù)的時(shí)間。
  • period:任務(wù)的執(zhí)行周期。
  • outerTask:ScheduledFutureTask對(duì)象,實(shí)際指向當(dāng)前對(duì)象本身。此對(duì)象的引用會(huì)被傳入到周期性執(zhí)行任務(wù)的ScheduledThreadPoolExecutor類的reExecutePeriodic方法中。
  • heapIndex:當(dāng)前任務(wù)在延遲隊(duì)列中的索引,這個(gè)索引能夠更加方便的取消當(dāng)前任務(wù)。

構(gòu)造方法

ScheduledFutureTask類繼承了FutureTask類,并實(shí)現(xiàn)了RunnableScheduledFuture接口。在ScheduledFutureTask類中提供了如下構(gòu)造方法。

  1. ScheduledFutureTask(Runnable r, V result, long ns) { 
  2.  super(r, result); 
  3.  this.time = ns; 
  4.  this.period = 0; 
  5.  this.sequenceNumber = sequencer.getAndIncrement(); 
  6.  
  7. ScheduledFutureTask(Runnable r, V result, long ns, long period) { 
  8.  super(r, result); 
  9.  this.time = ns; 
  10.  this.period = period; 
  11.  this.sequenceNumber = sequencer.getAndIncrement(); 
  12.  
  13. ScheduledFutureTask(Callable<V> callable, long ns) { 
  14.  super(callable); 
  15.  this.time = ns; 
  16.  this.period = 0; 
  17.  this.sequenceNumber = sequencer.getAndIncrement(); 

 FutureTask的構(gòu)造方法如下:

  1. public FutureTask(Runnable runnable, V result) { 
  2.  this.callable = Executors.callable(runnable, result); 
  3.  this.state = NEW;       // ensure visibility of callable 

 通過(guò)源碼可以看到,在ScheduledFutureTask類的構(gòu)造方法中,首先會(huì)調(diào)用FutureTask類的構(gòu)造方法為FutureTask類的callable和state成員變量賦值,接下來(lái)為ScheduledFutureTask類的time、period和sequenceNumber成員變量賦值。理解起來(lái)比較簡(jiǎn)單。

getDelay方法

我們先來(lái)看getDelay方法的源碼,如下所示:

  1. //獲取下次執(zhí)行任務(wù)的時(shí)間距離當(dāng)前時(shí)間的納秒數(shù) 
  2. public long getDelay(TimeUnit unit) { 
  3.  return unit.convert(time - now(), NANOSECONDS); 

 getDelay方法比較簡(jiǎn)單,主要用來(lái)獲取下次執(zhí)行任務(wù)的時(shí)間距離當(dāng)前系統(tǒng)時(shí)間的納秒數(shù)。

compareTo方法

ScheduledFutureTask類在類的結(jié)構(gòu)上實(shí)現(xiàn)了Comparable接口,compareTo方法主要是對(duì)Comparable接口定義的compareTo方法的實(shí)現(xiàn)。源碼如下所示:

  1. public int compareTo(Delayed other) { 
  2.  if (other == this)  
  3.   return 0; 
  4.  if (other instanceof ScheduledFutureTask) { 
  5.   ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 
  6.   long diff = time - x.time
  7.   if (diff < 0) 
  8.    return -1; 
  9.   else if (diff > 0) 
  10.    return 1; 
  11.   else if (sequenceNumber < x.sequenceNumber) 
  12.    return -1; 
  13.   else 
  14.    return 1; 
  15.  } 
  16.  long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); 
  17.  return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 

 這段代碼看上去好像是對(duì)各種數(shù)值類型數(shù)據(jù)的比較,本質(zhì)上是對(duì)延遲隊(duì)列中的任務(wù)進(jìn)行排序。排序規(guī)則為:

  • 首先比較延遲隊(duì)列中每個(gè)任務(wù)下次執(zhí)行的時(shí)間,下次執(zhí)行時(shí)間距離當(dāng)前時(shí)間短的任務(wù)會(huì)排在前面;
  • 如果下次執(zhí)行任務(wù)的時(shí)間相同,則會(huì)比較任務(wù)的sequenceNumber值,sequenceNumber值小的任務(wù)會(huì)排在前面。

isPeriodic方法

isPeriodic方法的源代碼如下所示:

  1. //判斷是否是周期性任務(wù) 
  2. public boolean isPeriodic() { 
  3.  return period != 0; 

 這個(gè)方法主要是用來(lái)判斷當(dāng)前任務(wù)是否是周期性任務(wù)。這里只要判斷運(yùn)行任務(wù)的執(zhí)行周期不等于0就能確定為周期性任務(wù)了。因?yàn)闊o(wú)論period的值是大于0還是小于0,當(dāng)前任務(wù)都是周期性任務(wù)。

setNextRunTime方法

setNextRunTime方法的作用主要是設(shè)置當(dāng)前任務(wù)下次執(zhí)行的時(shí)間,源碼如下所示:

  1. private void setNextRunTime() { 
  2.  long p = period; 
  3.  //固定頻率,上次執(zhí)行任務(wù)的時(shí)間加上任務(wù)的執(zhí)行周期 
  4.  if (p > 0) 
  5.   time += p; 
  6.  //相對(duì)固定的延遲執(zhí)行,當(dāng)前系統(tǒng)時(shí)間加上任務(wù)的執(zhí)行周期 
  7.  else 
  8.   time = triggerTime(-p); 

 這里再一次證明了使用isPeriodic方法判斷當(dāng)前任務(wù)是否為周期性任務(wù)時(shí),只要判斷period的值是否不等于0就可以了。

  • 因?yàn)槿绻?dāng)前任務(wù)時(shí)固定頻率執(zhí)行的周期性任務(wù),會(huì)將周期period當(dāng)作正數(shù)來(lái)處理;
  • 如果是相對(duì)固定的延遲執(zhí)行當(dāng)前任務(wù),則會(huì)將周期period當(dāng)作負(fù)數(shù)來(lái)處理。

這里我們看到在setNextRunTime方法中,調(diào)用了ScheduledThreadPoolExecutor類的triggerTime方法。接下來(lái)我們看下triggerTime方法的源碼。

ScheduledThreadPoolExecutor類的triggerTime方法

triggerTime方法用于獲取延遲隊(duì)列中的任務(wù)下一次執(zhí)行的具體時(shí)間。源碼如下所示。

  1. private long triggerTime(long delay, TimeUnit unit) { 
  2.  return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); 
  3.  
  4. long triggerTime(long delay) { 
  5.  return now() + 
  6.   ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 

 這兩個(gè)triggerTime方法的代碼比較簡(jiǎn)單,就是獲取下一次執(zhí)行任務(wù)的具體時(shí)間。有一點(diǎn)需要注意的是:delay < (Long.MAX_VALUE >> 1判斷delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,則直接返回delay,否則需要處理溢出的情況。

我們看到在triggerTime方法中處理防止溢出的邏輯使用了ScheduledThreadPoolExecutor類的overflowFree方法,接下來(lái),我們就看看ScheduledThreadPoolExecutor類的overflowFree方法的實(shí)現(xiàn)。

ScheduledThreadPoolExecutor類的overflowFree方法

overflowFree方法的源代碼如下所示:

  1. private long overflowFree(long delay) { 
  2.  //獲取隊(duì)列中的節(jié)點(diǎn) 
  3.  Delayed head = (Delayed) super.getQueue().peek(); 
  4.  //獲取的節(jié)點(diǎn)不為空,則進(jìn)行后續(xù)處理 
  5.  if (head != null) { 
  6.   //從隊(duì)列節(jié)點(diǎn)中獲取延遲時(shí)間 
  7.   long headDelay = head.getDelay(NANOSECONDS); 
  8.   //如果從隊(duì)列中獲取的延遲時(shí)間小于0,并且傳遞的delay 
  9.   //值減去從隊(duì)列節(jié)點(diǎn)中獲取延遲時(shí)間小于0 
  10.   if (headDelay < 0 && (delay - headDelay < 0)) 
  11.    //將delay的值設(shè)置為L(zhǎng)ong.MAX_VALUE + headDelay 
  12.    delay = Long.MAX_VALUE + headDelay; 
  13.  } 
  14.  //返回延遲時(shí)間 
  15.  return delay; 

 通過(guò)對(duì)overflowFree方法的源碼分析,可以看出overflowFree方法本質(zhì)上就是為了限制隊(duì)列中的所有節(jié)點(diǎn)的延遲時(shí)間在Long.MAX_VALUE值之內(nèi),防止在compareTo方法中溢出。

cancel方法

cancel方法的作用主要是取消當(dāng)前任務(wù)的執(zhí)行,源碼如下所示:

  1. public boolean cancel(boolean mayInterruptIfRunning) { 
  2.  //取消任務(wù),返回任務(wù)是否取消的標(biāo)識(shí) 
  3.  boolean cancelled = super.cancel(mayInterruptIfRunning); 
  4.  //如果任務(wù)已經(jīng)取消 
  5.  //并且需要將任務(wù)從延遲隊(duì)列中刪除 
  6.  //并且任務(wù)在延遲隊(duì)列中的索引大于或者等于0 
  7.  if (cancelled && removeOnCancel && heapIndex >= 0) 
  8.   //將當(dāng)前任務(wù)從延遲隊(duì)列中刪除 
  9.   remove(this); 
  10.  //返回是否成功取消任務(wù)的標(biāo)識(shí) 
  11.  return cancelled; 

 這段代碼理解起來(lái)相對(duì)比較簡(jiǎn)單,首先調(diào)用取消任務(wù)的方法,并返回任務(wù)是否已經(jīng)取消的標(biāo)識(shí)。如果任務(wù)已經(jīng)取消,并且需要移除任務(wù),同時(shí),任務(wù)在延遲隊(duì)列中的索引大于或者等于0,則將當(dāng)前任務(wù)從延遲隊(duì)列中移除。最后返回任務(wù)是否成功取消的標(biāo)識(shí)。

run方法

run方法可以說(shuō)是ScheduledFutureTask類的核心方法,是對(duì)Runnable接口的實(shí)現(xiàn),源碼如下所示:

  1. public void run() { 
  2.  //當(dāng)前任務(wù)是否是周期性任務(wù) 
  3.  boolean periodic = isPeriodic(); 
  4.  //線程池當(dāng)前運(yùn)行狀態(tài)下不能執(zhí)行周期性任務(wù) 
  5.  if (!canRunInCurrentRunState(periodic)) 
  6.   //取消任務(wù)的執(zhí)行 
  7.   cancel(false); 
  8.  //如果不是周期性任務(wù) 
  9.  else if (!periodic) 
  10.   //則直接調(diào)用FutureTask類的run方法執(zhí)行任務(wù) 
  11.   ScheduledFutureTask.super.run(); 
  12.  //如果是周期性任務(wù),則調(diào)用FutureTask類的runAndReset方法執(zhí)行任務(wù) 
  13.  //如果任務(wù)執(zhí)行成功 
  14.  else if (ScheduledFutureTask.super.runAndReset()) { 
  15.   //設(shè)置下次執(zhí)行任務(wù)的時(shí)間 
  16.   setNextRunTime(); 
  17.   //重復(fù)執(zhí)行任務(wù) 
  18.   reExecutePeriodic(outerTask); 
  19.  } 

 整理一下方法的邏輯:

  1. 首先判斷當(dāng)前任務(wù)是否是周期性任務(wù)。如果線程池當(dāng)前運(yùn)行狀態(tài)下不能執(zhí)行周期性任務(wù),則取消任務(wù)的執(zhí)行,否則執(zhí)行步驟2;
  2. 如果當(dāng)前任務(wù)不是周期性任務(wù),則直接調(diào)用FutureTask類的run方法執(zhí)行任務(wù),會(huì)設(shè)置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟3;
  3. 如果當(dāng)前任務(wù)是周期性任務(wù),則調(diào)用FutureTask類的runAndReset方法執(zhí)行任務(wù),不會(huì)設(shè)置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟4;
  4. 如果任務(wù)執(zhí)行成功,則設(shè)置下次執(zhí)行任務(wù)的時(shí)間,同時(shí),將任務(wù)設(shè)置為重復(fù)執(zhí)行。

這里,調(diào)用了FutureTask類的run方法和runAndReset方法,并且調(diào)用了ScheduledThreadPoolExecutor類的reExecutePeriodic方法。接下來(lái),我們分別看下這些方法的實(shí)現(xiàn)。

FutureTask類的run方法

FutureTask類的run方法源碼如下所示:

  1. public void run() { 
  2.     //狀態(tài)如果不是NEW,說(shuō)明任務(wù)或者已經(jīng)執(zhí)行過(guò),或者已經(jīng)被取消,直接返回 
  3.     //狀態(tài)如果是NEW,則嘗試把當(dāng)前執(zhí)行線程保存在runner字段中 
  4.     //如果賦值失敗則直接返回 
  5.     if (state != NEW || 
  6.         !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) 
  7.         return
  8.     try { 
  9.         Callable<V> c = callable; 
  10.         if (c != null && state == NEW) { 
  11.             V result; 
  12.             boolean ran; 
  13.             try { 
  14.                 //執(zhí)行任務(wù) 
  15.                 result = c.call(); 
  16.                 ran = true
  17.             } catch (Throwable ex) { 
  18.                 result = null
  19.                 ran = false
  20.                 //任務(wù)異常 
  21.                 setException(ex); 
  22.             } 
  23.             if (ran) 
  24.                 //任務(wù)正常執(zhí)行完畢 
  25.                 set(result); 
  26.         } 
  27.     } finally { 
  28.  
  29.         runner = null
  30.         int s = state; 
  31.         //如果任務(wù)被中斷,執(zhí)行中斷處理 
  32.         if (s >= INTERRUPTING) 
  33.             handlePossibleCancellationInterrupt(s); 
  34.     } 

 代碼的整體邏輯為:

  • 判斷當(dāng)前任務(wù)的state是否等于NEW,如果不為NEW則說(shuō)明任務(wù)或者已經(jīng)執(zhí)行過(guò),或者已經(jīng)被取消,直接返回;
  • 如果狀態(tài)為NEW則接著會(huì)通過(guò)unsafe類把任務(wù)執(zhí)行線程引用CAS的保存在runner字段中,如果保存失敗,則直接返回;
  • 執(zhí)行任務(wù);如果任務(wù)執(zhí)行發(fā)生異常,則調(diào)用setException()方法保存異常信息。

FutureTask類的runAndReset方法

方法的源碼如下所示:

  1. protected boolean runAndReset() { 
  2.  if (state != NEW || 
  3.   !UNSAFE.compareAndSwapObject(this, runnerOffset, 
  4.           null, Thread.currentThread())) 
  5.   return false
  6.  boolean ran = false
  7.  int s = state; 
  8.  try { 
  9.   Callable<V> c = callable; 
  10.   if (c != null && s == NEW) { 
  11.    try { 
  12.     c.call(); // don't set result 
  13.     ran = true
  14.    } catch (Throwable ex) { 
  15.     setException(ex); 
  16.    } 
  17.   } 
  18.  } finally { 
  19.   // runner must be non-null until state is settled to 
  20.   // prevent concurrent calls to run() 
  21.   runner = null
  22.   // state must be re-read after nulling runner to prevent 
  23.   // leaked interrupts 
  24.   s = state; 
  25.   if (s >= INTERRUPTING) 
  26.    handlePossibleCancellationInterrupt(s); 
  27.  } 
  28.  return ran && s == NEW; 

 FutureTask類的runAndReset方法與run方法的邏輯基本相同,只是runAndReset方法會(huì)重置當(dāng)前任務(wù)的執(zhí)行狀態(tài)。

ScheduledThreadPoolExecutor類的reExecutePeriodic方法

reExecutePeriodic重復(fù)執(zhí)行任務(wù)方法,源代碼如下所示:

  1. void reExecutePeriodic(RunnableScheduledFuture<?> task) { 
  2.  //線程池當(dāng)前狀態(tài)下能夠執(zhí)行任務(wù) 
  3.  if (canRunInCurrentRunState(true)) { 
  4.   //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊(duì)列 
  5.         super.getQueue().add(task);//使用用的DelayedWorkQueue 
  6.   //線程池當(dāng)前狀態(tài)下不能執(zhí)行任務(wù),并且成功移除任務(wù) 
  7.   if (!canRunInCurrentRunState(true) && remove(task)) 
  8.    //取消任務(wù) 
  9.    task.cancel(false); 
  10.   else 
  11.    //這里是增加一個(gè)worker線程,避免提交的任務(wù)沒(méi)有worker去執(zhí)行 
  12.             //原因就是該類沒(méi)有像ThreadPoolExecutor一樣,woker滿了才放入隊(duì)列   
  13.    ensurePrestart(); 
  14.  } 

 總體來(lái)說(shuō)reExecutePeriodic方法的邏輯比較簡(jiǎn)單,需要注意的是:調(diào)用reExecutePeriodic方法的時(shí)候已經(jīng)執(zhí)行過(guò)一次任務(wù),所以,并不會(huì)觸發(fā)線程池的拒絕策略;傳入reExecutePeriodic方法的任務(wù)一定是周期性的任務(wù)。

DelayedWorkQueue

ScheduledThreadPoolExecutor之所以要自己實(shí)現(xiàn)阻塞的工作隊(duì)列,是因?yàn)?ScheduleThreadPoolExecutor 要求的工作隊(duì)列有些特殊。

DelayedWorkQueue是一個(gè)基于堆的數(shù)據(jù)結(jié)構(gòu),類似于DelayQueue和PriorityQueue。在執(zhí)行定時(shí)任務(wù)的時(shí)候,每個(gè)任務(wù)的執(zhí)行時(shí)間都不同,所以DelayedWorkQueue的工作就是按照?qǐng)?zhí)行時(shí)間的升序來(lái)排列,執(zhí)行時(shí)間距離當(dāng)前時(shí)間越近的任務(wù)在隊(duì)列的前面(注意:這里的順序并不是絕對(duì)的,堆中的排序只保證了子節(jié)點(diǎn)的下次執(zhí)行時(shí)間要比父節(jié)點(diǎn)的下次執(zhí)行時(shí)間要大,而葉子節(jié)點(diǎn)之間并不一定是順序的)。

堆結(jié)構(gòu)如下圖:

 可見(jiàn),DelayedWorkQueue是一個(gè)基于最小堆結(jié)構(gòu)的隊(duì)列。堆結(jié)構(gòu)可以使用數(shù)組表示,可以轉(zhuǎn)換成如下的數(shù)組:

并發(fā)編程之定時(shí)任務(wù)&定時(shí)線程池原理解析

 在這種結(jié)構(gòu)中,可以發(fā)現(xiàn)有如下特性: 假設(shè)“第一個(gè)元素” 在數(shù)組中的索引為 0 的話,則父結(jié)點(diǎn)和子結(jié)點(diǎn)的位置關(guān)系如下:

  • 索引為 的左孩子的索引是 (2∗i+1);
  • 索引為 的右孩子的索引是 (2∗i+2);
  • 索引為 的父結(jié)點(diǎn)的索引是 floor((i−1)/2);

為什么要使用DelayedWorkQueue呢?

  • 定時(shí)任務(wù)執(zhí)行時(shí)需要取出最近要執(zhí)行的任務(wù),所以任務(wù)在隊(duì)列中每次出隊(duì)時(shí)一定要是當(dāng)前隊(duì)列中執(zhí)行時(shí)間最靠前的,所以自然要使用優(yōu)先級(jí)隊(duì)列。
  • DelayedWorkQueue是一個(gè)優(yōu)先級(jí)隊(duì)列,它可以保證每次出隊(duì)的任務(wù)都是當(dāng)前隊(duì)列中執(zhí)行時(shí)間最靠前的,由于它是基于堆結(jié)構(gòu)的隊(duì)列,堆結(jié)構(gòu)在執(zhí)行插入和刪除操作時(shí)的最壞時(shí)間復(fù)雜度是 O(logN)。

因?yàn)榍懊嬷v阻塞隊(duì)列實(shí)現(xiàn)的時(shí)候,已經(jīng)對(duì)DelayedWorkQueue進(jìn)行了說(shuō)明,更多內(nèi)容請(qǐng)查看《阻塞隊(duì)列 — DelayedWorkQueue源碼分析》

總結(jié)

  1. 與Timer執(zhí)行定時(shí)任務(wù)比較,相比Timer,ScheduledThreadPoolExecutor有說(shuō)明優(yōu)點(diǎn)?(文章前面分析過(guò))
  2. ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以它也是一個(gè)線程池,也有 coorPoolSize 和 workQueue,但是 ScheduledThreadPoolExecutor特殊的地方在于,自己實(shí)現(xiàn)了優(yōu)先工作隊(duì)列 DelayedWorkQueue ;
  3. ScheduledThreadPoolExecutor 實(shí)現(xiàn)了 ScheduledExecutorService,所以就有了任務(wù)調(diào)度的方法,如 schedule 、 scheduleAtFixedRate 、 scheduleWithFixedDelay ,同時(shí)注意他們之間的區(qū)別;
  4. 內(nèi)部類 ScheduledFutureTask 繼承者FutureTask,實(shí)現(xiàn)了任務(wù)的異步執(zhí)行并且可以獲取返回結(jié)果。同時(shí)實(shí)現(xiàn)了Delayed接口,可以通過(guò)getDelay方法獲取將要執(zhí)行的時(shí)間間隔;
  5. 周期任務(wù)的執(zhí)行其實(shí)是調(diào)用了FutureTask的 runAndReset 方法,每次執(zhí)行完不設(shè)置結(jié)果和狀態(tài)。
  6. DelayedWorkQueue的數(shù)據(jù)結(jié)構(gòu),它是一個(gè)基于最小堆結(jié)構(gòu)的優(yōu)先隊(duì)列,并且每次出隊(duì)時(shí)能夠保證取出的任務(wù)是當(dāng)前隊(duì)列中下次執(zhí)行時(shí)間最小的任務(wù)。同時(shí)注意一下優(yōu)先隊(duì)列中堆的順序,堆中的順序并不是絕對(duì)的,但要保證子節(jié)點(diǎn)的值要比父節(jié)點(diǎn)的值要大,這樣就不會(huì)影響出隊(duì)的順序。

總體來(lái)說(shuō),ScheduedThreadPoolExecutor的重點(diǎn)是要理解下次執(zhí)行時(shí)間的計(jì)算,以及優(yōu)先隊(duì)列的出隊(duì)、入隊(duì)和刪除的過(guò)程,這兩個(gè)是理解ScheduedThreadPoolExecutor的關(guān)鍵。

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

 

責(zé)任編輯:姜華 來(lái)源: 今日頭條
相關(guān)推薦

2020-12-08 08:53:53

編程ThreadPoolE線程池

2024-02-28 09:54:07

線程池配置

2022-03-28 08:31:29

線程池定時(shí)任務(wù)

2024-12-27 08:24:55

2020-12-21 07:31:23

實(shí)現(xiàn)單機(jī)JDK

2017-01-10 13:39:57

Python線程池進(jìn)程池

2023-01-16 08:49:20

RocketMQ定時(shí)任務(wù)源代

2009-10-28 10:05:29

Ubuntucrontab定時(shí)任務(wù)

2012-02-07 13:31:14

SpringJava

2010-03-10 15:47:58

crontab定時(shí)任務(wù)

2022-11-09 09:01:08

并發(fā)編程線程池

2016-10-21 11:04:07

JavaScript異步編程原理解析

2024-11-04 16:01:01

2010-01-07 13:38:41

Linux定時(shí)任務(wù)

2024-05-13 09:49:30

.NETQuartz庫(kù)Cron表達(dá)式

2023-11-07 07:47:35

Topic線程PUSH

2022-08-15 15:43:29

Linuxcron

2023-12-19 08:09:06

Python定時(shí)任務(wù)Cron表達(dá)式

2021-12-16 14:25:03

Linux定時(shí)任務(wù)

2016-12-27 19:29:14

Linux命令定時(shí)任務(wù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)