Apache Beam 剖析
1.概述
在大數(shù)據(jù)的浪潮之下,技術(shù)的更新迭代十分頻繁。受技術(shù)開源的影響,大數(shù)據(jù)開發(fā)者提供了十分豐富的工具。但也因?yàn)槿绱?,增加了開發(fā)者選擇合適工具的難度。在大數(shù)據(jù)處理一些問題的時候,往往使用的技術(shù)是多樣化的。這完全取決于業(yè)務(wù)需求,比如進(jìn)行批處理的MapReduce,實(shí)時流處理的Flink,以及SQL交互的Spark SQL等等。而把這些開源框架,工具,類庫,平臺整合到一起,所需要的工作量以及復(fù)雜度,可想而知。這也是大數(shù)據(jù)開發(fā)者比較頭疼的問題。而今天要分享的就是整合這些資源的一個解決方案,它就是 Apache Beam。
2.內(nèi)容
Apache Beam 最初叫 Apache Dataflow,由谷歌和其合作伙伴向Apache捐贈了大量的核心代碼,并創(chuàng)立孵化了該項目。該項目的大部分大碼來自于 Cloud Dataflow SDK,其特點(diǎn)有以下幾點(diǎn):
- 統(tǒng)一數(shù)據(jù)批處理(Batch)和流處理(Stream)編程的范式
- 能運(yùn)行在任何可執(zhí)行的引擎之上
那 Apache Beam到底能解決哪些問題,它的應(yīng)用場景是什么,下面我們可以通過一張圖來說明,如下圖所示:
通過改圖,我們可以很清晰的看到整個技術(shù)的發(fā)展流向;一部分是谷歌派系,另一部分則是Apache派系。在開發(fā)大數(shù)據(jù)應(yīng)用時,我們有時候使用谷歌的框架,API,類庫,平臺等,而有時候我們則使用Apache的,比如:HBase,F(xiàn)link,Spark等。而我們要整合這些資源則是一個比較頭疼的問題,Apache Beam 的問世,整合這些資源提供了很方便的解決方案。
2.1 Vision
下面,我們通過一張流程圖來看Beam的運(yùn)行流程,如下圖所示:
通過上圖,我們可以清楚的知道,執(zhí)行一個流程分以下步驟:
- End Users:選擇一種你熟悉的編程語言提交應(yīng)用
- SDK Writers:該編程語言必須是 Beam 模型支持的
- Library Writers:轉(zhuǎn)換成Beam模型的格式
- Runner Writers:在分布式環(huán)境下處理并支持Beam的數(shù)據(jù)處理管道
- IO Providers:在Beam的數(shù)據(jù)處理管道上運(yùn)行所有的應(yīng)用
- DSL Writers:創(chuàng)建一個高階的數(shù)據(jù)處理管道
2.2 SDK
Beam SDK 提供了一個統(tǒng)一的編程模型,來處理任意規(guī)模的數(shù)據(jù)集,其中包括有限的數(shù)據(jù)集,無限的流數(shù)據(jù)。Apache Beam SDK 使用相同的類來表達(dá)有限和無限的數(shù)據(jù),同樣使用相同的轉(zhuǎn)換方法對數(shù)據(jù)進(jìn)行操作。Beam 提供了多種 SDK,你可以選擇一種你熟悉的來建立數(shù)據(jù)處理管道,如上述的 2.1 中的圖,我們可以知道,目前 Beam 支持 Java,Python 以及其他待開發(fā)的語言。
2.3 Pipeline Runners
在 Beam 管道上運(yùn)行引擎會根據(jù)你選擇的分布式處理引擎,其中兼容的 API 轉(zhuǎn)換你的 Beam 程序應(yīng)用,讓你的 Beam 應(yīng)用程序可以有效的運(yùn)行在指定的分布式處理引擎上。因而,當(dāng)運(yùn)行 Beam 程序的時候,你可以按照自己的需求選擇一種分布式處理引擎。當(dāng)前 Beam 支持的管道運(yùn)行引擎有以下幾種:
- Apache Apex
- Apache Flink
- Apache Spark
- Google Cloud Dataflow
3.示例
本示例通過使用 Java SDK 來完成,你可以嘗試運(yùn)行在不同的執(zhí)行引擎上。
3.1 開發(fā)環(huán)境
- 下載安裝 JDK 7 或更新的版本,檢測 JAVA_HOME環(huán)境變量
- 下載 Maven 打包環(huán)境。
關(guān)于上述的安裝步驟,并不是本篇博客的重點(diǎn),這里筆者就不多贅述了,不明白的可以到官網(wǎng)翻閱文檔進(jìn)行安裝。
3.2 下載示例代碼
Apache Beam 的源代碼在 Github 有托管,可以到 Github 下載對應(yīng)的源碼,下載地址:https://github.com/apache/beam
然后,將其中的示例代碼進(jìn)行打包,命令如下所示:
- $ mvn archetype:generate \
- -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots \
- -DarchetypeGroupId=org.apache.beam \
- -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
- -DarchetypeVersion=LATEST \
- -DgroupId=org.example \
- -DartifactId=word-count-beam \
- -Dversion="0.1" \
- -Dpackage=org.apache.beam.examples \
- -DinteractiveMode=false
此時,命令會創(chuàng)建一個文件夾 word-count-beam,里面包含一個 pom.xml 和相關(guān)的代碼文件。命令如下所示:
- $ cd word-count-beam/
- $ ls
- pom.xml src
- $ ls src/main/java/org/apache/beam/examples/
- DebuggingWordCount.java WindowedWordCount.java common
- MinimalWordCount.java WordCount.java
3.3 運(yùn)行 WordCount 示例代碼
一個 Beam 程序可以運(yùn)行在多個 Beam 的可執(zhí)行引擎上,包括 ApexRunner,F(xiàn)linkRunner,SparkRunner 或者 DataflowRunner。 另外還有 DirectRunner。不需要特殊的配置就可以在本地執(zhí)行,方便測試使用。
下面,你可以按需選擇你想執(zhí)行程序的引擎:
- 對引擎進(jìn)行相關(guān)配置
- 使用不同的命令:通過 –runner=<runner>參數(shù)指明引擎類型,默認(rèn)是 DirectRunner;添加引擎相關(guān)的參數(shù);指定輸出文件和輸出目錄,當(dāng)然這里需要保證文件目錄是執(zhí)行引擎可以訪問到的,比如本地文件目錄是不能被外部集群訪問的。
- 運(yùn)行示例程序
3.3.1 Direct
- $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
- -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
3.3.2 Apex
- $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
- -Dexec.args="--inputFile=pom.xml --output=counts --runner=ApexRunner" -Papex-runner
3.3.3 Flink-Local
- $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
- -Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -Pflink-runner
3.3.4 Flink-Cluster
- $ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
- -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
- --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
然后,你可以通過訪問 http://<flink master>:8081 來監(jiān)測運(yùn)行的應(yīng)用程序。
3.3.5 Spark
- $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
- -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
3.3.6 Dataflow
- $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
- -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
- --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
- -Pdataflow-runner
3.4 運(yùn)行結(jié)果
當(dāng)程序運(yùn)行完成后,你可以看到有多個文件以 count 開頭,個數(shù)取決于執(zhí)行引擎的類型。當(dāng)你查看文件的內(nèi)容的時候,每個唯一的單詞后面會顯示其出現(xiàn)次數(shù),但是前后順序是不固定的,也是分布式引擎為了提高效率的一種常用方式。
3.4.1 Direct
- $ ls counts*
- $ more counts*
- api: 9
- bundled: 1
- old: 4
- Apache: 2
- The: 1
- limitations: 1
- Foundation: 1
- ...
3.4.2 Apex
- $ cat counts*
- BEAM: 1
- have: 1
- simple: 1
- skip: 4
- PAssert: 1
- ...
3.4.3 Flink-Local
- $ ls counts*
- $ more counts*
- The: 1
- api: 9
- old: 4
- Apache: 2
- limitations: 1
- bundled: 1
- Foundation: 1
- ...
3.4.4 Flink-Cluster
- $ ls /tmp/counts*
- $ more /tmp/counts*
- The: 1
- api: 9
- old: 4
- Apache: 2
- limitations: 1
- bundled: 1
- Foundation: 1
- ...
3.4.5 Spark
- $ ls counts*
- $ more counts*
- beam: 27
- SF: 1
- fat: 1
- job: 1
- limitations: 1
- require: 1
- of: 11
- profile: 10
- ...
3.4.6 Dataflow
- $ gsutil ls gs://<your-gcs-bucket>/counts*
- $ gsutil cat gs://<your-gcs-bucket>/counts*
- feature: 15
- smother'st: 1
- revelry: 1
- bashfulness: 1
- Bashful: 1
- Below: 2
- deserves: 32
- barrenly: 1
- ...
4.總結(jié)
Apache Beam 主要針對理想并行的數(shù)據(jù)處理任務(wù),并通過把數(shù)據(jù)集拆分多個子數(shù)據(jù)集,讓每個子數(shù)據(jù)集能夠被單獨(dú)處理,從而實(shí)現(xiàn)整體數(shù)據(jù)集的并行化處理。當(dāng)然,也可以用 Beam 來處理抽取,轉(zhuǎn)換和加載任務(wù)和數(shù)據(jù)集成任務(wù)(一個ETL過程)。進(jìn)一步將數(shù)據(jù)從不同的存儲介質(zhì)中或者數(shù)據(jù)源中讀取,轉(zhuǎn)換數(shù)據(jù)格式,最后加載到新的系統(tǒng)中。