自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

圖解 Flink 的 Checkpoint 機(jī)制

存儲(chǔ)
通過本文,你可以了解到什么是全局一致性檢查點(diǎn),F(xiàn)link內(nèi)部如何通過檢查點(diǎn)實(shí)現(xiàn)Exactly Once的結(jié)果保障。

Flink是一個(gè)分布式的流處理引擎,而流處理的其中一個(gè)特點(diǎn)就是7X24。那么,如何保障Flink作業(yè)的持續(xù)運(yùn)行呢?Flink的內(nèi)部會(huì)將應(yīng)用狀態(tài)(state)存儲(chǔ)到本地內(nèi)存或者嵌入式的kv數(shù)據(jù)庫(RocksDB)中,由于采用的是分布式架構(gòu),F(xiàn)link需要對(duì)本地生成的狀態(tài)進(jìn)行持久化存儲(chǔ),以避免因應(yīng)用或者節(jié)點(diǎn)機(jī)器故障等原因?qū)е聰?shù)據(jù)的丟失,F(xiàn)link是通過checkpoint(檢查點(diǎn))的方式將狀態(tài)寫入到遠(yuǎn)程的持久化存儲(chǔ),從而就可以實(shí)現(xiàn)不同語義的結(jié)果保障。通過本文,你可以了解到什么是全局一致性檢查點(diǎn),F(xiàn)link內(nèi)部如何通過檢查點(diǎn)實(shí)現(xiàn)Exactly Once的結(jié)果保障。

什么是Checkpoint(檢查點(diǎn))

為了保證state容錯(cuò),F(xiàn)link提供了處理故障的措施,這種措施稱之為checkpoint(一致性檢查點(diǎn))。checkpoint是Flink實(shí)現(xiàn)容錯(cuò)的核心功能,主要是周期性地觸發(fā)checkpoint,將state生成快照持久化到外部存儲(chǔ)系統(tǒng)(比如HDFS)。這樣一來,如果Flink程序出現(xiàn)故障,那么就可以從上一次checkpoint中進(jìn)行狀態(tài)恢復(fù),從而提供容錯(cuò)保障。另外,通過checkpoint機(jī)制,F(xiàn)link可以實(shí)現(xiàn)Exactly-once語義(Flink內(nèi)部的Exactly-once,關(guān)于端到端的exactly_once,Flink是通過兩階段提交協(xié)議實(shí)現(xiàn)的)。下面將會(huì)詳細(xì)分析Flink的checkpoint機(jī)制。

檢查點(diǎn)的生成

如上圖,輸入流是用戶行為數(shù)據(jù),包括購買(buy)和加入購物車(cart)兩種,每種行為數(shù)據(jù)都有一個(gè)偏移量,統(tǒng)計(jì)每種行為的個(gè)數(shù)。

第一步:JobManager checkpoint coordinator 觸發(fā)checkpoint。

第二步:假設(shè)當(dāng)消費(fèi)到[cart,3]這條數(shù)據(jù)時(shí),觸發(fā)了checkpoint。那么此時(shí)數(shù)據(jù)源會(huì)把消費(fèi)的偏移量3寫入持久化存儲(chǔ)。

第三步:當(dāng)寫入結(jié)束后,source會(huì)將state handle(狀態(tài)存儲(chǔ)路徑)反饋給JobManager的checkpoint coordinator。

第四步:接著算子count buy與count cart也會(huì)進(jìn)行同樣的步驟

第五步:等所有的算子都完成了上述步驟之后,即當(dāng) Checkpoint coordinator 收集齊所有 task 的 state handle,就認(rèn)為這一次的 Checkpoint 全局完成了,向持久化存儲(chǔ)中再備份一個(gè) Checkpoint meta 文件,那么整個(gè)checkpoint也就完成了,如果中間有一個(gè)不成功,那么本次checkpoin就宣告失敗。

檢查點(diǎn)的恢復(fù)

通過上面的分析,或許你已經(jīng)對(duì)Flink的checkpoint有了初步的認(rèn)識(shí)。那么接下來,我們看一下是如何從檢查點(diǎn)恢復(fù)的。

  • 任務(wù)失敗

  • 重啟作業(yè)

  • 恢復(fù)檢查點(diǎn)

繼續(xù)處理數(shù)據(jù)

