一篇文章了解 Spark Shuffle 內(nèi)存使用
在使用 Spark 進(jìn)行計(jì)算時(shí),我們經(jīng)常會(huì)碰到作業(yè) (Job) Out Of Memory(OOM) 的情況,而且很大一部分情況是發(fā)生在 Shuffle 階段。那么在 Spark Shuffle 中具體是哪些地方會(huì)使用比較多的內(nèi)存而有可能導(dǎo)致 OOM 呢? 為此,本文將圍繞以上問(wèn)題梳理 Spark 內(nèi)存管理和 Shuffle 過(guò)程中與內(nèi)存使用相關(guān)的知識(shí);然后,簡(jiǎn)要分析下在 Spark Shuffle 中有可能導(dǎo)致 OOM 的原因。
一、Spark 內(nèi)存管理和消費(fèi)模型
在分析 Spark Shuffle 內(nèi)存使用之前。我們首先了解下以下問(wèn)題:當(dāng)一個(gè) Spark 子任務(wù) (Task) 被分配到 Executor 上運(yùn)行時(shí),Spark 管理內(nèi)存以及消費(fèi)內(nèi)存的大體模型是什么樣呢?(注:由于 OOM 主要發(fā)生在 Executor 端,所以接下來(lái)的討論主要針對(duì) Executor 端的內(nèi)存管理和使用)。
1,在 Spark 中,使用抽象類 MemoryConsumer 來(lái)表示需要使用內(nèi)存的消費(fèi)者。在這個(gè)類中定義了分配,釋放以及 Spill 內(nèi)存數(shù)據(jù)到磁盤的一些方法或者接口。具體的消費(fèi)者可以繼承 MemoryConsumer 從而實(shí)現(xiàn)具體的行為。 因此,在 Spark Task 執(zhí)行過(guò)程中,會(huì)有各種類型不同,數(shù)量不一的具體消費(fèi)者。如在 Spark Shuffle 中使用的 ExternalAppendOnlyMap, ExternalSorter 等等(具體后面會(huì)分析)。
2,MemoryConsumer 會(huì)將申請(qǐng),釋放相關(guān)內(nèi)存的工作交由 TaskMemoryManager 來(lái)執(zhí)行。當(dāng)一個(gè) Spark Task 被分配到 Executor 上運(yùn)行時(shí),會(huì)創(chuàng)建一個(gè) TaskMemoryManager。在 TaskMemoryManager 執(zhí)行分配內(nèi)存之前,需要首先向 MemoryManager 進(jìn)行申請(qǐng),然后由 TaskMemoryManager 借助 MemoryAllocator 執(zhí)行實(shí)際的內(nèi)存分配。
3,Executor 中的 MemoryManager 會(huì)統(tǒng)一管理內(nèi)存的使用。由于每個(gè) TaskMemoryManager 在執(zhí)行實(shí)際的內(nèi)存分配之前,會(huì)首先向 MemoryManager 提出申請(qǐng)。因此 MemoryManager 會(huì)對(duì)當(dāng)前進(jìn)程使用內(nèi)存的情況有著全局的了解。
MemoryManager,TaskMemoryManager 和 MemoryConsumer 之前的對(duì)應(yīng)關(guān)系,如下圖。總體上,一個(gè) MemoryManager 對(duì)應(yīng)著至少一個(gè) TaskMemoryManager (具體由 executor-core 參數(shù)指定),而一個(gè) TaskMemoryManager 對(duì)應(yīng)著多個(gè) MemoryConsumer (具體由任務(wù)而定)。

