自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

線(xiàn)上Kafka消息堆積,Consumer掉線(xiàn),怎么辦?

開(kāi)發(fā) 前端
服務(wù)端、客戶(hù)端都沒(méi)有特別的異常日志,kafka其他topic的生產(chǎn)和消費(fèi)都是正常,所以基本可以判斷是客戶(hù)端消費(fèi)存在問(wèn)題。

線(xiàn)上kafka消息堆積,所有consumer全部掉線(xiàn),到底怎么回事?

最近處理了一次線(xiàn)上故障,具體故障表現(xiàn)就是kafka某個(gè)topic消息堆積,這個(gè)topic的相關(guān)consumer全部掉線(xiàn)。

整體排查過(guò)程和事后的復(fù)盤(pán)都很有意思,并且結(jié)合本次故障,對(duì)kafka使用的最佳實(shí)踐有了更深刻的理解。

好了,一起來(lái)回顧下這次線(xiàn)上故障吧,最佳實(shí)踐總結(jié)放在最后,千萬(wàn)不要錯(cuò)過(guò)。

1、現(xiàn)象

線(xiàn)上kafka消息突然開(kāi)始堆積

消費(fèi)者應(yīng)用反饋沒(méi)有收到消息(沒(méi)有處理消息的日志)

kafka的consumer group上看沒(méi)有消費(fèi)者注冊(cè)

消費(fèi)者應(yīng)用和kafka集群最近一周內(nèi)沒(méi)有代碼、配置相關(guān)變更

2、排查過(guò)程

服務(wù)端、客戶(hù)端都沒(méi)有特別的異常日志,kafka其他topic的生產(chǎn)和消費(fèi)都是正常,所以基本可以判斷是客戶(hù)端消費(fèi)存在問(wèn)題。

所以我們重點(diǎn)放在客戶(hù)端排查上。

1)arthas在線(xiàn)修改日志等級(jí),輸出debug

由于客戶(hù)端并沒(méi)有明顯異常日志,因此只能通過(guò)arthas修改應(yīng)用日志等級(jí),來(lái)尋找線(xiàn)索。

果然有比較重要的發(fā)現(xiàn):

2022-10-25 17:36:17,774 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] - [Consumer clientId=consumer-1, groupId=xxxx] Disabling heartbeat thread

2022-10-25 17:36:17,773 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] - [Consumer clientId=consumer-1, groupId=xxxx] Sending LeaveGroup request to coordinator xxxxxx (id: 2147483644 rack: null)

看起來(lái)是kafka-client自己主動(dòng)發(fā)送消息給kafka集群,進(jìn)行自我驅(qū)逐了。因此consumer都掉線(xiàn)了。

2)arthas查看相關(guān)線(xiàn)程狀態(tài)變量用arthas vmtool命令進(jìn)一步看下kafka-client相關(guān)線(xiàn)程的狀態(tài)。

圖片

可以看到 HeartbeatThread線(xiàn)程狀態(tài)是WAITING,Cordinator狀態(tài)是UNJOINED。

此時(shí),結(jié)合源碼看,大概推斷是由于消費(fèi)時(shí)間過(guò)長(zhǎng),導(dǎo)致客戶(hù)端自我驅(qū)逐了。

于是立刻嘗試修改max.poll.records,減少一批拉取的消息數(shù)量,同時(shí)增大max.poll.interval.ms參數(shù),避免由于拉取間隔時(shí)間過(guò)長(zhǎng)導(dǎo)致自我驅(qū)逐。

參數(shù)修改上線(xiàn)后,發(fā)現(xiàn)consumer確實(shí)不掉線(xiàn)了,但是消費(fèi)一段時(shí)間后,還是就停止消費(fèi)了。

3、最終原因

相關(guān)同學(xué)去查看了消費(fèi)邏輯,發(fā)現(xiàn)了業(yè)務(wù)代碼中的死循環(huán),確認(rèn)了最終原因。

消息內(nèi)容中的一個(gè)字段有新的值,觸發(fā)了消費(fèi)者消費(fèi)邏輯的死循環(huán),導(dǎo)致后續(xù)消息無(wú)法消費(fèi)。同時(shí),消費(fèi)阻塞導(dǎo)致消費(fèi)者自我驅(qū)逐,partition重新reblance,所有消費(fèi)者逐個(gè)自我驅(qū)逐。

