自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Apache Kafka:大數(shù)據(jù)的實時處理時代

大數(shù)據(jù) Kafka
本演講將介紹最近 Apache Kafka 添加的一些系統(tǒng)架構(gòu),包括 Kafka Connect 和 Kafka Streams,并且描述一些如何使用它們的實際應(yīng)用體驗。

[[199510]]

在過去幾年,對于 Apache Kafka 的使用范疇已經(jīng)遠(yuǎn)不僅是分布式的消息系統(tǒng):我們可以將每一次用戶點(diǎn)擊,每一個數(shù)據(jù)庫更改,每一條日志的生成,都轉(zhuǎn)化成實時的結(jié)構(gòu)化數(shù)據(jù)流,更早的存儲和分析它們,并從中獲得價值。同時,越來越多的企業(yè)應(yīng)用也開始從批處理數(shù)據(jù)平臺向?qū)崟r的流數(shù)據(jù)數(shù)據(jù)平臺轉(zhuǎn)移。本演講將介紹最近 Apache Kafka 添加的一些系統(tǒng)架構(gòu),包括 Kafka Connect 和 Kafka Streams,并且描述一些如何使用它們的實際應(yīng)用體驗。

注:本文由王國璋在 QCon 北京 2017 站上的演講整理而成。

流處理

在流處理剛被提出來的時候,很多人認(rèn)為流處理只能進(jìn)行做近似的結(jié)果或者增量的計算,倘若你想保證其安全性,以 Lamda 架構(gòu)為基礎(chǔ),利用流處理得到最現(xiàn)在的結(jié)果。但同時你需要采用 batch processing 等其他方式來保證其全局的安全性以正確性。

在如此多年的研究結(jié)果下,在我看來,流處理并不一定是近似的,或者是僅僅以無法保證真確性為代價而提高速度的一種數(shù)據(jù)處理方式。相反,流處理應(yīng)該是一個與全局計算、batch processing 稍微有點(diǎn)不同的計算模型。跟批量處理不同之處在于,批量處理將數(shù)據(jù)引向計算,而流處理將計算引向數(shù)據(jù)。這句話大概有點(diǎn)模糊,接下來,我舉幾個大家熟悉的計算模型例子。

***個計算模型例子—請求應(yīng)答模型。

 

 

請求應(yīng)答模型是業(yè)務(wù)生活中最常用的模型例子。首先提交一個請求到服務(wù)方,而服務(wù)方可能是一個數(shù)據(jù)庫、也可能是別的存儲工具;然后進(jìn)行等待…等待;***得到一個回答。這便是一次請求、一次計算、一次回答。該模型非常簡單、也極易操作,當(dāng)你需要延展到多個機(jī)器上時,只要簡單地增加客戶端以及處理器即可成功。但是缺點(diǎn)在于,不能達(dá)到大的吞吐量,每提交一次請求,都需要等待時間來獲得最終應(yīng)答的結(jié)果。

 

 

第二種常見的模型就是批量處理如上圖所示。如果請求應(yīng)答模型在譜系的一端,那么 typo 的另一端則認(rèn)為是批量處理。當(dāng)我積累數(shù)據(jù)數(shù)量足夠多的時候,一次性提交任務(wù)到數(shù)據(jù)倉庫,再進(jìn)行等待,等待時間短則幾秒鐘、幾分鐘,長則幾小時,***才得到最終的結(jié)果—所有輸入對應(yīng)的所有輸出。該批處理模型的好處在于能夠提高其吞吐率,一次的請求和應(yīng)答可以得出較多結(jié)果。但它的缺點(diǎn)是具有高延時性,比如某數(shù)據(jù)產(chǎn)生時間為上午 6 點(diǎn)鐘,用戶點(diǎn)擊某網(wǎng)頁,由于批處理模型,每 12 小時才會運(yùn)行一次,那么它必須等到上午 6 點(diǎn)到下午 6 點(diǎn)的所有數(shù)據(jù)完整以后才會進(jìn)行工作,那么運(yùn)行結(jié)果可能是用戶點(diǎn)擊的 12 個小時之后。高延遲性是批處理自身帶有的特性。

那么什么是流處理呢? 在我看來,流處理就是介于請求應(yīng)答和批處理之間的一種新型計算模型或者編程模型。流處理并不等待數(shù)據(jù)的完整性,或者說數(shù)據(jù)本沒有完整性這一講法,數(shù)據(jù)本身就是一個數(shù)據(jù)流,當(dāng)每個數(shù)據(jù)流每產(chǎn)生一個新數(shù)據(jù)的時候立刻被計算出、進(jìn)行返回,因此數(shù)據(jù)是源源不斷地通向計算,并且源源不斷有結(jié)果被輸出。你可以設(shè)想,與等待數(shù)據(jù)完全完成之后發(fā)布到計算上相比,流處理就是將計算移到你數(shù)據(jù)發(fā)生地進(jìn)行實時計算的方式。