了解了以上內(nèi)存消費(fèi)的整體過(guò)程以后,有兩個(gè)問(wèn)題需要注意下:
1,當(dāng)有多個(gè) Task 同時(shí)在 Executor 上執(zhí)行時(shí), 將會(huì)有多個(gè) TaskMemoryManager 共享 MemoryManager 管理的內(nèi)存。那么 MemoryManager 是怎么分配的呢?答案是每個(gè)任務(wù)可以分配到的內(nèi)存范圍是 [1 / (2 * n), 1 / n],其中 n 是正在運(yùn)行的 Task 個(gè)數(shù)。因此,多個(gè)并發(fā)運(yùn)行的 Task 會(huì)使得每個(gè) Task 可以獲得的內(nèi)存變小。
2,前面提到,在 MemoryConsumer 中有 Spill 方法,當(dāng) MemoryConsumer 申請(qǐng)不到足夠的內(nèi)存時(shí),可以 Spill 當(dāng)前內(nèi)存到磁盤,從而避免無(wú)節(jié)制的使用內(nèi)存。但是,對(duì)于堆內(nèi)內(nèi)存的申請(qǐng)和釋放實(shí)際是由 JVM 來(lái)管理的。因此,在統(tǒng)計(jì)堆內(nèi)內(nèi)存具體使用量時(shí),考慮性能等各方面原因,Spark 目前采用的是抽樣統(tǒng)計(jì)的方式來(lái)計(jì)算 MemoryConsumer 已經(jīng)使用的內(nèi)存,從而造成堆內(nèi)內(nèi)存的實(shí)際使用量不是特別準(zhǔn)確。從而有可能因?yàn)椴荒芗皶r(shí) Spill 而導(dǎo)致 OOM。
二、Spark Shuffle 過(guò)程
整體上 Spark Shuffle 具體過(guò)程如下圖,主要分為兩個(gè)階段:Shuffle Write 和 Shuffle Read。
Write 階段大體經(jīng)歷排序(最低要求是需要按照分區(qū)進(jìn)行排序),可能的聚合 (combine) 和歸并(有多個(gè)文件 spill 磁盤的情況 ),最終每個(gè)寫 Task 會(huì)產(chǎn)生數(shù)據(jù)和索引兩個(gè)文件。其中,數(shù)據(jù)文件會(huì)按照分區(qū)進(jìn)行存儲(chǔ),即相同分區(qū)的數(shù)據(jù)在文件中是連續(xù)的,而索引文件記錄了每個(gè)分區(qū)在文件中的起始和結(jié)束位置。
而對(duì)于 Shuffle Read, 首先可能需要通過(guò)網(wǎng)絡(luò)從各個(gè) Write 任務(wù)節(jié)點(diǎn)獲取給定分區(qū)的數(shù)據(jù),即數(shù)據(jù)文件中某一段連續(xù)的區(qū)域,然后經(jīng)過(guò)排序,歸并等過(guò)程,最終形成計(jì)算結(jié)果。

對(duì)于 Shuffle Write,Spark 當(dāng)前有三種實(shí)現(xiàn),具體分別為 BypassMergeSortShuffleWriter, UnsafeShuffleWriter 和 SortShuffleWriter (具體使用哪一個(gè)實(shí)現(xiàn)有一個(gè)判斷條件,此處不表)。而 Shuffle Read 只有一種實(shí)現(xiàn)。
2.1 Shuffle Write 階段分析
2.1.1 BypassMergeSortShuffleWriter 分析
對(duì)于 BypassMergeSortShuffleWriter 的實(shí)現(xiàn),大體實(shí)現(xiàn)過(guò)程是首先為每個(gè)分區(qū)創(chuàng)建一個(gè)臨時(shí)分區(qū)文件,數(shù)據(jù)寫入對(duì)應(yīng)的分區(qū)文件,最終所有的分區(qū)文件合并成一個(gè)數(shù)據(jù)文件,并且產(chǎn)生一個(gè)索引文件。由于這個(gè)過(guò)程不做排序,combine(如果需要 combine 不會(huì)使用這個(gè)實(shí)現(xiàn))等操作,因此對(duì)于 BypassMergeSortShuffleWriter,總體來(lái)說(shuō)是不怎么耗費(fèi)內(nèi)存的。
2.1.2 SortShuffleWriter 分析
SortShuffleWriter 是最一般的實(shí)現(xiàn),也是日常使用最頻繁的。SortShuffleWriter 主要委托 ExternalSorter 做數(shù)據(jù)插入,排序,歸并 (Merge),聚合 (Combine) 以及最終寫數(shù)據(jù)和索引文件的工作。ExternalSorter 實(shí)現(xiàn)了之前提到的 MemoryConsumer 接口。下面分析一下各個(gè)過(guò)程使用內(nèi)存的情況:
1,對(duì)于數(shù)據(jù)寫入,根據(jù)是否需要做 Combine,數(shù)據(jù)會(huì)被插入到 PartitionedAppendOnlyMap 這個(gè) Map 或者 PartitionedPairBuffer 這個(gè)數(shù)組中。每隔一段時(shí)間,當(dāng)向 MemoryManager 申請(qǐng)不到足夠的內(nèi)存時(shí),或者數(shù)據(jù)量超過(guò) spark.shuffle.spill.numElementsForceSpillThreshold 這個(gè)閾值時(shí) (默認(rèn)是 Long 的最大值,不起作用),就會(huì)進(jìn)行 Spill 內(nèi)存數(shù)據(jù)到文件。假設(shè)可以源源不斷的申請(qǐng)到內(nèi)存,那么 Write 階段的所有數(shù)據(jù)將一直保存在內(nèi)存中,由此可見(jiàn),PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 是比較吃內(nèi)存的。
2,無(wú)論是 PartitionedAppendOnlyMap 還是 PartitionedPairBuffer, 使用的排序算法是 TimSort。在使用該算法是正常情況下使用的臨時(shí)額外空間是很小,但是最壞情況下是 n / 2,其中 n 表示待排序的數(shù)組長(zhǎng)度(具體見(jiàn) TimSort 實(shí)現(xiàn))。
3,當(dāng)插入數(shù)據(jù)因?yàn)樯暾?qǐng)不到足夠的內(nèi)存將會(huì) Spill 數(shù)據(jù)到磁盤,在將最終排序結(jié)果寫入到數(shù)據(jù)文件之前,需要將內(nèi)存中的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 和已經(jīng) spill 到磁盤的 SpillFiles 進(jìn)行合并。Merge 的大體過(guò)程如下圖。

