自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

分布式事務(wù)實現(xiàn)方案:一文詳解RocketMQ事務(wù)消息

云計算 分布式
RocketMQ 事務(wù)消息是分布式事務(wù)中一種常見的實現(xiàn)方案,只是把發(fā)送消息和本地事務(wù)放在一個事務(wù)中,并且只保證最終一致性,無法保證強一致性。

常見的分布式事務(wù)實現(xiàn)方案有以下幾種:兩階段提交(2PC)、兩階段提交(2PC)、補償事務(wù)(Saga)、MQ事務(wù)消息等。今天就講一下 RocketMQ 的事務(wù)消息,是一種非常特殊的分布式事務(wù)實現(xiàn)方案,基于半消息(Half Message)機制實現(xiàn)的。 看完這篇想一下,RocketMQ事務(wù)消息到底能不能保證分布式系統(tǒng)中數(shù)據(jù)的強一致性?

實現(xiàn)原理

RocketMQ事務(wù)消息執(zhí)行流程如下:

  1. 生產(chǎn)者將消息發(fā)送至RocketMQ服務(wù)端。
  2. RocketMQ服務(wù)端將消息持久化成功之后,向生產(chǎn)者返回Ack確認消息已經(jīng)發(fā)送成功,此時消息被標記為"暫不能投遞",這種狀態(tài)下的消息即為半事務(wù)消息(Half Message)。
  3. 生產(chǎn)者開始執(zhí)行本地事務(wù)邏輯。
  4. 生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認結(jié)果(Commit或是Rollback),服務(wù)端收到確認結(jié)果后處理邏輯如下:

二次確認結(jié)果為Commit:服務(wù)端將半事務(wù)消息標記為可投遞,并投遞給消費者。

二次確認結(jié)果為Rollback:服務(wù)端將回滾事務(wù),不會將半事務(wù)消息投遞給消費者。

  1. 在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若服務(wù)端未收到發(fā)送者提交的二次確認結(jié)果,或服務(wù)端收到的二次確認結(jié)果為Unknown未知狀態(tài),經(jīng)過固定時間后,服務(wù)端將對消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實例發(fā)起消息回查。
  2. 生產(chǎn)者收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
  3. 生產(chǎn)者根據(jù)檢查到的本地事務(wù)的最終狀態(tài)再次提交二次確認,服務(wù)端仍按照步驟4對半事務(wù)消息進行處理。

圖片圖片

代碼實現(xiàn)

RocketMQ事務(wù)消息示例如下:

//演示demo,模擬訂單表查詢服務(wù),用來確認訂單事務(wù)是否提交成功。
private static boolean checkOrderById(String orderId) {
    return true;
}

//演示demo,模擬本地事務(wù)的執(zhí)行結(jié)果。
private static boolean doLocalTransaction() {
    return true;
}

