自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

實(shí)戰(zhàn)與原理:如何基于RocketMQ實(shí)現(xiàn)分布式事務(wù)?

開發(fā) 前端
由于此時(shí)MQ中的消息一直處于half狀態(tài),超過一定的超時(shí)時(shí)間后,MQ會發(fā)現(xiàn)這個(gè)half消息有問題,然后回調(diào)你的訂單系統(tǒng)的接口。此時(shí)訂單系統(tǒng)需要根據(jù)訂單狀態(tài)來決定執(zhí)行commit請求還是rollback請求。

使用事務(wù)消息

在DailyMart系統(tǒng)中,用戶發(fā)起支付后,訂單系統(tǒng)需要調(diào)用庫存服務(wù)執(zhí)行庫存扣減邏輯。圖片

由于這是跨服務(wù)調(diào)用,因此會產(chǎn)生分布式事務(wù)。在這里,我們使用RocketMQ的事務(wù)消息來實(shí)現(xiàn)分布式事務(wù)。

1、首先,在訂單服務(wù)的應(yīng)用服務(wù)層處理支付邏輯,并調(diào)用RocketMQ發(fā)送事務(wù)消息:

@Override
public String payment(String orderSn) {
    // todo 集成支付寶支付
    // 支付流水號
    String outOrderNo = IdUtils.get32UUID();
    TradeOrder tradeOrder = Optional.ofNullable(tradeOrderService.getByOrderSn(orderSn)).orElseThrow(() -> new BusinessException("訂單編號不存在"));

    // 如果訂單處于待支付狀態(tài)
    if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.WAITING_PAYMENT.getStatus())) {

        OrderPaidEvent orderPaidEvent = new OrderPaidEvent(orderSn, outOrderNo);

        TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID");

        if (SendStatus.SEND_OK == sendResult.getSendStatus() && sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
            return tradeOrder.getOrderSn();
        } else {
            throw new BusinessException("支付失敗...");
        }
    } else {
        throw new BusinessException("訂單已支付,請勿重復(fù)提交...");
    }
}

2、在訂單服務(wù)的基礎(chǔ)設(shè)施層,創(chuàng)建一個(gè)類實(shí)現(xiàn) RocketMQLocalTransactionListener 接口:

該接口有兩個(gè)方法:

  • executeLocalTransaction:用于執(zhí)行本地事務(wù)。
  • checkLocalTransaction:在RocketMQ執(zhí)行消息回查時(shí)檢查本地事務(wù)執(zhí)行結(jié)果,用于確定消息提交還是回滾。
@Component
@Slf4j
public class OrderPaidTransactionConsumer implements RocketMQLocalTransactionListener {
    
