阿里二面:使用消息隊列怎樣防止消息重復(fù)?
大家好,我是君哥。
使用消息隊列時,我們經(jīng)常會遇到一個可能對業(yè)務(wù)產(chǎn)生影響的問題,消息重復(fù)。在訂單、扣款、對賬等對冪有要求的場景,消息重復(fù)的問題必須解決。
那怎樣應(yīng)對重復(fù)消息呢?今天來聊一聊這個話題。
1.三個語義
正確使用消息隊列,我們會考慮到消息防丟失、防重復(fù),我們介紹 3 個語義:
- At Least Once:在消息隊列中,指消息不丟失,一條消息最少被消費一次,但是可能會有重復(fù)消費。
- Exactly Once:在消息隊列中,消息被精準(zhǔn)消費一次,不丟失,也不會重復(fù);
- At Most Once:在消息隊列中,消息不會被重復(fù)消費,但是可能會有消息丟失
不同的消息場景,需要的語義不同。比如 Exactly Once 最難實現(xiàn),一般需要引入事務(wù)消息。
不同使用場景,對語義的要求也不一樣。比如日志收集類的場景,At Most Once 就可以滿足,而支付類的場景則要求 Exactly Once。
2.消息重復(fù)
什么情況下會導(dǎo)致消息重復(fù)呢?
生產(chǎn)者發(fā)送消息后,Broker 保存成功,但是沒有成功給生產(chǎn)者返回 ACK,生產(chǎn)者以為消息發(fā)送失敗,重試,再次給 Broker 發(fā)送。Broker 保存了重復(fù)消息,導(dǎo)致 Consumer 多次消費。
圖片
消費者消費消息后,給 Broker 返回 ACK 失敗,導(dǎo)致 Broker 沒有修改偏移量,同一條消息再次發(fā)送給消費者,或者被消費者拉取到。
圖片
3.生產(chǎn)者防重
有的消息中間件是支持生產(chǎn)者冥等的。比如 Kafka 從 0.11.0 版本開始引入了冪等 Producer,可以使用下面代碼開啟冪等 Producer:
Properties props = new Properties();
//省略其他代碼
//配置冪等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
//創(chuàng)建生產(chǎn)者實例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Kafka 實現(xiàn)生產(chǎn)者冪等的原理是在生產(chǎn)者引入了 Producer ID(PID)和 Sequence Number 這兩個參數(shù)。
- PID:Producer 擁有的 ID,唯一標(biāo)識一個 Producer。
- Sequence Number:自增的數(shù)值,唯一標(biāo)識同一個 Producer 發(fā)送到指定分區(qū)的消息 ID。
有了這兩個參數(shù),Broker 單分區(qū)就可以唯一標(biāo)識一個生產(chǎn)者發(fā)送的唯一一條消息<PID,SequenceNumber>。Broker 收到消息時,如果檢查到消息的<PID,SequenceNumber>已經(jīng)存在,就不會再保留這條消息。
但冪等 Producer 只能在單分區(qū)下生效,多分區(qū)情況下是不生效的。因為多個分區(qū)之間并不能相互訪問對方的<PID,SequenceNumber>。
圖片
4.Broker 防重
Broker 如果可以防重,那對于生產(chǎn)者和消費者來說,節(jié)省了大量的工作。下面我們看下 Pulsar 是怎樣防重的。
Broker 通過參數(shù) BrokerDeduplicationEnabled 開啟防重功能。對于 Producer 發(fā)送的重復(fù)消息,Broker 返回響應(yīng) -1:-1。
Producer 發(fā)送消息時,會帶一個 sequenceId 字段,Broker 會按照 ProducerName 維度記錄當(dāng)前生產(chǎn)者最大的 sequenceId(highestSequenceId)。Broker 收到消息時,首先會判斷消息中的 sequenceId 是否大于自己保存的當(dāng)前生產(chǎn)者的 highestSequenceId,如果是則保存消息并更新 highestSequenceId,否則丟棄消息,并且給 Producer 返回 -1:-1。
下面是三個極端情況:
- Producer 斷開連接:這種情況下,跟 Broker 重新建立連接后,本地保存的 sequenceId 還在,只要使用 sequenceId 遞增后發(fā)送消息即可;
- Producer 宕機(jī):Producer 重啟后,緩存的 sequenceId 肯定不存在了,這時跟 Broker 重新建立連接后,Broker 會根據(jù) ProducerName 找出 highestSequenceId 發(fā)給 Producer,Producer 使用這個 sequenceId 來發(fā)送消息;
- Producer 和 Broker 都宕機(jī):Broker 重啟后,可以從宕機(jī)前保存的快照中恢復(fù)各 Producer 對應(yīng)的 highestSequenceId 發(fā)送給各 Producer。但這個 highestSequenceId 不一定準(zhǔn)確,因為 Broker 宕機(jī)瞬間很有可能最新的 sequenceId 沒有來得及保存快照。
需要注意的是,跟 Kafka 的冪等 Producer 類似,Pulsar 的 Broker 冪等也只能保證 Topic/Partition 級別。
5.消費者防重
從上面的分析可以看出,靠生產(chǎn)者防重和 Broker 防重,只能在 Topic/Partition 級別生效,這通常并不能滿足我們的需求。而為了避免消費者重復(fù)消費對業(yè)務(wù)造成影響,消息防重還是必要的。這就要求我們做最后一道防線,在消費端進(jìn)行防重或冪等處理。
消費端做防重,就不再考慮消息中間件層面的配置(比如 sequenceId),而是從消息體進(jìn)行下手。
生產(chǎn)者發(fā)送消息時,給消息體賦值一個全局唯一的 ID,消費者處理消息時,根據(jù)全局唯一 ID 做防重。
比如消費端的邏輯是保存一條訂單消息,那把唯一 ID 保存到數(shù)據(jù)庫并且加一個唯一索引,這樣根據(jù)唯一索引就可以做消息去重。
不過使用唯一索引也有缺點:
- 如果使用 MySQL 數(shù)據(jù)庫,不能使用 Change Buffer;
- 非插入的場景(比如更新庫存)不能去重。
對于唯一索引的缺點,我們可以引入 Redis 對唯一 ID 做保存,利用 setNx 判斷消息是否已經(jīng)處理過。如下圖:
圖片
if (jedis.setnx(ID, "1") == 1) {
//處理業(yè)務(wù),返回 ACK
}else {
//直接返回 返回 ACK
}
6.總結(jié)
使用消息隊列,在一些場景下是需要防重的。主流消息隊列提供了一些防重的能力,但并不是完全可靠的。在對重復(fù)消息敏感的場景下,最好是在消費端處理消息時,從業(yè)務(wù)層面進(jìn)行消息防重。