自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

故障現(xiàn)場 | 消息發(fā)送居然有這么大的坑

開發(fā) 前端
RocketMQ事務(wù)消息是一種支持分布式事務(wù)的消息模型,將消息生產(chǎn)和消費(fèi)與業(yè)務(wù)邏輯綁定在一起,確保消息發(fā)送和事務(wù)執(zhí)行的原子性,保證消息的可靠性。

1. 問題&分析

基于 MQ 進(jìn)行系統(tǒng)間的解耦真的是太香了,小艾還沉浸在喜悅中久久不能自拔。但,打臉的事已經(jīng)在路上了。。。。

1.1. 案例

昨天下班,在電梯里,物流組的晨姐偶遇了小艾,就一個(gè)技術(shù)問題向小艾進(jìn)行了反饋。具體來說,物流系統(tǒng)有一項(xiàng)功能,即實(shí)時(shí)監(jiān)控訂單的支付成功事件,一旦檢測(cè)到,便會(huì)為顧客準(zhǔn)備物資,進(jìn)而安排快遞發(fā)貨。今天系統(tǒng)出現(xiàn)了幾次空指針異常。查閱日志,似乎是在反查訂單信息時(shí),沒有獲取到預(yù)期的訂單數(shù)據(jù)。但查詢物流系統(tǒng),物流單已經(jīng)成功生成,對(duì)業(yè)務(wù)操作并未造成實(shí)際影響,但這個(gè)問題還是值得注意。由于這個(gè)問題并沒有立即影響到業(yè)務(wù)流程,所以晨姐沒有在第一時(shí)間聯(lián)系小艾進(jìn)行確認(rèn)。

在小艾正準(zhǔn)備啟動(dòng)IDEA尋找線索的時(shí)候,算法組的負(fù)責(zé)人龍哥急匆匆地走了過來,向小艾反映了他們團(tuán)隊(duì)遇到的一個(gè)重要問題。為了提升推薦效果,算法組也會(huì)實(shí)時(shí)監(jiān)控訂單支付成功事件,并以此為依據(jù)重新計(jì)算用戶的推薦商品。然而,今天早上,他們突然收到了一系列的報(bào)警信息,問題同樣是無法查詢到訂單信息,這個(gè)現(xiàn)象與物流系統(tǒng)的問題高度相似。

小艾隨口問道:“這個(gè)問題會(huì)自己修復(fù)嗎?”龍哥愣了一下,回答說:“以前會(huì)自動(dòng)修復(fù),但剛剛那條數(shù)據(jù)還在報(bào)錯(cuò)?!彪S后,龍哥提供了報(bào)錯(cuò)的訂單ID,小艾立即去數(shù)據(jù)庫中查詢,卻驚訝地發(fā)現(xiàn),這條數(shù)據(jù)竟然不存在!

看到這種場景,小艾有些慌神,連龍哥什么時(shí)候走的都沒有注意到。目光直勾勾的盯著電腦屏幕發(fā)呆:

@Transactional
public void paySuccess(String orderId, String token){
    // 驗(yàn)證 token,保障有效性
    checkToke(token);

    // 加載訂單信息
    Order order = this.orderRepository.getById(orderId);
    if (order == null){
        throw new RuntimeException("訂單不存在");
    }
    // 支付成功,更新訂單狀態(tài)
    order.paySuccess();

    // 將變更更新到數(shù)據(jù)庫
    this.orderRepository.update(order);

    // 發(fā)送支付成功事件
    this.eventPublisher.publishEvent(new OrderPaidEvent(order));
    // 執(zhí)行其他業(yè)務(wù)邏輯
    doSomething();
}

// 監(jiān)聽變更,發(fā)布 MQ
@EventListener
public void handle(OrderPaidEvent event){
    rocketMQTemplate.convertAndSend("order_event", event);
}

1.2. 問題分析

兩個(gè)問題看起來一樣,但又有區(qū)別。當(dāng)下游在收到 MQ 消息時(shí)

  1. 無法查不到訂單,但稍后能自我修復(fù)
  2. 一直查不到訂單,數(shù)據(jù)庫里還沒有,無法進(jìn)行自我修復(fù)

小艾無意間看到 paySuccess 方法上的 @Transactional 頓時(shí)茅塞頓開。

圖片圖片

