自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Kafka又出問(wèn)題了!

開(kāi)發(fā) 架構(gòu) Kafka
作者個(gè)人研發(fā)的在高并發(fā)場(chǎng)景下,提供的簡(jiǎn)單、穩(wěn)定、可擴(kuò)展的延遲消息隊(duì)列框架,具有精準(zhǔn)的定時(shí)任務(wù)和延遲隊(duì)列處理功能。

[[384383]]

 作者個(gè)人研發(fā)的在高并發(fā)場(chǎng)景下,提供的簡(jiǎn)單、穩(wěn)定、可擴(kuò)展的延遲消息隊(duì)列框架,具有精準(zhǔn)的定時(shí)任務(wù)和延遲隊(duì)列處理功能。自開(kāi)源半年多以來(lái),已成功為十幾家中小型企業(yè)提供了精準(zhǔn)定時(shí)調(diào)度方案,經(jīng)受住了生產(chǎn)環(huán)境的考驗(yàn)。為使更多童鞋受益,現(xiàn)給出開(kāi)源框架地址:https://github.com/sunshinelyz/mykit-delay

寫(xiě)在前面

估計(jì)運(yùn)維年前沒(méi)有祭拜服務(wù)器,Nginx的問(wèn)題修復(fù)了,Kafka又不行了。今天,本來(lái)想再睡會(huì),結(jié)果,電話又響了。還是運(yùn)營(yíng),“喂,冰河,到公司了嗎?趕緊看看服務(wù)器吧,又出問(wèn)題了“。“在路上了,運(yùn)維那哥們兒還沒(méi)上班嗎”?“還在休假。。。”, 我:“。。。”。哎,這哥們兒是跑路了嗎?先不管他,問(wèn)題還是要解決。

問(wèn)題重現(xiàn)

到公司后,放下我專用的雙肩包,拿出我的利器——筆記本電腦,打開(kāi)后迅速登錄監(jiān)控系統(tǒng),發(fā)現(xiàn)主要業(yè)務(wù)系統(tǒng)沒(méi)啥問(wèn)題。一個(gè)非核心服務(wù)發(fā)出了告警,并且監(jiān)控系統(tǒng)中顯示這個(gè)服務(wù)頻繁的拋出如下異常。

  1. 2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] -  
  2. commit failed  
  3. org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
  4.         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na] 
  5.         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na] 
  6.         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] 
  7.         at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] 
  8.         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161] 
  9.         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] 
  10.         at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161] 

從上面輸出的異常信息,大概可以判斷出系統(tǒng)出現(xiàn)的問(wèn)題:Kafka消費(fèi)者在處理完一批poll消息后,在同步提交偏移量給broker時(shí)報(bào)錯(cuò)了。大概就是因?yàn)楫?dāng)前消費(fèi)者線程的分區(qū)被broker給回收了,因?yàn)镵afka認(rèn)為這個(gè)消費(fèi)者掛掉了,我們可以從下面的輸出信息中可以看出這一點(diǎn)。

  1. Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 

Kafka內(nèi)部觸發(fā)了Rebalance機(jī)制,明確了問(wèn)題,接下來(lái),我們就開(kāi)始分析問(wèn)題了。

分析問(wèn)題

既然Kafka觸發(fā)了Rebalance機(jī)制,那我就來(lái)說(shuō)說(shuō)Kafka觸發(fā)Rebalance的時(shí)機(jī)。

什么是Rebalance

舉個(gè)具體點(diǎn)的例子,比如某個(gè)分組下有10個(gè)Consumer實(shí)例,這個(gè)分組訂閱了一個(gè)50個(gè)分區(qū)的主題。正常情況下,Kafka會(huì)為每個(gè)消費(fèi)者分配5個(gè)分區(qū)。這個(gè)分配的過(guò)程就是Rebalance。

觸發(fā)Rebalance的時(shí)機(jī)

當(dāng)Kafka中滿足如下條件時(shí),會(huì)觸發(fā)Rebalance:

  • 組內(nèi)成員的個(gè)數(shù)發(fā)生了變化,比如有新的消費(fèi)者加入消費(fèi)組,或者離開(kāi)消費(fèi)組。組成員離開(kāi)消費(fèi)組包含組成員崩潰或者主動(dòng)離開(kāi)消費(fèi)組。
  • 訂閱的主題個(gè)數(shù)發(fā)生了變化。
  • 訂閱的主題分區(qū)數(shù)發(fā)生了變化。

后面兩種情況我們可以人為的避免,在實(shí)際工作過(guò)程中,對(duì)于Kafka發(fā)生Rebalance最常見(jiàn)的原因是消費(fèi)組成員的變化。

消費(fèi)者成員正常的添加和停掉導(dǎo)致Rebalance,這種情況無(wú)法避免,但是時(shí)在某些情況下,Consumer 實(shí)例會(huì)被 Coordinator 錯(cuò)誤地認(rèn)為 “已停止” 從而被“踢出”Group,導(dǎo)致Rebalance。

