生產(chǎn)事故!Kafka 消費(fèi)延遲十小時(shí),我用這三招起死回生
兄弟們,凌晨?jī)牲c(diǎn),手機(jī)像被踩了尾巴的貓一樣狂震。我迷迷糊糊摸到手機(jī),鎖屏上跳出運(yùn)維監(jiān)控群的99+消息——某核心業(yè)務(wù)的Kafka消費(fèi)延遲突破10小時(shí)大關(guān),像脫韁的野馬在報(bào)警大屏上狂奔。作為負(fù)責(zé)這個(gè)系統(tǒng)的背鍋俠,我瞬間清醒,套上拖鞋就往公司趕,心里暗罵:“Kafka你個(gè)老六,平時(shí)挺穩(wěn)當(dāng)?shù)?,怎么突然給我整這出?”
一、事故現(xiàn)場(chǎng):當(dāng)訂單堆積成“珠穆朗瑪”
到公司打開(kāi)監(jiān)控頁(yè)面,好家伙,Kafka消費(fèi)組的Lag數(shù)值像坐了火箭,直接竄到120萬(wàn)條。再看業(yè)務(wù)系統(tǒng),訂單處理模塊的吞吐量幾乎歸零,數(shù)據(jù)庫(kù)里待處理的訂單表像個(gè)被吹脹的氣球,隨時(shí)可能爆炸??头块T(mén)已經(jīng)傳來(lái)戰(zhàn)報(bào),用戶投訴量直線上升,說(shuō)下單后半天收不到確認(rèn)短信,還以為自己遇到了詐騙APP。
趕緊連到Kafka服務(wù)器,用kafka-consumer-groups.sh
命令一查,發(fā)現(xiàn)問(wèn)題出在某個(gè)關(guān)鍵Topic的消費(fèi)組上。這個(gè)Topic平時(shí)承載著用戶下單、支付、物流等核心事件,下游有10多個(gè)消費(fèi)者組在消費(fèi),偏偏我們這個(gè)組掉了鏈子。再仔細(xì)看消費(fèi)者實(shí)例,明明配置了8個(gè)實(shí)例,怎么每個(gè)實(shí)例的消費(fèi)速率都低得可憐?每秒處理量不到200條,而生產(chǎn)端的消息寫(xiě)入速率可是穩(wěn)定在5000條/秒,這就好比一個(gè)水龍頭開(kāi)得嘩嘩響,下面接水的杯子卻只有針眼大,不堵才怪。
抽絲剝繭:到底是誰(shuí)拖了后腿?
剛開(kāi)始懷疑是網(wǎng)絡(luò)問(wèn)題,畢竟之前有過(guò)機(jī)房交換機(jī)故障導(dǎo)致吞吐量下降的先例。但登錄服務(wù)器一查,網(wǎng)卡流量連峰值的10%都沒(méi)到,帶寬穩(wěn)穩(wěn)的。再看CPU和內(nèi)存,CPU利用率倒是不低,平均在70%左右,但內(nèi)存還有一大半空閑,難道是CPU瓶頸?
不對(duì)啊,我們的消費(fèi)者實(shí)例用的可是4核8G的配置,按理說(shuō)處理這種量級(jí)的消息不該這么吃力。突然想到,Kafka消費(fèi)者的性能和分區(qū)分配有很大關(guān)系。用list-consumer-groups
和describe-consumer-groups
命令一看,好家伙,8個(gè)消費(fèi)者實(shí)例,居然有2個(gè)實(shí)例各分到了20個(gè)分區(qū),剩下6個(gè)實(shí)例只分到5個(gè)分區(qū)。這就好比班里分作業(yè),有的同學(xué)抱了一摞,有的同學(xué)卻只拿到可憐的幾本,忙的忙死,閑的閑死。
再深入分析消費(fèi)者的日志,發(fā)現(xiàn)大量的時(shí)間花在了反序列化和業(yè)務(wù)處理上。我們用的是Avro序列化格式,按道理反序列化效率不低,但業(yè)務(wù)處理里有個(gè)坑:每處理一條消息,都要去調(diào)用3個(gè)不同的微服務(wù)接口,而且還是串行調(diào)用,每個(gè)接口的平均耗時(shí)居然超過(guò)200ms。這就相當(dāng)于你吃個(gè)漢堡,非要先去買(mǎi)面包,再去煎肉餅,最后去摘生菜,每一步都得等上半天,效率能高才怪。
二、第一招:讓消費(fèi)者“多線程搬磚”
既然問(wèn)題出在分區(qū)分配不均和處理效率上,那就先從消費(fèi)者的并行度下手。Kafka的消費(fèi)者是通過(guò)分區(qū)來(lái)并行消費(fèi)的,每個(gè)分區(qū)只能被同一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者實(shí)例處理,所以合理分配分區(qū)是關(guān)鍵。
1. 調(diào)整分區(qū)分配策略
默認(rèn)的分區(qū)分配策略是RangeAssignor
,這種策略在分區(qū)數(shù)量不能被消費(fèi)者實(shí)例數(shù)整除時(shí),容易導(dǎo)致分配不均。比如我們有100個(gè)分區(qū),8個(gè)消費(fèi)者,100÷8=12余4,前4個(gè)消費(fèi)者會(huì)分到13個(gè)分區(qū),后4個(gè)分到12個(gè)。雖然這次的分配更離譜,但本質(zhì)還是分配策略的問(wèn)題。我們換成RoundRobinAssignor
,它會(huì)把分區(qū)按順序輪流分配給消費(fèi)者,能更均勻一些。
修改消費(fèi)者配置,加上partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
,然后重啟消費(fèi)者實(shí)例。神奇的事情發(fā)生了,每個(gè)實(shí)例分到的分區(qū)數(shù)基本一致,再也沒(méi)有“勞?!焙汀懊~(yú)黨”之分了。
2. 增加消費(fèi)者實(shí)例數(shù)
既然分區(qū)數(shù)量是100個(gè),那消費(fèi)者實(shí)例數(shù)最好和分區(qū)數(shù)匹配,或者是分區(qū)數(shù)的因數(shù)。我們之前用8個(gè)實(shí)例,和100不匹配,那就加到20個(gè)實(shí)例,這樣每個(gè)實(shí)例分到5個(gè)分區(qū),壓力均勻多了。這里要注意,消費(fèi)者實(shí)例數(shù)不能超過(guò)分區(qū)數(shù),否則多余的實(shí)例會(huì)閑置。
3. 優(yōu)化業(yè)務(wù)處理線程池
消費(fèi)者處理消息是在poll循環(huán)里,默認(rèn)是單線程處理。我們的業(yè)務(wù)處理耗時(shí)太長(zhǎng),必須用多線程來(lái)加速。在消費(fèi)者的回調(diào)函數(shù)里,把消息丟到一個(gè)線程池里異步處理,這樣poll可以盡快去取下一批消息,不會(huì)被阻塞。
ExecutorService executor = Executors.newFixedThreadPool(10);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> processMessage(record));
}
consumer.commitAsync();
}
這里要注意線程池的大小不能太大,否則會(huì)占用太多資源,一般設(shè)置為CPU核心數(shù)的2倍左右比較合適。另外,異步處理時(shí)要做好異常處理,避免消息處理失敗后丟失。
三、第二招:給Kafka“松綁”,別讓它“背鍋”
處理完消費(fèi)者端的問(wèn)題,發(fā)現(xiàn)延遲下降了一些,但還是在5小時(shí)左右徘徊。這時(shí)候意識(shí)到,可能生產(chǎn)者端也有問(wèn)題,或者Kafka本身的配置需要優(yōu)化。
1. 檢查生產(chǎn)者批次大小
登錄生產(chǎn)者服務(wù)器,查看配置,發(fā)現(xiàn)batch.size
設(shè)置得太小,只有16KB。Kafka生產(chǎn)者會(huì)把消息攢成批次發(fā)送,批次太小會(huì)導(dǎo)致發(fā)送頻率過(guò)高,網(wǎng)絡(luò)開(kāi)銷(xiāo)增大。我們把它調(diào)到160KB,這樣每次發(fā)送的消息更多,效率更高。不過(guò)也不能調(diào)得太大,否則會(huì)增加延遲,需要根據(jù)實(shí)際情況平衡。
2. 調(diào)整消費(fèi)者拉取參數(shù)
消費(fèi)者端的fetch.maxBytes
和fetch.max等待時(shí)間
也很重要。默認(rèn)fetch.maxBytes
是50MB,可能太大了,導(dǎo)致消費(fèi)者每次拉取需要處理很久。我們調(diào)到10MB,同時(shí)把fetch.max.wait.ms
從500ms調(diào)到200ms,讓消費(fèi)者在沒(méi)有足夠數(shù)據(jù)時(shí)不用等太久,及時(shí)處理已有的數(shù)據(jù)。
3. 清理無(wú)效的舊數(shù)據(jù)
查看Kafka Topic的配置,發(fā)現(xiàn)retention.hours
設(shè)置為72小時(shí),但我們的業(yè)務(wù)其實(shí)只需要保留24小時(shí)的數(shù)據(jù)。大量的舊數(shù)據(jù)堆積在磁盤(pán)上,不僅占用空間,還會(huì)影響消費(fèi)者的拉取速度。趕緊修改配置,執(zhí)行kafka-topics.sh --alter --topic my_topic --config retention.hours=24
,然后等待Kafka自動(dòng)清理舊數(shù)據(jù)。不過(guò)要注意,清理過(guò)程中可能會(huì)對(duì)性能有一定影響,最好選擇業(yè)務(wù)低峰期操作。
四、第三招:給消息處理“減肥”,拒絕“無(wú)效勞動(dòng)”
經(jīng)過(guò)前兩招,延遲已經(jīng)降到了2小時(shí),但離我們的目標(biāo)還有差距。這時(shí)候必須深入業(yè)務(wù)處理邏輯,看看有沒(méi)有可以優(yōu)化的地方。
1. 合并微服務(wù)調(diào)用
之前每處理一條消息,都要串行調(diào)用3個(gè)微服務(wù)接口,總耗時(shí)超過(guò)600ms。其實(shí)這3個(gè)接口之間沒(méi)有嚴(yán)格的依賴關(guān)系,可以改成并行調(diào)用。用CompletableFuture來(lái)實(shí)現(xiàn)異步并行調(diào)用,然后合并結(jié)果。
CompletableFuture<Result1> future1 = CompletableFuture.supplyAsync(() -> callService1());
CompletableFuture<Result2> future2 = CompletableFuture.supplyAsync(() -> callService2());
CompletableFuture<Result3> future3 = CompletableFuture.supplyAsync(() -> callService3());
CompletableFuture.allOf(future1, future2, future3).join();
Result1 result1 = future1.get();
Result2 result2 = future2.get();
Result3 result3 = future3.get();
這樣總耗時(shí)降到了200ms左右,效率提升了3倍。
2. 增加本地緩存
有些頻繁調(diào)用的基礎(chǔ)數(shù)據(jù),比如商品類(lèi)目、用戶等級(jí)等,每次都去數(shù)據(jù)庫(kù)查詢,耗時(shí)很長(zhǎng)。我們?cè)谙M(fèi)者實(shí)例里增加了本地緩存,用Caffeine緩存,設(shè)置5分鐘的過(guò)期時(shí)間,減少數(shù)據(jù)庫(kù)訪問(wèn)次數(shù)。這就好比你每天上班都要帶鑰匙,每次回家都要翻包找,不如在門(mén)口裝個(gè)密碼鎖,直接輸入密碼更快捷。
3. 跳過(guò)無(wú)效消息
通過(guò)監(jiān)控發(fā)現(xiàn),有一部分消息是重復(fù)的或者狀態(tài)無(wú)效的,比如已經(jīng)取消的訂單再次發(fā)送確認(rèn)消息。我們?cè)谙⑻幚砬霸黾恿艘粋€(gè)過(guò)濾環(huán)節(jié),先檢查消息的狀態(tài),如果是無(wú)效的,直接跳過(guò),不進(jìn)行后續(xù)處理。這就像分揀快遞,先把明顯破損或者地址錯(cuò)誤的包裹挑出來(lái),剩下的再慢慢處理,效率自然提高。
五、勝利時(shí)刻:延遲從10小時(shí)到30分鐘的逆襲
經(jīng)過(guò)這三招組合拳,凌晨五點(diǎn),監(jiān)控頁(yè)面上的Lag數(shù)值開(kāi)始穩(wěn)步下降,像泄了氣的氣球一樣,到早上八點(diǎn),已經(jīng)降到了30分鐘以內(nèi),吞吐量也恢復(fù)到了每秒5000條以上,和生產(chǎn)端基本持平。再看業(yè)務(wù)系統(tǒng),訂單處理終于跟上了節(jié)奏,數(shù)據(jù)庫(kù)里的積壓訂單也慢慢消化完了。
六、復(fù)盤(pán)總結(jié):這些坑以后別再踩
- 分區(qū)分配要均衡:根據(jù)分區(qū)數(shù)量合理設(shè)置消費(fèi)者實(shí)例數(shù),選擇合適的分配策略,避免“勞逸不均”。
- 業(yè)務(wù)處理要輕量:盡量減少同步阻塞操作,能用異步并行的就別串行,能緩存的就別頻繁查數(shù)據(jù)庫(kù)。
- 參數(shù)配置要調(diào)優(yōu):生產(chǎn)者和消費(fèi)者的參數(shù)不是一成不變的,要根據(jù)實(shí)際吞吐量和延遲情況動(dòng)態(tài)調(diào)整,比如
batch.size
、fetch.maxBytes
等。 - 監(jiān)控報(bào)警要完善:這次事故能及時(shí)發(fā)現(xiàn),多虧了完善的監(jiān)控體系。以后要繼續(xù)優(yōu)化監(jiān)控指標(biāo),比如消費(fèi)者延遲、吞吐量、CPU內(nèi)存使用率等,設(shè)置合理的報(bào)警閾值。
Kafka雖然強(qiáng)大,但也不是萬(wàn)能的。當(dāng)遇到消費(fèi)延遲問(wèn)題時(shí),不要慌,先冷靜分析原因,從消費(fèi)者、生產(chǎn)者、業(yè)務(wù)處理三個(gè)維度去排查。記住,沒(méi)有最好的配置,只有最適合自己業(yè)務(wù)的配置。平時(shí)多做壓力測(cè)試,模擬高并發(fā)場(chǎng)景,提前發(fā)現(xiàn)潛在問(wèn)題,才能在生產(chǎn)事故來(lái)臨時(shí)不慌不亂,從容應(yīng)對(duì)。