自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Apache Beam: 下一代的大數(shù)據(jù)處理標(biāo)準(zhǔn)

大數(shù)據(jù)
本文主要介紹Apache Beam的編程范式-Beam Model,以及通過Beam SDK如何方便靈活的編寫分布式數(shù)據(jù)處理業(yè)務(wù)邏輯,希望讀者能夠通過本文對Apache Beam有初步的了解,同時對于分布式數(shù)據(jù)處理系統(tǒng)如何處理亂序無限數(shù)據(jù)流的能力有初步的認(rèn)識。

Apache Beam(原名Google DataFlow)是Google在2016年2月份貢獻(xiàn)給Apache基金會的Apache孵化項目,被認(rèn)為是繼MapReduce,GFS和BigQuery等之后,Google在大數(shù)據(jù)處理領(lǐng)域?qū)﹂_源社區(qū)的又一個非常大的貢獻(xiàn)。Apache Beam的主要目標(biāo)是統(tǒng)一批處理和流處理的編程范式,為***,亂序,web-scale的數(shù)據(jù)集處理提供簡單靈活,功能豐富以及表達(dá)能力十分強大的SDK。Apache Beam項目重點在于數(shù)據(jù)處理的編程范式和接口定義,并不涉及具體執(zhí)行引擎的實現(xiàn),Apache Beam希望基于Beam開發(fā)的數(shù)據(jù)處理程序可以執(zhí)行在任意的分布式計算引擎上。本文主要介紹Apache Beam的編程范式-Beam Model,以及通過Beam SDK如何方便靈活的編寫分布式數(shù)據(jù)處理業(yè)務(wù)邏輯,希望讀者能夠通過本文對Apache Beam有初步的了解,同時對于分布式數(shù)據(jù)處理系統(tǒng)如何處理亂序***數(shù)據(jù)流的能力有初步的認(rèn)識。

Apache Beam基本架構(gòu)

隨著分布式數(shù)據(jù)處理不斷發(fā)展,新的分布式數(shù)據(jù)處理技術(shù)也不斷被提出,業(yè)界涌現(xiàn)出了越來越多的分布式數(shù)據(jù)處理框架,從最早的Hadoop MapReduce,到Apache Spark,Apache Storm,以及更近的Apache Flink,Apache Apex等。新的分布式處理框架可能帶來的更高的性能,更強大的功能,更低的延遲等,但用戶切換到新的分布式處理框架的代價也非常大:需要學(xué)習(xí)一個新的數(shù)據(jù)處理框架,并重寫所有的業(yè)務(wù)邏輯。解決這個問題的思路包括兩個部分,首先,需要一個編程范式,能夠統(tǒng)一,規(guī)范分布式數(shù)據(jù)處理的需求,例如,統(tǒng)一批處理和流處理的需求。其次,生成的分布式數(shù)據(jù)處理任務(wù)應(yīng)該能夠在各個分布式執(zhí)行引擎上執(zhí)行,用戶可以自由切換分布式數(shù)據(jù)處理任務(wù)的執(zhí)行引擎與執(zhí)行環(huán)境。Apache Beam正是為了解決以上問題而提出的。

Apache Beam主要由Beam SDK和Beam Runner組成,Beam SDK定義了開發(fā)分布式數(shù)據(jù)處理任務(wù)業(yè)務(wù)邏輯的API接口,生成的的分布式數(shù)據(jù)處理任務(wù)Pipeline交給具體的Beam Runner執(zhí)行引擎。Apache Beam目前支持的API接口是由Java語言實現(xiàn)的,Python版本的API正在開發(fā)之中。Apache Beam支持的底層執(zhí)行引擎包括Apache Flink,Apache Spark以及Google Cloud Platform,此外Apache Storm,Apache Hadoop,Apache Gearpump等執(zhí)行引擎的支持也在討論或開發(fā)當(dāng)中。其基本架構(gòu)如下圖所示:


