分布式事務(wù)實現(xiàn)方案:一文詳解RocketMQ事務(wù)消息
常見的分布式事務(wù)實現(xiàn)方案有以下幾種:兩階段提交(2PC)、兩階段提交(2PC)、補償事務(wù)(Saga)、MQ事務(wù)消息等。今天就講一下 RocketMQ 的事務(wù)消息,是一種非常特殊的分布式事務(wù)實現(xiàn)方案,基于半消息(Half Message)機制實現(xiàn)的。 看完這篇想一下,RocketMQ事務(wù)消息到底能不能保證分布式系統(tǒng)中數(shù)據(jù)的強一致性?
實現(xiàn)原理
RocketMQ事務(wù)消息執(zhí)行流程如下:
- 生產(chǎn)者將消息發(fā)送至RocketMQ服務(wù)端。
- RocketMQ服務(wù)端將消息持久化成功之后,向生產(chǎn)者返回Ack確認消息已經(jīng)發(fā)送成功,此時消息被標記為"暫不能投遞",這種狀態(tài)下的消息即為半事務(wù)消息(Half Message)。
- 生產(chǎn)者開始執(zhí)行本地事務(wù)邏輯。
- 生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認結(jié)果(Commit或是Rollback),服務(wù)端收到確認結(jié)果后處理邏輯如下:
二次確認結(jié)果為Commit:服務(wù)端將半事務(wù)消息標記為可投遞,并投遞給消費者。
二次確認結(jié)果為Rollback:服務(wù)端將回滾事務(wù),不會將半事務(wù)消息投遞給消費者。
- 在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若服務(wù)端未收到發(fā)送者提交的二次確認結(jié)果,或服務(wù)端收到的二次確認結(jié)果為Unknown未知狀態(tài),經(jīng)過固定時間后,服務(wù)端將對消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實例發(fā)起消息回查。
- 生產(chǎn)者收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
- 生產(chǎn)者根據(jù)檢查到的本地事務(wù)的最終狀態(tài)再次提交二次確認,服務(wù)端仍按照步驟4對半事務(wù)消息進行處理。
圖片
代碼實現(xiàn)
RocketMQ事務(wù)消息示例如下:
//演示demo,模擬訂單表查詢服務(wù),用來確認訂單事務(wù)是否提交成功。
private static boolean checkOrderById(String orderId) {
return true;
}
//演示demo,模擬本地事務(wù)的執(zhí)行結(jié)果。
private static boolean doLocalTransaction() {
return true;
}
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilderImpl();
//構(gòu)造事務(wù)生產(chǎn)者:事務(wù)消息需要生產(chǎn)者構(gòu)建一個事務(wù)檢查器,用于檢查確認異常半事務(wù)的中間狀態(tài)。
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* 事務(wù)檢查器一般是根據(jù)業(yè)務(wù)的ID去檢查本地事務(wù)是否正確提交還是回滾,此處以訂單ID屬性為例。
* 在訂單表找到了這個訂單,說明本地事務(wù)插入訂單的操作已經(jīng)正確提交;如果訂單表沒有訂單,說明本地事務(wù)已經(jīng)回滾。
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// 錯誤的消息,直接返回Rollback。
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
//開啟事務(wù)
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
//事務(wù)開啟失敗,直接退出。
return;
}
Message message = messageBuilder.setTopic("topic")
//設(shè)置消息索引鍵,可根據(jù)關(guān)鍵字精確查找某條消息。
.setKeys("messageKey")
//設(shè)置消息Tag,用于消費端根據(jù)指定Tag過濾消息。
.setTag("messageTag")
//一般事務(wù)消息都會設(shè)置一個本地事務(wù)關(guān)聯(lián)的唯一ID,用來做本地事務(wù)回查的校驗。
.addProperty("OrderId", "xxx")
//消息體。
.setBody("messageBody".getBytes())
.build();
//發(fā)送半事務(wù)消息
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
//半事務(wù)消息發(fā)送失敗,事務(wù)可以直接退出并回滾。
return;
}
/**
* 執(zhí)行本地事務(wù),并確定本地事務(wù)結(jié)果。
* 1. 如果本地事務(wù)提交成功,則提交消息事務(wù)。
* 2. 如果本地事務(wù)提交失敗,則回滾消息事務(wù)。
* 3. 如果本地事務(wù)未知異常,則不處理,等待事務(wù)消息回查。
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// 業(yè)務(wù)可以自身對實時性的要求選擇是否重試,如果放棄重試,可以依賴事務(wù)消息回查機制進行事務(wù)狀態(tài)的提交。
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// 建議記錄異常信息,回滾異常時可以無需重試,依賴事務(wù)消息回查機制進行事務(wù)狀態(tài)的提交。
e.printStackTrace();
}
}
}
注意事項
- 冪等性: 消費者處理消息時需要確保業(yè)務(wù)邏輯的冪等性,以應(yīng)對消息可能的重復(fù)消費。
- 超時和監(jiān)控: 設(shè)置合理的超時時間,并監(jiān)控事務(wù)消息的性能
總結(jié)
RocketMQ 事務(wù)消息是分布式事務(wù)中一種常見的實現(xiàn)方案,只是把發(fā)送消息和本地事務(wù)放在一個事務(wù)中,并且只保證最終一致性,無法保證強一致性。 原因有兩點:
- 執(zhí)行完成本地事務(wù)后,在commit事務(wù)消息之前,這段時間內(nèi)數(shù)據(jù)是不一致的,所以只是保證了發(fā)送消息和本地事務(wù)的最終一致性。
- 在commit事務(wù)消息之后,然后把消息投遞給消費者。至于消費者是否消費消息,什么時候消費?也都是不可控的,所以也只能盡量保證數(shù)據(jù)最終一致性。