記一次線上Java程序故障驚心動(dòng)魄兩小時(shí)
周日早上醒來,明媚的陽光從臥室的窗戶直射進(jìn)來,久違的好天氣。穿好衣服我開始籌劃今天去哪里轉(zhuǎn)轉(zhuǎn)。一周忙碌的工作幾乎沒有時(shí)間陪家人,今天該好好陪陪家人了。
當(dāng)我起來收拾好一切準(zhǔn)備出發(fā)的時(shí)候,我瞄了一眼手機(jī),發(fā)現(xiàn)手機(jī)的郵箱里有一份報(bào)警郵件,報(bào)警郵件顯示線上最近10分鐘流量有異常,而且是多個(gè)渠道。有突然有一種不祥的預(yù)感:線上kafka出問題了。我讓媳婦和孩子下樓在車?yán)锏任遥亿s緊打開了電腦,查看線上系統(tǒng)。果然不出意外,kafka已經(jīng)積壓了幾千萬的數(shù)據(jù)。因?yàn)槲覀兊臉I(yè)務(wù)分為實(shí)時(shí)數(shù)據(jù)和離線數(shù)據(jù),實(shí)時(shí)數(shù)據(jù)是FileBeat負(fù)責(zé)收集日志發(fā)到Kafka,然后我們這個(gè)業(yè)務(wù)系統(tǒng)消費(fèi)Kafka統(tǒng)計(jì)數(shù)據(jù),實(shí)時(shí)數(shù)據(jù)對于當(dāng)前流量分析、預(yù)算控制、熔斷有非常重要的作用,如果實(shí)時(shí)數(shù)據(jù)異常,其它業(yè)務(wù)系統(tǒng)都會受到一定的影響。
定位到報(bào)警郵件是由于kafka消息積壓而導(dǎo)致實(shí)時(shí)數(shù)據(jù)異常觸發(fā)的,我立馬連上了我們消費(fèi)Kafka的業(yè)務(wù)系統(tǒng)(data-collect)。這是一個(gè)運(yùn)行時(shí)間很長了的Java服務(wù),它的作用就是實(shí)時(shí)消費(fèi)kafka數(shù)據(jù),然后經(jīng)過一定的業(yè)務(wù)邏輯處理,將最終結(jié)果更新到mongodb中。進(jìn)到服務(wù)器以后,我發(fā)現(xiàn)這個(gè)服務(wù)已經(jīng)處于假死狀態(tài),最后一條日志顯示系統(tǒng)發(fā)生了OOM,也就是服務(wù)器內(nèi)存爆了。

