自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

高并發(fā)高性能的定時器實(shí)現(xiàn)

開發(fā) 架構(gòu)
我們經(jīng)常都會碰到延遲任務(wù),定時任務(wù)這種需求。在網(wǎng)絡(luò)連接的場景中,常常會出現(xiàn)一些超時控制。隨著連接數(shù)量的增加,這些超時任務(wù)的數(shù)量往往也是很龐大的。實(shí)現(xiàn)對大量任務(wù)的超時管理并不是一個容易的事情。

[[413551]]

前言

我們經(jīng)常都會碰到延遲任務(wù),定時任務(wù)這種需求。在網(wǎng)絡(luò)連接的場景中,常常會出現(xiàn)一些超時控制。隨著連接數(shù)量的增加,這些超時任務(wù)的數(shù)量往往也是很龐大的。實(shí)現(xiàn)對大量任務(wù)的超時管理并不是一個容易的事情。

幾種定時任務(wù)的實(shí)現(xiàn)

java.util.Timer

JDK 在 1.3 的時候引入了Timer數(shù)據(jù)結(jié)構(gòu)用于實(shí)現(xiàn)定時任務(wù)。Timer的實(shí)現(xiàn)思路比較簡單,其內(nèi)部有兩個主要屬性:

  • TaskQueue:定時任務(wù)抽象類TimeTask的列表。
  • TimerThread:用于執(zhí)行定時任務(wù)的線程。
  1. private final TaskQueue queue = new TaskQueue();   
  2.     private final TimerThread thread = new TimerThread(queue); 

Timer結(jié)構(gòu)還定義了一個抽象類TimerTask并且繼承了Runnable接口。業(yè)務(wù)系統(tǒng)實(shí)現(xiàn)了這個抽象類的run方法用于提供具體的延時任務(wù)邏輯。

TaskQueue內(nèi)部采用大頂堆的方式,依據(jù)任務(wù)的觸發(fā)時間進(jìn)行排序。而TimerThread則以死循環(huán)的方式從TaskQueue獲取隊(duì)列頭,等待隊(duì)列頭的任務(wù)的超時時間到達(dá)后觸發(fā)該任務(wù),并且將任務(wù)從隊(duì)列中移除。

Timer的數(shù)據(jù)結(jié)構(gòu)和算法都很容易理解。所有的超時任務(wù)都首先進(jìn)入延時隊(duì)列。后臺超時線程不斷的從延遲隊(duì)列中獲取任務(wù)并且等待超時時間到達(dá)后執(zhí)行任務(wù)。延遲隊(duì)列采用大頂堆排序,在延遲任務(wù)的場景中有三種操作,分別是:添加任務(wù),提取隊(duì)列頭任務(wù),查看隊(duì)列頭任務(wù)。

查看隊(duì)列頭任務(wù)的事件復(fù)雜度是 O(1) 。而添加任務(wù)和提取隊(duì)列頭任務(wù)的時間復(fù)雜度都是 O(Logn) 。當(dāng)任務(wù)數(shù)量較大時,添加和刪除的開銷也是比較大的。此外,由于Timer內(nèi)部只有一個處理線程,如果有一個延遲任務(wù)的處理消耗了較多的時間,會對應(yīng)的延遲后續(xù)任務(wù)的處理。

代碼如下:

  1. public static void main(String[] args) { 
  2.         Timer timer = new Timer(); 
  3.         // 延遲 1秒 執(zhí)行任務(wù) 
  4.         timer.schedule( 
  5.                 new java.util.TimerTask() { 
  6.                     @Override 
  7.                     public void run() { 
  8.                         System.out.println("延遲 1秒 執(zhí)行任務(wù)"+System.currentTimeMillis()); 
  9.                     } 
  10.                 } 
  11.         ,1000); 
  12.  
  13.         timer.schedule( 
  14.                 new java.util.TimerTask() { 
  15.                     @Override 
  16.                     public void run() { 
  17.                         System.out.println("延遲 2秒 執(zhí)行任務(wù)"+System.currentTimeMillis()); 
  18.                     } 
  19.                 } 
  20.                 ,2000); 
  21.         try { 
  22.             Thread.sleep(5000); 
  23.         } catch (InterruptedException e) { 
  24.             e.printStackTrace(); 
  25.         } 
  26.         timer.cancel(); 
  27.     } 

ScheduledThreadPoolExecutor

由于Timer只有一個線程用來處理延遲任務(wù),在任務(wù)數(shù)量很多的時候顯然是不足夠的。在 JDK1.5 引入線程池接口ExecutorService后,也對應(yīng)的提供了一個用于處理延時任務(wù)的ScheduledExecutorService子類接口。該接口內(nèi)部也一樣使用了一個使用小頂堆進(jìn)行排序的延遲隊(duì)列存放任務(wù)。線程池中的線程會在這個隊(duì)列上等待直到有任務(wù)可以提取。