public static void main(String[] args) throws ClientException {
    ClientServiceProvider provider = new ClientServiceProvider();
    MessageBuilder messageBuilder = new MessageBuilderImpl();
    //構(gòu)造事務(wù)生產(chǎn)者:事務(wù)消息需要生產(chǎn)者構(gòu)建一個事務(wù)檢查器,用于檢查確認異常半事務(wù)的中間狀態(tài)。
    Producer producer = provider.newProducerBuilder()
            .setTransactionChecker(messageView -> {
                /**
                 * 事務(wù)檢查器一般是根據(jù)業(yè)務(wù)的ID去檢查本地事務(wù)是否正確提交還是回滾,此處以訂單ID屬性為例。
                 * 在訂單表找到了這個訂單,說明本地事務(wù)插入訂單的操作已經(jīng)正確提交;如果訂單表沒有訂單,說明本地事務(wù)已經(jīng)回滾。
                 */
                final String orderId = messageView.getProperties().get("OrderId");
                if (Strings.isNullOrEmpty(orderId)) {
                    // 錯誤的消息,直接返回Rollback。
                    return TransactionResolution.ROLLBACK;
                }
                return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
            })
            .build();
    //開啟事務(wù)
    final Transaction transaction;
    try {
        transaction = producer.beginTransaction();
    } catch (ClientException e) {
        e.printStackTrace();
        //事務(wù)開啟失敗,直接退出。
        return;
    }
    Message message = messageBuilder.setTopic("topic")
            //設(shè)置消息索引鍵,可根據(jù)關(guān)鍵字精確查找某條消息。
            .setKeys("messageKey")
            //設(shè)置消息Tag,用于消費端根據(jù)指定Tag過濾消息。
            .setTag("messageTag")
            //一般事務(wù)消息都會設(shè)置一個本地事務(wù)關(guān)聯(lián)的唯一ID,用來做本地事務(wù)回查的校驗。
            .addProperty("OrderId", "xxx")
            //消息體。
            .setBody("messageBody".getBytes())
            .build();
    //發(fā)送半事務(wù)消息
    final SendReceipt sendReceipt;
    try {
        sendReceipt = producer.send(message, transaction);
    } catch (ClientException e) {
        //半事務(wù)消息發(fā)送失敗,事務(wù)可以直接退出并回滾。
        return;
    }
    /**
     * 執(zhí)行本地事務(wù),并確定本地事務(wù)結(jié)果。
     * 1. 如果本地事務(wù)提交成功,則提交消息事務(wù)。
     * 2. 如果本地事務(wù)提交失敗,則回滾消息事務(wù)。
     * 3. 如果本地事務(wù)未知異常,則不處理,等待事務(wù)消息回查。
     *
     */
    boolean localTransactionOk = doLocalTransaction();
    if (localTransactionOk) {
        try {
            transaction.commit();
        } catch (ClientException e) {
            // 業(yè)務(wù)可以自身對實時性的要求選擇是否重試,如果放棄重試,可以依賴事務(wù)消息回查機制進行事務(wù)狀態(tài)的提交。
            e.printStackTrace();
        }
    } else {
        try {
            transaction.rollback();
        } catch (ClientException e) {
            // 建議記錄異常信息,回滾異常時可以無需重試,依賴事務(wù)消息回查機制進行事務(wù)狀態(tài)的提交。
            e.printStackTrace();
        }
    }
}

注意事項

  • 冪等性: 消費者處理消息時需要確保業(yè)務(wù)邏輯的冪等性,以應(yīng)對消息可能的重復(fù)消費。
  • 超時和監(jiān)控: 設(shè)置合理的超時時間,并監(jiān)控事務(wù)消息的性能

總結(jié)

RocketMQ 事務(wù)消息是分布式事務(wù)中一種常見的實現(xiàn)方案,只是把發(fā)送消息和本地事務(wù)放在一個事務(wù)中,并且只保證最終一致性,無法保證強一致性。 原因有兩點:

  1. 執(zhí)行完成本地事務(wù)后,在commit事務(wù)消息之前,這段時間內(nèi)數(shù)據(jù)是不一致的,所以只是保證了發(fā)送消息和本地事務(wù)的最終一致性。
  2. 在commit事務(wù)消息之后,然后把消息投遞給消費者。至于消費者是否消費消息,什么時候消費?也都是不可控的,所以也只能盡量保證數(shù)據(jù)最終一致性。
責任編輯:武曉燕 來源: 一燈架構(gòu)
相關(guān)推薦

2021-06-28 10:03:44

分布式數(shù)據(jù)庫架構(gòu)

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2022-05-30 10:37:35

分布式事務(wù)反向補償

2023-11-06 13:15:32

分布式事務(wù)Seata

2024-06-11 13:50:43

2024-03-29 13:30:41

分布式事務(wù)節(jié)點

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2024-01-26 13:17:00

rollbackMQ訂單系統(tǒng)

2023-01-06 09:19:12

Seata分布式事務(wù)

2024-08-19 09:05:00

Seata分布式事務(wù)

2022-07-10 20:24:48

Seata分布式事務(wù)

2022-06-21 08:27:22

Seata分布式事務(wù)

2017-07-26 15:08:05

大數(shù)據(jù)分布式事務(wù)

2021-06-16 08:33:02

分布式事務(wù)ACID

2023-09-04 08:00:53

提交事務(wù)消息

2019-08-19 10:24:33

分布式事務(wù)數(shù)據(jù)庫

2019-11-04 08:38:45

分布式事務(wù)主流TCC

2020-03-31 08:05:23

分布式開發(fā)技術(shù)

2024-06-07 08:06:36

2020-05-28 09:35:05

分布式事務(wù)方案
點贊
收藏

51CTO技術(shù)棧公眾號