自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

流計(jì)算引擎數(shù)據(jù)一致性的本質(zhì)

開發(fā) 開發(fā)工具
流計(jì)算的應(yīng)用與實(shí)踐在大數(shù)據(jù)領(lǐng)域越來越常見,其重要性不言而喻,常見的流計(jì)算引擎有 Google DataFlow、Apache Flink,Apache Kafka Streams,Apache Spark Streaming 等。

流計(jì)算的應(yīng)用與實(shí)踐在大數(shù)據(jù)領(lǐng)域越來越常見,其重要性不言而喻,常見的流計(jì)算引擎有 Google DataFlow、Apache Flink,Apache Kafka Streams,Apache Spark Streaming 等。流計(jì)算系統(tǒng)中的數(shù)據(jù)一致性一般是用消息處理語義來定義的,如某引擎聲稱可以提供「恰好一次(Exactly-once Processing Semantics)流處理語義,表示(或暗示)引擎具備保證數(shù)據(jù)一致性的能力。事實(shí)上,「恰好一次(Exactly-Once)」并不等價(jià)于流計(jì)算的輸出數(shù)據(jù)就符合一致性的要求,該術(shù)語存在很多理解和使用上的誤區(qū)。

本篇文章從流計(jì)算的本質(zhì)出發(fā),重點(diǎn)分析流計(jì)算領(lǐng)域中數(shù)據(jù)處理的一致性問題,同時(shí)對(duì)一致性問題進(jìn)行簡(jiǎn)單的形式化定義,提供一個(gè)一窺當(dāng)下流計(jì)算引擎發(fā)展脈絡(luò)的視角,讓大家對(duì)流計(jì)算引擎的認(rèn)識(shí)更為深入,為可能的流計(jì)算技術(shù)選型提供一些參考。文章主要分為三個(gè)部分:第一部分,會(huì)介紹流計(jì)算系統(tǒng)和一致性難題的本質(zhì);第二部分,會(huì)介紹一致性難題的通用解法以及各種方案間的取舍;第三部分,會(huì)介紹主流的流計(jì)算引擎是如何對(duì)通用解法進(jìn)行泛化以實(shí)現(xiàn)一致性。

一、流計(jì)算中的一致性

在認(rèn)識(shí)流計(jì)算系統(tǒng)一致性之前,我們需要精確定義流計(jì)算。流(Streaming)計(jì)算是一種在無邊界數(shù)據(jù)(unbounded data)上進(jìn)行低延遲計(jì)算的數(shù)據(jù)處理過程。相應(yīng)的,批計(jì)算更準(zhǔn)確的說法是有界數(shù)據(jù)(bounded data)的處理,亦即有明確邊界的數(shù)據(jù)處理,流和批只是兩種不同數(shù)據(jù)集的傳統(tǒng)數(shù)據(jù)計(jì)算方法,它們并不是涇渭分明的,譬如也可以通過批量的方式(e.g. Spark Streaming 中的 micro-batch)來實(shí)現(xiàn)無界數(shù)據(jù)上的流處理過程。

?

??

??


1.一致性定義及挑戰(zhàn)

如果我們將流計(jì)算的過程(獲取輸入數(shù)據(jù)、處理數(shù)據(jù)、輸出計(jì)算結(jié)果)視為數(shù)據(jù)庫(kù)的主從同步過程,抑或視為一種從流數(shù)據(jù)生成衍生數(shù)據(jù)集(表)的過程,則流計(jì)算中的數(shù)據(jù)一致性同關(guān)系型數(shù)據(jù)庫(kù)事務(wù) ACID 理論中的 Consistency 有異曲同工之妙,后者指的是在事務(wù)開始或結(jié)束時(shí),數(shù)據(jù)庫(kù)中的記錄應(yīng)該在一致狀態(tài),相應(yīng)地,流計(jì)算中的一致性可以定義為:流計(jì)算系統(tǒng)在計(jì)算過程中,或是出現(xiàn)故障恢復(fù)計(jì)算后,流系統(tǒng)的內(nèi)部狀態(tài)和外部輸出的數(shù)據(jù)應(yīng)該處在一致的狀態(tài)。譬如,當(dāng)故障恢復(fù)后開始重新計(jì)算,計(jì)算的結(jié)果是否滿足數(shù)據(jù)的一致性(即用戶無法區(qū)分恢復(fù)前和恢復(fù)后的數(shù)據(jù))?記錄是否會(huì)重復(fù)/丟失,第三方系統(tǒng)對(duì)同一條計(jì)算結(jié)果的多次獲取,是否會(huì)存在值上的不一致?對(duì)一致性有了清晰的認(rèn)知和定義后,我們來看看為什么實(shí)現(xiàn)一致性這么難。

?

??

??


在定義一中我們可以看到,流計(jì)算輸入的數(shù)據(jù)是無邊界的,所以系統(tǒng)中會(huì)存在消息抵達(dá)流計(jì)算系統(tǒng)延遲、順序錯(cuò)亂、數(shù)量/規(guī)模未知等不確定因素,這也是流計(jì)算系統(tǒng)一致性復(fù)雜性遠(yuǎn)遠(yuǎn)大于批處理系統(tǒng)的原因:批處理系統(tǒng)中的輸入是確定的,計(jì)算過程中可以通過計(jì)算的原子性來保證數(shù)據(jù)的一致性(如 Spark 中的 RDD 血緣)。此外,同其他分布式應(yīng)用一樣,流計(jì)算系統(tǒng)經(jīng)常也會(huì)受到各類意外因素的影響而發(fā)生故障,比如流量激增、網(wǎng)絡(luò)抖動(dòng)、云服務(wù)資源分配出現(xiàn)問題等,發(fā)生故障后重新執(zhí)行計(jì)算,在存在不確定輸入的前提下設(shè)計(jì)健壯的容錯(cuò)機(jī)制難度很大。