整體來說,ScheduledExecutorService 區(qū)別于 Timer 的地方就在于前者依賴了線程池來執(zhí)行任務(wù),而任務(wù)本身會判斷是什么類型的任務(wù),需要重復(fù)執(zhí)行的在任務(wù)執(zhí)行結(jié)束后會被重新添加到任務(wù)隊(duì)列。

而對于后者來說,它只依賴一個線程不停的去獲取隊(duì)列首部的任務(wù)并嘗試執(zhí)行它,無論是效率上、還是安全性上都比不上前者。

ScheduledExecutorService的實(shí)現(xiàn)上有一些特殊,只有一個線程能夠提取到延遲隊(duì)列頭的任務(wù),并且根據(jù)任務(wù)的超時時間進(jìn)行等待。在這個等待期間,其他的線程是無法獲取任務(wù)的。這樣的實(shí)現(xiàn)是為了避免多個線程同時獲取任務(wù),導(dǎo)致超時時間未到達(dá)就任務(wù)觸發(fā)或者在等待任務(wù)超時時間時有新的任務(wù)被加入而無法響應(yīng)。

由于ScheduledExecutorService可以使用多個線程,這樣也緩解了因?yàn)閭€別任務(wù)執(zhí)行時間長導(dǎo)致的后續(xù)任務(wù)被阻塞的情況。不過延遲隊(duì)列也是一樣采用小頂堆的排序方式,因此添加任務(wù)和刪除任務(wù)的時間復(fù)雜度都是 O(Logn) 。在任務(wù)數(shù)量很大的情況下,性能表現(xiàn)比較差。

