逆水行舟,看前行中的Spark
近兩年,Spark技術(shù)發(fā)展速度驚人,用戶越來越多,社區(qū)也愈加活躍,生態(tài)更加豐富,這些都證明了Spark的魅力。在生態(tài)建設(shè)上,Spark取得了極大的成功,主要體現(xiàn)在Application、Environment及Data Source三個方面。此外,值得一提的是,Spark的貢獻者目前已經(jīng)超過650人,而另一方面,圍繞Spark創(chuàng)業(yè)的公司同樣隨之增多,“Spark as a Service”的概念被越來越多的人接受,Spark的未來值得期待。在本文中,國內(nèi)最早的Spark研究者與使用者、七牛云存儲技術(shù)總監(jiān)陳超將為您解讀Spark技術(shù)及其生態(tài)系統(tǒng)的***進展。(本文已被選登在程序員電子版2015年9月A刊)。
到目前為止,Spark看起來一帆風(fēng)順,但事實上,大數(shù)據(jù)這個領(lǐng)域從來不缺強者,其中***代表性的無疑當屬Flink(https://flink.apache.org)。Flink采了MPP的思想,具備很多有意思的設(shè)計和特性。本文不準備用過多的篇幅介紹Flink,主要給大家分享Spark最近幾個極其重要的改進。注意,下面所提到的改進有些已經(jīng)實現(xiàn),而有些尚未完成。
1. Project Tungsten
Spark近期的發(fā)展中,最引人矚目的當屬鎢絲計劃(Project Tungsten)。Kay Ousterhout在名為“Making Sense of Performance in Data Analytics Frameworks”的論文中談到,類似Spark這樣的計算框架,其瓶頸主要在于CPU與內(nèi)存,而不是大家之前所認為的磁盤IO及網(wǎng)絡(luò)開銷,因為帶寬增大、SSD或者磁盤陣列的使用均可以緩解這個問題。但是在序列化、反序列化及Hash等場景下,CPU確實能形成瓶頸,Tungsten的啟動就是為了解決這些問題。Tungsten主要包含以下三個方面。
內(nèi)存管理與二進制處理。
毋庸置疑,JVM確實是個非常優(yōu)秀的平臺,但是短板也非常明顯,那就是GC,而Java對象的內(nèi)存開銷同樣不能忽視?;谶@個問題,Spark選擇自己管理內(nèi)存,所用的工具就是sun.misc.Unsafe。如果大家對細節(jié)有興趣,可以關(guān)注BytesToBytesMap結(jié)構(gòu)。需要注意,它是append-only的,并且key與value都是連續(xù)的字節(jié)區(qū)域。自己管理內(nèi)存不僅緩解了GC的壓力,也顯著地降低了內(nèi)存使用。但這里必須提醒大家,Unsafe千萬不能濫用,否則后果很嚴重。
緩存友好的計算。
目前,大家看到緩存似乎都只想到將數(shù)據(jù)加載到內(nèi)存就完事了。事實上,更佳的做法應(yīng)該是CPU級別的緩存。因此,Spark自1.4版本開始便在這個點上發(fā)力,其中,最重要的當屬在 Aggregations、Joins和Shuffle時可以更有效地排序和哈希。Spark引入了UnsafeShuffleManager這個新的ShuffleManager。它的好處是可以直接對二進制數(shù)據(jù)進行排序,從而減少了內(nèi)存占用,也省去了反序列化的過程。這里大家可以注意下UnsafeShuffleExternalSorter,可以稱得上整個優(yōu)化的基礎(chǔ)。實際上,CPU反復(fù)從內(nèi)存讀取數(shù)據(jù)在一定程度上阻礙了CPU的Pipeline操作。
代碼生成(Code Gen)。
熟悉LLVM的朋友應(yīng)該能較好地理解這一點,之前的Spark SQL已經(jīng)在使用該部分了。近日發(fā)布的Spark 1.5中,Code Gen得到了更加廣泛的應(yīng)用。需要Code Gen的原因很簡單,它能免去昂貴的虛函數(shù)調(diào)用,當然,也就不存在對Java基本類型裝箱之類的操作了。Spark SQL將Code Gen用于表達式的求值,效果顯著。值得一提的是,在Spark 1.5中,Spark SQL將Code Gen默認打開了。
Tungsten的部分就先談到這里,整個項目的完成需要等到1.6版本。不過1.4與1.5已經(jīng)逐步融入了Tungsten的部分優(yōu)化,讓大家可以及時感受Tungsten帶來的各種改進。
2. Dynamic Resource Allocation
嚴格地講,動態(tài)資源分配(Dynamic Resource Allocation)這個特性在Spark 1.2時就出現(xiàn)了,那時只支持在YARN上對資源做動態(tài)分配。在Spark 1.5中,Standalone及Mesos也將支持這個特性,個人認為此舉有較大意義,但大家仍需充分了解該特性的使用場景。
在Spark中,動態(tài)資源分配的粒度是Executor,通過spark.dynamicAllocation.enabled開啟,通過spark.dynamicAllocation.schedulerBacklogTimeout及 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout兩個參數(shù)進行時間上的控制。另外,Spark對于YARN及Mesos的支持均得到了顯著地增強。
3. Adaptive Query Plan
適應(yīng)查詢計劃(Adaptive Query Plan)是一個“可能”的特性。“可能”的原因是這一特性可能要等到Spark 1.6版本或者之后的版本才會有。首先,陳述幾個問題,如何自動確定并行度(Level of Parallelism);如何自動選擇采用Broadcast Join還是Hash Join;Spark如何感知數(shù)據(jù)的行為。
目前,Spark需要在執(zhí)行Job前確定Job的DAG,即在執(zhí)行前,由Operator到DAG的轉(zhuǎn)換就已經(jīng)完成了,這樣顯然不夠靈活。因此,更好的方案是允許提交獨立的DAG stage,同時收集它們執(zhí)行結(jié)果的一些統(tǒng)計信息。基于這些信息,Spark可以動態(tài)決定Reduce Task的數(shù)量,同時也可以動態(tài)地選擇是采用Broadcast還是Shuffle。對于Spark SQL來講,它應(yīng)該能在聚合時自動設(shè)置Reduce Task的數(shù)量,并且在Join時自動選擇策略。主要的思路是,在決定Reduce Task的數(shù)量及采用的Shuffle策略前,先讓Map運算,然后輸出較大數(shù)量的Partition作為Map階段的結(jié)果。接下來,Spark會檢查Map Stages輸出的Partition的大小(或者其它一些狀態(tài)),然后基于這些信息做出***選擇。
估計大家已經(jīng)看出,這里其實需要修改DAGScheduler的實現(xiàn),因為目前的DAGScheduler僅支持接收一張完整的DAG圖,而上述討論的問題要求DAGScheduler支持接收Map Stages,且能收集Map Stages輸出結(jié)果的相關(guān)信息。Shuffle也需要支持能一次Fetch多個Map輸出的Partition,而目前的HashShuffleFetcher一次性只能獲取1個Partition。當然,這里還會涉及到其它改動,就不一一列出了。Adaptive Query Plan的重要性在于,Spark會替用戶確定運行時所需的一些參數(shù)及行為,從而用戶無需操心。還記得Flink那句宣傳語,即“用戶對內(nèi)核唯一需要了解的事就是不需要了解內(nèi)核”。
結(jié)語
概括來講,Spark的護城河其實有兩條——其一是先進的技術(shù),另一條則是豐富的生態(tài)系統(tǒng)。從上圖可以看出,無論是這段時間在容器領(lǐng)域無比火爆的Kubernete及Docker,還是在NoSQL領(lǐng)域的兩面錦旗HBase及Cassandra,亦或是其它如消息隊列Kafka、分布式搜索引擎Elasticsearch及各機器學(xué)習(xí)框架都與Spark產(chǎn)生了聯(lián)系,并且這樣的趨勢還在快速蔓延中。這意味著,Spark可能出現(xiàn)在大數(shù)據(jù)處理的各個領(lǐng)域,并給各個領(lǐng)域帶來明顯提升。
同時我了解到,很多朋友都在關(guān)注Spark在GPU方面的發(fā)展。關(guān)于這一點,現(xiàn)在業(yè)界也有了一些嘗試,但仍然有較長的路要走,讓我們一起期待在這個領(lǐng)域未來會發(fā)生些什么。
逆水行舟,不進則退。Spark在不停地進步著,真誠希望國內(nèi)能有更多的工程師參與到Spark的開發(fā)中,同時也渴望看到更多有意思的Spark應(yīng)用案例。目前,幾乎可以肯定,在大數(shù)據(jù)領(lǐng)域選擇Spark確實為一個明智之舉。