Flink分布式程序的異常處理
本文轉(zhuǎn)載自微信公眾號「逸言」,作者逸言。轉(zhuǎn)載本文請聯(lián)系逸言公眾號。
在我們的數(shù)據(jù)平臺產(chǎn)品中,為了簡化開發(fā),對Flink做了一層封裝,定義了Job和Flow的抽象。一個Job其實(shí)就是Flink的一個作業(yè),每個Job可以定義多個Flow,一個Flow可以理解為是Flink的一個DataStream,利用Job傳遞的StreamExecutionEnvironment可以在Flow中添加包括Source與Sink的多個算子。
Job與Flow之間的關(guān)系可以利用自定義的@JobFlow注解進(jìn)行配置,如此就可以在執(zhí)行抽象的AbstractJob的run()方法時,利用反射獲得該Job下的所有Flow,遍歷執(zhí)行每個Flow的run()方法。在Flow的run()方法中,才會真正根據(jù)StreamExecutionEnvironment執(zhí)行多個算子。
Flink為了保證計算的穩(wěn)定性,提供了不同的重啟策略。例如,當(dāng)我們將重啟策略設(shè)置為失敗率(failure-rate)時,如果執(zhí)行的任務(wù)出錯次數(shù)達(dá)到了失敗率配置的要求,F(xiàn)link的Worker節(jié)點(diǎn)的TaskManager就會重啟。如果超過重啟次數(shù),Task Manager就會停止運(yùn)行。
失敗的原因可能有很多,例如資源不足、網(wǎng)絡(luò)通信出現(xiàn)故障等Flink集群環(huán)境導(dǎo)致的故障,但是也可能是我們編寫的作業(yè)在處理流式數(shù)據(jù)時,因為處理數(shù)據(jù)不當(dāng)拋出了業(yè)務(wù)異常,使得Flink將其視為一次失敗。
為了減少因為業(yè)務(wù)原因拋出異常導(dǎo)致Task Manager的不必要重啟,需要規(guī)定我們編寫的Flink程序的異常處理機(jī)制。由于封裝了Flink的Job,從一開始,我就考慮一勞永逸地解決業(yè)務(wù)異常的問題,即在AbstractJob的run()方法中,捕獲我們自定義的業(yè)務(wù)異常,在日志記錄了錯誤信息后,把該異常“吃”掉,避免異常的拋出導(dǎo)致執(zhí)行失敗,造成TaskManager的重啟,如:
- public abstract class AbstractFlow implements Flow {
- public void run() {
- try {
- runBare();
- } catch (DomainException ex) {
- //...
- }
- }
- protected abstract void runBare();
- }
哪知道這一處理機(jī)制壓根兒就無法捕獲業(yè)務(wù)異常!為什么呢?這就要從Flink的分布式機(jī)制說起了。
在Flink集群上執(zhí)行任務(wù),需要Client將作業(yè)提交給Flink集群的Master節(jié)點(diǎn)。Master的Dispatcher接收到Job并啟動JobManager,通過解析Job的邏輯視圖,了解Job對資源的要求,然后向ResourceManager(Standalone模式,如果是YARN,則由YARN管理和調(diào)度資源)申請本次Job需要的資源。JobManager將Job的邏輯視圖轉(zhuǎn)換為物理視圖,并將計算任務(wù)分發(fā)部署到Flink集群的TaskManager上。整個執(zhí)行過程如下圖所示:
我們封裝的一個Flow,在物理視圖中,其實(shí)就是一個作業(yè),即前面所說的計算任務(wù)。一個作業(yè)可以包含多個算子。如果相鄰算子之間不存在數(shù)據(jù)Shuffle、并行度相同,則會合并為算子鏈(Operator Chain)。每個算子或算子鏈組成一個JobVertex,在執(zhí)行時作為一個任務(wù)(Task)。根據(jù)并行度的設(shè)置,每個任務(wù)包含并行度數(shù)目的子任務(wù)(SubTask),這些子任務(wù)就是作業(yè)調(diào)度的最小邏輯單元,對應(yīng)于進(jìn)程資源中的一個線程,在Flink中,就是一個Slot(如果不考慮Slot共享的話)。
假定Flink環(huán)境的并行度設(shè)置為1,作業(yè)的前面兩個算子滿足合并算子鏈的要求,且并行度設(shè)置為2;之后,通過keyBy()之類的算子完成了數(shù)據(jù)的Shuffle,然后再合并到同一個Sink中。那么它們的關(guān)系如下圖所示:
顯然,F(xiàn)link集群在執(zhí)行作業(yè)時,會對作業(yè)進(jìn)行劃分,并將劃分后的各個子任務(wù)分發(fā)到TaskManager中的每個Slot。一個TaskManager就是一個JVM,Slot則是進(jìn)程中的一個線程。
答案不言而喻。AbstractFlow之所以無法捕獲到各個算子執(zhí)行任務(wù)時拋出的業(yè)務(wù)異常,是因為它們根本就沒有執(zhí)行在一個JVM上,也沒有運(yùn)行在同一個線程中。這正是分布式開發(fā)與本地開發(fā)的本質(zhì)區(qū)別。如果不了解Flink的執(zhí)行原理,可能就會困惑Java的異常處理機(jī)制為何不生效。在進(jìn)行分布式開發(fā)時,如果還是照搬本地開發(fā)的經(jīng)驗,可能真的會撞得頭碰血流才會看清真相。因此,正確的做法是在每個算子的實(shí)現(xiàn)中捕獲各自的異常,也就是要保證每個算子自身都是健壯的,如此才能保證作業(yè)盡可能健壯。
當(dāng)然,分布式開發(fā)與本地開發(fā)的本質(zhì)區(qū)別不只限于此,例如分布式開發(fā)跨進(jìn)程調(diào)用對序列化的要求,對數(shù)據(jù)一致性的不同要求,對異步通信機(jī)制以及阻塞調(diào)用的認(rèn)識,都可能給程序員帶來不同的體驗。歸根結(jié)底,了解分布式開發(fā)或分布式系統(tǒng)的底層原理,可以讓我們盡早看到真相,避免調(diào)到坑里而不自知。