自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Apache Flink 漫談系列(05) - Fault Tolerance

開發(fā) 開發(fā)工具
在 《Apache Flink 漫談系列 - State》一篇中我們介紹了 Apache Flink 會利用State記錄計(jì)算的狀態(tài),在Failover時(shí)候Task會根據(jù)State進(jìn)行恢復(fù)。但State的內(nèi)容是如何記錄的?Apache Flink 是如何保證 Exactly-Once 語義的呢?

實(shí)際問題

在流計(jì)算場景中,數(shù)據(jù)會源源不斷的流入Apache Flink系統(tǒng),每條數(shù)據(jù)進(jìn)入Apache Flink系統(tǒng)都會觸發(fā)計(jì)算。那么在計(jì)算過程中如果網(wǎng)絡(luò)、機(jī)器等原因?qū)е耇ask運(yùn)行失敗了,Apache Flink會如何處理呢?在 《Apache Flink 漫談系列 - State》一篇中我們介紹了 Apache Flink 會利用State記錄計(jì)算的狀態(tài),在Failover時(shí)候Task會根據(jù)State進(jìn)行恢復(fù)。但State的內(nèi)容是如何記錄的?Apache Flink 是如何保證 Exactly-Once 語義的呢?這就涉及到了Apache Flink的 容錯(cuò)(Fault Tolerance) 機(jī)制,本篇將會為大家進(jìn)行相關(guān)內(nèi)容的介紹。

什么是Fault Tolerance

容錯(cuò)(Fault Tolerance) 是指容忍故障,在故障發(fā)生時(shí)能夠自動檢測出來,并使系統(tǒng)能夠自動恢復(fù)正常運(yùn)行。當(dāng)出現(xiàn)某些指定的網(wǎng)絡(luò)故障、硬件故障、軟件錯(cuò)誤時(shí),系統(tǒng)仍能執(zhí)行規(guī)定的一組程序,或者說程序不會因系統(tǒng)中的故障而中止,并且執(zhí)行結(jié)果也不包含系統(tǒng)故障所引起的差錯(cuò)。

傳統(tǒng)數(shù)據(jù)庫Fault Tolerance

我們知道MySQL的binlog是一個(gè)Append Only的日志文件,MySQL的主備復(fù)制是高可用的主要方式,binlog是主備復(fù)制的核心手段(當(dāng)然MySQL高可用細(xì)節(jié)很復(fù)雜也有多種不同的優(yōu)化點(diǎn),如 純異步復(fù)制優(yōu)化為半同步和同步復(fù)制以保證異步復(fù)制binlog導(dǎo)致的master和slave的同步時(shí)候網(wǎng)絡(luò)壞掉,導(dǎo)致主備不一致問題等)。MySQL主備復(fù)制,是MySQL容錯(cuò)機(jī)制的一部分,在容錯(cuò)機(jī)制之中也包括事物控制,在傳統(tǒng)數(shù)據(jù)庫中事物可以設(shè)置不同的事物級別,以保證不同的數(shù)據(jù)質(zhì)量,級別由低到高 如下:

  • Read uncommitted - 讀未提交,就是一個(gè)事務(wù)可以讀取另一個(gè)未提交事務(wù)的數(shù)據(jù)。那么這種事物控制成本***,但是會導(dǎo)致另一個(gè)事物讀到臟數(shù)據(jù),那么如何解決讀到臟數(shù)據(jù)的問題呢?利用Read committed 級別...
  • Read committed - 讀提交,就是一個(gè)事務(wù)要等另一個(gè)事務(wù)提交后才能讀取數(shù)據(jù)。這種級別可以解決讀臟數(shù)據(jù)的問題,那么這種級別有什么問題呢?這個(gè)級別還有一個(gè) 不能重復(fù)讀的問題,即:開啟一個(gè)讀事物T1,先讀取字段F1值是V1,這時(shí)候另一個(gè)事物T2可以UPDATA這個(gè)字段值V2,導(dǎo)致T1再次讀取字段值時(shí)候獲得V2了,同一個(gè)事物中的兩次讀取不一致了。那么如何解決不可重復(fù)讀的問題呢?利用 Repeatable read 級別...
  • Repeatable read - 重復(fù)讀,就是在開始讀取數(shù)據(jù)(事務(wù)開啟)時(shí),不再允許修改操作。重復(fù)讀模式要有事物順序的等待,需要一定的成本達(dá)到高質(zhì)量的數(shù)據(jù)信息,那么重復(fù)讀還會有什么問題嗎?是的,重復(fù)讀級別還有一個(gè)問題就是 幻讀,幻讀產(chǎn)生的原因是INSERT,那么幻讀怎么解決呢?利用Serializable級別...
  • Serializable - 序列化 是***的事務(wù)隔離級別,在該級別下,事務(wù)串行化順序執(zhí)行,可以避免臟讀、不可重復(fù)讀與幻讀。但是這種事務(wù)隔離級別效率低下,比較耗數(shù)據(jù)庫性能,一般不使用。

