自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

事物消息的實(shí)現(xiàn)-RocketMQ知識(shí)體系6

開發(fā) 前端
RocketMQ提供了事務(wù)消息的功能,采用2PC(兩段式協(xié)議)+補(bǔ)償機(jī)制(事務(wù)回查)的分布式事務(wù)功能,通過消息隊(duì)列 RocketMQ 版事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。

[[411281]]

分布式事務(wù)是指事務(wù)的參與者、支持事務(wù)的服務(wù)器、資源服務(wù)器以及事務(wù)管理器分別位于不同的分布式系統(tǒng)的不同節(jié)點(diǎn)之上。例如在大型電商系統(tǒng)中,下單接口通常會(huì)扣減庫存、減去優(yōu)惠、生成訂單 id, 而訂單服務(wù)與庫存、優(yōu)惠、訂單 id 都是不同的服務(wù),下單接口的成功與否,不僅取決于本地的 db 操作,而且依賴第三方系統(tǒng)的結(jié)果,這時(shí)候分布式事務(wù)就保證這些操作要么全部成功,要么全部失敗。本質(zhì)上來說,分布式事務(wù)就是為了保證不同數(shù)據(jù)庫的數(shù)據(jù)一致性。

目前解決分布式事物的解決方案有seata,lcn 等。

RocketMQ 分布式事物實(shí)現(xiàn)

RocketMQ提供了事務(wù)消息的功能,采用2PC(兩段式協(xié)議)+補(bǔ)償機(jī)制(事務(wù)回查)的分布式事務(wù)功能,通過消息隊(duì)列 RocketMQ 版事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。

首先,我們要知道什么是半事物消息和消息回查:

  • 半事務(wù)消息:

暫不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了消息隊(duì)列 RocketMQ 版服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對(duì)該消息的二次確認(rèn),此時(shí)該消息被標(biāo)記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半事務(wù)消息。

  • 消息回查:

由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,消息隊(duì)列 RocketMQ 版服務(wù)端通過掃描發(fā)現(xiàn)某條消息長(zhǎng)期處于“半事務(wù)消息”時(shí),需要主動(dòng)向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit 或是 Rollback),該詢問過程即消息回查。

【交互流程】

事務(wù)消息發(fā)送步驟如下:

  1. 發(fā)送方將半事務(wù)消息發(fā)送至消息隊(duì)列 RocketMQ 版服務(wù)端。
  2. 消息隊(duì)列 RocketMQ 版服務(wù)端將消息持久化成功之后,向發(fā)送方返回 Ack。確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半事務(wù)消息。
  3. 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
  4. 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)(Commit 或是 Rollback),服務(wù)端收到 Commit 狀態(tài)則將半事務(wù)消息標(biāo)記為可投遞,訂閱方最終將收到該消息;服務(wù)端收到 Rollback 狀態(tài)則刪除半事務(wù)消息,訂閱方將不會(huì)接受該消息。

事務(wù)消息回查步驟如下:

  1. 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟 4 提交的二次確認(rèn)最終未到達(dá)服務(wù)端,經(jīng)過固定時(shí)間后服務(wù)端將對(duì)該消息發(fā)起消息回查。
  2. 發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
  3. 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟 4 對(duì)半事務(wù)消息進(jìn)行操作。

總體而言RocketMQ事務(wù)消息分為兩條主線:

  • 發(fā)送流程:發(fā)送half message(半消息),執(zhí)行本地事務(wù),發(fā)送事務(wù)執(zhí)行結(jié)果
  • 定時(shí)任務(wù)回查流程:MQ定時(shí)任務(wù)掃描半消息,回查本地事務(wù),發(fā)送事務(wù)執(zhí)行結(jié)果

源碼相關(guān)

Producer發(fā)送事務(wù)半消息的(prepare)

在本地應(yīng)用發(fā)送事務(wù)消息的核心類是TransactionMQProducer,該類通過繼承DefaultMQProducer來復(fù)用大部分發(fā)送消息相關(guān)的邏輯,這個(gè)類的代碼量非常少只有100來行,下面是這個(gè)類的sendMessageTransaction方法

這里的transactionListener就是上面所說的消息回查的類,它提供了2個(gè)方法:

  • executeLocalTransaction

執(zhí)行本地事務(wù)

  • checkLocalTransaction

回查本地事務(wù)

接著看DefaultMQProducer.sendMessageInTransaction()方法:

該方法主要做了以下事情

  • 給消息打上事務(wù)消息相關(guān)的tag,用于broker區(qū)分普通消息和事務(wù)消息
  • 發(fā)送半消息(half message)
  • 發(fā)送成功則由transactionListener執(zhí)行本地事務(wù)
  • 執(zhí)行endTransaction方法,告訴 broker 執(zhí)行 commit/rollback。