當(dāng) Consumer Group 完成 Rebalance 之后,每個(gè) Consumer 實(shí)例都會(huì)定期地向 Coordinator 發(fā)送心跳請(qǐng)求,表明它還存活著。如果某個(gè) Consumer 實(shí)例不能及時(shí)地發(fā)送這些心跳請(qǐng)求,Coordinator 就會(huì)認(rèn)為該 Consumer 已經(jīng) “死” 了,從而將其從 Group 中移除,然后開(kāi)啟新一輪 Rebalance。這個(gè)時(shí)間可以通過(guò)Consumer 端的參數(shù) session.timeout.ms 進(jìn)行配置。默認(rèn)值是 10 秒。

除了這個(gè)參數(shù),Consumer 還提供了一個(gè)控制發(fā)送心跳請(qǐng)求頻率的參數(shù),就是 heartbeat.interval.ms。這個(gè)值設(shè)置得越小,Consumer 實(shí)例發(fā)送心跳請(qǐng)求的頻率就越高。頻繁地發(fā)送心跳請(qǐng)求會(huì)額外消耗帶寬資源,但好處是能夠更加快速地知曉當(dāng)前是否開(kāi)啟 Rebalance,因?yàn)?,目? Coordinator 通知各個(gè) Consumer 實(shí)例開(kāi)啟 Rebalance 的方法,就是將 REBALANCE_NEEDED 標(biāo)志封裝進(jìn)心跳請(qǐng)求的響應(yīng)體中。

除了以上兩個(gè)參數(shù),Consumer 端還有一個(gè)參數(shù),用于控制 Consumer 實(shí)際消費(fèi)能力對(duì) Rebalance 的影響,即 max.poll.interval.ms 參數(shù)。它限定了 Consumer 端應(yīng)用程序兩次調(diào)用 poll 方法的最大時(shí)間間隔。它的默認(rèn)值是 5 分鐘,表示 Consumer 程序如果在 5 分鐘之內(nèi)無(wú)法消費(fèi)完 poll 方法返回的消息,那么 Consumer 會(huì)主動(dòng)發(fā)起 “離開(kāi)組” 的請(qǐng)求,Coordinator 也會(huì)開(kāi)啟新一輪 Rebalance。

通過(guò)上面的分析,我們可以看一下那些rebalance是可以避免的:

第一類非必要 Rebalance 是因?yàn)槲茨芗皶r(shí)發(fā)送心跳,導(dǎo)致 Consumer 被 “踢出”Group 而引發(fā)的。這種情況下我們可以設(shè)置 session.timeout.ms 和 heartbeat.interval.ms 的值,來(lái)盡量避免rebalance的出現(xiàn)。(以下的配置是在網(wǎng)上找到的最佳實(shí)踐,暫時(shí)還沒(méi)測(cè)試過(guò))

  • 設(shè)置 session.timeout.ms = 6s。
  • 設(shè)置 heartbeat.interval.ms = 2s。
  • 要保證 Consumer 實(shí)例在被判定為 “dead” 之前,能夠發(fā)送至少 3 輪的心跳請(qǐng)求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

將 session.timeout.ms 設(shè)置成 6s 主要是為了讓 Coordinator 能夠更快地定位已經(jīng)掛掉的 Consumer,早日把它們踢出 Group。

第二類非必要 Rebalance 是 Consumer 消費(fèi)時(shí)間過(guò)長(zhǎng)導(dǎo)致的。此時(shí),max.poll.interval.ms 參數(shù)值的設(shè)置顯得尤為關(guān)鍵。如果要避免非預(yù)期的 Rebalance,最好將該參數(shù)值設(shè)置得大一點(diǎn),比下游最大處理時(shí)間稍長(zhǎng)一點(diǎn)。

總之,要為業(yè)務(wù)處理邏輯留下充足的時(shí)間。這樣,Consumer 就不會(huì)因?yàn)樘幚磉@些消息的時(shí)間太長(zhǎng)而引發(fā) Rebalance 。

拉取偏移量與提交偏移量

kafka的偏移量(offset)是由消費(fèi)者進(jìn)行管理的,偏移量有兩種,拉取偏移量(position)與提交偏移量(committed)。拉取偏移量代表當(dāng)前消費(fèi)者分區(qū)消費(fèi)進(jìn)度。每次消息消費(fèi)后,需要提交偏移量。在提交偏移量時(shí),kafka會(huì)使用拉取偏移量的值作為分區(qū)的提交偏移量發(fā)送給協(xié)調(diào)者。

如果沒(méi)有提交偏移量,下一次消費(fèi)者重新與broker連接后,會(huì)從當(dāng)前消費(fèi)者group已提交到broker的偏移量處開(kāi)始消費(fèi)。

所以,問(wèn)題就在這里,當(dāng)我們處理消息時(shí)間太長(zhǎng)時(shí),已經(jīng)被broker剔除,提交偏移量又會(huì)報(bào)錯(cuò)。所以拉取偏移量沒(méi)有提交到broker,分區(qū)又rebalance。下一次重新分配分區(qū)時(shí),消費(fèi)者會(huì)從最新的已提交偏移量處開(kāi)始消費(fèi)。這里就出現(xiàn)了重復(fù)消費(fèi)的問(wèn)題。

