自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

基于 RocketMQ 的可靠事件處理策略

開發(fā) 架構
事務消息是RocketMQ提供的一種高級消息類型,支持在分布式場景下消息生產和本地事務的最終一致性。我們可以分別從生產者和消費者維度出發(fā)來分析可靠事件實現上的需求。

Seata 框架本身并沒有內置針對可靠事件模式的解決方案,但我們可以使用另一款已經介紹過的框架來實現這一目標,就是 RocketMQ。

RocketMQ 為開發(fā)人員提供了事務消息這一消息類型,專門用來應對分布式環(huán)境下的數據一致性問題。

事務消息的基本概念

事務消息是RocketMQ提供的一種高級消息類型,支持在分布式場景下消息生產和本地事務的最終一致性。我們可以分別從生產者和消費者維度出發(fā)來分析可靠事件實現上的需求。

  • 消息發(fā)送方:對于消息發(fā)送方而言,我們需要解決執(zhí)行本地事務與發(fā)送消息的原子性問題,即保證本地事務執(zhí)行成功,消息一定發(fā)送成功。
  • 消息接收方:對于消息接收方而言,我們需要解決接收消息與本地事務的原子性問題,即保證接收消息成功后,本地事務也一定執(zhí)行成功。

事務消息的出現完美解決了可靠事件模式執(zhí)行過程中可能出現的問題。事務消息提供了類似X/Open XA的分布事務功能,通過事務消息能達到分布式事務的最終一致性。

那么,RocketMQ 是如何做到這一點的呢?關鍵就在于它所提供的半消息機制。

所謂半消息(Half Message),是指暫不能投遞的消息。發(fā)送方已經將消息成功發(fā)送到了服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成暫不能投遞狀態(tài),處于該種狀態(tài)下的消息就是半消息。

介紹完半消息的概念,我們再來明確什么是半消息回查。

我們知道由于網絡閃斷、生產者應用重啟等原因,可能會導致某條事務消息的二次確認丟失。RocketMQ 服務端通過掃描發(fā)現某條消息長期處于半消息狀態(tài)時,就會主動向消息生產者詢問該消息的最終狀態(tài)(Commit 或 Rollback),這一過程就是半消息回查。圖 1 展示了 RocketMQ 中事務消息的整體架構。

圖1 RocketMQ 事務消息架構圖1 RocketMQ 事務消息架構

進一步,我們梳理 RocketMQ 事務消息的執(zhí)行過程,如圖2所示。

圖2 RocketMQ 事務消息執(zhí)行過程圖2 RocketMQ 事務消息執(zhí)行過程

可以看到,圖 2 存在服務A和服務B這兩個微服務。其中服務 A 是消息發(fā)布者,而服務 B 是消息消費者,我們需要確保兩者之間數據的一致性。這里有 7 個步驟。

  1. 服務 A 向 RocketMQ 服務端發(fā)送事務消息。
  2. RocketMQ 將消息持久化成功之后,向服務A確認消息已經發(fā)送成功,此時消息為半消息。
  3. 服務 A 開始執(zhí)行本地事務邏輯。
  4. 服務 A 根據本地事務執(zhí)行結果向 RocketMQ 提交二次確認(Commit 或是 Rollback)。如果 RocketMQ 收到 Commit 結果,則將半消息標記為可投遞,服務 B 最終將收到該消息。而如果 RocketMQ 收到 Rollback 結果,則刪除半消息,服務 B 將不會接受該消息。
  5. 在斷網或者是應用重啟的特殊情況下,步驟 4 提交的二次確認最終未到達 RocketMQ,經過一定時間后 RocketMQ 將基于該消息向服務 A 發(fā)起消息回查。
  6. 服務 A 收到消息回查后,需要檢查對應消息的本地事務執(zhí)行的最終結果。
  7. 服務 A 根據檢查得到的本地事務的最終狀態(tài)再次提交二次確認,RocketMQ 仍按照步驟 4 對半消息進行操作。

圖 2 更多是站在消息發(fā)布者的角度看待事務消息的發(fā)布流程。而針對消息消費而言,如果消費者處理事務消息時出現異常,RocketMQ 會進行重試操作,直到消息消費和本地事務處理都成功。這是一種回調機制,會被 RocketMQ 自動調用。

事務消息開發(fā)模式

介紹完 RocketMQ 事務消息的基本概念和執(zhí)行流程之后,我們接著介紹它的開發(fā)模式。

實現消息發(fā)布者

當我們在微服務架構中引入事務消息之前,需要創(chuàng)建一張事務執(zhí)行記錄表。事務執(zhí)行記錄表的作用有兩個:一個是實現事務回查,另一個則是實現業(yè)務層冪等控制。