執(zhí)行本地事務(wù)

Producer 半事務(wù)消息發(fā)送成功后,會(huì)調(diào)用transactionListener.executeLocalTransaction方法執(zhí)行本地事務(wù)。只有半消息發(fā)送成功后,才會(huì)執(zhí)行本地事務(wù),如果半消息發(fā)送失敗,則設(shè)置回滾。

結(jié)束事務(wù)(commit/rollback)

本地事務(wù)執(zhí)行后,則調(diào)用this.endTransaction()方法,根據(jù)本地事務(wù)執(zhí)行狀態(tài),去提交事務(wù)或者回滾事務(wù)。

如果半消息發(fā)送失敗或本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息,半消息發(fā)送成功且本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息

Broker端處理事務(wù)消息

Broker端通過SendMessageProcessor.processRequest()方法接收處理 Producer 發(fā)送的消息 最后會(huì)調(diào)用到SendMessageProcessor.sendMessage(),判斷消息類型,進(jìn)行消息存儲(chǔ)。

存儲(chǔ)半消息

代碼 prepareMessage(msgInner) :

在這一步,備份消息的原主題名稱與原隊(duì)列ID,然后取消事務(wù)消息的消息標(biāo)簽,重新設(shè)置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊(duì)列ID固定為0。與其他普通消息區(qū)分開,然后完成消息持久化。

到這里,Broker 就初步處理完了 Producer 發(fā)送的事務(wù)半消息。

半消息事務(wù)回查

兩段式協(xié)議發(fā)送與提交回滾消息,執(zhí)行完本地事務(wù)消息的狀態(tài)為UNKNOW時(shí),結(jié)束事務(wù)不做任何操作。通過事務(wù)狀態(tài)定時(shí)回查得到發(fā)送端的事務(wù)狀態(tài)是rollback或commit。

通過TransactionalMessageCheckService線程定時(shí)去檢測(cè)RMQ_SYS_TRANS_HALF_TOPIC主題中的消息,回查消息的事務(wù)狀態(tài)。

  • RMQ_SYS_TRANS_HALF_TOPIC

prepare消息的主題,事務(wù)消息首先先進(jìn)入到該主題。

  • RMQ_SYS_TRANS_OP_HALF_TOPIC

當(dāng)消息服務(wù)器收到事務(wù)消息的提交或回滾請(qǐng)求后,會(huì)將消息存儲(chǔ)在該主題下。

Broker處理END_TRANSACTION

當(dāng)Producer或者回查定時(shí)任務(wù)提交/回滾事務(wù)的時(shí)候,Broker如何處理事務(wù)消息提交、回滾命令的?其核心實(shí)現(xiàn)如下:

  • 根據(jù)commitlogOffset找到消息
  • 如果是提交動(dòng)作,就恢復(fù)原消息的主題與隊(duì)列,再次存入commitlog文件進(jìn)而轉(zhuǎn)到消息消費(fèi)隊(duì)列,供消費(fèi)者消費(fèi),然后將原預(yù)處理消息存入一個(gè)新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
  • 回滾消息,則直接將原預(yù)處理消息存入一個(gè)新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理。

整體實(shí)現(xiàn)流程

如果消費(fèi)端消費(fèi)失敗了怎么辦?

如果有消息消費(fèi)失敗了,則將失敗的消息回傳給broker,即重新寫入commitLog文件,消費(fèi)者將重新消費(fèi);如果消息回傳的時(shí)候,consumer和broker之間網(wǎng)絡(luò)斷開,則consumer會(huì)調(diào)用submitConsumeRequestLater()方法,在consumer端進(jìn)行重新消費(fèi),如果仍然消費(fèi)失敗,會(huì)不斷重試直到達(dá)到默認(rèn)的16次,你可以使用msg.getReconsumeTimes()方法來獲取當(dāng)前重試次數(shù),如果重試次數(shù)足夠多之后仍然無法消費(fèi)成功,必須通過工單、日志等方式進(jìn)行人工干預(yù)以讓producer事務(wù)進(jìn)行回退處理。

Producer發(fā)送半消息失敗

可能由于網(wǎng)絡(luò)或者mq故障,導(dǎo)致 Producer 訂單系統(tǒng) 發(fā)送半消息(prepare)失敗。

這時(shí)訂單系統(tǒng)可以執(zhí)行回滾操作,比如“訂單關(guān)閉”等,走逆向流程退款給用戶。

半消息發(fā)送成功,本地事務(wù)執(zhí)行失敗

如果訂單系統(tǒng)發(fā)送的半消息成功了,但是執(zhí)行本地事務(wù)失敗了,如更新訂單狀態(tài)為“已完成”。