除了數(shù)據(jù)輸入帶來的挑戰(zhàn),流計(jì)算輸出的數(shù)據(jù)會(huì)被實(shí)時(shí)消費(fèi),類似這樣不同于批處理的應(yīng)用場(chǎng)景,也給數(shù)據(jù)的一致性帶來的諸多挑戰(zhàn),如出現(xiàn) FO 后,是撤回之前發(fā)出的數(shù)據(jù),還是是同下游進(jìn)行協(xié)商實(shí)現(xiàn)一致性,都是需要考慮的。

2.一致性相關(guān)概念祛魅

正確認(rèn)識(shí)流計(jì)算系統(tǒng)一致性的內(nèi)在含義和其能力范疇,對(duì)我們構(gòu)建正確且健壯的流計(jì)算任務(wù)至關(guān)重要。下面我會(huì)介紹幾組概念,以便于大家更好地理解流計(jì)算系統(tǒng)的一致性。

恰好一次≠恰好一致

今天大多數(shù)流計(jì)算引擎用「Exactly-Once」去暗示用戶:既然輸入的數(shù)據(jù)不是靜態(tài)集合而是會(huì)連續(xù)變化的,那對(duì)每一條消息「恰好處理」了一次,輸出的數(shù)據(jù)肯定是一致的。上述邏輯的推導(dǎo)過程是沒問題的,但并不嚴(yán)謹(jǐn),因?yàn)? Exactly-Once 作為一個(gè)形容詞,后面所連接的動(dòng)詞或者賓語被故意抹去了,不同的表達(dá)含義也會(huì)大相徑庭。

例子1,后接不同的動(dòng)(名)詞:Exactly-once Delivery 和 Exactly-once Process 。前者是對(duì)消息傳輸層面的語義表達(dá),和流計(jì)算的一致性關(guān)系不是很大,后者是從流計(jì)算的應(yīng)用層面去描述數(shù)據(jù)處理過程。

例子2,后接不同的名詞:Exactly-once State Consistency 和 Exactly-once Process Consistency。前者是 Flink 在官網(wǎng)中對(duì)其一致性的敘述,后者是 Kafka Streaming 的一致性保證,前者的語義約束弱于后者。Exactly-once State Consistency 只是表達(dá)了:流計(jì)算要求對(duì)狀態(tài)的更新只提交一次到持久后端存儲(chǔ),但這里的狀態(tài)一般不包括「輸出到下游結(jié)果」,而僅指引擎內(nèi)部的狀態(tài),譬如各個(gè)算子的狀態(tài)、實(shí)時(shí)流的消費(fèi)偏移等,流計(jì)算引擎內(nèi)部狀態(tài)變更的保證,并不能等價(jià)于從輸入到輸出的一致性,端到端一致性需要你自己關(guān)心。

總之,如何我們后面再看到 Exactly-once XXX,一定要警惕引擎想要透露出什么信息。

端到端的數(shù)據(jù)一致性

端到端一致性(End-To-Ene Consistency),即將數(shù)據(jù)的輸出也作為流計(jì)算引擎的一致性設(shè)計(jì)的一部分,正確的結(jié)果貫穿著這整個(gè)流計(jì)算應(yīng)用的始終:從輸入、處理過程、輸出,每一個(gè)環(huán)節(jié)都需要保證其自身的數(shù)據(jù)一致性,同時(shí)在整個(gè)流計(jì)算流程中,作為整體實(shí)現(xiàn)了端到端的一致性。

下面敘述中,如果不是特意說明,一致性指的是引擎自身狀態(tài)的一致性,端到端一致指的是包含了輸出的一致性。

二、流計(jì)算系統(tǒng)的本質(zhì)

前面我們定義了流計(jì)算一致性的概念,這一部分將會(huì)從概念出發(fā)將問題進(jìn)行形式化拆解,以便得到通用化的解法。

1.再次認(rèn)識(shí)流計(jì)算

上面提到,流計(jì)算的輸入數(shù)據(jù)是沒有邊界的,這符合我們傳統(tǒng)上對(duì)流計(jì)算認(rèn)知。在《System Streaming》一書中,作者提出了一個(gè)將流批統(tǒng)一考慮的流計(jì)算理論抽象,即,任意的數(shù)據(jù)的處理都是「流(Stream)」 和「表(Table)」間的互相轉(zhuǎn)換,其中流用來表征運(yùn)動(dòng)中的數(shù)據(jù),表用來表征靜止的數(shù)據(jù):

  • 流 -> 流:沒有聚合操作的數(shù)據(jù)處理過程;
  • 流 -> 表:存在聚合操作的數(shù)據(jù)處理過程;
  • 表 -> 流:觸發(fā)輸出表數(shù)據(jù)變化的情況;
  • 表 -> 表:不存在這樣的數(shù)據(jù)處理邏輯。