異常日志提示的方案

其實(shí),說(shuō)了這么多,Kafka消費(fèi)者輸出的異常日志中也給出了相應(yīng)的解決方案。

接下來(lái),我們說(shuō)說(shuō)Kafka中的拉取偏移量和提交偏移量。

其實(shí),從輸出的日志信息中,也大概給出了解決問(wèn)題的方式,簡(jiǎn)單點(diǎn)來(lái)說(shuō),就是可以通過(guò)增加 max.poll.interval.ms 時(shí)長(zhǎng)和 session.timeout.ms時(shí)長(zhǎng),減少 max.poll.records的配置值,并且消費(fèi)端在處理完消息時(shí)要及時(shí)提交偏移量。

問(wèn)題解決

通過(guò)之前的分析,我們應(yīng)該知道如何解決這個(gè)問(wèn)題了。這里需要說(shuō)一下的是,我在集成Kafka的時(shí)候,使用的是SpringBoot和Kafka消費(fèi)監(jiān)聽(tīng)器,消費(fèi)端的主要代碼結(jié)構(gòu)如下所示。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory"
  2. public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){ 
  3.     logger.info("topic is {}, offset is {}, value is {} n", record.topic(), record.offset(), record.value()); 
  4.     try { 
  5.         Object value = record.value(); 
  6.         logger.info(value.toString()); 
  7.         ack.acknowledge(); 
  8.     } catch (Exception e) { 
  9.         logger.error("日志消費(fèi)端異常: {}", e); 
  10.     } 

上述代碼邏輯比較簡(jiǎn)單,就是獲取到Kafka中的消息后直接打印輸出到日志文件中。

嘗試解決

這里,我先根據(jù)異常日志的提示信息進(jìn)行配置,所以,我在SpringBoot的application.yml文件中新增了如下配置信息。

  1. spring: 
  2.   kafka: 
  3.     consumer: 
  4.     properties: 
  5.      max.poll.interval.ms: 3600000 
  6.      max.poll.records: 50 
  7.      session.timeout.ms: 60000 
  8.      heartbeat.interval.ms: 3000 

配置完成后,再次測(cè)試消費(fèi)者邏輯,發(fā)現(xiàn)還是拋出Rebalance異常。

最終解決

我們從另一個(gè)角度來(lái)看下Kafka消費(fèi)者所產(chǎn)生的問(wèn)題:一個(gè)Consumer在生產(chǎn)消息,另一個(gè)Consumer在消費(fèi)它的消息,它們不能在同一個(gè)groupId 下面,更改其中一個(gè)的groupId 即可。

這里,我們的業(yè)務(wù)項(xiàng)目是分模塊和子系統(tǒng)進(jìn)行開(kāi)發(fā)的,例如模塊A在生產(chǎn)消息,模塊B消費(fèi)模塊A生產(chǎn)的消息。此時(shí),修改配置參數(shù),例如 session.timeout.ms: 60000,根本不起作用,還是拋出Rebalance異常。

此時(shí),我嘗試修改下消費(fèi)者分組的groupId,將下面的代碼

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory"
  2. public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){ 

修改為如下所示的代碼。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer-logs", containerFactory = "kafkaListenerContainerFactory"
  2. public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){ 

再次測(cè)試,問(wèn)題解決~~

 本文轉(zhuǎn)載自微信公眾號(hào)「冰河技術(shù)」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系冰河技術(shù)公眾號(hào)。

 

責(zé)任編輯:武曉燕 來(lái)源: 冰河技術(shù)
相關(guān)推薦

2022-09-19 08:35:28

Kafka節(jié)點(diǎn)故障

2012-05-16 13:43:20

操作系統(tǒng)故障檢修系統(tǒng)管理

2013-10-18 17:09:18

Windows 8.1微軟

2022-06-07 00:33:21

驅(qū)動(dòng)安卓開(kāi)發(fā)

2021-04-23 09:33:55

Windows10操作系統(tǒng)微軟

2020-05-27 15:14:55

iOSiPhone更新

2019-02-14 10:13:42

網(wǎng)絡(luò)故障RIPIGRP

2021-05-31 09:47:03

Windows10操作系統(tǒng)微軟

2019-05-25 17:19:33

Apple 支持蘋果設(shè)備

2020-03-04 15:20:17

Windows 10Windows更新

2023-07-27 15:17:56

微軟Windows 11

2021-03-12 15:50:54

Windows 10Windows操作系統(tǒng)

2018-08-22 10:12:07

2021-06-15 05:36:45

Gulpawaitasync

2021-06-28 07:27:43

AwaitAsync語(yǔ)法

2021-03-06 10:25:19

內(nèi)存Java代碼

2010-03-22 16:27:57

Windows安全殺毒軟件

2021-02-03 15:12:08

java內(nèi)存溢出

2010-02-01 16:39:32

Dell主板質(zhì)量

2019-02-27 16:00:28

IT資產(chǎn)審計(jì)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)