RocketMQ消息重試機(jī)制解析!
由于網(wǎng)絡(luò)抖動、服務(wù)宕機(jī)等一些不確定的因素,RocketMQ在發(fā)送消息的時候很有可能出現(xiàn)消息發(fā)送或者消費(fèi)失敗的問題。
所以RocketMQ消息重試分為2種:
?Producer端重試和Consumer端重試。
Producer端重試
?生產(chǎn)者端的消息失敗,也就是Producer往MQ上發(fā)消息沒有發(fā)送成功。
- 比如網(wǎng)絡(luò)抖動致使生產(chǎn)者發(fā)送消息到MQ失敗。
這種消息失敗重試可以手動設(shè)置發(fā)送失敗重試的次數(shù)。
producer.setRetryTimesWhenSendFailed(3);
官方說明
?Producer的send方法本身支持內(nèi)部重試。
重試邏輯:
- 默認(rèn)至多重試2次。
- 這個方法的總耗時時間不超過sendMsgTimeout設(shè)置的值,默認(rèn)10s。
如果本身向Broker發(fā)送消息產(chǎn)生超時異常,就不會再重試。
- 以上策略也是在一定程度上保證了消息可以發(fā)送成功。
如果業(yè)務(wù)對消息可靠性要求比較高,建議增加相應(yīng)的重試邏輯:
- 比如調(diào)用send同步方法發(fā)送失敗時,則嘗試將消息存儲到DB。
- 然后由后臺線程定時重試,確保消息一定到達(dá)Broker。
重試策略
消息發(fā)送重試有三種策略:
?同步發(fā)送失敗策略、異步發(fā)送失敗策略和消息刷盤失敗策略。
同步發(fā)送失敗策略:
?普通消息,消息發(fā)送默認(rèn)采用round-robin策略(輪轉(zhuǎn))來選擇所發(fā)送到的隊(duì)列。
- 如果發(fā)送失敗,默認(rèn)重試2次。
但在重試時是不會選擇上次發(fā)送失敗的Broker,而是選擇其它Broker。
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 設(shè)置同步發(fā)送失敗時重試發(fā)送的次數(shù),默認(rèn)為2次
producer.setRetryTimesWhenSendFailed(3);
// 設(shè)置發(fā)送超時時限為5s,默認(rèn)10s
producer.setSendMsgTimeout(5000);
異步發(fā)送失敗策略:
?異步發(fā)送失敗重試時,異步重試不會選擇其他Broker,僅在當(dāng)前Broker上做重試。
- 所以該策略無法保證消息不丟失。
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定異步發(fā)送失敗后不進(jìn)行重試發(fā)送
producer.setRetryTimesWhenSendAsyncFailed(0);
消息刷盤失敗策略:
?消息刷盤超時,默認(rèn)是不會將消息嘗試發(fā)送到其他Broker。
對于重要消息可以通過在Broker的配置文件設(shè)置retryAnotherBrokerWhenNotStoreOK屬性為true來開啟。
幾種情況
異步發(fā)送在發(fā)送過程中出現(xiàn)異常進(jìn)行重試:
?在解析請求結(jié)果時,發(fā)現(xiàn)響應(yīng)狀態(tài)碼有其它異常(消息可能未正確被Broker處理)會繼續(xù)進(jìn)行重試。
- 重試依然選擇當(dāng)前Broker。
但是如果響應(yīng)結(jié)果不為空的話,即使處理響應(yīng)時發(fā)生異常也不會進(jìn)行重試。
同步發(fā)送時:
?如果發(fā)送過程中沒有異常,但是發(fā)送結(jié)果不OK,也會選擇另一個Broker繼續(xù)進(jìn)行重試。
順序消息發(fā)送失敗不進(jìn)行重試:
?順序消息:指的是同步+指定消息隊(duì)列的方式發(fā)送。
Consumer端重試
消息正常的到了消費(fèi)者,結(jié)果消費(fèi)者發(fā)生異常,處理失敗了。
?例如反序列化失敗,消息數(shù)據(jù)本身無法處理等。
順序消息
順序消息的消費(fèi)重試
?順序消息,當(dāng)Consumer消費(fèi)消息失敗后,為了保證消息的順序性,其會自動不斷地進(jìn)行消息重試,直到消費(fèi)成功。
- 消費(fèi)重試默認(rèn)間隔時間為1000ms。
重試期間應(yīng)用會出現(xiàn)消息消費(fèi)被阻塞的情況。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 順序消息消費(fèi)失敗的消費(fèi)重試時間間隔,單位毫秒,默認(rèn)為1000,其取值范圍為[10, 30000]
consumer.setSuspendCurrentQueueTimeMillis(100);
由于對順序消息的重試是無休止的,不間斷的,直至消費(fèi)成功。
- 所以,對于順序消息的消費(fèi),務(wù)必要保證應(yīng)用能夠及時監(jiān)控并處理消費(fèi)失敗的情況,避免消費(fèi)被永久性阻塞。
?注意:順序消息沒有發(fā)送失敗重試機(jī)制,但具有消費(fèi)失敗重試機(jī)制。
消費(fèi)狀態(tài)
?順序消費(fèi)目前兩個狀態(tài):SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。
SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暫停消費(fèi)一下:
- 過SuspendCurrentQueueTimeMillis時間間隔后再重試一下,而不是放到重試隊(duì)列里。
public enum ConsumeOrderlyStatus {
SUCCESS,
@Deprecated
ROLLBACK,
@Deprecated
COMMIT,
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
并發(fā)消息
并發(fā)消息的消費(fèi)重試
?在并發(fā)消費(fèi)中,可能會有多個線程同時消費(fèi)一個隊(duì)列的消息。
因此即使發(fā)送端通過發(fā)送順序消息保證消息在同一個隊(duì)列中按照FIFO的順序,也無法保證消息實(shí)際被順序消費(fèi)。
- 所有并發(fā)消費(fèi)也可以稱之為無序消費(fèi)。
對于無序消息(普通消息、延時消息、事務(wù)消息):
- 當(dāng)Consumer消費(fèi)消息失敗時,可以通過設(shè)置返回狀態(tài)達(dá)到消息重試的效果。
注意:
?無序消息的重試只針對集群消費(fèi)模式生效。
廣播消費(fèi)模式不提供失敗重試特性:即消費(fèi)失敗后,失敗消息不再重試,繼續(xù)消費(fèi)新的消息。
消費(fèi)狀態(tài)
Consumer端消息消費(fèi)有兩種狀態(tài):
?一個是成功(CONSUME_SUCCESS),一個是失敗&稍后重試(RECONSUME_LATER)。
Consumer為了保證消息消費(fèi)成功,只有使用方明確表示消費(fèi)成功。
- 返回CONSUME_SUCCESS,RocketMQ才會認(rèn)為消息消費(fèi)成功。
若是消息消費(fèi)失敗,只要返回:ConsumeConcurrentlyStatus.RECONSUME_LATER。
- RocketMQ就會認(rèn)為消息消費(fèi)失敗了,要重新投遞。
public enum ConsumeConcurrentlyStatus {
CONSUME_SUCCESS,
RECONSUME_LATER;
}
重試機(jī)制
?為了保證消息是確定被至少消費(fèi)成功一次,RocketMQ會把這批消息重發(fā)回Broker。
- Topic不是原Topic而是一個RETRY Topic。
在延遲的某個時間點(diǎn)(默認(rèn)是10秒,業(yè)務(wù)可設(shè)置)后,再次投遞。
?而若是一直這樣重復(fù)消費(fèi)都持續(xù)失敗到必定次數(shù)(默認(rèn)16次),就會投遞到死信隊(duì)列。
在啟動Broker的過程當(dāng)中,能夠觀察到以下輸出:
2024-09-19 16:29:58 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
RECONSUME_LATER策略:
?若是消費(fèi)失敗,那么1S后再次消費(fèi),若是失敗,那么5S后,再次消費(fèi),…… 直至2H后若是消費(fèi)還失敗。
- 那么該條消息就會終止發(fā)送給消費(fèi)者了。
消息重試間隔時間如下:
重試次數(shù) | 與上次重試的間隔時間 | 重試次數(shù) | 與上次重試的間隔時間 |
1 | 10秒 | 9 | 7分鐘 |
2 | 30秒 | 10 | 8分鐘 |
3 | 1分鐘 | 11 | 9分鐘 |
4 | 2分鐘 | 12 | 10分鐘 |
5 | 3分鐘 | 13 | 20分鐘 |
6 | 4分鐘 | 14 | 30分鐘 |
7 | 5分鐘 | 15 | 1小時 |
8 | 6分鐘 | 16 | 2小時 |
?某條消息在一直消費(fèi)失敗的前提下,將會在接下來的4小時46分鐘之內(nèi)進(jìn)行16次重試。
- 超過這個時間范圍消息將不再重試投遞,而被投遞至死信隊(duì)列。
修改消費(fèi)重試次數(shù):
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 修改消費(fèi)重試次數(shù)
consumer.setMaxReconsumeTimes(10);
基本原理
?重試的 Message,RocketMQ 的做法并不是將其投遞回原 Topic重試隊(duì)列。
每個 ConsumerGroup 都有自己的重試隊(duì)列:
- 其名稱是由特定的前綴拼接上 ConsumerGroup 所組成,默認(rèn) %RETRY%+消費(fèi)者組名稱。
- 所以在 Consumer 啟動時,就會同時消費(fèi)其 ConsumerGroup 對應(yīng)的重試隊(duì)列和普通隊(duì)列。
消費(fèi)失敗的 Message,Consumer 會將其投回 Broker:
- 相當(dāng)于這條 Message 已經(jīng)被消費(fèi)掉了,之后重試的只是內(nèi)容相同、但實(shí)際不是同一條的 Message。
- 然后會校驗(yàn)重試的次數(shù),如果達(dá)到16次則會進(jìn)入死信隊(duì)列 ,組成為 %DLQ%+消費(fèi)者組名稱。
- 未達(dá)到最大重試次數(shù),則會根據(jù)重試間隔時間等級將其投遞到延遲隊(duì)列SCHEDULE_TOPIC_XXXX中。
- 然后等到了延遲等級對應(yīng)的時間之后,再投遞到 ConsumerGroup 所對應(yīng)的重試隊(duì)列當(dāng)中,供后續(xù)消費(fèi)。
消息重復(fù)
如果消費(fèi)端收到兩條一樣的消息,應(yīng)該怎樣處理?
《RocketMQ 原理簡介》中講到:
?RocketMQ 無法避免消息重復(fù)。
所以如果業(yè)務(wù)對消費(fèi)重復(fù)非常敏感,務(wù)必要在業(yè)務(wù)側(cè)去重,有以下幾種去重方式:
?
消費(fèi)端處理消息的業(yè)務(wù)邏輯保持冪等性。
- 如何保證冪等性,可以看我之前的文章!
保證每條消息都有唯一編號且保證消息處理成功與去重表的日志同時出現(xiàn)。
- 利用一張日志表來記錄已經(jīng)處理成功的消息的ID。
- 如果新到的消息ID已經(jīng)在日志表中,那么就不再處理這條消息。