在這個(gè)統(tǒng)一的理論框架下,批處理過程的一致性也可以納入本文討論的范疇中來。但無論是純粹的流計(jì)算,還是上面統(tǒng)一的數(shù)據(jù)處理模型,我們都可以將流(批)數(shù)據(jù)處理的過程抽象為「讀取數(shù)據(jù)-處理數(shù)據(jù)-輸出數(shù)據(jù)」這樣的三個(gè)部分,可用下面的有向無環(huán)圖來表達(dá),其中點(diǎn)代表數(shù)據(jù)加工邏輯,邊表示數(shù)據(jù)流向,數(shù)據(jù)處理過程中的中間狀態(tài)(State)一般需要做持久化存儲(chǔ)。

?

??

??


2.確定性/非確定性計(jì)算

流計(jì)算中的確定性指的是,給定相同的一組數(shù)據(jù),重復(fù)運(yùn)行多次或者打亂數(shù)據(jù)進(jìn)入引擎的順序,計(jì)算完成后將會(huì)輸出相同的結(jié)果,否則就是非確定性計(jì)算。常見的非確定性計(jì)算包括使用了隨機(jī)數(shù)、使用系統(tǒng)時(shí)間、字符串拼接等。如果流計(jì)算中存在非確定性的計(jì)算,則會(huì)給端到端一致性的實(shí)現(xiàn)造成很多困難,部分引擎并不能很好地支持此類場(chǎng)景。

3.一致性問題的形式化定義

在存在不確定性計(jì)算的流計(jì)算中,不確定性計(jì)算的(中間)結(jié)果可視為流計(jì)算引擎狀態(tài)的一部分。從整體上看,任何一個(gè)時(shí)間點(diǎn)的引擎狀態(tài)等于之前所有事件計(jì)算結(jié)果(中間結(jié)果和輸出結(jié)果)的累計(jì)。如果定義流計(jì)算的輸入集合為:E,t 時(shí)刻以來的輸入集合為 E(t),輸出集合為 Sink(t),引擎此時(shí)狀態(tài)為 State(t),State(t) 包括各個(gè)算子的狀態(tài)(包括上面提到的不確定性計(jì)算)、數(shù)據(jù)源的消費(fèi)偏移量(或文件讀取偏移等)等:

State(t) = OperatorState(t) + SourceState(t)

則定義流計(jì)算引擎的計(jì)算過程為,存在計(jì)算計(jì)算邏輯 F 使得:

F(E(t), Sink(t), State(t)) = Sink(t+1) + State(t)

令 O(t) = Sink(t) + State(t),即將計(jì)算對(duì)引擎狀態(tài)的更新視為一種特殊的輸出,則流計(jì)算過程可簡(jiǎn)化為:

F(E(t), O(t)) = O(t+1)

結(jié)合流計(jì)算上面流計(jì)算一致性的定義,我們希望在引擎發(fā)生故障 FailOver 時(shí),存在一種恢復(fù)函數(shù) R 使得

R(E(t), O(t)) = O'(t+1),且 O'(t+1) = O(t+1)

我們?cè)谶@里將引擎狀態(tài)作為一種特殊輸出的考慮有兩點(diǎn)。其一,引擎的狀態(tài)一般也是輸出到外部存儲(chǔ)如 RocksDB/HDFS,這和計(jì)算下游的輸出別無二致。其二,通過屏蔽引擎內(nèi)部的容錯(cuò)機(jī)制實(shí)現(xiàn),簡(jiǎn)化端到端一致性問題的抽象過程,便于更好地理解問題本身。

?

??

??


三、一致性的通用解法

1.通用解法的推導(dǎo)

我們?cè)谏厦娑x了端到端一致性難題:R(E(t), O(t)) = O(t+1)。從輸出結(jié)果的使用方(引擎內(nèi)部和引擎下游數(shù)據(jù)消費(fèi)方)的視角來看:對(duì)于記錄 O(t+1),當(dāng)在故障發(fā)生的時(shí)間小于 t (數(shù)據(jù)沒有輸出)或者 大于 t + 1(數(shù)據(jù)已經(jīng)輸出了),數(shù)據(jù)肯定是一致的。

當(dāng)在 t ~ t + 1 時(shí)刻發(fā)生故障,恢復(fù)函數(shù) R 可以屏蔽此次故障產(chǎn)生的副作用,讓使用方認(rèn)為沒有故障發(fā)生,可以得到正確的 O(t+1),顯然,解決的思路是:將 E(t) 和 O(t) 作為輸入,重新執(zhí)行計(jì)算 F,則可以得到正確的 O(t+1),具體地,E(t) 可以通過回?fù)軘?shù)據(jù)偏移量得到,O(t) 需要從持久化存儲(chǔ)中獲取。O(t) 是否可以通過遞歸重算得到呢,即 O(t) = F(E(t-1), O(t-1)) ,答案是不可以,因?yàn)橛?jì)算過程中可能存在不確定的計(jì)算邏輯,如果重算,則有一定概率 O(t) ≠ F(E(t-1), O(t-1)) 。