主備復(fù)制,事物控制都是傳統(tǒng)數(shù)據(jù)庫容錯(cuò)的機(jī)制。

流計(jì)算Fault Tolerance的挑戰(zhàn)

流計(jì)算Fault Tolerance的一個(gè)很大的挑戰(zhàn)是低延遲,很多Apache Flink任務(wù)都是7 x 24小時(shí)不間斷,端到端的秒級延遲,要想在遇上網(wǎng)絡(luò)閃斷,機(jī)器壞掉等非預(yù)期的問題時(shí)候快速恢復(fù)正常,并且不影響計(jì)算結(jié)果正確性是一件極其困難的事情。同時(shí)除了流計(jì)算的低延時(shí)要求,還有計(jì)算模式上面的挑戰(zhàn),在Apache Flink中支持Exactly-Once和At-Least-Once兩種計(jì)算模式,如何做到在Failover時(shí)候不重復(fù)計(jì)算,進(jìn)而精準(zhǔn)的做到Exactly-Once也是流計(jì)算Fault Tolerance要重點(diǎn)解決的問題。

Apache Flink的Fault Tolerance 機(jī)制

Apache Flink的Fault Tolerance機(jī)制核心是持續(xù)創(chuàng)建分布式流數(shù)據(jù)及其狀態(tài)的快照。這些快照在系統(tǒng)遇到故障時(shí),作為一個(gè)回退點(diǎn)。Apache Flink中創(chuàng)建快照的機(jī)制叫做Checkpointing,Checkpointing的理論基礎(chǔ) Stephan 在 Lightweight Asynchronous Snapshots for Distributed Dataflows 進(jìn)行了細(xì)節(jié)描述,該機(jī)制源于由K. MANI CHANDY和LESLIE LAMPORT 發(fā)表的 Determining-Global-States-of-a-Distributed-System Paper,該P(yáng)aper描述了在分布式系統(tǒng)如何解決全局狀態(tài)一致性問題。

在Apache Flink中以Checkpointing的機(jī)制進(jìn)行容錯(cuò),Checkpointing會產(chǎn)生類似binlog一樣的、可以用來恢復(fù)任務(wù)狀態(tài)的數(shù)據(jù)文件。Apache Flink中也有類似于數(shù)據(jù)庫事物控制一樣的數(shù)據(jù)計(jì)算語義控制,比如:At-Least-Once和Exactly-Once。

Checkpointing 的算法邏輯

上面我們說Checkpointing是Apache Flink中Fault Tolerance的核心機(jī)制,我們以Checkpointing的方式創(chuàng)建包含timer,connector,window,user-defined state 等stateful Operator的快照。在Determining-Global-States-of-a-Distributed-System的全局狀態(tài)一致性算法中重點(diǎn)描述了全局狀態(tài)的對齊問題,在Lightweight Asynchronous Snapshots for Distributed Dataflows中核心描述了對齊的方式,在Apache Flink中采用以在流信息中插入barrier的方式完成DAG中異步快照。 如下圖(from Lightweight Asynchronous Snapshots for Distributed Dataflows)描述了Asynchronous barrier snapshots for acyclic graphs,也是Apache Flink中采用的方式。