為什么很多人之前有這樣一種錯覺,他們認(rèn)為流處理可能存在有丟包的情況、或者說只可以得到近似的結(jié)果,其實這是早期的一些數(shù)據(jù)流處理系統(tǒng)所自帶的一些限制。因此以 Lamda 架構(gòu)為基礎(chǔ),在流處理上需要討論不同維度的取舍。接下里我將舉三個例子,延遲、、成本和正確性。正如很多人之前提及的,在進(jìn)行流處理時候,其大多數(shù)情況需要用時間來換取正確性,或者用更多的成本換取時間等等。

 

 

***個例子,說如果你需要做一個實時的 ETL 處理。而關(guān)于 ETL 處理不需要太小的延遲,為達(dá)到低成本的一種保證,我們可以忍受幾分鐘或者 1 分鐘的延遲;但是,如果你正在進(jìn)行一個實時的在線監(jiān)測,存在著幾毫秒的延遲,那么這時候可能更愿意選擇花大量的金錢,或者采取一些可能不必要的 possibility 來達(dá)到一種低延遲的效果;第二個例子,假設(shè)你在做一個在線付費(fèi)協(xié)議,它也是一個流處理平臺。由于在線付費(fèi)協(xié)議可能關(guān)乎到其機(jī)構(gòu),或者其公司的利益所在,因此你會說,我需要保證***的正確性,我不希望有任何丟包情況;

第三個例子,如果你是做一個實時的日志處理,實時收集所有日志,并將其導(dǎo)入 root,在這種情況下,你可能會說,為了降低成本,我愿意付出一小部分正確性的代價,即使不能達(dá)到 100%、達(dá)到 99.99%、達(dá)到 99.9%,這樣的結(jié)果都可以接受。這本是用戶在定義不同流處理應(yīng)用或者業(yè)務(wù)的時候應(yīng)該可以自己做出的選擇。但比較遺憾的是,多數(shù)早期的流處理平臺其實并沒有給予用戶該種選擇,他們自身的設(shè)計理念,那就是為了低延遲直接放棄掉正確性,或者說為了更高的吞吐量直接放棄低延遲。

以上是我想分享的關(guān)于流處理的一些誤會認(rèn)知,如果我的分享能夠讓大家?guī)ё邇蓚€答案的話,我希望這就是一個。我認(rèn)為流處理僅僅是一種不一樣的計算模型或者編程模型,它將計算帶到數(shù)據(jù)上,而不是將數(shù)據(jù)引用到計算上,并且在流處理的時候,用戶往往需要在正確性、延遲性、成本等不同的維度上做出選擇。

Kafka 的角色

為什么當(dāng)我們說到流處理的時候,很多人都在說 Kafka。大多數(shù)人在最早接觸 Kafka 時會說,Kafka 就是一個分布式發(fā)布訂閱的消息系統(tǒng),但是如果我們?nèi)ビ^察 Kafka 的最初一些設(shè)計特性可發(fā)現(xiàn)以下幾點(diǎn)內(nèi)容。***點(diǎn),它可以作為一個寫在磁盤上的緩存來使用,或者說,并不是僅基于內(nèi)存來存儲流數(shù)據(jù),它可以保證數(shù)據(jù)包不被及時消費(fèi)時,依然可用且不被丟失;第二點(diǎn),由于位移的存在提供了邏輯上的順序,在同一個話題上,***個數(shù)據(jù)比第二個數(shù)據(jù)***被發(fā)布的時候,也可保證在消費(fèi)時也是永遠(yuǎn)***個數(shù)據(jù)比第二個數(shù)據(jù)先被消費(fèi);第三點(diǎn),因為 Kafka 是一個公有的大數(shù)據(jù)中轉(zhuǎn)站,就是說,所有的數(shù)據(jù)只要在 Kafka 上,永遠(yuǎn)可以在 Kafka 周圍進(jìn)行業(yè)務(wù)的開發(fā)或者認(rèn)知事物的開發(fā)。接下來我將花費(fèi)一些時間詳細(xì)介紹這三點(diǎn)之間的關(guān)系。

Kafka 不僅僅是一個訂閱消息系統(tǒng),同時也是一個大規(guī)模的流數(shù)據(jù)平臺,那么它提供了什么呢?***,提供訂閱和發(fā)布消息;第二,提供一個緩存的流數(shù)據(jù)存儲平臺;第三,提供流數(shù)據(jù)的處理平臺。今天,我將著重討論流式計算在 Kafka 上面的應(yīng)用。

