自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

彌補(bǔ)延時(shí)消息的不足,RocketMQ 基于時(shí)間輪算法實(shí)現(xiàn)了定時(shí)消息!

開(kāi)發(fā) 架構(gòu)
RocketMQ 的延時(shí)消息是指 Producer 發(fā)送消息后,Consumer 不會(huì)立即消費(fèi),而是需要等待固定的時(shí)間才能消費(fèi)。在一些場(chǎng)景下,延時(shí)消息是很有用的,比如電商場(chǎng)景下關(guān)閉 30 分鐘內(nèi)未支付的訂單。

?大家好,我是君哥。

在 RocketMQ 4.x 版本,使用延時(shí)消息來(lái)實(shí)現(xiàn)消息的定時(shí)消費(fèi)。延時(shí)消息可以一定程度上實(shí)現(xiàn)定時(shí)發(fā)送,但是有一些局限。

RocketMQ 新版本基于時(shí)間輪算法引入了定時(shí)消息,目前,精確到秒級(jí)的定時(shí)消息實(shí)現(xiàn)的 pr 已經(jīng)提交到社區(qū),今天來(lái)介紹一下。

1 延時(shí)消息

1.1 簡(jiǎn)介

RocketMQ 的延時(shí)消息是指 Producer 發(fā)送消息后,Consumer 不會(huì)立即消費(fèi),而是需要等待固定的時(shí)間才能消費(fèi)。在一些場(chǎng)景下,延時(shí)消息是很有用的,比如電商場(chǎng)景下關(guān)閉 30 分鐘內(nèi)未支付的訂單。

使用延時(shí)消息非常簡(jiǎn)單,只需要給消息的 delayTimeLevel 屬性賦值就可以。參考下面代碼:

Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
//3 個(gè)級(jí)別,10s
message.setDelayTimeLevel(3);
producer.send(message);

延時(shí)消息有 18 個(gè)級(jí)別,如下:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

1.2 實(shí)現(xiàn)原理

延時(shí)消息的實(shí)現(xiàn)原理如下圖:

圖片

Producer 把消息發(fā)送到 Broker 后,Broker 判斷到是延時(shí)消息,首先會(huì)把消息投遞到延時(shí)隊(duì)列(Topic = SCHEDULE_TOPIC_XXXX,queueId = delayTimeLevel - 1)。定時(shí)任務(wù)線程池會(huì)有 18 個(gè)線程來(lái)對(duì)延時(shí)隊(duì)列進(jìn)行調(diào)度,每個(gè)線程調(diào)度一個(gè)延時(shí)級(jí)別,調(diào)度任務(wù)把延時(shí)消息再投遞到原始隊(duì)列,這樣 Consumer 就可以拉取到了。

1.3 存在不足

延時(shí)消息存在著一些不足:

1.延時(shí)級(jí)別只有 18 個(gè),并不能滿足所有場(chǎng)景;

2.如果通過(guò)修改 messageDelayLevel 配置來(lái)自定義延時(shí)級(jí)別,并不靈活,比如一個(gè)在大規(guī)模的平臺(tái)上,延時(shí)級(jí)別成百上千,而且隨時(shí)可能增加新的延時(shí)時(shí)間;

3.延時(shí)時(shí)間不準(zhǔn)確,后臺(tái)的定時(shí)線程可能會(huì)因?yàn)樘幚硐⒘看髮?dǎo)致延時(shí)誤差大。

2 定時(shí)消息

為了彌補(bǔ)延時(shí)消息的不足,RocketMQ 5.0 引入了定時(shí)消息。

2.1 時(shí)間輪算法

為了解決定時(shí)任務(wù)隊(duì)列遍歷任務(wù)導(dǎo)致的性能開(kāi)銷,RocketMQ 定時(shí)消息引入了秒級(jí)的時(shí)間輪算法。如下圖:

圖片

圖中是一個(gè) 60s 的時(shí)間輪,時(shí)間輪上會(huì)有一個(gè)指向當(dāng)前時(shí)間的指針定時(shí)地移動(dòng)到下一個(gè)時(shí)間(秒級(jí))。

時(shí)間輪算法的優(yōu)勢(shì)是不用去遍歷所有的任務(wù),每一個(gè)時(shí)間節(jié)點(diǎn)上的任務(wù)用鏈表串起來(lái),當(dāng)時(shí)間輪上的指針移動(dòng)到當(dāng)前的時(shí)間時(shí),這個(gè)時(shí)間節(jié)點(diǎn)上的全部任務(wù)都執(zhí)行。

