自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

RocketMQ如何保證消息的可靠性投遞?

開發(fā) 架構(gòu)
對于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后,消息隊(duì)列RocketMQ版會(huì)自動(dòng)不斷地進(jìn)行消息重試(每次間隔時(shí)間為1秒),這時(shí),應(yīng)用會(huì)出現(xiàn)消息消費(fèi)被阻塞的情況。

[[396087]]

介紹

要想保證消息的可靠型投遞,無非保證如下3個(gè)階段的正常執(zhí)行即可。

  1. 生產(chǎn)者將消息成功投遞到broker
  2. broker將投遞過程的消息持久化下來
  3. 消費(fèi)者能從broker消費(fèi)到消息

發(fā)送端消息重試

producer向broker發(fā)送消息后,沒有收到broker的ack時(shí),rocketmq會(huì)自動(dòng)重試。重試的次數(shù)可以設(shè)置,默認(rèn)為2次

  1. DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME); 
  2. // 同步發(fā)送設(shè)置重試次數(shù)為5次 
  3. producer.setRetryTimesWhenSendFailed(5); 
  4. // 異步發(fā)送設(shè)置重試次數(shù)為5次 
  5. producer.setRetryTimesWhenSendAsyncFailed(5); 

消息持久化

我們先來了解一下消息的存儲(chǔ)流程,這個(gè)知識對后面分析消費(fèi)端消息重試非常重要。

和消息相關(guān)的文件有如下幾種

  1. CommitLog:存儲(chǔ)消息的元數(shù)據(jù)
  2. ConsumerQueue:存儲(chǔ)消息在CommitLog的索引
  3. IndexFile:可以通過Message Key,時(shí)間區(qū)間快速查找到消息

整個(gè)消息的存儲(chǔ)流程如下

  1. Producer將消息順序?qū)懙紺ommitLog中
  2. 有一個(gè)線程根據(jù)消息的隊(duì)列信息,寫入到相關(guān)的ConsumerQueue中(minOffset為寫入的初始位置,consumerOffset為當(dāng)前消費(fèi)到的位置,maxOffset為ConsumerQueue最新寫入的位置)和IndexFile
  3. Consumer從ConsumerQueue的consumerOffset讀取到當(dāng)前應(yīng)該消費(fèi)的消息在CommitLog中的偏移量,到CommitLog中找到對應(yīng)的消息,消費(fèi)成功后移動(dòng)consumerOffset

刷盤機(jī)制

「異步刷盤」:消息被寫入內(nèi)存的PAGECACHE,返回寫成功狀態(tài),當(dāng)內(nèi)存里的消息量積累到一定程度時(shí),統(tǒng)一觸發(fā)寫磁盤操作,快速寫入 。吞吐量高,當(dāng)磁盤損壞時(shí),會(huì)丟失消息

「同步刷盤」:消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,給應(yīng)用返回消息寫成功的狀態(tài)。吞吐量低,但不會(huì)造成消息丟失

主從復(fù)制

如果一個(gè)broker有master和slave時(shí),就需要將master上的消息復(fù)制到slave上,復(fù)制的方式有兩種

  1. 「同步復(fù)制」:master和slave均寫成功,才返回客戶端成功。maste掛了以后可以保證數(shù)據(jù)不丟失,但是同步復(fù)制會(huì)增加數(shù)據(jù)寫入延遲,降低吞吐量
  2. 「異步復(fù)制」:master寫成功,返回客戶端成功。擁有較低的延遲和較高的吞吐量,但是當(dāng)master出現(xiàn)故障后,有可能造成數(shù)據(jù)丟失

消費(fèi)端消息重試

順序消息的重試

對于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后,消息隊(duì)列RocketMQ版會(huì)自動(dòng)不斷地進(jìn)行消息重試(每次間隔時(shí)間為1秒),這時(shí),應(yīng)用會(huì)出現(xiàn)消息消費(fèi)被阻塞的情況。所以一定要做好監(jiān)控,避免阻塞現(xiàn)象的發(fā)生

「順序消息消費(fèi)失敗后不會(huì)消費(fèi)下一條消息而是不斷重試這條消息,應(yīng)該是考慮到如果跨過這條消息消費(fèi)后面的消息會(huì)對業(yè)務(wù)邏輯產(chǎn)生影響」

