自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

調(diào)度線程池ScheduledThreadPoolExecutor源碼解析

開發(fā) 前端
ScheduledThreadPoolExecutor可以用來很方便實現(xiàn)我們的調(diào)度任務,具體使用可以參考調(diào)度線程池ScheduledThreadPoolExecutor的正確使用姿勢這篇文章,那大家知道它是怎么實現(xiàn)的嗎,本文就帶大家來揭曉謎底。

?前言

ScheduledThreadPoolExecutor可以用來很方便實現(xiàn)我們的調(diào)度任務,具體使用可以參考調(diào)度線程池ScheduledThreadPoolExecutor的正確使用姿勢這篇文章,那大家知道它是怎么實現(xiàn)的嗎,本文就帶大家來揭曉謎底。

實現(xiàn)機制分析

我們先思考下,如果讓大家去實現(xiàn)ScheduledThreadPoolExecutor可以周期性執(zhí)行任務的功能,需要考慮哪些方面呢?

  • ScheduledThreadPoolExecutor的整體實現(xiàn)思路是什么呢?

答:我們是不是可以繼承線程池類,按照線程池的思路,將任務先丟到阻塞隊列中,等到時間到了,工作線程就從阻塞隊列獲取任務執(zhí)行。

  • 如何實現(xiàn)等到了未來的時間點就開始執(zhí)行呢?

答:我們可以根據(jù)參數(shù)獲取這個任務還要多少時間執(zhí)行,那么我們是不是可以從阻塞隊列中獲取任務的時候,通過條件隊列的的awaitNanos(delay)方法,阻塞一定時間。

  • 如何實現(xiàn) 任務的重復性執(zhí)行呢?

答:這就更加簡單了,任務執(zhí)行完成后,把它再次加入到隊列不就行了嗎。

圖片

源碼解析

類結構圖

圖片

ScheduledThreadPoolExecutor?的類結構圖如上圖所示,很明顯它是在我們的線程池ThreadPoolExecutor框架基礎上擴展的。

  • ScheduledExecutorService:實現(xiàn)了該接口,封裝了調(diào)度相關的API
  • ThreadPoolExecutor:繼承了該類,保留了線程池的能力和整個實現(xiàn)的框架
  • DelayedWorkQueue:內(nèi)部類,延遲阻塞隊列。
  • ScheduledFutureTask:延遲任務對象,包含了任務、任務狀態(tài)、剩余的時間、結果等信息。

重要屬性

通過ScheduledThreadPoolExecutor類的成員屬性,我們可以了解它的數(shù)據(jù)結構。

  • shutdown 后是否繼續(xù)執(zhí)行周期任務(重復執(zhí)行)
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
  • shutdown 后是否繼續(xù)執(zhí)行延遲任務(只執(zhí)行一次)
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
  • 調(diào)用cancel()方法后,是否將該任務從隊列中移除,默認false
private volatile boolean removeOnCancel = false;
  • 任務的序列號,保證FIFO隊列的順序,用來比較優(yōu)先級
private static final AtomicLong sequencer = new AtomicLong()
  • ScheduledFutureTask延遲任務類

ScheduledFutureTask? 繼承 FutureTask?,實現(xiàn) RunnableScheduledFuture? 接口,無論是 runnable? 還是 callable?,無論是否需要延遲和定時,所有的任務都會被封裝成 ScheduledFutureTask。

該類具有延遲執(zhí)行的特點, 覆蓋FutureTask? 的 run 方法來實現(xiàn)對延時執(zhí)行、周期執(zhí)行的支持。

對于延時任務調(diào)用FutureTask#run?,而對于周期性任務則調(diào)用FutureTask#runAndReset? 并且在成功之后根據(jù) fixed-delay/fixed-rate模式來設置下次執(zhí)行時間并重新將任務塞到工作隊列。

成員屬性如下:

// 任務序列號
private final long sequenceNumber;
// 任務可以被執(zhí)行的時間,交付時間,以納秒表示
private long time;
// 0 表示非周期任務
// 正數(shù)表示 fixed-rate(兩次開始啟動的間隔)模式的周期,
// 負數(shù)表示 fixed-delay(一次執(zhí)行結束到下一次開始啟動) 模式
private final long period;
// 執(zhí)行的任務對象
RunnableScheduledFuture<V> outerTask = this;
// 任務在隊列數(shù)組中的索引下標, -1表示刪除
int heapIndex;
  • DelayedWorkQueue延遲隊列

DelayedWorkQueue 是支持延時獲取元素的阻塞隊列, 內(nèi)部采用優(yōu)先隊列 PriorityQueue(小根堆、滿二叉樹)存儲元素。

內(nèi)部數(shù)據(jù)結構是數(shù)組,所以延遲隊列出隊頭元素后需要讓其他元素(尾)替換到頭節(jié)點,防止空指針異常。

