自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink分布式程序的異常處理

開發(fā) 架構(gòu) 分布式
Job與Flow之間的關(guān)系可以利用自定義的@JobFlow注解進(jìn)行配置,如此就可以在執(zhí)行抽象的AbstractJob的run()方法時,利用反射獲得該Job下的所有Flow,遍歷執(zhí)行每個Flow的run()方法。

[[409230]]

本文轉(zhuǎn)載自微信公眾號「逸言」,作者逸言。轉(zhuǎn)載本文請聯(lián)系逸言公眾號。

在我們的數(shù)據(jù)平臺產(chǎn)品中,為了簡化開發(fā),對Flink做了一層封裝,定義了Job和Flow的抽象。一個Job其實(shí)就是Flink的一個作業(yè),每個Job可以定義多個Flow,一個Flow可以理解為是Flink的一個DataStream,利用Job傳遞的StreamExecutionEnvironment可以在Flow中添加包括Source與Sink的多個算子。

Job與Flow之間的關(guān)系可以利用自定義的@JobFlow注解進(jìn)行配置,如此就可以在執(zhí)行抽象的AbstractJob的run()方法時,利用反射獲得該Job下的所有Flow,遍歷執(zhí)行每個Flow的run()方法。在Flow的run()方法中,才會真正根據(jù)StreamExecutionEnvironment執(zhí)行多個算子。

Flink為了保證計算的穩(wěn)定性,提供了不同的重啟策略。例如,當(dāng)我們將重啟策略設(shè)置為失敗率(failure-rate)時,如果執(zhí)行的任務(wù)出錯次數(shù)達(dá)到了失敗率配置的要求,F(xiàn)link的Worker節(jié)點(diǎn)的TaskManager就會重啟。如果超過重啟次數(shù),Task Manager就會停止運(yùn)行。

失敗的原因可能有很多,例如資源不足、網(wǎng)絡(luò)通信出現(xiàn)故障等Flink集群環(huán)境導(dǎo)致的故障,但是也可能是我們編寫的作業(yè)在處理流式數(shù)據(jù)時,因為處理數(shù)據(jù)不當(dāng)拋出了業(yè)務(wù)異常,使得Flink將其視為一次失敗。

