自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java線程池架構(gòu)(二)多線程調(diào)度器

開(kāi)發(fā) 后端
本文對(duì)這個(gè)java本身的線程池的調(diào)度器做一個(gè)簡(jiǎn)單擴(kuò)展,如果還沒(méi)讀過(guò)上一篇文章,建議讀一下,因?yàn)檫@是調(diào)度器的核心組件部分。

在前面介紹了java的多線程的基本原理信息:《Java線程池架構(gòu)原理和源碼解析》,本文對(duì)這個(gè)java本身的線程池的調(diào)度器做一個(gè)簡(jiǎn)單擴(kuò)展,如果還沒(méi)讀過(guò)上一篇文章,建議讀一下,因?yàn)檫@是調(diào)度器的核心組件部分。

我們?nèi)绻胘ava默認(rèn)的線程池來(lái)做調(diào)度器,一種選擇就是Timer和TimerTask的結(jié)合,在以前的文章:《Timer與 TimerTask的真正原理&使用介紹》中有明確的說(shuō)明:一個(gè)Timer為一個(gè)單獨(dú)的線程,雖然一個(gè)Timer可以調(diào)度多個(gè) TimerTask,但是對(duì)于一個(gè)Timer來(lái)講是串行的,至于細(xì)節(jié)請(qǐng)參看對(duì)應(yīng)的那篇文章的內(nèi)容,本文介紹的多線程調(diào)度器,也就是定時(shí)任務(wù),基于多線程調(diào) 度完成,當(dāng)然你可以為了完成多線程使用多個(gè)Timer,只是這些Timer的管理需要你來(lái)完成,不是一個(gè)框架體系,而 ScheduleThreadPoolExecutor提供了這個(gè)功能,所以我們第一要搞清楚是如何使用調(diào)度器的,其次是需要知道它的內(nèi)部原理是什么,也 就是知其然,再知其所以然!

 

首先如果我們要?jiǎng)?chuàng)建一個(gè)基于java本身的調(diào)度池通常的方法是:

  1. Executors.newScheduledThreadPool(int); 

 

當(dāng)有重載方法,我們最常用的是這個(gè)就從這個(gè),看下定義:

 

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 
  2.     return new ScheduledThreadPoolExecutor(corePoolSize); 

其實(shí)內(nèi)部是new了一個(gè)實(shí)例化對(duì)象出來(lái),并傳入大小,此時(shí)就跟蹤到ScheduledThreadPoolExecutor的構(gòu)造方法中:

  1. public ScheduledThreadPoolExecutor(int corePoolSize) { 
  2.         super(corePoolSize, Integer.MAX_VALUE, 0,TimeUnit.NANOSECONDS, 
  3.               new DelayedWorkQueue()); 
  4.  

 

你會(huì)發(fā)現(xiàn)調(diào)用了super,而super你跟蹤進(jìn)去會(huì)發(fā)現(xiàn),是ThreadPoolExecutor中,那么 ScheduledThreadPoolExecutor和ThreadPoolExecutor有何區(qū)別,就是本文要說(shuō)得重點(diǎn)了,首先我們留下個(gè)引子, 你發(fā)現(xiàn)在定義隊(duì)列的時(shí)候,不再是上文中提到的LinkedBlockingQueue,而是DelayedWorkQueue,那么細(xì)節(jié)上我們接下來(lái)就是 要講解的重點(diǎn),既然他們又繼承關(guān)系,其實(shí)搞懂了不同點(diǎn),就搞懂了共同點(diǎn),而且有這樣的關(guān)系大多數(shù)應(yīng)當(dāng)是共同點(diǎn),不同點(diǎn)的猜測(cè):這個(gè)是要實(shí)現(xiàn)任務(wù)調(diào)度,任務(wù) 調(diào)度不是立即的,需要延遲和定期做等情況,那么是如何實(shí)現(xiàn)的呢?

這就是我們需要思考的了,通過(guò)源碼考察,我們發(fā)現(xiàn),他們都有execute方法,只是ScheduledThreadPoolExecutor將源碼進(jìn)行了重寫,并且還有以下四個(gè)調(diào)度器的方法:

  1. public ScheduledFuture<?> schedule(Runnable command, 
  2.                        long delay, TimeUnit unit); 
  3.  
  4. public  ScheduledFuture schedule(Callable callable, 
  5.                        long delay, TimeUnit unit); 
  6.  
  7. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 
  8.                           long initialDelay, 
  9.                           long period, 
  10.                           TimeUnit unit); 
  11.  
  12. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 
  13.                              long initialDelay, 
  14.                              long delay, 
  15.                              TimeUnit unit); 

 