這種情況下,執(zhí)行本地事務(wù)失敗后,會(huì)返回rollback給 MQ,MQ會(huì)刪除之前發(fā)送的半消息。 也就不會(huì)調(diào)用優(yōu)惠券系統(tǒng)了。

半消息發(fā)送成功,沒收到MQ返回的響應(yīng)

假如訂單系統(tǒng)發(fā)送半消息成功后,沒有收到MQ返回的響應(yīng)。

這個(gè)時(shí)候可能是因?yàn)榫W(wǎng)絡(luò)問題,或者其他異常報(bào)錯(cuò),訂單系統(tǒng)誤以為發(fā)送MQ半消息失敗,執(zhí)行了逆向回滾流程。

但這個(gè)時(shí)候其實(shí)mq已經(jīng)保存半消息成功了,那這個(gè)消息怎么處理?

這個(gè)時(shí)候MQ的后臺(tái)消息回查定時(shí)任務(wù)TransactionalMessageCheckService會(huì)每隔1分鐘掃描一次半消息隊(duì)列,判斷是否需要消息回查,然后回查訂單系統(tǒng)的本地事務(wù),這時(shí)MQ就會(huì)發(fā)現(xiàn)訂單已經(jīng)變成“已關(guān)閉”,此時(shí)就要發(fā)送rollback請(qǐng)求給mq,刪除之前的半消息。

如果commit/rollback失敗了

這個(gè)其實(shí)也是通過定時(shí)任務(wù)TransactionalMessageCheckService,它會(huì)發(fā)現(xiàn)這個(gè)消息超過一定時(shí)間還沒有進(jìn)行二階段處理,就會(huì)回查本地事務(wù)。

小結(jié)

消息隊(duì)列RocketMQ分布式事務(wù)消息不僅可以實(shí)現(xiàn)應(yīng)用之間的解耦,又能保證數(shù)據(jù)的最終一致性。同時(shí),傳統(tǒng)的大事務(wù)可以被拆分為小事務(wù),不僅能提升效率,還不會(huì)因?yàn)槟骋粋€(gè)關(guān)聯(lián)應(yīng)用的不可用導(dǎo)致整體回滾,從而最大限度保證核心系統(tǒng)的可用性。在極端情況下,如果關(guān)聯(lián)的某一個(gè)應(yīng)用始終無法處理成功,也只需對(duì)當(dāng)前應(yīng)用進(jìn)行補(bǔ)償或數(shù)據(jù)訂正處理,而無需對(duì)整體業(yè)務(wù)進(jìn)行回滾。

從RocketMQ事務(wù)型消息鏈路體現(xiàn)了面向失敗的設(shè)計(jì)思路,也體現(xiàn)了事務(wù)型系統(tǒng)的嚴(yán)謹(jǐn)性,在第二階段的消息沒有送達(dá)的時(shí)候,broker會(huì)主動(dòng)請(qǐng)求producer端去做check,producer做完check后會(huì)將事務(wù)的狀態(tài)再次返回。雖然說實(shí)現(xiàn)最終一致的方案有很多,但是事務(wù)型消息是比較優(yōu)雅實(shí)現(xiàn)方式之一。

本文轉(zhuǎn)載自微信公眾號(hào)「小汪哥寫代碼」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系小汪哥寫代碼公眾號(hào)。

 

責(zé)任編輯:武曉燕 來源: 小汪哥寫代碼
相關(guān)推薦

2021-07-13 11:52:47

順序消息RocketMQkafka

2021-07-08 07:16:24

RocketMQ數(shù)據(jù)結(jié)構(gòu)Message

2021-07-07 15:29:52

存儲(chǔ)RocketMQ體系

2021-07-09 07:15:48

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2021-07-16 18:44:42

RocketMQ知識(shí)

2021-07-12 10:25:03

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2023-07-18 09:03:01

RocketMQ場(chǎng)景消息

2021-07-07 07:06:31

Brokerkafka架構(gòu)

2015-07-28 17:52:36

IOS知識(shí)體系

2012-03-08 11:13:23

企業(yè)架構(gòu)

2017-06-22 13:07:21

2017-02-27 16:42:23

Spark識(shí)體系

2017-04-03 15:35:13

知識(shí)體系架構(gòu)

2021-07-05 06:26:08

生產(chǎn)者kafka架構(gòu)

2021-07-08 05:52:34

Kafka架構(gòu)主從架構(gòu)

2015-07-16 10:15:44

web前端知識(shí)體系

2020-09-09 09:15:58

Nginx體系進(jìn)程

2020-10-26 08:34:18

知識(shí)體系普適性

2020-03-09 10:31:58

vue前端開發(fā)

2017-07-25 17:34:54

大數(shù)據(jù)機(jī)器學(xué)習(xí)數(shù)據(jù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)