圖1 Apache Beam架構(gòu)圖

需要注意的是,雖然Apache Beam社區(qū)非常希望所有的Beam執(zhí)行引擎都能夠支持Beam SDK定義的功能全集,但是在實際實現(xiàn)中可能并不一定。例如,基于MapReduce的Runner顯然很難實現(xiàn)和流處理相關(guān)的功能特性。目前Google DataFlow Cloud是對Beam SDK功能集支持最全面的執(zhí)行引擎,在開源執(zhí)行引擎中,支持最全面的則是Apache Flink。

Beam Model

Beam Model指的是Beam的編程范式,即Beam SDK背后的設(shè)計思想。在介紹Beam Model之前,先簡要介紹一下Beam Model要處理的問題域與一些基本概念。

  1. 數(shù)據(jù)。分布式數(shù)據(jù)處理要處理的數(shù)據(jù)類型一般可以分為兩類,有限的數(shù)據(jù)集和***的數(shù)據(jù)流。有限的數(shù)據(jù)集,比如一個HDFS中的文件,一個HBase表等,特點是數(shù)據(jù)提前已經(jīng)存在,一般也已經(jīng)持久化,不會突然消失。而***的數(shù)據(jù)流,比如kafka中流過來的系統(tǒng)日志流,或是從twitter API拿到的twitter流等等,這類數(shù)據(jù)的特點是,數(shù)據(jù)動態(tài)流入,無窮無盡,無法全部持久化。一般來說,批處理框架的設(shè)計目標(biāo)是用來處理有限的數(shù)據(jù)集,流處理框架的設(shè)計目標(biāo)是用來處理***的數(shù)據(jù)流。有限的數(shù)據(jù)集可以看做是***的數(shù)據(jù)流的一種特例,但是從數(shù)據(jù)處理邏輯的角度,這兩者并無不同之處,例如,假設(shè)微博數(shù)據(jù)包含時間戳和轉(zhuǎn)發(fā)量,用戶希望按照統(tǒng)計每小時的轉(zhuǎn)發(fā)量總和,此業(yè)務(wù)邏輯應(yīng)該可以同時在有限數(shù)據(jù)集和***數(shù)據(jù)流上執(zhí)行,并不應(yīng)該因為數(shù)據(jù)源的不同而對業(yè)務(wù)邏輯的實現(xiàn)產(chǎn)生任何影響。
  2. 時間。Process Time是指數(shù)據(jù)進(jìn)入分布式處理框架的時間,而Event-Time則是指數(shù)據(jù)產(chǎn)生的時間。這兩個時間通常是不同的,例如,對于一個處理微博數(shù)據(jù)的流計算任務(wù),一條2016-06-01-12:00:00發(fā)表的微博經(jīng)過網(wǎng)絡(luò)傳輸?shù)妊舆t可能在2016-06-01-12:01:30才進(jìn)入到流處理系統(tǒng)中。批處理任務(wù)通常進(jìn)行全量的數(shù)據(jù)計算,較少關(guān)注數(shù)據(jù)的時間屬性,但是對于流處理任務(wù)來說,由于數(shù)據(jù)流是無情無盡的,無法進(jìn)行全量的計算,通常是對某個窗口中得數(shù)據(jù)進(jìn)行計算,對于大部分的流處理任務(wù)來說,按照時間進(jìn)行窗口劃分,可能是最常見的需求。
  3. 亂序。對于流處理框架處理的數(shù)據(jù)流來說,其數(shù)據(jù)的到達(dá)順序可能并不嚴(yán)格按照Event-Time的時間順序。如果基于Process Time定義時間窗口,數(shù)據(jù)到達(dá)的順序就是數(shù)據(jù)的順序,因此不存在亂序問題。但是對于基于Event Time定義的時間窗口來說,可能存在時間靠前的消息在時間靠后的消息后到達(dá)的情況,這在分布式的數(shù)據(jù)源中可能非常常見。對于這種情況,如何確定遲到數(shù)據(jù),以及對于遲到數(shù)據(jù)如何處理通常是很棘手的問題。

