自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

我們在順序消息和事務(wù)消息方面的實(shí)踐

開發(fā)
最近團(tuán)隊(duì)內(nèi)部在RocketMQ的業(yè)務(wù)實(shí)踐上有一些心得,想給大家分享一下,首先轉(zhuǎn)轉(zhuǎn)這邊是有架構(gòu)團(tuán)隊(duì)自研的ZZMQ的,所以我們自然而然的用的ZZMQ,考慮到受眾人群,開篇會(huì)先講開源版本的一些基礎(chǔ)知識(shí),然后從順序消息和事務(wù)消息2個(gè)熾手可熱的話題上逐漸轉(zhuǎn)入到與ZZMQ的比較,希望可以幫助到大家繞過"坑"。

第一部分: 基本介紹

1. 領(lǐng)域模型概述

圖片

1.1 消息生產(chǎn)

生產(chǎn)者(Producer):

     Apache RocketMQ 中用于產(chǎn)生消息的運(yùn)行實(shí)體,一般集成于業(yè)務(wù)調(diào)用鏈路的上游。生產(chǎn)者是輕量級(jí)匿名無身份的。

1.2 消息存儲(chǔ)

  • 主題(Topic):

Apache RocketMQ 消息傳輸和存儲(chǔ)的分組容器,主題內(nèi)部由多個(gè)隊(duì)列組成,消息的存儲(chǔ)和水平擴(kuò)展實(shí)際是通過主題內(nèi)的隊(duì)列實(shí)現(xiàn)的。

  • 隊(duì)列(MessageQueue):

      Apache RocketMQ 消息傳輸和存儲(chǔ)的實(shí)際單元容器,類比于其他消息隊(duì)列中的分區(qū)。Apache RocketMQ 通過流式特性的無限隊(duì)列結(jié)構(gòu)來存儲(chǔ)消息,消息在隊(duì)列內(nèi)具備順序性存儲(chǔ)特征。

  • 消息(Message):

     Apache RocketMQ 的最小傳輸單元。消息具備不可變性,在初始化發(fā)送和完成存儲(chǔ)后即不可變。

1.3 消息消費(fèi)

  • 消費(fèi)者分組(ConsumerGroup):

Apache RocketMQ 發(fā)布訂閱模型中定義的獨(dú)立的消費(fèi)身份分組,用于統(tǒng)一管理底層運(yùn)行的多個(gè)消費(fèi)者(Consumer)。同一個(gè)消費(fèi)組的多個(gè)消費(fèi)者必須保持消費(fèi)邏輯和配置一致,共同分擔(dān)該消費(fèi)組訂閱的消息,實(shí)現(xiàn)消費(fèi)能力的水平擴(kuò)展。

  • 消費(fèi)者(Consumer):

    Apache RocketMQ 消費(fèi)消息的運(yùn)行實(shí)體,一般集成在業(yè)務(wù)調(diào)用鏈路的下游。消費(fèi)者必須被指定到某一個(gè)消費(fèi)組中。

  • 訂閱關(guān)系(Subscription):

    Apache RocketMQ 發(fā)布訂閱模型中消息過濾、重試、消費(fèi)進(jìn)度的規(guī)則配置。訂閱關(guān)系以消費(fèi)組粒度進(jìn)行管理,消費(fèi)組通過定義訂閱關(guān)系控制指定消費(fèi)組下的消費(fèi)者如何實(shí)現(xiàn)消息過濾、消費(fèi)重試及消費(fèi)進(jìn)度恢復(fù)等。

    Apache RocketMQ 的訂閱關(guān)系除過濾表達(dá)式之外都是持久化的,即服務(wù)端重啟或請(qǐng)求斷開,訂閱關(guān)系依然保留。

2. 消息傳輸模型介紹

主流的消息中間件的傳輸模型主要為點(diǎn)對(duì)點(diǎn)模型和發(fā)布訂閱模型。

點(diǎn)對(duì)點(diǎn)模型

圖片