流式計算在 Kafka 上的應(yīng)用主要有哪些選項呢?***個選項就是 DIY,Kafka 提供了兩個客戶端 —— 一個簡單的發(fā)布者和一個簡單的消費(fèi)者,我們可以使用這兩個客戶端進(jìn)行簡單的流處理操作。舉個簡單的例子,利用消息消費(fèi)者來實時消費(fèi)數(shù)據(jù),每當(dāng)?shù)玫叫碌南M(fèi)數(shù)據(jù)時,可做一些計算的結(jié)果,再通過數(shù)據(jù)發(fā)布者發(fā)布到 Kafka 上,或者將它存儲到第三方存儲系統(tǒng)中。DIY 的流處理需要成本。打個比方,考慮數(shù)據(jù)的延遲性,考慮不同時間上的管理分配,正如很多人提到的 processing time,這將是我后文會重點(diǎn)提及的概念。以上這些都說明,利用 DIY 做流處理任務(wù)、或者做流處理業(yè)務(wù)的應(yīng)用都不是非常簡單的一件事情。

第二個選項是進(jìn)行開源、閉源的流處理平臺。比如,spark。關(guān)于流處理平臺的一個公有認(rèn)知的表示是,如果你想進(jìn)行流處理操作,首先拿出一個集群,且該集群包含所有必需內(nèi)容,比如,如果你要用 spark,那么必須用 spark 的 runtime。因為他們劃定了你作為一個流處理平臺使用者需要用到的所有行為,比如,資源管理系統(tǒng)、參數(shù)調(diào)配系統(tǒng)、容器配置、代碼封裝、分發(fā)等,以上行為都已被該平臺所限定。一旦你選擇使用甲就必須用甲套餐裝備,如果選擇使用乙就必須使用乙套餐裝備。有人不禁提出疑問,我能不能既選擇流處理平臺,又使用自己選擇的,我能不能這樣做呢?

這個應(yīng)用場景其實很普遍,舉個例子,可異步式微服務(wù)處理。什么叫異步式微服務(wù)處理?假設(shè) Kafka 作為一個緩存數(shù)據(jù),在該緩存區(qū)含有很多不同的業(yè)務(wù)。打個比方,一個網(wǎng)店的機(jī)構(gòu)可以有不同的組、不同的員工,有人負(fù)責(zé)銷售、有人負(fù)責(zé)商品分發(fā),有人負(fù)責(zé)價格管理、有人負(fù)責(zé)在線實時的限流監(jiān)控,不同的組、不同的員工可能會以不同的時間,或者以不同的代碼來更新他們的產(chǎn)品,只要擁有一個異步式緩存機(jī)制,即 Kafka,便可擴(kuò)大該微服務(wù),而不需要他們的任何一個組之間進(jìn)行同步請求應(yīng)答機(jī)制。

在該微服務(wù)情況下,每個小組的喜好、特性并不一致,有的組表示我需要做流處理平臺,從 Kafka 讀數(shù)據(jù),處理完再寫回 Kafka,并且想要使用 EWS 把我的應(yīng)用部署在云端大規(guī)模集群上;而另外小組表示我不需要那么復(fù)雜,我只是小規(guī)模數(shù)據(jù),不希望起一個集群,只需起三個機(jī)器,并且每個機(jī)器有 1GB 內(nèi)存足以,可進(jìn)行手動控制操作,不需要資源管理器。那么我們能不能同時滿足他們不同的需求呢? 答案就是我接下來要說的第三種選項。

第三種選項是使用一個輕量級流處理的庫,而不需要使用一個廣泛、復(fù)雜的框架或者平臺來滿足他們不同的需求。在 Kafka 0.10 當(dāng)中已發(fā)布輕量級流處理內(nèi)容平臺,我們可以設(shè)想,跟其他客戶端發(fā)布者和消費(fèi)者一樣,它也是一個客戶端,不同之處在于它是一個計算者客戶端,一個好用的、功能強(qiáng)大的客戶端,并且支持 state processing、Windows 延時的、異步的、甚至不同數(shù)據(jù)的調(diào)控。 最重要的是 Kafka 作為一個庫,可以采用多種方法來發(fā)布流處理平臺的使用。比如,你可以構(gòu)建一個集群;你可以把它作為一個手提電腦來使用;甚至還可以在黑莓上運(yùn)行 Kafka。以上都是尤其簡單的運(yùn)行庫的概念。

 

 

因此我們要做的事情與使用 Kafka 其他的客戶端類似,比如發(fā)布者、消費(fèi)者,只要在代碼里邊加入就可以使用各種各樣的 API。當(dāng)你要調(diào)配控制 Kafka Stream 應(yīng)用的時候,選擇最基礎(chǔ)的 War File 來運(yùn)行或者采用 Java、C,甚至資源管理器來運(yùn)行都是可行的。因為 Kafka Stream 是一個輕量級流處理的庫,可支持各種各樣的運(yùn)維方式。