因此,我們得到流計(jì)算引擎要實(shí)現(xiàn)端到端一致性數(shù)據(jù)處理語義的充分必要條件:在流計(jì)算過程中,需要實(shí)時(shí)存儲(chǔ)每一條中間和最終計(jì)算結(jié)果,如果考慮吞吐率不能存儲(chǔ)每一條,則需定期以事務(wù)的方式進(jìn)行批量存儲(chǔ)。對(duì)于每一個(gè) O(t) 存儲(chǔ)后, 恢復(fù)函數(shù) R 的實(shí)現(xiàn)就簡(jiǎn)單多了:任務(wù)恢復(fù)時(shí),將 O(t) 重新加載,使用 F 執(zhí)行重算操作。

2.通用解法的工程實(shí)現(xiàn)

我們將端到端一致性問題的解法結(jié)合工程實(shí)踐,分析一下通用解法下的若干實(shí)現(xiàn)場(chǎng)景。

在通用解法中,我們需要存儲(chǔ)每一次計(jì)算的中間結(jié)果,這對(duì)引擎的架構(gòu)設(shè)計(jì)、配套基建能力有著很高的要求,如需要高可用、高吞吐的存儲(chǔ)后端用于狀態(tài)存儲(chǔ)。因此,我們將條件退化為可以通過事務(wù)的方式進(jìn)行批量存儲(chǔ),這是因?yàn)槭聞?wù)的 ACID 特性能保證結(jié)果能以原子提交的方式作用于下游算子或者是外部的消息系統(tǒng)/數(shù)據(jù)庫(kù),在保證了結(jié)果(狀態(tài))一致性的前提下,能達(dá)到較高的吞吐率。

進(jìn)一步分析,每一次存儲(chǔ)或者批量事務(wù)存儲(chǔ) O(t) 時(shí),引擎到底做了什么?前面我們定義了 O(t) = Sink(t) + State(t) -> O(t) = Sink(t) + OperatorState(t) + SourceState(t) ,對(duì)于引擎來說,當(dāng)出現(xiàn) FailOver 時(shí),都會(huì)通過 SourceState(t) 回?fù)軘?shù)據(jù)源偏移量進(jìn)行部分重算,即消息讀取語義是 At-Least-Once 的,當(dāng)重復(fù)計(jì)算時(shí),前面存儲(chǔ)的結(jié)果(每一次計(jì)算)或者空的結(jié)果(批量事務(wù))可以實(shí)現(xiàn)冪等變更的效果:如果結(jié)果已經(jīng)存在了, 則使用已有的結(jié)果,消除不確定性計(jì)算帶來的副作用,如果之前的結(jié)果不存在,就更不會(huì)對(duì)外部系統(tǒng)有影響了。

如果我們的計(jì)算過程都是確定性的,那么上述的充分必要條件會(huì)有什么變化呢?在確定性計(jì)算的前提下,如果引擎輸出結(jié)果的接受端是可以實(shí)現(xiàn)為冪等,則很多約束條件會(huì)有所簡(jiǎn)化。由于 O(t) = Sink(t) + State(t) ,引擎內(nèi)部很好實(shí)現(xiàn)冪等狀態(tài)更新,若引擎下游系統(tǒng)也實(shí)現(xiàn)了數(shù)據(jù)冪等,當(dāng)在 t ~ t + n 間內(nèi)出現(xiàn) FailOver 時(shí),引擎可以通過重新計(jì)算 t ~ t + n 之間的所有值,直接輸出給下游使用。

因此,在僅有確定性計(jì)算的流計(jì)算系統(tǒng)中,實(shí)現(xiàn)端到端的充分必要條件可退化為:在流計(jì)算過程中,需要外部的最終結(jié)果接受端實(shí)現(xiàn)冪等,實(shí)時(shí)存儲(chǔ)每一條中間和最終計(jì)算結(jié)果,如果考慮吞吐率不能存儲(chǔ)每一條,則需定期批量存儲(chǔ),上述條件中去掉了對(duì)「事務(wù)」的要求的原因:如果在提交這一批數(shù)據(jù)的提交過程中又發(fā)生了異常,譬如只有部分節(jié)點(diǎn)的結(jié)果輸出了,其他節(jié)點(diǎn)發(fā)生了故障結(jié)果丟失,則可以通過回到上個(gè)批次提交的狀態(tài),重算此批次數(shù)據(jù),重算過程中,由于僅存在確定性計(jì)算,所以無論是引擎內(nèi)還是引擎外,是可以通過冪等來保證數(shù)據(jù)的的一致性的。

在實(shí)際的流計(jì)算引擎實(shí)現(xiàn)中,對(duì)于結(jié)果內(nèi)容的定義大都是一致的,主要包括輸入源的消費(fèi)偏移 SourceState(t),e.g. Kafka Offset,算子狀態(tài) OperatorState(t),e.g. Spark RDD 血緣,輸出的結(jié)果 Sink(t),e.g. Kafka 事務(wù)消息,但是在結(jié)果的存儲(chǔ)方式上各有所不同,下面我們來看一看目前業(yè)界主流的幾個(gè)流計(jì)算引擎的設(shè)計(jì)考量。

四、一致性的引擎實(shí)現(xiàn)

目前流計(jì)算引擎的種類非常多,不是所有的引擎都可以實(shí)現(xiàn)端到端一致的流處理,在具備此能力的引擎中,從技術(shù)成本、引擎架構(gòu)、能力范圍考慮,會(huì)有不同的取舍和實(shí)現(xiàn),如 Flink 中使用了輕量級(jí)的「分布式一致性快照」用于狀態(tài)管理,Kafka Streams 為何沒有使用呢?實(shí)現(xiàn)了冪等輸出就一定能實(shí)現(xiàn)端到端一致么?本章節(jié)會(huì)一一解答上述問題。