上圖描述的是一個(gè)增量計(jì)算word count的Job邏輯,核心邏輯是如下幾點(diǎn):

  • barrier 由source節(jié)點(diǎn)發(fā)出;
  • barrier會將流上event切分到不同的checkpoint中;
  • 匯聚到當(dāng)前節(jié)點(diǎn)的多流的barrier要對齊;
  • barrier對齊之后會進(jìn)行Checkpointing,生成snapshot;
  • 完成snapshot之后向下游發(fā)出barrier,繼續(xù)直到Sink節(jié)點(diǎn)。

這樣在整個(gè)流計(jì)算中以barrier方式進(jìn)行Checkpointing,隨著時(shí)間的推移,整個(gè)流的計(jì)算過程中按時(shí)間順序不斷的進(jìn)行Checkpointing,如下圖:

生成的snapshot會存儲到StateBackend中,相關(guān)State的介紹可以查閱 《Apache Flink 漫談系列 - State》。這樣在進(jìn)行Failover時(shí)候,從***一次成功的checkpoint進(jìn)行恢復(fù)。

Checkpointing 的控制

上面我們了解到整個(gè)流上面我們會隨這時(shí)間推移不斷的做Checkpointing,不斷的產(chǎn)生snapshot存儲到Statebackend中,那么多久進(jìn)行一次Checkpointing?對產(chǎn)生的snapshot如何持久化的呢?帶著這些疑問,我們看看Apache Flink對于Checkpointing如何控制的?有哪些可配置的參數(shù):(這些參數(shù)都在 CheckpointCoordinator 中進(jìn)行定義)

  • checkpointMode - 檢查點(diǎn)模式,分為 AT_LEAST_ONCE 和 EXACTLY_ONCE 兩種模式;
  • checkpointInterval - 檢查點(diǎn)時(shí)間間隔,單位是毫秒;
  • checkpointTimeout - 檢查點(diǎn)超時(shí)時(shí)間,單位毫秒。

在Apache Flink中還有一些其他配置,比如:是否將存儲到外部存儲的checkpoints數(shù)據(jù)刪除,如果不刪除,即使job被cancel掉,checkpoint信息也不會刪除,當(dāng)恢復(fù)job時(shí)候可以利用checkpoint進(jìn)行狀態(tài)恢復(fù)。我們有兩種配置方式,如下:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION - 當(dāng)job被cancel時(shí)候,外部存儲的checkpoints不會刪除;
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION - 當(dāng)job被cancel時(shí)候,外部存儲的checkpoints會被刪除。

Apache Flink 如何做到Exactly-once

通過上面內(nèi)容我們了解了Apache Flink中Exactly-Once和At-Least-Once只是在進(jìn)行checkpointing時(shí)候的配置模式,兩種模式下進(jìn)行checkpointing的原理是一致的,那么在實(shí)現(xiàn)上有什么本質(zhì)區(qū)別呢?

1. 語義

At-Least-Once - 語義是流上所有數(shù)據(jù)至少被處理過一次(不要丟數(shù)據(jù))

Exactly-Once - 語義是流上所有數(shù)據(jù)必須被處理且只能處理一次(不丟數(shù)據(jù),且不能重復(fù))

從語義上面Exactly-Once 比 At-Least-Once對數(shù)據(jù)處理的要求更嚴(yán)格,更精準(zhǔn),那么更高的要求就意味著更高的代價(jià),這里的代價(jià)就是 延遲。

2. 實(shí)現(xiàn)

