自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

完美解決,RocketMQ如何支持多事務(wù)消息?

開發(fā) 前端
本文解決了在 RocketMQ 2.1.0 版本以后,無法簡單使用多個 @RocketMQTransactionListener? 的問題。通過引入事務(wù)消息處理接口 TransactionMessageHandler,我們將原有的事務(wù)處理器改造成了一個分發(fā)器,使得在 DailyMart 項目中可以輕松處理多事務(wù)消息的場景。

今天我們將解決使用RocketMQ事務(wù)消息時可能遇到的一個常見問題:如何讓其支持多事務(wù)消息?

1. 問題背景

在實際開發(fā)中,我們常常會面臨多事務(wù)消息的場景,例如在DailyMart的訂單模塊中,用戶支付后需要調(diào)用庫存服務(wù)進(jìn)行庫存扣減,而在訂單確認(rèn)收貨后需要調(diào)用用戶服務(wù)實現(xiàn)積分贈送。這兩個業(yè)務(wù)邏輯都需要通過事務(wù)消息來保證分布式事務(wù)。

為了處理這種情況,我們可能會考慮在訂單模塊中創(chuàng)建兩個事務(wù)消息監(jiān)聽器,分別用于處理庫存扣減和積分贈送的事務(wù)處理和事務(wù)回查。

@Component
@Slf4j
//處理訂單支付的事務(wù)監(jiān)聽器
public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
    //處理訂單支付邏輯
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
      //檢查訂單處理邏輯
   }
}

@Component
@Slf4j
//處理訂單收貨的事務(wù)監(jiān)聽器
public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
   }
}

然而,當(dāng)我們信心滿滿地完成業(yè)務(wù)邏輯編寫并啟動服務(wù)時,可能會遇到如下錯誤:rocketMQTemplate already exists RocketMQLocalTransactionListener

圖片圖片

在rocketmq-spring-boot-starter版本低于2.1.0的項目中,可以使用多個 @RocketMQTransactionListener 監(jiān)聽不同的 txProducerGroup 來發(fā)送不同類型的事務(wù)消息到topic。然而,從 RocketMQ-Spring 2.1.0 版本開始,注解 @RocketMQTransactionListener 不能設(shè)置 txProducerGroup、ak、sk,這些值均需與對應(yīng)的 RocketMQTemplate 保持一致。通過閱讀源碼 RocketMQTransactionConfiguration#registerTransactionListener() 方法,也可得知在RocketMQ如果已經(jīng)存在了 RocketMQTransactionListener 則會出現(xiàn)上述錯誤。

圖片圖片

2. 如何解決

為了在保證系統(tǒng)只有一個 RocketMQTransactionListener 的前提下實現(xiàn)多事務(wù)消息,我們可以將 RocketMQLocalTransactionListener 不處理具體業(yè)務(wù)邏輯,而是將其作為一個分發(fā)器使用。

在生產(chǎn)者發(fā)送事務(wù)消息時指定對應(yīng)的事務(wù)處理器 ,并將事務(wù)處理器放置在消息頭上發(fā)送出去,在 RocketMQTransactionListener 中根據(jù)消息頭選擇具體的事務(wù)處理器來實現(xiàn)業(yè)務(wù)邏輯。

具體實現(xiàn)如下:

2.1 定義事務(wù)消息處理接口

首先,定義公共的事務(wù)消息處理接口,所有事務(wù)消息都實現(xiàn)此接口而非 RocketMQ 默認(rèn)的 RocketMQLocalTransactionListener。

public interface TransactionMessageHandler {
    
    /**
    * 執(zhí)行本地事務(wù)
    * @param payload 消息體
    * @param arg 參數(shù)
    */
    RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg);
    
    /**
     * 檢查本地執(zhí)行狀態(tài)
     * @param payload 消息體
     * @return 執(zhí)行結(jié)果
     */
    RocketMQLocalTransactionState checkLocalTransaction(Object payload);
    
}

2.2 修改事務(wù)消息發(fā)送工具類,指定消息處理器

public <T extends RemoteDomainEvent> TransactionSendResult sendTransaction(String topic, String tag, T message, Class<? extends TransactionMessageHandler> transactionMessageListener) {  
  if(transactionMessageListener == null){
    throw new IllegalArgumentException("transactionMessageListener must not null");
  }
  
  String destination = buildDestination(topic, tag);

  Message<T> sendMessage = MessageBuilder.withPayload(message)
    .setHeader(RocketMQHeaders.KEYS, message.getKey())
    .setHeader(SOURCE_HEADER, message.getSource())
    .setHeader(TRANSACTION_MESSAGE_HEADER, transactionMessageListener.getSimpleName())
    .build();
  TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, sendMessage, null);

  log.info("[{}]事務(wù)消息[{}]發(fā)送結(jié)果[{}]", destination, JSONObject.toJSON(message),JSONObject.toJSON(sendResult));

  return sendResult;
}

