RocketMQ 事務(wù)消息初體驗
事務(wù)消息是 RocketMQ 的高級特性之一 。這篇文章,筆者會從應(yīng)用場景、功能原理、實戰(zhàn)例子三個模塊慢慢為你揭開事務(wù)消息的神秘面紗。
1 應(yīng)用場景
舉一個電商場景的例子:用戶購物車結(jié)算時,系統(tǒng)會創(chuàng)建支付訂單。
用戶支付成功后支付訂單的狀態(tài)會由未支付修改為支付成功,然后系統(tǒng)給用戶增加積分。
通常我們會使用普通消費方案,該方案能夠發(fā)揮 MQ 的優(yōu)勢:異步和解耦 , 同時架構(gòu)設(shè)計非常簡單。
圖片
- 用戶購物車結(jié)算時,系統(tǒng)創(chuàng)建支付訂單;
- 支付成功后,更新訂單的狀態(tài)從未支付修改為支付成功;
- 發(fā)送一條普通消息到消息隊列服務(wù)端;
- 積分服務(wù)消費消息,添加積分記錄。
但該方案有個非常直觀的缺點:容易出現(xiàn)不一致的現(xiàn)象。
- 假如先發(fā)送消息,后修改訂單狀態(tài),消息發(fā)送成功,訂單沒有執(zhí)行成功,需要回滾整個事務(wù)(訂單數(shù)據(jù)事務(wù)回滾,積分服務(wù)消費時,需要先反查事務(wù)狀態(tài),若事務(wù)提交,才能插入積分記錄)。
- 假如先修改訂單狀態(tài),后發(fā)送消息,訂單狀態(tài)修改成功,但消息發(fā)送失敗,需要補償操作才能保持最終一致。
- 假如先修改訂單,后發(fā)送消息,訂單狀態(tài)修改成功,但消息發(fā)送超時,此時無法判斷需要回滾訂單還是提交訂單變更。
我們看到,為了完善普通消費方案,業(yè)務(wù)層還需要做到兩點:補償機制和提供事務(wù)狀態(tài)查詢接口。
要做到這兩點,難不難呢?
不難,但是業(yè)務(wù)層代碼會比較混亂,更優(yōu)的方案還是得從中間件層面解決。
2 功能原理
RocketMQ 事務(wù)消息是支持在分布式場景下保障消息生產(chǎn)和本地事務(wù)的最終一致性。交互流程如下圖所示:
圖片
1、生產(chǎn)者將消息發(fā)送至 Broker 。
2、Broker 將消息持久化成功之后,向生產(chǎn)者返回 Ack 確認(rèn)消息已經(jīng)發(fā)送成功,此時消息被標(biāo)記為"暫不能投遞",這種狀態(tài)下的消息即為半事務(wù)消息。
3、生產(chǎn)者開始執(zhí)行本地事務(wù)邏輯。
4、生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)結(jié)果( Commit 或是 Rollback ),Broker 收到確認(rèn)結(jié)果后處理邏輯如下:
- 二次確認(rèn)結(jié)果為 Commit :Broker 將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費者。
- 二次確認(rèn)結(jié)果為 Rollback :Broker 將回滾事務(wù),不會將半事務(wù)消息投遞給消費者。
5、在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若 Broker 未收到發(fā)送者提交的二次確認(rèn)結(jié)果,或 Broker 收到的二次確認(rèn)結(jié)果為 Unknown 未知狀態(tài),經(jīng)過固定時間后,服務(wù)端將對消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實例發(fā)起消息回查。
- 生產(chǎn)者收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
- 生產(chǎn)者根據(jù)檢查到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對半事務(wù)消息進行處理。
筆者認(rèn)為事務(wù)消息的精髓在于:
- 本地事務(wù)執(zhí)行成功,消費者才能消費事務(wù)消息;
- 消息回查本身就是補償機制的實現(xiàn),事務(wù)生產(chǎn)者需提供了事務(wù)狀態(tài)查詢接口。
3 實戰(zhàn)例子
為了便于大家理解事務(wù)消息 ,筆者新建一個工程用于模擬支付訂單創(chuàng)建、支付成功、贈送積分的流程。
首先,我們創(chuàng)建一個真實的訂單主題:order-topic 。
圖片
然后在數(shù)據(jù)庫中創(chuàng)建三張表 訂單表、事務(wù)日志表、積分表。
圖片
最后我們創(chuàng)建一個 Demo 工程,生產(chǎn)者模塊用于創(chuàng)建支付訂單、修改支付訂單成功,消費者模塊用于新增積分記錄。
圖片
接下來,我們展示事務(wù)消息的實現(xiàn)流程。
1、創(chuàng)建支付訂單
調(diào)用訂單生產(chǎn)者服務(wù)創(chuàng)建訂單接口 ,在 t_order 表中插入一條支付訂單記錄。
圖片
2、調(diào)用生產(chǎn)者服務(wù)修改訂單狀態(tài)接口
接口的邏輯就是執(zhí)行事務(wù)生產(chǎn)者的 sendMessageInTransaction 方法。
圖片
生產(chǎn)者端需要配置事務(wù)生產(chǎn)者和事務(wù)監(jiān)聽器。
圖片
發(fā)送事務(wù)消息的方法內(nèi)部包含三個步驟 :
圖片
事務(wù)生產(chǎn)者首先發(fā)送半事務(wù)消息,發(fā)送成功后,生產(chǎn)者才開始執(zhí)行本地事務(wù)邏輯。
事務(wù)監(jiān)聽器實現(xiàn)了兩個功能:執(zhí)行本地事務(wù)和供 Broker 回查事務(wù)狀態(tài) 。
圖片
執(zhí)行本地事務(wù)的邏輯內(nèi)部就是執(zhí)行 orderService.updateOrder 方法。
方法執(zhí)行成功則返回 LocalTransactionState.COMMIT_MESSAGE , 若執(zhí)行失敗則返回 LocalTransactionState.ROLLBACK_MESSAGE 。
圖片
需要注意的是: orderService.updateOrder 方法添加了事務(wù)注解,并將修改訂單狀態(tài)和插入事務(wù)日志表放進一個事務(wù)內(nèi),避免訂單狀態(tài)和事務(wù)日志表的數(shù)據(jù)不一致。
最后,生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向 Broker 提交二次確認(rèn)結(jié)果。
Broker 收到生產(chǎn)者確認(rèn)結(jié)果后處理邏輯如下:
- 二次確認(rèn)結(jié)果為 Commit :Broker 將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費者。
- 二次確認(rèn)結(jié)果為 Rollback :Broker 將回滾事務(wù),不會將半事務(wù)消息投遞給消費者。
3、積分消費者消費消息,添加積分記錄
當(dāng) Broker 將半事務(wù)消息標(biāo)記為可投遞時,積分消費者就可以開始消費主題 order-topic 的消息了。
圖片
積分消費者服務(wù),我們定義了消費者組名,以及訂閱主題和消費監(jiān)聽器。
圖片
在消費監(jiān)聽器邏輯里,冪等非常重要 。當(dāng)收到訂單信息后,首先判斷該訂單是否有積分記錄,若沒有記錄,才插入積分記錄。
而且我們在創(chuàng)建積分表時,訂單編號也是唯一鍵,數(shù)據(jù)庫中也必然不會存在相同訂單的多條積分記錄。
4 總結(jié)
RocketMQ 事務(wù)消息是支持在分布式場景下保障消息生產(chǎn)和本地事務(wù)的最終一致性。
編寫一個實戰(zhàn)例子并不復(fù)雜,但使用事務(wù)消息時需要注意如下三點:
1、事務(wù)生產(chǎn)者和消費者共同協(xié)作才能保證業(yè)務(wù)數(shù)據(jù)的最終一致性;
2、事務(wù)生產(chǎn)者需要實現(xiàn)事務(wù)監(jiān)聽器,并且保存事務(wù)的執(zhí)行結(jié)果(比如事務(wù)日志表) ;
3、消費者要保證冪等。消費失敗時,通過重試、告警+人工介入等手段保證消費結(jié)果正確。
筆者會在后續(xù)的文章里,詳細解析事務(wù)消息的實現(xiàn)原理,敬請期待。
實戰(zhàn)代碼地址:
https://github.com/makemyownlife/rocketmq4-learning