自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

訂單系統(tǒng)中的數(shù)據(jù)一致性方案及RocketMQ事務(wù)消息詳解

開(kāi)發(fā) 架構(gòu)
生產(chǎn)中存在兩種常用的解決方案:TCC和可靠消息最終一致性。前者要求強(qiáng)一致,后者要求最終一致。強(qiáng)一致主要用于核心模塊,例如交易/訂單等。最終一致一般用于邊緣模塊例如庫(kù)存,通過(guò)mq去通知,保證最終一致性,也可以業(yè)務(wù)解耦。

數(shù)據(jù)一致性是確保業(yè)務(wù)操作正確執(zhí)行的基礎(chǔ),本文將以電商系統(tǒng)為例,詳細(xì)分析其分布式系統(tǒng)中的一致性問(wèn)題。訂單核心流程:

訂單服務(wù) -> 創(chuàng)建訂單 -> 庫(kù)存服務(wù) -> 扣減庫(kù)存 -> 積分服務(wù) -> 增加積分 -> 倉(cāng)儲(chǔ)服務(wù) -> 通知發(fā)貨

分布式一致性技術(shù)方案

生產(chǎn)中存在兩種常用的解決方案:TCC和可靠消息最終一致性。前者要求強(qiáng)一致,后者要求最終一致。

強(qiáng)一致主要用于核心模塊,例如交易/訂單等。最終一致一般用于邊緣模塊例如庫(kù)存,通過(guò)mq去通知,保證最終一致性,也可以業(yè)務(wù)解耦。

TCC:

訂單服務(wù)、庫(kù)存服務(wù)、積分服務(wù) -> 綁定為一個(gè)TCC事務(wù);

撤銷(xiāo)訂單時(shí),回滾扣減庫(kù)存和增加積分。

可靠消息最終一致性:

可以去發(fā)送一個(gè)請(qǐng)求給消息中間件,由中間件保證一定會(huì)把消息交給下游的庫(kù)存服務(wù)去扣減庫(kù)存,倉(cāng)儲(chǔ)服務(wù)去通知發(fā)貨等;

如果這個(gè)過(guò)程中有消息發(fā)送失敗,則可靠消息中間件應(yīng)該保證不停的重試投遞消息。

本文重點(diǎn)分析如何利用RocketMQ的事務(wù)消息實(shí)現(xiàn)最終一致性,TCC事務(wù)將在另外一篇文章分享。

事務(wù)消息

RocketMQ的事務(wù)消息有兩個(gè)核心概念(流程):

  • Half Message,半消息

暫時(shí)不能被 Consumer消費(fèi)的消息。Producer已經(jīng)把消息發(fā)送到 Broker端,但是此消息的狀態(tài)被標(biāo)記為不能投遞,處于這種狀態(tài)下的消息稱(chēng)為半消息。事實(shí)上,該狀態(tài)下的消息會(huì)被放在一個(gè)叫做 RMQ_SYS_TRANS_HALF_TOPIC的主題下。

當(dāng) Producer端對(duì)它二次確認(rèn)后,也就是 Commit之后,Consumer端才可以消費(fèi)到;那么如果是Rollback,該消息則會(huì)被刪除,永遠(yuǎn)不會(huì)被消費(fèi)到。

  • 事務(wù)狀態(tài)回查

可能會(huì)因?yàn)榫W(wǎng)絡(luò)原因、應(yīng)用問(wèn)題等,導(dǎo)致Producer端一直沒(méi)有對(duì)這個(gè)半消息進(jìn)行確認(rèn),那么這時(shí)候 Broker服務(wù)器會(huì)定時(shí)掃描這些半消息,主動(dòng)找Producer端查詢(xún)?cè)撓⒌臓顟B(tài)。

簡(jiǎn)而言之,RocketMQ事務(wù)消息的實(shí)現(xiàn)原理就是基于兩階段提交和事務(wù)狀態(tài)回查,來(lái)決定消息最終是提交還是回滾的。

核心流程

結(jié)合整個(gè)訂單接口服務(wù),分為兩個(gè)支付鏈路,一個(gè)是核心鏈路(訂單業(yè)務(wù)),一個(gè)是非核心鏈路(wms) 整個(gè)流程。

先向RocketMQ發(fā)送half msg,然后調(diào)用核心鏈路。核心鏈路要是返回失敗,就會(huì)走失敗的邏輯:退款,更改訂單狀態(tài)為取消,再給rocketmq發(fā)送callback廢棄掉剛才的消息。

如果成功,就commit msg讓消費(fèi)者可以消費(fèi)。如果在等待期間,一直沒(méi)有callback/commit那么mq就會(huì)走回調(diào)查詢(xún)具體的狀態(tài)。

消費(fèi)者接收到消息后,消費(fèi)完成就回復(fù)mq一個(gè)ack, 如果消費(fèi)失敗了,mq就會(huì)重新投遞或者換一個(gè)服務(wù)投遞。使用rocketmq的half msg機(jī)制,可以實(shí)現(xiàn)這一套固定模式的最終一致性。

