Spark Streaming 的玫瑰與刺
前言
說人話:其實就是講Spark Streaming 的好處與坑。好處主要從一些大的方面講,坑則是從實際場景中遇到的一些小細節(jié)描述。
玫瑰篇
玫瑰篇主要是說Spark Streaming的優(yōu)勢點。
玫瑰之代碼復(fù)用
這主要得益于Spark的設(shè)計,以及平臺的全面性。你寫的流處理的代碼可以很方便的適用于Spark平臺上的批處理,交互式處理。因為他們本身都是基于RDD模型的,并且Spark Streaming的設(shè)計者也做了比較好的封裝和兼容。所以我說RDD是個很強大的框,能把各種場景都給框住,這就是高度抽象和思考后的結(jié)果。
玫瑰之機器學(xué)習(xí)
如果你使用Spark MLlib 做模型訓(xùn)練。恭喜你,首先是很多算法已經(jīng)支持Spark Streaming,譬如k-means 就支持流式數(shù)據(jù)更新模型。 其次,你也可以在Spark Streaming中直接將離線計算好的模型load進來,然后對新進來的數(shù)據(jù)做實時的Predict操作。
玫瑰之SQL支持
Spark Streaming 里天然就可以使用 sql/dataframe/datasets 等。而且時間窗口的使用可以極大擴展這種使用場景,譬如各種系統(tǒng)預(yù)警等。類似Storm則需要額外的開發(fā)與支持。
玫瑰之吞吐和實時的有效控制
Spark Streaming 可以很好的控制實時的程度(小時,分鐘,秒)。極端情況可以設(shè)置到毫秒。
玫瑰之概述
Spark Streaming 可以很好的和Spark其他組件進行交互,獲取其支持。同時Spark 生態(tài)圈的快速發(fā)展,亦能從中受益。
刺篇
刺篇就是描述Spark Streaming 的一些問題,做選型前關(guān)注這些問題可以有效的降低使用風(fēng)險。
checkpoint 之刺
checkpoint 是個很好的恢復(fù)機制。但是方案比較粗暴,直接通過序列化的機制寫入到文件系統(tǒng),導(dǎo)致代碼變更和配置變更無法生效。實際場景是升級往往比系統(tǒng)崩潰的頻率高太多。但是升級需要能夠無縫的銜接上一次的偏移量。所以spark streaming在無法容忍數(shù)據(jù)有丟失的情況下,你需要自己記錄偏移量,然后從上一次進行恢復(fù)。
我們目前是重寫了相關(guān)的代碼,每次記錄偏移量,不過只有在升級的時候才會讀取自己記錄的偏移量,其他情況都是依然采用checkpoint機制。
Kafka 之刺
這個和Spark Streaming相關(guān),也不太相關(guān)。說相關(guān)是因為Spark 對很多異常處理比較簡單。很多是和Kafka配置相關(guān)的。我舉個例子:
如果消息體太大了,超過 fetch.message.max.bytes=1m ,那么Spark Streaming會直接拋出OffsetOutOfRangeException異常,然后停止服務(wù)。
對應(yīng)的錯誤會從這行代碼拋出:
- if (!iter.hasNext) {
- assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
- finished = true
- null.asInstanceOf[R]
- }
其實就是消費的完成后 實際的消費數(shù)據(jù)量和預(yù)先估計的量不一致。
你在日志中看到的信息其實是這個代碼答應(yīng)出來的:
private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
" This should not happen, and indicates that messages may have been lost"
解決辦法自然是把 fetch.message.max.bytes 設(shè)置大些。
如果你使用Spark Streaming去追數(shù)據(jù),從頭開始消費kafka,而Kafka因為某種原因,老數(shù)據(jù)快速的被清理掉,也會引發(fā)OffsetOutOfRangeException錯誤。并且使得Spark Streaming程序異常的終止。
解決辦法是事先記錄kafka偏移量和時間的關(guān)系(可以隔幾秒記錄一次),然后根據(jù)時間找到一個較大的偏移量開始消費。
或者你根據(jù)目前Kafka新增數(shù)據(jù)的消費速度,給smallest獲取到的偏移量再加一個較大的值,避免出現(xiàn)Spark Streaming 在fetch的時候數(shù)據(jù)不存在的情況。
Kafka partition 映射 RDD partition 之刺
Kafka的分區(qū)數(shù)決定了你的并行度(我們假設(shè)你使用Direct Approach的模式集成)。為了獲得更大的并行度,則需要進行一次repartition,而repartition 就意味著需要發(fā)生Shuffle,在流式計算里,可能會消耗掉我們寶貴的時間。
為了能夠避免Shuffle,并且提高Spark Streaming處理的并行度,我們重寫了 DirectKafkaInputDStream,KafkaRDD,KafkaUtils等類,實現(xiàn)了一個Kafka partition 可以映射為多個RDD partition的功能。譬如你有M個Kafka partitions,則可映射成 M*N個 RDD partitions。 其中N 為>1 的正整數(shù)。
我們期望官方能夠?qū)崿F(xiàn)將一個Kafka的partitions 映射為多個Spark 的partitions,避免發(fā)生Shuffle而導(dǎo)致多次的數(shù)據(jù)移動。
textFileStream
其實使用textFileStream 的人應(yīng)該也不少。因為可以很方便的監(jiān)控HDFS上某個文件夾下的文件,并且進行計算。這里我們遇到的一個問題是,如果底層比如是壓縮文件,遇到有順壞的文件,你是跳不過去的,直接會讓Spark Streaming 異常退出。 官方并沒有提供合適的方式讓你跳過損壞的文件。
以NewHadoopRDD為例,里面有這么幾行代碼,獲取一條新的數(shù)據(jù):
- override def getNext(): (K, V) = {
- try {
- finished = !reader.next(key, value)
- } catch {
- case eof: EOFException =>
- finished = true
- }
- if (!finished) {
- inputMetrics.incRecordsRead(1)
- }
- (key, value)
- }
通過reader 獲取下一條記錄的時候,譬如是一個損壞的gzip文件,可能就會拋出異常,而這個異常是用戶catch不到的,直接讓Spark Streaming程序掛掉了。
而在 HadoopRDD類中,對應(yīng)的實現(xiàn)如下:
- override def getNext(): (K, V) = {
- try {
- finished = !reader.next(key, value)
- } catch {
- case eof: EOFException =>
- finished = true
- }
- if (!finished) {
- inputMetrics.incRecordsRead(1)
- }
- (key, value)
- }
這里好歹做了個EOFException。然而,如果是一個壓縮文件,解壓的時候就直接產(chǎn)生錯誤了,一般而言是 IOException,而不是EOFException了,這個時候也就歇菜了。
個人認為應(yīng)該添加一些配置,允許用戶可以選擇如何對待這種有損壞或者無法解壓的文件。
因為現(xiàn)階段我們并沒有維護一個Spark的私有版本,所以是通過重寫FileInputDStream,NewHadoopRDD 等相關(guān)類來修正該問題。
Shuffle 之刺
Shuffle (尤其是每個周期數(shù)據(jù)量很大的情況)是Spark Streaming 不可避免的疼痛,尤其是數(shù)據(jù)量極大的情況,因為Spark Streaming對處理的時間是有限制的。我們有一個場景,是五分鐘一個周期,我們僅僅是做了一個repartion,耗時就達到2.1分鐘(包括到 Kafka取數(shù)據(jù))?,F(xiàn)階段Spark 的Shuffle實現(xiàn)都需要落磁盤,并且Shuffle Write 和 Shuffle Read 階段是完全分開,后者必須等到前者都完成才能開始工作。我認為Spark Streaming有必要單獨開發(fā)一個更快速,完全基于內(nèi)存的Shuffle方案。
內(nèi)存之刺
在Spark Streaming中,你也會遇到在Spark中常見的問題,典型如Executor Lost 相關(guān)的問題(shuffle fetch 失敗,Task失敗重試等)。這就意味著發(fā)生了內(nèi)存不足或者數(shù)據(jù)傾斜的問題。這個目前你需要考慮如下幾個點以期獲得解決方案:
相同資源下,增加partition數(shù)可以減少內(nèi)存問題。 原因如下:通過增加partition數(shù),每個task要處理的數(shù)據(jù)少了,同一時間內(nèi),所有正在 運行的task要處理的數(shù)量少了很多,所有Executor占用的內(nèi)存也變小了。這可以緩解數(shù)據(jù)傾斜以及內(nèi)存不足的壓力。
關(guān)注shuffle read 階段的并行數(shù)。例如reduce,group 之類的函數(shù),其實他們都有第二個參數(shù),并行度(partition數(shù)),只是大家一般都不設(shè)置。不過出了問題再設(shè)置一下,也不錯。
給一個Executor 核數(shù)設(shè)置的太多,也就意味著同一時刻,在該Executor 的內(nèi)存壓力會更大,GC也會更頻繁。我一般會控制在3個左右。然后通過提高Executor數(shù)量來保持資源的總量不變。
監(jiān)控之刺
Spark Streaming 的UI 上的Executors Tab缺少一個***的監(jiān)控,就是Worker內(nèi)存GC詳情。雖然我們可以將這些信息導(dǎo)入到 第三方監(jiān)控中,然而終究是不如在 Spark UI上展現(xiàn)更加方便。 為此我們也將該功能列入研發(fā)計劃。
總結(jié)
目前Spark Streaming 可以應(yīng)對的場景不少,但是在很多場景上,還是有這樣那樣的問題。建議調(diào)研后都進一步做測試再做出是否遷移到該平臺的決定。