成員屬性如下:

// 初始容量
private static final int INITIAL_CAPACITY = 16;
// 節(jié)點數(shù)量
private int size = 0;
// 存放任務的數(shù)組
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 控制并發(fā)用的鎖
private final ReentrantLock lock = new ReentrantLock();
// 條件隊列
private final Condition available = lock.newCondition();
//指定用于等待隊列頭節(jié)點任務的線程
private Thread leader = null;

提交延遲任務schedule()原理

延遲執(zhí)行方法,并指定延遲執(zhí)行的時間,只會執(zhí)行一次。

  • schedule()方法是延遲任務方法的入口。
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
// 判空處理
if (command == null || unit == null)
throw new NullPointerException();
// 將外部傳入的任務封裝成延遲任務對象ScheduledFutureTask
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 執(zhí)行延遲任務
delayedExecute(t);
return t;
}
  • decorateTask(...) 該方法是封裝延遲任務

調(diào)用triggerTime(delay, unit)方法計算延遲的時間。

// 返回【當前時間 + 延遲時間】,就是觸發(fā)當前任務執(zhí)行的時間
private long triggerTime(long delay, TimeUnit unit) {
// 設置觸發(fā)的時間
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
// 如果 delay < Long.Max_VALUE/2,則下次執(zhí)行時間為當前時間 +delay
// 否則為了避免隊列中出現(xiàn)由于溢出導致的排序紊亂,需要調(diào)用overflowFree來修正一下delay
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

// 下面這種情況很少,大家看不懂可以不用強行理解
// 如果某個任務的 delay 為負數(shù),說明當前可以執(zhí)行(其實早該執(zhí)行了)。
// 阻塞隊列中維護任務順序是基于 compareTo 比較的,比較兩個任務的順序會用 time 相減。
// 那么可能出現(xiàn)一個 delay 為正數(shù)減去另一個為負數(shù)的 delay,結果上溢為負數(shù),則會導致 compareTo 產(chǎn)生錯誤的結果
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
// 判斷一下隊首的delay是不是負數(shù),如果是正數(shù)就不用管,怎么減都不會溢出
// 否則拿當前 delay 減去隊首的 delay 來比較看,如果不出現(xiàn)上溢,排序不會亂
// 不然就把當前 delay 值給調(diào)整為 Long.MAX_VALUE + 隊首 delay
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
  • 調(diào)用RunnableScheduledFuture的構造方法封裝為延遲任務
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
// 任務的觸發(fā)時間
this.time = ns;
// 任務的周期, 延遲任務的為0,因為不需要重復執(zhí)行
this.period = 0;
// 任務的序號 + 1
this.sequenceNumber = sequencer.getAndIncrement();
}
  • 調(diào)用decorateTask()方法裝飾延遲任務
// 沒有做任何操作,直接將 task 返回,該方法主要目的是用于子類擴展
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}

提交周期任務scheduleAtFixedRate()原理

按照固定的頻率周期性的執(zhí)行任務,捕手renwu,一次任務的啟動到下一次任務的啟動的間隔

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 任務封裝,【指定初始的延遲時間和周期時間】
ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(period));
// 默認返回本身
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 開始執(zhí)行這個任務
delayedExecute(t);
return t;
}

提交周期任務scheduleWithFixedDelay()原理

按照指定的延時周期性執(zhí)行任務,上一個任務執(zhí)行完畢后,延時一定時間,再次執(zhí)行任務。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 任務封裝,【指定初始的延遲時間和周期時間】,周期時間為 - 表示是 fixed-delay 模式
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 開始執(zhí)行這個任務
delayedExecute(t);
return t;
}

執(zhí)行任務delayedExecute(t)原理

上面多種提交任務的方式,殊途同歸,最終都會調(diào)用delayedExecute()方法執(zhí)行延遲或者周期任務。

delayedExecute()方法是執(zhí)行延遲任務的入口

private void delayedExecute(RunnableScheduledFuture<?> task) {
// 線程池是 SHUTDOWN 狀態(tài),執(zhí)行拒絕策略
if (isShutdown())
// 調(diào)用拒絕策略的方法
reject(task);
else {
// 把當前任務放入阻塞隊列
super.getQueue().add(task);
// 線程池狀態(tài)為 SHUTDOWN 并且不允許執(zhí)行任務了,就從隊列刪除該任務,并設置任務的狀態(tài)為取消狀態(tài)
// 非主流程,可以跳過,不重點看了
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false);
else
// 開始執(zhí)行了哈
ensurePrestart();
}
}

ensurePrestart()方法開啟線程執(zhí)行