Beam Model處理的目標(biāo)數(shù)據(jù)是***的時間亂序數(shù)據(jù)流,不考慮時間順序或是有限的數(shù)據(jù)集可看做是***亂序數(shù)據(jù)流的一個特例。Beam Model從下面四個維度歸納了用戶在進(jìn)行數(shù)據(jù)處理的時候需要考慮的問題:

  1. What。如何對數(shù)據(jù)進(jìn)行計算?例如,Sum,Join或是機器學(xué)習(xí)中訓(xùn)練學(xué)習(xí)模型等。在Beam SDK中由Pipeline中的操作符指定。
  2. Where。數(shù)據(jù)在什么范圍中計算?例如,基于Process-Time的時間窗口,基于Event-Time的時間窗口,滑動窗口等等。在BeamSDK中由Pipeline中的窗口指定。
  3. When。何時將計算結(jié)果輸出?例如,在1小時的Event-Time時間窗口中,每隔1分鐘,將當(dāng)前窗口計算結(jié)果輸出。在Beam SDK中由Pipeline中的Watermark和觸發(fā)器指定。
  4. How。遲到數(shù)據(jù)如何處理?例如,將遲到數(shù)據(jù)計算增量結(jié)果輸出,或是將遲到數(shù)據(jù)計算結(jié)果和窗口內(nèi)數(shù)據(jù)計算結(jié)果合并成全量結(jié)果輸出。在Beam SDK中由Accumulation指定。

Beam Model將”WWWH“四個維度抽象出來組成了Beam SDK,用戶在基于Beam SDK構(gòu)建數(shù)據(jù)處理業(yè)務(wù)邏輯時,在每一步只需要根據(jù)業(yè)務(wù)需求按照這四個維度調(diào)用具體的API即可生成分布式數(shù)據(jù)處理Pipeline,并提交到具體執(zhí)行引擎上執(zhí)行。“WWWH”四個維度的抽象僅僅關(guān)注業(yè)務(wù)邏輯本身,和分布式任務(wù)如何執(zhí)行沒有任何關(guān)系。

Beam SDK

不同于Apache Flink或是Apache Spark,Beam SDK使用同一套API表示數(shù)據(jù)源,輸出目標(biāo)以及操作符等。下面介紹4個基于Beam SDK的數(shù)據(jù)處理任務(wù),通過這四個數(shù)據(jù)處理任務(wù),讀者可以了解通過Beam Mode是如何統(tǒng)一靈活的描述批處理和流處理任務(wù)的,這4個任務(wù)用來處理手機游戲領(lǐng)域的統(tǒng)計需求,包括:

  1. 用戶分?jǐn)?shù)。批處理任務(wù),基于有限數(shù)據(jù)集統(tǒng)計用戶分?jǐn)?shù)。
  2. 每小時團隊分?jǐn)?shù)。批處理任務(wù),基于有限數(shù)據(jù)集統(tǒng)計每小時,每個團隊的分?jǐn)?shù)。
  3. 排行榜。流處理任務(wù),2個統(tǒng)計項,每小時每個團隊的分?jǐn)?shù)以及用戶實時的歷史總得分?jǐn)?shù)。
  4. 游戲狀態(tài)。流處理任務(wù),統(tǒng)計每小時每個團隊的分?jǐn)?shù),以及更復(fù)雜的每小時統(tǒng)計信息,比如每小時每個用戶在線時間等。

注:示例代碼來自Beam的源碼,具體地址參見:apache/incubator-beam。部分分析內(nèi)容參考了Beam的官方文檔,詳情請參見引用鏈接。

下面基于Beam Model的“WWWH”四個維度,分析業(yè)務(wù)邏輯,并通過代碼展示如何通過Beam SDK實現(xiàn)“WWWH”四個維度的業(yè)務(wù)邏輯。