1.Google MillWheel

Google在2013年發(fā)了一篇名為《MillWheel: Fault-Tolerant Stream Processing at. Internet Scale》的文章,論述了在 Google 內(nèi)部實(shí)現(xiàn)低延遲數(shù)據(jù)處理的編程模型和工程實(shí)現(xiàn),后面 Google 在此基礎(chǔ)上抽象出了 DataFlow 流處理模型(具體參考論文《The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale,Unbounded, Out-of-Order Data Processing》),后者對(duì)流計(jì)算流域的影響堪比20世紀(jì)初 GFS,BigTable 以及MapReduce 三篇論文對(duì)大數(shù)據(jù)的影響,后面 Google 又在 MillWheel 之上繼續(xù)發(fā)展,開源了 Apache Bean 這個(gè)系統(tǒng)級(jí)的流批一體數(shù)據(jù)解決方案,因?yàn)?MillWheel 是更純粹的「流計(jì)算」,所以我們重點(diǎn)來分析 MillWheel。

MillWheel 使用了一種名為「Strong production」的機(jī)制將每個(gè)算子的輸出在發(fā)送至下游之前都進(jìn)行了持久化存儲(chǔ),一旦發(fā)生了故障,當(dāng)需要恢復(fù)時(shí),引擎可以直接將存儲(chǔ)后的結(jié)果發(fā)出去?;仡^再看端到端一致性數(shù)據(jù)處理語義的充分必要條件,顯然 MillWheel 是符合「實(shí)時(shí)存儲(chǔ)每一條中間和最終計(jì)算結(jié)果」這個(gè)條件的。對(duì)于存在不確定性計(jì)算的流計(jì)算場(chǎng)景,當(dāng) FailOver 時(shí),引擎會(huì)從源頭重新發(fā)送消息進(jìn)行重算,多次計(jì)算可能會(huì)產(chǎn)生的不一致的結(jié)果,但由于「Strong Production」會(huì)對(duì)計(jì)算進(jìn)行去重,因此即便進(jìn)行了多次重算,但有且僅有一次重算的結(jié)果被輸出給下游(下游算子或結(jié)果接受端),從整體上來看數(shù)據(jù)是滿足一致性的,這也被稱之為「Effective Determinism」。

?

??

??


MillWheel 會(huì)對(duì)每一條記錄賦予一個(gè)唯一 ID,同時(shí)基于此 ID 維護(hù)一份是否處理過當(dāng)前記錄的目錄。對(duì)于每一條流入當(dāng)前算子的記錄,引擎查找此 ID 目錄以確定此記錄是否是已經(jīng)處理過。這里會(huì)有很多技術(shù)上的挑戰(zhàn),這里稍微舉幾個(gè)例子。

譬如,需要有穩(wěn)定且高吞吐的存儲(chǔ)后端用于結(jié)果存儲(chǔ),Google 內(nèi)部的 BigTable 發(fā)揮了其作用。流任務(wù)執(zhí)行前后,引擎會(huì)對(duì)執(zhí)行流做若干優(yōu)化,如合并多個(gè)邏輯算子至單個(gè)算子(類似 Flink 中的 chain 化)、節(jié)點(diǎn)內(nèi)先執(zhí)行部分合并(count / sum)后再 shuffle等等,種種手段均是為了降低算子間 IO 的數(shù)據(jù)規(guī)模。

此外,在判斷「當(dāng)前記錄」是否已被處理時(shí),MillWheel 使用了布隆過濾器用于前置過濾,因?yàn)樵谝粋€(gè)正常運(yùn)行的流計(jì)算任務(wù)中,記錄絕大多數(shù)的時(shí)間都是不重復(fù)的,這剛好契合布隆過濾器的使用場(chǎng)景(如過濾器返回不存在則記錄一定不存在),引擎中的每個(gè)節(jié)點(diǎn)都維護(hù)了以記錄 ID 為主鍵的布隆過濾器,計(jì)算前都會(huì)通過此過濾器進(jìn)行判斷,若提示不存在則進(jìn)行數(shù)據(jù)處理,如果存在,則需要二次校驗(yàn)。當(dāng)然,MillWheel 在實(shí)際使用布隆過濾器,是做了若干改造的,這里就不具體展開了。

2.Apache Flink

MillWheel 作為一個(gè)內(nèi)部系統(tǒng)可以存儲(chǔ)每一個(gè)中間結(jié)果,但是對(duì)于開源系統(tǒng)的 Apache Flink 來說,畢竟不是每一個(gè)公司都有這么完備的技術(shù)基建。Flink 會(huì)定期把結(jié)果以事務(wù)的方式進(jìn)行批量存儲(chǔ),這里的「結(jié)果」如上面分析,由源狀態(tài) SourceState(t)、算子狀態(tài) OperatorState(t) 、輸出的結(jié)果 Sink(t) 組成,其中 Flink 把源狀態(tài)和算子狀態(tài)進(jìn)行了打包,統(tǒng)稱為「分布式一致性快照」(基于 Chandy-Lamport 分布式快照算法來實(shí)現(xiàn)),數(shù)據(jù)會(huì)持久化在 RocksDB 中。