// ThreadPoolExecutor#ensurePrestart
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// worker數(shù)目小于corePoolSize,則添加一個worker。
if (wc < corePoolSize)
// 第二個參數(shù) true 表示采用核心線程數(shù)量限制,false 表示采用 maximumPoolSize
addWorker(null, true);
// corePoolSize = 0的情況,至少開啟一個線程,【擔保機制】
else if (wc == 0)
addWorker(null, false);
}

addWorker()?方法實際上父類ThreadPoolExecutor的方法,這個方法在該文章 Java線程池源碼深度解析中詳細介紹過,這邊做個總結:

  • 如果線程池中工作線程數(shù)量小于最大線程數(shù),創(chuàng)建工作線程,執(zhí)行任務。
  • 如果線程池中工作線程數(shù)量大于最大線程數(shù),直接返回。

獲取延遲任務take()原理

目前工作線程已經(jīng)創(chuàng)建好了,工作線程開始工作了,它會從阻塞隊列中獲取延遲任務執(zhí)行,這部分也是線程池里面的原理,不做展開,那我們看下它是如何實現(xiàn)延遲執(zhí)行的? 主要關注如何從阻塞隊列中獲取任務。

  • DelayedWorkQueue#take()方法獲取延遲任務

該方法會在上面的addWoker()?方法創(chuàng)建工作線程后,工作線程中循環(huán)持續(xù)調(diào)用workQueue.take()方法獲取延遲任務。

該方法主要獲取延遲隊列中任務延遲時間小于等于0 的任務。

如果延遲時間不小于0,那么調(diào)用條件隊列的awaitNanos(delay)阻塞方法等待一段時間,等時間到了,延遲時間自然小于等于0了。

獲取到任務后,工作線程就可以開始執(zhí)行調(diào)度任務了。

// DelayedWorkQueue#take()
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加可中斷鎖
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 獲取阻塞隊列中的頭結點
RunnableScheduledFuture<?> first = queue[0];
// 如果阻塞隊列沒有數(shù)據(jù),為空
if (first == null)
// 等待隊列不空,直至有任務通過 offer 入隊并喚醒
available.await();
else {
// 獲取頭節(jié)點的的任務還剩余多少時間才執(zhí)行
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 到達觸發(fā)時間,獲取頭節(jié)點并調(diào)整堆,重新選擇延遲時間最小的節(jié)點放入頭部
return finishPoll(first);

// 邏輯到這說明頭節(jié)點的延遲時間還沒到
first = null;
// 說明有 leader 線程在等待獲取頭節(jié)點,當前線程直接去阻塞等待
if (leader != null)
// 當前線程阻塞
available.await();
else {
// 沒有 leader 線程,【當前線程作為leader線程,并設置頭結點的延遲時間作為阻塞時間】
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 當前線程通過awaitNanos方法等待delay時間后,會自動喚醒,往后面繼續(xù)執(zhí)行
available.awaitNanos(delay);
// 到達阻塞時間時,當前線程會從這里醒來,進入下一輪循環(huán),就有可能執(zhí)行了
} finally {
// t堆頂更新,leader 置為 null,offer 方法釋放鎖后,
// 有其它線程通過 take/poll 拿到鎖,讀到 leader == null,然后將自身更新為leader。
if (leader == thisThread)
// leader 置為 null 用以接下來判斷是否需要喚醒后繼線程
leader = null;
}
}
}
}
} finally {
// 沒有 leader 線程并且頭結點不為 null,喚醒阻塞獲取頭節(jié)點的線程,
// 【如果沒有這一步,就會出現(xiàn)有了需要執(zhí)行的任務,但是沒有線程去執(zhí)行】
if (leader == null && queue[0] != null)
available.signal();
// 解鎖
lock.unlock();
}
}
  • finishPoll()方法獲取到任務后執(zhí)行

該方法主要做兩個事情, 獲取頭節(jié)點并調(diào)整堆,重新選擇延遲時間最小的節(jié)點放入頭部。

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// 獲取尾索引
int s = --size;
// 獲取尾節(jié)點
RunnableScheduledFuture<?> x = queue[s];
// 將堆結構最后一個節(jié)點占用的 slot 設置為 null,因為該節(jié)點要嘗試升級成堆頂,會根據(jù)特性下調(diào)
queue[s] = null;
// s == 0 說明 當前堆結構只有堆頂一個節(jié)點,此時不需要做任何的事情
if (s != 0)
// 從索引處 0 開始向下調(diào)整
siftDown(0, x);
// 出隊的元素索引設置為 -1
setIndex(f, -1);
return f;
}

延遲任務運行的原理

從延遲隊列中獲取任務后,工作線程會調(diào)用延遲任務的run()方法執(zhí)行任務。

  • ScheduledFutureTask#run()方法運行任務

調(diào)用isPeriodic()方法判斷任務是否是周期性任務還是非周期性任務

如果任務是非周期任務,就調(diào)用父類的FutureTask#run()執(zhí)行一次