那么這四個(gè)方法有什么區(qū)別呢?其實(shí)第一個(gè)和第二個(gè)區(qū)別不大,一個(gè)是Runnable、一個(gè)是Callable,內(nèi)部包裝后是一樣的效果;所以把頭兩個(gè)方法幾乎當(dāng)成一種調(diào)度,那么三種情況分別是:

1、 進(jìn)行一次延遲調(diào)度:延遲delay這么長(zhǎng)時(shí)間,單位為:TimeUnit傳入的的一個(gè)基本單位,例如:TimeUnit.SECONDS屬于提供好的枚舉信息;(適合于方法1和方法2)。

2、 多次調(diào)度,每次依照上一次預(yù)計(jì)調(diào)度時(shí)間進(jìn)行調(diào)度,例如:延遲2s開(kāi)始,5s一次,那么就是2、7、12、17,如果中間由于某種原因?qū)е戮€程不夠用,沒(méi)有 得到調(diào)度機(jī)會(huì),那么接下來(lái)計(jì)算的時(shí)間會(huì)優(yōu)先計(jì)算進(jìn)去,因?yàn)樗呐判驎?huì)被排在前面,有點(diǎn)類似Timer中的:scheduleAtFixedRate方法, 只是這里是多線程的,它的方法名也叫:scheduleAtFixedRate,所以這個(gè)是比較好記憶的(適合方法3)

3、 多次調(diào)度,每次按照上一次實(shí)際執(zhí)行的時(shí)間進(jìn)行計(jì)算下一次時(shí)間,同上,如果在第7秒沒(méi)有被得到調(diào)度,而是第9s才得到調(diào)度,那么計(jì)算下一次調(diào)度時(shí)間就不是12秒,而是9+5=14s,如果再次延遲,就會(huì)延遲一個(gè)周期以上,也就會(huì)出現(xiàn)少調(diào)用的情況(適合于方法3);