正如上圖所示:

  1. 在數(shù)據(jù)庫更新完成后,系統(tǒng)會(huì)立即發(fā)送消息隊(duì)列(MQ)消息,同時(shí)主流程會(huì)繼續(xù)執(zhí)行后續(xù)的耗時(shí)操作。
  2. 當(dāng)下游接收到MQ消息時(shí),會(huì)進(jìn)行數(shù)據(jù)查詢。然而,由于此時(shí)主流程尚未完成事務(wù)提交,因此無法查詢到相關(guān)數(shù)據(jù),導(dǎo)致下游出現(xiàn)錯(cuò)誤。
  3. 如果MQ消息消費(fèi)失敗,系統(tǒng)會(huì)自動(dòng)進(jìn)行重試。如果在此期間主流程已經(jīng)完成了事務(wù)提交,那么就能夠成功查詢到數(shù)據(jù),從而使得業(yè)務(wù)流程得以恢復(fù)正常。

這就完美的解釋了物流問題,那為什么算法組收到消息里的訂單ID在數(shù)據(jù)庫不存在呢?

圖片圖片

如上圖所示:

  1. 在數(shù)據(jù)庫更新完成后,系統(tǒng)會(huì)立即發(fā)送消息隊(duì)列(MQ)消息,而主流程將同時(shí)繼續(xù)執(zhí)行后續(xù)的耗時(shí)操作。
  2. 若主流程在執(zhí)行后續(xù)邏輯時(shí)發(fā)生異常,將導(dǎo)致整個(gè)事務(wù)回滾,進(jìn)而中斷處理過程。
  3. 下游系統(tǒng)接收到消息后,進(jìn)行數(shù)據(jù)反查,但由于事務(wù)已回滾,因此無法查詢到任何數(shù)據(jù)。
  4. 因?yàn)榘l(fā)生事務(wù)回滾,數(shù)據(jù)庫中根本就沒有這條記錄,所以即使后面有自動(dòng)重試機(jī)制,也無法恢復(fù)處理邏輯。

小艾終于鎖定了問題所在,深深地吸了一口氣,釋放了緊繃的神經(jīng)。就在這時(shí),晨姐的電話打了進(jìn)來。小艾喃喃自語:“毫無疑問,和算法部門遇到的情況一樣,被XXX訂單給堵住了?!闭f罷,她信心滿滿地接起了電話…

本質(zhì):該問題根本原因是==沒有保障 更新數(shù)據(jù)庫操作 與 發(fā)送消息操作這兩個(gè)業(yè)務(wù)單元之間的一致性。==

2. 解決方案

定位后,解決方案就變的非常清晰。

2.1. 方案1:使用 @TransactionalEventListener

最簡的方案就是將 @EventListener 注解 換成 @TransactionalEventListener。

2.1.1. EventListener 和 TransactionalEventListener

EventListener 和 TransactionalEventListener 都是 Spring 中用于處理事件的監(jiān)聽器。它們之間的主要區(qū)別在于它們處理事件的方式和事務(wù)管理。

  1. EventListener:這是一個(gè)通用的事件監(jiān)聽器,當(dāng)事件發(fā)布時(shí),它會(huì)立即執(zhí)行相應(yīng)的處理方法。它不會(huì)參與到事務(wù)管理中,也就是說,即使在事務(wù)執(zhí)行過程中發(fā)生異常,EventListener 依然會(huì)執(zhí)行。
  2. TransactionalEventListener:這是一個(gè)具有事務(wù)管理功能的事件監(jiān)聽器。當(dāng)事件發(fā)布時(shí),它會(huì)等待當(dāng)前事務(wù)完成后再執(zhí)行相應(yīng)的處理方法。這意味著,如果在事務(wù)執(zhí)行過程中發(fā)生異常,TransactionalEventListener 將不會(huì)執(zhí)行,從而確保事務(wù)的一致性。

總之,EventListener 和 TransactionalEventListener 的主要區(qū)別在于它們處理事件的方式和事務(wù)管理。在選擇使用哪種監(jiān)聽器時(shí),需要根據(jù)實(shí)際需求和事務(wù)一致性的要求來決定。

2.1.2. 源碼示例

了解兩者的區(qū)別后,只需做一點(diǎn)調(diào)整便可以解決這個(gè)問題,調(diào)整如下:

/**
 * 使用 @TransactionalEventListener 替代 @EventListener 監(jiān)聽訂單支付事件,然后發(fā)送消息到 RocketMQ
 * @param event
 */
@TransactionalEventListener
public void handle(OrderPaidEvent event){
    rocketMQTemplate.convertAndSend("order_event", event);
}

如果沒有使用 Spring 的 Event 機(jī)制,但仍想實(shí)現(xiàn) @TransactionEventlistner 的效果,可以直接使用 Spring API:

private void doIfCommitted(Runnable task) {
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronization transactionSynchronization = new TransactionSynchronizationAdapter(){
                @Override
                public void afterCommit() {
                    task.run();
                }
            };
            TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);

        }else {
            task.run();
        }
    }

2.1.3. 問題&挑戰(zhàn)

這個(gè)方案確實(shí)解決了上述問題,但從一致性角度分析,還是存在設(shè)計(jì)缺陷,只是發(fā)生的概率變低而已,沒有從根本上解決問題。

在事務(wù)提交后發(fā)送 MQ 時(shí),可能會(huì)遇到以下幾種情況,導(dǎo)致兩個(gè)操作(數(shù)據(jù)庫操作和 MQ 發(fā)送操作)之間的一致性問題:

  1. 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時(shí)發(fā)生網(wǎng)絡(luò)故障。此時(shí),數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。
  2. 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時(shí)發(fā)生 MQ 服務(wù)器故障。此時(shí),數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。
  3. 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時(shí)發(fā)生應(yīng)用程序故障。此時(shí),數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。
  4. 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時(shí)發(fā)生消息丟失。此時(shí),數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。

這個(gè)方案極為簡單,但大幅降低了錯(cuò)誤概率,主要應(yīng)用于要求并不嚴(yán)格的業(yè)務(wù)場景。

2.2 方案2:RocketMQ事務(wù)消息

RocketMQ 的事務(wù)消息就是針對(duì)這個(gè)問題設(shè)計(jì)的,可以非常高效的解決這個(gè)問題。

2.2.1. 半消息以及工作原理

RocketMQ事務(wù)消息是一種支持分布式事務(wù)的消息模型,將消息生產(chǎn)和消費(fèi)與業(yè)務(wù)邏輯綁定在一起,確保消息發(fā)送和事務(wù)執(zhí)行的原子性,保證消息的可靠性。

事務(wù)消息分為兩個(gè)階段:發(fā)送消息和確認(rèn)消息,確認(rèn)消息分為提交和回滾兩個(gè)操作。在提交操作執(zhí)行完畢后,消息才會(huì)被消費(fèi)端消費(fèi),而在回滾操作執(zhí)行完畢后,消息會(huì)被刪除,從而達(dá)到了事務(wù)的一致性和可靠性。

事務(wù)消息的發(fā)生流程如下:

圖片圖片

  1. 生產(chǎn)者發(fā)送prepare消息到RocketMQ服務(wù)端,RocketMQ將消息存儲(chǔ)到本地并返回結(jié)果;
  2. 生產(chǎn)者開始執(zhí)行本地事務(wù),并根據(jù)本地事務(wù)的結(jié)果將狀態(tài)信息提交給RocketMQ服務(wù)端;
  3. 如果本地事務(wù)執(zhí)行成功,生產(chǎn)者向RocketMQ服務(wù)端發(fā)送commit消息;
  4. 如果本地事務(wù)執(zhí)行失敗,生產(chǎn)者向RocketMQ服務(wù)端發(fā)送rollback消息;
  5. RocketMQ接收到commit或rollback消息后,對(duì)消息進(jìn)行投放或刪除;

如果生成者發(fā)送 prepare 消息后,未在規(guī)定時(shí)間內(nèi)發(fā)送 commit 或 rollback 消息,RocketMQ 將進(jìn)入恢復(fù)流程,具體如下:

圖片圖片

  1. 如果在回查的時(shí)間之前沒有收到相應(yīng)的 commit 或 rollback 消息,則 RocketMQ 會(huì)將對(duì)該 prepare 消息進(jìn)行回查;
  2. 應(yīng)用程序接收到回查指令,從業(yè)務(wù)庫中獲取數(shù)據(jù),并根據(jù)業(yè)務(wù)邏輯進(jìn)行判斷,最終是 commit 還是 rollback;
  3. RocketMQ 接收到 commit 或 rollback 回復(fù)后,進(jìn)行相應(yīng)動(dòng)作,從而實(shí)現(xiàn)業(yè)務(wù)操作和消息發(fā)送的一致性;

2.2.2. 源碼示例

一個(gè)簡單的示例代碼如下:

