自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Kafka 如何解決消息不丟失?

開發(fā) 架構(gòu) Kafka
Kafka 消息框架,大家一定不陌生,很多人工作中都有接觸。它的核心思路,通過一個(gè)高性能的MQ服務(wù)來連接生產(chǎn)和消費(fèi)兩個(gè)系統(tǒng),達(dá)到系統(tǒng)間的解耦,有很強(qiáng)的擴(kuò)展性。

[[415220]]

本文轉(zhuǎn)載自微信公眾號(hào)「微觀技術(shù)」,作者微觀技術(shù)。轉(zhuǎn)載本文請聯(lián)系微觀技術(shù)公眾號(hào)。

大家好,我是Tom哥~

Kafka 消息框架,大家一定不陌生,很多人工作中都有接觸。它的核心思路,通過一個(gè)高性能的MQ服務(wù)來連接生產(chǎn)和消費(fèi)兩個(gè)系統(tǒng),達(dá)到系統(tǒng)間的解耦,有很強(qiáng)的擴(kuò)展性。

你可能會(huì)有疑問,如果中間某一個(gè)環(huán)節(jié)斷掉了,那怎么辦?

這種情況,我們稱之為消息丟失,會(huì)造成系統(tǒng)間的數(shù)據(jù)不一致。

那如何解決這個(gè)問題?需要從生產(chǎn)端、MQ服務(wù)端、消費(fèi)端,三個(gè)維度來處理。

1、生產(chǎn)端

生產(chǎn)端的職責(zé)就是,確保生產(chǎn)的消息能到達(dá)MQ服務(wù)端,這里我們需要有一個(gè)響應(yīng)來判斷本次的操作是否成功。

  1. Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) 

比如,上面的代碼就是通過一個(gè)Callback函數(shù),來判斷消息是否發(fā)送成功,如果失敗,我們需要補(bǔ)償處理。

另外,為了提升發(fā)送時(shí)的靈活性,kafka提供了多種參數(shù),供不同業(yè)務(wù)自己選擇

1.1 參數(shù) acks

該參數(shù)表示有多少個(gè)分區(qū)副本收到消息,才認(rèn)為本次發(fā)送是成功的。

acks=0,只要發(fā)送消息就認(rèn)為成功,生產(chǎn)端不等待服務(wù)器節(jié)點(diǎn)的響應(yīng)

acks=1,表示生產(chǎn)者收到 leader 分區(qū)的響應(yīng)就認(rèn)為發(fā)送成功

acks=-1,只有當(dāng) ISR 中的副本全部收到消息時(shí),生產(chǎn)端才會(huì)認(rèn)為是成功的。這種配置是最安全的,但由于同步的節(jié)點(diǎn)較多,吞吐量會(huì)降低。

1.2 參數(shù) retries

表示生產(chǎn)端的重試次數(shù),如果重試次數(shù)用完后,還是失敗,會(huì)將消息臨時(shí)存儲(chǔ)在本地磁盤,待服務(wù)恢復(fù)后再重新發(fā)送。建議值 retries=3

1.3 參數(shù) retry.backoff.m

消息發(fā)送超時(shí)或失敗后,間隔的重試時(shí)間。一般推薦的設(shè)置時(shí)間是 300 毫秒。

這里要特別注意一種特殊情況,如果MQ服務(wù)沒有正常響應(yīng),不一定代表消息發(fā)送失敗,也有可能是響應(yīng)時(shí)正好趕上網(wǎng)絡(luò)抖動(dòng),響應(yīng)超時(shí)。

當(dāng)生產(chǎn)端做完這些,一定能保證消息發(fā)送成功了,但可能發(fā)送多次,這樣就會(huì)導(dǎo)致消息重復(fù),這個(gè)我們后面再講解決方案。

2、MQ服務(wù)端

MQ服務(wù)端作為消息的存儲(chǔ)介質(zhì),也有可能會(huì)丟失消息。比如:一個(gè)分區(qū)突然掛掉,那么怎么保證這個(gè)分區(qū)的數(shù)據(jù)不丟失,我們會(huì)引入副本概念,通過備份來解決這個(gè)問題。

具體可設(shè)置哪些參數(shù)?

2.1 參數(shù) replication.factor

表示分區(qū)副本的個(gè)數(shù),replication.factor >1 當(dāng)leader 副本掛了,follower副本會(huì)被選舉為leader繼續(xù)提供服務(wù)。

2.2 參數(shù) min.insync.replicas

表示 ISR 最少的副本數(shù)量,通常設(shè)置 min.insync.replicas >1,這樣才有可用的follower副本執(zhí)行替換,保證消息不丟失

2.3 參數(shù) unclean.leader.election.enable

是否可以把非 ISR 集合中的副本選舉為 leader 副本。

如果設(shè)置為true,而follower副本的同步消息進(jìn)度落后較多,此時(shí)被選舉為leader,會(huì)導(dǎo)致消息丟失,慎用。