事務執(zhí)行記錄表的創(chuàng)建腳本如以下代碼所示。

代碼清單1 事務執(zhí)行記錄表 SQL 定義代碼

CREATE TABLE `tx_record` (
    `tx_no` varchar(64) NOT NULL COMMENT '事務Id',
    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
    PRIMARY KEY (`tx_no`) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='事務記錄表'

接下來我們要引入 RocketMQ 內置的TransactionListener接口。

為了實現事務消息,開發(fā)人員的主要開發(fā)工作量就體現在對這個接口的實現過程中。TransactionListener接口的定義如下所示。

代碼清單2 TransactionListener接口定義代碼

public interface TransactionListener {
    //當發(fā)送事務消息成功之后,該方法會被觸發(fā),本地事務將被執(zhí)行
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    //當沒有收到事務消息的響應時,服務器會發(fā)送確認消息來檢查事務狀態(tài),該方法會被觸發(fā)并獲取本地事務狀態(tài)
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

可以看到,TransactionListener接口的兩個方法分別完成了本地事務執(zhí)行和本地事務回查這兩個核心操作。那么我們應該如何實現這兩個方法呢?這里給出這兩個方法的執(zhí)行偽代碼。

代碼清單3 TransactionListener接口兩個方法實現偽代碼

executeLocalTransaction {
 執(zhí)行本地事務
 如果失敗就選擇回滾事務,反之提交事務
}

checkLocalTransaction {
 實現事務回查
 根據事務執(zhí)行記錄判斷,已執(zhí)行則提交事務
}

注意:這兩個方法需要消息的發(fā)布者來實現,但調用方是 RocketMQ 自身,而且這個調用過程是自動觸發(fā)的,不需要開發(fā)做任何干預。

圖 3 圍繞消息發(fā)布者展示了其所需要實現的各個核心步驟。

圖3 事務消息中消息發(fā)布者實現過程圖3 事務消息中消息發(fā)布者實現過程

如果我們使用 Spring 框架來集成 RocketMQ,那么圖 3 中的業(yè)務服務實現類的實現過程可以參考如下代碼示例。

代碼清單3 消息發(fā)布端業(yè)務服務實現類示例代碼。

@Service
public class CustomerTicketServiceImpl implements ICustomerTicketService {
    @Autowired
    TxRecordMapper txRecordMapper;

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Override
    public void generateTicket(AddCustomerTicketReqVO addCustomerTicketReqVO) {
        //從VO中創(chuàng)建TicketGeneratedEvent
        TicketGeneratedEvent ticketGeneratedEvent = createTicketGeneratedEvent(addCustomerTicketReqVO);

        //將Event轉化為JSON對象
        JSONObject jsonObject =new JSONObject();
        jsonObject.put("ticketGeneratedEvent",ticketGeneratedEvent);
        String jsonString = jsonObject.toJSONString();

        //生成消息對象
        Message<String> message = MessageBuilder.withPayload(jsonString).build();

        //發(fā)送事務消息
        rocketMQTemplate.sendMessageInTransaction("producer_group_ticket","topic_ticket",message,null);
    }

    @Override
    @Transactional
    public void doGenerateTicket(TicketGeneratedEvent ticketGeneratedEvent) {
        //冪等判斷
        if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){
            return ;
        }

        //插入工單
        CustomerTicket customerTicket = CustomerTicketConverter.INSTANCE.convertEvent(ticketGeneratedEvent);
        customerTicket.setStatus(1);
        save(customerTicket);

        //添加事務日志
        txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo());
    }
    ...
}

上述代碼展示的是一個插入客服工單(CustomerTicket)的過程,generateTicket和doGenerateTicket方法分別對應圖 3 中的發(fā)送消息和執(zhí)行本地事務這兩個環(huán)節(jié)。

注意:這里使用了RocketMQTemplate的sendMessageInTransaction方法來發(fā)送事務消息。同時,我們也看到了事務執(zhí)行記錄表的一種應用場景,即實現業(yè)務層冪等控制。

接下來繼續(xù)實現圖3所示的TransactionListener接口,示例代碼如下:

代碼清單4 TransactionListener接口實現類示例代碼。

@Component
@RocketMQTransactionListener(txProducerGroup = "producer_group_ticket")
public class ProducerListener implements RocketMQLocalTransactionListener {
    @Autowired
    ICustomerTicketService customerTicketService;

    @Autowired
    TxRecordMapper txRecordMapper;