從上圖可見(jiàn),大體差不多就是歸并排序的過(guò)程,由此可見(jiàn)這個(gè)過(guò)程是沒(méi)有太多額外的內(nèi)存消耗。歸并過(guò)程中的聚合計(jì)算大體也是差不多的過(guò)程,唯一需要注意的是鍵值碰撞的情況,即當(dāng)前輸入的各個(gè)有序隊(duì)列的鍵值的哈希值相同,但是實(shí)際的鍵值不等的情況。這種情況下,需要額外的空間保存所有鍵值不同,但哈希值相同值的中間結(jié)果。但是總體上來(lái)說(shuō),發(fā)生這種情況的概率并不是特別大。
4,寫數(shù)據(jù)文件的過(guò)程涉及到不同數(shù)據(jù)流之間的轉(zhuǎn)化,而在流的寫入過(guò)程中,一般都有緩存,主要由參數(shù) spark.shuffle.file.buffer 和 spark.shuffle.spill.batchSize 控制,總體上這部分開(kāi)銷也不大。
以上分析了 SortShuffleWriter write 階段的主要過(guò)程,從中可以看出主要的內(nèi)存消耗在寫入 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 這個(gè)階段。
2.1.3 UnsafeShuffleWriter
UnsafeShuffleWriter 是對(duì) SortShuffleWriter 的優(yōu)化,大體上也和 SortShuffleWriter 差不多,在此不再贅述。從內(nèi)存使用角度看,主要差異在以下兩點(diǎn):
一方面,在 SortShuffleWriter 的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 中,存儲(chǔ)的是鍵值或者值的具體類型,也就是 Java 對(duì)象,是反序列化過(guò)后的數(shù)據(jù)。而在 UnsafeShuffleWriter 的 ShuffleExternalSorter 中數(shù)據(jù)是序列化以后存儲(chǔ)到實(shí)際的 Page 中,而且在寫入數(shù)據(jù)過(guò)程中會(huì)額外寫入長(zhǎng)度信息??傮w而言,序列化以后數(shù)據(jù)大小是遠(yuǎn)遠(yuǎn)小于序列化之前的數(shù)據(jù)。
另一方面,UnsafeShuffleWriter 中需要額外的存儲(chǔ)記錄(LongArray),它保存著分區(qū)信息和實(shí)際指向序列化后數(shù)據(jù)的指針(經(jīng)過(guò)編碼的Page num 以及 Offset)。相對(duì)于 SortShuffleWriter, UnsafeShuffleWriter 中這部分存儲(chǔ)的開(kāi)銷是額外的。
2.2 Shuffle Read 階段分析
Spark Shuffle Read 主要經(jīng)歷從獲取數(shù)據(jù),序列化流,添加指標(biāo)統(tǒng)計(jì),可能的聚合 (Aggregation) 計(jì)算以及排序等過(guò)程。大體流程如下圖。