代碼如下:

  1. public class ScheduledThreadPoolServiceTest { 
  2.     // 參數(shù)代表可以同時執(zhí)行的定時任務(wù)個數(shù) 
  3.     private ScheduledExecutorService service = Executors.newScheduledThreadPool(3); 
  4.     /** 
  5.      * schedule:延時2秒執(zhí)行一次任務(wù) 
  6.      */ 
  7.     public void task0() { 
  8.         service.schedule(() -> { 
  9.             System.out.println("task0-start"); 
  10.             sleep(2); 
  11.             System.out.println("task0-end"); 
  12.         }, 2, TimeUnit.SECONDS); 
  13.     } 
  14.  
  15.     /** 
  16.      * scheduleAtFixedRate:2秒后,每間隔4秒執(zhí)行一次任務(wù) 
  17.      * 注意,如果任務(wù)的執(zhí)行時間(例如6秒)大于間隔時間,則會等待任務(wù)執(zhí)行結(jié)束后直接開始下次任務(wù) 
  18.      */ 
  19.     public void task1() { 
  20.         service.scheduleAtFixedRate(() -> { 
  21.             System.out.println("task1-start"); 
  22.             sleep(2); 
  23.             System.out.println("task1-end"); 
  24.         }, 2, 4, TimeUnit.SECONDS); 
  25.     } 
  26.  
  27.     /** 
  28.      * scheduleWithFixedDelay:2秒后,每次延時4秒執(zhí)行一次任務(wù) 
  29.      * 注意,這里是等待上次任務(wù)執(zhí)行結(jié)束后,再延時固定時間后開始下次任務(wù) 
  30.      */ 
  31.     public void task2() { 
  32.         service.scheduleWithFixedDelay(() -> { 
  33.             System.out.println("task2-start"); 
  34.             sleep(2); 
  35.             System.out.println("task2-end"); 
  36.         }, 2, 4, TimeUnit.SECONDS); 
  37.     } 
  38.  
  39.     private void sleep(long time) { 
  40.         try { 
  41.             TimeUnit.SECONDS.sleep(time); 
  42.         } catch (InterruptedException e) { 
  43.             e.printStackTrace(); 
  44.         } 
  45.     } 
  46.  
  47.     public static void main(String[] args) { 
  48.         ScheduledThreadPoolServiceTest test = new ScheduledThreadPoolServiceTest(); 
  49.         System.out.println("main start"); 
  50.         test.task0(); 
  51.         //test.task1(); 
  52.        // test.task2(); 
  53.         test.sleep(10); 
  54.         System.out.println("main end"); 
  55.     } 

DelayQueue

Java 中還有個延遲隊(duì)列 DelayQueue,加入延遲隊(duì)列的元素都必須實(shí)現(xiàn) Delayed 接口。延遲隊(duì)列內(nèi)部是利用 PriorityQueue 實(shí)現(xiàn)的,所以還是利用優(yōu)先隊(duì)列!Delayed 接口繼承了Comparable 因此優(yōu)先隊(duì)列是通過 delay 來排序的。

Redis sorted set

Redis的數(shù)據(jù)結(jié)構(gòu)Zset,同樣可以實(shí)現(xiàn)延遲隊(duì)列的效果,主要利用它的score屬性,redis通過score來為集合中的成員進(jìn)行從小到大的排序。zset 內(nèi)部是用跳表實(shí)現(xiàn)的。

跳表數(shù)據(jù)結(jié)構(gòu)的示意圖:

總體上,跳躍表刪除操作的時間復(fù)雜度是O(logN)。

有沒有更高效的數(shù)據(jù)結(jié)構(gòu)?

Timer 、ScheduledThreadPool 、 DelayQueue,總結(jié)的說下它們都是通過優(yōu)先隊(duì)列來獲取最早需要執(zhí)行的任務(wù),因此插入和刪除任務(wù)的時間復(fù)雜度都為O(logn),并且 Timer 、ScheduledThreadPool 的周期性任務(wù)是通過重置任務(wù)的下一次執(zhí)行時間來完成的。

但是由于新增任務(wù)和提取任務(wù)的時間復(fù)雜度都是 O(Logn) ,在任務(wù)數(shù)量很大,比如幾萬,十幾萬的時候,性能的開銷就變得很巨大。

問題就出在時間復(fù)雜度上,插入刪除時間復(fù)雜度是O(logn),那么假設(shè)頻繁插入刪除次數(shù)為 m,總的時間復(fù)雜度就是O(mlogn)

那么,是否存在新增任務(wù)和提取任務(wù)比 O(Log2n) 復(fù)雜度更低的數(shù)據(jù)結(jié)構(gòu)呢?答案是存在的。在論文《Hashed and Hierarchical Timing Wheels》中設(shè)計(jì)了一種名為時間輪( Timing Wheels )的數(shù)據(jù)結(jié)構(gòu),這種結(jié)構(gòu)在處理延遲任務(wù)時,其新增任務(wù)和刪除任務(wù)的時間復(fù)雜度降低到了 O(1) 。

時間輪算法

基本原理

見名知意,時間輪的數(shù)據(jù)結(jié)構(gòu)很類似于我們鐘表上的數(shù)據(jù)指針。

時間輪用環(huán)形數(shù)組實(shí)現(xiàn),數(shù)組的每個元素可以稱為槽,和 HashMap一樣稱呼。

槽的內(nèi)部用雙向鏈表存著待執(zhí)行的任務(wù),添加和刪除的鏈表操作時間復(fù)雜度都是 O(1),槽位本身也指代時間精度,比如一秒掃一個槽,那么這個時間輪的最高精度就是 1 秒。

也就是說延遲 1.2 秒的任務(wù)和 1.5 秒的任務(wù)會被加入到同一個槽中,然后在 1 秒的時候遍歷這個槽中的鏈表執(zhí)行任務(wù)。

任務(wù)插入

當(dāng)有一個延遲任務(wù)要插入時間輪時,首先計(jì)算其延遲時間與單位時間的余值,從指針指向的當(dāng)前槽位移動余值的個數(shù)槽位,就是該延遲任務(wù)需要被放入的槽位。

舉個例子,時間輪有8個槽位,編號為 0 ~ 7 。指針當(dāng)前指向槽位 2 。新增一個延遲時間為 4 秒的延遲任務(wù),4 % 8 = 4,因此該任務(wù)會被插入 4 + 2 = 6,也就是槽位6的延遲任務(wù)隊(duì)列。

時間槽位的實(shí)現(xiàn)

時間輪的槽位實(shí)現(xiàn)可以采用循環(huán)數(shù)組的方式達(dá)成,也就是讓指針在越過數(shù)組的邊界后重新回到起始下標(biāo)。概括來說,可以將時間輪的算法描述為:

用隊(duì)列來存儲延遲任務(wù),同一個隊(duì)列中的任務(wù),其延遲時間相同。用循環(huán)數(shù)組的方式來存儲元素,數(shù)組中的每一個元素都指向一個延遲任務(wù)隊(duì)列。

有一個當(dāng)前指針指向數(shù)組中的某一個槽位,每間隔一個單位時間,指針就移動到下一個槽位。被指針指向的槽位的延遲隊(duì)列,其中的延遲任務(wù)全部被觸發(fā)。

在時間輪中新增一個延遲任務(wù),將其延遲時間除以單位時間得到的余值,從當(dāng)前指針開始,移動余值對應(yīng)個數(shù)的槽位,就是延遲任務(wù)被放入的槽位。

基于這樣的數(shù)據(jù)結(jié)構(gòu),插入一個延遲任務(wù)的時間復(fù)雜度就下降到 O(1) 。而當(dāng)指針指向到一個槽位時,該槽位連接的延遲任務(wù)隊(duì)列中的延遲任務(wù)全部被觸發(fā)。

延遲任務(wù)的觸發(fā)和執(zhí)行不應(yīng)該影響指針向后移動的時間精確性。因此一般情況下,用于移動指針的線程只負(fù)責(zé)任務(wù)的觸發(fā),任務(wù)的執(zhí)行交由其他的線程來完成。比如,可以將槽位上的延遲任務(wù)隊(duì)列放入到額外的線程池中執(zhí)行,然后在槽位上新建一個空白的新的延遲任務(wù)隊(duì)列用于后續(xù)任務(wù)的添加。

關(guān)于擴(kuò)容

那假設(shè)現(xiàn)在要加入一個50秒后執(zhí)行的任務(wù)怎么辦?這槽好像不夠啊?難道要加槽嘛?和HashMap一樣擴(kuò)容?

假設(shè)要求精度為 1 秒,要能支持延遲時間為 1 天的延遲任務(wù),時間輪的槽位數(shù)需要 60 × 60 × 24 = 86400 。這就需要消耗更多的內(nèi)存。顯然,單純增加槽位數(shù)并不是一個好的解決方案。

常見有兩種方式:

通過增加輪次。50 % 8 + 1 = 3,即應(yīng)該放在槽位是 3,下標(biāo)是 2 的位置。然后 (50 - 1) / 8 = 6,即輪數(shù)記為 6。也就是說當(dāng)循環(huán) 6 輪之后掃到下標(biāo)的 2 的這個槽位會觸發(fā)這個任務(wù)。Netty 中的 HashedWheelTimer 使用的就是這種方式。

通過多層次。這個和我們的手表就更像了,像我們秒針走一圈,分針走一格,分針走一圈,時針走一格。

多層次時間輪就是這樣實(shí)現(xiàn)的。假設(shè)上圖就是第一層,那么第一層走了一圈,第二層就走一格。

可以得知第二層的一格就是8秒,假設(shè)第二層也是 8 個槽,那么第二層走一圈,第三層走一格,可以得知第三層一格就是 64 秒。

那么一格三層,每層8個槽,一共 24 個槽時間輪就可以處理最多延遲 512 秒的任務(wù)。

而多層次時間輪還會有降級的操作,假設(shè)一個任務(wù)延遲 500 秒執(zhí)行,那么剛開始加進(jìn)來肯定是放在第三層的,當(dāng)時間過了 436 秒后,此時還需要 64 秒就會觸發(fā)任務(wù)的執(zhí)行,而此時相對而言它就是個延遲 64 秒后的任務(wù),因此它會被降低放在第二層中,第一層還放不下它。

再過個 56 秒,相對而言它就是個延遲 8 秒后執(zhí)行的任務(wù),因此它會再被降級放在第一層中,等待執(zhí)行。

降級是為了保證時間精度一致性。Kafka內(nèi)部用的就是多層次的時間輪算法。

降級過程:

本文轉(zhuǎn)載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系小汪哥寫代碼公眾號。

 

責(zé)任編輯:武曉燕 來源: 小汪哥寫代碼
相關(guān)推薦

2020-11-10 07:46:09

服務(wù)器高并發(fā)高性能

2023-11-01 11:13:58

Linux信號處理定時器

2021-05-24 09:28:41

軟件開發(fā) 技術(shù)

2024-12-04 10:58:57

TomcatJetty高并發(fā)

2017-11-27 09:14:29

2023-11-06 08:32:17

FastAPIPython

2016-12-21 09:33:40

2022-06-02 12:56:25

容器網(wǎng)絡(luò)云原生

2009-06-18 11:07:17

Spring fram

2010-07-28 15:56:22

FlexTimer定時

2009-11-11 10:14:10

linux定時器操作系統(tǒng)

2009-06-15 15:02:48

Spring定時器

2018-05-13 22:23:32

2021-08-11 10:10:26

Linux定時器數(shù)組

2022-11-02 11:40:16

Flowable定時器流程

2023-12-11 09:50:35

Linux定時器

2021-09-22 16:25:17

服務(wù)器戴爾科技集團(tuán)

2021-06-28 06:00:11

systemd定時器系統(tǒng)運(yùn)維

2016-09-12 14:07:14

Android 定時器

2019-12-31 10:33:57

Netty高性能內(nèi)存
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號