    //事務消息發(fā)送后的回調方法,當消息發(fā)送給MQ成功,此方法被回調
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            //解析消息,轉成Event對象
            TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message);

            //執(zhí)行本地事務
            customerTicketService.doGenerateTicket(ticketGeneratedEvent);

            //當返回RocketMQLocalTransactionState.COMMIT,自動向MQ發(fā)送commit消息,MQ將消息的狀態(tài)改為可消費
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            //如果本地事務執(zhí)行失敗,就將消息設置為回滾狀態(tài)
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    //事務狀態(tài)回查
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //解析消息,轉成Event對象
        TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message);

        //根據事務Id判斷是否存在已執(zhí)行的事務
        Boolean isTxNoExisted = Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()));

        //如果事務已執(zhí)行則返回COMMIT,反之返回UNKNOWN狀態(tài)
        if(isTxNoExisted){
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
    ...
}

這段代碼清晰地展示了TransactionListener接口中兩個核心方法的實現過程。在executeLocalTransaction方法中,我們通過調用CustomerTicketService業(yè)務服務類的doGenerateTicket方法完成了本地事務;而在checkLocalTransaction方法中,我們則實現了事務回查機制。這里同樣展示了事務執(zhí)行記錄表的另一種應用場景,即實現事務回查。

實現消息消費者

類似,當使用事務消息時,消息消費者的實現過程同樣遵循一定的開發(fā)規(guī)范,如圖 4 所示。

圖4 事務消息中消息消費者實現過程圖4 事務消息中消息消費者實現過程

可以看到,相比于消息發(fā)布者,消息消費者的實現過程要簡單很多。

代碼清單5 消息消費實現類示例代碼。

@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_ticket",topic = "topic_ticket")
public class Consumer implements RocketMQListener<String> {
    @Autowired
    IChatRecordService chatRecordService;

    //接收消息
    @Override
    public void onMessage(String message) {
        log.info("開始消費消息:{}",message);

        //解析消息
        JSONObject jsonObject = JSONObject.parseObject(message);
        String ticketGeneratedEventString = jsonObject.getString("ticketGeneratedEvent");

        //轉成TicketGeneratedEvent
        TicketGeneratedEvent ticketGeneratedEvent = JSONObject.parseObject(ticketGeneratedEventString, TicketGeneratedEvent.class);

        //添加本地聊天記錄
        chatRecordService.generateChatRecord(ticketGeneratedEvent);
    }
}

可以看到,這個消息消費者的實現過程沒有任何特殊之處,我們只需要實現RocketMQListener接口的onMessage方法,并在該方法中調用業(yè)務服務實現類中的業(yè)務方法即可。

消費者端的業(yè)務服務實現類的實現過程如下。

代碼清單6 消息消費端業(yè)務服務實現類示例代碼:

@Service
public class ChatRecordServiceImpl implements IChatRecordService {
    @Autowired
    TxRecordMapper txRecordMapper;

    @Override
    @Transactional
    public void generateChatRecord(TicketGeneratedEvent ticketGeneratedEvent) {
        //冪等判斷
        if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){
            return ;
        }

        //插入聊天記錄
        ChatRecord chatRecord = ChatRecordConverter.INSTANCE.convertEvent(ticketGeneratedEvent);
        save(chatRecord);

        //添加事務日志
        txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo());
    }
}

這里同樣通過事務執(zhí)行記錄表實現了業(yè)務層冪等控制,并最終完成本地事務的提交。

作為總結,我們使用時序圖來詳細展示事務消息發(fā)送和消費過程,如圖 5 所示。

圖5 事務消息發(fā)送和消費時序圖圖5 事務消息發(fā)送和消費時序圖


責任編輯:武曉燕 來源: 一安未來
相關推薦

2021-02-02 11:01:31

RocketMQ消息分布式

2021-04-27 07:52:18

RocketMQ消息投遞

2021-09-12 14:30:43

虛擬化存儲

2019-04-03 10:19:41

云遷移云端云提供商

2022-12-27 17:56:40

ack機制RocketMQ

2013-12-12 16:14:21

storm入門教程storm消息處理

2011-04-19 10:57:18

2022-11-16 08:31:12

AOPRocketMQ組件

2010-09-15 10:20:03

2022-02-09 15:44:20

RocketMQLinux系統(tǒng)

2009-12-23 16:06:46

WPF事件策略

2011-03-29 13:10:56

SQL Server

2024-08-14 08:11:41

2023-03-09 10:22:00

SpringBootRabbitMQ

2023-04-03 08:39:33

中間件go語言

2023-03-14 07:31:17

EoscGo語言

2023-11-08 00:23:08

網關API

2024-02-28 09:12:27

RocketMQKosmosAZ

2024-10-10 08:34:34

事務外包模式

2017-03-14 13:51:23

AndroidView事件分發(fā)和處理
點贊
收藏

51CTO技術棧公眾號