那在實(shí)現(xiàn)上面Apache Flink中At-Least-Once 和 Exactly-Once有什么區(qū)別呢?區(qū)別體現(xiàn)在多路輸入的時(shí)候(比如 Join),當(dāng)所有輸入的barrier沒有完全到來的時(shí)候,早到來的event在Exactly-Once模式下會進(jìn)行緩存(不進(jìn)行處理),而在At-Least-Once模式下即使所有輸入的barrier沒有完全到來,早到來的event也會進(jìn)行處理。也就是說對于At-Least-Once模式下,對于下游節(jié)點(diǎn)而言,本來數(shù)據(jù)屬于checkpoint N 的數(shù)據(jù)在checkpoint N-1 里面也可能處理過了。

我以Exactly-Once為例說明Exactly-Once模式相對于At-Least-Once模式為啥會有更高的延時(shí)?如下圖:

上圖示意了某個(gè)節(jié)點(diǎn)進(jìn)行Checkpointing的過程:

  • 當(dāng)Operator接收到某個(gè)上游發(fā)下來的第barrier時(shí)候開始進(jìn)行barrier的對齊階段;
  • 在進(jìn)行對齊期間早到的input的數(shù)據(jù)會被緩存到buffer中;
  • 當(dāng)Operator接收到上游所有barrier的時(shí)候,當(dāng)前Operator會進(jìn)行Checkpointing,生成snapshot并持久化;
  • 當(dāng)完Checkpointing時(shí)候?qū)arrier廣播給下游Operator。

多路輸入的barrier沒有對齊的時(shí)候,barrier先到的輸入數(shù)據(jù)會緩存在buffer中,不進(jìn)行處理,這樣對于下游而言buffer的數(shù)據(jù)越多就有更大的延遲。這個(gè)延時(shí)帶來的好處就是相鄰Checkpointing所記錄的數(shù)據(jù)(計(jì)算結(jié)果或event)沒有重復(fù)。相對At-Least-Once模式數(shù)據(jù)不會被buffer,減少延時(shí)的利好是以容忍數(shù)據(jù)重復(fù)計(jì)算為代價(jià)的。

在Apache Flink的代碼實(shí)現(xiàn)上用CheckpointBarrierHandler類處理barrier,其核心接口是:

  1. public interface CheckpointBarrierHandler { 
  2. ... 
  3. //返回operator消費(fèi)的下一個(gè)BufferOrEvent。這個(gè)調(diào)用會導(dǎo)致阻塞直到獲取到下一個(gè)BufferOrEvent 
  4. BufferOrEvent getNextNonBlocked() throws Exception; 
  5. ...} 

其中BufferOrEvent,可能是正常的data event,也可能是特殊的event,比如barrier event。對應(yīng)At-Least-Once和Exactly-Once有兩種不同的實(shí)現(xiàn),具體如下:

  • Exactly-Once模式 - BarrierBuffer

BarrierBuffer用于提供Exactly-Once一致性保證,其行為是:它將以barrier阻塞輸入直到所有的輸入都接收到基于某個(gè)檢查點(diǎn)的barrier,也就是上面所說的對齊。為了避免背壓輸入流,BarrierBuffer將從被阻塞的channel中持續(xù)地接收buffer并在內(nèi)部存儲它們,直到阻塞被解除。

BarrierBuffer 實(shí)現(xiàn)了CheckpointBarrierHandler的getNextNonBlocked, 該方法用于獲取待處理的下一條記錄。該方法是阻塞調(diào)用,直到獲取到下一個(gè)記錄。其中這里的記錄包括兩種,一種是來自于上游未被標(biāo)記為blocked的輸入,比如上圖中的 event(a),;另一種是,從已blocked輸入中緩沖區(qū)隊(duì)列中被釋放的記錄,比如上圖中的event(1,2,3,4)。

  • At-Least-Once模式 - BarrierTracker