在我們看來,簡單的就是美的,只有給用戶提供***的兼容性與***的延展性,用戶才能得到***的用戶體驗。

Kafka Stream 的編程語言

 

 

如果接觸過 Storm、Spark 等流處理平臺的同學(xué)可以發(fā)現(xiàn),它們與 Kafka Stream 高階位 DSL 語言其實有相似之處。如上圖所示,首先定義一個 Streams 流, Streams 是從 topic1 中的 topic 獲取得到,即定義 Streams、處理 Streams、得到新的 Streams。比如,從 topic1 里面得到兩個原始數(shù)據(jù)流,然后數(shù)據(jù)流進(jìn)行 countByKey 得到新的數(shù)據(jù)流叫做 Counts。那么 counts.to(“topic2”) 是什么意思呢?在獲取到新的數(shù)據(jù)流之后寫回 Kafka topic2 內(nèi),啟動 KafkaStreams 進(jìn)程,與 Kafka producer、Kafka consumer 類似,讓它來運(yùn)行已定義計算。

 

 

正如大家所了解的,API 的使用其實很簡單。提供一個簡單的 API,用戶簡單地寫入運(yùn)行邏輯即可運(yùn)行。但是編程應(yīng)用總是容易的,而它的復(fù)雜程度在于,一旦你開始運(yùn)維該應(yīng)用,當(dāng)你想要把業(yè)務(wù)拓展到更大規(guī)模,或者業(yè)務(wù)出現(xiàn)變化,或者集群不穩(wěn)定,需要強(qiáng)大的運(yùn)維時,運(yùn)維的程度便顯得異常重要,最上面的編程可能只是冰山一角。Kafka Stream 的設(shè)計理念是最簡單的就是最美的,包括 API、運(yùn)維、debugging,以及各種各樣的方式,都是希望給用戶帶來最簡單的體驗。它的核心思想就是把難問題直接給 Kafka 集群本身。

Kafka 的介紹

 

 

Kafka 的核心思想是什么?就是把這些消息全部存成一個有序日志,所有的消息發(fā)布者把消息發(fā)布到底端,從某一個邏輯上的位移開始順序讀取所有的消息。它的一個好處在于所有的讀和寫,盡管都是刷到磁盤上,但都是按照順序進(jìn)行,該方式對磁盤的使用比較有效,倘若消費(fèi)者和發(fā)布者隔得比較近,將利用 page cash 直接讀數(shù)據(jù)。

 

 

 

延展性。如上圖,提供 topic 以及 topic partitions,即話題與話題分區(qū)的機(jī)制。每個用戶有不同的 topic,每個 topic 可以有多個分區(qū),每個分區(qū)可被裝載在不同的機(jī)器上,當(dāng)用戶提高規(guī)模之后,Kafka 只需要簡單地增加機(jī)器和 topic partitions 數(shù)量,或者采用 ROM balance 的方式到不同機(jī)器上,即可達(dá)到線性延展方式。

以上是 Kafka 最簡單的核心思想,接下來我將介紹 Kafka Streams 作為 Kafka 客戶端如何利用以上核心思想來設(shè)計流處理的平臺。數(shù)據(jù)流其實就是有序的記錄或消息,每個消息是一個 Key 加一個 Value,并且 record 與 Kafka 自身 massage 具有一一對應(yīng)關(guān)系。

 

 

用戶所提供的業(yè)務(wù)上的計算模型,其實可用拓補(bǔ)結(jié)構(gòu)進(jìn)行表達(dá)。如上圖,圖的左邊。用戶首先進(jìn)行定義數(shù)據(jù)流,然后對數(shù)據(jù)流進(jìn)行計算,得到新的數(shù)據(jù)流,最終將數(shù)據(jù)流寫回到 Kafka 內(nèi)。每當(dāng)用戶進(jìn)行定義的時候,每一步都會變成拓?fù)浣Y(jié)構(gòu)里面的一個點(diǎn),每個點(diǎn)通過流進(jìn)行計算,變成新的流來進(jìn)行新的連接,最終在 Kafka 內(nèi)部形成拓?fù)浣Y(jié)構(gòu)。用戶并不需要在意該拓補(bǔ)結(jié)構(gòu),只需明白定義流、計算流、得到新的流,寫回 Kafka。

連接每一個不同的運(yùn)算單元就是一個 Stream,即 record stream,每一個 Stream 都在源源不斷地實時產(chǎn)生 record,每一個 record 是一個 key 加一個 value。利用 Stream Processor 連接 Stream,每個用戶定義的流的一個計算單位對應(yīng)著一個 Stream Processor。

