圖解 Flink 的 Checkpoint 機(jī)制
Flink是一個(gè)分布式的流處理引擎,而流處理的其中一個(gè)特點(diǎn)就是7X24。那么,如何保障Flink作業(yè)的持續(xù)運(yùn)行呢?Flink的內(nèi)部會(huì)將應(yīng)用狀態(tài)(state)存儲(chǔ)到本地內(nèi)存或者嵌入式的kv數(shù)據(jù)庫(RocksDB)中,由于采用的是分布式架構(gòu),F(xiàn)link需要對(duì)本地生成的狀態(tài)進(jìn)行持久化存儲(chǔ),以避免因應(yīng)用或者節(jié)點(diǎn)機(jī)器故障等原因?qū)е聰?shù)據(jù)的丟失,F(xiàn)link是通過checkpoint(檢查點(diǎn))的方式將狀態(tài)寫入到遠(yuǎn)程的持久化存儲(chǔ),從而就可以實(shí)現(xiàn)不同語義的結(jié)果保障。通過本文,你可以了解到什么是全局一致性檢查點(diǎn),F(xiàn)link內(nèi)部如何通過檢查點(diǎn)實(shí)現(xiàn)Exactly Once的結(jié)果保障。
什么是Checkpoint(檢查點(diǎn))
為了保證state容錯(cuò),F(xiàn)link提供了處理故障的措施,這種措施稱之為checkpoint(一致性檢查點(diǎn))。checkpoint是Flink實(shí)現(xiàn)容錯(cuò)的核心功能,主要是周期性地觸發(fā)checkpoint,將state生成快照持久化到外部存儲(chǔ)系統(tǒng)(比如HDFS)。這樣一來,如果Flink程序出現(xiàn)故障,那么就可以從上一次checkpoint中進(jìn)行狀態(tài)恢復(fù),從而提供容錯(cuò)保障。另外,通過checkpoint機(jī)制,F(xiàn)link可以實(shí)現(xiàn)Exactly-once語義(Flink內(nèi)部的Exactly-once,關(guān)于端到端的exactly_once,Flink是通過兩階段提交協(xié)議實(shí)現(xiàn)的)。下面將會(huì)詳細(xì)分析Flink的checkpoint機(jī)制。
檢查點(diǎn)的生成
如上圖,輸入流是用戶行為數(shù)據(jù),包括購買(buy)和加入購物車(cart)兩種,每種行為數(shù)據(jù)都有一個(gè)偏移量,統(tǒng)計(jì)每種行為的個(gè)數(shù)。
第一步:JobManager checkpoint coordinator 觸發(fā)checkpoint。
第二步:假設(shè)當(dāng)消費(fèi)到[cart,3]這條數(shù)據(jù)時(shí),觸發(fā)了checkpoint。那么此時(shí)數(shù)據(jù)源會(huì)把消費(fèi)的偏移量3寫入持久化存儲(chǔ)。
第三步:當(dāng)寫入結(jié)束后,source會(huì)將state handle(狀態(tài)存儲(chǔ)路徑)反饋給JobManager的checkpoint coordinator。
第四步:接著算子count buy與count cart也會(huì)進(jìn)行同樣的步驟
第五步:等所有的算子都完成了上述步驟之后,即當(dāng) Checkpoint coordinator 收集齊所有 task 的 state handle,就認(rèn)為這一次的 Checkpoint 全局完成了,向持久化存儲(chǔ)中再備份一個(gè) Checkpoint meta 文件,那么整個(gè)checkpoint也就完成了,如果中間有一個(gè)不成功,那么本次checkpoin就宣告失敗。
檢查點(diǎn)的恢復(fù)
通過上面的分析,或許你已經(jīng)對(duì)Flink的checkpoint有了初步的認(rèn)識(shí)。那么接下來,我們看一下是如何從檢查點(diǎn)恢復(fù)的。
- 任務(wù)失敗
- 重啟作業(yè)
- 恢復(fù)檢查點(diǎn)
繼續(xù)處理數(shù)據(jù)
上述過程具體總結(jié)如下:
- 第一步:重啟作業(yè)
- 第二步:從上一次檢查點(diǎn)恢復(fù)狀態(tài)數(shù)據(jù)
- 第三步:繼續(xù)處理新的數(shù)據(jù)
Flink內(nèi)部Exactly-Once實(shí)現(xiàn)
Flink提供了精確一次的處理語義,精確一次的處理語義可以理解為:數(shù)據(jù)可能會(huì)重復(fù)計(jì)算,但是結(jié)果狀態(tài)只有一個(gè)。Flink通過Checkpoint機(jī)制實(shí)現(xiàn)了精確一次的處理語義,F(xiàn)link在觸發(fā)Checkpoint時(shí)會(huì)向Source端插入checkpoint barrier,checkpoint barriers是從source端插入的,并且會(huì)向下游算子進(jìn)行傳遞。checkpoint barriers攜帶一個(gè)checkpoint ID,用于標(biāo)識(shí)屬于哪一個(gè)checkpoint,checkpoint barriers將流邏輯是哪個(gè)分為了兩部分。對(duì)于雙流的情況,通過barrier對(duì)齊的方式實(shí)現(xiàn)精確一次的處理語義。
關(guān)于什么是checkpoint barrier,可以看一下CheckpointBarrier類的源碼描述,如下:
- /**
- * Checkpoint barriers用來在數(shù)據(jù)流中實(shí)現(xiàn)checkpoint對(duì)齊的.
- * Checkpoint barrier由JobManager的checkpoint coordinator插入到Source中,
- * Source會(huì)把barrier廣播發(fā)送到下游算子,當(dāng)一個(gè)算子接收到了其中一個(gè)輸入流的Checkpoint barrier時(shí),
- * 它就會(huì)知道已經(jīng)處理完了本次checkpoint與上次checkpoint之間的數(shù)據(jù).
- *
- * 一旦某個(gè)算子接收到了所有輸入流的checkpoint barrier時(shí),
- * 意味著該算子的已經(jīng)處理完了截止到當(dāng)前checkpoint的數(shù)據(jù),
- * 可以觸發(fā)checkpoint,并將barrier向下游傳遞
- *
- * 根據(jù)用戶選擇的處理語義,在checkpoint完成之前會(huì)緩存后一次checkpoint的數(shù)據(jù),
- * 直到本次checkpoint完成(exactly once)
- *
- * checkpoint barrier的id是嚴(yán)格單調(diào)遞增的
- *
- */
- public class CheckpointBarrier extends RuntimeEvent {...}
可以看出checkpoint barrier主要功能是實(shí)現(xiàn)checkpoint對(duì)齊的,從而可以實(shí)現(xiàn)Exactly-Once處理語義。
下面將會(huì)對(duì)checkpoint過程進(jìn)行分解,具體如下:
圖1,包括兩個(gè)流,每個(gè)任務(wù)都會(huì)消費(fèi)一條用戶行為數(shù)據(jù)(包括購買(buy)和加購(cart)),數(shù)字代表該數(shù)據(jù)的偏移量,count buy任務(wù)統(tǒng)計(jì)購買行為的個(gè)數(shù),coun cart統(tǒng)計(jì)加購行為的個(gè)數(shù)。
圖2,觸發(fā)checkpoint,JobManager會(huì)向每個(gè)數(shù)據(jù)源發(fā)送一個(gè)新的checkpoint編號(hào),以此來啟動(dòng)檢查點(diǎn)生成流程。
圖3,當(dāng)Source任務(wù)收到消息后,會(huì)停止發(fā)出數(shù)據(jù),然后利用狀態(tài)后端觸發(fā)生成本地狀態(tài)檢查點(diǎn),并把該checkpoint barrier以及checkpoint id廣播至所有傳出的數(shù)據(jù)流分區(qū)。狀態(tài)后端會(huì)在checkpoint完成之后通知任務(wù),隨后任務(wù)會(huì)向Job Manager發(fā)送確認(rèn)消息。在將checkpoint barrier發(fā)出之后,Source任務(wù)恢復(fù)正常工作。
圖4,Source任務(wù)發(fā)出的checkpoint barrier會(huì)發(fā)送到與之相連的下游算子任務(wù),當(dāng)任務(wù)收到一個(gè)新的checkpoint barrier時(shí),會(huì)繼續(xù)等待其他輸入分區(qū)的checkpoint barrier到來,這個(gè)過程稱之為barrier 對(duì)齊,checkpoint barrier到來之前會(huì)把到來的數(shù)據(jù)線緩存起來。
圖5,任務(wù)收齊了全部輸入分區(qū)的checkpoint barrier之后,會(huì)通知狀態(tài)后端開始生成checkpoint,同時(shí)會(huì)把checkpoint barrier廣播至下游算子。
圖6,任務(wù)在發(fā)出checkpoint barrier之后,開始處理因barrier對(duì)齊產(chǎn)生的緩存數(shù)據(jù),在緩存的數(shù)據(jù)處理完之后,就會(huì)繼續(xù)處理輸入流數(shù)據(jù)。
圖7,最終checkpoint barrier會(huì)被傳送到sink端,sink任務(wù)接收到checkpoint barrier之后,會(huì)向其他算子任務(wù)一樣,將自身的狀態(tài)寫入checkpoint,之后向Job Manager發(fā)送確認(rèn)消息。Job Manager接收到所有任務(wù)返回的確認(rèn)消息之后,就會(huì)將此次檢查點(diǎn)標(biāo)記為完成。
使用案例
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // checkpoint的時(shí)間間隔,如果狀態(tài)比較大,可以適當(dāng)調(diào)大該值
- env.enableCheckpointing(1000);
- // 配置處理語義,默認(rèn)是exactly-once
- env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- // 兩個(gè)checkpoint之間的最小時(shí)間間隔,防止因checkpoint時(shí)間過長,導(dǎo)致checkpoint積壓
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
- // checkpoint執(zhí)行的上限時(shí)間,如果超過該閾值,則會(huì)中斷checkpoint
- env.getCheckpointConfig().setCheckpointTimeout(60000);
- // 最大并行執(zhí)行的檢查點(diǎn)數(shù)量,默認(rèn)為1,可以指定多個(gè),從而同時(shí)出發(fā)多個(gè)checkpoint,提升效率
- env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
- // 設(shè)定周期性外部檢查點(diǎn),將狀態(tài)數(shù)據(jù)持久化到外部系統(tǒng)中,
- // 使用該方式不會(huì)在任務(wù)正常停止的過程中清理掉檢查點(diǎn)數(shù)據(jù)
- env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
總結(jié)
本文首先從Flink的狀態(tài)入手,以圖解加文字的形式詳細(xì)解釋了Flink的checkpoint機(jī)制,并給出了使用Checkpoint時(shí)的程序配置。