BarrierTracker會對各個(gè)輸入接收到的檢查點(diǎn)的barrier進(jìn)行跟蹤。一旦它觀察到某個(gè)檢查點(diǎn)的所有barrier都已經(jīng)到達(dá),它將會通知監(jiān)聽器檢查點(diǎn)已完成,以觸發(fā)相應(yīng)地回調(diào)處理。不像BarrierBuffer的處理邏輯,BarrierTracker不阻塞已經(jīng)發(fā)送了barrier的輸入,也就說明不采用對齊機(jī)制,因此本檢查點(diǎn)的數(shù)據(jù)會及時(shí)被處理,并且因此下一個(gè)檢查點(diǎn)的數(shù)據(jù)可能會在該檢查點(diǎn)還沒有完成時(shí)就已經(jīng)到來。這樣在恢復(fù)時(shí)只能提供At-Least-Once的語義保證。

BarrierTracker也實(shí)現(xiàn)了CheckpointBarrierHandler的getNextNonBlocked, 該方法用于獲取待處理的下一條記錄。與BarrierBuffer相比它實(shí)現(xiàn)很簡單,只是阻塞的獲取要處理的event。

如上兩個(gè)CheckpointBarrierHandler實(shí)現(xiàn)的核心區(qū)別是BarrierBuffer會維護(hù)多路輸入是否要blocked,緩存被blocked的輸入的record。所謂有得必有失,有失必有得,舍得舍得在這里也略有體現(xiàn)哈 :)。

完整Job的Checkpointing過程

在 《Apache Flink 漫談系列 - State》中我們有過對Apache Flink存儲到State中的內(nèi)容做過介紹,比如在connector會利用OperatorState記錄讀取位置的offset,那么一個(gè)完整的Apache Flink任務(wù)的執(zhí)行圖是一個(gè)DAG,上面我們描述了DAG中一個(gè)節(jié)點(diǎn)的過程,那么整體來看Checkpointing的過程是怎樣的呢?在產(chǎn)生checkpoint并分布式持久到HDFS的過程是怎樣的呢?

1. 整體Checkpointing流程

上圖我們看到一個(gè)完整的Apache Flink Job進(jìn)行Checkpointing的過程,JM觸發(fā)Soruce發(fā)射barriers,當(dāng)某個(gè)Operator接收到上游發(fā)下來的barrier,開始進(jìn)行barrier的處理,整體根據(jù)DAG自上而下的逐個(gè)節(jié)點(diǎn)進(jìn)行Checkpointing,并持久化到Statebackend,一直到DAG的sink節(jié)點(diǎn)。

2. Incremental Checkpointing

對于一個(gè)流計(jì)算的任務(wù),數(shù)據(jù)會源源不斷的流入,比如要進(jìn)行雙流join(Apache Flink 漫談系列 - Join 篇會詳細(xì)介紹),由于兩邊的流event的到來有先后順序問題,我們必須將left和right的數(shù)據(jù)都會在state中進(jìn)行存儲,Left event流入會在Right的State中進(jìn)行join數(shù)據(jù),Right event流入會在Left的State中進(jìn)行join數(shù)據(jù),如下圖左右兩邊的數(shù)據(jù)都會持久化到State中:

由于流上數(shù)據(jù)源源不斷,隨著時(shí)間的增加,每次checkpoint產(chǎn)生的snapshot的文件(RocksDB的sst文件)會變的非常龐大,增加網(wǎng)絡(luò)IO,拉長checkpoint時(shí)間,最終導(dǎo)致無法完成checkpoint,進(jìn)而導(dǎo)致Apache Flink失去Failover的能力。為了解決checkpoint不斷變大的問題,Apache Flink內(nèi)部實(shí)現(xiàn)了Incremental Checkpointing,這種增量進(jìn)行checkpoint的機(jī)制,會大大減少checkpoint時(shí)間,并且如果業(yè)務(wù)數(shù)據(jù)穩(wěn)定的情況下每次checkpoint的時(shí)間是相對穩(wěn)定的,根據(jù)不同的業(yè)務(wù)需求設(shè)定checkpoint的interval,穩(wěn)定快速的進(jìn)行Checkpointing,保障Apache Flink任務(wù)在遇到故障時(shí)候可以順利的進(jìn)行Failover。Incremental Checkpointing的優(yōu)化對于Apache Flink成百上千的任務(wù)節(jié)點(diǎn)帶來的利好不言而喻。