點(diǎn)對(duì)點(diǎn)模型也叫隊(duì)列模型,具有如下特點(diǎn):

  • 消費(fèi)匿名:消息上下游溝通的唯一的身份就是隊(duì)列,下游消費(fèi)者從隊(duì)列獲取消息無法申明獨(dú)立身份。
  • 一對(duì)一通信:基于消費(fèi)匿名特點(diǎn),下游消費(fèi)者即使有多個(gè),但都沒有自己獨(dú)立的身份,因此共享隊(duì)列中的消息,每一條消息都只會(huì)被唯一一個(gè)消費(fèi)者處理。因此點(diǎn)對(duì)點(diǎn)模型只能實(shí)現(xiàn)一對(duì)一通信。

發(fā)布訂閱模型

圖片

發(fā)布訂閱模型具有如下特點(diǎn):

  • 消費(fèi)獨(dú)立:相比隊(duì)列模型的匿名消費(fèi)方式,發(fā)布訂閱模型中消費(fèi)方都會(huì)具備的身份,一般叫做訂閱組(訂閱關(guān)系),不同訂閱組之間相互獨(dú)立不會(huì)相互影響。
  • 一對(duì)多通信:基于獨(dú)立身份的設(shè)計(jì),同一個(gè)主題內(nèi)的消息可以被多個(gè)訂閱組處理,每個(gè)訂閱組都可以拿到全量消息。因此發(fā)布訂閱模型可以實(shí)現(xiàn)一對(duì)多通信。

傳輸模型對(duì)比

點(diǎn)對(duì)點(diǎn)模型和發(fā)布訂閱模型各有優(yōu)勢,點(diǎn)對(duì)點(diǎn)模型更為簡單,而發(fā)布訂閱模型的擴(kuò)展性更高。Apache RocketMQ 使用的傳輸模型為發(fā)布訂閱模型,因此也具有發(fā)布訂閱模型的特點(diǎn)。

注:以上信息來源于官網(wǎng)

3. 普通消息的可靠性

普通消息一般應(yīng)用于微服務(wù)解耦、事件驅(qū)動(dòng)、數(shù)據(jù)集成等場景,這些場景大多數(shù)要求數(shù)據(jù)傳輸通道具有可靠傳輸?shù)哪芰?,且?duì)消息的處理時(shí)機(jī)、處理順序沒有特別要求。

3.1 發(fā)送端怎么保證可靠性

3.1.1 ACK機(jī)制

圖片

3.2 存儲(chǔ)端怎么保證消息可靠性

RocketMQ存儲(chǔ)端也即Broker端在存儲(chǔ)消息的時(shí)候會(huì)面臨以下的存儲(chǔ)可靠性挑戰(zhàn):

  1. Broker正常關(guān)閉
  2. Broker異常Crash
  3. OS Crash
  4. 機(jī)器掉電,但是能立即恢復(fù)供電情況
  5. 機(jī)器無法開機(jī)(可能是cpu、主板、內(nèi)存等關(guān)鍵設(shè)備損壞)
  6. 磁盤設(shè)備損壞

1正常關(guān)閉,Broker可以正常啟動(dòng)并恢復(fù)所有數(shù)據(jù)。2、3、4同步刷盤可以保證數(shù)據(jù)不丟失,異步刷盤可能導(dǎo)致少量數(shù)據(jù)丟失。5、6屬于單點(diǎn)故障,且無法恢復(fù)。解決單點(diǎn)故障可以采用增加Slave節(jié)點(diǎn),主從異步復(fù)制仍然可能有極少量數(shù)據(jù)丟失,同步復(fù)制可以完全避免單點(diǎn)問題。

這里一般來說就需要在性能和可靠性之間做出取舍,對(duì)于RocketMQ來說,Broker的可靠性主要由兩個(gè)方面保障:

  • 單機(jī)的刷盤機(jī)制
  • 主從同步

圖片


3.2.1 單機(jī)的刷盤機(jī)制

頁緩存:操作系統(tǒng)中用于存儲(chǔ)文件系統(tǒng)緩存的內(nèi)存區(qū)域。RocketMQ通過將消息首先寫入頁緩存,實(shí)現(xiàn)了消息在內(nèi)存中的持久化。