雖然上面只是一個(gè) 60s 的時(shí)間輪,但是對(duì)于所有的時(shí)間延時(shí),都是支持的??梢栽诿總€(gè)時(shí)間節(jié)點(diǎn)增加一個(gè) round 字段,記錄時(shí)間輪轉(zhuǎn)動(dòng)的圈數(shù),比如對(duì)于延時(shí) 130s 的任務(wù),round 就是 2,放在第 10 個(gè)時(shí)間刻度的鏈表中。這樣當(dāng)時(shí)間輪轉(zhuǎn)到一個(gè)節(jié)點(diǎn),執(zhí)行節(jié)點(diǎn)上的任務(wù)時(shí),首先判斷 round 是否等于 0,如果等于 0,則把這個(gè)任務(wù)從任務(wù)鏈表中移出交給異步線程執(zhí)行,否則將 round 減 1 繼續(xù)檢查后面的任務(wù)。

2.2 使用方式

基于時(shí)間輪算法的思想,RocketMQ 實(shí)現(xiàn)了精準(zhǔn)的定時(shí)消息。使用 RocketMQ 定時(shí)消息時(shí),客戶端定義消息的示例代碼如下:

org.apache.rocketmq.common.message.Message messageExt = this.sendMessageActivity.buildMessage(null,
Lists.newArrayList(
Message.newBuilder()
.setTopic(Resource.newBuilder()
.setName(TOPIC)
.build())
.setSystemProperties(SystemProperties.newBuilder()
.setMessageId(msgId)
.setQueueId(0)
.setMessageType(MessageType.DELAY)
.setDeliveryTimestamp(Timestamps.fromMillis(deliveryTime))
//定義消息投遞時(shí)間
.setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
.setBornHost(StringUtils.defaultString(RemotingUtil.getLocalAddress(), "127.0.0.1:1234"))
.build())
.setBody(ByteString.copyFromUtf8("123"))
.build()
),
Resource.newBuilder().setName(TOPIC).build()).get(0);

2.3 實(shí)現(xiàn)原理

2.3.1 消息投遞

上面的代碼構(gòu)中,Producer 創(chuàng)建消息時(shí)給消息傳了一個(gè)系統(tǒng)屬性 deliveryTimestamp,這個(gè)屬性指定了消息投遞的時(shí)間,并且封裝到消息的 TIMER_DELIVER_MS 屬性,代碼如下:

protected void fillDelayMessageProperty(apache.rocketmq.v2.Message message, org.apache.rocketmq.common.message.Message messageWithHeader){
if (message.getSystemProperties().hasDeliveryTimestamp()) {
Timestamp deliveryTimestamp = message.getSystemProperties().getDeliveryTimestamp();
//delayTime 這個(gè)延時(shí)時(shí)間默認(rèn)不能超過(guò) 1,可以配置
long deliveryTimestampMs = Timestamps.toMillis(deliveryTimestamp);
validateDelayTime(deliveryTimestampMs);
//...
String timestampString = String.valueOf(deliveryTimestampMs);
//MessageConst.PROPERTY_TIMER_DELIVER_MS="TIMER_DELIVER_MS"
MessageAccessor.putProperty(messageWithHeader, MessageConst.PROPERTY_TIMER_DELIVER_MS, timestampString);
}
}

Broker 收到這個(gè)消息后,如果判斷到 TIMER_DELIVER_MS 這個(gè)屬性有值,就會(huì)把這個(gè)消息投遞到 Topic 是 rmq_sys_wheel_timer 的隊(duì)列中,queueId 是 0,同時(shí)會(huì)保存原始消息的 Topic、queueId、投遞時(shí)間(TIMER_OUT_MS)。

TimerMessageStore 中有個(gè)定時(shí)任務(wù) TimerEnqueueGetService 會(huì)從 rmq_sys_wheel_timer 這個(gè) Topic 中讀取消息,然后封裝 TimerRequest 請(qǐng)求并放到隊(duì)列 enqueuePutQueue。

2.3.2 綁定時(shí)間輪

RocketMQ 使用 TimerLog 來(lái)保存消息的原始數(shù)據(jù)綁定到時(shí)間輪上。首先看一下 TimerLog 保存的數(shù)據(jù)結(jié)構(gòu),如下圖:

圖片

參考下面代碼:

//TimerMessageStore類
ByteBuffer tmpBuffer = timerLogBuffer;
tmpBuffer.clear();
tmpBuffer.putInt(TimerLog.UNIT_SIZE); //size
tmpBuffer.putLong(slot.lastPos); //prev pos
tmpBuffer.putInt(magic); //magic
tmpBuffer.putLong(tmpWriteTimeMs); //currWriteTime
tmpBuffer.putInt((int) (delayedTime - tmpWriteTimeMs)); //delayTime
tmpBuffer.putLong(offsetPy); //offset
tmpBuffer.putInt(sizePy); //size
tmpBuffer.putInt(hashTopicForMetrics(realTopic)); //hashcode of real topic
tmpBuffer.putLong(0); //reserved value, just set to 0 now
long ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE);
if (-1 != ret) {
// If it's a delete message, then slot's total num -1
// TODO: check if the delete msg is in the same slot with "the msg to be deleted".
timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret,
isDelete ? slot.num - 1 : slot.num + 1, slot.magic);

}