這里核心涉及到kafka的消費(fèi)者和kafka之間的?;顧C(jī)制,可以簡(jiǎn)單了解一下。

圖片

kafka-client會(huì)有一個(gè)獨(dú)立線(xiàn)程HeartbeatThread跟kafka集群進(jìn)行定時(shí)心跳,這個(gè)線(xiàn)程跟lisenter無(wú)關(guān),完全獨(dú)立。

根據(jù)debug日志顯示的“Sending LeaveGroup request”信息,我們可以很容易定位到自我驅(qū)逐的邏輯。

圖片

HeartbeatThread線(xiàn)程在發(fā)送心跳前,會(huì)比較一下當(dāng)前時(shí)間跟上次poll時(shí)間,一旦大于max.poll.interval.ms 參數(shù),就會(huì)發(fā)起自我驅(qū)逐了。

4、進(jìn)一步思考

雖然最后原因找到了,但是回顧下整個(gè)排查過(guò)程,其實(shí)并不順利,主要有兩點(diǎn):

kafka-client對(duì)某個(gè)消息消費(fèi)超時(shí)能否有明確異常?而不是只看到自我驅(qū)逐和rebalance

有沒(méi)有辦法通過(guò)什么手段發(fā)現(xiàn) 消費(fèi)死循環(huán)?

4.1 kafka-client對(duì)某個(gè)消息消費(fèi)超時(shí)能否有明確異常?

4.1.1 kafka似乎沒(méi)有類(lèi)似機(jī)制

我們對(duì)消費(fèi)邏輯進(jìn)行斷點(diǎn),可以很容易看到整個(gè)調(diào)用鏈路。

圖片

對(duì)消費(fèi)者來(lái)說(shuō),主要采用一個(gè)線(xiàn)程池來(lái)處理每個(gè)kafkaListener,一個(gè)listener就是一個(gè)獨(dú)立線(xiàn)程。

這個(gè)線(xiàn)程會(huì)同步處理 poll消息,然后動(dòng)態(tài)代理回調(diào)用戶(hù)自定義的消息消費(fèi)邏輯,也就是我們?cè)贎KafkaListener中寫(xiě)的業(yè)務(wù)。

圖片

所以,從這里可以知道兩件事情。

第一點(diǎn),如果業(yè)務(wù)消費(fèi)邏輯很慢或者卡住了,會(huì)影響poll。

第二點(diǎn),這里沒(méi)有看到直接設(shè)置消費(fèi)超時(shí)的參數(shù),其實(shí)也不太好做。

因?yàn)檫@里做了超時(shí)中斷,那么poll也會(huì)被中斷,是在同一個(gè)線(xiàn)程中。所以要么poll和消費(fèi)邏輯在兩個(gè)工作線(xiàn)程,要么中斷掉當(dāng)前線(xiàn)程后,重新起一個(gè)線(xiàn)程poll。

所以從業(yè)務(wù)使用角度來(lái)說(shuō),可能的實(shí)現(xiàn),還是自己設(shè)置業(yè)務(wù)超時(shí)。比較通用的實(shí)現(xiàn),可以是在消費(fèi)邏輯中,用線(xiàn)程池處理消費(fèi)邏輯,同時(shí)用Future get阻塞超時(shí)中斷。

google了一下,發(fā)現(xiàn)kafka 0.8 曾經(jīng)有consumer.timeout.ms這個(gè)參數(shù),但是現(xiàn)在的版本沒(méi)有這個(gè)參數(shù)了,不知道是不是類(lèi)似的作用。

4.1.2 RocketMQ有點(diǎn)相關(guān)機(jī)制

然后去看了下RocketMQ是否有相關(guān)實(shí)現(xiàn),果然有發(fā)現(xiàn)。

在RocketMQ中,可以對(duì)consumer設(shè)置consumeTimeout,這個(gè)超時(shí)就跟我們的設(shè)想有一點(diǎn)像了。

consumer會(huì)啟動(dòng)一個(gè)異步線(xiàn)程池對(duì)正在消費(fèi)的消息做定時(shí)做 cleanExpiredMsg() 處理。

圖片

注意,如果消息類(lèi)型是順序消費(fèi)(orderly),這個(gè)機(jī)制就不生效。

