RocketMQ延時消息解析!你學會了嗎?
什么是延時消息?
指的是當消息寫入到Broker后,不能立刻被消費者消費,需要等待指定的時長后才可被消費處理的消息。
延時消息等級
RocketMQ延時消息的延遲時長不支持隨意時長的延遲。
- 是通過特定的延遲等級來指定的。
默認支持18個等級的延遲消息。
延時等級定義在RocketMQ服務端的MessageStoreConfig類中的如下變量中:
// MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
level 有以下三種情況:
- level == 0,消息為非延遲消息。
- 1<=level<=maxLevel,消息延遲特定時間,例如:level==1,延遲1s。
- level > maxLevel,則level== maxLevel,例如level==20,延遲2h。
發(fā)消息時,設(shè)置delayLevel等級即可:msg.setDelayLevel(level)。
例如指定的延時等級為3,則表示延遲時長為10s,延遲等級是從1開始計數(shù)的。
使用場景
1、電商交易系統(tǒng)的訂單超時未支付,自動取消訂單。
2、超時自動審批,系統(tǒng)審批流程可以設(shè)置為超過設(shè)定時間后自動執(zhí)行通過或者拒絕流程。
3、限時優(yōu)惠活動,商品需要促銷,在活動開始時,發(fā)送一個兩小時后觸發(fā)的定時消息,用于在活動結(jié)束時恢復原價。
為什么不支持任意時間
按照《RocketMQ Developer Guide》中的說法:
- 如果提供任意時間,就會涉及到消息的排序,會有一定的性能損耗。
而RocketMQ這種利用固定延遲級別到單個隊列的實現(xiàn)方式是一種妥協(xié),靈活性和極致性能的妥協(xié)。
延遲消息與消費重試的關(guān)系
消息重試的16個級別,實際上是把延遲消息18個級別的前兩個Level去掉了。
事實上,RocketMQ的消息重試也是基于延遲消息來完成的。
- 在消息消費失敗的情況下,將其重新當做延遲消息投遞回Broker。
在投遞回去時,會跳過前兩個Level,因此只重試16次。
詳細內(nèi)容可以看我之前的文章!?。?/p>
實現(xiàn)原理
RocketMQ延時消息會暫存在名為SCHEDULE_TOPIC_XXXX的Topic中。
- 并根據(jù)delayTimeLevel存入特定的Queue。
queueId = delayTimeLevel – 1:即一個Queue只存相同延遲的消息,保證具有相同發(fā)送延遲的消息能夠順序消費。
Broker會調(diào)度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的Topic。
圖片
主要步驟:
圖片
修改消息Topic名稱和隊列信息:
RocketMQ Broker端在存儲生產(chǎn)者寫入的消息時,首先都會將其寫入到CommitLog中。
之后根據(jù)消息中的Topic信息和隊列信息,將其轉(zhuǎn)發(fā)到目標Topic的指定隊列(ConsumeQueue)中。
由于消息一旦存儲到ConsumeQueue中,消費者就能消費到,而延遲消息不能被立即消費。
所以將Topic的名稱修改為SCHEDULE_TOPIC_XXXX,并根據(jù)延遲級別確定要投遞到哪個隊列下。
- 同時,還會將消息原來要發(fā)送到的目標Topic和隊列信息存儲到消息的屬性中。
轉(zhuǎn)發(fā)消息到延遲主題SCHEDULE_TOPIC_XXXX的CosumeQueue中:
CommitLog中的消息轉(zhuǎn)發(fā)到CosumeQueue中是異步進行的。
在轉(zhuǎn)發(fā)過程中,會對延遲消息進行特殊處理,主要是計算這條延遲消息需要在什么時候進行投遞。
- 投遞時間 = 消息存儲時間(StoreTimestamp) + 延遲級別對應的時間。
圖片
延遲服務消費SCHEDULE_TOPIC_XXXX消息:
Broker內(nèi)部有一個ScheduleMessageService類,其充當延遲服務。
- 主要是消費SCHEDULE_TOPIC_XXXX中的消息,并投遞到目標Topic中。
ScheduleMessageService在啟動時,其會創(chuàng)建一個定時器Timer,并根據(jù)延遲級別的個數(shù),啟動對應數(shù)量的TimerTask。
- 每個TimerTask負責一個延遲級別的消費與投遞。
如果可以投放,則在投放到原本的目的Topic。
每隔100ms,從TopicSCHEDULE_TOPIC_XXXX判斷18個隊列里的第一個消息是否可以被投放。
需要注意
每個TimeTask在檢查消息是否到期時:
- 首先檢查對應隊列中尚未投遞第一條消息。
- 如果這條消息沒到期,那么之后的消息都不會檢查。
- 如果到期了,則進行投遞,并檢查之后的消息是否到期。
圖片
圖片
將信息重新存儲到CommitLog中:
在將消息到期后,需要投遞到目標Topic。
由于在第一步已經(jīng)記錄了原來的Topic和隊列信息,因此這里重新設(shè)置,再存儲到CommitLog即可。
將消息投遞到目標Topic中:
由于消息
的Topic名稱已經(jīng)改為了目標Topic。
因此消息會直接投遞到目標Topic的ConsumeQueue中,之后消費者即消費到這條消息。
消費者消費目標Topic中的數(shù)據(jù)。