如果任務是非周期任務,就調(diào)用父類的FutureTask#runAndReset(), 返回true會設置下一次的執(zhí)行時間,重新放入線程池的阻塞隊列中,等待下次獲取執(zhí)行

public void run() {
// 是否周期性,就是判斷 period 是否為 0
boolean periodic = isPeriodic();
// 根據(jù)是否是周期任務檢查當前狀態(tài)能否執(zhí)行任務,不能執(zhí)行就取消任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 非周期任務,直接調(diào)用 FutureTask#run 執(zhí)行一次
else if (!periodic)
ScheduledFutureTask.super.run();
// 周期任務的執(zhí)行,返回 true 表示執(zhí)行成功
else if (ScheduledFutureTask.super.runAndReset()) {
// 設置周期任務的下一次執(zhí)行時間
setNextRunTime();
// 任務的下一次執(zhí)行安排,如果當前線程池狀態(tài)可以執(zhí)行周期任務,加入隊列,并開啟新線程
reExecutePeriodic(outerTask);
}
}
  • FutureTask#runAndReset()執(zhí)行周期性任務

周期任務正常完成后任務的狀態(tài)不會變化,依舊是 NEW,不會設置 outcome 屬性。

但是如果本次任務執(zhí)行出現(xiàn)異常,會進入 setException 方法將任務狀態(tài)置為異常,把異常保存在 outcome 中。

方法返回 false,后續(xù)的該任務將不會再周期的執(zhí)行

protected boolean runAndReset(){
// 任務不是新建的狀態(tài)了,或者被別的線程執(zhí)行了,直接返回 false
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 執(zhí)行方法,沒有返回值
c.call();
ran = true;
} catch (Throwable ex) {
// 出現(xiàn)異常,把任務設置為異常狀態(tài),喚醒所有的 get 阻塞線程
setException(ex);
}
}
} finally {
// 執(zhí)行完成把執(zhí)行線程引用置為 null
runner = null;
s = state;
// 如果線程被中斷進行中斷處理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 如果正常執(zhí)行,返回 true,并且任務狀態(tài)沒有被取消
return ran && s == NEW;
}
  • ScheduledFutureTask#setNextRunTime()設置下次執(zhí)行時間

如果屬性period大于0,表示fixed-rate模式,直接加上period時間即可。

如果屬性period小于等于0, 表示是fixed-delay模式, 調(diào)用triggerTime重新計算下次時間。

// 任務下一次的觸發(fā)時間
private void setNextRunTime() {
long p = period;
if (p > 0)
// fixed-rate 模式,【時間設置為上一次執(zhí)行任務的時間 + p】,兩次任務執(zhí)行的時間差
time += p;
else
// fixed-delay 模式,下一次執(zhí)行時間是【當前這次任務結束的時間(就是現(xiàn)在) + delay 值】
time = triggerTime(-p);
}
  • ScheduledFutureTask#reExecutePeriodic(),重新放入阻塞任務隊列,等待獲取,進行下一輪執(zhí)行
// ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
// 【放入任務隊列】
super.getQueue().add(task);
// 如果提交完任務之后,線程池狀態(tài)變?yōu)榱?shutdown 狀態(tài),需要再次檢查是否可以執(zhí)行,
// 如果不能執(zhí)行且任務還在隊列中未被取走,則取消任務
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 當前線程池狀態(tài)可以執(zhí)行周期任務,加入隊列,并【根據(jù)線程數(shù)量是否大于核心線程數(shù)確定是否開啟新線程】
ensurePrestart();
}
}

責任編輯:武曉燕 來源: JAVA旭陽
相關推薦

2013-06-08 10:11:31

Java線程池架構

2015-10-10 09:39:42

Java線程池源碼解析

2021-05-26 11:30:24

Java線程池代碼

2013-06-08 13:07:23

Java線程池調(diào)度器

2013-05-28 13:57:12

MariaDB

2011-06-22 15:50:45

QT 線程

2020-11-25 11:33:47

Java線程技術

2020-12-10 07:00:38

編程線程池定時任務

2023-12-29 09:38:00

Java線程池

2020-12-08 08:53:53

編程ThreadPoolE線程池

2011-08-19 17:36:42

iPhone操作隊列Java

2021-11-10 16:10:18

鴻蒙HarmonyOS應用

2018-10-31 15:54:47

Java線程池源碼

2023-12-28 07:49:11

線程池源碼應用場景

2024-01-29 15:54:41

Java線程池公平鎖

2024-05-06 00:00:00

ThreadPool線程調(diào)度

2023-05-19 08:01:24

Key消費場景

2023-11-26 18:54:29

Linux調(diào)度器

2024-02-04 08:43:20

源碼線程池緩沖

2024-07-15 08:20:24

點贊
收藏

51CTO技術棧公眾號