硬核八張圖搞懂 Flink 端到端精準(zhǔn)一次處理語義 Exactly-once
本文轉(zhuǎn)載自微信公眾號「五分鐘學(xué)大數(shù)據(jù)」,作者園陌。轉(zhuǎn)載本文請聯(lián)系五分鐘學(xué)大數(shù)據(jù)公眾號。
在 Flink 中需要端到端精準(zhǔn)一次處理的位置有三個(gè):
Flink 端到端精準(zhǔn)一次處理
- Source 端:數(shù)據(jù)從上一階段進(jìn)入到 Flink 時(shí),需要保證消息精準(zhǔn)一次消費(fèi)。
- Flink 內(nèi)部端:這個(gè)我們已經(jīng)了解,利用 Checkpoint 機(jī)制,把狀態(tài)存盤,發(fā)生故障的時(shí)候可以恢復(fù),保證內(nèi)部的狀態(tài)一致性。
- Flink可靠性的基石-checkpoint機(jī)制詳細(xì)解析
Sink 端:將處理完的數(shù)據(jù)發(fā)送到下一階段時(shí),需要保證數(shù)據(jù)能夠準(zhǔn)確無誤發(fā)送到下一階段。
在 Flink 1.4 版本之前,精準(zhǔn)一次處理只限于 Flink 應(yīng)用內(nèi),也就是所有的 Operator 完全由 Flink 狀態(tài)保存并管理的才能實(shí)現(xiàn)精確一次處理。但 Flink 處理完數(shù)據(jù)后大多需要將結(jié)果發(fā)送到外部系統(tǒng),比如 Sink 到 Kafka 中,這個(gè)過程中 Flink 并不保證精準(zhǔn)一次處理。
在 Flink 1.4 版本正式引入了一個(gè)里程碑式的功能:兩階段提交 Sink,即 TwoPhaseCommitSinkFunction 函數(shù)。該 SinkFunction 提取并封裝了兩階段提交協(xié)議中的公共邏輯,自此 Flink 搭配特定 Source 和 Sink(如 Kafka 0.11 版)實(shí)現(xiàn)精確一次處理語義(英文簡稱:EOS,即 Exactly-Once Semantics)。
Flink端到端精準(zhǔn)一次處理語義(EOS)
注:以下內(nèi)容適用于 Flink 1.4 及之后版本
對于 Source 端:Source 端的精準(zhǔn)一次處理比較簡單,畢竟數(shù)據(jù)是落到 Flink 中,所以 Flink 只需要保存消費(fèi)數(shù)據(jù)的偏移量即可, 如消費(fèi) Kafka 中的數(shù)據(jù),F(xiàn)link 將 Kafka Consumer 作為 Source,可以將偏移量保存下來,如果后續(xù)任務(wù)出現(xiàn)了故障,恢復(fù)的時(shí)候可以由連接器重置偏移量,重新消費(fèi)數(shù)據(jù),保證一致性。
對于 Sink 端:Sink 端是最復(fù)雜的,因?yàn)閿?shù)據(jù)是落地到其他系統(tǒng)上的,數(shù)據(jù)一旦離開 Flink 之后,F(xiàn)link 就監(jiān)控不到這些數(shù)據(jù)了,所以精準(zhǔn)一次處理語義必須也要應(yīng)用于 Flink 寫入數(shù)據(jù)的外部系統(tǒng),故這些外部系統(tǒng)必須提供一種手段允許提交或回滾這些寫入操作,同時(shí)還要保證與 Flink Checkpoint 能夠協(xié)調(diào)使用(Kafka 0.11 版本已經(jīng)實(shí)現(xiàn)精確一次處理語義)。
我們以 Flink 與 Kafka 組合為例,F(xiàn)link 從 Kafka 中讀數(shù)據(jù),處理完的數(shù)據(jù)在寫入 Kafka 中。
為什么以Kafka為例,第一個(gè)原因是目前大多數(shù)的 Flink 系統(tǒng)讀寫數(shù)據(jù)都是與 Kafka 系統(tǒng)進(jìn)行的。第二個(gè)原因,也是最重要的原因 Kafka 0.11 版本正式發(fā)布了對于事務(wù)的支持,這是與Kafka交互的Flink應(yīng)用要實(shí)現(xiàn)端到端精準(zhǔn)一次語義的必要條件。
當(dāng)然,F(xiàn)link 支持這種精準(zhǔn)一次處理語義并不只是限于與 Kafka 的結(jié)合,可以使用任何 Source/Sink,只要它們提供了必要的協(xié)調(diào)機(jī)制。
Flink 與 Kafka 組合
Flink 應(yīng)用示例
如上圖所示,F(xiàn)link 中包含以下組件:
一個(gè) Source,從 Kafka 中讀取數(shù)據(jù)(即 KafkaConsumer)
一個(gè)時(shí)間窗口化的聚會操作(Window)
一個(gè) Sink,將結(jié)果寫入到 Kafka(即 KafkaProducer)
若要 Sink 支持精準(zhǔn)一次處理語義(EOS),它必須以事務(wù)的方式寫數(shù)據(jù)到 Kafka,這樣當(dāng)提交事務(wù)時(shí)兩次 Checkpoint 間的所有寫入操作當(dāng)作為一個(gè)事務(wù)被提交。這確保了出現(xiàn)故障或崩潰時(shí)這些寫入操作能夠被回滾。
當(dāng)然了,在一個(gè)分布式且含有多個(gè)并發(fā)執(zhí)行 Sink 的應(yīng)用中,僅僅執(zhí)行單次提交或回滾是不夠的,因?yàn)樗薪M件都必須對這些提交或回滾達(dá)成共識,這樣才能保證得到一個(gè)一致性的結(jié)果。Flink 使用兩階段提交協(xié)議以及預(yù)提交(Pre-commit)階段來解決這個(gè)問題。
兩階段提交協(xié)議(2PC)
兩階段提交協(xié)議(Two-Phase Commit,2PC)是很常用的解決分布式事務(wù)問題的方式,它可以保證在分布式事務(wù)中,要么所有參與進(jìn)程都提交事務(wù),要么都取消,即實(shí)現(xiàn) ACID 中的 A (原子性)。
在數(shù)據(jù)一致性的環(huán)境下,其代表的含義是:要么所有備份數(shù)據(jù)同時(shí)更改某個(gè)數(shù)值,要么都不改,以此來達(dá)到數(shù)據(jù)的強(qiáng)一致性。
兩階段提交協(xié)議中有兩個(gè)重要角色,協(xié)調(diào)者(Coordinator)和參與者(Participant),其中協(xié)調(diào)者只有一個(gè),起到分布式事務(wù)的協(xié)調(diào)管理作用,參與者有多個(gè)。
顧名思義,兩階段提交將提交過程劃分為連續(xù)的兩個(gè)階段:表決階段(Voting)和提交階段(Commit)。
兩階段提交協(xié)議過程如下圖所示:
兩階段提交協(xié)議
第一階段:表決階段
協(xié)調(diào)者向所有參與者發(fā)送一個(gè) VOTE_REQUEST 消息。
當(dāng)參與者接收到 VOTE_REQUEST 消息,向協(xié)調(diào)者發(fā)送 VOTE_COMMIT 消息作為回應(yīng),告訴協(xié)調(diào)者自己已經(jīng)做好準(zhǔn)備提交準(zhǔn)備,如果參與者沒有準(zhǔn)備好或遇到其他故障,就返回一個(gè) VOTE_ABORT 消息,告訴協(xié)調(diào)者目前無法提交事務(wù)。
第二階段:提交階段
協(xié)調(diào)者收集來自各個(gè)參與者的表決消息。如果所有參與者一致認(rèn)為可以提交事務(wù),那么協(xié)調(diào)者決定事務(wù)的最終提交,在此情形下協(xié)調(diào)者向所有參與者發(fā)送一個(gè) GLOBAL_COMMIT 消息,通知參與者進(jìn)行本地提交;如果所有參與者中有任意一個(gè)返回消息是 VOTE_ABORT,協(xié)調(diào)者就會取消事務(wù),向所有參與者廣播一條 GLOBAL_ABORT 消息通知所有的參與者取消事務(wù)。
每個(gè)提交了表決信息的參與者等候協(xié)調(diào)者返回消息,如果參與者接收到一個(gè) GLOBAL_COMMIT 消息,那么參與者提交本地事務(wù),否則如果接收到 GLOBAL_ABORT 消息,則參與者取消本地事務(wù)。
兩階段提交協(xié)議在 Flink 中的應(yīng)用
Flink 的兩階段提交思路:
我們從 Flink 程序啟動到消費(fèi) Kafka 數(shù)據(jù),最后到 Flink 將數(shù)據(jù) Sink 到 Kafka 為止,來分析 Flink 的精準(zhǔn)一次處理。
當(dāng) Checkpoint 啟動時(shí),JobManager 會將檢查點(diǎn)分界線(checkpoint battier)注入數(shù)據(jù)流,checkpoint barrier 會在算子間傳遞下去,如下如所示:
Flink 精準(zhǔn)一次處理:Checkpoint 啟動
Source 端:Flink Kafka Source 負(fù)責(zé)保存 Kafka 消費(fèi) offset,當(dāng) Chckpoint 成功時(shí) Flink 負(fù)責(zé)提交這些寫入,否則就終止取消掉它們,當(dāng) Chckpoint 完成位移保存,它會將 checkpoint barrier(檢查點(diǎn)分界線) 傳給下一個(gè) Operator,然后每個(gè)算子會對當(dāng)前的狀態(tài)做個(gè)快照,保存到狀態(tài)后端(State Backend)。
對于 Source 任務(wù)而言,就會把當(dāng)前的 offset 作為狀態(tài)保存起來。下次從 Checkpoint 恢復(fù)時(shí),Source 任務(wù)可以重新提交偏移量,從上次保存的位置開始重新消費(fèi)數(shù)據(jù),如下圖所示:
Flink 精準(zhǔn)一次處理:checkpoint barrier 及 offset 保存
Slink 端:從 Source 端開始,每個(gè)內(nèi)部的 transform 任務(wù)遇到 checkpoint barrier(檢查點(diǎn)分界線)時(shí),都會把狀態(tài)存到 Checkpoint 里。數(shù)據(jù)處理完畢到 Sink 端時(shí),Sink 任務(wù)首先把數(shù)據(jù)寫入外部 Kafka,這些數(shù)據(jù)都屬于預(yù)提交的事務(wù)(還不能被消費(fèi)),此時(shí)的 Pre-commit 預(yù)提交階段下 Data Sink 在保存狀態(tài)到狀態(tài)后端的同時(shí)還必須預(yù)提交它的外部事務(wù),如下圖所示:
Flink 精準(zhǔn)一次處理:預(yù)提交到外部系統(tǒng)
當(dāng)所有算子任務(wù)的快照完成(所有創(chuàng)建的快照都被視為是 Checkpoint 的一部分),也就是這次的 Checkpoint 完成時(shí),JobManager 會向所有任務(wù)發(fā)通知,確認(rèn)這次 Checkpoint 完成,此時(shí) Pre-commit 預(yù)提交階段才算完成。才正式到兩階段提交協(xié)議的第二個(gè)階段:commit 階段。該階段中 JobManager 會為應(yīng)用中每個(gè) Operator 發(fā)起 Checkpoint 已完成的回調(diào)邏輯。
本例中的 Data Source 和窗口操作無外部狀態(tài),因此在該階段,這兩個(gè) Opeartor 無需執(zhí)行任何邏輯,但是 Data Sink 是有外部狀態(tài)的,此時(shí)我們必須提交外部事務(wù),當(dāng) Sink 任務(wù)收到確認(rèn)通知,就會正式提交之前的事務(wù),Kafka 中未確認(rèn)的數(shù)據(jù)就改為“已確認(rèn)”,數(shù)據(jù)就真正可以被消費(fèi)了,如下圖所示:
Flink 精準(zhǔn)一次處理:數(shù)據(jù)精準(zhǔn)被消費(fèi)
注:Flink 由 JobManager 協(xié)調(diào)各個(gè) TaskManager 進(jìn)行 Checkpoint 存儲,Checkpoint 保存在 StateBackend(狀態(tài)后端) 中,默認(rèn) StateBackend 是內(nèi)存級的,也可以改為文件級的進(jìn)行持久化保存。
最后,一張圖總結(jié)下 Flink 的 EOS:
Flink 端到端精準(zhǔn)一次處理