TimerEnqueuePutService 這個(gè)定時(shí)任務(wù)從上面的 enqueuePutQueue(2.3.1節(jié)) 取出 TimerRequest 然后封裝成  TimerLog。

那時(shí)間輪是怎么跟 TimerLog 關(guān)聯(lián)起來(lái)的呢?RocketMQ 使用 TimerWheel 來(lái)描述時(shí)間輪,TimerWheel 中每一個(gè)時(shí)間節(jié)點(diǎn)是一個(gè) Slot,Slot 保存了這個(gè)延時(shí)時(shí)間的 TimerLog 信息。數(shù)據(jù)結(jié)構(gòu)如下圖:

圖片

參考下面代碼:

//類 TimerWheel
public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic){
localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE);
localBuffer.get().putLong(timeMs / precisionMs);
localBuffer.get().putLong(firstPos);
localBuffer.get().putLong(lastPos);
localBuffer.get().putInt(num);
localBuffer.get().putInt(magic);
}

這樣時(shí)間輪跟 TimerLog 就關(guān)聯(lián)起來(lái)了,見(jiàn)下圖:

圖片

如果時(shí)間輪的一個(gè)時(shí)間節(jié)點(diǎn)(Slot)上有一條新的消息到來(lái),那只要新建一個(gè) TimerLog,然后把它的指針指向該時(shí)間節(jié)點(diǎn)的最后一個(gè) TimerLog,然后把 Slot 的 lastPos 屬性指向新建的這個(gè) TimerLog,如下圖:

圖片

從源碼上看,RocketMQ 定義了一個(gè) 7 天的以秒為單位的時(shí)間輪。

2.3.3 時(shí)間輪轉(zhuǎn)動(dòng)

轉(zhuǎn)動(dòng)時(shí)間輪時(shí),TimerDequeueGetService 這個(gè)定時(shí)任務(wù)從當(dāng)前時(shí)間節(jié)點(diǎn)(Slot)對(duì)應(yīng)的 TimerLog 中取出數(shù)據(jù),封裝成 TimerRequest 放入 dequeueGetQueue 隊(duì)列。

2.3.4 CommitLog 中讀取消息

定時(shí)任務(wù) TimerDequeueGetMessageService 從隊(duì)列 dequeueGetQueue 中拉取 TimerRequest 請(qǐng)求,然后根據(jù) TimerRequest 中的參數(shù)去 CommitLog(MessageExt) 中查找消息,查出后把消息封裝到 TimerRequest 中,然后把 TimerRequest 寫(xiě)入 dequeuePutQueue 這個(gè)隊(duì)列。

2.3.5 寫(xiě)入原隊(duì)列

定時(shí)任務(wù) TimerDequeuePutMessageService 從 dequeuePutQueue 隊(duì)列中獲取消息,把消息轉(zhuǎn)換成原始消息,投入到原始隊(duì)列中,這樣消費(fèi)者就可以拉取到了。

3 總結(jié)

RocketMQ 4.x 版本只支持延時(shí)消息,有一些局限性。而 RocketMQ 新版本引入了定時(shí)消息,彌補(bǔ)了延時(shí)消息的不足。定時(shí)消息的處理流程如下圖:

圖片

可以看到,RocketMQ 的定時(shí)消息的實(shí)現(xiàn)還是有一定復(fù)雜度的,這里用到 5 個(gè)定時(shí)任務(wù)和 3 個(gè)隊(duì)列來(lái)實(shí)現(xiàn)。

最后,對(duì)于定時(shí)時(shí)間的定義,客戶端、Broker 和時(shí)間輪的默認(rèn)最大延時(shí)時(shí)間定義是不同的,使用的時(shí)候需要注意。

責(zé)任編輯:武曉燕 來(lái)源: 君哥聊技術(shù)
相關(guān)推薦

2024-10-11 09:15:33

2022-06-13 11:05:35

RocketMQ消費(fèi)者線程

2022-07-12 17:33:00

消息定時(shí)提醒鴻蒙

2022-12-22 10:03:18

消息集成

2022-05-24 10:43:02

延時(shí)消息分布式MQ

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-11-13 00:59:13

2021-07-14 17:18:14

RocketMQ消息分布式

2021-07-13 11:52:47

順序消息RocketMQkafka

2023-12-30 13:47:48

Redis消息隊(duì)列機(jī)制

2023-07-17 08:34:03

RocketMQ消息初體驗(yàn)

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-18 09:03:01

RocketMQ場(chǎng)景消息

2025-04-09 08:20:00

RocketMQ消息隊(duì)列開(kāi)發(fā)

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2023-09-04 08:00:53

提交事務(wù)消息

2021-10-03 21:41:13

RocketMQKafkaPulsar

2024-09-25 08:32:05

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)