端到端exactly-once

根據(jù)上面的介紹我們知道Apache Flink內(nèi)部支持Exactly-Once語義,要想達(dá)到端到端(Soruce到Sink)的Exactly-Once,需要Apache Flink外部Soruce和Sink的支持,具體如下:

  • 外部Source的容錯(cuò)要求:Apache Flink 要做到 End-to-End 的 Exactly-Once 需要外部Source的支持,比如上面我們說過 Apache Flink的Checkpointing機(jī)制會在Source節(jié)點(diǎn)記錄讀取的Position,那就需要外部Source提供讀取數(shù)據(jù)的Position和支持根據(jù)Position進(jìn)行數(shù)據(jù)讀取。
  • 外部Sink的容錯(cuò)要求:Apache Flink 要做到 End-to-End 的 Exactly-Once相對比較困難,以Kafka作為Sink為例,當(dāng)Sink Operator節(jié)點(diǎn)宕機(jī)時(shí)候,根據(jù)Apache Flink 內(nèi)部Exactly-Once模式的容錯(cuò)保證, 系統(tǒng)會回滾到上次成功的Checkpoint繼續(xù)寫入,但是上次成功checkpoint之后當(dāng)前checkpoint未完成之前已經(jīng)把一部分新數(shù)據(jù)寫入到kafka了. Apache Flink自上次成功的checkpoint繼續(xù)寫入kafka,就造成了kafka再次接收到一份同樣的來自Sink Operator的數(shù)據(jù),進(jìn)而破壞了End-to-End 的 Exactly-Once 語義(重復(fù)寫入就變成了At-Least-Once了),如果要解決這一問題,Apache Flink 利用Two Phase Commit(兩階段提交)的方式來進(jìn)行處理。本質(zhì)上是Sink Operator 需要感知整體Checkpoint的完成,并在整體Checkpoint完成時(shí)候?qū)⒂?jì)算結(jié)果寫入Kafka。

小結(jié)

本篇和大家介紹了Apache Flink的容錯(cuò)(Fault Tolerance)機(jī)制,本篇內(nèi)容結(jié)合《Apache Flink 漫談系列 - State》 一起查閱相信大家會對Apache Flink的State和Fault Tolerance會有更好的理解,也會對后面介紹window,retraction都會有很好的幫助。

# 關(guān)于點(diǎn)贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點(diǎn)贊鼓勵(lì),對有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計(jì)算平臺Blink的設(shè)計(jì)研發(fā)工作。

【本文為51CTO專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請聯(lián)系原作者】

戳這里,看該作者更多好文

責(zé)任編輯:趙寧寧 來源: 51CTO專欄
相關(guān)推薦

2022-06-10 17:26:07

數(shù)據(jù)集計(jì)算

2018-10-09 10:55:52

Apache FlinWatermark流計(jì)算

2022-07-13 12:53:59

數(shù)據(jù)存儲

2018-09-26 08:44:22

Apache Flin流計(jì)算計(jì)算模式

2018-10-16 08:54:35

Apache Flin流計(jì)算State

2018-09-26 07:50:52

Apache Flin流計(jì)算計(jì)算模式

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2018-11-29 09:01:26

Apache FlinJOIN代碼

2018-11-14 09:01:23

Apache FlinSQL代碼

2019-01-03 10:17:53

Apache FlinTable API代碼

2018-12-11 17:28:22

Apache FlinJOIN代碼

2022-07-13 13:03:29

流計(jì)算亂序

2018-11-07 08:48:31

Apache Flin持續(xù)查詢流計(jì)算

2022-07-12 10:38:25

分布式框架

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-10-30 14:08:45

Apache Flin流表對偶duality

2018-12-29 08:16:32

Apache FlinJOIN代碼

2020-04-09 11:08:30

PyFlinkJAR依賴

2018-10-30 11:10:05

Flink數(shù)據(jù)集計(jì)算

2021-10-20 16:08:57

鴻蒙HarmonyOS應(yīng)用
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號