CommitLog:是RocketMQ中消息的物理存儲(chǔ)結(jié)構(gòu),包含了所有已發(fā)送的消息。CommitLog的持久化保證了即使在異常情況下,如Broker宕機(jī),消息也能夠被恢復(fù)。同步刷盤:是指將內(nèi)存中的數(shù)據(jù)同步刷寫到磁盤。RocketMQ確保消息在被發(fā)送后,首先在內(nèi)存中得到持久化,然后再刷寫到磁盤,從而防止數(shù)據(jù)的丟失。

異步刷盤:消息寫入到頁緩存中,就立刻給客戶端返回寫操作成功,當(dāng)頁緩存中的消息積累到一定的量時(shí),觸發(fā)一次寫操作,或者定時(shí)等策略將頁緩存中的消息寫入到磁盤中。這種方式吞吐量大,性能高,但是頁緩存中的數(shù)據(jù)可能丟失,不能保證數(shù)據(jù)絕對(duì)的安全。

實(shí)際應(yīng)用中要結(jié)合業(yè)務(wù)場景,合理設(shè)置刷盤方式,尤其是同步刷盤的方式,由于頻繁的觸發(fā)磁盤寫動(dòng)作,會(huì)明顯降低性能。

3.2.2 主從同步

主 Broker:負(fù)責(zé)消息的讀寫和寫入 CommitLog。從 Broker:用于備份主 Broker 的消息,確保在主 Broker 故障時(shí)可以順利切換。同步復(fù)制:主節(jié)點(diǎn)將消息同步復(fù)制到所有從節(jié)點(diǎn),確保從節(jié)點(diǎn)具有相同的消息副本。切換:在主節(jié)點(diǎn)發(fā)生故障時(shí),從節(jié)點(diǎn)可以快速切換為新的主節(jié)點(diǎn),確保消息服務(wù)的持續(xù)性。

3.3 消費(fèi)端怎么保證可靠性

3.3.1 ACK

Rocket Mq是通過offset來標(biāo)記一個(gè)消費(fèi)者組在隊(duì)列上的消費(fèi)進(jìn)度,消費(fèi)成功之后都會(huì)返回一個(gè)ACK消息告訴broker去更新offset,但是RocketMQ并不是每消費(fèi)一條消息就做一次ACK,而是消費(fèi)完批量消息后只做一次ACK。

所以ACK機(jī)制是為了準(zhǔn)確的告知Broker批量消費(fèi)成功的信息并且更新消費(fèi)進(jìn)度。

那批量消費(fèi)時(shí)具體是如何更新消費(fèi)進(jìn)度?

  • 每一條消息消費(fèi)成功后,會(huì)按照當(dāng)前消息最小的offset來更新本地的消費(fèi)進(jìn)度
  • 由5秒的定時(shí)任務(wù)將offset提交到Broker圖片

優(yōu)點(diǎn):防止消息丟失。

