RocketMQ如何保證消息的可靠性投遞?
介紹
要想保證消息的可靠型投遞,無非保證如下3個(gè)階段的正常執(zhí)行即可。
- 生產(chǎn)者將消息成功投遞到broker
- broker將投遞過程的消息持久化下來
- 消費(fèi)者能從broker消費(fèi)到消息
發(fā)送端消息重試
producer向broker發(fā)送消息后,沒有收到broker的ack時(shí),rocketmq會(huì)自動(dòng)重試。重試的次數(shù)可以設(shè)置,默認(rèn)為2次
- DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME);
- // 同步發(fā)送設(shè)置重試次數(shù)為5次
- producer.setRetryTimesWhenSendFailed(5);
- // 異步發(fā)送設(shè)置重試次數(shù)為5次
- producer.setRetryTimesWhenSendAsyncFailed(5);
消息持久化
我們先來了解一下消息的存儲(chǔ)流程,這個(gè)知識對后面分析消費(fèi)端消息重試非常重要。
和消息相關(guān)的文件有如下幾種
- CommitLog:存儲(chǔ)消息的元數(shù)據(jù)
- ConsumerQueue:存儲(chǔ)消息在CommitLog的索引
- IndexFile:可以通過Message Key,時(shí)間區(qū)間快速查找到消息
整個(gè)消息的存儲(chǔ)流程如下
- Producer將消息順序?qū)懙紺ommitLog中
- 有一個(gè)線程根據(jù)消息的隊(duì)列信息,寫入到相關(guān)的ConsumerQueue中(minOffset為寫入的初始位置,consumerOffset為當(dāng)前消費(fèi)到的位置,maxOffset為ConsumerQueue最新寫入的位置)和IndexFile
- Consumer從ConsumerQueue的consumerOffset讀取到當(dāng)前應(yīng)該消費(fèi)的消息在CommitLog中的偏移量,到CommitLog中找到對應(yīng)的消息,消費(fèi)成功后移動(dòng)consumerOffset
刷盤機(jī)制
「異步刷盤」:消息被寫入內(nèi)存的PAGECACHE,返回寫成功狀態(tài),當(dāng)內(nèi)存里的消息量積累到一定程度時(shí),統(tǒng)一觸發(fā)寫磁盤操作,快速寫入 。吞吐量高,當(dāng)磁盤損壞時(shí),會(huì)丟失消息
「同步刷盤」:消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,給應(yīng)用返回消息寫成功的狀態(tài)。吞吐量低,但不會(huì)造成消息丟失
主從復(fù)制
如果一個(gè)broker有master和slave時(shí),就需要將master上的消息復(fù)制到slave上,復(fù)制的方式有兩種
- 「同步復(fù)制」:master和slave均寫成功,才返回客戶端成功。maste掛了以后可以保證數(shù)據(jù)不丟失,但是同步復(fù)制會(huì)增加數(shù)據(jù)寫入延遲,降低吞吐量
- 「異步復(fù)制」:master寫成功,返回客戶端成功。擁有較低的延遲和較高的吞吐量,但是當(dāng)master出現(xiàn)故障后,有可能造成數(shù)據(jù)丟失
消費(fèi)端消息重試
順序消息的重試
對于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后,消息隊(duì)列RocketMQ版會(huì)自動(dòng)不斷地進(jìn)行消息重試(每次間隔時(shí)間為1秒),這時(shí),應(yīng)用會(huì)出現(xiàn)消息消費(fèi)被阻塞的情況。所以一定要做好監(jiān)控,避免阻塞現(xiàn)象的發(fā)生
「順序消息消費(fèi)失敗后不會(huì)消費(fèi)下一條消息而是不斷重試這條消息,應(yīng)該是考慮到如果跨過這條消息消費(fèi)后面的消息會(huì)對業(yè)務(wù)邏輯產(chǎn)生影響」
「順序消息暫時(shí)僅支持集群消費(fèi)模式,不支持廣播消費(fèi)模式」
無序消息的重試
對于無序消息(普通、定時(shí)、延時(shí)、事務(wù)消息),當(dāng)消費(fèi)者消費(fèi)消息失敗時(shí),您可以通過設(shè)置返回狀態(tài)達(dá)到消息重試的結(jié)果。
「無序消息的重試只針對集群消費(fèi)方式生效;廣播方式不提供失敗重試特性,即消費(fèi)失敗后,失敗消息不再重試,繼續(xù)消費(fèi)新的消息」
「消費(fèi)時(shí)候后,重試的配置方式有如下三種」
- 返回Action.ReconsumeLater(推薦)
- 返回Null
- 拋出異常
- public class MessageListenerImpl implements MessageListener {
- @Override
- public Action consume(Message message, ConsumeContext context) {
- //消息處理邏輯拋出異常,消息將重試。
- doConsumeMessage(message);
- //方式1:返回Action.ReconsumeLater,消息將重試。
- return Action.ReconsumeLater;
- //方式2:返回null,消息將重試。
- return null;
- //方式3:直接拋出異常,消息將重試。
- throw new RuntimeException("Consumer Message exception");
- }
- }
「消費(fèi)失敗后,無需重試的配置方式」
集群消費(fèi)方式下,消息失敗后期望消息不重試,需要捕獲消費(fèi)邏輯中可能拋出的異常,最終返回Action.CommitMessage,此后這條消息將不會(huì)再重試。
- public class MessageListenerImpl implements MessageListener {
- @Override
- public Action consume(Message message, ConsumeContext context) {
- try {
- doConsumeMessage(message);
- } catch (Throwable e) {
- //捕獲消費(fèi)邏輯中的所有異常,并返回Action.CommitMessage;
- return Action.CommitMessage;
- }
- //消息處理正常,直接返回Action.CommitMessage;
- return Action.CommitMessage;
- }
- }
「消息重試次數(shù)」
「RocketMQ默認(rèn)允許每條消息最多重試16次,每次消費(fèi)失敗發(fā)送一條延時(shí)消息到重試隊(duì)列,同一條消息失敗一次將延時(shí)等級提高一次,然后再放到重試隊(duì)列。重試16次后如果還沒有消費(fèi)成功,則將消息放到死信隊(duì)列中?!?/p>
「注意:重試隊(duì)列和死信隊(duì)列都是按照Consumer Group劃分的」
重試隊(duì)列topic名字:%RETRY% + consumerGroup
死信隊(duì)列topic名字:%DLQ% + consumerGroup
「為什么重試隊(duì)列和死信隊(duì)列要按照Consumer Group來進(jìn)行劃分?」
「因?yàn)樵赗ocketMQ的時(shí)候使用一定要保持訂閱關(guān)系一致。即一個(gè)Consumer Group訂閱的topic和tag要完全一致,不然可能會(huì)導(dǎo)致消費(fèi)邏輯混亂,消息丟失」
如下任意一種情況都表現(xiàn)為訂閱關(guān)系不一致
- 相同ConsumerGroup下的Consumer實(shí)例訂閱了不同的Topic。
- 相同ConsumerGroup下的Consumer實(shí)例訂閱了相同的Topic,但訂閱的Tag不一致。
我們可以通過控制臺(tái)查看各種類型的主題
消息每次重試的間隔時(shí)間如下
第幾次重試 與上次重試的間隔時(shí)間 第幾次重試 與上次重試的間隔時(shí)間
第幾次重試 | 與上次重試的間隔時(shí)間 | 第幾次重試 | 與上次重試的間隔時(shí)間 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分鐘 |
2 | 30 秒 | 10 | 8 分鐘 |
3 | 1 分鐘 | 11 | 9 分鐘 |
4 | 2 分鐘 | 12 | 10 分鐘 |
5 | 3 分鐘 | 13 | 20 分鐘 |
6 | 4 分鐘 | 14 | 30 分鐘 |
7 | 5 分鐘 | 15 | 1 小時(shí) |
8 | 6 分鐘 | 16 | 2 小時(shí) |
「前面說到RocketMQ的消息重試是通過往重試隊(duì)列發(fā)送定時(shí)消息來實(shí)現(xiàn)的?!? RocketMQ支持18個(gè)級別的定時(shí)延時(shí),每個(gè)級別定時(shí)消息的延時(shí)時(shí)間如下。
- // MessageStoreConfig.java
- private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
消息重試只是把定時(shí)消息的前2個(gè)級別去掉,每次發(fā)送下一個(gè)級別的定時(shí)消息
我們可以設(shè)置消費(fèi)端消息重試次數(shù)
- 最大重試次數(shù)小于等于16次,則重試時(shí)間間隔同上表描述。
- 最大重試次數(shù)大于16次,超過16次的重試時(shí)間間隔均為每次2小時(shí)。
- Properties properties = new Properties();
- // 配置對應(yīng)Group ID的最大消息重試次數(shù)為20次,最大重試次數(shù)為字符串類型。
- properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
- Consumer consumer =ONSFactory.createConsumer(properties);
「那么重試隊(duì)列中的消息是如何被消費(fèi)的?」
消息消費(fèi)者在啟動(dòng)的時(shí)候,會(huì)訂閱正常的topic和重試隊(duì)列的topic
定時(shí)消息的實(shí)現(xiàn)邏輯也比較簡單,可以歸納為如下幾步
1.發(fā)送延時(shí)消息
1.1 替換topic為SCHEDULE_TOPIC_XXXX,queueId為消息延遲等級(如果不替換topic直接發(fā)到對應(yīng)的consumeQueue中,則消息會(huì)被立馬消費(fèi))
1.2 將消息原來的topic,queueId放到消息擴(kuò)展屬性中
1.3 將消息應(yīng)該執(zhí)行的時(shí)間放到tagsCode中
將消息順序?qū)懙紺ommitLog中
將消息對應(yīng)的信息分發(fā)到對應(yīng)的ConsumerQueue中(topic為SCHEDULE_TOPIC_XXXX總共有18個(gè)queue,對應(yīng)18個(gè)延遲級別)
定時(shí)任務(wù)不斷判斷消息是否到達(dá)投遞時(shí)間,沒有到達(dá)則后續(xù)執(zhí)行投遞
如果到達(dá)投遞時(shí)間,則從commitLog中拉取消息的內(nèi)容,重新設(shè)置消息topic,queueId為原來的(原來的topic,queueId在消息擴(kuò)展屬性中),然后將消息投遞到commitLog中,此時(shí)消息就會(huì)被分發(fā)到對應(yīng)的隊(duì)列中,然后被消費(fèi)。
本文轉(zhuǎn)載自微信公眾號「Java識堂」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系Java識堂公眾號。