4、 最后補(bǔ)充execute方法是一次調(diào)度,期望被立即調(diào)度,時(shí)間為空:

  1. public void execute(Runnable command) { 
  2.  
  3. if (command == null
  4.  
  5. throw new NullPointerException(); 
  6.  
  7. schedule(command, 0, TimeUnit.NANOSECONDS); 
  8.  

 

我們簡(jiǎn)單看看scheduleAtFixedRate、scheduleWithFixedDelay對(duì)下面的分析會(huì)更加有用途:

  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 
  2.                                                   long initialDelay, 
  3.                                                   long period, 
  4.                                                   TimeUnit unit) { 
  5.         if (command == null || unit == null
  6.             throw new NullPointerException(); 
  7.         if (period <= 0
  8.             throw new IllegalArgumentException(); 
  9.         RunnableScheduledFuture<?> t = decorateTask(command, 
  10.             new ScheduledFutureTask</pre> 
  11.   兩段源碼唯一的區(qū)別就是在unit.toNanos(int)這唯一一個(gè)地方,scheduleAtFixedRate里面是直接傳入值,
  12. 而scheduleWithFixedDelay里面是取了相反數(shù),也就是假如我們都傳入正數(shù),scheduleWithFixedDelay其實(shí)就取反了,
  13. 沒(méi)有任何區(qū)別,你是否聯(lián)想到前面文章介紹Timer中類似的處理手段通過(guò)正負(fù)數(shù)區(qū)分時(shí)間間隔方法,為0代表僅僅調(diào)度一次,
  14. 其實(shí)在這里同樣是這樣的,他們也同樣有一個(gè)問(wèn)題就是,如果你傳遞負(fù)數(shù),方法的功能正好是相反的。 
  15. 而你會(huì)發(fā)現(xiàn),不論是那個(gè)schedule方法里頭,都會(huì)創(chuàng)建一個(gè)ScheduledFutureTask類的實(shí)例,此類究竟是何方神圣呢,我們來(lái)看看。 
  16.  
  17. ScheduledFutureTask的類(ScheduleThreadPoolExecutor的私有的內(nèi)部類)來(lái)進(jìn)行調(diào)度,那么可以看看內(nèi)部做了什么操作,如下: 
  18.  
  19. 1 
  20.         ScheduledFutureTask(Runnable r, V result, long ns) { 
  21.             super(r, result); 
  22.             this.time = ns; 
  23.             this.period = 0
  24.             this.sequenceNumber = sequencer.getAndIncrement(); 
  25.         } 
  26.  
  27.         /** 
  28.          * Creates a periodic action with given nano time and period. 
  29.          */ 
  30.         ScheduledFutureTask(Runnable r, V result, long ns, long period) { 
  31.             super(r, result); 
  32.             this.time = ns; 
  33.             this.period = period; 
  34.             this.sequenceNumber = sequencer.getAndIncrement(); 
  35.         } 
  36.  
  37.         /** 
  38.          * Creates a one-shot action with given nanoTime-based trigger. 
  39.          */ 
  40.         ScheduledFutureTask(Callable callable, long ns) { 
  41.             super(callable); 
  42.             this.time = ns; 
  43.             this.period = 0
  44.             this.sequenceNumber = sequencer.getAndIncrement(); 
  45.         } 

 

最核心的幾個(gè)參數(shù)正好對(duì)應(yīng)了調(diào)度的延遲的構(gòu)造方法,這些參數(shù)如何用起來(lái)的?那么它還提供了什么方法呢?

  1. public long getDelay(TimeUnit unit) { 
  2.     return unit.convert(time - now(), TimeUnit.NANOSECONDS); 
  3.  
  4. public int compareTo(Delayed other) { 
  5.     if (other == this// compare zero ONLY if same object 
  6.         return 0
  7.     if (other instanceof ScheduledFutureTask) { 
  8.         ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 
  9.         long diff = time - x.time; 
  10.         if (diff < 0)                     return -1;                 else if (diff > 0
  11.             return 1
  12.         else if (sequenceNumber < x.sequenceNumber) 
  13.             return -1
  14.         else 
  15.             return 1
  16.     } 
  17.     long d = (getDelay(TimeUnit.NANOSECONDS) - 
  18.               other.getDelay(TimeUnit.NANOSECONDS)); 
  19.     return (d == 0)? 0 : ((d < 0)? -1 : 1);         }       
  20.   /**          * 返回是否為片段,也就是多次調(diào)度          *          */       
  21.   public boolean isPeriodic() {             return period != 0;         }  

 

這里發(fā)現(xiàn)了,他們可以運(yùn)行,且判定時(shí)間的方法是getDelay方法我們知道了。 對(duì)比時(shí)間的方法是:compareTo,傳入了參數(shù)類型為:Delayed類型,不難猜測(cè)出,ScheduledFutureTask和Delayed有 某種繼承關(guān)系,沒(méi)錯(cuò),ScheduledFutureTask實(shí)現(xiàn)了Delayed的接口,只是它是間接實(shí)現(xiàn)的;并且Delayed接口繼承了 Comparable接口,這個(gè)接口可用來(lái)干什么?看過(guò)我前面寫的一篇文章關(guān)于中文和對(duì)象排序的應(yīng)該知道,這個(gè)是用來(lái)自定義對(duì)比和排序的,我們的調(diào)度任務(wù) 是一個(gè)對(duì)象,所以需要排序才行,接下來(lái)我們回溯到開(kāi)始定義的代碼中,找一個(gè)實(shí)際調(diào)用的代碼來(lái)看看它是如何啟動(dòng)到run方法的?如何排序的?如何調(diào)用延遲 的?就是我們下文中會(huì)提到的,而這里我們先提出問(wèn)題,后文我們?cè)賮?lái)說(shuō)明這些問(wèn)題。 我們先來(lái)看下run方法的一些定義。

  1.    /**            * 時(shí)間片類型任務(wù)執(zhí)行            */         
  2.  private void runPeriodic() {             
  3. //運(yùn)行對(duì)應(yīng)的程序,這個(gè)是具體的程序   boolean ok = ScheduledFutureTask.super.runAndReset();             
  4. boolean down = isShutdown();             // Reschedule if not cancelled and not shutdown or policy allows             
  5. if (ok && (!down ||(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&!isStopped()))) {
  6. long p = period;                 
  7. if (p > 0)//規(guī)定時(shí)間間隔算出下一次時(shí)間 
  8.             time += p; 
  9.         else//用當(dāng)前時(shí)間算出下一次時(shí)間,負(fù)負(fù)得正 
  10.             time = triggerTime(-p); 
  11.         //計(jì)算下一次時(shí)間,并資深再次放入等待隊(duì)列中 
  12.         ScheduledThreadPoolExecutor.super.getQueue().add(this); 
  13.     } 
  14.     else if (down) 
  15.         interruptIdleWorkers(); 
  16.  
  17. /** 
  18.  * 是否為逐片段執(zhí)行,如果不是,則調(diào)用父親類的run方法 
  19.  */ 
  20. public void run() { 
  21.     if (isPeriodic())//周期任務(wù) 
  22.         runPeriodic(); 
  23.     else//只執(zhí)行一次的任務(wù) 
  24.         ScheduledFutureTask.super.run(); 

 

可以看到run方法首先通過(guò)isPeriod()判定是否為時(shí)間片,判定的依據(jù)就是我們說(shuō)的時(shí)間片是否“不為零”,如果不是周期任務(wù),就直接運(yùn)行一 次,如果是周期任務(wù),則除了運(yùn)行還會(huì)計(jì)算下一次執(zhí)行的時(shí)間,并將其再次放入等待隊(duì)列,這里對(duì)應(yīng)到scheduleAtFixedRate、 scheduleWithFixedDelay這兩個(gè)方法一正一負(fù),在這里得到判定,并且將為負(fù)數(shù)的取反回來(lái),負(fù)負(fù)得正,java就是這么干的,呵呵,所 以不要認(rèn)為什么是不可能的,只要好用什么都是可以的,然后計(jì)算的時(shí)間一個(gè)是基于標(biāo)準(zhǔn)的time加上一個(gè)時(shí)間片,一個(gè)是根據(jù)當(dāng)前時(shí)間計(jì)算一個(gè)時(shí)間片,在上文 中我們已經(jīng)明確說(shuō)明了兩者的區(qū)別。

以:schedule方法為例:

  1. public  ScheduledFuture schedule(Callable callable, 
  2.                                            long delay, 
  3.                                            TimeUnit unit) { 
  4.         if (callable == null || unit == null
  5.             throw new NullPointerException(); 
  6.         RunnableScheduledFuture t = decorateTask(callable, 
  7.             new ScheduledFutureTask(callable, 
  8.                        triggerTime(delay, unit))); 
  9.         delayedExecute(t); 
  10.         return t; 

 

其實(shí)這個(gè)方法內(nèi)部創(chuàng)建的就是一個(gè)我們剛才提到的:ScheduledFutureTask,外面又包裝了下叫做RunnableScheduledFuture,也就是適配了下而已,呵呵,代碼里面就是一個(gè)return操作,java這樣做的目的是方便子類去擴(kuò)展。

關(guān)鍵是delayedExecute(t)方法中做了什么?看名稱是延遲執(zhí)行的意思,難道java的線程可以延遲執(zhí)行,那所有的任務(wù)線程都在運(yùn)行狀態(tài)?
它的源碼是這樣的:

  1. private void delayedExecute(Runnable command) { 
  2.     if (isShutdown()) { 
  3.         reject(command); 
  4.         return
  5.     } 
  6.     if (getPoolSize() < getCorePoolSize()) 
  7.         prestartCoreThread(); 
  8.  
  9.     super.getQueue().add(command); 

 

我們主要關(guān)心prestartCoreThread()和super.getQueue().add(command),因?yàn)槿绻到y(tǒng)關(guān)閉,這些討論都沒(méi)有意義的,我們分別叫他們第二小段代碼和第三小段代碼。

第二個(gè)部分如果線程數(shù)小于核心線程數(shù)設(shè)置,那么就調(diào)用一個(gè)prestartCoreThread(),看方法名應(yīng)該是:預(yù)先啟動(dòng)一個(gè)核心線程的意思,先看完第三個(gè)部分,再跟蹤進(jìn)去看源碼。

第三個(gè)部分很明了,就是調(diào)用super.getQueue().add(command);也就是說(shuō)直接將任務(wù)放入一個(gè)隊(duì)列中,其實(shí)super是什 么?super就是我們上一篇文章所提到的ThreadPoolExecutor,那么這個(gè)Queue就是上一篇文章中提到的等待隊(duì)列,也就是任何 schedule任務(wù)首先放入等待隊(duì)列,然后等待被調(diào)度的。

  1. public boolean prestartCoreThread() { 
  2.     return addIfUnderCorePoolSize(null); 
  3. private boolean addIfUnderCorePoolSize(Runnable firstTask) { 
  4.     Thread t = null
  5.     final ReentrantLock mainLock = this.mainLock; 
  6.     mainLock.lock(); 
  7.     try { 
  8.         if (poolSize < corePoolSize && runState == RUNNING)                 
  9. t = addThread(firstTask);         
  10. finally {             
  11. mainLock.unlock();       
  12. if (t == null)             
  13. return false;        
  14.  t.start();         
  15. return true
  16. }  

 

這個(gè)代碼是否似曾相似,沒(méi)錯(cuò),這個(gè)你在上一篇文章介紹ThreadPoolExecutor的時(shí)候就見(jiàn)到過(guò),說(shuō)明不論是 ThreadPoolExecutor還是ScheduleThreadPoolExecutor他們的Thread都是由一個(gè)Worker來(lái)處理的(上 一篇文章有介紹),而這個(gè)Worker處理的基本機(jī)制就是將當(dāng)前任務(wù)執(zhí)行后,不斷從線程等待隊(duì)列中獲取數(shù)據(jù),然后用以執(zhí)行,直到隊(duì)列為空為止。 那么他們的區(qū)別在哪里呢?延遲是如何實(shí)現(xiàn)的呢?和我們上面介紹的ScheduledFutureTask又有何關(guān)系呢? 那么我們回過(guò)頭來(lái)看看ScheduleThreadPool的定義是如何的。

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {         
  2. super(corePoolSize, Integer.MAX_VALUE, 0,TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }  

 

發(fā)現(xiàn)它和ThreadPoolExecutor有個(gè)定義上很大的區(qū)別就是,ThreadPoolExecutor用的是 LinkedBlockingQueue(當(dāng)然可以修改),它用的是DelayedWeorkQueue,而這個(gè)DelayedWorkQueue里面你 會(huì)發(fā)現(xiàn)它僅僅是對(duì)java.util.concurrent.DelayedQueue類一個(gè)簡(jiǎn)單訪問(wèn)包裝,這個(gè)隊(duì)列就是等待隊(duì)列,可以看到任務(wù)是被直接 放到等待隊(duì)列中的,所以取數(shù)據(jù)必然從這里獲取,而這個(gè)延遲的隊(duì)列有何神奇之處呢,它又是如何實(shí)現(xiàn)的呢,我們從什么地方下手去看這個(gè) DelayWorkQueue? 我們還是回頭看看Worker里面的run方法(上一篇文章中已經(jīng)講過(guò)):

  1. public void run() {             
  2. try {
  3. Runnable task = firstTask;                
  4.   firstTask = null;                 
  5. while (task != null || (task = getTask()) != null) {
  6. runTask(task);                   
  7. task = null;                 
  8. }             
  9. } finally {
  10. workerDone(this);             
  11. }         
  12. }  

 

這里面要調(diào)用等待隊(duì)列就是getTask()方法:

  1. Runnable getTask() { 
  2. for (;;) {            
  3.   try { int state = runState;                 
  4. if (state > SHUTDOWN) 
  5.                    return null
  6.                Runnable r; 
  7.                if (state == SHUTDOWN)  // Help drain queue 
  8.                    r = workQueue.poll(); 
  9.                else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 
  10.                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); 
  11.                else 
  12.                    r = workQueue.take(); 
  13.                if (r != null
  14.                    return r; 
  15.                if (workerCanExit()) { 
  16.                    if (runState >= SHUTDOWN) // Wake up others 
  17.                        interruptIdleWorkers(); 
  18.                    return null
  19.                } 
  20.            } catch (InterruptedException ie) { 
  21.            } 
  22.        } 

 

發(fā)現(xiàn)沒(méi)有,如果沒(méi)有設(shè)置超時(shí),默認(rèn)只會(huì)通過(guò)workQueue.take()方法獲取數(shù)據(jù),那么我們就看take方法,而增加到隊(duì)列里面的方法自然看offer相關(guān)的方法。接下來(lái)我們來(lái)看下DelayQueue這個(gè)隊(duì)列的take方法:

  1. public E take() throws InterruptedException { 
  2.         final ReentrantLock lock = this.lock; 
  3.         lock.lockInterruptibly(); 
  4.         try { 
  5.             for (;;) { 
  6.                 E first = q.peek(); 
  7.                 if (first == null) { 
  8.                     available.await();//等待信號(hào),線程一直掛在哪里 
  9.                 } else { 
  10.                     long delay =  first.getDelay(TimeUnit.NANOSECONDS); 
  11.                     if (delay > 0) { 
  12.                         long tl = available.awaitNanos(delay);//最左等delay的時(shí)間段 
  13.                     } else { 
  14.                         E x = q.poll();//可以運(yùn)行,取出一個(gè) 
  15.                         assert x != null
  16.                         if (q.size() != 0
  17.                             available.signalAll(); 
  18.                         return x; 
  19.  
  20.                     } 
  21.                 } 
  22.             } 
  23.         } finally { 
  24.             lock.unlock(); 
  25.         } 

 

這里的for就是要找到數(shù)據(jù)為止,否則就等著,而這個(gè)“q”和“available”是什么呢?

  1. private transient final Condition available = lock.newCondition(); 
  2.  
  3. private final PriorityQueue q = new PriorityQueue(); 

 

怎么里面還有一層隊(duì)列,不用怕,從這里你貌似看出點(diǎn)名稱意味了,就是它是優(yōu)先級(jí)隊(duì)列,而對(duì)于任務(wù)調(diào)度來(lái)講,優(yōu)先級(jí)的方式就是時(shí)間,我們用這中猜測(cè)來(lái)繼續(xù)深入源碼。

上面首先獲取這個(gè)隊(duì)列的第一個(gè)元素,若為空,就等待一個(gè)“available”發(fā)出的信號(hào),我們可以猜測(cè)到這個(gè)offer的時(shí)候會(huì)發(fā)出的信號(hào),一會(huì) 來(lái)驗(yàn)證即可;若不為空,則通過(guò)getDelay方法來(lái)獲取時(shí)間信息,這個(gè)getDelay方法就用上了我們開(kāi)始說(shuō)的 ScheduledFutureTask了,如果是時(shí)間大于0,則也進(jìn)入等待,因?yàn)檫€沒(méi)開(kāi)始執(zhí)行,等待也是“available”發(fā)出信號(hào),但是有一個(gè)最 長(zhǎng)時(shí)間,為什么還要等這個(gè)信號(hào),是因?yàn)橛锌赡苓M(jìn)來(lái)一個(gè)新的任務(wù),比這個(gè)等待的任務(wù)還要先執(zhí)行,所以要等這個(gè)信號(hào);而最多等這么長(zhǎng)時(shí)間,就是因?yàn)槿绻@段時(shí) 間沒(méi)任務(wù)進(jìn)來(lái)肯定就是它執(zhí)行了。然后就返回的這個(gè)值,被Worker(上面有提到)拿到后調(diào)用其run()方法進(jìn)行運(yùn)行。

那么寫入隊(duì)列在那里?他們是如何排序的?

我們看看隊(duì)列的寫入方法是這樣的:

  1. public boolean offer(E e) { 
  2.         final ReentrantLock lock = this.lock; 
  3.         lock.lock(); 
  4.         try { 
  5.             E first = q.peek(); 
  6.             q.offer(e); 
  7.             if (first == null || e.compareTo(first) < 0)                 
  8. available.signalAll();            
  9.   return true;        
  10. finally {   lock.unlock(); } 
  11. }  

 

隊(duì)列也是首先取出第一個(gè)(后面會(huì)用來(lái)和當(dāng)前任務(wù)做比較),而這里“q”是上面提到的“PriorityQueue”,看來(lái)offer的關(guān)鍵還在它的里面,我們看看調(diào)用過(guò)程:

  1.  public boolean offer(E e) {         if (e == null)             throw new NullPointerException();         modCount++;         int i = size;         if (i >= queue.length) 
  2.             grow(i + 1); 
  3.         size = i + 1
  4.         if (i == 0
  5.             queue[0] = e; 
  6.         else 
  7.             siftUp(i, e);//主要是這條代碼很關(guān)鍵 
  8.         return true
  9. private void siftUp(int k, E x) { 
  10.         if (comparator != null
  11.             siftUpUsingComparator(k, x); 
  12.         else 
  13.         //我們默認(rèn)走這里,因?yàn)镈elayQueue定義它的時(shí)候默認(rèn)沒(méi)有給定義comparator 
  14.             siftUpComparable(k, x); 
  15. /* 
  16. 可以發(fā)現(xiàn)這個(gè)方法是將任務(wù)按照compareTo對(duì)比后,放在隊(duì)列的合適位置,但是它肯定不是絕對(duì)順序的,這一點(diǎn)和Timer的內(nèi)部排序機(jī)制類似。 
  17. */ 
  18. private void siftUpComparable(int k, E x) { 
  19.         Comparable<? super E> key = (Comparable<? super E>) x; 
  20.         while (k > 0) { 
  21.             int parent = (k - 1) >>> 1
  22.             Object e = queue[parent]; 
  23.             if (key.compareTo((E) e) >= 0
  24.                 break
  25.             queue[k] = e; 
  26.             k = parent; 
  27.         } 
  28.         queue[k] = key; 

 

你是否發(fā)現(xiàn),compareTo也用上了,就是我們前面描述一大堆的:ScheduledFutureTask類中的一個(gè)方法,那么run方法也用上了,這個(gè)過(guò)程貌似完整了。

我們?cè)賮?lái)理一下思路:

1、調(diào)用的Thread的包裝,由在ThreadPoolExecutor中的Worker調(diào)用你傳入的Runnable的run方法,變成了Worker調(diào)用Runnable的run方法,由它來(lái)處理時(shí)間片的信息調(diào)用你傳入的線程。

2、ScheduledFutureTask類在整個(gè)過(guò)程中提供了基礎(chǔ)參考的方法,其中最為關(guān)鍵的就是實(shí)現(xiàn)了接口Comparable,實(shí)現(xiàn)內(nèi)部的 compareTo方法,也實(shí)現(xiàn)了Delayed接口中的getDelay方法用以判定時(shí)間(當(dāng)然Delayed接口本身也是繼承于 Comparable,我們不要糾結(jié)于細(xì)節(jié)概念就好)。

3、等待隊(duì)列由在ThreadPoolExecutor中默認(rèn)使用的LinkedBlockingQueue換成了DelayQueue(它是被 DelayWorkQueue包裝了一下子,沒(méi)多大區(qū)別),而DelayQueue主要提供了一個(gè)信號(hào)量“available”來(lái)作為寫入和讀取的信號(hào)控 制開(kāi)關(guān),通過(guò)另一個(gè)優(yōu)先級(jí)隊(duì)列“PriorityQueue”來(lái)控制實(shí)際的隊(duì)列順序,他們的順序就是基于上面提到的 ScheduledFutureTask類中的compareTo方法,而是否運(yùn)行也是基于getDelay方法來(lái)實(shí)現(xiàn)的。

4、ScheduledFutureTask類的run方法會(huì)判定是否為時(shí)間片信息,如果為時(shí)間片,在執(zhí)行完對(duì)應(yīng)的方法后,開(kāi)始計(jì)算下一次執(zhí)行時(shí)間 (注意判定時(shí)間片大于0,小于0,分別代表的是以當(dāng)前執(zhí)行完的時(shí)間為準(zhǔn)計(jì)算下一次時(shí)間還是以當(dāng)前時(shí)間為準(zhǔn)),這個(gè)在前面有提到。

5、它是支持多線程的,和Timer的機(jī)制最大的區(qū)別就在于多個(gè)線程會(huì)最征用這個(gè)隊(duì)列,隊(duì)里的排序方式和Timer有很多相似之處,并非完全有序,而是通過(guò)位移動(dòng)來(lái)盡量找到合適的位置,有點(diǎn)類似貪心的算法,呵呵。

原文鏈接:http://ifeve.com/java-scheduledthreadpoolexecutor/

責(zé)任編輯:陳四芳 來(lái)源: ifeve.com
相關(guān)推薦

2021-09-11 15:26:23

Java多線程線程池

2023-06-07 13:49:00

多線程編程C#

2023-11-22 08:37:40

Java線程池

2013-05-23 15:59:00

線程池

2022-12-16 08:31:37

調(diào)度線程池源碼

2023-10-26 21:44:02

Java多線程方法

2009-06-11 10:48:53

Java多線程

2009-03-12 10:52:43

Java線程多線程

2013-07-16 10:57:34

iOS多線程多線程概念多線程入門

2019-10-30 21:27:51

Java中央處理器電腦

2021-12-26 18:22:30

Java線程多線程

2009-06-29 17:49:47

Java多線程

2023-12-29 09:38:00

Java線程池

2012-02-15 10:34:29

JavaJava Socket

2011-06-22 13:57:54

Java多線程

2010-03-15 17:56:23

Java多線程

2013-06-08 10:11:31

Java線程池架構(gòu)

2015-10-10 09:39:42

Java線程池源碼解析

2012-05-15 02:18:31

Java線程池

2020-09-04 10:29:47

Java線程池并發(fā)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)