當(dāng)用戶定義每一步計算的時候,就是定義每個拓?fù)浣Y(jié)構(gòu)里面的每個點(diǎn),最終把整個拓補(bǔ)結(jié)構(gòu)定義完整到 Kafka Stream 來運(yùn)行。計算單元其實可分成兩個特殊的單元,一個叫做元的計算單元,只有輸出流,沒有輸入流,它們唯一的認(rèn)同就是從 Kafka 讀取數(shù)據(jù)形成數(shù)據(jù)流,傳遞給下方其他數(shù)據(jù)處理。而 Stream Processor 底端的數(shù)據(jù)流,沒有輸出流,只有輸入流,它們的功能是把所有輸入流寫回到 Kafka。Kafka 的運(yùn)行操作簡單,源數(shù)據(jù)從 Kafka log 讀取消息變成數(shù)據(jù)流,每個消息貫穿整個拓?fù)浣Y(jié)構(gòu),最終從 Stream Processor 寫回到 Kafka。以上為 Kafka Stream 運(yùn)行情況。

用戶進(jìn)行并行發(fā)布進(jìn)程、應(yīng)用或者多個計算的操作其實也非常簡單。Kafka 是一個庫,當(dāng)你用 Kafka 庫寫成應(yīng)用,當(dāng) record 寫入多臺機(jī)器時,Kafka Stream 庫本身就會自動調(diào)動 partitions 方式,假設(shè)你有兩臺機(jī)器,每臺機(jī)器上都運(yùn)行了 Kafka Streams,當(dāng)它同時進(jìn)行運(yùn)行時,不同的 streams application instance 就會從不同的 Kafka partitions 內(nèi)讀取數(shù)據(jù)來達(dá)到并行任務(wù)的分發(fā)與執(zhí)行,任務(wù)之間沒有任何的數(shù)據(jù)重疊,當(dāng)你需要更多線性地增長任務(wù)時,你只需要在不同的機(jī)器上運(yùn)行同樣的 record,所有的 instance 將會自動進(jìn)行 rebalance,把新的 application 寫入,然后獲取到延展。

很多人看到不同的計算方式的時候會發(fā)現(xiàn),有的計算方式,比如說 fliter、map,沒有“計算狀態(tài)”需要保存,一個數(shù)據(jù)進(jìn)來計算、一個數(shù)據(jù)出去。但是有的計算,比如說 join、aggregate,就需要動態(tài)維護(hù)一個“計算狀態(tài)”,每一次新的信息或者日志進(jìn)來的時候, Stream 就要進(jìn)行更新甚至進(jìn)行讀取。后者被稱為 Stateful Processing,前者為 Stateless Processing。

 

 

那么如何進(jìn)行管理流處理的 states 呢?有兩個通用的方式,一個方式是 remote State,利用遠(yuǎn)程的數(shù)據(jù)庫或者遠(yuǎn)程的 key value store 存儲所有流處理的 states,每一次計算的時候,發(fā)送一個遠(yuǎn)程請求來讀取 states。遠(yuǎn)程請求的缺點(diǎn)在于需要進(jìn)行遠(yuǎn)程的請求和應(yīng)答。因為 states 存在于 Remove State 上,states 之間可能會有 overlation,不能很好做到 accesstion. 比如我是團(tuán)隊 A,只負(fù)責(zé) sell,另外一個是團(tuán)隊 B,只負(fù)責(zé) ajustment, 兩個不同的流有著不同的 job,但是 state 存在一起,所以兩者會相互影響;

另外一個方式是 Local State,意味著所有的 state 和所有的處理單元是并發(fā)在一起的,每個單元上存著 state。在 Kafka Stream 里面,每個計算單元之間不需要有任何交互,state 之間亦如此。我們只要把 state 存到 Local 計算單元上就足矣。***,可以保證 better isolation,它們之間沒有任何的 access;第二,local state 可以做到更好的時效性,不需要遠(yuǎn)程讀寫。

 

 

如上圖,在 Kafka 內(nèi)有 aggregateByKey(…)語句,類似于 Stateful Processing。當(dāng)用戶定義 Stateful Processing 的時候,在 Kafka Stream 庫內(nèi)部就會自動生成 State Strom,且與 aggregate opprate 進(jìn)行連接,只有該 opprate 能夠?qū)υ?State Strom 進(jìn)行讀寫,因為每個 opprate 有自己獨(dú)有的 State Strom,可達(dá)到 State Strom 完全 Local 化。

當(dāng)我們有多個并發(fā)流處理任務(wù)的時候,每個計算單元除了有一個自己的拓?fù)浣Y(jié)構(gòu)進(jìn)行計算之外,也有一份 State Store。每個 State Strom 之間是存儲完全不相干的流處理信息和數(shù)據(jù)。

 

 