代碼實(shí)現(xiàn)

【核心鏈路-訂單、庫(kù)存、積分】

核心業(yè)務(wù)流程

【步驟一】:發(fā)送事務(wù)消息(half msg

springboot下,RocketMQ的集成還是很簡(jiǎn)單的,引入
rocketmq-spring-boot-starter依賴(lài)、添加相關(guān)配置后,即可利用RocketMQTemplate的sendMessageInTransaction方法發(fā)送消息:

/**
 * 發(fā)送事務(wù)消息
 *
 * @param topic   topic
 * @param message 消息對(duì)象
 */
public void sendMessageInTransaction(String topic, Object message) {
    String transactionId = UUID.randomUUID().toString();
    TransactionSendResult result = this.rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message)
            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
            .build(), message);
}

【步驟二】:broker回調(diào),執(zhí)行本地事務(wù)

消息發(fā)送成功之后,系統(tǒng)需要知道RocketMQ的broker是否成功收到了消息,這里主要借助
RocketMQTransactionListener注解實(shí)現(xiàn)。在成功收到回調(diào)后,會(huì)觸發(fā)executeLocalTransaction來(lái)
執(zhí)行核心業(yè)務(wù)(訂單、庫(kù)存、積分等)。

@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    /**
     * 執(zhí)行本地事務(wù),即處理核心鏈路
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
          	// 事務(wù)transactionId
            String transactionId = msg.getHeaders().get("rocketmq_TRANSACTION_ID").toString();
            // 本地事務(wù),執(zhí)行核心鏈路業(yè)務(wù)
            String payload = new String((byte[]) msg.getPayload());
            OrderTranscationMesageDTO data = JSONObject.parseObject(payload, OrderTranscationMesageDTO.class);
            orderService.executeCoreBusiness(data.getPayMoney(),data.getOrderDO(), data.getTransactionNo(),data.getPayType(),transactionId);
        } catch (Exception e) {
            log.error("本地事務(wù)執(zhí)行異常:{}事務(wù)消息回滾", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
      	log.info("提交事務(wù)消息");
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * 校驗(yàn)本地事務(wù)(broker未收到提交或回滾事務(wù)消息時(shí)主動(dòng)回查)
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 事務(wù)transactionId
        String transactionId = msg.getHeaders().get("rocketmq_TRANSACTION_ID").toString();
        // 數(shù)據(jù)庫(kù)能查到,說(shuō)明本地事務(wù)執(zhí)行失敗,需要回滾
        if (Objects.isNull(transcationLogDao.getById(transactionId))){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

}

執(zhí)行核心業(yè)務(wù)的同時(shí),還有一個(gè)很重要的環(huán)節(jié),即記錄事務(wù)ID。為什么要記錄事務(wù)ID呢?假想這樣一種情況:我們本地事務(wù)即核心的業(yè)務(wù)都成功執(zhí)行后,需要提交RocketMQ的事務(wù)消息,只有提交后,消息才能被消費(fèi)者(即非核心業(yè)務(wù)系統(tǒng),如倉(cāng)儲(chǔ))消費(fèi),但是如果提交時(shí),網(wǎng)絡(luò)出現(xiàn)異常,broker一直未收到怎么辦呢,這時(shí)利用transactionId,也是RocketMA的回查機(jī)制了。

/**
  * 核心業(yè)務(wù),并記錄RocketMQ事務(wù)ID
*/
@GlobalTransactional  // seata全局事務(wù)
public void executeCoreBusiness(BigDecimal payMoney, AppDerivativeGoodsOrderDO orderDO, String transactionNo, String payType,String transactionId) {
		// 核心業(yè)務(wù)偽代碼
    orderService.execute();
    storeFeign.execute();
    scoreFeign.execute();
    
    // 數(shù)據(jù)庫(kù)記錄rocket事務(wù)消息ID 用于異常情況下的回查
    if (Objects.nonNull(transactionId)){
        //寫(xiě)入事務(wù)日志
        TransactionLogDO log = new TransactionLogDO();
        log.setId(transactionId);
        log.setBusiness("order");
        log.setForeignKey(String.valueOf(orderDO.getId()));
        transcationLogDao.save(log);
    }
}

本地事務(wù)執(zhí)行成功之后,記錄事務(wù)ID,即便提交時(shí),出現(xiàn)網(wǎng)絡(luò)異常,broker遲遲未收到,也可以利用回查機(jī)制,即checkLocalTransaction方法,得知本地事務(wù)是否執(zhí)行成功。

用于記錄事務(wù)的表結(jié)構(gòu):

