RocketMQ 消息積壓千萬級如何快速恢復(fù) 生產(chǎn)環(huán)境踩坑實錄
兄弟們,凌晨三點,手機像被扔進了洗衣機,在枕邊瘋狂震動。我摸索著接起電話,運維小哥帶著哭腔:“哥,訂單系統(tǒng)的 RocketMQ 集群炸了,積壓量直奔兩千萬,現(xiàn)在支付回調(diào)全堵著,客服已經(jīng)收到二十多個商家投訴了!”
我騰地坐起來,腦子還沒完全清醒,手指已經(jīng)條件反射地打開監(jiān)控頁面。好家伙,Consumer 集群的平均消費速度只有 500 條/秒,而堆積量正以肉眼可見的速度往上竄。再看 Producer 端,TPS 倒是穩(wěn)如老狗——問題出在消費者這邊,典型的“生產(chǎn)快、消費慢”導(dǎo)致的積壓慘案。
這不是我第一次跟 RocketMQ 積壓打交道,但千萬級的規(guī)模還是頭一回。摸著鍵盤的手有點抖,廢話不多說,咱們邊復(fù)盤邊聊,看看這種級別的積壓到底該怎么破。
一、先別急著“重啟服務(wù)器”,先把積壓原因盤清楚
很多同學(xué)遇到積壓第一反應(yīng)是“擴容消費者”,但盲目擴容前必須先搞清楚:到底是什么卡住了消費速度? RocketMQ 的消費鏈路就像一條流水線,任何一個環(huán)節(jié)“堵車”都會導(dǎo)致積壓,咱們得先給這條流水線做個“CT 掃描”。
1. 第一步:排查 Consumer 是不是“假死”狀態(tài)
打開 RocketMQ Dashboard(沒有的同學(xué)趕緊裝,運維必備神器),先看 Consumer 分組的“在線客戶端”列表。如果發(fā)現(xiàn)某臺服務(wù)器的 Consumer 長時間沒有上報心跳(LastHeartbeatTime 超過 2 分鐘),恭喜你,大概率遇到了“消費者假死”。
這種情況通常是因為消費者線程被 Full GC 卡住,或者代碼里有死循環(huán)。我這次就碰到一臺服務(wù)器,因為日志打印太猛(沒錯,就是某個同事在循環(huán)里寫了 System.out.println),導(dǎo)致 CPU 100%,Consumer 線程直接卡死,積壓量像滾雪球一樣越滾越大。
踩坑提醒:記得給 Consumer 加上 JVM 監(jiān)控,重點看 GC 頻率和耗時。我后來發(fā)現(xiàn)那臺假死機器的 Young GC 耗時居然超過 500ms,老年代頻繁 Full GC,這種情況下 Consumer 能正常工作才怪。
2. 第二步:檢查隊列負載是否均衡
RocketMQ 的 Consumer 采用“隊列均分”策略,每個 Consumer 會分配多個 Message Queue(以下簡稱 MQ)。如果某臺 Consumer 分配了 100 個 MQ,另一臺只分配了 10 個,那肯定是“忙的忙死,閑的閑死”。
怎么看負載情況?Dashboard 里每個 Consumer 實例的“已分配隊列數(shù)”一目了然。我這次就發(fā)現(xiàn),有三臺新擴容的服務(wù)器因為網(wǎng)絡(luò)配置問題,沒連上 NameServer,導(dǎo)致老服務(wù)器承擔(dān)了 80% 的隊列,消費能力直接被壓垮。
實操技巧:如果發(fā)現(xiàn)隊列分配不均,先重啟 Consumer 實例(觸發(fā)重新負載均衡),如果還不行,檢查 Consumer 分組的配置,確保 consumeFromWhere
和 messageModel
設(shè)置正確(默認 CLUSTERING
模式下會自動均衡)。
3. 第三步:看看消費線程是不是“摸魚”
RocketMQ Consumer 的默認消費線程數(shù)是 20(對,沒錯,就是這個藏得很深的參數(shù) consumeThreadMin
和 consumeThreadMax
)。如果你的業(yè)務(wù)邏輯比較復(fù)雜,比如需要查數(shù)據(jù)庫、調(diào)接口,20 個線程可能根本不夠用,導(dǎo)致大量線程在排隊等待處理。
我這次查看 Consumer 日志,發(fā)現(xiàn)線程池里的任務(wù)堆積量超過 1000,而實際在工作的線程只有可憐的 10 個——原來同事在初始化 Consumer 時,手滑把 consumeThreadMin
寫成了 10,Max 也設(shè)成 10,相當(dāng)于固定只有 10 個線程在干活,面對突然暴增的流量,自然頂不住。
劃重點:消費線程數(shù)不是越多越好,要看 CPU 核心數(shù)(一般設(shè)置為 CPU 核心數(shù)的 2 - 3 倍)。如果是 IO 密集型任務(wù),可以適當(dāng)多開,比如設(shè)到 50;如果是 CPU 密集型,超過 32 基本沒意義,反而會因為線程上下文切換拖慢速度。
二、千萬級積壓的“急救三連招”,先把積壓量壓下來
搞清楚原因后,接下來就是“急救階段”。記?。?/span>千萬級積壓時,任何微小的優(yōu)化乘以千萬都會放大成顯著效果。咱們分步驟來,先讓消費速度追上生產(chǎn)速度,再慢慢“消化”歷史積壓。
1. 第一招:臨時擴容 Consumer,先把“車道”拓寬
這是最直接的辦法,相當(dāng)于給高速公路多開幾條車道。RocketMQ 的 Consumer 是“無狀態(tài)”的,理論上可以無限擴容,但要注意兩個關(guān)鍵點:
(1)擴容數(shù)量不超過 MQ 總數(shù)
每個 MQ 同一時間只能被一個 Consumer 消費,比如集群有 100 個 MQ,最多開 100 個 Consumer 實例(每個實例分配 1 個 MQ)。我這次集群有 200 個 MQ,當(dāng)前只有 10 個 Consumer,理論上可以先擴容到 50 個實例,把隊列分配率拉滿。
(2)別踩“IP 不對等”的坑
之前擴容時,運維小哥直接復(fù)制了老服務(wù)器的配置,結(jié)果新服務(wù)器的 clientIP
被錯誤設(shè)置成了內(nèi)網(wǎng) IP,而 NameServer 在公網(wǎng),導(dǎo)致 Consumer 注冊時,其他實例根本找不到它。最后不得不手動加上 rocketmq.client.endpoint
參數(shù),指定公網(wǎng)地址才解決。
實操步驟:
- 臨時創(chuàng)建一個新的 Consumer 分組(比如加個后綴
-tmp
),避免和原有消費者搶資源 - 啟動時指定
--consumerThreadMin 50 --consumerThreadMax 50
(臨時調(diào)高線程數(shù)) - 觀察 Dashboard 上的“消費速度”,理想情況下,每臺新服務(wù)器能分到 4 - 5 個 MQ,消費速度能提升 3 - 5 倍
2. 第二招:開啟批量消費,讓消費者一次“搬多箱貨”
RocketMQ 支持批量消費,默認每次拉取 1 條消息(參數(shù) consumeMessageBatchMaxSize
默認為 1)。如果你的業(yè)務(wù)邏輯允許,可以改成一次拉 10 - 32 條,減少網(wǎng)絡(luò)交互次數(shù),提升吞吐量。
我這次把這個參數(shù)改成 16,配合前面的擴容,消費速度從 500 條/秒直接跳到 8000 條/秒——相當(dāng)于原來每次跑一趟搬 1 箱貨,現(xiàn)在搬 16 箱,效率自然飆升。但要注意:
(1)批量處理時保持冪等性
因為可能會重復(fù)消費(比如處理到第 10 條時消費者掛了,重啟后這 16 條會重新消費),所以業(yè)務(wù)代碼必須支持冪等(比如用唯一 ID 去重)。我們當(dāng)時就吃了虧,沒做冪等,導(dǎo)致數(shù)據(jù)庫出現(xiàn)重復(fù)訂單,最后不得不寫腳本去重,血的教訓(xùn)!
(2)別貪心設(shè)太大的值
超過 32 之后,吞吐量提升不明顯,反而會增加內(nèi)存壓力(每條消息都會存到內(nèi)存里)。我們試過設(shè)成 100,結(jié)果 Consumer 內(nèi)存使用率瞬間超過 80%,差點觸發(fā) OOM,最后穩(wěn)定在 16 - 32 之間最佳。
3. 第三招:暫停 Producer 或限流,先“掐斷源頭”
如果積壓量實在太大,比如像我們這次已經(jīng)到兩千萬,而消費速度一時半會兒追不上,可以考慮暫時讓 Producer 停止發(fā)消息,或者降低發(fā)送頻率。
注意:暫停 Producer 前一定要和業(yè)務(wù)方溝通,我們當(dāng)時是電商大促期間,暫停支付回調(diào)消息會影響商家收款,最后只能和前端商量,在用戶支付成功頁增加“稍后刷新”提示,同時對 Producer 做限流(從 2000 TPS 降到 500 TPS),給消費者爭取緩沖時間。
踩坑提醒:暫停 Producer 后,記得監(jiān)控 Consumer 的“堆積量”是否開始下降(理想情況下每分鐘下降 10 - 20 萬)。如果沒變化,可能是消費者有重試邏輯在反復(fù)投遞(比如消息處理失敗后進入重試隊列,導(dǎo)致積壓量“假死”),這時候需要檢查 maxReconsumeTimes
參數(shù)(默認 16 次,超過后進入死信隊列)。
三、積壓消化中的“連環(huán)坑”,每一步都可能翻車
當(dāng)積壓量開始下降,千萬別掉以輕心,這時候往往會遇到各種“隱性炸彈”。我們這次就踩了三個大坑,每個都讓我在凌晨四點的會議室里冒冷汗。
1. 坑一:消費過快導(dǎo)致“內(nèi)存溢出”
前面提到我們把批量消費設(shè)成 16,線程數(shù)開到 50,消費速度確實上去了,但半小時后,一臺 Consumer 突然掛掉,日志里寫著“java.lang.OutOfMemoryError: GC overhead limit exceeded”。
原因分析:
- 批量消費時,每條消息都會解析成 Java 對象,16 條一批,每秒處理 500 批,每秒產(chǎn)生 8000 個對象
- Consumer 堆內(nèi)存默認只有 1G(很多同學(xué)不知道,RocketMQ 的 Consumer 啟動腳本默認堆大小是 -Xms1g -Xmx1g),新生代很快被占滿,觸發(fā)頻繁 Full GC
解決方案:
- 給 Consumer 增加內(nèi)存,改成
-Xms4g -Xmx4g
(根據(jù)服務(wù)器配置調(diào)整,建議不超過物理內(nèi)存的 70%) - 調(diào)整 JVM 參數(shù),比如
-XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=512m
,避免元空間溢出 - 最重要的:在業(yè)務(wù)代碼里及時釋放對象引用,比如處理完消息后把對象設(shè)為 null,幫助 GC 回收
2. 坑二:網(wǎng)絡(luò)帶寬成了“最后一公里瓶頸”
當(dāng)積壓量降到 500 萬時,消費速度突然卡住,無論怎么擴容都上不去。檢查服務(wù)器監(jiān)控,發(fā)現(xiàn)網(wǎng)卡吞吐量達到 90%(我們用的是 1G 帶寬的服務(wù)器)。
解決思路:
RocketMQ 的消費流程是“拉取消息”→“處理消息”→“提交消費位點”,其中“拉取消息”是網(wǎng)絡(luò)密集型操作。當(dāng)批量拉取 16 條消息時,每條消息平均 1KB,一次拉取就是 16KB,每秒 500 次拉取就是 8MB/s,50 個 Consumer 實例就是 400MB/s,接近 1G 帶寬的極限。
實操方案:
- 給部分 Consumer 實例更換成 10G 帶寬的服務(wù)器(臨時采購,有錢任性)
- 調(diào)整拉取策略,比如增加
pullBatchSize
參數(shù)(默認 32,我們設(shè)成 64,減少拉取次數(shù)) - 對消息體進行壓縮(Producer 發(fā)送時啟用 GZIP 壓縮,Consumer 自動解壓縮),我們的消息體從平均 1KB 壓縮到 300 字節(jié),帶寬占用直接降了 70%
3. 坑三:死信隊列突然“爆炸”,積壓量反彈
當(dāng)積壓量降到 100 萬時,運維小哥突然喊:“死信隊列里的消息怎么突然多了 50 萬?” 原來是很多消息重試 16 次后仍失敗,自動進入死信隊列,而我們沒有配置死信隊列的消費者,導(dǎo)致這部分消息被“遺忘”了。
處理步驟:
- 先暫停死信隊列的自動投遞(在 RocketMQ 控制臺找到死信隊列對應(yīng)的 Consumer 分組,設(shè)置
maxReconsumeTimes
為 0,停止重試) - 寫一個臨時消費者,專門消費死信隊列,把消息內(nèi)容記錄到日志文件,然后人工排查失敗原因(我們發(fā)現(xiàn)大部分是因為數(shù)據(jù)庫連接超時,凌晨數(shù)據(jù)庫負載高導(dǎo)致)
- 修復(fù)業(yè)務(wù)代碼后,把死信隊列的消息重新發(fā)送回原隊列(用 RocketMQ 的
sendMessageInTransaction
接口,避免重復(fù)消費)
經(jīng)驗教訓(xùn):死信隊列是“最后的防線”,平時一定要監(jiān)控死信隊列的堆積量,建議設(shè)置報警閾值(比如超過 1000 條就提醒),否則等問題爆發(fā)時,又是一場硬仗。
四、積壓恢復(fù)后的“亡羊補牢”,避免下次再掉坑
經(jīng)過 6 個小時的奮戰(zhàn),積壓量終于歸零,看著監(jiān)控曲線慢慢 flatten 下來,我揉著酸痛的脖子,開始總結(jié)這次的教訓(xùn)。其實很多問題都是可以提前預(yù)防的,以下是我梳理的“防積壓三板斧”,建議寫進運維手冊:
1. 事前:給 Consumer 裝“儀表盤”,實時監(jiān)控關(guān)鍵指標(biāo)
別只看“堆積量”,這幾個指標(biāo)更重要:
- 消費延遲(consumeLatency):消息產(chǎn)生到被消費的時間差,超過 10 秒就該警覺
- 拉取吞吐量(pullTPS):如果突然下降 50%,可能是網(wǎng)絡(luò)或 Broker 有問題
- 消費線程池利用率:用
ThreadPoolExecutor.getActiveCount()
監(jiān)控,長期接近consumeThreadMax
說明線程不夠用
我們后來給每個 Consumer 實例加了 Prometheus 監(jiān)控,配合 Grafana 儀表盤,現(xiàn)在積壓預(yù)警比心跳還準(zhǔn)。
2. 事中:準(zhǔn)備“應(yīng)急預(yù)案模板”,讓新人也能快速上手
把這次的急救步驟寫成腳本:
# 快速擴容消費者腳本
for i in {1..50}; do
java -jar consumer.jar \
--consumerGroup order_consumer_tmp \
--namesrvAddr xxx:9876 \
--consumeThreadMin 50 \
--consumeMessageBatchMaxSize 16 \
--clientIP $(curl ifconfig.me) & # 自動獲取公網(wǎng) IP
done
# 暫停死信隊列腳本
curl -X POST http://rocketmq-dashboard:8080/consumer/update \
-H "Content-Type: application/json" \
-d '{"groupName":"order_consumer","maxReconsumeTimes":0}'
記?。?/span>應(yīng)急預(yù)案一定要定期演練,我們后來組織了一次“積壓模擬演練”,發(fā)現(xiàn)腳本里居然有路徑錯誤,還好提前發(fā)現(xiàn),否則實戰(zhàn)時又要抓瞎。
3. 事后:給消息處理加“保險絲”,防止單個消息拖垮整體
這次發(fā)現(xiàn),90% 的消費延遲都是因為個別“毒消息”(比如超大消息體、格式錯誤)導(dǎo)致的。解決方案:
- 設(shè)置消息處理超時時間:用
CompletableFuture
包裝業(yè)務(wù)邏輯,超過 500ms 自動放棄,記錄到日志 - 隔離消費線程池:給不同類型的消息分配不同的線程池,比如訂單消息和日志消息分開處理,避免互相影響
- 增加本地重試機制:業(yè)務(wù)代碼里先重試 3 次,失敗后再交給 RocketMQ 的重試隊列,減少 Broker 壓力
我們給訂單消費加了本地重試后,RocketMQ 的重試次數(shù)下降了 60%,消費速度穩(wěn)定多了。
五、總結(jié):千萬級積壓不可怕,怕的是沒有“系統(tǒng)化思維”
回顧這次戰(zhàn)斗,我最大的感悟是:處理積壓不是“頭痛醫(yī)頭腳痛醫(yī)腳”,而是要從“生產(chǎn) - 存儲 - 消費”全鏈路分析。比如這次我們只盯著消費者擴容,卻忽略了 Producer 端的流量控制,以及 Broker 端的磁盤 IO 瓶頸。