Kafka解惑之時(shí)間輪 (TimingWheel)
Kafka中存在大量的延遲操作,比如延遲生產(chǎn)、延遲拉取以及延遲刪除等。Kafka并沒有使用JDK自帶的Timer或者DelayQueue來實(shí)現(xiàn)延遲的功能,而是基于時(shí)間輪自定義了一個(gè)用于實(shí)現(xiàn)延遲功能的定時(shí)器(SystemTimer)。JDK的Timer和DelayQueue插入和刪除操作的平均時(shí)間復(fù)雜度為O(nlog(n)),并不能滿足Kafka的高性能要求,而基于時(shí)間輪可以將插入和刪除操作的時(shí)間復(fù)雜度都降為O(1)。時(shí)間輪的應(yīng)用并非Kafka獨(dú)有,其應(yīng)用場(chǎng)景還有很多,在Netty、Akka、Quartz、Zookeeper等組件中都存在時(shí)間輪的蹤影。
參考下圖,Kafka中的時(shí)間輪(TimingWheel)是一個(gè)存儲(chǔ)定時(shí)任務(wù)的環(huán)形隊(duì)列,底層采用數(shù)組實(shí)現(xiàn),數(shù)組中的每個(gè)元素可以存放一個(gè)定時(shí)任務(wù)列表(TimerTaskList)。TimerTaskList是一個(gè)環(huán)形的雙向鏈表,鏈表中的每一項(xiàng)表示的都是定時(shí)任務(wù)項(xiàng)(TimerTaskEntry),其中封裝了真正的定時(shí)任務(wù)TimerTask。
時(shí)間輪由多個(gè)時(shí)間格組成,每個(gè)時(shí)間格代表當(dāng)前時(shí)間輪的基本時(shí)間跨度(tickMs)。時(shí)間輪的時(shí)間格個(gè)數(shù)是固定的,可用wheelSize來表示,那么整個(gè)時(shí)間輪的總體時(shí)間跨度(interval)可以通過公式 tickMs × wheelSize計(jì)算得出。時(shí)間輪還有一個(gè)表盤指針(currentTime),用來表示時(shí)間輪當(dāng)前所處的時(shí)間,currentTime是tickMs的整數(shù)倍。currentTime可以將整個(gè)時(shí)間輪劃分為到期部分和未到期部分,currentTime當(dāng)前指向的時(shí)間格也屬于到期部分,表示剛好到期,需要處理此時(shí)間格所對(duì)應(yīng)的TimerTaskList的所有任務(wù)。
若時(shí)間輪的tickMs=1ms,wheelSize=20,那么可以計(jì)算得出interval為20ms。初始情況下表盤指針currentTime指向時(shí)間格0,此時(shí)有一個(gè)定時(shí)為2ms的任務(wù)插入進(jìn)來會(huì)存放到時(shí)間格為2的TimerTaskList中。隨著時(shí)間的不斷推移,指針currentTime不斷向前推進(jìn),過了2ms之后,當(dāng)?shù)竭_(dá)時(shí)間格2時(shí),就需要將時(shí)間格2所對(duì)應(yīng)的TimeTaskList中的任務(wù)做相應(yīng)的到期操作。
此時(shí)若又有一個(gè)定時(shí)為8ms的任務(wù)插入進(jìn)來,則會(huì)存放到時(shí)間格10中,currentTime再過8ms后會(huì)指向時(shí)間格10。如果同時(shí)有一個(gè)定時(shí)為19ms的任務(wù)插入進(jìn)來怎么辦?新來的TimerTaskEntry會(huì)復(fù)用原來的TimerTaskList,所以它會(huì)插入到原本已經(jīng)到期的時(shí)間格1中??傊?,整個(gè)時(shí)間輪的總體跨度是不變的,隨著指針currentTime的不斷推進(jìn),當(dāng)前時(shí)間輪所能處理的時(shí)間段也在不斷后移,總體時(shí)間范圍在currentTime和currentTime+interval之間。
如果此時(shí)有個(gè)定時(shí)為350ms的任務(wù)該如何處理?直接擴(kuò)充wheelSize的大小么?Kafka中不乏幾萬甚至幾十萬毫秒的定時(shí)任務(wù),這個(gè)wheelSize的擴(kuò)充沒有底線,就算將所有的定時(shí)任務(wù)的到期時(shí)間都設(shè)定一個(gè)上限,比如100萬毫秒,那么這個(gè)wheelSize為100萬毫秒的時(shí)間輪不僅占用很大的內(nèi)存空間,而且效率也會(huì)拉低。Kafka為此引入了層級(jí)時(shí)間輪的概念,當(dāng)任務(wù)的到期時(shí)間超過了當(dāng)前時(shí)間輪所表示的時(shí)間范圍時(shí),就會(huì)嘗試添加到上層時(shí)間輪中。

