三分鐘白話RocketMQ系列—— 如何保證消息不丟失
回顧上一篇核心概念,我們知道RocketMQ的消息模型分為 生產(chǎn)、存儲(chǔ)(消息堆積)、消費(fèi) 三大部分。
消息模型三大部分
因此,如何保證消息不丟失,也是從這三個(gè)環(huán)節(jié)來(lái)考慮。
關(guān)鍵字摘要
- 生產(chǎn)、存儲(chǔ)(消息堆積)、消費(fèi) 三個(gè)環(huán)節(jié)保證消息不丟失
- 生產(chǎn)環(huán)節(jié):消息類型,消息確認(rèn)機(jī)制、失敗重試機(jī)制
- 存儲(chǔ)環(huán)節(jié):同步/異步刷盤、同步/異步復(fù)制slave
- 消費(fèi)環(huán)節(jié):消息確認(rèn)機(jī)制(至少消費(fèi)成功一次)、失敗重試機(jī)制、死信隊(duì)列機(jī)制
Q1: 如何保證「消息生產(chǎn)」不丟失?
先想想什么情況下,消息生產(chǎn)會(huì)丟失消息呢?
生產(chǎn)者將發(fā)送消息時(shí),如果出現(xiàn)了網(wǎng)絡(luò)抖動(dòng)或者通信異常等問(wèn)題,消息就有可能會(huì)丟失。
那怎么解決這個(gè)問(wèn)題?
其實(shí)思路是比較直接的,就是 「消息確認(rèn)機(jī)制」和「失敗重試機(jī)制」。
消息發(fā)送成功返回確認(rèn)消息,那就能確保消息不丟失。如果發(fā)送失敗了,mq-client就嘗試自動(dòng)重試,避免網(wǎng)絡(luò)抖動(dòng)導(dǎo)致發(fā)送丟失。
如果超過(guò)一定超時(shí)時(shí)間還是失敗,那就拋出異常,由開(kāi)發(fā)者自己在應(yīng)用層面進(jìn)行處理,手動(dòng)重試發(fā)送 或者 記錄失敗消息后續(xù)補(bǔ)償。
不過(guò)我們需要特別注意是,RocketMQ支持多種「消息類型」,但是并不是對(duì)所有「消息類型」 都會(huì)有「消息確認(rèn)機(jī)制」和「失敗重試機(jī)制」。
RocketMQ生產(chǎn)消息時(shí),支持多種「消息類型」和「消息發(fā)送模式」。咱們白話為主,就不展開(kāi)源碼了,有興趣同學(xué)可以參考o(jì)rg.apache.rocketmq.client.producer.MQProducer這個(gè)接口定義即可。
消息類型:
- 普通消息:發(fā)送普通消息,異常時(shí)默認(rèn)重試。
- 普通有序消息:發(fā)送普通有序消息,通過(guò)指定「消息篩選器selector」,動(dòng)態(tài)決定發(fā)送哪個(gè)隊(duì)列。異常默認(rèn)不重試,可以用戶自己重試,并發(fā)送到其他隊(duì)列。
- 嚴(yán)格有序消息:發(fā)送嚴(yán)格有序消息,通過(guò)指定隊(duì)列,保證嚴(yán)格有序,異常默認(rèn)不重試。
消息發(fā)送模式:
- 同步:調(diào)用發(fā)送消息方法后,同步阻塞,直到返回SendResult。配置retryTimesWhenSendFailed重試次數(shù)。
- 異步:調(diào)用發(fā)送消息方法后,立即返回,發(fā)送結(jié)果會(huì)通過(guò)開(kāi)發(fā)者自己注冊(cè)的回調(diào)函數(shù)SendCallback進(jìn)行處理。配置retryTimesWhenSendAsyncFailed重試次數(shù)。
- 單向發(fā)送:這種方法完全不關(guān)心發(fā)送后的返回結(jié)果。顯然,它具有最大吞吐量,但也存在消息丟失的潛在風(fēng)險(xiǎn)。
消息類型 和 消息發(fā)送模式 是 N * M 的關(guān)系,所以聰明的你一定已經(jīng)想到了,存在9種不同組合,RocketMQ也是定義了9種不同接口方法。
這9種方法里面,涉及到「單向發(fā)送」模式的3種方法,都是不可靠的,存在丟失消息的風(fēng)險(xiǎn)。其他發(fā)送消息的模式和消息類型,可以通過(guò) 消息確認(rèn)、mq-client自動(dòng)「失敗重試機(jī)制」、業(yè)務(wù)自定義重試 等方式,確保消息發(fā)送不丟失。
注意,org.apache.rocketmq.client.producer.MQProducer還定義了「事務(wù)消息」的發(fā)送模式,是屬于分布式事務(wù)范疇了,跟我們這里討論的消息不丟失不太一樣,就不展開(kāi)討論了。后面單獨(dú)寫(xiě)一篇針對(duì)「事務(wù)消息」的分析。
Q2: 如何保證「消息存儲(chǔ)」不丟失?
先想想什么情況下,消息存儲(chǔ)會(huì)丟失呢?
場(chǎng)景1,消息保存到內(nèi)存中,還沒(méi)來(lái)得及刷盤到磁盤,機(jī)器宕機(jī)或者重啟,導(dǎo)致內(nèi)存中消息丟失。場(chǎng)景2,為了提高可用性,Broker通常采用一主多從的部署方式,為了確保消息不丟失,消息需要被復(fù)制到從節(jié)點(diǎn)。當(dāng)消息發(fā)送到master但是還沒(méi)同步到slave broker時(shí),master broker磁盤損壞,導(dǎo)致消息數(shù)據(jù)丟失?;蛘適aster宕機(jī),consumer切換到slave消費(fèi)數(shù)據(jù),消息丟失。
針對(duì)場(chǎng)景1,默認(rèn)情況下,消息在到達(dá) Broker 端后會(huì)首先被保存在內(nèi)存中,并立即向生產(chǎn)者返回確認(rèn)響應(yīng)。隨后,Broker 會(huì)定期批量將一組消息異步刷入磁盤。這種方式減少了 I/O 操作次數(shù),提高了性能。
然而,如果發(fā)生機(jī)器掉電、異常宕機(jī)等情況,未及時(shí)將消息刷入磁盤,就可能導(dǎo)致消息丟失的情況。
如果要確保 Broker 端不丟失消息并保證消息的可靠性,我們需要修改消息保存機(jī)制為同步刷盤方式,即只有當(dāng)消息成功存儲(chǔ)到磁盤后才返回響應(yīng)??梢酝ㄟ^(guò)flushDiskType = SYNC_FLUSH 參數(shù)進(jìn)行控制。
針對(duì)場(chǎng)景2,在默認(rèn)方式下,當(dāng)消息成功寫(xiě)入主節(jié)點(diǎn)時(shí),就會(huì)返回確認(rèn)響應(yīng)給生產(chǎn)者,并異步將消息復(fù)制到從節(jié)點(diǎn)。然而,如果主節(jié)點(diǎn)突然宕機(jī)且無(wú)法恢復(fù),尚未復(fù)制到從節(jié)點(diǎn)的消息將會(huì)丟失。
為了進(jìn)一步提高消息的可靠性,我們可以采用同步復(fù)制方式。主節(jié)點(diǎn)將會(huì)同步等待從節(jié)點(diǎn)完成復(fù)制,然后才返回確認(rèn)響應(yīng)。這樣可以確保消息的可靠性??梢酝ㄟ^(guò)brokerRole=SYNC_MASTER參數(shù)進(jìn)行控制。
注意,同步刷盤 和 同步復(fù)制 雖然能夠保證消息不丟失,但是會(huì)嚴(yán)重降低性能,生產(chǎn)實(shí)踐中需要根據(jù)實(shí)際情況綜合評(píng)估。
Q3: 如何保證「消息消費(fèi)」不丟失?
先想想什么情況下,消息存儲(chǔ)會(huì)丟失呢?
因?yàn)楦鞣N原因消費(fèi)失敗,但是還是提交了消費(fèi)位點(diǎn),這條消息從業(yè)務(wù)角度來(lái)說(shuō)就“丟失”了。
那怎么解決這個(gè)問(wèn)題?
跟消息生產(chǎn)一樣,其實(shí)思路是比較直接的,就是 「消息確認(rèn)機(jī)制」和「失敗重試機(jī)制」。
消費(fèi)者從RocketMQ拉取消息后,需要返回"CONSUME_SUCCESS"來(lái)表示業(yè)務(wù)方已經(jīng)正常完成消費(fèi)。只有返回"CONSUME_SUCCESS"才算作消費(fèi)完成。這就是消費(fèi)時(shí)的「消息確認(rèn)機(jī)制」。
如果返回"CONSUME_LATER",則會(huì)按照不同的消息延遲級(jí)別進(jìn)行再次消費(fèi),延遲級(jí)別從秒到小時(shí)不等,最長(zhǎng)延遲時(shí)間為2個(gè)小時(shí)后再次嘗試消費(fèi)。這就是消費(fèi)時(shí)的「失敗重試機(jī)制」。
重試消息會(huì)被存入名為 "%RETRY%+消費(fèi)組名稱" 的Topic中,原始主題Topic會(huì)存入屬性中。然后會(huì)基于定時(shí)任務(wù)機(jī)制,在到期時(shí)將任務(wù)再次拉取出來(lái)。
另外,RocketMQ跟kafka不同的是,天然支持了 「死信隊(duì)列機(jī)制」。
如果在嘗試消費(fèi)的過(guò)程中達(dá)到了最大重試次數(shù)(通常為16次),仍然無(wú)法成功消費(fèi),則消息將被發(fā)送到死信隊(duì)列,以確保消息存儲(chǔ)的可靠性。后續(xù)業(yè)務(wù)可以根據(jù)死信隊(duì)列,來(lái)做相關(guān)補(bǔ)償措施。
關(guān)鍵字總結(jié)
- 生產(chǎn)、存儲(chǔ)(消息堆積)、消費(fèi) 三個(gè)環(huán)節(jié)保證不丟失
- 生產(chǎn)環(huán)節(jié):消息類型,消息確認(rèn)機(jī)制、失敗重試機(jī)制
- 存儲(chǔ)環(huán)節(jié):同步/異步刷盤、同步/異步復(fù)制slave
- 消費(fèi)環(huán)節(jié):消息確認(rèn)機(jī)制(至少消費(fèi)成功一次)、失敗重試機(jī)制、死信隊(duì)列機(jī)制
3分鐘到了嗎?應(yīng)該對(duì)RocketMQ如何生產(chǎn)消息有全面了解了吧。如果還想了解更多,歡迎關(guān)注下一期內(nèi)容。