CREATE TABLE `transaction_log` (
  `id` varchar(32) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '事務(wù)ID',
  `business` varchar(32) COLLATE utf8mb4_bin NOT NULL COMMENT '業(yè)務(wù)標(biāo)識(shí)',
  `foreign_key` varchar(32) COLLATE utf8mb4_bin NOT NULL COMMENT '對(duì)應(yīng)業(yè)務(wù)表中的主鍵',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

【非核心鏈路-wms倉(cāng)儲(chǔ)】

【步驟三】:消費(fèi)消息,處理其他業(yè)務(wù)

上述的步驟保證了核心業(yè)務(wù)與RocketMQ消息的一致性,即核心業(yè)務(wù)成功,消息就一定會(huì)被發(fā)送到broker。接下來(lái)就是非核心業(yè)務(wù)(如倉(cāng)儲(chǔ)物流)監(jiān)聽(tīng)消息,通過(guò)@RocketMQMessageListener實(shí)現(xiàn):

@RocketMQMessageListener(topic = "order_topic",consumerGroup = "order_group")
public class TestListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        if(messageExt.getReconsumeTimes() >= 3){
            log.error("消息重試已達(dá)最大次數(shù),將通知業(yè)務(wù)人員排查問(wèn)題。{}",messageExt.getMsgId());
            //消息處理,第3次處理失敗后,發(fā)送郵件通知人工介入
            sendMail(messageExt.getMsgId());
        }
        // 倉(cāng)儲(chǔ)物流相關(guān)業(yè)務(wù) 
        wmsService.execute(messageExt.getBody());
    }
}

非核心業(yè)務(wù),接受不了消息后,再處理相關(guān)業(yè)務(wù),其實(shí),此時(shí)已經(jīng)與核心業(yè)務(wù)脫離了關(guān)聯(lián),因此,不管它成功與否,核心業(yè)務(wù)都已經(jīng)完成了,這也為何是最終一致性,而非強(qiáng)一致性。

最終一致性主要依賴(lài)的是RocketMQ的重試機(jī)制以及補(bǔ)償處理(比人工干預(yù))。如上述代碼中,假若wmsService執(zhí)行業(yè)務(wù)過(guò)程拋出了異常,即消息消費(fèi)失敗,RocketMQ則會(huì)自動(dòng)重發(fā)。默認(rèn)16次,可以通過(guò)配置修改。另外,可以在重試一定次數(shù)后,做補(bǔ)償處理,例如,將執(zhí)行失敗的任務(wù)記錄在數(shù)據(jù)庫(kù),后續(xù)定時(shí)任務(wù)補(bǔ)償處理,抑或是像上述代碼,發(fā)送郵件通知相關(guān)人員。

冪等性消費(fèi)

消息的重發(fā),有可能帶來(lái)另外一個(gè)問(wèn)題,重復(fù)消費(fèi)。不做處理,就可能導(dǎo)致數(shù)據(jù)重復(fù)插入,倉(cāng)儲(chǔ)系統(tǒng)就可能重復(fù)發(fā)貨。

冪等性:就是用戶(hù)對(duì)于同一操作發(fā)起的一次請(qǐng)求或者多次請(qǐng)求的結(jié)果是一致的,不會(huì)因?yàn)槎啻吸c(diǎn)擊而產(chǎn)生了副作用。

實(shí)現(xiàn)冪等性消費(fèi)的方式有很多種,具體怎么做,根據(jù)自己的情況來(lái)看。一種常用的方式就是利用redis緩存,在執(zhí)行操作之前,先到緩存中查詢(xún),該操作是否已執(zhí)行過(guò)。

總結(jié)

本文重點(diǎn)闡述了基于RocketMQ來(lái)實(shí)現(xiàn)最終一致性的分布式事務(wù)案例。

責(zé)任編輯:姜華 來(lái)源: 今日頭條
相關(guān)推薦

2021-03-04 06:49:53

RocketMQ事務(wù)

2023-12-01 13:51:21

數(shù)據(jù)一致性數(shù)據(jù)庫(kù)

2019-12-17 08:40:33

微服務(wù)架構(gòu)數(shù)據(jù)

2019-01-15 17:58:03

微服務(wù)架構(gòu)數(shù)據(jù)

2009-06-18 09:18:08

Oracle檢索數(shù)據(jù)數(shù)據(jù)一致性事務(wù)恢復(fù)

2021-11-01 21:15:54

微服務(wù)系統(tǒng)數(shù)據(jù)

2023-08-22 09:32:44

邊緣計(jì)算管理

2025-03-27 08:20:54

2023-11-22 12:55:59

微服務(wù)架構(gòu)數(shù)據(jù)庫(kù)

2022-07-21 06:54:28

微服務(wù)系統(tǒng)RocketMQ

2018-09-11 10:46:10

緩存數(shù)據(jù)庫(kù)一致性

2024-12-26 15:01:29

2021-12-05 21:06:27

軟件

2021-06-16 08:33:02

分布式事務(wù)ACID

2023-09-07 08:11:24

Redis管道機(jī)制

2021-10-13 09:55:11

流計(jì)算引擎數(shù)據(jù)

2021-10-18 10:30:59

流計(jì)算阿里云

2023-06-07 08:10:29

2023-12-27 14:23:10

微服務(wù)數(shù)據(jù)存儲(chǔ)

2023-06-29 08:00:59

redis數(shù)據(jù)MySQL
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)