為了減少因為業(yè)務(wù)原因拋出異常導(dǎo)致Task Manager的不必要重啟,需要規(guī)定我們編寫的Flink程序的異常處理機(jī)制。由于封裝了Flink的Job,從一開始,我就考慮一勞永逸地解決業(yè)務(wù)異常的問題,即在AbstractJob的run()方法中,捕獲我們自定義的業(yè)務(wù)異常,在日志記錄了錯誤信息后,把該異常“吃”掉,避免異常的拋出導(dǎo)致執(zhí)行失敗,造成TaskManager的重啟,如:

  1. public abstract class AbstractFlow implements Flow {   
  2.     public void run() { 
  3.         try { 
  4.             runBare(); 
  5.         } catch (DomainException ex) { 
  6.             //... 
  7.         } 
  8.     } 
  9.    
  10.     protected abstract void runBare(); 

哪知道這一處理機(jī)制壓根兒就無法捕獲業(yè)務(wù)異常!為什么呢?這就要從Flink的分布式機(jī)制說起了。

在Flink集群上執(zhí)行任務(wù),需要Client將作業(yè)提交給Flink集群的Master節(jié)點(diǎn)。Master的Dispatcher接收到Job并啟動JobManager,通過解析Job的邏輯視圖,了解Job對資源的要求,然后向ResourceManager(Standalone模式,如果是YARN,則由YARN管理和調(diào)度資源)申請本次Job需要的資源。JobManager將Job的邏輯視圖轉(zhuǎn)換為物理視圖,并將計算任務(wù)分發(fā)部署到Flink集群的TaskManager上。整個執(zhí)行過程如下圖所示:

我們封裝的一個Flow,在物理視圖中,其實(shí)就是一個作業(yè),即前面所說的計算任務(wù)。一個作業(yè)可以包含多個算子。如果相鄰算子之間不存在數(shù)據(jù)Shuffle、并行度相同,則會合并為算子鏈(Operator Chain)。每個算子或算子鏈組成一個JobVertex,在執(zhí)行時作為一個任務(wù)(Task)。根據(jù)并行度的設(shè)置,每個任務(wù)包含并行度數(shù)目的子任務(wù)(SubTask),這些子任務(wù)就是作業(yè)調(diào)度的最小邏輯單元,對應(yīng)于進(jìn)程資源中的一個線程,在Flink中,就是一個Slot(如果不考慮Slot共享的話)。

假定Flink環(huán)境的并行度設(shè)置為1,作業(yè)的前面兩個算子滿足合并算子鏈的要求,且并行度設(shè)置為2;之后,通過keyBy()之類的算子完成了數(shù)據(jù)的Shuffle,然后再合并到同一個Sink中。那么它們的關(guān)系如下圖所示:

顯然,F(xiàn)link集群在執(zhí)行作業(yè)時,會對作業(yè)進(jìn)行劃分,并將劃分后的各個子任務(wù)分發(fā)到TaskManager中的每個Slot。一個TaskManager就是一個JVM,Slot則是進(jìn)程中的一個線程。

答案不言而喻。AbstractFlow之所以無法捕獲到各個算子執(zhí)行任務(wù)時拋出的業(yè)務(wù)異常,是因為它們根本就沒有執(zhí)行在一個JVM上,也沒有運(yùn)行在同一個線程中。這正是分布式開發(fā)與本地開發(fā)的本質(zhì)區(qū)別。如果不了解Flink的執(zhí)行原理,可能就會困惑Java的異常處理機(jī)制為何不生效。在進(jìn)行分布式開發(fā)時,如果還是照搬本地開發(fā)的經(jīng)驗,可能真的會撞得頭碰血流才會看清真相。因此,正確的做法是在每個算子的實(shí)現(xiàn)中捕獲各自的異常,也就是要保證每個算子自身都是健壯的,如此才能保證作業(yè)盡可能健壯。

 

當(dāng)然,分布式開發(fā)與本地開發(fā)的本質(zhì)區(qū)別不只限于此,例如分布式開發(fā)跨進(jìn)程調(diào)用對序列化的要求,對數(shù)據(jù)一致性的不同要求,對異步通信機(jī)制以及阻塞調(diào)用的認(rèn)識,都可能給程序員帶來不同的體驗。歸根結(jié)底,了解分布式開發(fā)或分布式系統(tǒng)的底層原理,可以讓我們盡早看到真相,避免調(diào)到坑里而不自知。

 

責(zé)任編輯:武曉燕 來源: 逸言
相關(guān)推薦

2019-06-19 15:40:06

分布式鎖RedisJava

2014-01-22 13:37:53

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2023-05-12 08:23:03

分布式系統(tǒng)網(wǎng)絡(luò)

2023-05-29 14:07:00

Zuul網(wǎng)關(guān)系統(tǒng)

2017-09-01 05:35:58

分布式計算存儲

2023-02-11 00:04:17

分布式系統(tǒng)安全

2009-01-18 09:11:16

JavaIDLJava分布式程序設(shè)計

2021-08-30 20:19:55

應(yīng)用程序

2017-10-27 08:40:44

分布式存儲剪枝系統(tǒng)

2019-04-30 09:17:31

Ceph存儲OSD

2023-10-26 18:10:43

分布式并行技術(shù)系統(tǒng)

2014-02-11 09:07:31

2019-02-17 09:56:43

2024-01-10 08:02:03

分布式技術(shù)令牌,

2015-07-15 10:42:38

分布式分布式事務(wù)集群

2024-03-01 09:53:34

2018-07-17 08:14:22

分布式分布式鎖方位

2023-11-30 07:19:08

.NET開源

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號