缺點(diǎn):會(huì)造成消息重復(fù)消費(fèi)(使用方需要做冪等。

3.3.2 重試

消費(fèi)者從RocketMQ拉取到消息之后,需要返回消費(fèi)成功來表示業(yè)務(wù)方正常消費(fèi)完成。因此只有返回CONSUME_SUCCESS才算消費(fèi)完成,如果返回 CONSUME_LATER 則會(huì)按照【重試次數(shù)】進(jìn)行再次消費(fèi),【重試間隔為階梯時(shí)間】。如果消費(fèi)滿16次之后還是未能消費(fèi)成功,則不再重試,會(huì)將消息發(fā)送到死信隊(duì)列,從而保證【消息消費(fèi)】的可靠性。

3.3.3 死信消息

默認(rèn)最多重試16次,總時(shí)長4小時(shí)46分鐘。

未能成功消費(fèi)的消息,消息隊(duì)列并不會(huì)立刻將消息丟棄,而是將消息發(fā)送到死信隊(duì)列,其名稱是在【消費(fèi)組】前加 %DLQ% 的【特殊 topic】,如果消息最終進(jìn)入了死信隊(duì)列,則可以通過RocketMQ提供的相關(guān)接口從死信隊(duì)列獲取到相應(yīng)的消息,進(jìn)行報(bào)警人工干預(yù)或其他手段,保證了消費(fèi)組的至少一次消費(fèi)。

小節(jié)

至此應(yīng)該清晰的知道RocketMq為了保證可靠性做了哪些工作。接下來我們再把視角切換到今天的第一個(gè)核心問題順序消息

第二部分: 順序消息

1. 我們遇到的線上問題

客服同學(xué): @XXX 我們判責(zé)了,為啥客戶還收不到退款? 售后單號(hào)xxxxxx。

 研發(fā)同學(xué): 讓我看看。

…..

30分鐘后

研發(fā)同學(xué): 修復(fù)了,再看一下。

測試同學(xué): 提個(gè)online, 研發(fā)填一下問題原因,責(zé)任歸屬。

研發(fā)同學(xué):問題原因: 歷史代碼,我們沒有順序消費(fèi)消息,正常的流程是 先判責(zé)完成,再打款. 這一單  先消費(fèi)了打款消息, 還沒有消費(fèi)判責(zé)完成消息,狀態(tài)不對(duì)導(dǎo)致打款失敗。

測試同學(xué):那后續(xù)如何修改啊。

研發(fā)同學(xué): 為了保證消息的有序性,我等會(huì)把消息修改為順序消費(fèi)。

……

以上故事純屬于虛構(gòu)

2. RocketMQ消息隊(duì)列為什么會(huì)有順序問題?

從上面的消息隊(duì)列模型我們知道,1個(gè)topic有N個(gè)queue,將數(shù)據(jù)均勻分配到各個(gè)queue上,這樣可以提升消費(fèi)端總體的消費(fèi)性能。比如一個(gè)topic發(fā)送10條消息,這10條消息會(huì)自動(dòng)分散在topic下的所有queue中,所以消費(fèi)的時(shí)候不一定是先消費(fèi)哪個(gè)queue,后消費(fèi)哪個(gè)queue,這就導(dǎo)致了無序消費(fèi)。

3. 順序消息的使用場景

  • 金融交易、訂單流程處理。比如我們的暗拍場景下,相同出價(jià),先出價(jià)商戶優(yōu)先成單,比如我們的訂單的發(fā)生售后時(shí),售后訂單的處理流程。

圖片

  • 實(shí)時(shí)同步數(shù)據(jù)的場景,如數(shù)據(jù)庫增量同步,順序消息也可以發(fā)揮其作用。通過使用順序消息,可以確保數(shù)據(jù)按照正確的順序進(jìn)行同步,從而保持?jǐn)?shù)據(jù)的一致性和準(zhǔn)確性。

圖片

4. 實(shí)際開發(fā)過程中如何保證消息的順序性?

4.1 生產(chǎn)順序性

  • 多生產(chǎn)者單線程

消息生產(chǎn)的順序性僅支持單一生產(chǎn)者,如果不同生產(chǎn)者分布在不同的系統(tǒng),那么不同生產(chǎn)者之間產(chǎn)生的消息,我們無法知道消息之間實(shí)際的先后順序。

  • 單生產(chǎn)者多線程
  • 相同業(yè)務(wù)的消息按照先后順序被存儲(chǔ)在同一個(gè)隊(duì)列。
  • 不同業(yè)務(wù)的消息可以混合在同一個(gè)隊(duì)列中,且不保證連續(xù)。
  • 如果生產(chǎn)者使用多個(gè)線程進(jìn)行并行發(fā)送,那么不同線程間產(chǎn)生的消息,我們無法知道消息之間實(shí)際的先后順序。

圖片

4.2 消費(fèi)者順序性

消費(fèi)消息時(shí)需要嚴(yán)格按照接收—處理—應(yīng)答的順序處理消息,避免使用異步回調(diào)或多線程處理,這樣可以防止消息處理過程中的并發(fā)問題。對(duì)于每條消息,只有當(dāng)它完全處理完畢并發(fā)送應(yīng)答后,才繼續(xù)處理下一條消息。