「順序消息暫時(shí)僅支持集群消費(fèi)模式,不支持廣播消費(fèi)模式」

無序消息的重試

對于無序消息(普通、定時(shí)、延時(shí)、事務(wù)消息),當(dāng)消費(fèi)者消費(fèi)消息失敗時(shí),您可以通過設(shè)置返回狀態(tài)達(dá)到消息重試的結(jié)果。

「無序消息的重試只針對集群消費(fèi)方式生效;廣播方式不提供失敗重試特性,即消費(fèi)失敗后,失敗消息不再重試,繼續(xù)消費(fèi)新的消息」

「消費(fèi)時(shí)候后,重試的配置方式有如下三種」

  1. 返回Action.ReconsumeLater(推薦)
  2. 返回Null
  3. 拋出異常
  1. public class MessageListenerImpl implements MessageListener { 
  2.  
  3.     @Override 
  4.     public Action consume(Message message, ConsumeContext context) { 
  5.         //消息處理邏輯拋出異常,消息將重試。 
  6.         doConsumeMessage(message); 
  7.         //方式1:返回Action.ReconsumeLater,消息將重試。 
  8.         return Action.ReconsumeLater; 
  9.         //方式2:返回null,消息將重試。 
  10.         return null
  11.         //方式3:直接拋出異常,消息將重試。 
  12.         throw new RuntimeException("Consumer Message exception"); 
  13.     } 

「消費(fèi)失敗后,無需重試的配置方式」

集群消費(fèi)方式下,消息失敗后期望消息不重試,需要捕獲消費(fèi)邏輯中可能拋出的異常,最終返回Action.CommitMessage,此后這條消息將不會(huì)再重試。

  1. public class MessageListenerImpl implements MessageListener { 
  2.  
  3.     @Override 
  4.     public Action consume(Message message, ConsumeContext context) { 
  5.         try { 
  6.             doConsumeMessage(message); 
  7.         } catch (Throwable e) { 
  8.             //捕獲消費(fèi)邏輯中的所有異常,并返回Action.CommitMessage; 
  9.             return Action.CommitMessage; 
  10.         } 
  11.         //消息處理正常,直接返回Action.CommitMessage; 
  12.         return Action.CommitMessage; 
  13.     } 

「消息重試次數(shù)」

「RocketMQ默認(rèn)允許每條消息最多重試16次,每次消費(fèi)失敗發(fā)送一條延時(shí)消息到重試隊(duì)列,同一條消息失敗一次將延時(shí)等級提高一次,然后再放到重試隊(duì)列。重試16次后如果還沒有消費(fèi)成功,則將消息放到死信隊(duì)列中?!?/p>

「注意:重試隊(duì)列和死信隊(duì)列都是按照Consumer Group劃分的」

重試隊(duì)列topic名字:%RETRY% + consumerGroup

死信隊(duì)列topic名字:%DLQ% + consumerGroup

「為什么重試隊(duì)列和死信隊(duì)列要按照Consumer Group來進(jìn)行劃分?」

「因?yàn)樵赗ocketMQ的時(shí)候使用一定要保持訂閱關(guān)系一致。即一個(gè)Consumer Group訂閱的topic和tag要完全一致,不然可能會(huì)導(dǎo)致消費(fèi)邏輯混亂,消息丟失」

如下任意一種情況都表現(xiàn)為訂閱關(guān)系不一致

  • 相同ConsumerGroup下的Consumer實(shí)例訂閱了不同的Topic。
  • 相同ConsumerGroup下的Consumer實(shí)例訂閱了相同的Topic,但訂閱的Tag不一致。

我們可以通過控制臺(tái)查看各種類型的主題

消息每次重試的間隔時(shí)間如下

第幾次重試 與上次重試的間隔時(shí)間 第幾次重試 與上次重試的間隔時(shí)間

第幾次重試 與上次重試的間隔時(shí)間 第幾次重試 與上次重試的間隔時(shí)間
1 10 秒 9 7 分鐘
2 30 秒 10 8 分鐘
3 1 分鐘 11 9 分鐘
4 2 分鐘 12 10 分鐘
5 3 分鐘 13 20 分鐘
6 4 分鐘 14 30 分鐘
7 5 分鐘 15 1 小時(shí)
8 6 分鐘 16 2 小時(shí)

「前面說到RocketMQ的消息重試是通過往重試隊(duì)列發(fā)送定時(shí)消息來實(shí)現(xiàn)的?!? RocketMQ支持18個(gè)級別的定時(shí)延時(shí),每個(gè)級別定時(shí)消息的延時(shí)時(shí)間如下。

  1. // MessageStoreConfig.java 
  2. private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

消息重試只是把定時(shí)消息的前2個(gè)級別去掉,每次發(fā)送下一個(gè)級別的定時(shí)消息

我們可以設(shè)置消費(fèi)端消息重試次數(shù)

  1. 最大重試次數(shù)小于等于16次,則重試時(shí)間間隔同上表描述。
  2. 最大重試次數(shù)大于16次,超過16次的重試時(shí)間間隔均為每次2小時(shí)。
  1. Properties properties = new Properties(); 
  2. // 配置對應(yīng)Group ID的最大消息重試次數(shù)為20次,最大重試次數(shù)為字符串類型。 
  3. properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); 
  4. Consumer consumer =ONSFactory.createConsumer(properties); 