用戶分?jǐn)?shù)

統(tǒng)計每個用戶的歷史總得分?jǐn)?shù)是一個非常簡單的任務(wù),在這里我們簡單的通過一個批處理任務(wù)實現(xiàn),每次需要新的用戶分?jǐn)?shù)數(shù)據(jù)的時候,重新執(zhí)行一次這個批處理任務(wù)即可。對于用戶分?jǐn)?shù)任務(wù),“WWWH”四維度分析結(jié)果如下:

通過“WWWH”的分析,對于用戶分?jǐn)?shù)這個批處理任務(wù),通過Beam Java SDK實現(xiàn)的代碼如下所示:

 

  1. gameEvents  
  2. [... input ...]  
  3. [... parse ...]  
  4. .apply("ExtractUserScore", new ExtractAndSumScore("user"))  
  5. [... output ...]; 

ExtractAndSumScore實現(xiàn)了“What”中描述的邏輯,即按用戶分組,然后累加分?jǐn)?shù),其相關(guān)代碼如下:

 

  1. gameInfo  
  2. .apply(MapElements  
  3. .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))  
  4. .withOutputType(  
  5. TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))  
  6. .apply(Sum.integersPerKey()); 

通過MapElements確定Key與Value分別是用戶與分?jǐn)?shù),然后Sum定義按key分組,并累加分?jǐn)?shù)。Beam支持將多個對數(shù)據(jù)的操作合并成一個操作,這樣不僅可以支持更清晰的業(yè)務(wù)邏輯實現(xiàn),同時也可以在多處重用合并后的操作邏輯。

每小時團隊分?jǐn)?shù)

按照小時統(tǒng)計每個團隊的分?jǐn)?shù),獲得***分?jǐn)?shù)的團隊可能獲得獎勵,這個分析任務(wù)增加了對窗口的要求,不過我們依然可以通過一個批處理任務(wù)實現(xiàn),對于這個任務(wù)的“WWWH”四個維度的分析如下:

相對于***個用戶分?jǐn)?shù)任務(wù),只是在Where部分回答了“數(shù)據(jù)在什么范圍中計算?”的問題,同時在What部分“如何計算數(shù)據(jù)?”中,分組的條件由用戶改為了團隊,這在代碼中也會相應(yīng)的體現(xiàn):

 

  1. gameEvents  
  2. [... input ...]  
  3. [... parse ...]  
  4. .apply("AddEventTimestamps", WithTimestamps.of((GameActionInfo i)  
  5. -> new Instant(i.getTimestamp())))  
  6. .apply("FixedWindowsTeam", Window.into 
  7. FixedWindows.of(Duration.standardMinutes(windowDuration))))  
  8. .apply("ExtractTeamScore", new ExtractAndSumScore("team"))  
  9. [... output ...]; 

“AddEventTimestamps”定義了如何從原始數(shù)據(jù)中抽取EventTime數(shù)據(jù),“FixedWindowsTeam”則定義了1小時固定窗口,然后重用了ExtractAndSumScore類,只是將分組的列從用戶改成了團隊。對于每小時團隊分?jǐn)?shù)任務(wù),引入了關(guān)于“Where”部分窗口定義的新業(yè)務(wù)邏輯,但是從代碼中可以看到,關(guān)于“Where”部分的實現(xiàn)和關(guān)于“What”部分的實現(xiàn)是完全獨立的,用戶只需要新加兩行關(guān)于“Where”的代碼,非常簡單和清晰。

排行榜

