彌補(bǔ)延時(shí)消息的不足,RocketMQ 基于時(shí)間輪算法實(shí)現(xiàn)了定時(shí)消息!
?大家好,我是君哥。
在 RocketMQ 4.x 版本,使用延時(shí)消息來(lái)實(shí)現(xiàn)消息的定時(shí)消費(fèi)。延時(shí)消息可以一定程度上實(shí)現(xiàn)定時(shí)發(fā)送,但是有一些局限。
RocketMQ 新版本基于時(shí)間輪算法引入了定時(shí)消息,目前,精確到秒級(jí)的定時(shí)消息實(shí)現(xiàn)的 pr 已經(jīng)提交到社區(qū),今天來(lái)介紹一下。
1 延時(shí)消息
1.1 簡(jiǎn)介
RocketMQ 的延時(shí)消息是指 Producer 發(fā)送消息后,Consumer 不會(huì)立即消費(fèi),而是需要等待固定的時(shí)間才能消費(fèi)。在一些場(chǎng)景下,延時(shí)消息是很有用的,比如電商場(chǎng)景下關(guān)閉 30 分鐘內(nèi)未支付的訂單。
使用延時(shí)消息非常簡(jiǎn)單,只需要給消息的 delayTimeLevel 屬性賦值就可以。參考下面代碼:
延時(shí)消息有 18 個(gè)級(jí)別,如下:
1.2 實(shí)現(xiàn)原理
延時(shí)消息的實(shí)現(xiàn)原理如下圖:
Producer 把消息發(fā)送到 Broker 后,Broker 判斷到是延時(shí)消息,首先會(huì)把消息投遞到延時(shí)隊(duì)列(Topic = SCHEDULE_TOPIC_XXXX,queueId = delayTimeLevel - 1)。定時(shí)任務(wù)線程池會(huì)有 18 個(gè)線程來(lái)對(duì)延時(shí)隊(duì)列進(jìn)行調(diào)度,每個(gè)線程調(diào)度一個(gè)延時(shí)級(jí)別,調(diào)度任務(wù)把延時(shí)消息再投遞到原始隊(duì)列,這樣 Consumer 就可以拉取到了。
1.3 存在不足
延時(shí)消息存在著一些不足:
1.延時(shí)級(jí)別只有 18 個(gè),并不能滿足所有場(chǎng)景;
2.如果通過(guò)修改 messageDelayLevel 配置來(lái)自定義延時(shí)級(jí)別,并不靈活,比如一個(gè)在大規(guī)模的平臺(tái)上,延時(shí)級(jí)別成百上千,而且隨時(shí)可能增加新的延時(shí)時(shí)間;
3.延時(shí)時(shí)間不準(zhǔn)確,后臺(tái)的定時(shí)線程可能會(huì)因?yàn)樘幚硐⒘看髮?dǎo)致延時(shí)誤差大。
2 定時(shí)消息
為了彌補(bǔ)延時(shí)消息的不足,RocketMQ 5.0 引入了定時(shí)消息。
2.1 時(shí)間輪算法
為了解決定時(shí)任務(wù)隊(duì)列遍歷任務(wù)導(dǎo)致的性能開(kāi)銷,RocketMQ 定時(shí)消息引入了秒級(jí)的時(shí)間輪算法。如下圖:
圖中是一個(gè) 60s 的時(shí)間輪,時(shí)間輪上會(huì)有一個(gè)指向當(dāng)前時(shí)間的指針定時(shí)地移動(dòng)到下一個(gè)時(shí)間(秒級(jí))。
時(shí)間輪算法的優(yōu)勢(shì)是不用去遍歷所有的任務(wù),每一個(gè)時(shí)間節(jié)點(diǎn)上的任務(wù)用鏈表串起來(lái),當(dāng)時(shí)間輪上的指針移動(dòng)到當(dāng)前的時(shí)間時(shí),這個(gè)時(shí)間節(jié)點(diǎn)上的全部任務(wù)都執(zhí)行。
雖然上面只是一個(gè) 60s 的時(shí)間輪,但是對(duì)于所有的時(shí)間延時(shí),都是支持的??梢栽诿總€(gè)時(shí)間節(jié)點(diǎn)增加一個(gè) round 字段,記錄時(shí)間輪轉(zhuǎn)動(dòng)的圈數(shù),比如對(duì)于延時(shí) 130s 的任務(wù),round 就是 2,放在第 10 個(gè)時(shí)間刻度的鏈表中。這樣當(dāng)時(shí)間輪轉(zhuǎn)到一個(gè)節(jié)點(diǎn),執(zhí)行節(jié)點(diǎn)上的任務(wù)時(shí),首先判斷 round 是否等于 0,如果等于 0,則把這個(gè)任務(wù)從任務(wù)鏈表中移出交給異步線程執(zhí)行,否則將 round 減 1 繼續(xù)檢查后面的任務(wù)。
2.2 使用方式
基于時(shí)間輪算法的思想,RocketMQ 實(shí)現(xiàn)了精準(zhǔn)的定時(shí)消息。使用 RocketMQ 定時(shí)消息時(shí),客戶端定義消息的示例代碼如下:
2.3 實(shí)現(xiàn)原理
2.3.1 消息投遞
上面的代碼構(gòu)中,Producer 創(chuàng)建消息時(shí)給消息傳了一個(gè)系統(tǒng)屬性 deliveryTimestamp,這個(gè)屬性指定了消息投遞的時(shí)間,并且封裝到消息的 TIMER_DELIVER_MS 屬性,代碼如下:
Broker 收到這個(gè)消息后,如果判斷到 TIMER_DELIVER_MS 這個(gè)屬性有值,就會(huì)把這個(gè)消息投遞到 Topic 是 rmq_sys_wheel_timer 的隊(duì)列中,queueId 是 0,同時(shí)會(huì)保存原始消息的 Topic、queueId、投遞時(shí)間(TIMER_OUT_MS)。
TimerMessageStore 中有個(gè)定時(shí)任務(wù) TimerEnqueueGetService 會(huì)從 rmq_sys_wheel_timer 這個(gè) Topic 中讀取消息,然后封裝 TimerRequest 請(qǐng)求并放到隊(duì)列 enqueuePutQueue。
2.3.2 綁定時(shí)間輪
RocketMQ 使用 TimerLog 來(lái)保存消息的原始數(shù)據(jù)綁定到時(shí)間輪上。首先看一下 TimerLog 保存的數(shù)據(jù)結(jié)構(gòu),如下圖:
參考下面代碼:
TimerEnqueuePutService 這個(gè)定時(shí)任務(wù)從上面的 enqueuePutQueue(2.3.1節(jié)) 取出 TimerRequest 然后封裝成 TimerLog。
那時(shí)間輪是怎么跟 TimerLog 關(guān)聯(lián)起來(lái)的呢?RocketMQ 使用 TimerWheel 來(lái)描述時(shí)間輪,TimerWheel 中每一個(gè)時(shí)間節(jié)點(diǎn)是一個(gè) Slot,Slot 保存了這個(gè)延時(shí)時(shí)間的 TimerLog 信息。數(shù)據(jù)結(jié)構(gòu)如下圖:
參考下面代碼:
這樣時(shí)間輪跟 TimerLog 就關(guān)聯(lián)起來(lái)了,見(jiàn)下圖:
如果時(shí)間輪的一個(gè)時(shí)間節(jié)點(diǎn)(Slot)上有一條新的消息到來(lái),那只要新建一個(gè) TimerLog,然后把它的指針指向該時(shí)間節(jié)點(diǎn)的最后一個(gè) TimerLog,然后把 Slot 的 lastPos 屬性指向新建的這個(gè) TimerLog,如下圖:
從源碼上看,RocketMQ 定義了一個(gè) 7 天的以秒為單位的時(shí)間輪。
2.3.3 時(shí)間輪轉(zhuǎn)動(dòng)
轉(zhuǎn)動(dòng)時(shí)間輪時(shí),TimerDequeueGetService 這個(gè)定時(shí)任務(wù)從當(dāng)前時(shí)間節(jié)點(diǎn)(Slot)對(duì)應(yīng)的 TimerLog 中取出數(shù)據(jù),封裝成 TimerRequest 放入 dequeueGetQueue 隊(duì)列。
2.3.4 CommitLog 中讀取消息
定時(shí)任務(wù) TimerDequeueGetMessageService 從隊(duì)列 dequeueGetQueue 中拉取 TimerRequest 請(qǐng)求,然后根據(jù) TimerRequest 中的參數(shù)去 CommitLog(MessageExt) 中查找消息,查出后把消息封裝到 TimerRequest 中,然后把 TimerRequest 寫(xiě)入 dequeuePutQueue 這個(gè)隊(duì)列。
2.3.5 寫(xiě)入原隊(duì)列
定時(shí)任務(wù) TimerDequeuePutMessageService 從 dequeuePutQueue 隊(duì)列中獲取消息,把消息轉(zhuǎn)換成原始消息,投入到原始隊(duì)列中,這樣消費(fèi)者就可以拉取到了。
3 總結(jié)
RocketMQ 4.x 版本只支持延時(shí)消息,有一些局限性。而 RocketMQ 新版本引入了定時(shí)消息,彌補(bǔ)了延時(shí)消息的不足。定時(shí)消息的處理流程如下圖:
可以看到,RocketMQ 的定時(shí)消息的實(shí)現(xiàn)還是有一定復(fù)雜度的,這里用到 5 個(gè)定時(shí)任務(wù)和 3 個(gè)隊(duì)列來(lái)實(shí)現(xiàn)。
最后,對(duì)于定時(shí)時(shí)間的定義,客戶端、Broker 和時(shí)間輪的默認(rèn)最大延時(shí)時(shí)間定義是不同的,使用的時(shí)候需要注意。