上述過程具體總結(jié)如下:

  • 第一步:重啟作業(yè)
  • 第二步:從上一次檢查點(diǎn)恢復(fù)狀態(tài)數(shù)據(jù)
  • 第三步:繼續(xù)處理新的數(shù)據(jù)

Flink內(nèi)部Exactly-Once實(shí)現(xiàn)

Flink提供了精確一次的處理語義,精確一次的處理語義可以理解為:數(shù)據(jù)可能會(huì)重復(fù)計(jì)算,但是結(jié)果狀態(tài)只有一個(gè)。Flink通過Checkpoint機(jī)制實(shí)現(xiàn)了精確一次的處理語義,F(xiàn)link在觸發(fā)Checkpoint時(shí)會(huì)向Source端插入checkpoint barrier,checkpoint barriers是從source端插入的,并且會(huì)向下游算子進(jìn)行傳遞。checkpoint barriers攜帶一個(gè)checkpoint ID,用于標(biāo)識(shí)屬于哪一個(gè)checkpoint,checkpoint barriers將流邏輯是哪個(gè)分為了兩部分。對(duì)于雙流的情況,通過barrier對(duì)齊的方式實(shí)現(xiàn)精確一次的處理語義。

關(guān)于什么是checkpoint barrier,可以看一下CheckpointBarrier類的源碼描述,如下:

  1. /** 
  2.  * Checkpoint barriers用來在數(shù)據(jù)流中實(shí)現(xiàn)checkpoint對(duì)齊的. 
  3.  * Checkpoint barrier由JobManager的checkpoint coordinator插入到Source中, 
  4.  * Source會(huì)把barrier廣播發(fā)送到下游算子,當(dāng)一個(gè)算子接收到了其中一個(gè)輸入流的Checkpoint barrier時(shí), 
  5.  * 它就會(huì)知道已經(jīng)處理完了本次checkpoint與上次checkpoint之間的數(shù)據(jù). 
  6.  * 
  7.  * 一旦某個(gè)算子接收到了所有輸入流的checkpoint barrier時(shí), 
  8.  * 意味著該算子的已經(jīng)處理完了截止到當(dāng)前checkpoint的數(shù)據(jù), 
  9.  * 可以觸發(fā)checkpoint,并將barrier向下游傳遞 
  10.  * 
  11.  * 根據(jù)用戶選擇的處理語義,在checkpoint完成之前會(huì)緩存后一次checkpoint的數(shù)據(jù), 
  12.  * 直到本次checkpoint完成(exactly once) 
  13.  * 
  14.  * checkpoint barrier的id是嚴(yán)格單調(diào)遞增的 
  15.  * 
  16.  */ 
  17.     public class CheckpointBarrier extends RuntimeEvent {...} 

可以看出checkpoint barrier主要功能是實(shí)現(xiàn)checkpoint對(duì)齊的,從而可以實(shí)現(xiàn)Exactly-Once處理語義。

下面將會(huì)對(duì)checkpoint過程進(jìn)行分解,具體如下:

圖1,包括兩個(gè)流,每個(gè)任務(wù)都會(huì)消費(fèi)一條用戶行為數(shù)據(jù)(包括購買(buy)和加購(cart)),數(shù)字代表該數(shù)據(jù)的偏移量,count buy任務(wù)統(tǒng)計(jì)購買行為的個(gè)數(shù),coun cart統(tǒng)計(jì)加購行為的個(gè)數(shù)。

圖2,觸發(fā)checkpoint,JobManager會(huì)向每個(gè)數(shù)據(jù)源發(fā)送一個(gè)新的checkpoint編號(hào),以此來啟動(dòng)檢查點(diǎn)生成流程。

圖3,當(dāng)Source任務(wù)收到消息后,會(huì)停止發(fā)出數(shù)據(jù),然后利用狀態(tài)后端觸發(fā)生成本地狀態(tài)檢查點(diǎn),并把該checkpoint barrier以及checkpoint id廣播至所有傳出的數(shù)據(jù)流分區(qū)。狀態(tài)后端會(huì)在checkpoint完成之后通知任務(wù),隨后任務(wù)會(huì)向Job Manager發(fā)送確認(rèn)消息。在將checkpoint barrier發(fā)出之后,Source任務(wù)恢復(fù)正常工作。

