阿里面試:在高并發(fā)場(chǎng)景下,如何保證消息只被消費(fèi)一次?實(shí)際開(kāi)發(fā)踩過(guò)坑嘛?
前言
大家好,我是撿田螺的小男孩~
最近一位伙伴去阿里面試,問(wèn)了這么一道題:高并發(fā)場(chǎng)景下,如何保證消息只被消費(fèi)一次。要求全鏈路分析,并且給出對(duì)應(yīng)的處理方案。
本文田螺哥跟大家一起會(huì)會(huì)它~
圖片
1.業(yè)務(wù)場(chǎng)景
我們?nèi)粘i_(kāi)發(fā)中,如果用到消息隊(duì)列,就需要避免消費(fèi)重復(fù)的問(wèn)題。比如:
- 用戶參與營(yíng)銷活動(dòng)領(lǐng)取優(yōu)惠券,因消息重復(fù)消費(fèi),同一用戶收到多張相同優(yōu)惠券。
- 用戶支付成功,因消息重復(fù)消費(fèi),收到兩條扣款通知。
- 用戶下單后,系統(tǒng)因消息重復(fù)消費(fèi),觸發(fā)多次發(fā)貨流程,導(dǎo)致用戶收到多個(gè)相同包裹。
其實(shí)類似的業(yè)務(wù)場(chǎng)景比比皆是。如果我們使用了消息隊(duì)列,消息重復(fù)消費(fèi)問(wèn)題,堪稱高并發(fā)系統(tǒng)的隱形殺手!那么,如何確保每條消息精準(zhǔn)處理一次呢,為我們的業(yè)務(wù)系統(tǒng)保駕護(hù)航呢?
2. 消息重復(fù)消費(fèi)的原因
消息重復(fù)消費(fèi),常見(jiàn)有這些原因:
- 生產(chǎn)者發(fā)送后,因?yàn)榫W(wǎng)絡(luò)抖動(dòng),沒(méi)收到ACK,觸發(fā)自動(dòng)重試,導(dǎo)致消息重復(fù)發(fā)送。
- Broker主節(jié)點(diǎn)宕機(jī),未同步到從節(jié)點(diǎn)的消息在新主節(jié)點(diǎn)恢復(fù)后被重新投遞。
- 消費(fèi)者處理消息成功,但提交Offset時(shí)崩潰或網(wǎng)絡(luò)異常,重啟后重新拉取舊消息。
- 消費(fèi)者處理消息耗時(shí)過(guò)長(zhǎng),Broker判定其離線并觸發(fā)Rebalance,消息被分配給其他消費(fèi)者重復(fù)處理。
3. 全鏈路層層防御,保證不被重復(fù)消費(fèi)
一個(gè)消息從生產(chǎn)者產(chǎn)生,到被消費(fèi)者消費(fèi),主要經(jīng)過(guò)這3個(gè)過(guò)程:
圖片
因此,我們可以通過(guò)這三層,實(shí)現(xiàn)層層防御,保證不被重復(fù)消費(fèi)。
3.1 生產(chǎn)端防重
- 冪等性發(fā)送
Kafka、Pulsar等支持冪等性的消息隊(duì)列。
通過(guò)唯一標(biāo)識(shí)(如 ProducerID + 序列號(hào))過(guò)濾重復(fù)消息。
比如kafka:
Properties props = new Properties();
props.put("enable.idempotence", "true"); // 開(kāi)啟冪等性
props.put("acks", "all"); // 所有副本確認(rèn)
KafkaProducer producer = new KafkaProducer<>(props);
- 事務(wù)消息
利用消息隊(duì)列的事務(wù)消息,要么全成功,要么全回滾。
發(fā)送 Half Message(預(yù)消息,對(duì)消費(fèi)者不可見(jiàn))。
執(zhí)行本地事務(wù)(如更新訂單狀態(tài))。
根據(jù)事務(wù)結(jié)果提交或回滾消息。
3.2 Broker:去重,且保證消息穩(wěn)定投遞
- 消息攜帶唯一ID,去重
生產(chǎn)者攜帶業(yè)務(wù)主鍵(如訂單ID),類似快遞單號(hào)醬紫,然后(比如RocketMQ)Broker端根據(jù)Message Key去重。
- 持久化與順序性
Kafka:設(shè)置 acks=all,確保消息寫(xiě)入所有副本。
RocketMQ:同步刷盤(flushDiskType=SYNC_FLUSH)
分區(qū)有序性:同一業(yè)務(wù)ID的消息固定發(fā)往同一分區(qū)
主從同步:使用 Raft 協(xié)議(如 RocketMQ DLedger)或 ISR 機(jī)制(Kafka)
3.3 消費(fèi)端:業(yè)務(wù)冪等
- 業(yè)務(wù)冪等性設(shè)計(jì)
數(shù)據(jù)庫(kù)唯一約束:(訂單創(chuàng)建時(shí),防止重復(fù)插入同一訂單。)
-- 插入訂單時(shí),若order_id重復(fù)會(huì)直接報(bào)錯(cuò)
INSERT INTO orders (order_id, user_id, amount, status)
VALUES ('20231001123456', 1001, 99.99, 'UNPAID');
樂(lè)觀鎖: 賬戶余額扣減,防止重復(fù)扣款
-- 扣減余額時(shí),校驗(yàn)版本號(hào)
UPDATE account
SET balance = balance - 100,
version = version + 1
WHERE user_id = 123
AND version = 5; -- 當(dāng)前版本號(hào)為5時(shí)才更新
狀態(tài)機(jī)校驗(yàn):(訂單狀態(tài)流轉(zhuǎn)(未支付 → 已支付),防止重復(fù)支付)
-- 支付成功時(shí),僅允許從UNPAID狀態(tài)轉(zhuǎn)為PAID
UPDATE orders
SET status = 'PAID'
WHERE order_id = '20231001123456'
AND status = 'UNPAID'; -- 僅當(dāng)狀態(tài)是UNPAID時(shí)才更新
redis分布式鎖:消費(fèi)前加鎖,確保同一消息僅被一個(gè)消費(fèi)者處理。
// Redisson分布式鎖示例
RLock lock = redisson.getLock("MSG_LOCK:" + messageId);
if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
try {
if (isProcessed(messageId)) { // 二次檢查
return;
}
process(message);
markAsProcessed(messageId); // 標(biāo)記已處理
} finally {
lock.unlock();
}
}
- 去重表
在數(shù)據(jù)庫(kù)中維護(hù)message_id表,消費(fèi)前查詢是否已處理。
圖片
4.兜底方案,監(jiān)控+對(duì)賬
其實(shí)很難保證百分百不出現(xiàn)消息重復(fù)消費(fèi),就好像很難保證程序員百分百保證代碼沒(méi)bug。
因此,我們可以再加個(gè)兜底方案,就是監(jiān)控+對(duì)賬。
比如,主要監(jiān)控一下這些指標(biāo):
- 生產(chǎn)者重復(fù)發(fā)送率
- 消費(fèi)者重復(fù)處理告警
- Offset提交延遲
然后我們?cè)偌觽€(gè)對(duì)賬任務(wù):
定期比對(duì)消息數(shù)量與業(yè)務(wù)數(shù)據(jù)量(如訂單表總數(shù) vs 消息消費(fèi)總數(shù))。
當(dāng)發(fā)現(xiàn)有問(wèn)題時(shí),告警通知,然后去修復(fù)它(如補(bǔ)償退款、庫(kù)存回滾)。
5.為什么不用Exactly-Once呢?
有些伙伴有一位,避免重復(fù)消費(fèi)的話,為啥不用Exactly-Once呢?
它是RocketMQ的消費(fèi)模式,確保每條消息只被處理一次,既不丟失也不重復(fù)。
其實(shí)主要還是性能和復(fù)雜性的考慮吧。其實(shí),絕大多數(shù)場(chǎng)景用At-Least-Once
+ 冪等更劃算!
最后
其實(shí)日常開(kāi)發(fā)中,保證消息不被重復(fù)消費(fèi),主要還是做好消費(fèi)端的冪等設(shè)計(jì)就好啦。但是如果涉及到面試的時(shí)候,還是按照本文的思路來(lái)。就是面試的時(shí)候,讓面試官看到的你的思考過(guò)程和邏輯辯證的過(guò)程。