Go語言如何操縱Kafka保證無消息丟失
背景
目前一些互聯(lián)網公司會使用消息隊列來做核心業(yè)務,因為是核心業(yè)務,所以對數(shù)據(jù)的最后一致性比較敏感,如果中間出現(xiàn)數(shù)據(jù)丟失,就會引來用戶的投訴,年底績效就變成325了。之前和幾個朋友聊天,他們的公司都在用kafka來做消息隊列,使用kafka到底會不會丟消息呢?如果丟消息了該怎么做好補償措施呢?本文我們就一起來分析一下,并介紹如何使用Go操作Kafka可以不丟失數(shù)據(jù)。
本文操作kafka基于:https://github.com/Shopify/sarama
初識kafka架構
維基百科對kafka的介紹:
Kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是為處理實時數(shù)據(jù)提供一個統(tǒng)一、高吞吐、低延遲的平臺。其持久化層本質上是一個“按照分布式事務日志架構的大規(guī)模發(fā)布/訂閱消息隊列”,這使它作為企業(yè)級基礎設施來處理流式數(shù)據(jù)非常有價值。此外,Kafka可以通過Kafka Connect連接到外部系統(tǒng)(用于數(shù)據(jù)輸入/輸出),并提供了Kafka Streams——一個Java]流式處理庫。該設計受事務日志的影響較大。
kafka的整體架構比較簡單,主要由producer、broker、consumer組成:
截屏2021-09-12 上午10.00.13
針對架構圖我們解釋一個各個模塊:
- Producer:數(shù)據(jù)的生產者,可以將數(shù)據(jù)發(fā)布到所選擇的topic中。
- Consumer:數(shù)據(jù)的消費者,使用Consumer Group進行標識,在topic中的每條記錄都會被分配給訂閱消費組中的一個消費者實例,消費者實例可以分布在多個進程中或者多個機器上。
- Broker:消息中間件處理節(jié)點(服務器),一個節(jié)點就是一個broker,一個Kafka集群由一個或多個broker組成。
還有些概念我們也介紹一下:
- topic:可以理解為一個消息的集合,topic存儲在broker中,一個topic可以有多個partition分區(qū),一個topic可以有多個Producer來push消息,一個topic可以有多個消費者向其pull消息,一個topic可以存在一個或多個broker中。
- partition:其是topic的子集,不同分區(qū)分配在不同的broker上進行水平擴展從而增加kafka并行處理能力,同topic下的不同分區(qū)信息是不同的,同一分區(qū)信息是有序的;每一個分區(qū)都有一個或者多個副本,其中會選舉一個leader,fowller從leader拉取數(shù)據(jù)更新自己的log(每個分區(qū)邏輯上對應一個log文件夾),消費者向leader中pull信息。
kafka丟消息的三個節(jié)點
生產者push消息節(jié)點
先看一下producer的大概寫入流程:
- producer先從kafka集群找到該partition的leader
- producer將消息發(fā)送給leader,leader將該消息寫入本地
- follwers從leader pull消息,寫入本地log后leader發(fā)送ack
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加high watermark,并向 producer 發(fā)送 ack
截屏2021-09-12 上午11.16.43
通過這個流程我們可以看到kafka最終會返回一個ack來確認推送消息結果,這里kafka提供了三種模式:
- NoResponse RequiredAcks = 0
- WaitForLocal RequiredAcks = 1
- WaitForAll RequiredAcks = -1
- NoResponse RequiredAcks = 0:這個代表的就是數(shù)據(jù)推出的成功與否都與我無關了
- WaitForLocal RequiredAcks = 1:當local(leader)確認接收成功后,就可以返回了
- WaitForAll RequiredAcks = -1:當所有的leader和follower都接收成功時,才會返回
所以根據(jù)這三種模式我們就能推斷出生產者在push消息時有一定幾率丟失的,分析如下:
- 如果我們選擇了模式1,這種模式丟失數(shù)據(jù)的幾率很大,無法重試
- 如果我們選擇了模式2,這種模式下只要leader不掛,就可以保證數(shù)據(jù)不丟失,但是如果leader掛了,follower還沒有同步數(shù)據(jù),那么就會有一定幾率造成數(shù)據(jù)丟失
- 如果選擇了模式3,這種情況不會造成數(shù)據(jù)丟失,但是有可能會造成數(shù)據(jù)重復,假如leader與follower同步數(shù)據(jù)是網絡出現(xiàn)問題,就有可能造成數(shù)據(jù)重復的問題。
所以在生產環(huán)境中我們可以選擇模式2或者模式3來保證消息的可靠性,具體需要根據(jù)業(yè)務場景來進行選擇,在乎吞吐量就選擇模式2,不在乎吞吐量,就選擇模式3,要想完全保證數(shù)據(jù)不丟失就選擇模式3是最可靠的。
kafka集群自身故障造成
kafka集群接收到數(shù)據(jù)后會將數(shù)據(jù)進行持久化存儲,最終數(shù)據(jù)會被寫入到磁盤中,在寫入磁盤這一步也是有可能會造成數(shù)據(jù)損失的,因為寫入磁盤的時候操作系統(tǒng)會先將數(shù)據(jù)寫入緩存,操作系統(tǒng)將緩存中數(shù)據(jù)寫入磁盤的時間是不確定的,所以在這種情況下,如果kafka機器突然宕機了,也會造成數(shù)據(jù)損失,不過這種概率發(fā)生很小,一般公司內部kafka機器都會做備份,這種情況很極端,可以忽略不計。
消費者pull消息節(jié)點
push消息時會把數(shù)據(jù)追加到Partition并且分配一個偏移量,這個偏移量代表當前消費者消費到的位置,通過這個Partition也可以保證消息的順序性,消費者在pull到某個消息后,可以設置自動提交或者手動提交commit,提交commit成功,offset就會發(fā)生偏移:
截屏2021-09-12 下午3.37.33
所以自動提交會帶來數(shù)據(jù)丟失的問題,手動提交會帶來數(shù)據(jù)重復的問題,分析如下:
- 在設置自動提交的時候,當我們拉取到一個消息后,此時offset已經提交了,但是我們在處理消費邏輯的時候失敗了,這就會導致數(shù)據(jù)丟失了
- 在設置手動提交時,如果我們是在處理完消息后提交commit,那么在commit這一步發(fā)生了失敗,就會導致重復消費的問題。
比起數(shù)據(jù)丟失,重復消費是符合業(yè)務預期的,我們可以通過一些冪等性設計來規(guī)避這個問題。
實戰(zhàn)
完整代碼已經上傳github:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/kafka_demo
解決push消息丟失問題
主要是通過兩點來解決:
- 通過設置RequiredAcks模式來解決,選用WaitForAll可以保證數(shù)據(jù)推送成功,不過會影響時延時
- 引入重試機制,設置重試次數(shù)和重試間隔
因此我們寫出如下代碼(摘出創(chuàng)建client部分):
- func NewAsyncProducer() sarama.AsyncProducer {
- cfg := sarama.NewConfig()
- version, err := sarama.ParseKafkaVersion(VERSION)
- if err != nil{
- log.Fatal("NewAsyncProducer Parse kafka version failed", err.Error())
- return nil
- }
- cfg.Version = version
- cfg.Producer.RequiredAcks = sarama.WaitForAll // 三種模式任君選擇
- cfg.Producer.Partitioner = sarama.NewHashPartitioner
- cfg.Producer.Return.Successes = true
- cfg.Producer.Return.Errors = true
- cfg.Producer.Retry.Max = 3 // 設置重試3次
- cfg.Producer.Retry.Backoff = 100 * time.Millisecond
- cli, err := sarama.NewAsyncProducer([]string{ADDR}, cfg)
- if err != nil{
- log.Fatal("NewAsyncProducer failed", err.Error())
- return nil
- }
- return cli
- }
解決pull消息丟失問題
這個解決辦法就比較粗暴了,直接使用自動提交的模式,在每次真正消費完之后在自己手動提交offset,但是會產生重復消費的問題,不過很好解決,使用冪等性操作即可解決。
代碼示例:
- func NewConsumerGroup(group string) sarama.ConsumerGroup {
- cfg := sarama.NewConfig()
- version, err := sarama.ParseKafkaVersion(VERSION)
- if err != nil{
- log.Fatal("NewConsumerGroup Parse kafka version failed", err.Error())
- return nil
- }
- cfg.Version = version
- cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
- cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
- cfg.Consumer.Offsets.Retry.Max = 3
- cfg.Consumer.Offsets.AutoCommit.Enable = true // 開啟自動提交,需要手動調用MarkMessage才有效
- cfg.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 間隔
- client, err := sarama.NewConsumerGroup([]string{ADDR}, group, cfg)
- if err != nil {
- log.Fatal("NewConsumerGroup failed", err.Error())
- }
- return client
- }
上面主要是創(chuàng)建ConsumerGroup部分,細心的讀者應該看到了,我們這里使用的是自動提交,說好的使用手動提交呢?這是因為我們這個kafka庫的特性不同,這個自動提交需要與MarkMessage()方法配合使用才會提交(有疑問的朋友可以實踐一下,或者看一下源碼),否則也會提交失敗,因為我們在寫消費邏輯時要這樣寫:
- func (e EventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
- for msg := range claim.Messages() {
- var data common.KafkaMsg
- if err := json.Unmarshal(msg.Value, &data); err != nil {
- return errors.New("failed to unmarshal message err is " + err.Error())
- }
- // 操作數(shù)據(jù),改用打印
- log.Print("consumerClaim data is ")
- // 處理消息成功后標記為處理, 然后會自動提交
- session.MarkMessage(msg,"")
- }
- return nil
- }
或者直接使用手動提交方法來解決,只需兩步:
第一步:關閉自動提交:
- consumerConfig.Consumer.Offsets.AutoCommit.Enable = false // 禁用自動提交,改為手動
第二步:消費邏輯中添加如下代碼,手動提交模式下,也需要先進行標記,在進行commit
- session.MarkMessage(msg,"")
- session.Commit()
完整代碼可以到github上下載并進行驗證!
總結
本文我們主要說明了兩個知識點:
Kafka會產生消息丟失
使用Go操作Kafka如何配置可以不丟失數(shù)據(jù)
日常業(yè)務開發(fā)中,很多公司都喜歡拿消息隊列進行解耦,那么你就要注意了,使用Kafka做消息隊列無法保證數(shù)據(jù)不丟失,需要我們自己手動配置補償,別忘記了,要不又是一場P0事故。