5. 使用順序消息需要注意的點(diǎn)

  • 消費(fèi)消息時(shí)異常如何處理
  • 如果發(fā)生異常,需要消費(fèi)方進(jìn)行處理,順序消費(fèi)默認(rèn)是 無限重試消費(fèi)的 , 無限重試會(huì)導(dǎo)致當(dāng)前消息隊(duì)列阻塞,影響后續(xù)消息消費(fèi)。
  • 需要我們在重試一定次數(shù)后進(jìn)行處理,即一條消息如果一直重試失敗,超過最大重試次數(shù)后將不再重試,跳過這條消息消費(fèi) 并 監(jiān)控和告警,人工介入處理,不能一直阻塞后續(xù)消息處理。
  • 比如我們業(yè)務(wù)售后流程中,某個(gè)節(jié)點(diǎn)的售后單消息消費(fèi)異常,我們目前解決方案是:把重試3次仍然失敗的消息存儲(chǔ)到數(shù)據(jù)庫中,同時(shí)把同一售后單后續(xù)消息也先存入數(shù)據(jù)庫中,同時(shí)發(fā)出告警,后續(xù)人工介入處理后重新消費(fèi)該異常售后單的消息。保證不影響同一隊(duì)列下其它售后單消息消費(fèi)。
  • 消息組盡可能打散,避免集中導(dǎo)致熱點(diǎn)
  • Apache RocketMQ 保證相同消息組的消息存儲(chǔ)在同一個(gè)隊(duì)列中,如果不同業(yè)務(wù)場景的消息都集中在少量或一個(gè)消息組中,則這些消息存儲(chǔ)壓力都會(huì)集中到服務(wù)端的少量隊(duì)列或一個(gè)隊(duì)列中。容易導(dǎo)致性能熱點(diǎn),為了提高系統(tǒng)的吞吐量和穩(wěn)定性,避免因?yàn)槟承┫⒔M過于集中而導(dǎo)致資源瓶頸或性能下降。
  • 在設(shè)計(jì)消息鍵時(shí),應(yīng)盡量避免讓消息集中到少數(shù)幾個(gè)MessageQueue中??梢钥紤]將業(yè)務(wù)相關(guān)的多個(gè)字段組合成消息鍵,比如訂單ID、用戶ID作為消息鍵,或者使用哈希算法來生成消息鍵,以增加消息分發(fā)的隨機(jī)性和均勻性。可實(shí)現(xiàn)同一用戶、同一訂單的消息按照順序處理,不同用戶或者不同訂單的消息無需保證順序。

小節(jié)

順序消息是一個(gè)老生常談的問題,但是簡單粗暴的硬搬網(wǎng)上的解決方案往往效果不盡如意,實(shí)際想要解決的徹底的確不是那么容易,希望可以帶給各位看官一些思考和幫助。我們再把視角切換到事務(wù)消息身上去。

第三部分: 事務(wù)消息

1. 我們遇到的線上問題

客服同學(xué): @XXX 用戶在我們的app上一直獲取不到報(bào)價(jià),比較急,麻煩看一下怎么回事。

產(chǎn)品同學(xué): 好的,收到,我這邊馬上找研發(fā)同學(xué)看一下,@XXX 需要幫忙看一下。

 研發(fā)同學(xué):好的,我看一下。

…..

30分鐘后

研發(fā)同學(xué): 修復(fù)了,再看一下。

產(chǎn)品同學(xué): 把問題原因同步一下吧。

研發(fā)同學(xué):詢價(jià)過程中,風(fēng)控命中了特殊報(bào)價(jià)池,我們這邊把特殊報(bào)價(jià)池的數(shù)據(jù)存到了數(shù)據(jù)庫,同時(shí)發(fā)送了MQ消息,結(jié)果MQ消息發(fā)送成功了,但是數(shù)據(jù)庫存儲(chǔ)失敗,導(dǎo)致再次詢價(jià)的時(shí)候,查不到數(shù)據(jù)導(dǎo)致的。

測試同學(xué):那后續(xù)如何修改啊。

研發(fā)同學(xué): 后續(xù)我們會(huì)把普通消息改成事務(wù)消息,這樣就能保證消息發(fā)送和數(shù)據(jù)庫存儲(chǔ)的一致性了。

……

以上故事純屬于虛構(gòu)

2. 為什么使用了消息隊(duì)列反而不可控呢?

MQ消息本身就具有解耦性,消息本身并不關(guān)注接收方的狀態(tài)是否符合預(yù)期,只要消息成功發(fā)送并且被成功接收,在MQ本身看來就是成功,如果想要保證發(fā)送方和接受方的狀態(tài)變更符合預(yù)期,就要保證本次事務(wù)操作和消息發(fā)送的一致性,這里我們就必須要提到事務(wù)消息。

