Kafka 消息積壓百萬(wàn)!凌晨三點(diǎn)機(jī)房通宵搶修,我靠這招絕地翻盤(pán)
兄弟們,今天咱來(lái)嘮嘮那個(gè)讓我在凌晨三點(diǎn)的機(jī)房里差點(diǎn)把頭發(fā)薅禿的 Kafka 消息積壓?jiǎn)栴}。咱先把場(chǎng)景拉滿:上周四晚上十一點(diǎn),我正準(zhǔn)備跟周公約個(gè)會(huì)呢,手機(jī)突然跟抽風(fēng)似的瘋狂震動(dòng),運(yùn)維群里紅色警報(bào)直閃 —— 訂單系統(tǒng)的 Kafka 集群積壓量突破百萬(wàn)大關(guān),而且還在以每秒八千條的速度往上竄。咱就是說(shuō),這誰(shuí)能睡得著啊,趕緊套上外套就往公司狂奔,一場(chǎng)跟 Kafka 的硬仗就這么拉開(kāi)了序幕。
一、先搞明白:Kafka 為啥會(huì)積壓成 "堵城"
到了機(jī)房,咱先冷靜下來(lái)分析。Kafka 這玩意兒,平時(shí)就像個(gè)高效的快遞中轉(zhuǎn)站,生產(chǎn)者把包裹(消息)源源不斷地送來(lái),消費(fèi)者再把包裹分門別類地送走。那現(xiàn)在咋就堵成停車場(chǎng)了呢?咱從三個(gè)最容易出問(wèn)題的地方下手。
(一)消費(fèi)者 "摸魚(yú)" 現(xiàn)場(chǎng):處理速度跟不上生產(chǎn)速度
最常見(jiàn)的就是消費(fèi)者處理能力跟不上。咱打個(gè)比方,生產(chǎn)者是個(gè)手腳麻利的快遞員,每分鐘能送 100 個(gè)包裹,可消費(fèi)者這邊呢,要么是新手快遞員,業(yè)務(wù)不熟,處理一個(gè)包裹得老半天;要么是快遞站的電腦太卡,處理程序效率低,每分鐘只能處理 50 個(gè)。這樣一來(lái),包裹可不就越堆越多嘛。就像這次,我們排查發(fā)現(xiàn),消費(fèi)者的線程池大小設(shè)置得太小了,只有可憐的 5 個(gè)線程,面對(duì)突然暴增的訂單消息,根本忙不過(guò)來(lái),活生生變成了 "摸魚(yú)選手"。
(二)分區(qū)規(guī)劃 "翻車":車道太少導(dǎo)致堵車
Kafka 的分區(qū)就好比高速公路上的車道,分區(qū)數(shù)太少,就相當(dāng)于車道不夠,生產(chǎn)者和消費(fèi)者的吞吐量自然上不去。舉個(gè)例子,要是生產(chǎn)者往一個(gè)分區(qū)里瘋狂發(fā)消息,而消費(fèi)者這邊只有一個(gè)線程來(lái)處理這個(gè)分區(qū)的消息,那不管消費(fèi)者多努力,處理速度都被分區(qū)數(shù)給限制死了。我們這次就發(fā)現(xiàn),某個(gè)關(guān)鍵主題的分區(qū)數(shù)居然只有 2 個(gè),而消費(fèi)者實(shí)例雖然有 4 個(gè),但每個(gè)實(shí)例只能分到 0.5 個(gè)分區(qū),這不是鬧著玩嘛,明顯的資源浪費(fèi),處理效率能高才怪。
(三)網(wǎng)絡(luò) & 磁盤(pán) "拖后腿":硬件瓶頸拖慢整體速度
別以為只有軟件問(wèn)題會(huì)導(dǎo)致積壓,硬件瓶頸也不容忽視。要是生產(chǎn)者所在的服務(wù)器網(wǎng)絡(luò)帶寬不夠,消息發(fā)送速度就會(huì)變慢,堆積在生產(chǎn)者客戶端;要是消費(fèi)者的磁盤(pán) I/O 性能太差,寫(xiě)入數(shù)據(jù)到本地存儲(chǔ)時(shí)耗時(shí)過(guò)長(zhǎng),也會(huì)影響消費(fèi)速度。我們?cè)诒O(jiān)控中發(fā)現(xiàn),消費(fèi)者服務(wù)器的磁盤(pán)平均響應(yīng)時(shí)間超過(guò)了 50ms,正常情況下應(yīng)該在 10ms 以內(nèi),這明顯是磁盤(pán)扛不住了,拖了整個(gè)系統(tǒng)的后腿。
二、緊急排障:凌晨三點(diǎn)的 "搶救" 現(xiàn)場(chǎng)
(一)第一步:快速定位積壓 "重災(zāi)區(qū)"
咱先得知道哪個(gè)主題、哪個(gè)分區(qū)積壓得最嚴(yán)重。這時(shí)候就得請(qǐng)出 Kafka 的自帶工具 kafka-consumer-groups.sh 了。在命令行里敲上這么一句:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-consumer-group --describe
這一執(zhí)行,可就像給 Kafka 做了個(gè) "CT 掃描",每個(gè)分區(qū)的當(dāng)前偏移量(CURRENT - OFFSET)、日志末尾偏移量(LOG - END - OFFSET)以及積壓量(LAG)都一目了然。我們當(dāng)時(shí)就發(fā)現(xiàn),order - topic 的 3 號(hào)分區(qū),LAG 居然達(dá)到了 30 萬(wàn),妥妥的重災(zāi)區(qū)啊。
(二)臨時(shí)方案:先把積壓量降下來(lái)
1. 緊急擴(kuò)容消費(fèi)者實(shí)例
既然消費(fèi)者處理能力不夠,那咱就趕緊擴(kuò)招 "快遞員"。增加消費(fèi)者實(shí)例的數(shù)量,讓更多的實(shí)例來(lái)分擔(dān)消息處理任務(wù)。不過(guò)要注意,消費(fèi)者實(shí)例的數(shù)量不能超過(guò)主題的分區(qū)數(shù),不然多出來(lái)的實(shí)例就只能干瞪眼了。我們把消費(fèi)者實(shí)例從原來(lái)的 4 個(gè)擴(kuò)容到 8 個(gè),這一下子,每個(gè)實(shí)例分到的分區(qū)數(shù)更合理了,處理速度明顯提升。
2. 調(diào)整消費(fèi)者線程池參數(shù)
之前消費(fèi)者線程池太小,現(xiàn)在咱把它調(diào)大。在消費(fèi)者的配置里,有兩個(gè)關(guān)鍵參數(shù):fetch.min.bytes 和 fetch.max.wait.ms。fetch.min.bytes 表示消費(fèi)者每次從 Kafka 拉取的最小數(shù)據(jù)量,默認(rèn)是 1 字節(jié),咱可以根據(jù)實(shí)際情況調(diào)大,比如調(diào)成 10KB,這樣每次拉取的數(shù)據(jù)更多,減少拉取次數(shù);fetch.max.wait.ms 表示如果沒(méi)有達(dá)到 fetch.min.bytes 的數(shù)據(jù)量,消費(fèi)者最多等待的時(shí)間,默認(rèn)是 500ms,咱可以適當(dāng)調(diào)小,比如調(diào)成 100ms,讓消費(fèi)者更積極地去拉取數(shù)據(jù)。同時(shí),處理消息的線程池 processing - thread - pool 的大小,我們從 5 調(diào)到了 20,讓更多的線程來(lái)處理消息。
3. 跳過(guò)無(wú)效消息(謹(jǐn)慎操作?。?/span>
如果積壓的消息中有很多是過(guò)期的或者無(wú)效的,比如訂單超時(shí)取消的消息,在業(yè)務(wù)允許的情況下,可以考慮跳過(guò)這些消息。這就需要修改消費(fèi)者的偏移量,讓消費(fèi)者從某個(gè)指定的位置開(kāi)始消費(fèi)。不過(guò)這一步一定要謹(jǐn)慎,要是跳過(guò)了重要的消息,那可就麻煩大了。我們當(dāng)時(shí)確認(rèn)了部分消息確實(shí)是已經(jīng)處理過(guò)的無(wú)效消息,才小心翼翼地調(diào)整了偏移量,跳過(guò)了 10 萬(wàn)條無(wú)效消息,積壓量一下子就降了不少。
(三)根治方案:讓系統(tǒng)恢復(fù) "健康"
1. 優(yōu)化生產(chǎn)者端:別可勁兒 "猛塞" 消息
生產(chǎn)者發(fā)送消息的時(shí)候,不能一股腦兒地猛塞,得控制好發(fā)送速度??梢栽O(shè)置合理的 batch.size 和 linger.ms 參數(shù)。batch.size 表示生產(chǎn)者緩存消息的批次大小,默認(rèn)是 16KB,適當(dāng)調(diào)大可以減少網(wǎng)絡(luò)傳輸次數(shù),提高吞吐量,但也不能太大,不然會(huì)增加延遲;linger.ms 表示生產(chǎn)者等待批次填滿的時(shí)間,默認(rèn)是 0ms,也就是立即發(fā)送,咱可以調(diào)成 5ms,讓生產(chǎn)者稍微等一會(huì)兒,等積累一些消息再發(fā)送,提高效率。同時(shí),要做好流量控制,當(dāng)發(fā)現(xiàn)消息積壓有上升趨勢(shì)時(shí),自動(dòng)減慢發(fā)送速度,避免系統(tǒng)被壓垮。
2. 重構(gòu)消費(fèi)者邏輯:讓 "快遞員" 更高效
之前消費(fèi)者處理消息的邏輯有點(diǎn)臃腫,里面有一些不必要的數(shù)據(jù)庫(kù)操作和復(fù)雜的業(yè)務(wù)計(jì)算。咱把這些業(yè)務(wù)邏輯進(jìn)行了拆分,對(duì)于一些非實(shí)時(shí)的操作,比如日志記錄、數(shù)據(jù)分析,放到異步線程或者隊(duì)列里處理,讓消費(fèi)者主線程只專注于處理核心的消息業(yè)務(wù)。同時(shí),優(yōu)化了數(shù)據(jù)庫(kù)的查詢語(yǔ)句,增加了索引,讓數(shù)據(jù)庫(kù)操作的速度提升了 30% 以上。另外,還引入了批量處理的方式,每次從 Kafka 拉取一批消息(比如 100 條),然后批量寫(xiě)入數(shù)據(jù)庫(kù)或者進(jìn)行處理,減少頻繁的 I/O 操作。
3. 增加分區(qū)數(shù):拓寬 "車道"
針對(duì)之前分區(qū)數(shù)太少的問(wèn)題,我們對(duì) order - topic 進(jìn)行了分區(qū)擴(kuò)容,把分區(qū)數(shù)從 2 個(gè)增加到了 16 個(gè)。不過(guò)要注意,分區(qū)數(shù)一旦增加,就不能減少了,而且擴(kuò)容分區(qū)需要借助 Kafka 的工具 kafka - reassign - partitions.sh 來(lái)進(jìn)行,過(guò)程中要密切監(jiān)控集群的狀態(tài),避免出現(xiàn)數(shù)據(jù)丟失或者不一致的問(wèn)題。分區(qū)數(shù)增加后,消費(fèi)者可以并行處理更多的分區(qū),吞吐量得到了大幅提升。
4. 升級(jí)硬件:給系統(tǒng) "換發(fā)動(dòng)機(jī)"
既然磁盤(pán) I/O 性能不行,咱就直接升級(jí)服務(wù)器的磁盤(pán),把原來(lái)的機(jī)械硬盤(pán)換成了 SSD,磁盤(pán)平均響應(yīng)時(shí)間一下子降到了 5ms 以內(nèi)。同時(shí),增加了服務(wù)器的內(nèi)存和 CPU 資源,讓消費(fèi)者和生產(chǎn)者所在的服務(wù)器都有了更強(qiáng)勁的性能。網(wǎng)絡(luò)方面,把服務(wù)器的網(wǎng)絡(luò)帶寬從 1G 升級(jí)到了 10G,減少了網(wǎng)絡(luò)延遲對(duì)消息傳輸?shù)挠绊憽?/span>
三、事后復(fù)盤(pán):這些坑以后可別再踩了
(一)日常監(jiān)控不能少:給系統(tǒng)裝個(gè) "體檢儀"
我們這次之所以能在第一時(shí)間發(fā)現(xiàn)問(wèn)題,多虧了平時(shí)部署的監(jiān)控系統(tǒng)。我們用 Prometheus 和 Grafana 搭建了一套監(jiān)控平臺(tái),實(shí)時(shí)監(jiān)控 Kafka 集群的各項(xiàng)指標(biāo),比如消息生產(chǎn)速率、消費(fèi)速率、積壓量、分區(qū) leader 分布、服務(wù)器的 CPU、內(nèi)存、磁盤(pán) I/O 等。還設(shè)置了告警規(guī)則,當(dāng)積壓量超過(guò) 5 萬(wàn)條時(shí),就通過(guò)短信和郵件告警,讓我們能及時(shí)響應(yīng)。以后要繼續(xù)完善監(jiān)控指標(biāo),比如增加對(duì)消費(fèi)者線程池利用率、生產(chǎn)者緩沖區(qū)利用率的監(jiān)控,做到早發(fā)現(xiàn)、早處理。
(二)容量規(guī)劃要提前:別等堵車了才修路
在系統(tǒng)設(shè)計(jì)初期,就要做好容量規(guī)劃。根據(jù)業(yè)務(wù)的發(fā)展預(yù)期,估算未來(lái)一段時(shí)間內(nèi)的消息量,合理設(shè)置 Kafka 主題的分區(qū)數(shù)、副本數(shù),以及消費(fèi)者和生產(chǎn)者的實(shí)例數(shù)量、硬件配置。我們這次就是因?yàn)閷?duì)訂單量的增長(zhǎng)估計(jì)不足,才導(dǎo)致分區(qū)數(shù)和消費(fèi)者實(shí)例數(shù)不夠,引發(fā)了積壓?jiǎn)栴}。以后每季度都要對(duì)系統(tǒng)容量進(jìn)行評(píng)估,提前做好擴(kuò)容計(jì)劃。
(三)應(yīng)急預(yù)案要完善:打有準(zhǔn)備的仗
這次處理積壓?jiǎn)栴},雖然最后解決了,但過(guò)程中還是有點(diǎn)手忙腳亂。以后要制定詳細(xì)的應(yīng)急預(yù)案,比如當(dāng)積壓量達(dá)到不同級(jí)別時(shí)(5 萬(wàn)、10 萬(wàn)、50 萬(wàn)、100 萬(wàn)),分別采取什么措施,誰(shuí)負(fù)責(zé)執(zhí)行,誰(shuí)負(fù)責(zé)監(jiān)控,都要明確下來(lái)。還要定期進(jìn)行應(yīng)急演練,讓團(tuán)隊(duì)成員熟悉處理流程,提高應(yīng)對(duì)突發(fā)情況的能力。
(四)代碼質(zhì)量是根本:別寫(xiě) "坑自己" 的代碼
消費(fèi)者和生產(chǎn)者的代碼質(zhì)量直接影響系統(tǒng)的穩(wěn)定性和性能。這次發(fā)現(xiàn)的消費(fèi)者線程池參數(shù)不合理、處理邏輯臃腫等問(wèn)題,都是代碼層面的問(wèn)題。以后要加強(qiáng)代碼評(píng)審,確保代碼的高效性和健壯性。對(duì)于關(guān)鍵的業(yè)務(wù)邏輯,要進(jìn)行性能測(cè)試和壓力測(cè)試,提前發(fā)現(xiàn)潛在的問(wèn)題。
四、總結(jié):跟 Kafka 打交道,得玩點(diǎn) "套路"
這次凌晨三點(diǎn)的機(jī)房搶修,可算是讓我對(duì) Kafka 有了更深刻的認(rèn)識(shí)。Kafka 確實(shí)是個(gè)強(qiáng)大的消息中間件,但要是不好好伺候它,它分分鐘給你整出幺蛾子。咱總結(jié)一下處理消息積壓的核心思路:先快速定位問(wèn)題,找到積壓的根源;然后采取臨時(shí)措施緩解積壓,避免問(wèn)題擴(kuò)大化;接著從根本上解決問(wèn)題,優(yōu)化系統(tǒng)設(shè)計(jì)和代碼邏輯;最后做好日常監(jiān)控和應(yīng)急預(yù)案,防止類似問(wèn)題再次發(fā)生。
咱再把關(guān)鍵的知識(shí)點(diǎn)梳理一下:
- 消息積壓的常見(jiàn)原因:消費(fèi)者處理能力不足、分區(qū)規(guī)劃不合理、硬件瓶頸。
- 緊急處理措施:擴(kuò)容消費(fèi)者實(shí)例、調(diào)整參數(shù)、跳過(guò)無(wú)效消息。
- 根治方案:優(yōu)化生產(chǎn)者和消費(fèi)者邏輯、增加分區(qū)數(shù)、升級(jí)硬件。
- 事后預(yù)防:完善監(jiān)控、做好容量規(guī)劃、制定應(yīng)急預(yù)案、提高代碼質(zhì)量。
兄弟們,以后遇到 Kafka 消息積壓?jiǎn)栴},別慌!按照咱這套流程來(lái),保準(zhǔn)能把問(wèn)題解決得明明白白。最后提醒大家,技術(shù)這玩意兒,就得不斷實(shí)踐、不斷總結(jié),才能越來(lái)越熟練。