?

??

??


如上圖所示,F(xiàn)link 引擎會(huì)定時(shí)(每個(gè)周期稱之為一個(gè) epoch)以 2PC 的方式提交結(jié)果。事實(shí)上,即便不考慮結(jié)果輸出,F(xiàn)link 「分布式一致性快照」的快照的實(shí)現(xiàn)也是一個(gè) 2PC 的過程:算子的狀態(tài)快照存儲(chǔ)類似于 2PC 的 Prepare 階段,但 Commit 的確認(rèn)僅需 Coordinator( Flink JobManager) 根據(jù)「是否收到了完整算子的 ACK 」來推出是否 Commit 或 Abort。將結(jié)果輸出納入快照生成的 2PC 后,端到端一致性數(shù)據(jù)處理語義的充分必要條件在這里也得到了滿足:在流計(jì)算過程中,定期(epoch)以事務(wù)(2PC)的方式進(jìn)行批量存儲(chǔ)結(jié)果(分布式一致性快照 + 寫外部存儲(chǔ))。需要注意的是,由于 Flink 會(huì)以 epoch 為周期輸出結(jié)果,因此基于此構(gòu)建的流處理系統(tǒng)會(huì)存在一定的端到端延遲。

3.Apache Kafka Streams

Kafka Streams 是 Apache Kafka 0.10.0版本中包含的一個(gè)Java庫(kù),嚴(yán)格來講并不算一個(gè)完整的流處理引擎,利用這個(gè)庫(kù),用戶可以基于 Kafka 構(gòu)建有狀態(tài)的實(shí)時(shí)數(shù)據(jù)處理應(yīng)用,更進(jìn)一步地,Kafka Streams 需要數(shù)據(jù)輸入源和輸出均為 Kafka 消息隊(duì)列。

Kafka Streams 中的「結(jié)果」也以事務(wù)的方式批量持久化,但和 Flink 不同的是,這些結(jié)果是被寫入不同的消息隊(duì)列中:

  • 源狀態(tài) SourceState(t):即 Kafka 源中的 Offset 信息,會(huì)被寫入一個(gè)單獨(dú)的 Kafaka 隊(duì)列中,該隊(duì)列對(duì)用戶透明;
  • 算子狀態(tài) OperatorState(t) :計(jì)算中算子的 Changelog,也會(huì)寫入單獨(dú)的 Kafaka 隊(duì)列中,該隊(duì)列對(duì)用戶透明;
  • 輸出結(jié)果 Sink(t) :即用戶配置的實(shí)際的輸出隊(duì)列,用于存放計(jì)算結(jié)果。

Kafka Streams 將上述結(jié)果定期以事務(wù)的方式進(jìn)行批量存儲(chǔ),上述事務(wù)在 Kafka 這被稱之為 Transactions API,使用這個(gè) API 構(gòu)建的流處理應(yīng)用,可以在一個(gè)事務(wù)中將多個(gè)主題消息進(jìn)行同時(shí)提交,如果事務(wù)終止或回滾,則下游消費(fèi)不會(huì)讀取到相應(yīng)的結(jié)果(當(dāng)然下游消費(fèi)者也需要配置相應(yīng)的一致性級(jí)別),其過程如下圖所示:

?

??

??


如果稍微回顧一下 Flink 一致性的實(shí)現(xiàn)邏輯,會(huì)發(fā)現(xiàn)這兩者有很多相似點(diǎn),因此 Kafka Streams 的輸出結(jié)果也會(huì)存在一定的端到端延遲。因?yàn)樵谔峤唤Y(jié)果時(shí)創(chuàng)建了新的事務(wù),所以平均事務(wù)大小由提交間隔確定,當(dāng)流量相同時(shí),較短的提交間隔將導(dǎo)致較小的事務(wù),但太小的間隔將導(dǎo)致吞吐下降,因此吞吐量與端到端處理延遲之間需要有一個(gè)折衷。

同時(shí),我們需要注意到的是,F(xiàn)link 和 Kafaka 中的「事務(wù)」提交,和我們常規(guī)的操作關(guān)系型數(shù)據(jù)庫(kù)中的事務(wù)還是有所不同的,后者的事務(wù)提交對(duì)象一般就一個(gè)(e.g. MySQL Server),但在流計(jì)算中,由于結(jié)果有下游輸出、消費(fèi)進(jìn)度、算子狀態(tài)等,因此流計(jì)算引擎需要設(shè)計(jì)一個(gè)全局的事務(wù)協(xié)議用于和下游待提交的各個(gè)存儲(chǔ)后端進(jìn)行交互。舉例:Kafka Streams 的輸出后端需要是 Kafka,以配合在事務(wù)提交過程中,屏蔽部分已輸出至下游(被 Kafka Broker 持久化),但還不滿足事務(wù)隔離性的消息(read_committed 級(jí)別),從流計(jì)算輸出的角度來看,這些消息已被成功處理同時(shí)輸出至下游,但從端到端的一致性來看,它們依然屬于不一致的數(shù)據(jù)。又如,使用 Flink 處理 CDC(Change Data Capture) 的場(chǎng)景,如果下游是 MySQL,在 Flink 2PC 完成之前,來自不同 Flink 節(jié)點(diǎn)的數(shù)據(jù)輸出后其實(shí)已經(jīng)被 commit,類似 Kafka Broker 中的消息無法撤回,MySQL 提交的事務(wù)也無法回滾,因此輸出數(shù)據(jù)中也需要有類似的字段實(shí)現(xiàn)隔離(isolation)語義,以屏蔽這種不一致的數(shù)據(jù)。