    @Resource
    private TransactionTemplate transactionTemplate;
    @Resource
    private TradeOrderService tradeOrderService;
    
   
  /**
     * 執(zhí)行本地事務(wù)
     * 將訂單狀態(tài)修改成已支付
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);
        try {
            // 放到同一個(gè)本地事務(wù)中
            this.transactionTemplate.executeWithoutResult(status -> {
                String orderSn = orderPaidEvent.getOrderSn();
                // 修改成待發(fā)貨
                tradeOrderService.changeOrderStatus(orderSn, OrderStatusEnum.AWAITING_SHIPMENT);
            });
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("修改訂單狀態(tài)失敗", e);
            // ROLLBACK 則回滾消息,rocketmq將廢棄這條消息
            return RocketMQLocalTransactionState.ROLLBACK;
            // 如果是UNKNOWN, 則觸發(fā)回查
        }

    }

    /**
     * 檢查本地事務(wù)執(zhí)行狀態(tài)
     * 消息回查時(shí),對于正在進(jìn)行中的事務(wù)不要返回Rollback或Commit結(jié)果,應(yīng)繼續(xù)保持Unknown的狀態(tài)。
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);

        String orderSn = orderPaidEvent.getOrderSn();
        TradeOrder tradeOrder = tradeOrderService.getByOrderSn(orderSn);
        // 如果已經(jīng)修改成待發(fā)貨說明本地事務(wù)執(zhí)行成功,此時(shí)消費(fèi)端可以直接消費(fèi)
        if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.AWAITING_SHIPMENT.getStatus())) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            // 這里查不到的時(shí)候返回 UNKNOWN在于,有可能事務(wù)還沒有提交,回查就開始了
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

3、在庫存服務(wù)的基礎(chǔ)設(shè)施層,監(jiān)聽消息以執(zhí)行庫存扣減邏輯:

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "dailymart_inventory_group", topic = "TRADE-ORDER", selectorExpression = "ORDER-PAID")
public class InventoryDeductionConsumer extends EnhanceMessageHandler<OrderPaidEvent> implements RocketMQListener<OrderPaidEvent> {
    
    @Resource
    private InventoryDomainService inventoryDomainService;
    
    @Override
    public void onMessage(OrderPaidEvent orderPaidEvent) {
        super.dispatchMessage(orderPaidEvent);
    }
    
    @Override
    protected void handleMessage(OrderPaidEvent orderPaidEvent) throws Exception {
        // 執(zhí)行庫存扣減邏輯
        String orderSn = orderPaidEvent.getOrderSn();
        inventoryDomainService.deductionInventory(orderSn);
    }
}

通過以上步驟,我們完成了RocketMQ事務(wù)消息的發(fā)送,利用事務(wù)消息的特性保證分布式事務(wù)的最終一致性。與普通消息相比,事務(wù)消息在處理時(shí)需要實(shí)現(xiàn) RocketMQLocalTransactionListener 接口,這是事務(wù)消息的核心。

介紹完事務(wù)消息的使用,接下來我們再來聊聊事務(wù)消息的原理。

事務(wù)消息的原理

首先,讓我們思考一下,如果不使用事務(wù)消息會有什么問題。

很容易想到的一個(gè)問題就是消息丟失。當(dāng)保存訂單后由于網(wǎng)絡(luò)問題導(dǎo)致消息丟失,如下圖所示:

圖片圖片

在不使用RocketMQ的情況下,我們往往會通過 本地消息表 + 補(bǔ)償重試 的機(jī)制來保證消息一定會發(fā)送出去。其原理可以參考上篇文章 [Dailymart26:微服務(wù)中躲不過的坑 - 分布式事務(wù)]。

那RocketMQ是如何解決這個(gè)問題的呢?

1. 發(fā)送half消息,探測MQ是否正常

在基于RocketMQ的事務(wù)消息中,我們不是先執(zhí)行自身的訂單支付邏輯,而是先讓訂單系統(tǒng)發(fā)送一條 half消息 到MQ去。這個(gè)half消息本質(zhì)上是一個(gè)訂單支付成功的消息,只不過此時(shí)庫存系統(tǒng)是看不見這個(gè)half消息的。然后,我們等待接收這個(gè)half消息寫入成功的響應(yīng)通知。

圖片圖片

發(fā)送half消息的本質(zhì)其實(shí)是為了探測MQ是否仍然正常運(yùn)行。但問題來了,如上所述,消息會發(fā)生丟失,那么half消息丟失怎么辦呢?

2. half消息發(fā)送失敗

在發(fā)送half消息時(shí),由于網(wǎng)絡(luò)原因或者M(jìn)Q直接掛了,就會導(dǎo)致half消息發(fā)送失敗。這個(gè)時(shí)候訂單系統(tǒng)需要執(zhí)行一系列的回滾操作。在我們的場景中,應(yīng)該執(zhí)行退款操作,將錢退還給用戶,并告知用戶交易失敗。

3. half消息成功,訂單系統(tǒng)執(zhí)行自己的業(yè)務(wù)邏輯

如果成功收到half消息的正常響應(yīng),此時(shí)訂單系統(tǒng)應(yīng)該執(zhí)行自己的業(yè)務(wù)邏輯。在我們這個(gè)場景中,就是修改訂單數(shù)據(jù)庫狀態(tài),將其修改為待發(fā)貨狀態(tài)。這部分邏輯就對應(yīng)上述代碼中的executeLocalTransaction()方法。

圖片圖片

4. 訂單本地事務(wù)執(zhí)行失敗

如果訂單系統(tǒng)執(zhí)行本地事務(wù)失敗,則需要發(fā)送一個(gè)rollback請求給MQ,讓其刪除這條half消息。

圖片圖片

5. 訂單本地事務(wù)執(zhí)行成功

如果訂單系統(tǒng)的本地事務(wù)執(zhí)行正常,此時(shí)需要發(fā)送一個(gè)commit請求給MQ,要求MQ對之前的half消息進(jìn)行commit操作,這樣庫存系統(tǒng)就可以消費(fèi)這條消息了。

圖片圖片

訂單創(chuàng)建消息處于half狀態(tài)時(shí),庫存系統(tǒng)是看不見它的。必須等到訂單系統(tǒng)執(zhí)行commit請求,消息被commit后,庫存系統(tǒng)才能看到并獲取這條消息進(jìn)行后續(xù)處理。

6. half消息發(fā)送成功,但是沒收到half的響應(yīng)

以上就是RocketMQ事務(wù)消息的正向流程。

然而,還有一個(gè)問題:如果訂單系統(tǒng)發(fā)送half消息成功后卻沒有收到half消息的響應(yīng),該如何處理呢?

在這種情況下,訂單系統(tǒng)可能會誤以為是發(fā)送half消息到MQ失敗了。訂單系統(tǒng)就會執(zhí)行回滾流程,退還支付金額,關(guān)閉訂單。

圖片圖片

然而,此時(shí)MQ系統(tǒng)中已經(jīng)存在了一條half消息。這條half消息又該如何處理呢?

在RocketMQ中,有一套補(bǔ)償流程。RocketMQ會定期掃描處于half狀態(tài)的消息。如果一直沒有對這個(gè)消息執(zhí)行 commit/rollback 操作,超過了一定的時(shí)間,RocketMQ就會回調(diào)你的訂單系統(tǒng)的一個(gè)接口,用以確認(rèn)你本地事務(wù)的情況。

當(dāng)訂單系統(tǒng)收到MQ的回查請求時(shí),就需要檢索一下數(shù)據(jù)庫,根據(jù)訂單狀態(tài)決定執(zhí)行commit還是rollback。

這部分邏輯就對應(yīng)上述代碼中checkLocalTransaction()方法。

圖片圖片

7. rollback 或者 commit 失敗怎么辦?

通過上述說明,可以看到,RocketMQ是根據(jù)rollback或commit操作來決定half消息的狀態(tài)的。如果業(yè)務(wù)系統(tǒng)執(zhí)行了commit操作,則將half消息設(shè)置為可見,庫存系統(tǒng)可以消費(fèi);如果業(yè)務(wù)系統(tǒng)執(zhí)行了rollback操作,MQ就會刪除half消息。那么問題來了:如果訂單系統(tǒng)在執(zhí)行rollback或commit操作時(shí)失敗又該如何處理呢?

這時(shí)候仍然依賴于前文提到的回查機(jī)制。

由于此時(shí)MQ中的消息一直處于half狀態(tài),超過一定的超時(shí)時(shí)間后,MQ會發(fā)現(xiàn)這個(gè)half消息有問題,然后回調(diào)你的訂單系統(tǒng)的接口。此時(shí)訂單系統(tǒng)需要根據(jù)訂單狀態(tài)來決定執(zhí)行commit請求還是rollback請求。

以上,就是RocketMQ事務(wù)消息的原理。結(jié)合文章開頭的代碼,是不是已經(jīng)很清晰了呢?

責(zé)任編輯:武曉燕 來源: JAVA日知錄
相關(guān)推薦

2025-04-11 09:57:16

2022-06-21 08:27:22

Seata分布式事務(wù)

2022-08-26 00:02:03

RocketMQ單體架構(gòu)MQ

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2024-06-13 09:25:14

2023-01-06 09:19:12

Seata分布式事務(wù)

2023-05-12 08:02:43

分布式事務(wù)應(yīng)用

2022-07-10 20:24:48

Seata分布式事務(wù)

2019-08-19 10:24:33

分布式事務(wù)數(shù)據(jù)庫

2025-01-15 08:34:00

分布式事務(wù)服務(wù)

2023-09-14 15:44:46

分布式事務(wù)數(shù)據(jù)存儲

2022-11-06 19:28:02

分布式鎖etcd云原生

2021-08-06 08:33:27

Springboot分布式Seata

2024-06-28 09:07:19

2020-03-31 08:05:23

分布式開發(fā)技術(shù)

2024-01-05 07:28:50

分布式事務(wù)框架

2017-07-26 15:08:05

大數(shù)據(jù)分布式事務(wù)

2024-11-28 15:11:28

2022-01-12 10:02:02

TCC模式 Seata

2023-09-14 15:38:55

云原生分布式架構(gòu)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號