Apache Flink 漫談系列(05) - Fault Tolerance
實(shí)際問題
在流計(jì)算場景中,數(shù)據(jù)會源源不斷的流入Apache Flink系統(tǒng),每條數(shù)據(jù)進(jìn)入Apache Flink系統(tǒng)都會觸發(fā)計(jì)算。那么在計(jì)算過程中如果網(wǎng)絡(luò)、機(jī)器等原因?qū)е耇ask運(yùn)行失敗了,Apache Flink會如何處理呢?在 《Apache Flink 漫談系列 - State》一篇中我們介紹了 Apache Flink 會利用State記錄計(jì)算的狀態(tài),在Failover時(shí)候Task會根據(jù)State進(jìn)行恢復(fù)。但State的內(nèi)容是如何記錄的?Apache Flink 是如何保證 Exactly-Once 語義的呢?這就涉及到了Apache Flink的 容錯(cuò)(Fault Tolerance) 機(jī)制,本篇將會為大家進(jìn)行相關(guān)內(nèi)容的介紹。
什么是Fault Tolerance
容錯(cuò)(Fault Tolerance) 是指容忍故障,在故障發(fā)生時(shí)能夠自動檢測出來,并使系統(tǒng)能夠自動恢復(fù)正常運(yùn)行。當(dāng)出現(xiàn)某些指定的網(wǎng)絡(luò)故障、硬件故障、軟件錯(cuò)誤時(shí),系統(tǒng)仍能執(zhí)行規(guī)定的一組程序,或者說程序不會因系統(tǒng)中的故障而中止,并且執(zhí)行結(jié)果也不包含系統(tǒng)故障所引起的差錯(cuò)。
傳統(tǒng)數(shù)據(jù)庫Fault Tolerance
我們知道MySQL的binlog是一個(gè)Append Only的日志文件,MySQL的主備復(fù)制是高可用的主要方式,binlog是主備復(fù)制的核心手段(當(dāng)然MySQL高可用細(xì)節(jié)很復(fù)雜也有多種不同的優(yōu)化點(diǎn),如 純異步復(fù)制優(yōu)化為半同步和同步復(fù)制以保證異步復(fù)制binlog導(dǎo)致的master和slave的同步時(shí)候網(wǎng)絡(luò)壞掉,導(dǎo)致主備不一致問題等)。MySQL主備復(fù)制,是MySQL容錯(cuò)機(jī)制的一部分,在容錯(cuò)機(jī)制之中也包括事物控制,在傳統(tǒng)數(shù)據(jù)庫中事物可以設(shè)置不同的事物級別,以保證不同的數(shù)據(jù)質(zhì)量,級別由低到高 如下:
- Read uncommitted - 讀未提交,就是一個(gè)事務(wù)可以讀取另一個(gè)未提交事務(wù)的數(shù)據(jù)。那么這種事物控制成本***,但是會導(dǎo)致另一個(gè)事物讀到臟數(shù)據(jù),那么如何解決讀到臟數(shù)據(jù)的問題呢?利用Read committed 級別...
- Read committed - 讀提交,就是一個(gè)事務(wù)要等另一個(gè)事務(wù)提交后才能讀取數(shù)據(jù)。這種級別可以解決讀臟數(shù)據(jù)的問題,那么這種級別有什么問題呢?這個(gè)級別還有一個(gè) 不能重復(fù)讀的問題,即:開啟一個(gè)讀事物T1,先讀取字段F1值是V1,這時(shí)候另一個(gè)事物T2可以UPDATA這個(gè)字段值V2,導(dǎo)致T1再次讀取字段值時(shí)候獲得V2了,同一個(gè)事物中的兩次讀取不一致了。那么如何解決不可重復(fù)讀的問題呢?利用 Repeatable read 級別...
- Repeatable read - 重復(fù)讀,就是在開始讀取數(shù)據(jù)(事務(wù)開啟)時(shí),不再允許修改操作。重復(fù)讀模式要有事物順序的等待,需要一定的成本達(dá)到高質(zhì)量的數(shù)據(jù)信息,那么重復(fù)讀還會有什么問題嗎?是的,重復(fù)讀級別還有一個(gè)問題就是 幻讀,幻讀產(chǎn)生的原因是INSERT,那么幻讀怎么解決呢?利用Serializable級別...
- Serializable - 序列化 是***的事務(wù)隔離級別,在該級別下,事務(wù)串行化順序執(zhí)行,可以避免臟讀、不可重復(fù)讀與幻讀。但是這種事務(wù)隔離級別效率低下,比較耗數(shù)據(jù)庫性能,一般不使用。
主備復(fù)制,事物控制都是傳統(tǒng)數(shù)據(jù)庫容錯(cuò)的機(jī)制。
流計(jì)算Fault Tolerance的挑戰(zhàn)
流計(jì)算Fault Tolerance的一個(gè)很大的挑戰(zhàn)是低延遲,很多Apache Flink任務(wù)都是7 x 24小時(shí)不間斷,端到端的秒級延遲,要想在遇上網(wǎng)絡(luò)閃斷,機(jī)器壞掉等非預(yù)期的問題時(shí)候快速恢復(fù)正常,并且不影響計(jì)算結(jié)果正確性是一件極其困難的事情。同時(shí)除了流計(jì)算的低延時(shí)要求,還有計(jì)算模式上面的挑戰(zhàn),在Apache Flink中支持Exactly-Once和At-Least-Once兩種計(jì)算模式,如何做到在Failover時(shí)候不重復(fù)計(jì)算,進(jìn)而精準(zhǔn)的做到Exactly-Once也是流計(jì)算Fault Tolerance要重點(diǎn)解決的問題。
Apache Flink的Fault Tolerance 機(jī)制
Apache Flink的Fault Tolerance機(jī)制核心是持續(xù)創(chuàng)建分布式流數(shù)據(jù)及其狀態(tài)的快照。這些快照在系統(tǒng)遇到故障時(shí),作為一個(gè)回退點(diǎn)。Apache Flink中創(chuàng)建快照的機(jī)制叫做Checkpointing,Checkpointing的理論基礎(chǔ) Stephan 在 Lightweight Asynchronous Snapshots for Distributed Dataflows 進(jìn)行了細(xì)節(jié)描述,該機(jī)制源于由K. MANI CHANDY和LESLIE LAMPORT 發(fā)表的 Determining-Global-States-of-a-Distributed-System Paper,該P(yáng)aper描述了在分布式系統(tǒng)如何解決全局狀態(tài)一致性問題。
在Apache Flink中以Checkpointing的機(jī)制進(jìn)行容錯(cuò),Checkpointing會產(chǎn)生類似binlog一樣的、可以用來恢復(fù)任務(wù)狀態(tài)的數(shù)據(jù)文件。Apache Flink中也有類似于數(shù)據(jù)庫事物控制一樣的數(shù)據(jù)計(jì)算語義控制,比如:At-Least-Once和Exactly-Once。
Checkpointing 的算法邏輯
上面我們說Checkpointing是Apache Flink中Fault Tolerance的核心機(jī)制,我們以Checkpointing的方式創(chuàng)建包含timer,connector,window,user-defined state 等stateful Operator的快照。在Determining-Global-States-of-a-Distributed-System的全局狀態(tài)一致性算法中重點(diǎn)描述了全局狀態(tài)的對齊問題,在Lightweight Asynchronous Snapshots for Distributed Dataflows中核心描述了對齊的方式,在Apache Flink中采用以在流信息中插入barrier的方式完成DAG中異步快照。 如下圖(from Lightweight Asynchronous Snapshots for Distributed Dataflows)描述了Asynchronous barrier snapshots for acyclic graphs,也是Apache Flink中采用的方式。
上圖描述的是一個(gè)增量計(jì)算word count的Job邏輯,核心邏輯是如下幾點(diǎn):
- barrier 由source節(jié)點(diǎn)發(fā)出;
- barrier會將流上event切分到不同的checkpoint中;
- 匯聚到當(dāng)前節(jié)點(diǎn)的多流的barrier要對齊;
- barrier對齊之后會進(jìn)行Checkpointing,生成snapshot;
- 完成snapshot之后向下游發(fā)出barrier,繼續(xù)直到Sink節(jié)點(diǎn)。
這樣在整個(gè)流計(jì)算中以barrier方式進(jìn)行Checkpointing,隨著時(shí)間的推移,整個(gè)流的計(jì)算過程中按時(shí)間順序不斷的進(jìn)行Checkpointing,如下圖:
生成的snapshot會存儲到StateBackend中,相關(guān)State的介紹可以查閱 《Apache Flink 漫談系列 - State》。這樣在進(jìn)行Failover時(shí)候,從***一次成功的checkpoint進(jìn)行恢復(fù)。
Checkpointing 的控制
上面我們了解到整個(gè)流上面我們會隨這時(shí)間推移不斷的做Checkpointing,不斷的產(chǎn)生snapshot存儲到Statebackend中,那么多久進(jìn)行一次Checkpointing?對產(chǎn)生的snapshot如何持久化的呢?帶著這些疑問,我們看看Apache Flink對于Checkpointing如何控制的?有哪些可配置的參數(shù):(這些參數(shù)都在 CheckpointCoordinator 中進(jìn)行定義)
- checkpointMode - 檢查點(diǎn)模式,分為 AT_LEAST_ONCE 和 EXACTLY_ONCE 兩種模式;
- checkpointInterval - 檢查點(diǎn)時(shí)間間隔,單位是毫秒;
- checkpointTimeout - 檢查點(diǎn)超時(shí)時(shí)間,單位毫秒。
在Apache Flink中還有一些其他配置,比如:是否將存儲到外部存儲的checkpoints數(shù)據(jù)刪除,如果不刪除,即使job被cancel掉,checkpoint信息也不會刪除,當(dāng)恢復(fù)job時(shí)候可以利用checkpoint進(jìn)行狀態(tài)恢復(fù)。我們有兩種配置方式,如下:
- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION - 當(dāng)job被cancel時(shí)候,外部存儲的checkpoints不會刪除;
- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION - 當(dāng)job被cancel時(shí)候,外部存儲的checkpoints會被刪除。
Apache Flink 如何做到Exactly-once
通過上面內(nèi)容我們了解了Apache Flink中Exactly-Once和At-Least-Once只是在進(jìn)行checkpointing時(shí)候的配置模式,兩種模式下進(jìn)行checkpointing的原理是一致的,那么在實(shí)現(xiàn)上有什么本質(zhì)區(qū)別呢?
1. 語義
At-Least-Once - 語義是流上所有數(shù)據(jù)至少被處理過一次(不要丟數(shù)據(jù))
Exactly-Once - 語義是流上所有數(shù)據(jù)必須被處理且只能處理一次(不丟數(shù)據(jù),且不能重復(fù))
從語義上面Exactly-Once 比 At-Least-Once對數(shù)據(jù)處理的要求更嚴(yán)格,更精準(zhǔn),那么更高的要求就意味著更高的代價(jià),這里的代價(jià)就是 延遲。
2. 實(shí)現(xiàn)
那在實(shí)現(xiàn)上面Apache Flink中At-Least-Once 和 Exactly-Once有什么區(qū)別呢?區(qū)別體現(xiàn)在多路輸入的時(shí)候(比如 Join),當(dāng)所有輸入的barrier沒有完全到來的時(shí)候,早到來的event在Exactly-Once模式下會進(jìn)行緩存(不進(jìn)行處理),而在At-Least-Once模式下即使所有輸入的barrier沒有完全到來,早到來的event也會進(jìn)行處理。也就是說對于At-Least-Once模式下,對于下游節(jié)點(diǎn)而言,本來數(shù)據(jù)屬于checkpoint N 的數(shù)據(jù)在checkpoint N-1 里面也可能處理過了。
我以Exactly-Once為例說明Exactly-Once模式相對于At-Least-Once模式為啥會有更高的延時(shí)?如下圖:
上圖示意了某個(gè)節(jié)點(diǎn)進(jìn)行Checkpointing的過程:
- 當(dāng)Operator接收到某個(gè)上游發(fā)下來的第barrier時(shí)候開始進(jìn)行barrier的對齊階段;
- 在進(jìn)行對齊期間早到的input的數(shù)據(jù)會被緩存到buffer中;
- 當(dāng)Operator接收到上游所有barrier的時(shí)候,當(dāng)前Operator會進(jìn)行Checkpointing,生成snapshot并持久化;
- 當(dāng)完Checkpointing時(shí)候?qū)arrier廣播給下游Operator。
多路輸入的barrier沒有對齊的時(shí)候,barrier先到的輸入數(shù)據(jù)會緩存在buffer中,不進(jìn)行處理,這樣對于下游而言buffer的數(shù)據(jù)越多就有更大的延遲。這個(gè)延時(shí)帶來的好處就是相鄰Checkpointing所記錄的數(shù)據(jù)(計(jì)算結(jié)果或event)沒有重復(fù)。相對At-Least-Once模式數(shù)據(jù)不會被buffer,減少延時(shí)的利好是以容忍數(shù)據(jù)重復(fù)計(jì)算為代價(jià)的。
在Apache Flink的代碼實(shí)現(xiàn)上用CheckpointBarrierHandler類處理barrier,其核心接口是:
- public interface CheckpointBarrierHandler {
- ...
- //返回operator消費(fèi)的下一個(gè)BufferOrEvent。這個(gè)調(diào)用會導(dǎo)致阻塞直到獲取到下一個(gè)BufferOrEvent
- BufferOrEvent getNextNonBlocked() throws Exception;
- ...}
其中BufferOrEvent,可能是正常的data event,也可能是特殊的event,比如barrier event。對應(yīng)At-Least-Once和Exactly-Once有兩種不同的實(shí)現(xiàn),具體如下:
- Exactly-Once模式 - BarrierBuffer
BarrierBuffer用于提供Exactly-Once一致性保證,其行為是:它將以barrier阻塞輸入直到所有的輸入都接收到基于某個(gè)檢查點(diǎn)的barrier,也就是上面所說的對齊。為了避免背壓輸入流,BarrierBuffer將從被阻塞的channel中持續(xù)地接收buffer并在內(nèi)部存儲它們,直到阻塞被解除。
BarrierBuffer 實(shí)現(xiàn)了CheckpointBarrierHandler的getNextNonBlocked, 該方法用于獲取待處理的下一條記錄。該方法是阻塞調(diào)用,直到獲取到下一個(gè)記錄。其中這里的記錄包括兩種,一種是來自于上游未被標(biāo)記為blocked的輸入,比如上圖中的 event(a),;另一種是,從已blocked輸入中緩沖區(qū)隊(duì)列中被釋放的記錄,比如上圖中的event(1,2,3,4)。
- At-Least-Once模式 - BarrierTracker
BarrierTracker會對各個(gè)輸入接收到的檢查點(diǎn)的barrier進(jìn)行跟蹤。一旦它觀察到某個(gè)檢查點(diǎn)的所有barrier都已經(jīng)到達(dá),它將會通知監(jiān)聽器檢查點(diǎn)已完成,以觸發(fā)相應(yīng)地回調(diào)處理。不像BarrierBuffer的處理邏輯,BarrierTracker不阻塞已經(jīng)發(fā)送了barrier的輸入,也就說明不采用對齊機(jī)制,因此本檢查點(diǎn)的數(shù)據(jù)會及時(shí)被處理,并且因此下一個(gè)檢查點(diǎn)的數(shù)據(jù)可能會在該檢查點(diǎn)還沒有完成時(shí)就已經(jīng)到來。這樣在恢復(fù)時(shí)只能提供At-Least-Once的語義保證。
BarrierTracker也實(shí)現(xiàn)了CheckpointBarrierHandler的getNextNonBlocked, 該方法用于獲取待處理的下一條記錄。與BarrierBuffer相比它實(shí)現(xiàn)很簡單,只是阻塞的獲取要處理的event。
如上兩個(gè)CheckpointBarrierHandler實(shí)現(xiàn)的核心區(qū)別是BarrierBuffer會維護(hù)多路輸入是否要blocked,緩存被blocked的輸入的record。所謂有得必有失,有失必有得,舍得舍得在這里也略有體現(xiàn)哈 :)。
完整Job的Checkpointing過程
在 《Apache Flink 漫談系列 - State》中我們有過對Apache Flink存儲到State中的內(nèi)容做過介紹,比如在connector會利用OperatorState記錄讀取位置的offset,那么一個(gè)完整的Apache Flink任務(wù)的執(zhí)行圖是一個(gè)DAG,上面我們描述了DAG中一個(gè)節(jié)點(diǎn)的過程,那么整體來看Checkpointing的過程是怎樣的呢?在產(chǎn)生checkpoint并分布式持久到HDFS的過程是怎樣的呢?
1. 整體Checkpointing流程
上圖我們看到一個(gè)完整的Apache Flink Job進(jìn)行Checkpointing的過程,JM觸發(fā)Soruce發(fā)射barriers,當(dāng)某個(gè)Operator接收到上游發(fā)下來的barrier,開始進(jìn)行barrier的處理,整體根據(jù)DAG自上而下的逐個(gè)節(jié)點(diǎn)進(jìn)行Checkpointing,并持久化到Statebackend,一直到DAG的sink節(jié)點(diǎn)。
2. Incremental Checkpointing
對于一個(gè)流計(jì)算的任務(wù),數(shù)據(jù)會源源不斷的流入,比如要進(jìn)行雙流join(Apache Flink 漫談系列 - Join 篇會詳細(xì)介紹),由于兩邊的流event的到來有先后順序問題,我們必須將left和right的數(shù)據(jù)都會在state中進(jìn)行存儲,Left event流入會在Right的State中進(jìn)行join數(shù)據(jù),Right event流入會在Left的State中進(jìn)行join數(shù)據(jù),如下圖左右兩邊的數(shù)據(jù)都會持久化到State中:
由于流上數(shù)據(jù)源源不斷,隨著時(shí)間的增加,每次checkpoint產(chǎn)生的snapshot的文件(RocksDB的sst文件)會變的非常龐大,增加網(wǎng)絡(luò)IO,拉長checkpoint時(shí)間,最終導(dǎo)致無法完成checkpoint,進(jìn)而導(dǎo)致Apache Flink失去Failover的能力。為了解決checkpoint不斷變大的問題,Apache Flink內(nèi)部實(shí)現(xiàn)了Incremental Checkpointing,這種增量進(jìn)行checkpoint的機(jī)制,會大大減少checkpoint時(shí)間,并且如果業(yè)務(wù)數(shù)據(jù)穩(wěn)定的情況下每次checkpoint的時(shí)間是相對穩(wěn)定的,根據(jù)不同的業(yè)務(wù)需求設(shè)定checkpoint的interval,穩(wěn)定快速的進(jìn)行Checkpointing,保障Apache Flink任務(wù)在遇到故障時(shí)候可以順利的進(jìn)行Failover。Incremental Checkpointing的優(yōu)化對于Apache Flink成百上千的任務(wù)節(jié)點(diǎn)帶來的利好不言而喻。
端到端exactly-once
根據(jù)上面的介紹我們知道Apache Flink內(nèi)部支持Exactly-Once語義,要想達(dá)到端到端(Soruce到Sink)的Exactly-Once,需要Apache Flink外部Soruce和Sink的支持,具體如下:
- 外部Source的容錯(cuò)要求:Apache Flink 要做到 End-to-End 的 Exactly-Once 需要外部Source的支持,比如上面我們說過 Apache Flink的Checkpointing機(jī)制會在Source節(jié)點(diǎn)記錄讀取的Position,那就需要外部Source提供讀取數(shù)據(jù)的Position和支持根據(jù)Position進(jìn)行數(shù)據(jù)讀取。
- 外部Sink的容錯(cuò)要求:Apache Flink 要做到 End-to-End 的 Exactly-Once相對比較困難,以Kafka作為Sink為例,當(dāng)Sink Operator節(jié)點(diǎn)宕機(jī)時(shí)候,根據(jù)Apache Flink 內(nèi)部Exactly-Once模式的容錯(cuò)保證, 系統(tǒng)會回滾到上次成功的Checkpoint繼續(xù)寫入,但是上次成功checkpoint之后當(dāng)前checkpoint未完成之前已經(jīng)把一部分新數(shù)據(jù)寫入到kafka了. Apache Flink自上次成功的checkpoint繼續(xù)寫入kafka,就造成了kafka再次接收到一份同樣的來自Sink Operator的數(shù)據(jù),進(jìn)而破壞了End-to-End 的 Exactly-Once 語義(重復(fù)寫入就變成了At-Least-Once了),如果要解決這一問題,Apache Flink 利用Two Phase Commit(兩階段提交)的方式來進(jìn)行處理。本質(zhì)上是Sink Operator 需要感知整體Checkpoint的完成,并在整體Checkpoint完成時(shí)候?qū)⒂?jì)算結(jié)果寫入Kafka。
小結(jié)
本篇和大家介紹了Apache Flink的容錯(cuò)(Fault Tolerance)機(jī)制,本篇內(nèi)容結(jié)合《Apache Flink 漫談系列 - State》 一起查閱相信大家會對Apache Flink的State和Fault Tolerance會有更好的理解,也會對后面介紹window,retraction都會有很好的幫助。
# 關(guān)于點(diǎn)贊和評論
本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點(diǎn)贊鼓勵(lì),對有不足的篇章給予反饋和建議,先行感謝大家!
作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計(jì)算平臺Blink的設(shè)計(jì)研發(fā)工作。
【本文為51CTO專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請聯(lián)系原作者】