4.Apache Spark Streaming

這里提到的 Spark Streaming 指的是原始的基于「Micro-batch,微批」的 Spark 流處理引擎,后面 Spark 又提出了Structured Streaming,使用 Continuous Processing mode 來替代「微批」解決延遲的問題,容錯(cuò)機(jī)制上和 Flink 一樣也使用了Chandy-Lamport 算法,Structured Stream 目前還不成熟,暫時(shí)還不能完全支持 Exactly-Once-Processing,因此這里著重對(duì)比 Spark Streaming。

Spark Streaming 只能保證引擎內(nèi)部的處理邏輯是一致的,但是對(duì)于結(jié)果輸出,則并沒有做特別的抽象,因此如果我們希望實(shí)現(xiàn)端到端的一致性語義,則需要對(duì)自行維護(hù)和判斷一些信息。同傳統(tǒng)的批處理系統(tǒng)類似,流處理中也是以 RDD 構(gòu)建出整個(gè)的數(shù)據(jù)血緣,當(dāng)發(fā)生 FailOver 時(shí),則重新計(jì)算整個(gè) RDD 就可以了。如果 Spark Streaming 存在非確定性的計(jì)算,則不能實(shí)現(xiàn)端到端一致,原因是:1、不滿足條件一「實(shí)時(shí)存儲(chǔ)每一條結(jié)果」。如果能記錄下每個(gè) RDD 分區(qū)下的執(zhí)行情況,避免重復(fù)執(zhí)行(冪等),也一定程度上能實(shí)現(xiàn)端到端一致,但這需要進(jìn)行大量的改造工作,最終形態(tài)會(huì)和 MillWheel 比較類似;2、不滿足條件二「事務(wù)方式存儲(chǔ)」,需要保證每個(gè) RDD 產(chǎn)出環(huán)節(jié)的事務(wù)性(如最終結(jié)果寫 HDFS 就不是原子的)。

考慮一種比較簡(jiǎn)單的場(chǎng)景:不存在非確定計(jì)算的流計(jì)算應(yīng)用。如果不存在非確定計(jì)算,根據(jù)端到端的一致性語義的充分必要條件,只需要接受端實(shí)現(xiàn)冪等,則 Spark Streaming 就可以實(shí)現(xiàn)端到端的一致性。背后的原因是,當(dāng)將形式化的結(jié)果定義與 Spark Streaming 進(jìn)行映射,會(huì)發(fā)現(xiàn)當(dāng)以「微批」的形式存儲(chǔ)結(jié)果時(shí),源狀態(tài)和算子狀態(tài)以 RDD 血緣的方式天然地和輸出結(jié)果進(jìn)行了綁定,即當(dāng)輸出最終結(jié)果時(shí),我們其實(shí)也一并輸出了源和算子狀態(tài),操作符合一致性條件。

?

??

??


更進(jìn)一步,當(dāng)把僅有確定性計(jì)算(冪等輸出)的 Spark Streaming 和 僅有確定性計(jì)算(冪等輸出)的的 Flink 進(jìn)行對(duì)比時(shí),會(huì)發(fā)現(xiàn)二者非常相似。RDD 血緣類比分布式一致性快照,批量輸出類比一致性快照后的結(jié)果輸出,微批類比 epoch。不同之處在于:1、Spark Streaming 在計(jì)算過程中的每一個(gè) RDD 生成階段都會(huì)有延遲,而 Flink 在計(jì)算過程中可以進(jìn)行實(shí)時(shí)處理;2、Spark Streaming 只有一個(gè)「epoch」,而 Flink 可以有多個(gè) 「epoch」并行存在?;谏鲜鰞牲c(diǎn)原因,F(xiàn)link 的數(shù)據(jù)處理的端到端延遲要小得多,但這兩種引擎冪等輸出能實(shí)現(xiàn)一致性的本質(zhì)是相似的。

5.各引擎一致性實(shí)現(xiàn)總結(jié)

上面我們簡(jiǎn)述了目前主流的幾種流計(jì)算引擎的一致性實(shí)現(xiàn)機(jī)制。從整體來看,如果實(shí)現(xiàn)端到端的一致性,則均需要滿足我們上面從形式化定義推導(dǎo)出來的充分必要條件:實(shí)時(shí)存儲(chǔ)每一條中間和最終計(jì)算結(jié)果,如果考慮吞吐率不能存儲(chǔ)每一條,則需定期以事務(wù)的方式進(jìn)行批量存儲(chǔ),這里的結(jié)果包含流計(jì)算引擎中的狀態(tài)。上面的充分必要條件還可以進(jìn)一步簡(jiǎn)化,即實(shí)時(shí)存儲(chǔ)結(jié)果或定期事務(wù),均可以視為當(dāng)前處理邏輯單元(算子或最終存儲(chǔ))對(duì)上游的輸入(引擎狀態(tài)+輸出結(jié)果)進(jìn)行的冪等化處理:引擎 FailOver -> 輸入源的事件會(huì)進(jìn)行重發(fā) -> 前期存儲(chǔ)的結(jié)果會(huì)用于去重/事務(wù)回滾讓結(jié)果(引擎狀態(tài)+輸出結(jié)果)回到上一次的一致性狀態(tài) -> 下一批結(jié)果輸出 -> 結(jié)果接受端只影響一次 -> 實(shí)現(xiàn)了端到端的一致。