接下來討論的是 Kafka Streams 里面另一個重要概念,流與數(shù)據(jù)庫表的關(guān)系?正如大家所看見的,在 Kafka Streams 內(nèi)部有兩種流—— KStream 與 Ktable,那么什么叫做 KStream?什么叫做 Ktable 呢?在開發(fā) Kafka Streams 時的一個核心出發(fā)點(diǎn)是流和它所對應(yīng)的表或者數(shù)據(jù)庫的 State 彼此之間具有一一影射關(guān)系。為什么一一影射呢?

 

 

舉個例子,假設(shè)你有一個上圖的數(shù)據(jù)流,該數(shù)據(jù)流代表著某張表,即變量的日志或者更新日志。更新日志內(nèi)含有 Key 和 Valve,比如第三條的更新日志(key1,value3)其實正在更新第 1 日志(key1,value1)的新信息,換句話說,原本 key1 所對應(yīng)的是 value1,但是在這一時刻被改成對應(yīng) value3,如果我們重復(fù)更新該日志,我們能夠得到什么呢?我們可以得到該表在任意時間段內(nèi)的一個實時的可視化圖。

同理,如果我們只有這樣一個表,并且正在不斷更新這個表,只要在每次更新時保留該日志,就能夠從表反推回該更新日志的數(shù)據(jù)流所應(yīng)的所有內(nèi)容,這就是流和表或者流和狀態(tài)之間的一一對應(yīng)關(guān)系??偠灾?,只要你有一個日志更新流,即可重構(gòu)回你表狀態(tài)在任意時間內(nèi)的 value;如果你有一個表,也可以通過表的更新來找到該表所對應(yīng)的流。這就是我所說的 A Stream is a changelog of a table ;A table is a materialized view at tiome of a stream. 流和表具有對應(yīng)關(guān)系。

這促使我們定義兩種不同的——KStream 和 KTable。KStream 是很普通的數(shù)據(jù)流,在數(shù)據(jù)流之間不存在任何因果關(guān)系和邏輯關(guān)系,可以被認(rèn)為是 append only Stream。Typo 是更新日志流,每個日志里面相同的 key 所對應(yīng)的就是對表的更新。那么為什么要定義這兩種不同的數(shù)據(jù)流呢?我舉個例子。

 

 

如上圖,用戶購買歷史記錄。比如 Alice 曾經(jīng)買過雞蛋和牛奶,雞蛋和牛奶這兩者之間不存在任何因果關(guān)系,Alice 買過牛奶只是在 Alice 買過雞蛋上很簡單的增量。用戶雇傭狀態(tài)的更新日志,比如 Alice 曾經(jīng)在 LinkedIn 工作,之后信息被更新到 Alice 在微軟工作,現(xiàn)在 Alice 在微軟工作覆蓋了之前的工作信息。

如果以當(dāng)前的時間狀態(tài)進(jìn)行解讀這兩個流,***個流顯示的信息為 Alice 曾經(jīng)買過雞蛋,第二個流信息顯示為 Alice 在 LinkedIn 工作。如果將時間往前推,查看更新的數(shù)據(jù)流信息可以發(fā)現(xiàn),***個 KStream 顯示 Alice 買了雞蛋又買了牛奶;但是在第二種情況下,Alice 并不是同時在 LinkedIn 和微軟工作,而是 Alice 已經(jīng)在微軟工作,不在 LinkedIn 工作了。

為什么兩種不同的流有兩種定義呢?因為當(dāng)你做相同操作的時候,比方說簡單做一個合計操作,不同的流得出的結(jié)果是不一樣的。在上者,如果我們將時間往前推,可得出 Alice 的合計結(jié)果是 2+3;但是在下面,如果對其進(jìn)行 KTable 的 aggregate,顯示 Alice 的結(jié)果是將其原本數(shù)值 2 變成 3,而不是 +3 的關(guān)系。

 

 

在 Kafka Stream 的 DSL 里面有多種不同的 aggregate,reduce 操作等, 不同的數(shù)據(jù)流可能將 KStream 變成 KTable,也可能把 KTable 變回 KStream,在用戶定義如下不同的 operation 的時候,在后臺不同狀態(tài)的流可采用不同計算方式、計算模型。

 

 

如上圖,KTable。當(dāng)一條新消息進(jìn)來時該如何進(jìn)行拓?fù)溆嬎隳?舉個例子,在該拓?fù)浣Y(jié)構(gòu)內(nèi),Stream2 出現(xiàn)了一個新的 record,即紅顏色標(biāo)記,該標(biāo)記與***條 record 顏色相近,因為它們是同個 key,不同 value。Stream2 和 Stream1 進(jìn)行 join 操作成為一個新的 record,該新 record 會被放入到 KStream joined 里面,然后 KStream joined 進(jìn)行 aggregate 操作,而 aggregate 操作得到的結(jié)果是 state 被更新,新 record 被 append 到 aggregate 流內(nèi),但是 append 操作將之前的紅顏色 record 復(fù)寫了,換句話說,因為有了該新 record 的存在,之前紅顏色的 record 由于被復(fù)寫已經(jīng)不重要了。