// 編寫事務(wù)監(jiān)聽器類
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    // 執(zhí)行本地事務(wù)
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        System.out.println("executeLocalTransaction " + value);
        // TODO 執(zhí)行本地事務(wù),并返回事務(wù)狀態(tài)
        // 本例假定 index 為偶數(shù)的消息執(zhí)行成功,奇數(shù)的消息執(zhí)行失敗
        if (value % 2 == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    // 檢查本地事務(wù)狀態(tài)
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("checkLocalTransaction " + msg.getTransactionId());
        // 模擬檢查本地事務(wù)狀態(tài),返回事務(wù)狀態(tài)
        boolean committed = prepare(true);
        if (committed) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;
    }

    // 模擬操作預(yù)處理邏輯
    private boolean prepare(boolean commit) {
        System.out.println("prepare " + (commit ? "commit" : "rollback"));
        return commit;
    }

}

// 編寫發(fā)送消息的代碼
public class Producer {
    private static final String NAME_SERVER_ADDR = "localhost:9876";

    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("MyGroup");
        producer.setNamesrvAddr(NAME_SERVER_ADDR);
        // 注冊(cè)事務(wù)監(jiān)聽器
        producer.setTransactionListener(new TransactionListenerImpl());
        producer.start();

        // 發(fā)送事務(wù)消息
        String[] tags = {"TagA", "TagB", "TagC"};
        for (int i = 0; i < 3; i++) {
            Message msg = new Message("TopicTest", tags[i], ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));
            // 在消息發(fā)送時(shí)傳遞給事務(wù)監(jiān)聽器的參數(shù)
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
        }

        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }
}

單看代碼很難理解,簡單畫了張圖,具體如下:

圖片圖片

2.2.3. 問題&挑戰(zhàn)

事務(wù)消息并不完美,存在一定的問題:

  1. 與 MQ 實(shí)現(xiàn)強(qiáng)相關(guān),并不是每個(gè) MQ 實(shí)現(xiàn)都對(duì)事務(wù)消息提供支持;
  2. API 比較晦澀,存在一定的學(xué)習(xí)成本,同時(shí)需要對(duì)業(yè)務(wù)邏輯拆分到 Listener 中,增加理解成本;

2.3. 方案3:本地消息表

事務(wù)消息表方案是一種常用的保證消息發(fā)送與業(yè)務(wù)操作一致性的方法。該方案基于數(shù)據(jù)庫事務(wù)和消息隊(duì)列,將消息發(fā)送和業(yè)務(wù)操作放入同一個(gè)事務(wù)中,并將業(yè)務(wù)操作和消息發(fā)送的狀態(tài)記錄在數(shù)據(jù)庫的消息表中,以實(shí)現(xiàn)消息的可靠性和冪等性。

2.3.1. 設(shè)計(jì)&核心流程

整體如下圖所示:

圖片圖片

image

核心流程如下:

  1. 應(yīng)用程序開啟一個(gè)數(shù)據(jù)庫事務(wù),并在事務(wù)中執(zhí)行業(yè)務(wù)操作和消息發(fā)送;
  2. 在事務(wù)中,將業(yè)務(wù)操作和消息發(fā)送的狀態(tài)記錄到消息表中;
  3. 如果業(yè)務(wù)操作執(zhí)行成功,并且消息發(fā)送成功,提交事務(wù),否則回滾事務(wù);
  4. 定時(shí)掃描消息表,并根據(jù)消息狀態(tài)重新發(fā)送未被確認(rèn)的消息。如果消息發(fā)送成功,更新消息狀態(tài);否則根據(jù)重試次數(shù)更新消息狀態(tài)或者丟棄消息;

通過事務(wù)消息表方案,可以保證消息的可靠性。即使在消息發(fā)送失敗或應(yīng)用程序崩潰的情況下,也可以通過重新發(fā)送消息將業(yè)務(wù)操作和消息發(fā)送的狀態(tài)同步。同時(shí),該方案可以避免消息重復(fù)發(fā)送和漏發(fā)的情況。

2.3.2. 功能封裝

清晰的流程為復(fù)用打下了基礎(chǔ),lego 對(duì)其做了封裝。

2.3.2.1. 環(huán)境準(zhǔn)備

首先,需要引入 lego 相關(guān)依賴:

<dependency>
    <groupId>com.geekhalo.lego</groupId>
    <artifactId>lego-starter</artifactId>
    <version>0.1.12 以上版本</version>
</dependency>

其次,在業(yè)務(wù)數(shù)據(jù)庫上新建一張表用于存儲(chǔ)消息,示例如下:

create table test_message
(
    id           bigint auto_increment primary key,

    orderly      tinyint      not null comment '是否為順序消息',

    topic        varchar(64)  not null comment 'MQ topic',
    sharding_key varchar(128) not null comment 'ShardingKey,用于選擇不同的 partition',
    tag          varchar(128) not null comment 'Message Tag 信息',

    msg_id       varchar(64)  not null comment 'Msg ID 只有發(fā)送成功后才有數(shù)據(jù)',
    msg_key      varchar(64)  not null comment 'MSG Key,用于查詢數(shù)據(jù)',
    msg          longtext     not null comment '要發(fā)送的消息',

    retry_time   tinyint      not null comment '重試次數(shù)',
    status       tinyint      not null comment '發(fā)送狀態(tài):0-初始化,1-發(fā)送成功,2-發(fā)送失敗',

    create_time  datetime     not null,
    update_time  datetime     not null,

    index idx_update_time_status(update_time, status)
);

為了兼容多種MQ類型,對(duì)發(fā)送者進(jìn)行了抽象,因此需要實(shí)現(xiàn)自己的 MessageSender。

@Component
@Getter
@Slf4j
public class TestMessageSender implements MessageSender {

    @Override
    public String send(Message message) {
        // 發(fā)送消息
    }
}

最后,就是對(duì)所有的組件進(jìn)行配置,示例代碼如下:

@Configuration
@Slf4j
public class LocalTableBasedReliableMessageConfiguration
        extends LocalTableBasedReliableMessageConfigurationSupport {

    @Autowired
    private DataSource dataSource;

    @Autowired
    private MessageSender messageSender;

    @Override
    protected DataSource dataSource() {
        return this.dataSource;
    }

    @Override
    protected String messageTable() {
        return "test_message";
    }

    @Override
    protected MessageSender createMessageSend() {
        return this.messageSender;
    }
}

其中,包括:

  1. 繼承自 LocalTableBasedReliableMessageConfigurationSupport,由父類完成基本配置;
  2. 實(shí)現(xiàn) DataSource dataSource() 方法,返回業(yè)務(wù)數(shù)據(jù)源(備注:必須與業(yè)務(wù)使用同一個(gè)數(shù)據(jù)源)
  3. 實(shí)現(xiàn) String messageTable() 方法,配置本地消息表表名;
  4. 實(shí)現(xiàn) MessageSender createMessageSend() 方法,返回 MessageSender 實(shí)例,執(zhí)行真正的消費(fèi)發(fā)送;
2.2.3.2. 具體使用

ReliableMessageSender#send 在業(yè)務(wù)方法中使用,執(zhí)行可靠消息發(fā)送;

@Transactional
public void testSuccess(){
    // 業(yè)務(wù)邏輯
    Message message = buildMessage();
    // 業(yè)務(wù)邏輯
    this.reliableMessageSender.send(message);
}

除發(fā)送流程外,還需要配置補(bǔ)充機(jī)制。

ReliableMessageCompensator#compensate 周期性調(diào)度,對(duì)未發(fā)送或發(fā)送失敗的消息進(jìn)行補(bǔ)充;

4. 示例&源碼

代碼倉庫:https://gitee.com/litao851025/learnFromBug

代碼地址:https://gitee.com/litao851025/learnFromBug/tree/master/src/main/java/com/geekhalo/demo/mq/sender


責(zé)任編輯:武曉燕 來源: geekhalo
相關(guān)推薦

2021-08-03 22:26:46

Go函數(shù)分頁

2023-05-25 10:03:40

2024-04-02 08:41:10

ArrayListSubList場景

2023-11-06 06:52:51

2024-04-01 09:46:11

MQ消息亂序

2015-07-30 09:20:26

微軟Android Lau

2020-06-01 08:04:18

三目運(yùn)算符代碼

2023-04-10 07:26:28

UseStateUseReducer

2020-12-17 10:23:41

死鎖LinuxLockdep

2014-03-07 10:46:49

編程語言趣味

2019-08-09 15:07:33

TomcatJaegerSpringBoot

2024-01-22 09:16:47

多線程性能優(yōu)化

2020-11-02 08:35:59

內(nèi)存數(shù)據(jù)庫Redis

2022-07-06 11:47:27

JAVAfor循環(huán)

2009-06-12 16:55:10

VPN客戶端故障

2024-03-07 12:54:00

AI模型

2025-02-28 09:30:00

?DeepSeekDeepGEMMAI

2022-11-11 09:41:04

連接池微服務(wù)數(shù)據(jù)庫

2024-01-29 09:22:59

死鎖線程池服務(wù)

2022-01-12 20:04:09

網(wǎng)絡(luò)故障斷網(wǎng)事件網(wǎng)絡(luò)安全
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)