自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

阿里二面:要保證消息不丟失,又不重復(fù),消息隊(duì)列怎么選型?

開發(fā) 架構(gòu)
消息不丟失、不重復(fù)是消息隊(duì)列的基本要求,但這個基本要求還是很難滿足的。消息丟失這個要求,主流消息隊(duì)列通過消息重試和消息持久化的方式可以滿足。但消息重試也同時帶來了消息重復(fù)的可能性,主流消息隊(duì)列在解決重復(fù)消息的問題上并沒有現(xiàn)成的方案,對不允許重復(fù)消費(fèi)的場景,需要開發(fā)人員在消費(fèi)端做冪等處理。

大家好,我是君哥。

在使用消息隊(duì)列時,有兩個經(jīng)常讓我們煩惱的問題,消息丟失和消息重復(fù)。那我們在做技術(shù)選型時,有沒有一個消息隊(duì)列能解決消息丟失和消息重復(fù)這兩個問題呢?

消息丟失

如上圖,從生產(chǎn)者發(fā)送消息,Broker 保存消息,消費(fèi)者消費(fèi)消息,每一個環(huán)節(jié)都有可能丟失消息。

發(fā)送丟失

生產(chǎn)者發(fā)送消息時,如果處理不當(dāng),很可能會造成消息丟失。

生產(chǎn)者發(fā)送消息,主流消息隊(duì)列都支持同步發(fā)送和異步發(fā)送。如果使用同步發(fā)送,生產(chǎn)者發(fā)送消息后,會同步等待 Broker 返回的 ACK,收到 ACK 消息,就認(rèn)為消息發(fā)送成功。如果長時間沒有收到,則會認(rèn)為消息發(fā)送失敗,需要進(jìn)行重試。

同步發(fā)送可以保證消息不丟失,但是會有性能問題,所以多數(shù)情況會選擇異步發(fā)送。異步發(fā)送如何保證消息不丟失呢?主流消息隊(duì)列(比如 Kafka 和 RocketMQ)實(shí)現(xiàn)方法基本類似,使用回調(diào)函數(shù)來實(shí)現(xiàn)。下面看一下 Kafka 的異步發(fā)送代碼:

producer.send(record, new Callback() {
 public void onCompletion(RecordMetadata metadata, Exception exception) {
 if (exception != null) {
  logger.error("發(fā)送消息失?。?, exception);
 }
 if (metadata != null) {
     logger.info("消息發(fā)送成功");
  }
 }
});

消息存儲

生產(chǎn)者發(fā)送消息成功,也不能保證消息絕對不丟失。因?yàn)榧词瓜l(fā)送到 Broker,如果在消費(fèi)者拉取到消息之前,Broker 宕機(jī)了,消息還沒有落盤,也會導(dǎo)致消息丟失。

在存儲階段要保證消息不丟失,可以考慮幾個方面:

同步刷盤

采用異步刷盤,如果在消息落盤之前 Broker 宕機(jī)了,就會造成消息丟失。而采用同步刷盤,等待消息落盤之后,再給 Sender 返回發(fā)送成功,可以從消息發(fā)送環(huán)節(jié)保證消息不丟失。

在 RocketMQ 中,把 flushDiskType 參數(shù)配置為 SYNC_FLUSH 就可以開啟同步刷盤。

Broker 集群

如果 Broker 集群中只有一個節(jié)點(diǎn),即使消息落盤成功了,Broker 發(fā)送故障,在 Broker 恢復(fù)以前消費(fèi)者也會拉取不到消息。而且如果 Broker 磁盤故障不可恢復(fù),消息也會丟失。

采用 Broker 集群可以很好地解決這個問題。見下圖:

在 Broker 集群時,可以等待 2 個以上的節(jié)點(diǎn)同步消息完成后再給 Producer 返回成功。這樣即使一個 Broker 掛了,也可以很容易找到替代的 Broker。

消息消費(fèi)

消費(fèi)者保證不丟失消息,需要消費(fèi)完成后再給 Broker 返回 ACK。在主流的消息隊(duì)列中,如果 Broker 收不到 ACK,都會給消費(fèi)者再次發(fā)送這條消息。