Kafka Stream 運(yùn)維

如果我們有一個 fault,那么我們?nèi)绾卧?Kafka Stream 上做 fault tolerance?

正如上文所提及的,Tables 和 Stream 之間存在一一影射關(guān)系,Kafka Stream 有效地利用了該特性。舉個例子,有個 Kafka Stream 的應(yīng)用業(yè)務(wù),該業(yè)務(wù)有三個并發(fā) task,每個 task 有自己的 local state,每當(dāng) State 進(jìn)行更新時,Kafka Stream 就會自動將更新消息寫到更新日志內(nèi),更新日志也自動生成。每更新一個狀態(tài)時,消息日志就被更新該日志上。

 

 

比如過了一段時間,中間的 task 壞掉了,那么 Kafka Stream 會做什么呢?首先它會檢測異常,自動地在已有的 instance 上重新啟動原本壞掉的 task,重新構(gòu)建 State,那么 State 怎么 build 呢?通過更新 changelog,直到 restore 整個原本正在進(jìn)行的狀態(tài)的 restoration,只有新狀態(tài)被 restore 完整之后才能繼續(xù) task 同步計算。

 

 

消息回溯也是類似的原理。比方說,某應(yīng)用已被運(yùn)行了很多年,發(fā)現(xiàn) stream 流處理計算里面存在 Bug,我們不得不將已計算的結(jié)果舍棄,回溯到一個更早的歷史時間重新進(jìn)行計算,即計算回溯。Reprocessing 在 Kafka Stream 也是一種簡單的方式,當(dāng)我們達(dá)到某一個位移,比如位移 5,需要進(jìn)行消息回溯時,用戶可以簡單地起一個新的狀態(tài) -New State,該 State 完全沒有任何內(nèi)容,然后從最早的時間開始重新進(jìn)行計算,直到計算到趕上現(xiàn)有 task 時候。只需要 switch over 就可以完成消息回溯,且該整個消息回溯過程不需要關(guān)閉整個流處理任務(wù)。于是很多人便問,那么 Kafka Stream 能不能支持 Streaming processing 呢?

舉個例子,我不希望 Kafka Stream 一直在運(yùn)行,希望它可以每 6 個小時 run 一次,并且每 run 一次可將當(dāng)前所有已累計的 Kafka massage 全部處理掉。這個操作也很簡單,從 outsite A 開始,一直位移到 B 結(jié)束或者到 C 結(jié)束,表示已停止整個應(yīng)用;6 個小時之后當(dāng)它重啟的時候,再從新的位移開始進(jìn)行下一段的位移,這是批處理計算結(jié)果,即從一個 outsite 到另外一個 outsite,緊接著是另外一個 outsite…Kafka Stream 通過位移的控制和管理進(jìn)行批處理結(jié)果,而不需要運(yùn)行整個 Kafka Stream。

時間的管理

時間管理是流處理上非常重要的觀念,同時也是區(qū)別于流處理和批量式處理非常重要的概念。很多人都已熟悉 Event Time 和 Processing Time 的區(qū)別,Event Time 是每個日志、消息、狀態(tài)發(fā)生的時候所發(fā)生的時間,而 Processing Time 是日志被計算和處理的時候所發(fā)生的時間。這兩者可能并不是完全融合的,可能存在位移,這便是所謂的時間延遲。

 

 

如上圖,以《星球大戰(zhàn)》故事時間和拍攝時間為例?!缎乔虼髴?zhàn)》有七步曲,Processing Time 是電影真正拍攝時間,是在現(xiàn)實生活中的時間——1999 年到 2015 年;但是拍攝時間和星球大戰(zhàn)所發(fā)生時間并不一一對應(yīng),存在延遲。對其做流處理時候可以發(fā)現(xiàn),類似 out of order 的現(xiàn)象很常見,比如因為數(shù)據(jù)量太大而導(dǎo)致數(shù)據(jù)發(fā)生延遲,或者說數(shù)據(jù)處理發(fā)生了延遲等,都會發(fā)生延時情況。

那么 Kafka Stream 怎么解決該問題呢? Kafka Stream 允許給每個日志定義時間戳,該時間戳可以是當(dāng)前系統(tǒng)時間,也可以是提取時間戳,也可以從當(dāng)前 record 被生成的時候所提取的時間戳,這些即被定義成 Event Time。類似的,如果 record 是一個 Jason format,將其時間戳提取出來也可被定義成 Event Time。

 

 