圖4,Source任務(wù)發(fā)出的checkpoint barrier會(huì)發(fā)送到與之相連的下游算子任務(wù),當(dāng)任務(wù)收到一個(gè)新的checkpoint barrier時(shí),會(huì)繼續(xù)等待其他輸入分區(qū)的checkpoint barrier到來,這個(gè)過程稱之為barrier 對(duì)齊,checkpoint barrier到來之前會(huì)把到來的數(shù)據(jù)線緩存起來。

圖5,任務(wù)收齊了全部輸入分區(qū)的checkpoint barrier之后,會(huì)通知狀態(tài)后端開始生成checkpoint,同時(shí)會(huì)把checkpoint barrier廣播至下游算子。

圖6,任務(wù)在發(fā)出checkpoint barrier之后,開始處理因barrier對(duì)齊產(chǎn)生的緩存數(shù)據(jù),在緩存的數(shù)據(jù)處理完之后,就會(huì)繼續(xù)處理輸入流數(shù)據(jù)。

圖7,最終checkpoint barrier會(huì)被傳送到sink端,sink任務(wù)接收到checkpoint barrier之后,會(huì)向其他算子任務(wù)一樣,將自身的狀態(tài)寫入checkpoint,之后向Job Manager發(fā)送確認(rèn)消息。Job Manager接收到所有任務(wù)返回的確認(rèn)消息之后,就會(huì)將此次檢查點(diǎn)標(biāo)記為完成。

使用案例

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  2.  
  3. // checkpoint的時(shí)間間隔,如果狀態(tài)比較大,可以適當(dāng)調(diào)大該值 
  4. env.enableCheckpointing(1000); 
  5. // 配置處理語義,默認(rèn)是exactly-once 
  6. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
  7. // 兩個(gè)checkpoint之間的最小時(shí)間間隔,防止因checkpoint時(shí)間過長,導(dǎo)致checkpoint積壓 
  8. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); 
  9. // checkpoint執(zhí)行的上限時(shí)間,如果超過該閾值,則會(huì)中斷checkpoint 
  10. env.getCheckpointConfig().setCheckpointTimeout(60000); 
  11. // 最大并行執(zhí)行的檢查點(diǎn)數(shù)量,默認(rèn)為1,可以指定多個(gè),從而同時(shí)出發(fā)多個(gè)checkpoint,提升效率 
  12. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 
  13. // 設(shè)定周期性外部檢查點(diǎn),將狀態(tài)數(shù)據(jù)持久化到外部系統(tǒng)中, 
  14. // 使用該方式不會(huì)在任務(wù)正常停止的過程中清理掉檢查點(diǎn)數(shù)據(jù) 
  15. env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 

總結(jié)

本文首先從Flink的狀態(tài)入手,以圖解加文字的形式詳細(xì)解釋了Flink的checkpoint機(jī)制,并給出了使用Checkpoint時(shí)的程序配置。

 

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)技術(shù)與數(shù)倉
相關(guān)推薦

2021-09-06 18:55:57

MySQLCheckpoint機(jī)制

2024-02-27 08:05:32

Flink分區(qū)機(jī)制數(shù)據(jù)傳輸

2025-04-27 08:15:00

FlinkSavepointCheckpoint

2018-07-12 15:30:03

HTTP緩存機(jī)制

2023-01-01 13:45:37

Condition機(jī)制條件

2016-12-08 10:19:18

Android事件分發(fā)機(jī)制

2021-11-02 06:58:55

FlinkWindow機(jī)制

2023-03-22 18:34:30

Flink調(diào)度部署

2022-06-20 08:03:17

KafkaJava NIO

2023-04-12 08:00:34

Dubbo分布式服務(wù)

2010-09-29 13:52:33

PostgreSQL

2023-03-15 08:30:37

2011-08-24 10:21:39

CHECKPOINT中文man

2022-05-19 08:47:30

Flinkwatermark窗口計(jì)算

2022-09-23 08:02:42

Kafka消息緩存

2023-06-19 18:37:14

HFDSFlink存儲(chǔ)系統(tǒng)

2021-06-30 18:16:38

MySQLWal策略

2013-05-08 12:42:39

HTTP協(xié)議IIS原理ASP.NET

2023-12-26 08:16:56

Kafka緩存架構(gòu)客戶端

2022-07-11 08:02:15

KafkaSelector
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)