前面兩個任務(wù)均是基于有限數(shù)據(jù)集的批處理任務(wù),對于排行榜來說,我們同樣需要統(tǒng)計用戶分?jǐn)?shù)以及每小時團隊分?jǐn)?shù),但是從業(yè)務(wù)角度希望得到的是實時數(shù)據(jù)。對于Apache Beam來說,一個相同處理邏輯的批處理任務(wù)和流處理任務(wù)的唯一不同就是任務(wù)的輸入和輸出,中間的業(yè)務(wù)邏輯Pipeline無需任何改變。對于當(dāng)前示例的排行榜數(shù)據(jù)分析任務(wù),我們不僅希望他們滿足和前兩個示例相同的業(yè)務(wù)邏輯,同時也可以滿足更定制化的業(yè)務(wù)需求,例如:

  1. 流處理任務(wù)相對于批處理任務(wù),一個非常重要的特性是,流處理任務(wù)可以更加實時的返回計算結(jié)果,例如計算每小時團隊分?jǐn)?shù)時,對于一小時的時間窗口,默認(rèn)是在一小時的數(shù)據(jù)全部到達(dá)后,把最終的結(jié)算結(jié)果輸出,但是流處理系統(tǒng)應(yīng)該同時支持在一小時窗口只有部分?jǐn)?shù)據(jù)到達(dá)時,就將部分計算結(jié)果輸出,從而使得用戶可以得到實時的分析結(jié)果。
  2. 保證和批處理任務(wù)一致的計算結(jié)果正確性。由于亂序數(shù)據(jù)的存在,對于某一個計算窗口,如何確定所有數(shù)據(jù)是否到達(dá)(Watermark)?遲到數(shù)據(jù)如何處理?處理結(jié)果如何輸出,總量,增量,并列?流處理系統(tǒng)應(yīng)該提供機制保證用戶可以在滿足低延遲性能的同時達(dá)到最終的計算結(jié)果正確性。

上述兩個問題正是通過回答“When”和“How”兩個問題來定義用戶的數(shù)據(jù)分析需求。“When”取決于用戶希望多常得到計算結(jié)果,在回答“When”的時候,基本上可以分為四個階段:

  1. Early。在窗口結(jié)束前,確定何時輸出中間狀態(tài)數(shù)據(jù)。
  2. On-Time。在窗口結(jié)束時,輸出窗口數(shù)據(jù)計算結(jié)果。由于亂序數(shù)據(jù)的存在,如何判斷窗口結(jié)束可能是用戶根據(jù)額外的知識預(yù)估的,且允許在用戶設(shè)定的窗口結(jié)束后出現(xiàn)遲到的屬于該窗口的數(shù)據(jù)。
  3. Late。在窗口結(jié)束后,有遲到的數(shù)據(jù)到達(dá),在這個階段,何時輸出計算結(jié)果。
  4. Final。能夠容忍遲到的***限度,例如1小時。到達(dá)***的等待時間后,輸出最終的計算結(jié)果,同時不再接受之后的遲到數(shù)據(jù),清理該窗口的狀態(tài)數(shù)據(jù)。

對于每小時團隊得分的流處理任務(wù),本示例希望的業(yè)務(wù)邏輯為,基于Event Time的1小時時間窗口,按團隊計算分?jǐn)?shù),在一小時窗口內(nèi),每5分鐘輸出一次當(dāng)前的團隊分?jǐn)?shù),對于遲到的數(shù)據(jù),每10分鐘輸出一次當(dāng)前的團隊分?jǐn)?shù),在窗口結(jié)束2小時后遲到的數(shù)據(jù)一般不可能會出現(xiàn),假如出現(xiàn)的話,直接拋棄。“WWWH”表達(dá)如下:

在基于Beam SDK的實現(xiàn)中,用戶基于“WWWH” Beam Model表示的業(yè)務(wù)邏輯可以分別獨立直接的實現(xiàn)出來:

 

  1. gameEvents  
  2. [... input ...]  
  3. .apply("LeaderboardTeamFixedWindows", Window  
  4. .into(FixedWindows.of 
  5. Duration.standardMinutes(Durations.minutes(60)))) 
  6. .triggering(AfterWatermark.pastEndOfWindow()  
  7. .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()  
  8. .plusDelayOf(Durations.minutes(5)))  
  9. .withLateFirings(AfterProcessingTime.pastFirstElementInPane()  
  10. .plusDelayOf(Durations.minutes(10))))  
  11. .withAllowedLateness(Duration.standardMinutes(120)  
  12. .accumulatingFiredPanes())  
  13. .apply("ExtractTeamScore", new ExtractAndSumScore("team"))  
  14. [... output ...] 

