Apache五大主流流計算框架詳細(xì)對比
幾個月之前我們在這里討論過[](http://www.cakesolutions.net/teamblogs/introduction-into-distributed-real-time-stream-processing)目前對于這種日漸增加的分布式流計算的需求的原因。當(dāng)然,目前也有很多的各式各樣的框架被用于處理這一些問題?,F(xiàn)在我們會在這篇文章中進(jìn)行回顧,來討論下各種框架之間的相似點以及區(qū)別在哪里,還有就是從我的角度分析的,推薦的適用的用戶場景。
如你所想,分布式的流處理也就是通常意義上的持續(xù)處理、數(shù)據(jù)富集以及對于無界數(shù)據(jù)的分析過程的組合。它是一個類似于MapReduce這樣的通用計算模型,但是我們希望它能夠在毫秒級別或者秒級別完成響應(yīng)。這些系統(tǒng)經(jīng)常被有向非循環(huán)圖(Directed ACyclic Graphs,DAGs)來表示。
DAG主要功能即是用圖來表示鏈?zhǔn)降娜蝿?wù)組合,而在流處理系統(tǒng)中,我們便常常用DAG來描述一個流工作的拓?fù)?。筆者自己是從Akka的Stream 中的術(shù)語得到了啟發(fā)。如下圖所示,數(shù)據(jù)流經(jīng)過一系列的處理器從源點流動到了終點,也就是用來描述這流工作。談到Akka的Streams,我覺得要著重強(qiáng)調(diào)下分布式這個概念,因為即使也有一些單機(jī)的解決方案可以創(chuàng)建并且運行DAG,但是我們?nèi)匀恢塾谀切┛梢赃\行在多機(jī)上的解決方案。
Points of Interest
在不同的系統(tǒng)之間進(jìn)行選擇的時候,我們主要關(guān)注到以下幾點。
- Runtime and Programming model(運行與編程模型)
一個平臺提供的編程模型往往會決定很多它的特性,并且這個編程模型應(yīng)該足夠處理所有可能的用戶案例。這是一個決定性的因素,我也會在下文中多次討論。
- Functional Primitives(函數(shù)式單元)
一個合格的處理平臺應(yīng)該能夠提供豐富的能夠在獨立信息級別進(jìn)行處理的函數(shù),像map、filter這樣易于實現(xiàn)與擴(kuò)展的一些函數(shù)。同樣也應(yīng)提供像aggregation這樣的跨信息處理函數(shù)以及像join這樣的跨流進(jìn)行操作的函數(shù),雖然這樣的操作會難以擴(kuò)展。
- State Management(狀態(tài)管理)
大部分這些應(yīng)用都有狀態(tài)性的邏輯處理過程,因此,框架本身應(yīng)該允許開發(fā)者去維護(hù)、訪問以及更新這些狀態(tài)信息。
- Message Delivery Guarantees(消息投遞的可達(dá)性保證)
一般來說,對于消息投遞而言,我們有至多一次(at most once)、至少一次(at least once)以及恰好一次(exactly once)這三種方案。
- at most once
At most once投遞保證每個消息會被投遞0次或者1次,在這種機(jī)制下消息很有可能會丟失。
- at least once
At least once投遞保證了每個消息會被默認(rèn)投遞多次,至少保證有一次被成功接收,信息可能有重復(fù),但是不會丟失。
- exactly once
exactly once意味著每個消息對于接收者而言正好被接收一次,保證即不會丟失也不會重復(fù)。
- Failures Handling
在一個流處理系統(tǒng)中,錯誤可能經(jīng)常在不同的層級發(fā)生,譬如網(wǎng)絡(luò)分割、磁盤錯誤或者某個節(jié)點莫名其妙掛掉了。平臺要能夠從這些故障中順利恢復(fù),并且能夠從最后一個正常的狀態(tài)繼續(xù)處理而不會損害結(jié)果。
除此之外,我們也應(yīng)該考慮到平臺的生態(tài)系統(tǒng)、社區(qū)的完備程度,以及是否易于開發(fā)或者是否易于運維等等。
RunTime and Programming Model
運行環(huán)境與編程模型可能是某個系統(tǒng)的最重要的特性,因為它定義了整個系統(tǒng)的呈現(xiàn)特性、可能支持的操作以及未來的一些限制等等。因此,運行環(huán)境與編程模型就確定了系統(tǒng)的能力與適用的用戶案例。目前,主要有兩種不同的方法來構(gòu)建流處理系統(tǒng),其中一個叫Native Streaming,意味著所有輸入的記錄或者事件都會根據(jù)它們進(jìn)入的順序一個接著一個的處理。
另一種方法叫做Micro-Batching。大量短的Batches會從輸入的記錄中創(chuàng)建出然后經(jīng)過整個系統(tǒng)的處理,這些Batches會根據(jù)預(yù)設(shè)好的時間常量進(jìn)行創(chuàng)建,通常是每隔幾秒創(chuàng)建一批。
兩種方法都有一些內(nèi)在的優(yōu)勢與不足,首先來談?wù)凬ative Streaming。好的一方面呢是Native Streaming的表現(xiàn)性會更好一點,因為它是直接處理輸入的流本身的,并沒有被一些不自然的抽象方法所限制住。同時,因為所有的記錄都是在輸入之后立馬被處理,這樣對于請求方而言響應(yīng)的延遲就會優(yōu)于那種Micro-Batching系統(tǒng)。處理這些,有狀態(tài)的操作符也會更容易被實現(xiàn),我們在下文中也會描述這個特點。不過Native Streaming系統(tǒng)往往吞吐量會比較低,并且因為它需要去持久化或者重放幾乎每一條請求,它的容錯的代價也會更高一些。并且負(fù)載均衡也是一個不可忽視的問題,舉例而言,我們根據(jù)鍵對數(shù)據(jù)進(jìn)行了分割并且想做進(jìn)一步地處理。如果某些鍵對應(yīng)的分區(qū)因為某些原因需要更多地資源去處理,那么這個分區(qū)往往就會變成整個系統(tǒng)的瓶頸。
而對于Micro-Batching而言,將流切分為小的Batches不可避免地會降低整個系統(tǒng)的變現(xiàn)性,也就是可讀性。而一些類似于狀態(tài)管理的或者joins、splits這些操作也會更加難以實現(xiàn),因為系統(tǒng)必須去處理整個Batch。另外,每個Batch本身也將架構(gòu)屬性與邏輯這兩個本來不應(yīng)該被糅合在一起的部分相連接了起來。而Micro-Batching的優(yōu)勢在于它的容錯與負(fù)載均衡會更加易于實現(xiàn),它只要簡單地在某個節(jié)點上處理失敗之后轉(zhuǎn)發(fā)給另一個節(jié)點即可。最后,值得一提的是,我們可以在Native Streaming的基礎(chǔ)上快速地構(gòu)建Micro-Batching的系統(tǒng)。
而對于編程模型而言,又可以分為Compositional(組合式)與Declarative(聲明式)。組合式會提供一系列的基礎(chǔ)構(gòu)件,類似于源讀取與操作符等等,開發(fā)人員需要將這些基礎(chǔ)構(gòu)件組合在一起然后形成一個期望的拓?fù)浣Y(jié)構(gòu)。新的構(gòu)件往往可以通過繼承與實現(xiàn)某個接口來創(chuàng)建。另一方面,聲明式API中的操作符往往會被定義為高階函數(shù)。聲明式編程模型允許我們利用抽象類型和所有其他的精選的材料來編寫函數(shù)式的代碼以及優(yōu)化整個拓?fù)鋱D。同時,聲明式API也提供了一些開箱即用的高等級的類似于窗口管理、狀態(tài)管理這樣的操作符。下文中我們也會提供一些代碼示例。
Apache Streaming Landscape
目前已經(jīng)有了各種各樣的流處理框架,自然也無法在本文中全部攘括。所以我必須將討論限定在某些范圍內(nèi),本文中是選擇了所有Apache旗下的流處理的框架進(jìn)行討論,并且這些框架都已經(jīng)提供了Scala的語法接口。主要的話就是Storm以及它的一個改進(jìn)Trident Storm,還有就是當(dāng)下正火的Spark。最后還會討論下來自LinkedIn的Samza以及比較有希望的Apache Flink。筆者個人覺得這是一個非常不錯的選擇,因為雖然這些框架都是出于流處理的范疇,但是他們的實現(xiàn)手段千差萬別。
Apache Storm 最初由Nathan Marz以及他的BackType的團(tuán)隊在2010年創(chuàng)建。后來它被Twitter收購并且開源出來,并且在2014年變成了Apache的頂層項目。毫無疑問,Storm是大規(guī)模流處理中的先行者并且逐漸成為了行業(yè)標(biāo)準(zhǔn)。Storm是一個典型的Native Streaming系統(tǒng)并且提供了大量底層的操作接口。另外,Storm使用了Thrift來進(jìn)行拓?fù)涞亩x,并且提供了大量其他語言的接口。
Trident 是一個基于Storm構(gòu)建的上層的Micro-Batching系統(tǒng),它簡化了Storm的拓?fù)錁?gòu)建過程并且提供了類似于窗口、聚合以及狀態(tài)管理等等沒有被Storm原生支持的功能。另外,Storm是實現(xiàn)了至多一次的投遞原則,而Trident實現(xiàn)了恰巧一次的投遞原則。Trident 提供了 Java, Clojure 以及 Scala 接口。
眾所周知,Spark是一個非常流行的提供了類似于SparkSQL、Mlib這樣內(nèi)建的批處理框架的庫,并且它也提供了 Spark Streaming這樣優(yōu)秀地流處理框架。Spark的運行環(huán)境提供了批處理功能,因此,Spark Streaming毫無疑問是實現(xiàn)了Micro-Batching機(jī)制。輸入的數(shù)據(jù)流會被接收者分割創(chuàng)建為Micro-Batches,然后像其他 Spark任務(wù)一樣進(jìn)行處理。Spark 提供了 Java, Python 以及 Scala 接口。
Samza最早是由LinkedIn提出的與Kafka協(xié)同工作的優(yōu)秀地流解決方案,Samza已經(jīng)是LinkedIn內(nèi)部關(guān)鍵的基礎(chǔ)設(shè)施之一。Samza重負(fù)依賴于Kafaka的基于日志的機(jī)制,二者結(jié)合地非常好。Samza提供了Compositional接口,并且也支持Scala。
最后聊聊Flink. Flink可謂一個非常老的項目了,最早在2008年就啟動了,不過目前正在吸引越來越多的關(guān)注。Flink也是一個Native Streaming的系統(tǒng),并且提供了大量高級別的API。Flink也像Spark一樣提供了批處理的功能,可以作為流處理的一個特殊案例來看。 Flink強(qiáng)調(diào)萬物皆流,這是一個絕對的更好地抽象,畢竟確實是這樣。
下表就簡單列舉了上述幾個框架之間的特性:
Counting Words
Wordcount就好比流處理領(lǐng)域的HelloWorld,它能夠很好地描述不同框架間的差異性。首先看看Storm是如何編寫WordCount程序的:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new Split(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); ... Map
首先來看看它的拓?fù)涠x,在第2行那邊是定義了一個Spout,也就是一個輸入源。然后定義了一個Bold,也就是一個處理的組件,用于將某個句子分割成詞序列。然后還定義了另一個Bolt用來負(fù)責(zé)真實的詞計算。5,8到12行省略的過程用于定義集群中使用了多少個線程來供每一個組件使用。如你所見,所有的定義都是比較底層的與手動的。接下來繼續(xù)看看這個8-15行,也就是真正用于WordCount的部分代碼。因為Storm沒有內(nèi)建的狀態(tài)處理的支持,所以我必須自定義這樣一個本地狀態(tài),和理想的相差甚遠(yuǎn)啊。下面我們繼續(xù)看看Trident。
正如我上文中提及的,Trident是一個基于Storm的Micro-Batching的擴(kuò)展,它提供了狀態(tài)管理等等功能。
public static StormTopology buildTopology(LocalDRPC drpc) { FixedBatchSpout spout = ... TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"),new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); ... }
從代碼中就可以看出,在Trident中就可以使用一些上層的譬如each、groupBy這樣的操作符,并且可以在Trident中內(nèi)建的進(jìn)行狀態(tài)管理了。接下來我們再看看Spark提供的聲明式的接口,要記住,與前幾個例子不同的是,基于Spark的代碼已經(jīng)相當(dāng)簡化了,下面基本上就是要用到的全部的代碼了:
val conf = new SparkConf().setAppName("wordcount") val ssc = new StreamingContext(conf, Seconds(1)) val text = ... val counts = text.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) counts.print() ssc.start() ssc.awaitTermination()
每個Spark的流任務(wù)都需要一個StreamingContext用來指定整個流處理的入口。StreamingContext定義了Batch的間隔,上面是設(shè)置到了1秒。在6-8行即是全部的詞統(tǒng)計的計算過程,非常不一樣啊。下面再看看Apache Samza,另一個代表性的組合式的API:
class WordCountTask extends StreamTask { override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { val text = envelope.getMessage.asInstanceOf[String] val counts = text.split(" ").foldLeft(Map.empty[String, Int]) { (count, word) => count + (word -> (count.getOrElse(word, 0) + 1)) } collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"), counts)) }
Topology定義在了Samza的屬性配置文件里,為了明晰起見,這里沒有列出來。下面再看看Fink,可以看出它的接口風(fēng)格非常類似于Spark Streaming,不過我們沒有設(shè)置時間間隔:
val env = ExecutionEnvironment.getExecutionEnvironment val text = env.fromElements(...) val counts = text.flatMap ( _.split(" ") ) .map ( (_, 1) ) .groupBy(0) .sum(1) counts.print() env.execute("wordcount")
與批處理系統(tǒng)相比,流處理系統(tǒng)中的容錯機(jī)制固然的會比批處理中的要難一點。在批處理系統(tǒng)中,如果碰到了什么錯誤,只要將計算中與該部分錯誤關(guān)聯(lián)的重新啟動就好了。不過在流計算的場景下,容錯處理會更加困難,因為會不斷地有數(shù)據(jù)進(jìn)來,并且有些任務(wù)可能需要7*24地運行著。另一個我們碰到的挑戰(zhàn)就是如何保證狀態(tài)的一致性,在每天結(jié)束的時候我們會開始事件重放,當(dāng)然不可能所有的狀態(tài)操作都會保證冪等性。下面我們就看看其他的系統(tǒng)是怎么處理的:
Storm
Storm使用了所謂的逆流備份與記錄確認(rèn)的機(jī)制來保證消息會在某個錯誤之后被重新處理。記錄確認(rèn)這一個操作工作如下:一個操作器會在處理完成一個記錄之后向它的上游發(fā)送一個確認(rèn)消息。而一個拓?fù)涞脑磿4嬗兴衅鋭?chuàng)建好的記錄的備份。一旦受到了從Sinks發(fā)來的包含有所有記錄的確認(rèn)消息,就會把這些確認(rèn)消息安全地刪除掉。當(dāng)發(fā)生錯誤時,如果還沒有接收到全部的確認(rèn)消息,就會從拓?fù)涞脑撮_始重放這些記錄。這就確保了沒有數(shù)據(jù)丟失,不過會導(dǎo)致重復(fù)的 Records處理過程,這就屬于At-Least投送原則。
Storm用一套非常巧妙的機(jī)制來保證了只用很少的字節(jié)就能保存并且追蹤確認(rèn)消息,但是并沒有太多關(guān)注于這套機(jī)制的性能,從而使得Storm有較低地吞吐量,并且在流控制上存在一些問題,譬如這種確認(rèn)機(jī)制往往在存在背壓的時候錯誤地認(rèn)為發(fā)生了故障。
Spark Streaming
Spark Streaming以及它的Micro-Batching機(jī)制則使用了另一套方案,道理很簡單,Spark將Micro-Batches分配到多個節(jié)點運行,每個Micro-Batch可以成功運行或者發(fā)生故障,當(dāng)發(fā)生故障時,那個對應(yīng)的Micro-Batch只要簡單地重新計算即可,因為它是持久化并且無狀態(tài)的,所以要保證Exactly-Once這種投遞方式也是很簡單的。
Samza
Samza的實現(xiàn)手段又不一樣了,它利用了一套可靠地、基于Offset的消息系統(tǒng),在很多情況下指的就是Kafka。Samza會監(jiān)控每個任務(wù)的偏移量,然后在接收到消息的時候修正這些偏移量。Offset可以是存儲在持久化介質(zhì)中的一個檢查點,然后在發(fā)生故障時可以進(jìn)行恢復(fù)。不過問題在于你并不知道恢復(fù)到上一個CheckPoint之后到底哪個消息是處理過的,有時候會導(dǎo)致某些消息多次處理,這也是At-Least的投遞原則。
Flink
Flink主要是基于分布式快照,每個快照會保存流任務(wù)的狀態(tài)。鏈路中運送著大量的CheckPoint Barrier(檢查點障礙,就是分隔符、標(biāo)識器之類的),當(dāng)這些Barrier到達(dá)某個Operator的時候,Operator將自身的檢查點與流相關(guān)聯(lián)。與Storm相比,這種方式會更加高效,畢竟不用對每個Record進(jìn)行確認(rèn)操作。不過要注意的是,F(xiàn)link還是Native Streaming,概念上和Spark還是相去甚遠(yuǎn)的。Flink也是達(dá)成了Exactly-Once投遞原則。
Managing State
大部分重要的流處理應(yīng)用都會保有狀態(tài),與無狀態(tài)的操作符相比,這些應(yīng)用中需要一個輸入和一個狀態(tài)變量,然后進(jìn)行處理最終輸出一個改變了的狀態(tài)。我們需要去管理、存儲這些狀態(tài),要保證在發(fā)生故障的時候能夠重現(xiàn)這些狀態(tài)。狀態(tài)的重造可能會比較困難,畢竟上面提到的不少框架都不能保證Exactly- Once,有些Record可能被重放多次。
Storm
Storm是實踐了At-Least投遞原則,而怎么利用Trident來保證Exactly-Once呢?概念上還是很簡單的,只需要使用事務(wù)進(jìn)行提交Records,不過很明顯這種方式及其低效。所以呢,還是可以構(gòu)建一些小的Batches,并且進(jìn)行一些優(yōu)化。Trident是提供了一些抽象的接口來保證實現(xiàn)Exactly-Once,如下圖所示,還有很多東西等著你去挖掘。
Spark Streaming
當(dāng)想要在流處理系統(tǒng)中實現(xiàn)有狀態(tài)的操作時,我們往往想到的是一個長時間運行的Operator,然后輸入一個狀態(tài)以及一系列的Records。不過 Spark Streaming是以另外一種方式進(jìn)行處理的,Spark Streaming將狀態(tài)作為一個單獨地Micro-Batching流進(jìn)行處理,所以在對每個小的Micro-Spark任務(wù)進(jìn)行處理時會輸入一個當(dāng)前的狀態(tài)和一個代表當(dāng)前操作的函數(shù),最后輸出一個經(jīng)過處理的Micro-Batch以及一個更新好的狀態(tài)。
Samza
Samza的處理方式更加簡單明了,就是把它們放到Kafka中,然后問題就解決了。Samza提供了真正意義上的有狀態(tài)的Operators,這樣每個任務(wù)都能保有狀態(tài),然后所有狀態(tài)的變化都會被提交到Kafka中。在有需要的情況下某個狀態(tài)可以很方便地從Kafka的Topic中完成重造。為了提高效率,Samza允許使用插件化的鍵值本地存儲來避免所有的消息全部提交到Kafka。這種思路如下圖所示,不過Samza只是提高了At- Least這種機(jī)制,未來可能會提供Exactly-Once。
Flink
Flink提供了類似于Samza的有狀態(tài)的Operator的概念,在Flink中,我們可以使用兩種不同的狀態(tài)。第一種是本地的或者叫做任務(wù)狀態(tài),它是某個特定的Operator實例的當(dāng)前狀態(tài),并且這種狀態(tài)不會與其他進(jìn)行交互。另一種呢就是維護(hù)了整個分區(qū)的狀態(tài)。
Counting Words with State
Trident
public static StormTopology buildTopology(LocalDRPC drpc) { FixedBatchSpout spout = ... TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"),new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); ... }
在第9行中,我們可以通過調(diào)用一個持久化的聚合函數(shù)來創(chuàng)建一個狀態(tài)。
Spark Streaming
// Initial RDD input to updateStateByKey val initialRDD = ssc.sparkContext.parallelize(List.empty[(String, Int)]) val lines = ... val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => { val sum = one.getOrElse(0) + state.getOption.getOrElse(0) val output = (word, sum) state.update(sum) Some(output) } val stateDstream = wordDstream.trackStateByKey( StateSpec.function(trackStateFunc).initialState(initialRDD))
在第2行中,我們創(chuàng)建了一個RDD用來保存初始狀態(tài)。然后在5,6行中進(jìn)行一些轉(zhuǎn)換,接下來可以看出,在8-14行中,我們定義了具體的轉(zhuǎn)換方程,即輸入時一個單詞、它的統(tǒng)計數(shù)量和它的當(dāng)前狀態(tài)。函數(shù)用來計算、更新狀態(tài)以及返回結(jié)果,最后我們將所有的Bits一起聚合。
Samza
class WordCountTask extends StreamTask with InitableTask { private var store: CountStore = _ def init(config: Config, context: TaskContext) { this.store = context.getStore("wordcount-store") .asInstanceOf[KeyValueStore[String, Integer]] } override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { val words = envelope.getMessage.asInstanceOf[String].split(" ") words.foreach { key => val count: Integer = Option(store.get(key)).getOrElse(0) store.put(key, count + 1) collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"), (key, count))) } }
在上述代碼中第3行定義了全局的狀態(tài),這里是使用了鍵值存儲方式,并且在5~6行中定義了如何初始化。然后,在整個計算過程中我們都使用了該狀態(tài)。
Flink
val env = ExecutionEnvironment.getExecutionEnvironment val text = env.fromElements(...) val words = text.flatMap ( _.split(" ") ) words.keyBy(x => x).mapWithState { (word, count: Option[Int]) => { val newCount = count.getOrElse(0) + 1 val output = (word, newCount) (output, Some(newCount)) } }
在第6行中使用了mapWithState函數(shù),第一個參數(shù)是即將需要處理的單次,第二個參數(shù)是一個全局的狀態(tài)。
Performance
合理的性能比較也是本文的一個重要主題之一。不同的系統(tǒng)的解決方案差異很大,因此也是很難設(shè)置一個無偏的測試。通常而言,在一個流處理系統(tǒng)中,我們常說的性能就是指延遲與吞吐量。這取決于很多的變量,但是總體而言標(biāo)準(zhǔn)為如果單節(jié)點每秒能處理500K的Records就是個合格的,如果能達(dá)到100萬次以上就已經(jīng)不錯了。每個節(jié)點一般就是指24核附帶上24或者48GB的內(nèi)存。
對于延遲而言,如果是Micro-Batch的話往往希望能在秒級別處理。如果是Native Streaming的話,希望能有百倍的減少,調(diào)優(yōu)之后的Storm可以很輕易達(dá)到幾十毫秒。
另一方面,消息的可達(dá)性保證、容錯以及狀態(tài)管理都是需要考慮進(jìn)去的。譬如如果你開啟了容錯機(jī)制,那么會增加10%到15%的額外消耗。除此之外,以文章中兩個WordCount為例,第一個是無狀態(tài)的WordCount,第二個是有狀態(tài)的WordCount,后者在Flink中可能會有25%額外的消耗,而在Spark中可能有50%的額外消耗。當(dāng)然,我們肯定可以通過調(diào)優(yōu)來減少這種損耗,并且不同的系統(tǒng)都提供了很多的可調(diào)優(yōu)的選項。
還有就是一定要記住,在分布式環(huán)境下進(jìn)行大數(shù)據(jù)傳輸也是一件非常昂貴的消耗,因此我們要利用好數(shù)據(jù)本地化以及整個應(yīng)用的序列化的調(diào)優(yōu)。
Project Maturity(項目成熟度)
在為你的應(yīng)用選擇一個合適的框架的時候,框架本身的成熟度與社區(qū)的完備度也是一個不可忽略的部分。Storm是第一個正式提出的流處理框架,它已經(jīng)成為了業(yè)界的標(biāo)準(zhǔn)并且被應(yīng)用到了像Twitter、Yahoo、Spotify等等很多公司的生產(chǎn)環(huán)境下。Spark則是目前最流行的Scala的庫之一,并且Spark正逐步被更多的人采納,它已經(jīng)成功應(yīng)用在了像Netflix、Cisco、DataStax、Indel、IBM等等很多公司內(nèi)。而 Samza最早由LinkedIn提出,并且正在運行在幾十個公司內(nèi)。Flink則是一個正在開發(fā)中的項目,不過我相信它發(fā)展的會非常迅速。
Summary
在我們進(jìn)最后的框架推薦之前,我們再看一下上面那張圖:
Framework Recommendations
這個問題的回答呢,也很俗套,具體情況具體分析??偟膩碚f,你首先呢要仔細(xì)評估下你應(yīng)用的需求并且完全理解各個框架之間的優(yōu)劣比較。同時我建議是使用一個提供了上層接口的框架,這樣會更加的開發(fā)友好,并且能夠更快地投入生產(chǎn)環(huán)境。不過別忘了,絕大部分流應(yīng)用都是有狀態(tài)的,因此狀態(tài)管理也是不可忽略地一個部分。同時,我也是推薦那些遵循Exactly-Once原則的框架,這樣也會讓開發(fā)和維護(hù)更加簡單。不過不能教條主義,畢竟還是有很多應(yīng)用會需要 At-Least-Once與At-Most-Once這些投遞模式的。最后,一定要保證你的系統(tǒng)可以在故障情況下很快恢復(fù),可以使用Chaos Monkey或者其他類似的工具進(jìn)行測試。在我們之前的討論中也發(fā)現(xiàn)這個快速恢復(fù)的能力至關(guān)重要。
對于小型與需要快速響應(yīng)地項目,Storm依舊是一個非常好的選擇,特別是在你非常關(guān)注延遲度的情況下。不過還是要謹(jǐn)記容錯機(jī)制和 Trident的狀態(tài)管理會嚴(yán)重影響性能。Twitter目前正在設(shè)計新的流計算系統(tǒng)Heron用來替代Storm,它可以在單個項目中有很好地表現(xiàn)。不過Twitter可不一定會開源它。
對于Spark Streaming而言,如果你的系統(tǒng)的基礎(chǔ)架構(gòu)中已經(jīng)使用了Spark,那還是很推薦你試試的。另一方面,如果你想使用Lambda架構(gòu),那Spark 也是個不錯的選擇。不過你一定要記住,Micro-Batching本身的限制和延遲對于你而言不是一個關(guān)鍵因素。
如果你想用Samza的話,那最好Kafka已經(jīng)是你的基礎(chǔ)設(shè)施的一員了。雖然在Samza中Kafka只是個可插拔的組件,不過基本上所有人都會使用Kafka。正如上文所說,Samza提供了強(qiáng)大的本地存儲功能,能夠輕松管理數(shù)十G的狀態(tài)數(shù)據(jù)。不過它的At-Least-Once的投遞限制也是很大一個瓶頸。
Flink目前在概念上是一個非常優(yōu)秀的流處理系統(tǒng),它能夠滿足大部分的用戶場景并且提供了很多先進(jìn)的功能,譬如窗口管理或者時間控制。所以當(dāng)你發(fā)現(xiàn)你需要的功能在Spark當(dāng)中無法很好地實現(xiàn)的時候,你可以考慮下Flink。另外,F(xiàn)link也提供了很好地通用的批處理的接口,只不過你需要很大的勇氣來將你的項目結(jié)合到Flink中,并且別忘了多關(guān)注關(guān)注它的路線圖。
Dataflow與開源
我最后一個要提到的就是Dataflow和它的開源計劃。Dataflow是Google云平臺的一個組成部分,是目前在Google內(nèi)部提供了統(tǒng)一的用于批處理與流計算的服務(wù)接口。譬如用于批處理的MapReduce,用于編程模型定義的FlumeJava以及用于流計算的MillWheel。 Google最近打算開源這貨的SDK了,Spark與Flink都可以成為它的一個運行驅(qū)動。
Conclusion
本文我們過了一遍常用的流計算框架,它們的特性與優(yōu)劣對比,希望能對你有用吧。