「那么重試隊(duì)列中的消息是如何被消費(fèi)的?」

消息消費(fèi)者在啟動(dòng)的時(shí)候,會(huì)訂閱正常的topic和重試隊(duì)列的topic

定時(shí)消息的實(shí)現(xiàn)邏輯也比較簡單,可以歸納為如下幾步

1.發(fā)送延時(shí)消息

1.1 替換topic為SCHEDULE_TOPIC_XXXX,queueId為消息延遲等級(如果不替換topic直接發(fā)到對應(yīng)的consumeQueue中,則消息會(huì)被立馬消費(fèi))

1.2 將消息原來的topic,queueId放到消息擴(kuò)展屬性中

1.3 將消息應(yīng)該執(zhí)行的時(shí)間放到tagsCode中

將消息順序?qū)懙紺ommitLog中

將消息對應(yīng)的信息分發(fā)到對應(yīng)的ConsumerQueue中(topic為SCHEDULE_TOPIC_XXXX總共有18個(gè)queue,對應(yīng)18個(gè)延遲級別)

定時(shí)任務(wù)不斷判斷消息是否到達(dá)投遞時(shí)間,沒有到達(dá)則后續(xù)執(zhí)行投遞

如果到達(dá)投遞時(shí)間,則從commitLog中拉取消息的內(nèi)容,重新設(shè)置消息topic,queueId為原來的(原來的topic,queueId在消息擴(kuò)展屬性中),然后將消息投遞到commitLog中,此時(shí)消息就會(huì)被分發(fā)到對應(yīng)的隊(duì)列中,然后被消費(fèi)。

本文轉(zhuǎn)載自微信公眾號「Java識堂」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系Java識堂公眾號。

 

責(zé)任編輯:武曉燕 來源: Java識堂
相關(guān)推薦

2021-02-02 11:01:31

RocketMQ消息分布式

2024-05-09 08:04:23

RabbitMQ消息可靠性

2020-10-14 08:36:10

RabbitMQ消息

2023-03-06 08:16:04

SpringRabbitMQ

2023-10-17 16:30:00

TCP

2017-08-21 08:51:22

CAN網(wǎng)絡(luò)通訊

2010-12-28 19:50:21

可靠性產(chǎn)品可靠性

2024-02-28 10:26:04

物聯(lián)網(wǎng)數(shù)據(jù)存儲(chǔ)

2018-09-27 14:13:27

云服務(wù)可靠故障

2024-07-04 12:36:50

2019-07-26 08:00:00

微服務(wù)架構(gòu)

2021-03-04 06:49:53

RocketMQ事務(wù)

2011-06-20 14:21:01

模塊化數(shù)據(jù)中心IT基礎(chǔ)設(shè)施

2022-03-07 08:13:06

MQ消息可靠性異步通訊

2009-12-17 16:20:20

城域網(wǎng)路由器

2024-08-06 09:55:25

2010-07-28 18:58:54

東海證券負(fù)載均衡Array Netwo

2019-08-30 12:10:05

磁盤數(shù)據(jù)可靠性RAID

2010-12-28 19:55:20

軟件架構(gòu)可靠性

2020-12-06 14:51:23

物聯(lián)網(wǎng)可靠性IOT
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號