關(guān)于data-collect這個(gè)Java服務(wù)的核心邏輯我在這里詳細(xì)說明一下。這個(gè)系統(tǒng)的代碼是很早的一位同事寫的,因?yàn)樵缙谖覀兊臄?shù)據(jù)體量還不是很大,所以,他采用了一種簡單的處理方式。先消費(fèi)數(shù)據(jù),處理完成以后放到一個(gè)Map中,然后,啟動(dòng)了一個(gè)每10s執(zhí)行一次的定時(shí)任務(wù),定時(shí)任務(wù)讀取Map數(shù)據(jù)更新到mongo中,然后清空Map(ConcurrentMap)。這樣做的優(yōu)點(diǎn)是將消費(fèi)Kafka的操作和入庫操作分開了,可以防止因?yàn)槿霂鞎r(shí)間太長而導(dǎo)致消費(fèi)速度變慢,但是,這種做法有一個(gè)致命的缺點(diǎn):內(nèi)存不可控。如果定時(shí)任務(wù)因?yàn)镸ongo操作時(shí)間太長而沒有及時(shí)清空Map,Map中會積累大量的數(shù)據(jù),最終耗盡內(nèi)存,系統(tǒng)發(fā)生OOM。這時(shí)候如果系統(tǒng)自啟了,也會丟失大量的數(shù)據(jù)。
其實(shí),這個(gè)問題我很早有意識到,但是系統(tǒng)一直運(yùn)行良好,沒有出現(xiàn)任何問題,我們認(rèn)為在現(xiàn)有數(shù)據(jù)體量下它是安全的。而碰巧的是,就在前一天我們升級了Mongo的配置,mongo機(jī)器進(jìn)行了一個(gè)主從切換。同時(shí),有一些大表清理和TTL索引重建的工作還在mongo后臺運(yùn)行。這就導(dǎo)致了我們操作mongo耗時(shí)的增加。進(jìn)而導(dǎo)致了我們一直認(rèn)為安全的系統(tǒng)出現(xiàn)了這個(gè)問題。
回到data-collect這個(gè)系統(tǒng)的設(shè)計(jì)上??赡苡械耐瑢W(xué)會在這里有個(gè)疑問,為什么不直接消費(fèi)出來就入庫操作呢?這里我們有一個(gè)重要的處理邏輯:為了防止頻繁的更新mongo,我們會將消費(fèi)出來的數(shù)據(jù)在內(nèi)存中進(jìn)行一個(gè)合并處理,你可以簡單的理解為一個(gè)Map,如果key存在,我們就進(jìn)行++的操作。最終操作mongo是$inc的操作,不是insert和update的操作。這也是我們需要一個(gè)ConcurrentMap的原因。也就是我們大概消費(fèi)了1000萬條數(shù)據(jù),但是最終我們處理完成以后只有10萬條數(shù)據(jù),很多key相同的數(shù)據(jù)我們都進(jìn)行了合并處理,這樣我們mongo的操作就減少了很多。
data-collect發(fā)生了OOM,我只能第一時(shí)間重啟,重啟以后,消費(fèi)正常,系統(tǒng)開始有了數(shù)據(jù)。但是大概運(yùn)行了幾分鐘以后,又發(fā)生了OOM。原因很簡單:kafka積壓的了大量的消息,消費(fèi)很快,但是異步如mongo太耗時(shí),所以導(dǎo)致數(shù)據(jù)全部擠壓在了這個(gè)Map內(nèi)存中??吹竭@里,我想只能動(dòng)手改造代碼了。改造的最終要達(dá)到的結(jié)果是:系統(tǒng)在不發(fā)生OOM的前提下,消費(fèi)積壓在kafka中的數(shù)據(jù),完成mongo操作。
改造的思路很簡單,就是干掉定時(shí)任務(wù)。在消費(fèi)kafka消息中增加一個(gè)邏輯,每當(dāng)消費(fèi)消息并且內(nèi)存進(jìn)行數(shù)據(jù)合并完成以后,我們判斷Map的大小,如果Map的大小超過我們設(shè)定的限制以后,開始觸發(fā)mongo操作。之前的代碼mongo操作是單線程執(zhí)行,為了提升mongo插入操作,我們開啟20個(gè)線程并行執(zhí)行,所以我們這里需要一個(gè)帶阻塞隊(duì)列的線程池。改造后的代碼如下:

這里是SpringBoot集成Kafka的消費(fèi)代碼。


這是內(nèi)存處理完成以后入mongo的操作。因?yàn)槲覀兊膖opic有20個(gè)分區(qū),所以代碼中的listenPartition0是多線程執(zhí)行的。如果沒有synchronized的同步代碼塊,那assembleyAdxTrafficVo方法就會多線程執(zhí)行,這就會導(dǎo)致數(shù)據(jù)重復(fù)插入mongo,具體大家可以體會assembleyAdxTrafficVo方法的邏輯。
而插入mongo操作的用了線程池ExecutorService,注意這里我們executorService對象的定義。


為什么要自己定義一個(gè)阻塞隊(duì)列CustomeBlockQueue?這相比很多人碰到過這個(gè)問題,如果采用默認(rèn)的阻塞隊(duì)列,例如:ArrayBlockingQueue,當(dāng)隊(duì)列長度長度超過設(shè)置的值時(shí),ArrayBlockingQueue會拒絕新的數(shù)據(jù)進(jìn)入,并且拋出異常,所以我們需要自己定義CustomeBlockQueue,并且重寫他的offer方法(BlockingQueue默認(rèn)采用offer方法將元素增加到隊(duì)列),offer方法不會阻塞,put方法會阻塞,所以我們需要重寫offer方法,并且內(nèi)部采用put方法實(shí)現(xiàn)。關(guān)于這一點(diǎn),大家可以多嘗試。ArrayBlockingQueue和LinkedBlockingQueue都有很多坑等大家去踩。
按照上述代碼處理完成上線以后,系統(tǒng)開始正常運(yùn)行,kafka積壓的消息也開始慢慢降低,系統(tǒng)趨于恢復(fù)正常,而這時(shí)已經(jīng)是12點(diǎn)了,驚心動(dòng)魄的2小時(shí)總算過去了,阿彌陀佛。