所謂事務(wù)消息,其實(shí)是為了解決上下游寫一致性,也即是完成當(dāng)前操作的同時(shí)給下游發(fā)送指令,并且保證上下游要么同時(shí)成功或者同時(shí)失敗。

3. 事務(wù)消息的使用場景

  • 經(jīng)典場景
  • 支付發(fā)起后,當(dāng)筆訂單處于中間狀態(tài),給支付網(wǎng)關(guān)發(fā)起指令,如果發(fā)起轉(zhuǎn)賬失敗則不發(fā)送指令,發(fā)送成功后等待支付網(wǎng)關(guān)反饋更新支付狀態(tài)。如果在同一個(gè)數(shù)據(jù)庫中進(jìn)行,事務(wù)可以保證這兩步操作,要么同時(shí)成功,要么同時(shí)不成功。這樣就保證了轉(zhuǎn)賬的數(shù)據(jù)一致性。但是在微服務(wù)架構(gòu)中,因?yàn)楦鱾€(gè)服務(wù)都是獨(dú)立的模塊,都是遠(yuǎn)程調(diào)用,都沒法在同一個(gè)事務(wù)中,都會(huì)遇到分布式事務(wù)問題。
  • 我方場景
  • 在我們售后判責(zé)(用戶與賣家發(fā)生了糾紛)的過程中,如果用戶對(duì)于判責(zé)結(jié)果不滿意,可以進(jìn)行復(fù)檢申訴,我方需要對(duì)復(fù)檢申訴進(jìn)行改判或者維持原判,如果需要改判,我方需要將改判結(jié)果進(jìn)行保存,同時(shí),對(duì)于我們下游的行星售后等系統(tǒng)進(jìn)行發(fā)送消息,之所以發(fā)送消息而不是直接調(diào)用,是因?yàn)橄⒌慕邮辗讲恢挂粋€(gè),如果全部是RPD調(diào)用的話,代碼的侵入性太強(qiáng),但是消息有很強(qiáng)的解耦性,并不能保證上下游狀態(tài)的一致性,這個(gè)時(shí)候,事務(wù)消息就很符合這個(gè)場景,如果我們本地事務(wù)提交成功,就發(fā)送事務(wù)消息,下游同步修改如果我們本地事務(wù)失敗,就不在發(fā)送消息,從而保持本地事務(wù)與消息的一致性。

4. 為什么需要引入分布式事務(wù)消息

  • MQ本身就具備了實(shí)現(xiàn)了系統(tǒng)之間的解耦特性。
  • 分布式事務(wù)保障本地事務(wù)和消息發(fā)送的原子性。
  • 具備以上特性的同時(shí)還可以保證最終的數(shù)據(jù)一致性。

5. 開源版本事務(wù)消息

5.1 基本實(shí)現(xiàn)原理

基于MQ的事務(wù)消息方案主要依靠MQ的Half消息機(jī)制來實(shí)現(xiàn)投遞消息和參與者自身本地事務(wù)的一致性保障。

Half消息:在原有隊(duì)列消息執(zhí)行后的邏輯,如果后面的本地邏輯出錯(cuò),則不發(fā)送該消息,如果通過則告知MQ發(fā)送。Half消息機(jī)制實(shí)現(xiàn)原理其實(shí)借鑒的2PC的思路,是二階段提交的廣義拓展。

圖片

  • 事務(wù)發(fā)起方producer首先發(fā)送Half消息到broker
  • MQ通知發(fā)送方消息發(fā)送成功
  • 在發(fā)送Half消息成功后producer執(zhí)行本地事務(wù)
  • 本地事務(wù)完畢,根據(jù)事務(wù)的狀態(tài),Producer向Broker發(fā)送二次確認(rèn)消息,確認(rèn)該Half Message的Commit或者Rollback狀態(tài)。Broker收到二次確認(rèn)消息后,對(duì)于Commit狀態(tài),則直接發(fā)送到Consumer端執(zhí)行消費(fèi)邏輯,而對(duì)于Rollback則直接標(biāo)記為失敗,一段時(shí)間后清除,并不會(huì)發(fā)給Consumer。正常情況下,到此分布式事務(wù)已經(jīng)完成,剩下要處理的就是超時(shí)問題,即一段時(shí)間后Broker仍沒有收到Producer的二次確認(rèn)消息;
  • 針對(duì)超時(shí)狀態(tài),Broker主動(dòng)向Producer發(fā)起消息回查;
  • Producer處理回查消息,返回對(duì)應(yīng)的本地事務(wù)的執(zhí)行結(jié)果;
  • Broker針對(duì)回查消息的結(jié)果,執(zhí)行Commit或Rollback操作,同4;

