RocketMQ如何保證消息的可靠性?
消息的發(fā)送方式有哪幾種?存儲消息的可靠性面臨哪些挑戰(zhàn)?消費消息的確認機制是怎樣的?本文通過分析消息流轉(zhuǎn)的整個過程,從消息發(fā)送、消息存儲和消息消費三個階段介紹RocketMQ是如何保證消息的可靠性的。
分布式系統(tǒng)中一個重要的前提假設(shè)是所有的網(wǎng)絡(luò)傳輸都是不可靠的,在網(wǎng)絡(luò)傳輸不可靠的情況下,保證消息的可靠傳輸,除了進行重試投遞別無他法。常用的絕大多數(shù)消息隊列RocketMQ、RabbitMQ等在消息傳輸上都只能保證至少傳輸成功一次,也即(At least once),而不能保證只傳輸成功一次(Exactly once)。由于分布式系統(tǒng)網(wǎng)絡(luò)的不可靠,可能就會出現(xiàn)消息丟失的現(xiàn)象,那么RocketMQ是如何最大限度的保證消息不丟失的呢?那就需要從消息的產(chǎn)生到最終消費的整個過程來分析,消息完整鏈路可以劃分為以下三個階段:
- 生產(chǎn)階段:消息在 Producer 發(fā)送端創(chuàng)建出來,經(jīng)過網(wǎng)絡(luò)傳輸發(fā)送到 Broker 存儲端。
- 存儲階段:消息在 Broker 端存儲,如果是主備或者多副本,消息會在這個階段被復(fù)制到其他的節(jié)點或者副本上。
- 消費階段:Consumer 消費端從 Broker存儲端拉取消息,經(jīng)過網(wǎng)絡(luò)傳輸發(fā)送到 Consumer 消費端上,并通過重試來最大限度的保證消息的消費。
一 發(fā)送端消息可靠性
發(fā)送端Producer發(fā)送消息Broker端的核心邏輯如下圖所示:
消息發(fā)送一般有以下幾種方式:同步發(fā)送、異步發(fā)送以及單向發(fā)送,業(yè)務(wù)具體選擇哪種方式進行消息發(fā)送,需要根據(jù)情況進行判斷,下面具體介紹不同的發(fā)送方式實現(xiàn)的消息可靠性保證。
1 同步發(fā)送
同步發(fā)送是指發(fā)送端在發(fā)送消息時,阻塞線程進行等待,直到服務(wù)器返回發(fā)送的結(jié)果。發(fā)送端如果需要保證消息的可靠性,防止消息發(fā)送失敗,可以采用同步阻塞式的發(fā)送,然后同步檢查Brocker返回的狀態(tài)來判斷消息是否持久化成功。如果發(fā)送超時或者失敗,則會默認重試2次,RocketMQ選擇至少傳輸成功一次的消息模型,但是有可能發(fā)生重復(fù)投遞,因為網(wǎng)絡(luò)傳輸是不可靠的,具體的重試策略可以參照第四小節(jié)。
2 異步發(fā)送
異步發(fā)送是指發(fā)送端在發(fā)送消息時,傳入回調(diào)接口實現(xiàn)類,調(diào)用該發(fā)送接口后不會阻塞,發(fā)送方法會立即返回,回調(diào)任務(wù)會在另一個線程中執(zhí)行,消息發(fā)送結(jié)果會回傳給相應(yīng)的回調(diào)函數(shù)。具體的業(yè)務(wù)實現(xiàn)可以根據(jù)發(fā)送的結(jié)果信息來判斷是否需要重試來保證消息的可靠性。
3 單向發(fā)送
單向發(fā)送是指發(fā)送端發(fā)送完成之后,調(diào)用該發(fā)送接口后立刻返回,并不返回發(fā)送的結(jié)果,業(yè)務(wù)方無法根據(jù)發(fā)送的狀態(tài)來判斷消息是否發(fā)送成功,單向發(fā)送相對前兩種發(fā)送方式來說是一種不可靠的消息發(fā)送方式,因此要保證消息發(fā)送的可靠性,不推薦采用這種方式來發(fā)送消息。
4 發(fā)送重試策略
RocketMQ架構(gòu)模型中會有多個Borker為某個topic提供服務(wù),一個topic下的消息分散存儲在多個Broker存儲端,它們是多對多關(guān)系。Broker會將其提供存儲服務(wù)的topic的元數(shù)據(jù)信息上報到NameServer,對等NameServer節(jié)點組成的高可用服務(wù)會維護topic與Broker之間的映射關(guān)系,多對多的映射關(guān)系為消息可以重試發(fā)送到多個Broker端提供了前提與基礎(chǔ)。
當(dāng)發(fā)送端需要發(fā)送消息時,如果發(fā)送端中緩存了topic的路由信息,并包含了消息隊列,則直接返回該路由信息,如果沒有緩存或沒有消息隊列,則向NameServer查詢該topic的路由信息,查詢到路由消息之后,采用指定的隊列選擇策略選擇相應(yīng)的queue發(fā)送消息,默認是采用輪詢策略,發(fā)送成功則返回, 收到異常則根據(jù)相應(yīng)的策略進行重試,可以根據(jù)發(fā)送端感知到的Broker的時延、上次發(fā)送失敗的Broker信息和發(fā)送端配置的是否重試不同Broker的參數(shù)以及發(fā)送端設(shè)置的最大超時時間等等策略來靈活地實現(xiàn)不同等級的消息發(fā)送可靠性保證。重試策略可以有效的保證消息發(fā)送成功的概率,最終提高消息發(fā)送的可靠性。
二 存儲端消息可靠性
RocketMQ的消息存儲結(jié)構(gòu)如下圖所示:
- 消息隊列存儲的最小單位是消息Message。
- 同一個Topic下的消息映射成多個邏輯隊列。
- 不同Topic的消息按照到達broker的先后順序以Append的方式添加至CommitLog,順序?qū)?,隨機讀。
目前RocketMQ存儲模型使用本地磁盤進行存儲,數(shù)據(jù)寫入為producer -> direct memory -> pagecache -> 磁盤,數(shù)據(jù)讀取如果pagecache有數(shù)據(jù)則直接從pagecache讀,否則需要先從磁盤加載到pagecache中。Broker存儲節(jié)點的文件存儲模式如下圖所示:
Broker端CommitLog采用順序?qū)?,可以大大提高寫入效率,同時采用不同的刷盤模式提供不同的數(shù)據(jù)可靠性保證,此外采用了ConsumeQueue中間結(jié)構(gòu)來存儲偏移量信息,實現(xiàn)消息的分發(fā)。由于ConsumeQueue結(jié)構(gòu)固定且大小有限,在實際情況中,大部分的ConsumeQueue 能夠被全部讀入內(nèi)存,可以達到內(nèi)存讀取的速度。此外為了保證CommitLog和ConsumeQueue的一致性, CommitLog里存儲了Consume Queues 、Message Key、Tag等所有信息,即使ConsumeQueue丟失,也可以通過 commitLog完全恢復(fù)出來,這樣只要保證commitLog數(shù)據(jù)的可靠性,就可以保證Consume Queue的可靠性。
RocketMQ存儲端采用本地磁盤進行CommitLog消息數(shù)據(jù)的存儲,不可避免的就會帶來存儲可靠性的挑戰(zhàn),如何保證消息不丟失,RocketMQ消息服務(wù)一直在不斷提高數(shù)據(jù)的可靠性。
1 存儲可靠性挑戰(zhàn)
RocketMQ存儲端也即Broker端在存儲消息的時候會面臨以下的存儲可靠性挑戰(zhàn):
- Broker正常關(guān)閉
- Broker異常Crash
- OS Crash
- 機器掉電,但是能立即恢復(fù)供電情況
- 機器無法開機(可能是cpu、主板、內(nèi)存等關(guān)鍵設(shè)備損壞)
- 磁盤設(shè)備損壞
1正常關(guān)閉,Broker 可以正常啟動并恢復(fù)所有數(shù)據(jù)。2、3、4同步刷盤可以保證數(shù)據(jù)不丟失,異步刷盤可能導(dǎo)致少量數(shù)據(jù)丟失。5、6屬于單點故障,且無法恢復(fù)。解決單點故障可以采用增加Slave節(jié)點,主從異步復(fù)制仍然可能有極少量數(shù)據(jù)丟失,同步復(fù)制可以完全避免單點問題。
這里一般來說就需要在性能和可靠性之間做出取舍,對于RocketMQ來說,Broker的可靠性主要由兩個方面保障:
- 單機的刷盤機制
- 主從之間的數(shù)據(jù)復(fù)制
如果設(shè)置為每條消息都強制刷盤、主從復(fù)制,那么性能無疑會降低;如果不這樣設(shè)置,就會有一定的可能性丟失消息。RocketMQ一般都是先把消息寫到PageCache中,然后再持久化到磁盤上,數(shù)據(jù)從pagecache刷新到磁盤有兩種方式,同步和異步。整體的消息寫入和讀取如下圖所示:
針對broker端單機存儲可靠性,主要依賴單機的刷盤策略,主從之間的副本復(fù)制可以參考下一章節(jié)的主從模式。
2 同步刷盤
消息寫入內(nèi)存的 PageCache后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,返回消息寫成功的狀態(tài)。這種方式可以保證數(shù)據(jù)絕對安全,但是吞吐量不大。
3 異步刷盤(默認)
消息寫入到內(nèi)存的 PageCache中,就立刻給客戶端返回寫操作成功,當(dāng) PageCache中的消息積累到一定的量時,觸發(fā)一次寫操作,或者定時等策略將 PageCache中的消息寫入到磁盤中。這種方式吞吐量大,性能高,但是 PageCache中的數(shù)據(jù)可能丟失,不能保證數(shù)據(jù)絕對的安全。
實際應(yīng)用中要結(jié)合業(yè)務(wù)場景,合理設(shè)置刷盤方式,尤其是同步刷盤的方式,由于頻繁的觸發(fā)磁盤寫動作,會明顯降低性能。
4 過期文件刪除
由于RocketMQ操作CommitLog、ConsumeQueue文件是基于文件內(nèi)存映射機制,并且在啟動的時候會將所有的文件加載,為了避免內(nèi)存與磁盤的浪費、能夠讓磁盤能夠循環(huán)利用、避免因為磁盤不足導(dǎo)致消息無法寫入等引入了文件過期刪除機制。最終使得磁盤水位保持在一定水平,最終保證新寫入消息的可靠存儲。
三 消費端消息可靠性
RockerMQ默認提供了至少消費一次的消費語義來保證消息的可靠消費。
通常消費消息的確認機制一般分為兩種思路:
- 先提交后消費
- 先消費,消費成功后再提交
思路1可以解決重復(fù)消費的問題但是會丟失消息,因此RocketMQ默認實現(xiàn)的是思路2,由各自consumer業(yè)務(wù)方保證冪等來解決重復(fù)消費問題。
消費端Consumer消費消息核心邏輯如下圖所示:
1 消費重試
消費者從RocketMQ拉取到消息之后,需要返回消費成功來表示業(yè)務(wù)方正常消費完成。因此只有返回CONSUME_SUCCESS才算消費完成,如果返回CONSUME_LATER則會按照不同的messageDelayLevel時間進行再次消費,時間分級從秒到小時,最長時間為2個小時后再次進行消費重試,如果消費滿16次之后還是未能消費成功,則不再重試,會將消息發(fā)送到死信隊列,從而保證消息存儲的可靠性。
2 死信隊列
未能成功消費的消息,消息隊列并不會立刻將消息丟棄,而是將消息發(fā)送到死信隊列,其名稱是在原隊列名稱前加%DLQ%,如果消息最終進入了死信隊列,則可以通過RocketMQ提供的相關(guān)接口從死信隊列獲取到相應(yīng)的消息,保證了消息消費的可靠性。
3 消息回溯
回溯消費是指Consumer已經(jīng)消費成功的消息,或者之前消費業(yè)務(wù)邏輯有問題,現(xiàn)在需要重新消費。要支持此功能,則Broker存儲端在向Consumer消費端投遞成功消息后,消息仍然需要保留。重新消費一般是按照時間維度,例如由于Consumer系統(tǒng)故障,恢復(fù)后需要重新消費1小時前的數(shù)據(jù)。RocketMQ Broker提供了一種機制,可以按照時間維度來回退消費進度,這樣就可以保證只要發(fā)送成功的消息,只要消息沒有過期,消息始終是可以消費到的。
四 總結(jié)
本文從消息流轉(zhuǎn)的整個過程分析了RocketMQ如何保證消息的可靠性,消息發(fā)送通過不同的重試策略保證了消息的可靠發(fā)送,消息存儲通過不同的刷盤機制以及多副本來保證消息的可靠存儲,消息消費通過至少消費成功一次以及消費重試機制來保證消息的可靠消費,RocketMQ在保證消息的可靠性上做到了全鏈路閉環(huán),最大限度的保證了消息不丟失。