有如此時間戳,我們可以基于該時間戳進(jìn)行各式計算,比方說 Windowing 的計算。舉個例子,每隔 5 分鐘計算一個平均值、總和或者合計,每一個 Windowing 正如上圖顏色所示,不同顏色代表不同的時間戳和不同的 Windowing。當(dāng)你收到一個 record,而該 record 時間戳指向非常未來的時間,你便得到一個非常未來的日志。Kafka 不會直接更新當(dāng)前的 Windowing,而是會生成該時間戳所對應(yīng)的 Windowing 更新 aggregate。

同理,倘若你繼續(xù)計算,你會發(fā)現(xiàn)有個古老日志的時間戳指向很早以前的 Windowing。Kafka Stream 可以通過更新原本的 aggregate 來達(dá)到這樣延時結(jié)果。用戶在現(xiàn)在時間進(jìn)行如下定義,比方說定義 Window aggregation,每一個 Windowing 是 5 分鐘,但是我希望每個 Windowing 可保持整整一天時間,只要該 Windowing 在當(dāng)前 24 小時之內(nèi)依然存在即可做到。

寫在***

上文分享了較多內(nèi)容,從 ordering 到狀態(tài)、一直到 partitioning & scalability ,但其實最重要的是所有的這些都是由 Kafka Stream 庫自動完成的。我們希望用戶不要受到以上任何問題的影響,只需定義自己的業(yè)務(wù),所有如上的問題都由 Kafka Stream 解決,盡管它只是一個庫,但依然有足夠強(qiáng)大的能力去處理所有事物。

我們在 Kafka 0.10 里面公布 Kafka Stream 之后,把 Streams 延展到 Java 以外的語言,比如支持 python,或者像 SQL 一樣的更高階編程模型來讓用戶更方便地定義自己的流處理應(yīng)用。在 7 月份的 release 里面,我們也會增加正好一次(exactly-once)計算方式的 aggregate。

很多人可能會好奇,Kafka Stream 很好,可是我的數(shù)據(jù)原本不在 Kafka 內(nèi),而 Kafka Stream 只能從 Kafka 內(nèi)部獲取,如何將數(shù)據(jù)導(dǎo)入 Kafka 呢? 答案是 Kafka Connect,一個簡單的數(shù)據(jù)導(dǎo)入導(dǎo)出框架。 時至去年年底,Kafka Connect 已經(jīng)有 40 個不同規(guī)模的 Connect,包括從 JDBC 到 HDFS、一直到 MYSQL,以及所有可以想到的第三方系統(tǒng),用戶可以簡單地把數(shù)據(jù)從第三方系統(tǒng)導(dǎo)入和導(dǎo)出 Kafka。

總之,回到本源,Kafka 到底是什么? Kafka 是一個中央式的流處理平臺,他們支持消息的發(fā)布、消費(fèi)、傳輸和存儲,以及消息的計算和消息的處理。

以上是本文分享的全部內(nèi)容。關(guān)注兩個 Take-aways,***個 Take-away,流處理只是不同的計算模型,它不會只給你近似的結(jié)果,只能用來做增量的結(jié)果;第二個 Take- away,因為 Kafka Stream 的存在使得 Stream processing 存在更加簡單。 

責(zé)任編輯:龐桂玉 來源: 36大數(shù)據(jù)
相關(guān)推薦

2017-08-31 16:36:26

2016-11-08 12:49:27

大數(shù)據(jù)分布式系統(tǒng)Druid-IO

2013-04-27 12:18:58

大數(shù)據(jù)全球技術(shù)峰會京東

2017-11-21 14:14:04

PHPnode.js圖片訪問

2011-12-30 13:50:21

流式計算Hadoop

2014-12-15 09:32:17

StormSpark

2017-02-14 15:37:32

KappaLambda

2019-09-04 09:31:40

日志Flink監(jiān)控

2015-11-09 09:58:31

大數(shù)據(jù)Lambda架構(gòu)

2012-12-06 10:59:51

大數(shù)據(jù)

2025-03-04 08:00:00

JavaiTextPDFPDF

2023-10-26 07:36:02

分布式架構(gòu)

2023-12-11 08:00:00

架構(gòu)FlinkDruid

2017-05-10 16:10:28

Kafka大數(shù)據(jù)數(shù)據(jù)庫

2019-11-04 14:25:54

大數(shù)據(jù)Hadoop大數(shù)據(jù)時代

2016-11-15 09:44:21

大數(shù)據(jù)批處理流處理

2021-07-21 10:22:02

數(shù)據(jù)存儲

2021-10-29 22:45:47

大數(shù)據(jù)算法技術(shù)

2013-06-13 09:42:11

大數(shù)據(jù)

2015-12-14 17:52:06

ENI經(jīng)濟(jì)和信息化網(wǎng)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號