事務(wù)消息共有三種狀態(tài),提交狀態(tài)、回滾狀態(tài)、中間狀態(tài):

CommitTransaction: 提交事務(wù),它允許消費(fèi)者消費(fèi)此消息。RollbackTransaction: 回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。Unknown: 中間狀態(tài),它代表需要檢查消息隊(duì)列來確定狀態(tài)。事務(wù)消息的核心類為TransactionListenerImpl,里面提供了兩個(gè)核心方法,具體的代碼如下圖:

圖片

executeLocalTransaction方法:用來執(zhí)行本地事務(wù),返回本地事務(wù)給到broker,同時(shí),將事務(wù)狀態(tài)進(jìn)行記錄:

checkLocalTransaction 方法:用來查詢本地事務(wù)的執(zhí)行結(jié)果提供給broker

5.2 事務(wù)消息發(fā)送邏輯–producer發(fā)送

事務(wù)消息是由兩個(gè)消息來實(shí)現(xiàn)的,一個(gè)是RMQ_SYS_TRANS_HALF_TOPIC消息,作用是用來存儲(chǔ)第一階段的parpare消息,事務(wù)消息首先先進(jìn)入到該主題消息,消息具體是提交還是回滾要根據(jù)第二階段的消息來判斷。另一個(gè)是RMQ_SYS_TRANS_OP_HALF_TOPIC消息,用來接收第二階段的Commit或Rollback消息。

特別需要注意的一點(diǎn),RMQ_SYS_TRANS_HALF_TOPIC消息是用來存儲(chǔ)不能被消費(fèi)者發(fā)現(xiàn)的消息,通過RMQ_SYS_TRANS_OP_HALF_TOPIC消息,來對(duì)RMQ_SYS_TRANS_HALF_TOPIC消息對(duì)應(yīng)的事務(wù)狀態(tài)來進(jìn)行確認(rèn)的,確認(rèn)commit之后,需要將一階段中設(shè)置的特殊Topic和Queue替換成真正的目標(biāo)的Topic和Queue,后通過一次普通消息的寫入操作來生成一條對(duì)用戶可見的消息。所以RocketMQ事務(wù)消息二階段其實(shí)是利用了一階段存儲(chǔ)的消息的內(nèi)容,在二階段時(shí)恢復(fù)出一條完整的普通消息。

5.3 事務(wù)消息發(fā)送邏輯--broker回查

如果在RocketMQ事務(wù)消息的二階段過程中失敗了,例如在做Commit操作時(shí),出現(xiàn)網(wǎng)絡(luò)問題導(dǎo)致Commit失敗,那么需要通過一定的策略使這條消息最終被Commit。RocketMQ采用了一種補(bǔ)償機(jī)制,稱為“回查”。

Broker端對(duì)未確定狀態(tài)的消息發(fā)起回查,將消息發(fā)送到對(duì)應(yīng)的Producer端(同一個(gè)Group的Producer),由Producer根據(jù)消息來檢查本地事務(wù)的狀態(tài),進(jìn)而執(zhí)行Commit或者Rollback。Broker端通過對(duì)比Half消息和Op消息進(jìn)行事務(wù)消息的回查并且推進(jìn)CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)。

需要注意的是,RocketMQ并不會(huì)無休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查15次,如果15次回查還是無法得知事務(wù)狀態(tài),RocketMQ默認(rèn)回滾該消息。

6. 轉(zhuǎn)轉(zhuǎn)版本事務(wù)消息

