消息隊列,聊聊發(fā)送消息的四種姿勢!
微服務開發(fā)中經(jīng)常會使用消息隊列進行跨服務通信。在一個典型場景中,服務A執(zhí)行一個業(yè)務邏輯,需要保存數(shù)據(jù)庫,然后通知服務B執(zhí)行相應的業(yè)務邏輯。在這種場景下,我們需要考慮如何發(fā)送消息。
1. 基礎版
首先,我們可能會考慮將數(shù)據(jù)庫操作和消息發(fā)送放在同一個事務中,以下是偽代碼示例:
@Transactional
public void saveWithMessage(BusinessDO businessDO){
String id = IdUtils.nextId();
businessDO.setId(id);
xxxRepository.save(businessDO);
BusinessMessage businessMessage = new BusinessMessage();
businessMessage.setKey(id);
SendResult send = rocketMQTemplate.syncSend("test-topic", sendMessage);
}
在這段代碼里通過@Transactional注解將數(shù)據(jù)庫的操作以及發(fā)送消息放到一個事務中,如果數(shù)據(jù)庫的保存或者消息發(fā)送失敗,則回滾事務。
乍一看似乎沒什么問題,但稍微推敲一下就會發(fā)現(xiàn)此方式有如下兩個缺陷:
1.1 數(shù)據(jù)不一致
首先最容易想到的是,這種消息發(fā)送方式無法保證數(shù)據(jù)的最終一致性。
這里先讓我來解釋一下基于消息隊列,生產(chǎn)者發(fā)送消息到消費者消費消息的過程:
- 生產(chǎn)者發(fā)送消息
- MQ收到消息并將數(shù)據(jù)持久化,在存儲中新增一條記錄
- 返回ACK給生產(chǎn)者
- MQ 推送消息給對應的消費者,等待消費者返回ACK
- 如果消費者在指定時間內(nèi)成功返回ACK,那么MQ則認為消費成功,執(zhí)行第6步刪除消息,如果MQ在指定時間內(nèi)沒有收到ACK,則認為消息消費失敗,會重新推送消息,重復執(zhí)行第4、5、6步操作。
- 刪除消息。
圖片
好,現(xiàn)在回到上面發(fā)送消息的場景,假設數(shù)據(jù)庫處理成功,消息消費成功,但是MQ由于某些原因處理超時,導致ACK確認失敗,此時整個事務回滾,結果出現(xiàn)數(shù)據(jù)不一致問題。
這種數(shù)據(jù)不一致的問題在RPC調(diào)用的場景下也經(jīng)常出現(xiàn),其根本的原因在于:遠程調(diào)用,結果最終可能為成功、失敗、超時;而對于超時的情況,處理方最終的結果可能是成功,也可能是失敗,調(diào)用方是無法知曉的。
1.2 事務未提交
其次,使用以上方式還存在另一個問題,即消費者在處理消息時可能讀不到剛剛保存的數(shù)據(jù),即消費者消費速度快于事務提交的速度。
舉例:
假設服務B需要通過消息中的數(shù)據(jù)ID獲取服務A數(shù)據(jù)庫保存的數(shù)據(jù)。這種情況下,數(shù)據(jù)庫操作與消息發(fā)送在同一事務中??赡艹霈F(xiàn)服務B在處理消息時,服務A事務還未提交,導致服務B獲取的數(shù)據(jù)是空數(shù)據(jù),無法執(zhí)行相應業(yè)務邏輯。
1.3 適用場景
盡管這種發(fā)送方式存在上述兩個問題,但在某些場景下仍然適用。例如,消費者在處理時不依賴生產(chǎn)者的數(shù)據(jù),且對數(shù)據(jù)一致性要求不高,這種情況下消息發(fā)送和數(shù)據(jù)庫保存可以不在同一個事務中。
2. 進階版
為解決事務未提交問題,我們可以確保事務提交后再發(fā)送消息。在SpringBoot項目中,有兩種常見解決方案。
2.1 事務同步器
基于事務同步器的方法:
@Transactional
public void saveWithMessage(BusinessDO businessDO){
xxxRepository.save(businessDO);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
BusinessMessage businessMessage = new BusinessMessage();
businessMessage.setXXX();
rocketMQTemplate.syncSend("test-topic", sendMessage);
}
});
}
TransactionSynchronizationManager.registerSynchronization 是 Spring 框架中用于注冊事務同步的方法。通過這個方法,你可以在事務提交、回滾或完成時執(zhí)行一些額外的邏輯。
在上述代碼中,使用了afterCommit方法,在事務成功提交后執(zhí)行發(fā)送消息操作,確保在數(shù)據(jù)庫操作成功且事務穩(wěn)定的情況下發(fā)送消息。
2.2 消息監(jiān)聽器
另一種方法是基于ApplicationEventPublisher,在保存數(shù)據(jù)庫操作后發(fā)布一個事件,并通過@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)注解監(jiān)聽事件。這樣可以確保消息在數(shù)據(jù)庫事務提交之后再發(fā)送。
@Transactional
public void saveWithMessage(BusinessDO businessDO){
xxxRepository.save(businessDO);
eventPublisher.publishEvent(new UserCreatedEvent(registerUser));
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleUserRegisteredEvent(UserCreatedEvent userCreatedEvent) {
rocketMQTemplate.syncSend("test-topic", sendMessage);
}
這里需要說明一下:在默認情況下Spring的事件監(jiān)聽機制并不是異步的(上次群友弄錯了),而是同步的將代碼進行解耦,@TransactionalEventListener也是通過同步的方式,但是加入了回調(diào)的方式來解決,這樣就能夠控制事務進行Commited、Rollback時才進行事件的處理,來達到事務同步的目的。
2.3 適用范圍
通過以上方式,相較于基礎版,可以確保消息在事務提交后發(fā)送,解決了消費者讀取空數(shù)據(jù)的問題。但仍然無法保證數(shù)據(jù)的一致性,適用于對數(shù)據(jù)一致性要求不高的場景。
3. 本地消息表+補償重試
如果需要保證最終一致性而非強一致性,可以采用本地消息表+補償重試的方式來發(fā)送消息。
這種方式的執(zhí)行原理如下:
- 在執(zhí)行業(yè)務操作的同時,在本地消息表中插入一條狀態(tài)為待發(fā)送的記錄,業(yè)務數(shù)據(jù)的記錄與消息記錄必須在同一個事務中完成,這是此方案的核心原則。由于消息表與業(yè)務表在同一個庫中,事務可以通過數(shù)據(jù)庫來保證。
- 事務提交后發(fā)送消息,如果消息發(fā)送成功,則將消息狀態(tài)標記為發(fā)送成功或刪除消息
- 在生產(chǎn)者服務中會創(chuàng)建一個定時任務,定時從消息表中檢索待發(fā)送的消息重新發(fā)送。
- 對于消費者消費失敗,則依賴MQ本身的重試機制來完成,保證數(shù)據(jù)的最終一致性。
圖片
核心代碼如下:
@Transactional
public void saveWithMessage(BusinessDO businessDO){
TransactionMessage transactionMessage = new TransactionMessage();
transactionMessage.setStaus(MessageStatus.WAITING_SEND);
transactionMessage.setMessageKey(businessDO.getId());
...
xxxRepository.save(businessDO);
messageRepository.save(TransactionMessage);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
messageService.sendMessage(transactionMessage,businessDO);
}
});
}
public void sendMessage(TransactionMessage transactionMessage,BusinessDO businessDO){
BusinessMessage businessMessage = new BusinessMessage();
businessMessage.setXXX();
try{
rocketMQTemplate.syncSend("test-topic", sendMessage);
transactionMessage.setStatus(MessageStatus.SUCCESS);
messageRepository.update(transactionMessage);
}catch (Exception e){
// 執(zhí)行失敗的業(yè)務邏輯
}
}
3.1 問題
雖然這種方式能夠保證消息在事務提交后發(fā)送,且能夠保證最終一致性,但仍然存在一些缺陷:
首先,需要額外的消息表,增加了系統(tǒng)復雜度。(針對此問題,我們又可以將該功能單獨提取出來,做成一個消息服務來統(tǒng)一處理,考慮篇幅問題,這里暫不展開。)
其次,通過定時任務輪詢消息表,對于處理成功但ACK超時的數(shù)據(jù)會重新發(fā)送消息,這對下游系統(tǒng)產(chǎn)生了強烈的冪等性保障要求,消費者的處理邏輯必須做好冪等控制。關于冪等的處理方案,我在[[Dailymart17:并發(fā)與冪等的實現(xiàn)方案]]一文中有詳細的說明,歡迎翻閱。
4. 基于事務消息發(fā)送
目前,RocketMQ是主流MQ中唯一一個支持事務消息的,如果你們項目恰好使用的是RocketMQ,可以采用事務消息來發(fā)送。
有關RocketMQ事務消息的詳細信息,可以參考我之前的文章[SpringCloud基于RocketMQ實現(xiàn)分布式事務,這里不再贅述。同時,由于在Dailymart項目中使用的是RocketMQ,也可以參考Dailymart的代碼實現(xiàn)。
小結
本文詳細介紹了微服務開發(fā)中常用的4種消息發(fā)送方式。對于那些對數(shù)據(jù)一致性要求不高的場景,可以選擇使用進階版的消息發(fā)送方式。而對于需要保證最終一致性的情況,推薦采用事務消息和本地消息表的方式進行消息發(fā)送。