下面的圖列舉出各引擎實(shí)現(xiàn)端到端一致性的路線圖:

?

??

??


前面分析端到端一致性的實(shí)現(xiàn)中,重點(diǎn)在分析引擎處理(算子)和輸出端行為,沒有提及對(duì)數(shù)據(jù)源的要求,數(shù)據(jù)源需具備重播(repaly)和消息去重的功能即可,屬于基礎(chǔ)要求,這里不再展開。

五、總結(jié)與展望

本文從流計(jì)算的本質(zhì)出發(fā),推導(dǎo)出了在流處理中實(shí)現(xiàn)端到端一致性的通用解法,同時(shí)結(jié)合通用解法,分析了目前幾種主流流計(jì)算引擎在一致性上的實(shí)現(xiàn)思路。有「財(cái)大氣粗」型的 Google MillWheel,背靠強(qiáng)大的基礎(chǔ)架構(gòu)用于狀態(tài)管理;有「心靈手巧」型的 Apache Flink,巧妙地結(jié)合了分布式一致性快照和兩階段事務(wù)實(shí)現(xiàn)一致性;也有「重劍無鋒」型的 Apache Kafka Streams,直接將流處理過程事務(wù)化,屏蔽復(fù)雜的底層邏輯,編程模型和理解成本都更簡(jiǎn)單(當(dāng)然也一定程度上限制其使用的場(chǎng)景);也有 「蓬勃發(fā)展」中的 Apache Spark (Structured)Streaming,底層的一些實(shí)現(xiàn)構(gòu)想和 Apache Flink 愈加趨同,可以期待它將來能達(dá)到類似 Apache Spark 在批處理流域中的地位。

當(dāng)然,引擎雖然這么多,但其背后是有若干條主線貫穿的,希望我們能撥開迷霧,不被營(yíng)銷的噱頭所影響,能洞察到一些更為本質(zhì)的東西。本文論述的端到端一致的流數(shù)據(jù)處理實(shí)現(xiàn),重點(diǎn)聚焦在「計(jì)算和狀態(tài)」管理,但實(shí)際上,還有很多因素需要我們?nèi)タ紤],如時(shí)間窗口的推導(dǎo)、延遲數(shù)據(jù)的處理策略、底層計(jì)算節(jié)點(diǎn)的通信容錯(cuò)等,這些問題多多少少也會(huì)影響數(shù)據(jù)的一致性,考慮到文中篇幅,這里就不一一展開了,感興趣的同學(xué)可以選擇一個(gè)主題做深入研究。

下面這些論文對(duì)進(jìn)一步了解流計(jì)算很有幫助,感興趣的同學(xué)可以參考:

  • 《Streaming System》,T Akidau, S Chernyak, R Lax
  • 《Transactions in Apache Kafka》,Apurva Mehta,Jason Gustafson
  • 《A Survey of State Management in Big Data Processing Systems》,QC To, J Soto, V Markl
  • 《MillWheel: fault-tolerant stream processing at Internet scale》,T Akidau, A Balikov, K Bekiro?lu, S Chernyak
  • 《Discretized Streams: Fault-Tolerant Streaming Computation at Scale》,M Zaharia, T Das, H Li, T Hunter
責(zé)任編輯:武曉燕 來源: 51CTO專欄
相關(guān)推薦

2021-10-18 10:30:59

流計(jì)算阿里云

2025-03-27 08:20:54

2023-09-15 14:24:54

ByteHouseClickHouse開源

2024-12-26 15:01:29

2023-09-07 08:11:24

Redis管道機(jī)制

2021-12-05 21:06:27

軟件

2023-12-01 13:51:21

數(shù)據(jù)一致性數(shù)據(jù)庫(kù)

2022-02-17 21:04:27

數(shù)據(jù)庫(kù)MysqlRedis

2009-06-18 09:18:08

Oracle檢索數(shù)據(jù)數(shù)據(jù)一致性事務(wù)恢復(fù)

2024-08-20 16:13:52

2023-05-26 07:34:50

RedisMySQL緩存

2021-12-14 07:15:57

MySQLRedis數(shù)據(jù)

2019-12-17 08:40:33

微服務(wù)架構(gòu)數(shù)據(jù)

2023-11-22 12:55:59

微服務(wù)架構(gòu)數(shù)據(jù)庫(kù)

2019-01-15 17:58:03

微服務(wù)架構(gòu)數(shù)據(jù)

2022-09-15 10:37:46

MySQLRedis數(shù)據(jù)一致性

2024-01-22 08:52:00

AQS雙異步數(shù)據(jù)一致性

2019-11-21 10:19:45

數(shù)據(jù)應(yīng)用場(chǎng)景系統(tǒng)

2021-11-01 21:15:54

微服務(wù)系統(tǒng)數(shù)據(jù)

2023-12-27 14:23:10

微服務(wù)數(shù)據(jù)存儲(chǔ)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)