6.1 差異

  • 設(shè)計(jì)方式的不同
  • 開源版本基于MQ消息本身立場,在垂直方向做了拓展,在RocketMQ的服務(wù)里面,直接嵌入了事務(wù)消息,相當(dāng)于把這種能力重新下沉到MQ中,便于使用,不用在做任何的額外工作。
  • 轉(zhuǎn)轉(zhuǎn)版本基于公司業(yè)務(wù)場景發(fā)展,在水平方向做了拓展,對(duì)RocketMQ統(tǒng)一做了一層封裝(此時(shí)開源版本并沒有事務(wù)消息),方便使用,我們的設(shè)計(jì)思想可以遷移到任何不支持事務(wù)消息的MQ中,沒有額外依賴,便于拓展,使得事務(wù)消息不在局限于RocketMQ本身。
  • 開源版本
  • 轉(zhuǎn)轉(zhuǎn)版本
  • 實(shí)現(xiàn)方式的不同
  • 開源版本是通過內(nèi)部隊(duì)列和狀態(tài)回查實(shí)現(xiàn)了事務(wù)的最終一致性。
  • 轉(zhuǎn)轉(zhuǎn)版本由數(shù)據(jù)庫的本地事務(wù)來保證事務(wù)的原子性,并由重試機(jī)制保證消息發(fā)送的可靠性。相當(dāng)于把業(yè)務(wù)系統(tǒng)和消息隊(duì)列的分布式事務(wù)重新“降級(jí)”為數(shù)據(jù)庫中的本地事務(wù)。
  • 開源版本
  • 轉(zhuǎn)轉(zhuǎn)版本

6.2 基本實(shí)現(xiàn)原理

  • 事務(wù)消息的發(fā)送流程,在事務(wù)過程中,會(huì)將事務(wù)信息消息記錄到數(shù)據(jù)庫中。

圖片

  • 獲取到msg之后,執(zhí)行校驗(yàn)邏輯,在發(fā)送失敗或者未查詢到數(shù)據(jù)后,會(huì)將這個(gè)msg丟入到另一個(gè)隊(duì)列timeWheelQueue ,由另一個(gè)定時(shí)任務(wù)去處理。具體的流程如下圖。

圖片

  • 因?yàn)樯婕暗絡(luò)vm的內(nèi)存存儲(chǔ),所以要考慮上線或者其他情況導(dǎo)致服務(wù)重啟,未發(fā)送完的消息該如何處理。

圖片

小節(jié)

事務(wù)消息無論是開源版本還是轉(zhuǎn)轉(zhuǎn)版本,都是繞不過去的點(diǎn),因?yàn)槲覀冏鳛闃I(yè)務(wù)側(cè)團(tuán)隊(duì)在如今遍地都是分布式系統(tǒng)的情況下太需要這樣的能力來幫我們兜底了。而作為各位看官,為了能夠正確得使用事務(wù)消息以及方便排查這里的問題,也是很有必要了解清楚這里的技術(shù)實(shí)現(xiàn)細(xì)節(jié)。

作者

黃培祖 轉(zhuǎn)轉(zhuǎn)采貨俠后端工程師

朱洪旭 轉(zhuǎn)轉(zhuǎn)采貨俠后端工程師

責(zé)任編輯:龐桂玉 來源: 轉(zhuǎn)轉(zhuǎn)技術(shù)
相關(guān)推薦

2021-04-15 09:17:01

SpringBootRocketMQ

2023-09-04 08:00:53

提交事務(wù)消息

2023-12-04 09:23:49

分布式消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2012-02-17 09:33:52

虛擬化桌面虛擬化

2023-07-17 08:34:03

RocketMQ消息初體驗(yàn)

2022-12-13 09:19:26

分布式消息隊(duì)列

2009-01-20 09:12:16

PHPJava數(shù)據(jù)庫

2023-12-06 21:44:28

RocksDBvivo

2010-07-30 13:06:22

NFS端口

2019-01-10 09:11:51

消息順序性分布式服務(wù)端

2025-03-31 10:49:16

2023-09-05 09:49:03

2018-11-20 08:00:00

持續(xù)集成持續(xù)部署Git代碼倉庫

2019-07-19 07:56:13

消息隊(duì)列消息代理消息中間件

2022-01-24 10:26:46

Kubernetes微服務(wù)

2025-04-29 04:00:00

分布式事務(wù)事務(wù)消息

2024-06-05 06:37:19

2021-09-30 07:26:15

MQ消息丟失
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)