故障現(xiàn)場 | 消息發(fā)送居然有這么大的坑
1. 問題&分析
基于 MQ 進(jìn)行系統(tǒng)間的解耦真的是太香了,小艾還沉浸在喜悅中久久不能自拔。但,打臉的事已經(jīng)在路上了。。。。
1.1. 案例
昨天下班,在電梯里,物流組的晨姐偶遇了小艾,就一個(gè)技術(shù)問題向小艾進(jìn)行了反饋。具體來說,物流系統(tǒng)有一項(xiàng)功能,即實(shí)時(shí)監(jiān)控訂單的支付成功事件,一旦檢測(cè)到,便會(huì)為顧客準(zhǔn)備物資,進(jìn)而安排快遞發(fā)貨。今天系統(tǒng)出現(xiàn)了幾次空指針異常。查閱日志,似乎是在反查訂單信息時(shí),沒有獲取到預(yù)期的訂單數(shù)據(jù)。但查詢物流系統(tǒng),物流單已經(jīng)成功生成,對(duì)業(yè)務(wù)操作并未造成實(shí)際影響,但這個(gè)問題還是值得注意。由于這個(gè)問題并沒有立即影響到業(yè)務(wù)流程,所以晨姐沒有在第一時(shí)間聯(lián)系小艾進(jìn)行確認(rèn)。
在小艾正準(zhǔn)備啟動(dòng)IDEA尋找線索的時(shí)候,算法組的負(fù)責(zé)人龍哥急匆匆地走了過來,向小艾反映了他們團(tuán)隊(duì)遇到的一個(gè)重要問題。為了提升推薦效果,算法組也會(huì)實(shí)時(shí)監(jiān)控訂單支付成功事件,并以此為依據(jù)重新計(jì)算用戶的推薦商品。然而,今天早上,他們突然收到了一系列的報(bào)警信息,問題同樣是無法查詢到訂單信息,這個(gè)現(xiàn)象與物流系統(tǒng)的問題高度相似。
小艾隨口問道:“這個(gè)問題會(huì)自己修復(fù)嗎?”龍哥愣了一下,回答說:“以前會(huì)自動(dòng)修復(fù),但剛剛那條數(shù)據(jù)還在報(bào)錯(cuò)?!彪S后,龍哥提供了報(bào)錯(cuò)的訂單ID,小艾立即去數(shù)據(jù)庫中查詢,卻驚訝地發(fā)現(xiàn),這條數(shù)據(jù)竟然不存在!
看到這種場景,小艾有些慌神,連龍哥什么時(shí)候走的都沒有注意到。目光直勾勾的盯著電腦屏幕發(fā)呆:
@Transactional
public void paySuccess(String orderId, String token){
// 驗(yàn)證 token,保障有效性
checkToke(token);
// 加載訂單信息
Order order = this.orderRepository.getById(orderId);
if (order == null){
throw new RuntimeException("訂單不存在");
}
// 支付成功,更新訂單狀態(tài)
order.paySuccess();
// 將變更更新到數(shù)據(jù)庫
this.orderRepository.update(order);
// 發(fā)送支付成功事件
this.eventPublisher.publishEvent(new OrderPaidEvent(order));
// 執(zhí)行其他業(yè)務(wù)邏輯
doSomething();
}
// 監(jiān)聽變更,發(fā)布 MQ
@EventListener
public void handle(OrderPaidEvent event){
rocketMQTemplate.convertAndSend("order_event", event);
}
1.2. 問題分析
兩個(gè)問題看起來一樣,但又有區(qū)別。當(dāng)下游在收到 MQ 消息時(shí)
- 無法查不到訂單,但稍后能自我修復(fù)
- 一直查不到訂單,數(shù)據(jù)庫里還沒有,無法進(jìn)行自我修復(fù)
小艾無意間看到 paySuccess 方法上的 @Transactional 頓時(shí)茅塞頓開。
圖片
正如上圖所示:
- 在數(shù)據(jù)庫更新完成后,系統(tǒng)會(huì)立即發(fā)送消息隊(duì)列(MQ)消息,同時(shí)主流程會(huì)繼續(xù)執(zhí)行后續(xù)的耗時(shí)操作。
- 當(dāng)下游接收到MQ消息時(shí),會(huì)進(jìn)行數(shù)據(jù)查詢。然而,由于此時(shí)主流程尚未完成事務(wù)提交,因此無法查詢到相關(guān)數(shù)據(jù),導(dǎo)致下游出現(xiàn)錯(cuò)誤。
- 如果MQ消息消費(fèi)失敗,系統(tǒng)會(huì)自動(dòng)進(jìn)行重試。如果在此期間主流程已經(jīng)完成了事務(wù)提交,那么就能夠成功查詢到數(shù)據(jù),從而使得業(yè)務(wù)流程得以恢復(fù)正常。
這就完美的解釋了物流問題,那為什么算法組收到消息里的訂單ID在數(shù)據(jù)庫不存在呢?
圖片
如上圖所示:
- 在數(shù)據(jù)庫更新完成后,系統(tǒng)會(huì)立即發(fā)送消息隊(duì)列(MQ)消息,而主流程將同時(shí)繼續(xù)執(zhí)行后續(xù)的耗時(shí)操作。
- 若主流程在執(zhí)行后續(xù)邏輯時(shí)發(fā)生異常,將導(dǎo)致整個(gè)事務(wù)回滾,進(jìn)而中斷處理過程。
- 下游系統(tǒng)接收到消息后,進(jìn)行數(shù)據(jù)反查,但由于事務(wù)已回滾,因此無法查詢到任何數(shù)據(jù)。
- 因?yàn)榘l(fā)生事務(wù)回滾,數(shù)據(jù)庫中根本就沒有這條記錄,所以即使后面有自動(dòng)重試機(jī)制,也無法恢復(fù)處理邏輯。
小艾終于鎖定了問題所在,深深地吸了一口氣,釋放了緊繃的神經(jīng)。就在這時(shí),晨姐的電話打了進(jìn)來。小艾喃喃自語:“毫無疑問,和算法部門遇到的情況一樣,被XXX訂單給堵住了?!闭f罷,她信心滿滿地接起了電話…
本質(zhì):該問題根本原因是==沒有保障 更新數(shù)據(jù)庫操作 與 發(fā)送消息操作這兩個(gè)業(yè)務(wù)單元之間的一致性。==
2. 解決方案
定位后,解決方案就變的非常清晰。
2.1. 方案1:使用 @TransactionalEventListener
最簡的方案就是將 @EventListener 注解 換成 @TransactionalEventListener。
2.1.1. EventListener 和 TransactionalEventListener
EventListener 和 TransactionalEventListener 都是 Spring 中用于處理事件的監(jiān)聽器。它們之間的主要區(qū)別在于它們處理事件的方式和事務(wù)管理。
- EventListener:這是一個(gè)通用的事件監(jiān)聽器,當(dāng)事件發(fā)布時(shí),它會(huì)立即執(zhí)行相應(yīng)的處理方法。它不會(huì)參與到事務(wù)管理中,也就是說,即使在事務(wù)執(zhí)行過程中發(fā)生異常,EventListener 依然會(huì)執(zhí)行。
- TransactionalEventListener:這是一個(gè)具有事務(wù)管理功能的事件監(jiān)聽器。當(dāng)事件發(fā)布時(shí),它會(huì)等待當(dāng)前事務(wù)完成后再執(zhí)行相應(yīng)的處理方法。這意味著,如果在事務(wù)執(zhí)行過程中發(fā)生異常,TransactionalEventListener 將不會(huì)執(zhí)行,從而確保事務(wù)的一致性。
總之,EventListener 和 TransactionalEventListener 的主要區(qū)別在于它們處理事件的方式和事務(wù)管理。在選擇使用哪種監(jiān)聽器時(shí),需要根據(jù)實(shí)際需求和事務(wù)一致性的要求來決定。
2.1.2. 源碼示例
了解兩者的區(qū)別后,只需做一點(diǎn)調(diào)整便可以解決這個(gè)問題,調(diào)整如下:
/**
* 使用 @TransactionalEventListener 替代 @EventListener 監(jiān)聽訂單支付事件,然后發(fā)送消息到 RocketMQ
* @param event
*/
@TransactionalEventListener
public void handle(OrderPaidEvent event){
rocketMQTemplate.convertAndSend("order_event", event);
}
如果沒有使用 Spring 的 Event 機(jī)制,但仍想實(shí)現(xiàn) @TransactionEventlistner 的效果,可以直接使用 Spring API:
private void doIfCommitted(Runnable task) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronization transactionSynchronization = new TransactionSynchronizationAdapter(){
@Override
public void afterCommit() {
task.run();
}
};
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
}else {
task.run();
}
}
2.1.3. 問題&挑戰(zhàn)
這個(gè)方案確實(shí)解決了上述問題,但從一致性角度分析,還是存在設(shè)計(jì)缺陷,只是發(fā)生的概率變低而已,沒有從根本上解決問題。
在事務(wù)提交后發(fā)送 MQ 時(shí),可能會(huì)遇到以下幾種情況,導(dǎo)致兩個(gè)操作(數(shù)據(jù)庫操作和 MQ 發(fā)送操作)之間的一致性問題:
- 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時(shí)發(fā)生網(wǎng)絡(luò)故障。此時(shí),數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。
- 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時(shí)發(fā)生 MQ 服務(wù)器故障。此時(shí),數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。
- 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時(shí)發(fā)生應(yīng)用程序故障。此時(shí),數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。
- 數(shù)據(jù)庫事務(wù)提交成功,但在發(fā)送 MQ 消息時(shí)發(fā)生消息丟失。此時(shí),數(shù)據(jù)庫操作已經(jīng)完成,但 MQ 消息未能成功發(fā)送。
這個(gè)方案極為簡單,但大幅降低了錯(cuò)誤概率,主要應(yīng)用于要求并不嚴(yán)格的業(yè)務(wù)場景。
2.2 方案2:RocketMQ事務(wù)消息
RocketMQ 的事務(wù)消息就是針對(duì)這個(gè)問題設(shè)計(jì)的,可以非常高效的解決這個(gè)問題。
2.2.1. 半消息以及工作原理
RocketMQ事務(wù)消息是一種支持分布式事務(wù)的消息模型,將消息生產(chǎn)和消費(fèi)與業(yè)務(wù)邏輯綁定在一起,確保消息發(fā)送和事務(wù)執(zhí)行的原子性,保證消息的可靠性。
事務(wù)消息分為兩個(gè)階段:發(fā)送消息和確認(rèn)消息,確認(rèn)消息分為提交和回滾兩個(gè)操作。在提交操作執(zhí)行完畢后,消息才會(huì)被消費(fèi)端消費(fèi),而在回滾操作執(zhí)行完畢后,消息會(huì)被刪除,從而達(dá)到了事務(wù)的一致性和可靠性。
事務(wù)消息的發(fā)生流程如下:
圖片
- 生產(chǎn)者發(fā)送prepare消息到RocketMQ服務(wù)端,RocketMQ將消息存儲(chǔ)到本地并返回結(jié)果;
- 生產(chǎn)者開始執(zhí)行本地事務(wù),并根據(jù)本地事務(wù)的結(jié)果將狀態(tài)信息提交給RocketMQ服務(wù)端;
- 如果本地事務(wù)執(zhí)行成功,生產(chǎn)者向RocketMQ服務(wù)端發(fā)送commit消息;
- 如果本地事務(wù)執(zhí)行失敗,生產(chǎn)者向RocketMQ服務(wù)端發(fā)送rollback消息;
- RocketMQ接收到commit或rollback消息后,對(duì)消息進(jìn)行投放或刪除;
如果生成者發(fā)送 prepare 消息后,未在規(guī)定時(shí)間內(nèi)發(fā)送 commit 或 rollback 消息,RocketMQ 將進(jìn)入恢復(fù)流程,具體如下:
圖片
- 如果在回查的時(shí)間之前沒有收到相應(yīng)的 commit 或 rollback 消息,則 RocketMQ 會(huì)將對(duì)該 prepare 消息進(jìn)行回查;
- 應(yīng)用程序接收到回查指令,從業(yè)務(wù)庫中獲取數(shù)據(jù),并根據(jù)業(yè)務(wù)邏輯進(jìn)行判斷,最終是 commit 還是 rollback;
- RocketMQ 接收到 commit 或 rollback 回復(fù)后,進(jìn)行相應(yīng)動(dòng)作,從而實(shí)現(xiàn)業(yè)務(wù)操作和消息發(fā)送的一致性;
2.2.2. 源碼示例
一個(gè)簡單的示例代碼如下:
// 編寫事務(wù)監(jiān)聽器類
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
// 執(zhí)行本地事務(wù)
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
System.out.println("executeLocalTransaction " + value);
// TODO 執(zhí)行本地事務(wù),并返回事務(wù)狀態(tài)
// 本例假定 index 為偶數(shù)的消息執(zhí)行成功,奇數(shù)的消息執(zhí)行失敗
if (value % 2 == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 檢查本地事務(wù)狀態(tài)
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("checkLocalTransaction " + msg.getTransactionId());
// 模擬檢查本地事務(wù)狀態(tài),返回事務(wù)狀態(tài)
boolean committed = prepare(true);
if (committed) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
// 模擬操作預(yù)處理邏輯
private boolean prepare(boolean commit) {
System.out.println("prepare " + (commit ? "commit" : "rollback"));
return commit;
}
}
// 編寫發(fā)送消息的代碼
public class Producer {
private static final String NAME_SERVER_ADDR = "localhost:9876";
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("MyGroup");
producer.setNamesrvAddr(NAME_SERVER_ADDR);
// 注冊(cè)事務(wù)監(jiān)聽器
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();
// 發(fā)送事務(wù)消息
String[] tags = {"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicTest", tags[i], ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));
// 在消息發(fā)送時(shí)傳遞給事務(wù)監(jiān)聽器的參數(shù)
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
}
// 關(guān)閉生產(chǎn)者
producer.shutdown();
}
}
單看代碼很難理解,簡單畫了張圖,具體如下:
圖片
2.2.3. 問題&挑戰(zhàn)
事務(wù)消息并不完美,存在一定的問題:
- 與 MQ 實(shí)現(xiàn)強(qiáng)相關(guān),并不是每個(gè) MQ 實(shí)現(xiàn)都對(duì)事務(wù)消息提供支持;
- API 比較晦澀,存在一定的學(xué)習(xí)成本,同時(shí)需要對(duì)業(yè)務(wù)邏輯拆分到 Listener 中,增加理解成本;
2.3. 方案3:本地消息表
事務(wù)消息表方案是一種常用的保證消息發(fā)送與業(yè)務(wù)操作一致性的方法。該方案基于數(shù)據(jù)庫事務(wù)和消息隊(duì)列,將消息發(fā)送和業(yè)務(wù)操作放入同一個(gè)事務(wù)中,并將業(yè)務(wù)操作和消息發(fā)送的狀態(tài)記錄在數(shù)據(jù)庫的消息表中,以實(shí)現(xiàn)消息的可靠性和冪等性。
2.3.1. 設(shè)計(jì)&核心流程
整體如下圖所示:
圖片
image
核心流程如下:
- 應(yīng)用程序開啟一個(gè)數(shù)據(jù)庫事務(wù),并在事務(wù)中執(zhí)行業(yè)務(wù)操作和消息發(fā)送;
- 在事務(wù)中,將業(yè)務(wù)操作和消息發(fā)送的狀態(tài)記錄到消息表中;
- 如果業(yè)務(wù)操作執(zhí)行成功,并且消息發(fā)送成功,提交事務(wù),否則回滾事務(wù);
- 定時(shí)掃描消息表,并根據(jù)消息狀態(tài)重新發(fā)送未被確認(rèn)的消息。如果消息發(fā)送成功,更新消息狀態(tài);否則根據(jù)重試次數(shù)更新消息狀態(tài)或者丟棄消息;
通過事務(wù)消息表方案,可以保證消息的可靠性。即使在消息發(fā)送失敗或應(yīng)用程序崩潰的情況下,也可以通過重新發(fā)送消息將業(yè)務(wù)操作和消息發(fā)送的狀態(tài)同步。同時(shí),該方案可以避免消息重復(fù)發(fā)送和漏發(fā)的情況。
2.3.2. 功能封裝
清晰的流程為復(fù)用打下了基礎(chǔ),lego 對(duì)其做了封裝。
2.3.2.1. 環(huán)境準(zhǔn)備
首先,需要引入 lego 相關(guān)依賴:
<dependency>
<groupId>com.geekhalo.lego</groupId>
<artifactId>lego-starter</artifactId>
<version>0.1.12 以上版本</version>
</dependency>
其次,在業(yè)務(wù)數(shù)據(jù)庫上新建一張表用于存儲(chǔ)消息,示例如下:
create table test_message
(
id bigint auto_increment primary key,
orderly tinyint not null comment '是否為順序消息',
topic varchar(64) not null comment 'MQ topic',
sharding_key varchar(128) not null comment 'ShardingKey,用于選擇不同的 partition',
tag varchar(128) not null comment 'Message Tag 信息',
msg_id varchar(64) not null comment 'Msg ID 只有發(fā)送成功后才有數(shù)據(jù)',
msg_key varchar(64) not null comment 'MSG Key,用于查詢數(shù)據(jù)',
msg longtext not null comment '要發(fā)送的消息',
retry_time tinyint not null comment '重試次數(shù)',
status tinyint not null comment '發(fā)送狀態(tài):0-初始化,1-發(fā)送成功,2-發(fā)送失敗',
create_time datetime not null,
update_time datetime not null,
index idx_update_time_status(update_time, status)
);
為了兼容多種MQ類型,對(duì)發(fā)送者進(jìn)行了抽象,因此需要實(shí)現(xiàn)自己的 MessageSender。
@Component
@Getter
@Slf4j
public class TestMessageSender implements MessageSender {
@Override
public String send(Message message) {
// 發(fā)送消息
}
}
最后,就是對(duì)所有的組件進(jìn)行配置,示例代碼如下:
@Configuration
@Slf4j
public class LocalTableBasedReliableMessageConfiguration
extends LocalTableBasedReliableMessageConfigurationSupport {
@Autowired
private DataSource dataSource;
@Autowired
private MessageSender messageSender;
@Override
protected DataSource dataSource() {
return this.dataSource;
}
@Override
protected String messageTable() {
return "test_message";
}
@Override
protected MessageSender createMessageSend() {
return this.messageSender;
}
}
其中,包括:
- 繼承自 LocalTableBasedReliableMessageConfigurationSupport,由父類完成基本配置;
- 實(shí)現(xiàn) DataSource dataSource() 方法,返回業(yè)務(wù)數(shù)據(jù)源(備注:必須與業(yè)務(wù)使用同一個(gè)數(shù)據(jù)源)
- 實(shí)現(xiàn) String messageTable() 方法,配置本地消息表表名;
- 實(shí)現(xiàn) MessageSender createMessageSend() 方法,返回 MessageSender 實(shí)例,執(zhí)行真正的消費(fèi)發(fā)送;
2.2.3.2. 具體使用
ReliableMessageSender#send 在業(yè)務(wù)方法中使用,執(zhí)行可靠消息發(fā)送;
@Transactional
public void testSuccess(){
// 業(yè)務(wù)邏輯
Message message = buildMessage();
// 業(yè)務(wù)邏輯
this.reliableMessageSender.send(message);
}
除發(fā)送流程外,還需要配置補(bǔ)充機(jī)制。
ReliableMessageCompensator#compensate 周期性調(diào)度,對(duì)未發(fā)送或發(fā)送失敗的消息進(jìn)行補(bǔ)充;
4. 示例&源碼
代碼倉庫:https://gitee.com/litao851025/learnFromBug
代碼地址:https://gitee.com/litao851025/learnFromBug/tree/master/src/main/java/com/geekhalo/demo/mq/sender