如果是并發(fā)消費(fèi),那么就會(huì)進(jìn)行超時(shí)判斷,如果超時(shí)了,就會(huì)將這條消息的信息通過(guò)sendMessageBack() 方法發(fā)回給broker進(jìn)行重試。

圖片

如果消息重試超過(guò)一定次數(shù),就會(huì)進(jìn)入RocketMQ的死信隊(duì)列。

spring-kafka其實(shí)也有做類(lèi)似的封裝,可以自定義一個(gè)死信topic,做異常處理

4.2 有辦法快速發(fā)現(xiàn)死循環(huán)嗎?

一般來(lái)說(shuō),死循環(huán)的線(xiàn)程會(huì)導(dǎo)致CPU飆高、OOM等現(xiàn)象,在本次故障中,并沒(méi)有相關(guān)異常表現(xiàn),所以并沒(méi)有聯(lián)系到死循環(huán)的問(wèn)題。

那通過(guò)這次故障后,對(duì)kafka相關(guān)機(jī)制有了更深刻了解,poll間隔超時(shí)很有可能就是消費(fèi)阻塞甚至死循環(huán)導(dǎo)致。

所以,如果下次出現(xiàn)類(lèi)似問(wèn)題,消費(fèi)者停止消費(fèi),但是kafkaListener線(xiàn)程還在,可以直接通過(guò)arthas的 thread id 命令查看對(duì)應(yīng)線(xiàn)程的調(diào)用棧,看看是否有異常方法死循環(huán)調(diào)用。

5、最佳實(shí)踐

通過(guò)此次故障,我們也可以總結(jié)幾點(diǎn)kafka使用的最佳實(shí)踐:

  • 使用消息隊(duì)列進(jìn)行消費(fèi)時(shí),一定需要多考慮異常情況,包括冪等、耗時(shí)處理(甚至死循環(huán))的情況。
  • 盡量提高客戶(hù)端的消費(fèi)速度,消費(fèi)邏輯另起線(xiàn)程進(jìn)行處理,并最好做超時(shí)控制。
  • 減少Group訂閱Topic的數(shù)量,一個(gè)Group訂閱的Topic最好不要超過(guò)5個(gè),建議一個(gè)Group只訂閱一個(gè)Topic。
  • 參考以下說(shuō)明調(diào)整參數(shù)值:max.poll.records:降低該參數(shù)值,建議遠(yuǎn)遠(yuǎn)小于<單個(gè)線(xiàn)程每秒消費(fèi)的條數(shù)> * <消費(fèi)線(xiàn)程的個(gè)數(shù)> * <max.poll.interval.ms>的積。max.poll.interval.ms: 該值要大于<max.poll.records> / (<單個(gè)線(xiàn)程每秒消費(fèi)的條數(shù)> * <消費(fèi)線(xiàn)程的個(gè)數(shù)>)的值。
責(zé)任編輯:武曉燕 來(lái)源: 阿丸筆記
相關(guān)推薦

2021-02-24 08:38:48

Kafka消息Consumer

2022-05-10 07:31:49

消息隊(duì)列CPUQPS

2024-03-20 08:33:00

Kafka線(xiàn)程安全Rebalance

2020-09-29 12:15:13

生死鎖MySQL

2024-12-12 14:56:48

消息積壓MQ分區(qū)

2022-07-14 10:23:39

數(shù)據(jù)

2022-07-14 10:16:22

Flink

2022-06-24 09:22:15

MySQL自增id

2023-12-21 08:01:41

RocketMQ消息堆積

2019-10-12 09:50:46

Redis內(nèi)存數(shù)據(jù)庫(kù)

2018-01-28 20:39:39

戴爾

2022-07-05 11:48:47

MySQL死鎖表鎖

2009-11-03 08:56:02

linux死機(jī)操作系統(tǒng)

2022-12-19 11:31:57

緩存失效數(shù)據(jù)庫(kù)

2024-04-22 08:17:23

MySQL誤刪數(shù)據(jù)

2017-02-21 13:11:43

SDN網(wǎng)絡(luò)體系SDN架構(gòu)

2022-05-19 08:01:49

PostgreSQL數(shù)據(jù)庫(kù)

2015-10-22 09:09:59

BAT投資VC

2021-11-08 15:38:15

消息延遲堆積

2011-11-18 10:52:00

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)