2.3 修改RocketMQ事務(wù)消息監(jiān)聽器

@Slf4j
@RocketMQTransactionListener
public class DefaultRocketMQTransactionListener implements RocketMQLocalTransactionListener {
    
    private final Map<String, TransactionMessageHandler> transactionMessageHandlerMap;
    
    public DefaultRocketMQTransactionListener(Map<String, TransactionMessageHandler> transactionMessageHandlerMap) {
        this.transactionMessageHandlerMap = transactionMessageHandlerMap;
    }
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("消費者收到事務(wù)消息[{}]", JSONObject.toJSON(message));
        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);
        
        if (null == listenerName) {
            throw new RuntimeException("not params transactionMessageListener");
        }
        
        RocketMQLocalTransactionState state;
        Object payload = message.getPayload();
        try {
            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);
            if (null == messageHandler) {
                throw new RuntimeException("not match condition TransactionMessageHandler");
            }
            state = messageHandler.executeLocalTransaction(payload, arg);
        } catch (Exception e) {
            log.error("rocket transaction message executeLocal error:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        
        return state;
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("消費者收到事務(wù)回查消息[{}]", JsonUtils.obj2String(message.getHeaders()));
        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);
        if (null == listenerName) {
            throw new RuntimeException("not params transactionMessageListener");
        }
        RocketMQLocalTransactionState state;
        try {
            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);
            if (null == messageHandler) {
                throw new RuntimeException("not match condition TransactionMessageHandler");
            }
            state = messageHandler.checkLocalTransaction(message.getPayload());
        } catch (Exception e) {
            log.error("rocket transaction message executeLocal error:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        
        return state;
    }
    
}

在上述代碼中,根據(jù)消息頭中的TRANSACTION_MESSAGE_HEADER參數(shù)選擇對應(yīng)的事務(wù)處理器來處理事務(wù)消息。

在 DailyMart 中有一個公共組件 dailymart-rocketmq-spring-boot-starter 專門用于 RocketMQ 消息發(fā)送監(jiān)聽的封裝,因此我們也將事務(wù)消息的處理邏輯封裝到了此組件中。

圖片圖片

2.4 修改事務(wù)消息處理邏輯

所有的事務(wù)消息處理邏輯都實現(xiàn) TransactionMessageHandler 接口,以訂單支付的處理邏輯為例:

@Component
@Slf4j
public class OrderPaidTransactionConsumer implements TransactionMessageHandler {
    
    @Resource
    private TransactionTemplate transactionTemplate;
    
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg) {
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);
        ...
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Object payload) {
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);
        ...
    }
    
}

2.5 修改事務(wù)消息發(fā)送邏輯,指定事務(wù)處理器

TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID", orderPaidEvent, OrderPaidTransactionConsumer.class);

小結(jié)

本文解決了在 RocketMQ 2.1.0 版本以后,無法簡單使用多個 @RocketMQTransactionListener 的問題。通過引入事務(wù)消息處理接口 TransactionMessageHandler,我們將原有的事務(wù)處理器改造成了一個分發(fā)器,使得在 DailyMart 項目中可以輕松處理多事務(wù)消息的場景。

責(zé)任編輯:武曉燕 來源: JAVA日知錄
相關(guān)推薦

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2023-07-17 08:34:03

RocketMQ消息初體驗

2021-04-15 09:17:01

SpringBootRocketMQ

2021-10-03 21:41:13

RocketMQKafkaPulsar

2021-03-04 06:49:53

RocketMQ事務(wù)

2023-09-04 08:00:53

提交事務(wù)消息

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2022-07-04 11:06:02

RocketMQ事務(wù)消息實現(xiàn)

2024-06-13 09:25:14

2024-10-22 08:01:15

2024-08-06 09:55:25

2014-03-25 10:57:42

Android消息推送方案

2024-12-04 15:38:43

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2021-02-02 11:01:31

RocketMQ消息分布式

2023-12-15 13:08:00

RocketMQ中間件消費順序

2022-12-22 10:03:18

消息集成

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-18 09:03:01

RocketMQ場景消息
點贊
收藏

51CTO技術(shù)棧公眾號