有時候?yàn)榱私鉀Q消息積壓的問題,消費(fèi)者拉取到消息后會直接返回 ACK,然后再異步執(zhí)行消息處理邏輯。這樣要保證消息不丟失,需要在返回 ACK 之前把消息保存到本地,比如持久化到數(shù)據(jù)庫,后面可以取數(shù)據(jù)庫保存的消息進(jìn)行處理。

消息重復(fù)

消息重復(fù)一般有兩個原因,一個是生產(chǎn)者發(fā)送消息后沒有收到 ACK,然后進(jìn)行重復(fù)發(fā)送,另一個原因是消費(fèi)者消費(fèi)完成后 Broker 沒有收到 ACK,導(dǎo)致消息重復(fù)推送給消費(fèi)者。

重復(fù)消息會對業(yè)務(wù)造成影響,比如電商場景中的重復(fù)支付、賬務(wù)場景中的重復(fù)記賬,對業(yè)務(wù)造成的影響都比較嚴(yán)重。

從目前主流的消息隊(duì)列來看,并沒有一個消息隊(duì)列能解決消息重復(fù)消費(fèi)的問題,只能在消費(fèi)端做冪等處理。下面提供幾個思路作為參考。

數(shù)據(jù)庫唯一鍵約束

如果消息會落本地數(shù)據(jù)庫,可以采用消息 ID 作為唯一鍵。如果消息不落數(shù)據(jù)庫,可以將消息 ID 或者消息中其他唯一能標(biāo)識消息的屬性作為唯一鍵落業(yè)務(wù)數(shù)據(jù)表。

保存消費(fèi)記錄

我們也可以將消息 ID 保存 Redis,消費(fèi)消息前判斷消息 ID 是否已存在。

ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
Boolean result = valueOperations.setIfAbsent(messageId, messageId);
if (result) {
 //消費(fèi)邏輯;
} else {
 logger.error("這條消息已經(jīng)消費(fèi),跳過,消息ID:{}", messageId);
}

這里有一個注意點(diǎn),如果消費(fèi)失敗了,需要刪除 Redis 中保存的消息 ID。

總結(jié)

消息不丟失、不重復(fù)是消息隊(duì)列的基本要求,但這個基本要求還是很難滿足的。

消息丟失這個要求,主流消息隊(duì)列通過消息重試和消息持久化的方式可以滿足。

但消息重試也同時帶來了消息重復(fù)的可能性,主流消息隊(duì)列在解決重復(fù)消息的問題上并沒有現(xiàn)成的方案,對不允許重復(fù)消費(fèi)的場景,需要開發(fā)人員在消費(fèi)端做冪等處理。

責(zé)任編輯:姜華 來源: 君哥聊技術(shù)
相關(guān)推薦

2024-06-18 08:26:22

2025-02-26 07:53:21

2024-08-06 09:55:25

2021-03-08 10:19:59

MQ消息磁盤

2023-10-24 08:25:20

TCC模式事務(wù)

2021-10-22 08:37:13

消息不丟失rocketmq消息隊(duì)列

2023-11-27 13:18:00

Redis數(shù)據(jù)不丟失

2020-10-26 09:19:11

線程池消息

2024-06-05 06:37:19

2023-11-13 08:37:33

消息中間件分布式架構(gòu)

2023-11-27 13:42:00

消息隊(duì)列RocketMQ

2021-08-04 07:47:18

Kafka消息框架

2024-09-18 07:00:00

消息隊(duì)列中間件消息隊(duì)列

2016-11-10 21:00:49

消息存儲數(shù)據(jù)

2021-09-13 07:23:53

KafkaGo語言

2023-09-13 08:14:57

RocketMQ次數(shù)機(jī)制

2017-10-11 15:08:28

消息隊(duì)列常見

2022-08-26 05:24:04

中間件技術(shù)Kafka

2023-09-26 08:20:12

消息隊(duì)列RabbitMQ

2024-09-23 08:00:00

消息隊(duì)列MQ分布式系統(tǒng)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號