3、消費(fèi)端

消費(fèi)端要做的是把消息完整的消費(fèi)處理掉。但是這里面有個(gè)提交位移的步驟。

有的同學(xué),考慮到業(yè)務(wù)處理消耗時(shí)間較長,會(huì)單獨(dú)啟動(dòng)線程拉取消息存儲(chǔ)到本地內(nèi)存隊(duì)列,然后再搞個(gè)線程池并行處理業(yè)務(wù)邏輯。這樣設(shè)計(jì)有個(gè)風(fēng)險(xiǎn),本地消息如果沒有處理完,服務(wù)器宕機(jī)了,會(huì)造成消息丟失。

正確的做法:拉取消息 --- 業(yè)務(wù)處理 ---- 提交消費(fèi)位移

關(guān)于提交位移,kafka提供了集中參數(shù)配置

參數(shù) enable.auto.commit

表示消費(fèi)位移是否自動(dòng)提交。

如果拉取了消息,業(yè)務(wù)邏輯還沒處理完,提交了消費(fèi)位移但是消費(fèi)端卻掛了,消費(fèi)端恢復(fù)或其他消費(fèi)端接管該分片再也拉取不到這條消息,會(huì)造成消息丟失。所以,我們通常設(shè)置 enable.auto.commit=false,手動(dòng)提交消費(fèi)位移。

  1. List<String> messages = consumer.poll(); 
  2. processMsg(messages); 
  3. consumer.commitOffset(); 

這個(gè)方案,會(huì)產(chǎn)生另外一個(gè)問題,我們來看下這個(gè)圖:

拉取了消息4~消息8,業(yè)務(wù)處理后,在提交消費(fèi)位移時(shí),不湊巧系統(tǒng)宕機(jī)了,最后的提交位移并沒有保存到MQ 服務(wù)端,下次拉取消息時(shí),依然是從消息4開始拉取,但是這部分消息已經(jīng)處理過了,這樣便會(huì)導(dǎo)致重復(fù)消費(fèi)。

如何解決重復(fù)消費(fèi),避免引發(fā)數(shù)據(jù)不一致

首先,要解決MQ 服務(wù)端的重復(fù)消息。kafka 在 0.11.0 版本后,每條消息都有唯一的message id, MQ服務(wù)采用空間換時(shí)間方式,自動(dòng)對重復(fù)消息過濾處理,保證接口的冪等性。

但這個(gè)不能根本上解決消息重復(fù)問題,即使MQ服務(wù)中存儲(chǔ)的消息沒有重復(fù),但消費(fèi)端是采用拉取方式,如果重復(fù)拉取,也會(huì)導(dǎo)致重復(fù)消費(fèi),如何解決這種場景問題?

方案一:只拉取一次(消費(fèi)者拉取消息后,先提交 offset 后再處理消息),但是如果系統(tǒng)宕機(jī),業(yè)務(wù)處理沒有正常結(jié)束,后面再也拉取不到這些消息,會(huì)導(dǎo)致數(shù)據(jù)不一致,該方案很少采用。

方案二:允許拉取重復(fù)消息,但是消費(fèi)端自己做冪等性控制。保證只成功消費(fèi)一次。 

關(guān)于冪等技術(shù)方案很多,我們可以采用數(shù)據(jù)表或Redis緩存存儲(chǔ)處理標(biāo)識(shí),每次拉取到消息,處理前先校驗(yàn)處理狀態(tài),再?zèng)Q定是處理還是丟棄消息。

 

責(zé)任編輯:武曉燕 來源: 微觀技術(shù)
相關(guān)推薦

2024-06-18 08:26:22

2022-08-29 18:14:55

MQ數(shù)據(jù)不丟失

2024-08-06 09:55:25

2021-09-13 07:23:53

KafkaGo語言

2021-03-08 10:19:59

MQ消息磁盤

2022-08-26 05:24:04

中間件技術(shù)Kafka

2021-10-22 08:37:13

消息不丟失rocketmq消息隊(duì)列

2022-07-14 14:27:34

Javascript數(shù)字精度二進(jìn)制

2019-03-13 09:27:57

宕機(jī)Kafka數(shù)據(jù)

2022-08-26 17:08:51

KafkaRedi數(shù)據(jù)

2024-04-23 08:46:45

消息積壓KafkaMQ

2018-03-29 09:46:02

2022-03-31 08:26:44

RocketMQ消息排查

2009-06-05 15:35:31

網(wǎng)絡(luò)不通數(shù)據(jù)發(fā)送

2024-11-11 07:05:00

Redis哨兵模式主從復(fù)制

2024-02-26 08:10:00

Redis數(shù)據(jù)數(shù)據(jù)庫

2011-03-07 14:09:10

FileZilla

2011-08-22 14:50:39

ssh

2022-07-11 08:01:55

Kafka服務(wù)器宕機(jī)

2023-09-13 08:14:57

RocketMQ次數(shù)機(jī)制
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)