自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ延時消息解析!你學會了嗎?

開發(fā) 前端
CommitLog?中的消息轉(zhuǎn)發(fā)到CosumeQueue中是異步進行的。在轉(zhuǎn)發(fā)過程中,會對延遲消息進行特殊處理,主要是計算這條延遲消息需要在什么時候進行投遞。

什么是延時消息?

指的是當消息寫入到Broker后,不能立刻被消費者消費,需要等待指定的時長后才可被消費處理的消息。

延時消息等級

RocketMQ延時消息的延遲時長不支持隨意時長的延遲。

  • 是通過特定的延遲等級來指定的。

默認支持18個等級的延遲消息。

延時等級定義在RocketMQ服務端的MessageStoreConfig類中的如下變量中:

// MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

level 有以下三種情況:

  • level == 0,消息為非延遲消息。
  • 1<=level<=maxLevel,消息延遲特定時間,例如:level==1,延遲1s。
  • level > maxLevel,則level== maxLevel,例如level==20,延遲2h。

發(fā)消息時,設(shè)置delayLevel等級即可:msg.setDelayLevel(level)。

例如指定的延時等級為3,則表示延遲時長為10s,延遲等級是從1開始計數(shù)的。

使用場景

1、電商交易系統(tǒng)的訂單超時未支付,自動取消訂單。

2、超時自動審批,系統(tǒng)審批流程可以設(shè)置為超過設(shè)定時間后自動執(zhí)行通過或者拒絕流程。

3、限時優(yōu)惠活動,商品需要促銷,在活動開始時,發(fā)送一個兩小時后觸發(fā)的定時消息,用于在活動結(jié)束時恢復原價。

為什么不支持任意時間

按照《RocketMQ Developer Guide》中的說法:

  • 如果提供任意時間,就會涉及到消息的排序,會有一定的性能損耗。

而RocketMQ這種利用固定延遲級別到單個隊列的實現(xiàn)方式是一種妥協(xié),靈活性和極致性能的妥協(xié)。

延遲消息與消費重試的關(guān)系

消息重試的16個級別,實際上是把延遲消息18個級別的前兩個Level去掉了。

事實上,RocketMQ的消息重試也是基于延遲消息來完成的。

  • 在消息消費失敗的情況下,將其重新當做延遲消息投遞回Broker。

在投遞回去時,會跳過前兩個Level,因此只重試16次。

詳細內(nèi)容可以看我之前的文章!?。?/p>

實現(xiàn)原理

RocketMQ延時消息會暫存在名為SCHEDULE_TOPIC_XXXX的Topic中。

  • 并根據(jù)delayTimeLevel存入特定的Queue。

queueId = delayTimeLevel – 1:即一個Queue只存相同延遲的消息,保證具有相同發(fā)送延遲的消息能夠順序消費。

Broker會調(diào)度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的Topic。

圖片圖片

主要步驟:

圖片圖片

修改消息Topic名稱和隊列信息:

RocketMQ Broker端在存儲生產(chǎn)者寫入的消息時,首先都會將其寫入到CommitLog中。

之后根據(jù)消息中的Topic信息和隊列信息,將其轉(zhuǎn)發(fā)到目標Topic的指定隊列(ConsumeQueue)中。

由于消息一旦存儲到ConsumeQueue中,消費者就能消費到,而延遲消息不能被立即消費。

所以將Topic的名稱修改為SCHEDULE_TOPIC_XXXX,并根據(jù)延遲級別確定要投遞到哪個隊列下。

  • 同時,還會將消息原來要發(fā)送到的目標Topic和隊列信息存儲到消息的屬性中。

轉(zhuǎn)發(fā)消息到延遲主題SCHEDULE_TOPIC_XXXX的CosumeQueue中:

CommitLog中的消息轉(zhuǎn)發(fā)到CosumeQueue中是異步進行的。

在轉(zhuǎn)發(fā)過程中,會對延遲消息進行特殊處理,主要是計算這條延遲消息需要在什么時候進行投遞。

  • 投遞時間 = 消息存儲時間(StoreTimestamp) + 延遲級別對應的時間。

圖片圖片

延遲服務消費SCHEDULE_TOPIC_XXXX消息:

Broker內(nèi)部有一個ScheduleMessageService類,其充當延遲服務。

  • 主要是消費SCHEDULE_TOPIC_XXXX中的消息,并投遞到目標Topic中。

ScheduleMessageService在啟動時,其會創(chuàng)建一個定時器Timer,并根據(jù)延遲級別的個數(shù),啟動對應數(shù)量的TimerTask。

  • 每個TimerTask負責一個延遲級別的消費與投遞。

如果可以投放,則在投放到原本的目的Topic。

每隔100ms,從TopicSCHEDULE_TOPIC_XXXX判斷18個隊列里的第一個消息是否可以被投放。

需要注意

每個TimeTask在檢查消息是否到期時:

  • 首先檢查對應隊列中尚未投遞第一條消息。
  • 如果這條消息沒到期,那么之后的消息都不會檢查。
  • 如果到期了,則進行投遞,并檢查之后的消息是否到期。

圖片圖片

圖片圖片

將信息重新存儲到CommitLog中:

在將消息到期后,需要投遞到目標Topic。

由于在第一步已經(jīng)記錄了原來的Topic和隊列信息,因此這里重新設(shè)置,再存儲到CommitLog即可。

將消息投遞到目標Topic中:

由于消息

的Topic名稱已經(jīng)改為了目標Topic。

因此消息會直接投遞到目標Topic的ConsumeQueue中,之后消費者即消費到這條消息。

消費者消費目標Topic中的數(shù)據(jù)。

責任編輯:武曉燕 來源: 月伴飛魚
相關(guān)推薦

2022-07-13 08:16:49

RocketMQRPC日志

2023-06-26 13:08:52

GraphQL服務數(shù)據(jù)

2024-02-05 13:52:30

?Thread對象強引用

2025-01-02 10:02:44

2023-12-27 07:31:45

json產(chǎn)品場景

2024-09-06 07:29:05

2022-12-22 08:14:54

2024-07-11 11:17:00

消息隊列Java

2024-01-02 12:05:26

Java并發(fā)編程

2023-08-01 12:51:18

WebGPT機器學習模型

2023-01-10 08:43:15

定義DDD架構(gòu)

2024-02-04 00:00:00

Effect數(shù)據(jù)組件

2023-07-26 13:11:21

ChatGPT平臺工具

2024-01-19 08:25:38

死鎖Java通信

2024-02-01 15:03:14

RocketMQKosmos高可用

2024-07-31 08:26:47

2024-04-01 08:29:09

Git核心實例

2025-01-14 08:32:55

JWT令牌.NET

2022-06-16 07:50:35

數(shù)據(jù)結(jié)構(gòu)鏈表

2022-12-06 07:53:33

MySQL索引B+樹
點贊
收藏

51CTO技術(shù)棧公眾號