以上計(jì)算主要都是迭代進(jìn)行。在以上步驟中,比較復(fù)雜的操作是從遠(yuǎn)程獲取數(shù)據(jù),聚合和排序操作。接下來(lái),依次分析這三個(gè)步驟內(nèi)存的使用情況。
1,數(shù)據(jù)獲取分為遠(yuǎn)程獲取和本地獲取。本地獲取將直接從本地的 BlockManager 取數(shù)據(jù), 而對(duì)于遠(yuǎn)程數(shù)據(jù),需要走網(wǎng)絡(luò)。在遠(yuǎn)程獲取過(guò)程中,有相關(guān)參數(shù)可以控制從遠(yuǎn)程并發(fā)獲取數(shù)據(jù)的大小,正在獲取數(shù)據(jù)的請(qǐng)求數(shù),以及單次數(shù)據(jù)塊請(qǐng)求是否放到內(nèi)存等參數(shù)。具體參數(shù)包括 spark.reducer.maxSizeInFlight (默認(rèn) 48M),spark.reducer.maxReqsInFlight, spark.reducer.maxBlocksInFlightPerAddress 和 spark.maxRemoteBlockSizeFetchToMem。
考慮到數(shù)據(jù)傾斜的場(chǎng)景,如果 Map 階段有一個(gè) Block 數(shù)據(jù)特別的大,默認(rèn)情況由于 spark.maxRemoteBlockSizeFetchToMem 沒(méi)有做限制,所以在這個(gè)階段需要將需要獲取的整個(gè) Block 數(shù)據(jù)放到 Reduce 端的內(nèi)存中,這個(gè)時(shí)候是非常的耗內(nèi)存的??梢栽O(shè)置 spark.maxRemoteBlockSizeFetchToMem 值,如果超過(guò)該閾值,可以落盤,避免這種情況的 OOM。 另外,在獲取到數(shù)據(jù)以后,默認(rèn)情況下會(huì)對(duì)獲取的數(shù)據(jù)進(jìn)行校驗(yàn)(參數(shù) spark.shuffle.detectCorrupt 控制),這個(gè)過(guò)程也增加了一定的內(nèi)存消耗。
2,對(duì)于需要聚合和排序的情況,這個(gè)過(guò)程是借助 ExternalAppendOnlyMap 來(lái)實(shí)現(xiàn)的。整個(gè)插入,Spill 以及 Merge 的過(guò)程和 Write 階段差不多??傮w上,這塊也是比較消耗內(nèi)存的,但是因?yàn)橛?Spill 操作,當(dāng)內(nèi)存不足時(shí),可以將內(nèi)存數(shù)據(jù)刷到磁盤,從而釋放內(nèi)存空間。
三、Spark Shuffle OOM 可能性分析
圍繞內(nèi)存使用,前面比較詳細(xì)的分析了 Spark 內(nèi)存管理以及在 Shuffle 過(guò)程可能使用較多內(nèi)存的地方。接下來(lái)總結(jié)的要點(diǎn)如下:
1,首先需要注意 Executor 端的任務(wù)并發(fā)度,多個(gè)同時(shí)運(yùn)行的 Task 會(huì)共享 Executor 端的內(nèi)存,使得單個(gè) Task 可使用的內(nèi)存減少。
2,無(wú)論是在 Map 還是在 Reduce 端,插入數(shù)據(jù)到內(nèi)存,排序,歸并都是比較都是比較占用內(nèi)存的。因?yàn)橛?Spill,理論上不會(huì)因?yàn)閿?shù)據(jù)傾斜造成 OOM。 但是,由于對(duì)堆內(nèi)對(duì)象的分配和釋放是由 JVM 管理的,而 Spark 是通過(guò)采樣獲取已經(jīng)使用的內(nèi)存情況,有可能因?yàn)椴蓸硬粶?zhǔn)確而不能及時(shí) Spill,導(dǎo)致OOM。
3,在 Reduce 獲取數(shù)據(jù)時(shí),由于數(shù)據(jù)傾斜,有可能造成單個(gè) Block 的數(shù)據(jù)非常的大,默認(rèn)情況下是需要有足夠的內(nèi)存來(lái)保存單個(gè) Block 的數(shù)據(jù)。因此,此時(shí)極有可能因?yàn)閿?shù)據(jù)傾斜造成 OOM。 可以設(shè)置 spark.maxRemoteBlockSizeFetchToMem 參數(shù),設(shè)置這個(gè)參數(shù)以后,超過(guò)一定的閾值,會(huì)自動(dòng)將數(shù)據(jù) Spill 到磁盤,此時(shí)便可以避免因?yàn)閿?shù)據(jù)傾斜造成 OOM 的情況。在我們的生產(chǎn)環(huán)境中也驗(yàn)證了這點(diǎn),在設(shè)置這個(gè)參數(shù)到合理的閾值后,生產(chǎn)環(huán)境任務(wù) OOM 的情況大大減少了。
4,在 Reduce 獲取數(shù)據(jù)后,默認(rèn)情況會(huì)對(duì)數(shù)據(jù)流進(jìn)行解壓校驗(yàn)(參數(shù) spark.shuffle.detectCorrupt)。正如在代碼注釋中提到,由于這部分沒(méi)有 Spill 到磁盤操作,也有很大的可性能會(huì)導(dǎo)致 OOM。在我們的生產(chǎn)環(huán)境中也有碰到因?yàn)闄z驗(yàn)導(dǎo)致 OOM 的情況。
四、小結(jié)
本文主要圍繞內(nèi)存使用這個(gè)點(diǎn),對(duì) Spark shuffle 的過(guò)程做了一個(gè)比較詳細(xì)的梳理,并且分析了可能造成 OOM 的一些情況以及我們?cè)谏a(chǎn)環(huán)境碰到的一些問(wèn)題。本文主要基于作者對(duì) Spark 源碼的理解以及實(shí)際生產(chǎn)過(guò)程中遇到 OOM 案例總結(jié)而成,限于經(jīng)驗(yàn)等各方面原因,難免有所疏漏或者有失偏頗。如有問(wèn)題,歡迎聯(lián)系一起討論。