高并發(fā)高性能的定時器實(shí)現(xiàn)
前言
我們經(jīng)常都會碰到延遲任務(wù),定時任務(wù)這種需求。在網(wǎng)絡(luò)連接的場景中,常常會出現(xiàn)一些超時控制。隨著連接數(shù)量的增加,這些超時任務(wù)的數(shù)量往往也是很龐大的。實(shí)現(xiàn)對大量任務(wù)的超時管理并不是一個容易的事情。
幾種定時任務(wù)的實(shí)現(xiàn)
java.util.Timer
JDK 在 1.3 的時候引入了Timer數(shù)據(jù)結(jié)構(gòu)用于實(shí)現(xiàn)定時任務(wù)。Timer的實(shí)現(xiàn)思路比較簡單,其內(nèi)部有兩個主要屬性:
- TaskQueue:定時任務(wù)抽象類TimeTask的列表。
- TimerThread:用于執(zhí)行定時任務(wù)的線程。
- private final TaskQueue queue = new TaskQueue();
- private final TimerThread thread = new TimerThread(queue);
Timer結(jié)構(gòu)還定義了一個抽象類TimerTask并且繼承了Runnable接口。業(yè)務(wù)系統(tǒng)實(shí)現(xiàn)了這個抽象類的run方法用于提供具體的延時任務(wù)邏輯。
TaskQueue內(nèi)部采用大頂堆的方式,依據(jù)任務(wù)的觸發(fā)時間進(jìn)行排序。而TimerThread則以死循環(huán)的方式從TaskQueue獲取隊(duì)列頭,等待隊(duì)列頭的任務(wù)的超時時間到達(dá)后觸發(fā)該任務(wù),并且將任務(wù)從隊(duì)列中移除。
Timer的數(shù)據(jù)結(jié)構(gòu)和算法都很容易理解。所有的超時任務(wù)都首先進(jìn)入延時隊(duì)列。后臺超時線程不斷的從延遲隊(duì)列中獲取任務(wù)并且等待超時時間到達(dá)后執(zhí)行任務(wù)。延遲隊(duì)列采用大頂堆排序,在延遲任務(wù)的場景中有三種操作,分別是:添加任務(wù),提取隊(duì)列頭任務(wù),查看隊(duì)列頭任務(wù)。
查看隊(duì)列頭任務(wù)的事件復(fù)雜度是 O(1) 。而添加任務(wù)和提取隊(duì)列頭任務(wù)的時間復(fù)雜度都是 O(Logn) 。當(dāng)任務(wù)數(shù)量較大時,添加和刪除的開銷也是比較大的。此外,由于Timer內(nèi)部只有一個處理線程,如果有一個延遲任務(wù)的處理消耗了較多的時間,會對應(yīng)的延遲后續(xù)任務(wù)的處理。
代碼如下:
- public static void main(String[] args) {
- Timer timer = new Timer();
- // 延遲 1秒 執(zhí)行任務(wù)
- timer.schedule(
- new java.util.TimerTask() {
- @Override
- public void run() {
- System.out.println("延遲 1秒 執(zhí)行任務(wù)"+System.currentTimeMillis());
- }
- }
- ,1000);
- timer.schedule(
- new java.util.TimerTask() {
- @Override
- public void run() {
- System.out.println("延遲 2秒 執(zhí)行任務(wù)"+System.currentTimeMillis());
- }
- }
- ,2000);
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- timer.cancel();
- }
ScheduledThreadPoolExecutor
由于Timer只有一個線程用來處理延遲任務(wù),在任務(wù)數(shù)量很多的時候顯然是不足夠的。在 JDK1.5 引入線程池接口ExecutorService后,也對應(yīng)的提供了一個用于處理延時任務(wù)的ScheduledExecutorService子類接口。該接口內(nèi)部也一樣使用了一個使用小頂堆進(jìn)行排序的延遲隊(duì)列存放任務(wù)。線程池中的線程會在這個隊(duì)列上等待直到有任務(wù)可以提取。
整體來說,ScheduledExecutorService 區(qū)別于 Timer 的地方就在于前者依賴了線程池來執(zhí)行任務(wù),而任務(wù)本身會判斷是什么類型的任務(wù),需要重復(fù)執(zhí)行的在任務(wù)執(zhí)行結(jié)束后會被重新添加到任務(wù)隊(duì)列。
而對于后者來說,它只依賴一個線程不停的去獲取隊(duì)列首部的任務(wù)并嘗試執(zhí)行它,無論是效率上、還是安全性上都比不上前者。
ScheduledExecutorService的實(shí)現(xiàn)上有一些特殊,只有一個線程能夠提取到延遲隊(duì)列頭的任務(wù),并且根據(jù)任務(wù)的超時時間進(jìn)行等待。在這個等待期間,其他的線程是無法獲取任務(wù)的。這樣的實(shí)現(xiàn)是為了避免多個線程同時獲取任務(wù),導(dǎo)致超時時間未到達(dá)就任務(wù)觸發(fā)或者在等待任務(wù)超時時間時有新的任務(wù)被加入而無法響應(yīng)。
由于ScheduledExecutorService可以使用多個線程,這樣也緩解了因?yàn)閭€別任務(wù)執(zhí)行時間長導(dǎo)致的后續(xù)任務(wù)被阻塞的情況。不過延遲隊(duì)列也是一樣采用小頂堆的排序方式,因此添加任務(wù)和刪除任務(wù)的時間復(fù)雜度都是 O(Logn) 。在任務(wù)數(shù)量很大的情況下,性能表現(xiàn)比較差。
代碼如下:
- public class ScheduledThreadPoolServiceTest {
- // 參數(shù)代表可以同時執(zhí)行的定時任務(wù)個數(shù)
- private ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
- /**
- * schedule:延時2秒執(zhí)行一次任務(wù)
- */
- public void task0() {
- service.schedule(() -> {
- System.out.println("task0-start");
- sleep(2);
- System.out.println("task0-end");
- }, 2, TimeUnit.SECONDS);
- }
- /**
- * scheduleAtFixedRate:2秒后,每間隔4秒執(zhí)行一次任務(wù)
- * 注意,如果任務(wù)的執(zhí)行時間(例如6秒)大于間隔時間,則會等待任務(wù)執(zhí)行結(jié)束后直接開始下次任務(wù)
- */
- public void task1() {
- service.scheduleAtFixedRate(() -> {
- System.out.println("task1-start");
- sleep(2);
- System.out.println("task1-end");
- }, 2, 4, TimeUnit.SECONDS);
- }
- /**
- * scheduleWithFixedDelay:2秒后,每次延時4秒執(zhí)行一次任務(wù)
- * 注意,這里是等待上次任務(wù)執(zhí)行結(jié)束后,再延時固定時間后開始下次任務(wù)
- */
- public void task2() {
- service.scheduleWithFixedDelay(() -> {
- System.out.println("task2-start");
- sleep(2);
- System.out.println("task2-end");
- }, 2, 4, TimeUnit.SECONDS);
- }
- private void sleep(long time) {
- try {
- TimeUnit.SECONDS.sleep(time);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) {
- ScheduledThreadPoolServiceTest test = new ScheduledThreadPoolServiceTest();
- System.out.println("main start");
- test.task0();
- //test.task1();
- // test.task2();
- test.sleep(10);
- System.out.println("main end");
- }
- }
DelayQueue
Java 中還有個延遲隊(duì)列 DelayQueue,加入延遲隊(duì)列的元素都必須實(shí)現(xiàn) Delayed 接口。延遲隊(duì)列內(nèi)部是利用 PriorityQueue 實(shí)現(xiàn)的,所以還是利用優(yōu)先隊(duì)列!Delayed 接口繼承了Comparable 因此優(yōu)先隊(duì)列是通過 delay 來排序的。
Redis sorted set
Redis的數(shù)據(jù)結(jié)構(gòu)Zset,同樣可以實(shí)現(xiàn)延遲隊(duì)列的效果,主要利用它的score屬性,redis通過score來為集合中的成員進(jìn)行從小到大的排序。zset 內(nèi)部是用跳表實(shí)現(xiàn)的。
跳表數(shù)據(jù)結(jié)構(gòu)的示意圖:
總體上,跳躍表刪除操作的時間復(fù)雜度是O(logN)。
有沒有更高效的數(shù)據(jù)結(jié)構(gòu)?
Timer 、ScheduledThreadPool 、 DelayQueue,總結(jié)的說下它們都是通過優(yōu)先隊(duì)列來獲取最早需要執(zhí)行的任務(wù),因此插入和刪除任務(wù)的時間復(fù)雜度都為O(logn),并且 Timer 、ScheduledThreadPool 的周期性任務(wù)是通過重置任務(wù)的下一次執(zhí)行時間來完成的。
但是由于新增任務(wù)和提取任務(wù)的時間復(fù)雜度都是 O(Logn) ,在任務(wù)數(shù)量很大,比如幾萬,十幾萬的時候,性能的開銷就變得很巨大。
問題就出在時間復(fù)雜度上,插入刪除時間復(fù)雜度是O(logn),那么假設(shè)頻繁插入刪除次數(shù)為 m,總的時間復(fù)雜度就是O(mlogn)
那么,是否存在新增任務(wù)和提取任務(wù)比 O(Log2n) 復(fù)雜度更低的數(shù)據(jù)結(jié)構(gòu)呢?答案是存在的。在論文《Hashed and Hierarchical Timing Wheels》中設(shè)計(jì)了一種名為時間輪( Timing Wheels )的數(shù)據(jù)結(jié)構(gòu),這種結(jié)構(gòu)在處理延遲任務(wù)時,其新增任務(wù)和刪除任務(wù)的時間復(fù)雜度降低到了 O(1) 。
時間輪算法
基本原理
見名知意,時間輪的數(shù)據(jù)結(jié)構(gòu)很類似于我們鐘表上的數(shù)據(jù)指針。
時間輪用環(huán)形數(shù)組實(shí)現(xiàn),數(shù)組的每個元素可以稱為槽,和 HashMap一樣稱呼。
槽的內(nèi)部用雙向鏈表存著待執(zhí)行的任務(wù),添加和刪除的鏈表操作時間復(fù)雜度都是 O(1),槽位本身也指代時間精度,比如一秒掃一個槽,那么這個時間輪的最高精度就是 1 秒。
也就是說延遲 1.2 秒的任務(wù)和 1.5 秒的任務(wù)會被加入到同一個槽中,然后在 1 秒的時候遍歷這個槽中的鏈表執(zhí)行任務(wù)。
任務(wù)插入
當(dāng)有一個延遲任務(wù)要插入時間輪時,首先計(jì)算其延遲時間與單位時間的余值,從指針指向的當(dāng)前槽位移動余值的個數(shù)槽位,就是該延遲任務(wù)需要被放入的槽位。
舉個例子,時間輪有8個槽位,編號為 0 ~ 7 。指針當(dāng)前指向槽位 2 。新增一個延遲時間為 4 秒的延遲任務(wù),4 % 8 = 4,因此該任務(wù)會被插入 4 + 2 = 6,也就是槽位6的延遲任務(wù)隊(duì)列。
時間槽位的實(shí)現(xiàn)
時間輪的槽位實(shí)現(xiàn)可以采用循環(huán)數(shù)組的方式達(dá)成,也就是讓指針在越過數(shù)組的邊界后重新回到起始下標(biāo)。概括來說,可以將時間輪的算法描述為:
用隊(duì)列來存儲延遲任務(wù),同一個隊(duì)列中的任務(wù),其延遲時間相同。用循環(huán)數(shù)組的方式來存儲元素,數(shù)組中的每一個元素都指向一個延遲任務(wù)隊(duì)列。
有一個當(dāng)前指針指向數(shù)組中的某一個槽位,每間隔一個單位時間,指針就移動到下一個槽位。被指針指向的槽位的延遲隊(duì)列,其中的延遲任務(wù)全部被觸發(fā)。
在時間輪中新增一個延遲任務(wù),將其延遲時間除以單位時間得到的余值,從當(dāng)前指針開始,移動余值對應(yīng)個數(shù)的槽位,就是延遲任務(wù)被放入的槽位。
基于這樣的數(shù)據(jù)結(jié)構(gòu),插入一個延遲任務(wù)的時間復(fù)雜度就下降到 O(1) 。而當(dāng)指針指向到一個槽位時,該槽位連接的延遲任務(wù)隊(duì)列中的延遲任務(wù)全部被觸發(fā)。
延遲任務(wù)的觸發(fā)和執(zhí)行不應(yīng)該影響指針向后移動的時間精確性。因此一般情況下,用于移動指針的線程只負(fù)責(zé)任務(wù)的觸發(fā),任務(wù)的執(zhí)行交由其他的線程來完成。比如,可以將槽位上的延遲任務(wù)隊(duì)列放入到額外的線程池中執(zhí)行,然后在槽位上新建一個空白的新的延遲任務(wù)隊(duì)列用于后續(xù)任務(wù)的添加。
關(guān)于擴(kuò)容
那假設(shè)現(xiàn)在要加入一個50秒后執(zhí)行的任務(wù)怎么辦?這槽好像不夠啊?難道要加槽嘛?和HashMap一樣擴(kuò)容?
假設(shè)要求精度為 1 秒,要能支持延遲時間為 1 天的延遲任務(wù),時間輪的槽位數(shù)需要 60 × 60 × 24 = 86400 。這就需要消耗更多的內(nèi)存。顯然,單純增加槽位數(shù)并不是一個好的解決方案。
常見有兩種方式:
通過增加輪次。50 % 8 + 1 = 3,即應(yīng)該放在槽位是 3,下標(biāo)是 2 的位置。然后 (50 - 1) / 8 = 6,即輪數(shù)記為 6。也就是說當(dāng)循環(huán) 6 輪之后掃到下標(biāo)的 2 的這個槽位會觸發(fā)這個任務(wù)。Netty 中的 HashedWheelTimer 使用的就是這種方式。
通過多層次。這個和我們的手表就更像了,像我們秒針走一圈,分針走一格,分針走一圈,時針走一格。
多層次時間輪就是這樣實(shí)現(xiàn)的。假設(shè)上圖就是第一層,那么第一層走了一圈,第二層就走一格。
可以得知第二層的一格就是8秒,假設(shè)第二層也是 8 個槽,那么第二層走一圈,第三層走一格,可以得知第三層一格就是 64 秒。
那么一格三層,每層8個槽,一共 24 個槽時間輪就可以處理最多延遲 512 秒的任務(wù)。
而多層次時間輪還會有降級的操作,假設(shè)一個任務(wù)延遲 500 秒執(zhí)行,那么剛開始加進(jìn)來肯定是放在第三層的,當(dāng)時間過了 436 秒后,此時還需要 64 秒就會觸發(fā)任務(wù)的執(zhí)行,而此時相對而言它就是個延遲 64 秒后的任務(wù),因此它會被降低放在第二層中,第一層還放不下它。
再過個 56 秒,相對而言它就是個延遲 8 秒后執(zhí)行的任務(wù),因此它會再被降級放在第一層中,等待執(zhí)行。
降級是為了保證時間精度一致性。Kafka內(nèi)部用的就是多層次的時間輪算法。
降級過程:
本文轉(zhuǎn)載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系小汪哥寫代碼公眾號。