參考上圖,復(fù)用之前的案例,***層的時(shí)間輪tickMs=1ms, wheelSize=20, interval=20ms。第二層的時(shí)間輪的tickMs為***層時(shí)間輪的interval,即為20ms。每一層時(shí)間輪的wheelSize是固定的,都是20,那么第二層的時(shí)間輪的總體時(shí)間跨度interval為400ms。以此類推,這個(gè)400ms也是第三層的tickMs的大小,第三層的時(shí)間輪的總體時(shí)間跨度為8000ms。
對(duì)于之前所說的350ms的定時(shí)任務(wù),顯然***層時(shí)間輪不能滿足條件,所以就升級(jí)到第二層時(shí)間輪中,最終被插入到第二層時(shí)間輪中時(shí)間格17所對(duì)應(yīng)的TimerTaskList中。如果此時(shí)又有一個(gè)定時(shí)為450ms的任務(wù),那么顯然第二層時(shí)間輪也無法滿足條件,所以又升級(jí)到第三層時(shí)間輪中,最終被插入到第三層時(shí)間輪中時(shí)間格1的TimerTaskList中。注意到在到期時(shí)間在[400ms,800ms)區(qū)間的多個(gè)任務(wù)(比如446ms、455ms以及473ms的定時(shí)任務(wù))都會(huì)被放入到第三層時(shí)間輪的時(shí)間格1中,時(shí)間格1對(duì)應(yīng)的TimerTaskList的超時(shí)時(shí)間為400ms。
隨著時(shí)間的流逝,當(dāng)次TimerTaskList到期之時(shí),原本定時(shí)為450ms的任務(wù)還剩下50ms的時(shí)間,還不能執(zhí)行這個(gè)任務(wù)的到期操作。這里就有一個(gè)時(shí)間輪降級(jí)的操作,會(huì)將這個(gè)剩余時(shí)間為50ms的定時(shí)任務(wù)重新提交到層級(jí)時(shí)間輪中,此時(shí)***層時(shí)間輪的總體時(shí)間跨度不夠,而第二層足夠,所以該任務(wù)被放到第二層時(shí)間輪到期時(shí)間為[40ms,60ms)的時(shí)間格中。再經(jīng)歷了40ms之后,此時(shí)這個(gè)任務(wù)又被“察覺”到,不過還剩余10ms,還是不能立即執(zhí)行到期操作。所以還要再有一次時(shí)間輪的降級(jí),此任務(wù)被添加到***層時(shí)間輪到期時(shí)間為[10ms,11ms)的時(shí)間格中,之后再經(jīng)歷10ms后,此任務(wù)真正到期,最終執(zhí)行相應(yīng)的到期操作。
設(shè)計(jì),其本源于生活。我們常見的鐘表就是一種具有三層結(jié)構(gòu)的時(shí)間輪,***層時(shí)間輪tickMs=1ms, wheelSize=60,interval=1min,此為秒鐘;第二層tickMs=1min,wheelSize=60,interval=1hour,此為分鐘;第三層tickMs=1hour,wheelSize為12,interval為12hours,此為時(shí)鐘。
在Kafka中***層時(shí)間輪的參數(shù)同上面的案例一樣:tickMs=1ms, wheelSize=20, interval=20ms,各個(gè)層級(jí)的wheelSize也固定為20,所以各個(gè)層級(jí)的tickMs和interval也可以相應(yīng)的推算出來。Kafka在具體實(shí)現(xiàn)時(shí)間輪TimingWheel時(shí)還有一些小細(xì)節(jié):
- TimingWheel在創(chuàng)建的時(shí)候以當(dāng)前系統(tǒng)時(shí)間為***層時(shí)間輪的起始時(shí)間(startMs),這里的當(dāng)前系統(tǒng)時(shí)間并沒有簡(jiǎn)單的調(diào)用System.currentTimeMillis(),而是調(diào)用了Time.SYSTEM.hiResClockMs,這是因?yàn)閏urrentTimeMillis()方法的時(shí)間精度依賴于操作系統(tǒng)的具體實(shí)現(xiàn),有些操作系統(tǒng)下并不能達(dá)到毫秒級(jí)的精度,而Time.SYSTEM.hiResClockMs實(shí)質(zhì)上是采用了System.nanoTime()/1_000_000來將精度調(diào)整到毫秒級(jí)。也有其他的某些騷操作可以實(shí)現(xiàn)毫秒級(jí)的精度,但是筆者并不推薦,System.nanoTime()/1_000_000是最有效的方法。(如對(duì)此有想法,可在留言區(qū)探討。)
- TimingWheel中的每個(gè)雙向環(huán)形鏈表TimerTaskList都會(huì)有一個(gè)哨兵節(jié)點(diǎn)(sentinel),引入哨兵節(jié)點(diǎn)可以簡(jiǎn)化邊界條件。哨兵節(jié)點(diǎn)也稱為啞元節(jié)點(diǎn)(dummy node),它是一個(gè)附加的鏈表節(jié)點(diǎn),該節(jié)點(diǎn)作為***個(gè)節(jié)點(diǎn),它的值域中并不存儲(chǔ)任何東西,只是為了操作的方便而引入的。如果一個(gè)鏈表有哨兵節(jié)點(diǎn)的話,那么線性表的***個(gè)元素應(yīng)該是鏈表的第二個(gè)節(jié)點(diǎn)。
- 除了***層時(shí)間輪,其余高層時(shí)間輪的起始時(shí)間(startMs)都設(shè)置為創(chuàng)建此層時(shí)間輪時(shí)前面***輪的currentTime。每一層的currentTime都必須是tickMs的整數(shù)倍,如果不滿足則會(huì)將currentTime修剪為tickMs的整數(shù)倍,以此與時(shí)間輪中的時(shí)間格的到期時(shí)間范圍對(duì)應(yīng)起來。修剪方法為:currentTime = startMs - (startMs % tickMs)。currentTime會(huì)隨著時(shí)間推移而推薦,但是不會(huì)改變?yōu)閠ickMs的整數(shù)倍的既定事實(shí)。若某一時(shí)刻的時(shí)間為timeMs,那么此時(shí)時(shí)間輪的currentTime = timeMs - (timeMs % tickMs),時(shí)間每推進(jìn)一次,每個(gè)層級(jí)的時(shí)間輪的currentTime都會(huì)依據(jù)此公式推進(jìn)。
- Kafka中的定時(shí)器只需持有TimingWheel的***層時(shí)間輪的引用,并不會(huì)直接持有其他高層的時(shí)間輪,但是每一層時(shí)間輪都會(huì)有一個(gè)引用(overflowWheel)指向更高一層的應(yīng)用,以此層級(jí)調(diào)用而可以實(shí)現(xiàn)定時(shí)器間接持有各個(gè)層級(jí)時(shí)間輪的引用。
關(guān)于時(shí)間輪的細(xì)節(jié)就描述到這里,各個(gè)組件中時(shí)間輪的實(shí)現(xiàn)大同小異。讀者讀到這里是否會(huì)好奇文中一直描述的一個(gè)情景——“隨著時(shí)間的流逝”或者“隨著時(shí)間的推移”,那么在Kafka中到底是怎么推進(jìn)時(shí)間的呢?類似采用JDK中的scheduleAtFixedRate來每秒推進(jìn)時(shí)間輪?顯然這樣并不合理,TimingWheel也失去了大部分意義。
Kafka中的定時(shí)器借助了JDK中的DelayQueue來協(xié)助推進(jìn)時(shí)間輪。具體做法是對(duì)于每個(gè)使用到的TimerTaskList都會(huì)加入到DelayQueue中,“每個(gè)使用到的TimerTaskList”特指有非哨兵節(jié)點(diǎn)的定時(shí)任務(wù)項(xiàng)TimerTaskEntry的TimerTaskList。DelayQueue會(huì)根據(jù)TimerTaskList對(duì)應(yīng)的超時(shí)時(shí)間expiration來排序,最短expiration的TimerTaskList會(huì)被排在DelayQueue的隊(duì)頭。Kafka中會(huì)有一個(gè)線程來獲取DelayQueue中的到期的任務(wù)列表,有意思的是這個(gè)線程所對(duì)應(yīng)的名稱叫做“ExpiredOperationReaper”,可以直譯為“過期操作收割機(jī)”,和“SkimpyOffsetMap”有的一拼。當(dāng)“收割機(jī)”線程獲取到DelayQueue中的超時(shí)的任務(wù)列表TimerTaskList之后,既可以根據(jù)TimerTaskList的expiration來推進(jìn)時(shí)間輪的時(shí)間,也可以就獲取到的TimerTaskList執(zhí)行相應(yīng)的操作,對(duì)立面的TimerTaskEntry該執(zhí)行過期操作的就執(zhí)行過期操作,該降級(jí)時(shí)間輪的就降級(jí)時(shí)間輪。
讀者讀到這里或許又非常的困惑,文章開頭明確指明的DelayQueue不適合Kafka這種高性能要求的定時(shí)任務(wù),為何這里還要引入DelayQueue呢?注意對(duì)于定時(shí)任務(wù)項(xiàng)TimerTaskEntry插入和刪除操作而言,TimingWheel時(shí)間復(fù)雜度為O(1),性能高出DelayQueue很多,如果直接將TimerTaskEntry插入DelayQueue中,那么性能顯然難以支撐。就算我們根據(jù)一定的規(guī)則將若干TimerTaskEntry劃分到TimerTaskList這個(gè)組中,然后再將TimerTaskList插入到DelayQueue中,試想下如果這個(gè)TimerTaskList中又要多添加一個(gè)TimerTaskEntry該如何處理?對(duì)于DelayQueue而言,這類操作顯然變得力不從心。
分析到這里可以發(fā)現(xiàn),Kafka中的TimingWheel專門用來執(zhí)行插入和刪除TimerTaskEntry的操作,而DelayQueue專門負(fù)責(zé)時(shí)間推進(jìn)的任務(wù)。再試想一下,DelayQueue中的***個(gè)超時(shí)任務(wù)列表的expiration為200ms,第二個(gè)超時(shí)任務(wù)為840ms,這里獲取DelayQueue的隊(duì)頭只需要O(1)的時(shí)間復(fù)雜度。如果采用每秒定時(shí)推進(jìn),那么獲取到***個(gè)超時(shí)的任務(wù)列表時(shí)執(zhí)行的200次推進(jìn)中有199次屬于“空推進(jìn)”,而獲取到第二個(gè)超時(shí)任務(wù)時(shí)有需要執(zhí)行639次“空推進(jìn)”,這樣會(huì)無故空耗機(jī)器的性能資源,這里采用DelayQueue來輔助以少量空間換時(shí)間,從而做到了“精準(zhǔn)推進(jìn)”。Kafka中的定時(shí)器真可謂是“知人善用”,用TimingWheel做最擅長(zhǎng)的任務(wù)添加和刪除操作,而用DelayQueue做最擅長(zhǎng)的時(shí)間推進(jìn)工作,相輔相成。