Spark與Storm注意:Apache Apex已然橫空出世
譯文作為新的開源數據流分析方案,Apex脫胎于DataTorrent的RTS平臺,能夠帶來出色的速度表現并簡化編程要求。
說起數據流分析任務,我們首先想到的自然是Spark。盡管Spark已經憑借著2.0版本將非結構化與結構化兩類數據的分析能力融為一體,但Storm的1.0版本解決了自身難于使用的問題。
誕生于2015年6月的Apache Apex可謂橫空出世,其同樣源自DataTorrent及其令人印象深刻的RTS平臺,其中包含一套核心處理引擎,儀表板、診斷與監(jiān)控工具套件外加專門面向數據科學家用戶的圖形流編程系統dtAssemble。
作為RTS平臺的核心處理引擎,Apex可以說是DataTorrent獻給Apache的又一份大禮。Apex的設計目標在于運行大家的現有Hadoop生態(tài)系統,并利用YARN實現按需規(guī)模伸縮且通過HDFS實現容錯能力。盡管其并不像RTS平臺那樣功能全面,但Apex已經足以提供大家希望數據處理平臺所能實現的多數主要功能。
Apex應用示例
下面我們來看一套基本Apex流程示例,其中將涉及多項核心概念。在本示例中,我們將讀取Kafka中的日志條目,對日志記錄類型進行計數并將其寫入控制臺當中。相關代碼片段將實際列出,大家也可以點擊此處獲取GitHub上的完整應用。
Apex的核心概念在于operator,其屬于Java類,負責實現輸入信息接收與輸出信息生成。(如果大家熟悉Storm,那么其作用基本類似于bolt與spout。)另外,每個operator還會定義一組用于數據輸入或輸出的端口。該方法的實際作用在于讀取來自InputPort的輸入信息,或者通過OutportPort向下游發(fā)送數據。
通過operator的數據流將進行建模,即將數據流拆分為基于時間的數據窗口——但與Spark的microbathcing不同,Apex中的輸入數據處理無需等待窗口結束即可開始進行。
DataTorrent
在以下示例中,我們需要3個operator,它們各自對應三種Apex所支持的operator類型中的一種:輸入operator負責由Kafka讀取信息條目,通用operator負責對日志類型進行計數,而輸出opeartor則將其寫入控制臺。對于***種與第三種,我們可以直接使用Apex的Malhar庫,但在第二種中我們需要使用定制化業(yè)務邏輯以對查看到的不同日志類型進行計數。
下面來看我們的LogCounterOperator代碼內容:
public class LogCounterOperator extends BaseOperator { private HashMap counter; public transient DefaultInputPort input = new DefaultInputPort() { @Override public void process(String text) { String type = text.substring(0, text.indexOf(' ')); Integer currentCounter = counter.getOrDefault(type, 0); counter.put(type, currentCounter+1); } }; public transient DefaultOutputPort> output = new DefaultOutputPort<>(); @Override public void endWindow() { output.emit(counter); } @Override public void setup(OperatorContext context){ counter = new HashMap(); } }
這里我們使用簡單的HashMap進行日志類型計數,同時定義2個端口以通過該operator實現數據流處理:其一負責輸入,其二負責輸出。在輸入過程中,不兼容operator將引發(fā)編譯時失敗。需要注意的是,雖然我在這里只定義了1個輸入端口與1個輸出端口,但大家也可根據需要定義多個端口。
通用opeartor的生命周期非常簡單。Apex會首先調用 setup()以進行任何必要的初始化操作;在以上示例中, setup()負責完成HashMap的創(chuàng)建工作。其隨后調用beginWindow()以聲明新的輸入處理窗口/批量任務正在開始,接著在整個過程中對各數據條目調用。如果當前窗口的剩余時間歸零,Apex則會調用endWindow()。我們不需要任何針對單一窗口的邏輯,因此將BaseOperator中的beginWindow()定義留空即可。然而,在每個窗口的末尾,我們都需要發(fā)送當前計數結果,從而將HashMap通過輸出端口進行發(fā)送。
與此同時,經過重寫的process()方法負責處理我們的業(yè)務邏輯,即從日志行中提取***個詞并更新計數器。***,我們調用teardown()方法,從而保證Apex流程得到必要的清理——本示例其實并不需要清理,但出于演示的考慮,我們將清理HashMap。
現在我們的operator已經創(chuàng)建完成,接下來需要構建流程本身。如果大家熟悉Storm拓撲結構,那么應該能夠輕松理解以下代碼:
public void populateDAG(DAG dag, Configuration conf) { KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("KafkaInput", new KafkaSinglePortStringInputOperator()); kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); LogCounterOperator logCounter = dag.addOperator("LogCounterOperator", new LogCounterOperator()); ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator()); dag.addStream("LogLines", kafkaInput.outputPort, logCounter.input); dag.addStream("Console", logCounter.output, console.input); }
我們首先定義DAG(即operator)節(jié)點。之后,我們定義圖形邊界(在Apex詞匯中稱其為‘stream’)。這些stream負責將某一operator的輸出端口接入另一opeartor的輸入端口。在這里,我們將Kafka接入LogCounterOperator,并將輸出端口接入ConsoleOutputOperator。工作完成!如果我們編譯并運行該應用,則能夠在標準輸出結果中看到HashMap:
{INFO=1} {ERROR=1, INFO=1} {ERROR=1, INFO=2} {ERROR=1, INFO=2, DEBUG=1} …
Malhar: 豐富的實用組件
Operator的***優(yōu)勢在于其體積小巧且經過明確定義,因此能夠輕松實現構建與測試。其接合方式類似于樂高積木——惟一的區(qū)別在于樂高積木是現成的,但operator需要我們自行創(chuàng)建。
Malhar就像是一個巨大的樂高積木桶,其中旋轉有大量標準的2 x 4基本件供大家使用。無論是讀取Splunk,在FTP站點上合并文本文件信息還是將結果存儲在HBase當中,Malhar都能幫助我們實現。
有了Malhar提供的豐富operator組件,Apex就變得***吸引力,這意味著我們只需要設計業(yè)務邏輯即可。有時候Malhar operator的說明文檔比較粗糙,但該庫中的一切都配備有測試機制,因此我們可以輕松查看不同組件間的協作效果。
Apex還提供其它一些出色的設計成果。除了常見的指標與報告方案外,dtCli應用允許我們以動態(tài)方式變更運行時中的已提交應用。大家是否希望向HDFS當中添加一些負責寫入日志條目的operator,但又不希望影響到應用的整體運行?Apex能夠輕松完成這項任務。
開源數據流處理引擎已經相當豐富,但要在其中脫穎而出則絕非易事。隨著Malhar庫提供的龐大opeartor選項以及Apex自身所具備的出色容錯能力、低延遲以及可擴展性,Apex已經成為一款速度出色且可用于生產環(huán)境的理想框架。
在這里,我建議DataTorrent為Apache Beam開發(fā)一套Apex運行器,從而幫助開發(fā)者們更輕松地將自己的應用從現有框架中移植出來。當然,Apex目前已經相當優(yōu)秀,足以成為值得大家認真考量的數據流處理引擎。
原文鏈接:Look out, Spark and Storm, here comes Apache Apex