LeaderboardTeamFixedWindows對應(yīng)“Where”定義窗口,Trigger對應(yīng)“Where”定義結(jié)果輸出條件,Accumulation對應(yīng)“How”定義輸出結(jié)果內(nèi)容,ExtractTeamScore對應(yīng)“What”定義計算邏輯。

總結(jié)

Apache Beam的Beam Model對***亂序數(shù)據(jù)流的數(shù)據(jù)處理進(jìn)行了非常優(yōu)雅的抽象,“WWWH”四個維度對數(shù)據(jù)處理的描述,非常清晰與合理,Beam Model在統(tǒng)一了對***數(shù)據(jù)流和有限數(shù)據(jù)集的處理模式的同時,也明確了對***數(shù)據(jù)流的數(shù)據(jù)處理方式的編程范式,擴大了流處理系統(tǒng)可應(yīng)用的業(yè)務(wù)范圍,例如,Event-Time/Session窗口的支持,亂序數(shù)據(jù)的處理支持等。Apache Flink,Apache Spark Streaming等項目的API設(shè)計均越來越多的借鑒或參考了Apache Beam Model,且作為Beam Runner的實現(xiàn),與Beam SDK的兼容度也越來越高。本文主要介紹了Beam Model,以及如何基于Beam Model設(shè)計現(xiàn)實中的數(shù)據(jù)處理任務(wù),希望能夠讓讀者對Apache Beam項目能夠有一個初步的了解。由于Apache Beam已經(jīng)進(jìn)入Apache Incubator孵化,所以讀者也可以通過官網(wǎng)或是郵件組了解更多Apache Beam的進(jìn)展和狀態(tài)。

引用

  1. Apache Beam (incubating)
  2.  https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
  3. The world beyond batch: Streaming 102
  4. https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison
責(zé)任編輯:未麗燕 來源: 《程序員》雜志
相關(guān)推薦

2019-05-23 15:44:55

Flink大數(shù)據(jù)框架

2015-10-15 10:30:32

2013-07-27 21:28:44

2016-11-15 09:44:21

大數(shù)據(jù)批處理流處理

2013-07-22 09:47:17

大數(shù)據(jù)IBM技術(shù)大會

2015-03-09 14:24:59

TeradataAppCenterAster

2017-03-08 10:56:03

大數(shù)據(jù)架構(gòu)數(shù)據(jù)湖

2012-11-16 11:31:39

大數(shù)據(jù)CRM

2013-06-27 11:21:17

2023-10-05 18:25:40

存儲分開存儲SSD

2019-05-22 09:34:24

物聯(lián)網(wǎng)技術(shù)標(biāo)準(zhǔn)物聯(lián)網(wǎng)IOT

2025-02-13 09:37:58

2012-06-07 09:06:04

主流云計算產(chǎn)品大數(shù)據(jù)分析

2018-05-17 11:31:45

大數(shù)據(jù)IOTA架構(gòu)數(shù)據(jù)架構(gòu)

2024-05-14 08:03:31

SaaS 服務(wù)云原生AI 一體架構(gòu)

2018-09-11 08:00:00

DevOpsAIOps機器學(xué)習(xí)

2020-06-02 08:05:28

智能電表蜂窩物聯(lián)網(wǎng)NB-IoT

2024-02-26 14:46:53

移動計算人工智能5G

2010-04-29 16:19:27

數(shù)據(jù)中心IT安全世紀(jì)互聯(lián)

2025-01-03 09:24:10

模型架構(gòu)論文
點贊
收藏

51CTO技術(shù)棧公眾號