自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Apache Spark 內(nèi)存管理詳解

大數(shù)據(jù) Spark
Spark 作為一個(gè)基于內(nèi)存的分布式計(jì)算引擎,其內(nèi)存管理模塊在整個(gè)系統(tǒng)中扮演著非常重要的角色。理解 Spark 內(nèi)存管理的基本原理,有助于更好地開(kāi)發(fā) Spark 應(yīng)用程序和進(jìn)行性能調(diào)優(yōu)。

Spark 作為一個(gè)基于內(nèi)存的分布式計(jì)算引擎,其內(nèi)存管理模塊在整個(gè)系統(tǒng)中扮演著非常重要的角色。理解 Spark 內(nèi)存管理的基本原理,有助于更好地開(kāi)發(fā) Spark 應(yīng)用程序和進(jìn)行性能調(diào)優(yōu)。本文旨在梳理出 Spark 內(nèi)存管理的脈絡(luò),拋磚引玉,引出讀者對(duì)這個(gè)話題的深入探討。本文中闡述的原理基于 Spark 2.1 版本,閱讀本文需要讀者有一定的 Spark 和 Java 基礎(chǔ),了解 RDD、Shuffle、JVM 等相關(guān)概念。

在執(zhí)行 Spark 的應(yīng)用程序時(shí),Spark 集群會(huì)啟動(dòng) Driver 和 Executor 兩種 JVM 進(jìn)程,前者為主控進(jìn)程,負(fù)責(zé)創(chuàng)建 Spark 上下文,提交 Spark 作業(yè)(Job),并將作業(yè)轉(zhuǎn)化為計(jì)算任務(wù)(Task),在各個(gè) Executor 進(jìn)程間協(xié)調(diào)任務(wù)的調(diào)度,后者負(fù)責(zé)在工作節(jié)點(diǎn)上執(zhí)行具體的計(jì)算任務(wù),并將結(jié)果返回給 Driver,同時(shí)為需要持久化的 RDD 提供存儲(chǔ)功能[1]。由于 Driver 的內(nèi)存管理相對(duì)來(lái)說(shuō)較為簡(jiǎn)單,本文主要對(duì) Executor 的內(nèi)存管理進(jìn)行分析,下文中的 Spark 內(nèi)存均特指 Executor 的內(nèi)存。

1. 堆內(nèi)和堆外內(nèi)存規(guī)劃

作為一個(gè) JVM 進(jìn)程,Executor 的內(nèi)存管理建立在 JVM 的內(nèi)存管理之上,Spark 對(duì) JVM 的堆內(nèi)(On-heap)空間進(jìn)行了更為詳細(xì)的分配,以充分利用內(nèi)存。同時(shí),Spark 引入了堆外(Off-heap)內(nèi)存,使之可以直接在工作節(jié)點(diǎn)的系統(tǒng)內(nèi)存中開(kāi)辟空間,進(jìn)一步優(yōu)化了內(nèi)存的使用。

圖 1 . 堆內(nèi)和堆外內(nèi)存示意圖

圖 1 . 堆內(nèi)和堆外內(nèi)存示意圖

1.1 堆內(nèi)內(nèi)存

堆內(nèi)內(nèi)存的大小,由 Spark 應(yīng)用程序啟動(dòng)時(shí)的 –executor-memory 或 spark.executor.memory 參數(shù)配置。Executor 內(nèi)運(yùn)行的并發(fā)任務(wù)共享 JVM 堆內(nèi)內(nèi)存,這些任務(wù)在緩存 RDD 數(shù)據(jù)和廣播(Broadcast)數(shù)據(jù)時(shí)占用的內(nèi)存被規(guī)劃為存儲(chǔ)(Storage)內(nèi)存,而這些任務(wù)在執(zhí)行 Shuffle 時(shí)占用的內(nèi)存被規(guī)劃為執(zhí)行(Execution)內(nèi)存,剩余的部分不做特殊規(guī)劃,那些 Spark 內(nèi)部的對(duì)象實(shí)例,或者用戶定義的 Spark 應(yīng)用程序中的對(duì)象實(shí)例,均占用剩余的空間。不同的管理模式下,這三部分占用的空間大小各不相同(下面第 2 小節(jié)會(huì)進(jìn)行介紹)。

Spark 對(duì)堆內(nèi)內(nèi)存的管理是一種邏輯上的”規(guī)劃式”的管理,因?yàn)閷?duì)象實(shí)例占用內(nèi)存的申請(qǐng)和釋放都由 JVM 完成,Spark 只能在申請(qǐng)后和釋放前記錄這些內(nèi)存,我們來(lái)看其具體流程:

  • 申請(qǐng)內(nèi)存:
  1. Spark 在代碼中 new 一個(gè)對(duì)象實(shí)例
  2. JVM 從堆內(nèi)內(nèi)存分配空間,創(chuàng)建對(duì)象并返回對(duì)象引用
  3. Spark 保存該對(duì)象的引用,記錄該對(duì)象占用的內(nèi)存
  • 釋放內(nèi)存:
  • Spark 記錄該對(duì)象釋放的內(nèi)存,刪除該對(duì)象的引用
  • 等待 JVM 的垃圾回收機(jī)制釋放該對(duì)象占用的堆內(nèi)內(nèi)存

我們知道,JVM 的對(duì)象可以以序列化的方式存儲(chǔ),序列化的過(guò)程是將對(duì)象轉(zhuǎn)換為二進(jìn)制字節(jié)流,本質(zhì)上可以理解為將非連續(xù)空間的鏈?zhǔn)酱鎯?chǔ)轉(zhuǎn)化為連續(xù)空間或塊存儲(chǔ),在訪問(wèn)時(shí)則需要進(jìn)行序列化的逆過(guò)程——反序列化,將字節(jié)流轉(zhuǎn)化為對(duì)象,序列化的方式可以節(jié)省存儲(chǔ)空間,但增加了存儲(chǔ)和讀取時(shí)候的計(jì)算開(kāi)銷。

對(duì)于 Spark 中序列化的對(duì)象,由于是字節(jié)流的形式,其占用的內(nèi)存大小可直接計(jì)算,而對(duì)于非序列化的對(duì)象,其占用的內(nèi)存是通過(guò)周期性地采樣近似估算而得,即并不是每次新增的數(shù)據(jù)項(xiàng)都會(huì)計(jì)算一次占用的內(nèi)存大小,這種方法降低了時(shí)間開(kāi)銷但是有可能誤差較大,導(dǎo)致某一時(shí)刻的實(shí)際內(nèi)存有可能遠(yuǎn)遠(yuǎn)超出預(yù)期[2]。此外,在被 Spark 標(biāo)記為釋放的對(duì)象實(shí)例,很有可能在實(shí)際上并沒(méi)有被 JVM 回收,導(dǎo)致實(shí)際可用的內(nèi)存小于 Spark 記錄的可用內(nèi)存。所以 Spark 并不能準(zhǔn)確記錄實(shí)際可用的堆內(nèi)內(nèi)存,從而也就無(wú)法完全避免內(nèi)存溢出(OOM, Out of Memory)的異常。

雖然不能精準(zhǔn)控制堆內(nèi)內(nèi)存的申請(qǐng)和釋放,但 Spark 通過(guò)對(duì)存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存各自獨(dú)立的規(guī)劃管理,可以決定是否要在存儲(chǔ)內(nèi)存里緩存新的 RDD,以及是否為新的任務(wù)分配執(zhí)行內(nèi)存,在一定程度上可以提升內(nèi)存的利用率,減少異常的出現(xiàn)。

1.2 堆外內(nèi)存

為了進(jìn)一步優(yōu)化內(nèi)存的使用以及提高 Shuffle 時(shí)排序的效率,Spark 引入了堆外(Off-heap)內(nèi)存,使之可以直接在工作節(jié)點(diǎn)的系統(tǒng)內(nèi)存中開(kāi)辟空間,存儲(chǔ)經(jīng)過(guò)序列化的二進(jìn)制數(shù)據(jù)。利用 JDK Unsafe API(從 Spark 2.0 開(kāi)始,在管理堆外的存儲(chǔ)內(nèi)存時(shí)不再基于 Tachyon,而是與堆外的執(zhí)行內(nèi)存一樣,基于 JDK Unsafe API 實(shí)現(xiàn)[3]),Spark 可以直接操作系統(tǒng)堆外內(nèi)存,減少了不必要的內(nèi)存開(kāi)銷,以及頻繁的 GC 掃描和回收,提升了處理性能。堆外內(nèi)存可以被精確地申請(qǐng)和釋放,而且序列化的數(shù)據(jù)占用的空間可以被精確計(jì)算,所以相比堆內(nèi)內(nèi)存來(lái)說(shuō)降低了管理的難度,也降低了誤差。

在默認(rèn)情況下堆外內(nèi)存并不啟用,可通過(guò)配置 spark.memory.offHeap.enabled 參數(shù)啟用,并由 spark.memory.offHeap.size 參數(shù)設(shè)定堆外空間的大小。除了沒(méi)有 other 空間,堆外內(nèi)存與堆內(nèi)內(nèi)存的劃分方式相同,所有運(yùn)行中的并發(fā)任務(wù)共享存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存。

1.3 內(nèi)存管理接口

Spark 為存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存的管理提供了統(tǒng)一的接口——MemoryManager,同一個(gè) Executor 內(nèi)的任務(wù)都調(diào)用這個(gè)接口的方法來(lái)申請(qǐng)或釋放內(nèi)存:

清單 1 . 內(nèi)存管理接口的主要方法

  1. //申請(qǐng)存儲(chǔ)內(nèi)存 
  2. def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean 
  3. //申請(qǐng)展開(kāi)內(nèi)存 
  4. def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean 
  5. //申請(qǐng)執(zhí)行內(nèi)存 
  6. def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long 
  7. //釋放存儲(chǔ)內(nèi)存 
  8. def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit 
  9. //釋放執(zhí)行內(nèi)存 
  10. def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit 
  11. //釋放展開(kāi)內(nèi)存 
  12. def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit 

我們看到,在調(diào)用這些方法時(shí)都需要指定其內(nèi)存模式(MemoryMode),這個(gè)參數(shù)決定了是在堆內(nèi)還是堆外完成這次操作。

MemoryManager 的具體實(shí)現(xiàn)上,Spark 1.6 之后默認(rèn)為統(tǒng)一管理(Unified Memory Manager)方式,1.6 之前采用的靜態(tài)管理(Static Memory Manager)方式仍被保留,可通過(guò)配置 spark.memory.useLegacyMode 參數(shù)啟用。兩種方式的區(qū)別在于對(duì)空間分配的方式,下面的第 2 小節(jié)會(huì)分別對(duì)這兩種方式進(jìn)行介紹。

2 . 內(nèi)存空間分配

2.1 靜態(tài)內(nèi)存管理

在 Spark 最初采用的靜態(tài)內(nèi)存管理機(jī)制下,存儲(chǔ)內(nèi)存、執(zhí)行內(nèi)存和其他內(nèi)存的大小在 Spark 應(yīng)用程序運(yùn)行期間均為固定的,但用戶可以應(yīng)用程序啟動(dòng)前進(jìn)行配置,堆內(nèi)內(nèi)存的分配如圖 2 所示:

圖 2 . 靜態(tài)內(nèi)存管理圖示——堆內(nèi)

圖 2 . 靜態(tài)內(nèi)存管理圖示——堆內(nèi)

可以看到,可用的堆內(nèi)內(nèi)存的大小需要按照下面的方式計(jì)算:

清單 2 . 可用堆內(nèi)內(nèi)存空間

  1. 可用的存儲(chǔ)內(nèi)存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction 
  2. 可用的執(zhí)行內(nèi)存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction 

其中 systemMaxMemory 取決于當(dāng)前 JVM 堆內(nèi)內(nèi)存的大小,***可用的執(zhí)行內(nèi)存或者存儲(chǔ)內(nèi)存要在此基礎(chǔ)上與各自的 memoryFraction 參數(shù)和 safetyFraction 參數(shù)相乘得出。上述計(jì)算公式中的兩個(gè) safetyFraction 參數(shù),其意義在于在邏輯上預(yù)留出 1-safetyFraction 這么一塊保險(xiǎn)區(qū)域,降低因?qū)嶋H內(nèi)存超出當(dāng)前預(yù)設(shè)范圍而導(dǎo)致 OOM 的風(fēng)險(xiǎn)(上文提到,對(duì)于非序列化對(duì)象的內(nèi)存采樣估算會(huì)產(chǎn)生誤差)。值得注意的是,這個(gè)預(yù)留的保險(xiǎn)區(qū)域僅僅是一種邏輯上的規(guī)劃,在具體使用時(shí) Spark 并沒(méi)有區(qū)別對(duì)待,和”其它內(nèi)存”一樣交給了 JVM 去管理。

堆外的空間分配較為簡(jiǎn)單,只有存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存,如圖 3 所示??捎玫膱?zhí)行內(nèi)存和存儲(chǔ)內(nèi)存占用的空間大小直接由參數(shù) spark.memory.storageFraction 決定,由于堆外內(nèi)存占用的空間可以被精確計(jì)算,所以無(wú)需再設(shè)定保險(xiǎn)區(qū)域。

圖 3 . 靜態(tài)內(nèi)存管理圖示——堆外

圖 3 . 靜態(tài)內(nèi)存管理圖示——堆外

靜態(tài)內(nèi)存管理機(jī)制實(shí)現(xiàn)起來(lái)較為簡(jiǎn)單,但如果用戶不熟悉 Spark 的存儲(chǔ)機(jī)制,或沒(méi)有根據(jù)具體的數(shù)據(jù)規(guī)模和計(jì)算任務(wù)或做相應(yīng)的配置,很容易造成”一半海水,一半火焰”的局面,即存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存中的一方剩余大量的空間,而另一方卻早早被占滿,不得不淘汰或移出舊的內(nèi)容以存儲(chǔ)新的內(nèi)容。由于新的內(nèi)存管理機(jī)制的出現(xiàn),這種方式目前已經(jīng)很少有開(kāi)發(fā)者使用,出于兼容舊版本的應(yīng)用程序的目的,Spark 仍然保留了它的實(shí)現(xiàn)。

2.2 統(tǒng)一內(nèi)存管理

Spark 1.6 之后引入的統(tǒng)一內(nèi)存管理機(jī)制,與靜態(tài)內(nèi)存管理的區(qū)別在于存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存共享同一塊空間,可以動(dòng)態(tài)占用對(duì)方的空閑區(qū)域,如圖 4 和圖 5 所示

 

圖 4 . 統(tǒng)一內(nèi)存管理圖示——堆內(nèi)

圖 4 . 統(tǒng)一內(nèi)存管理圖示——堆內(nèi)

圖 5 . 統(tǒng)一內(nèi)存管理圖示——堆外

圖 5 . 統(tǒng)一內(nèi)存管理圖示——堆外

其中最重要的優(yōu)化在于動(dòng)態(tài)占用機(jī)制,其規(guī)則如下:

  • 設(shè)定基本的存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存區(qū)域(spark.storage.storageFraction 參數(shù)),該設(shè)定確定了雙方各自擁有的空間的范圍
  • 雙方的空間都不足時(shí),則存儲(chǔ)到硬盤;若己方空間不足而對(duì)方空余時(shí),可借用對(duì)方的空間;(存儲(chǔ)空間不足是指不足以放下一個(gè)完整的 Block)
  • 執(zhí)行內(nèi)存的空間被對(duì)方占用后,可讓對(duì)方將占用的部分轉(zhuǎn)存到硬盤,然后”歸還”借用的空間
  • 存儲(chǔ)內(nèi)存的空間被對(duì)方占用后,無(wú)法讓對(duì)方”歸還”,因?yàn)樾枰紤] Shuffle 過(guò)程中的很多因素,實(shí)現(xiàn)起來(lái)較為復(fù)雜

圖 6 . 動(dòng)態(tài)占用機(jī)制圖示

圖 6 . 動(dòng)態(tài)占用機(jī)制圖示

憑借統(tǒng)一內(nèi)存管理機(jī)制,Spark 在一定程度上提高了堆內(nèi)和堆外內(nèi)存資源的利用率,降低了開(kāi)發(fā)者維護(hù) Spark 內(nèi)存的難度,但并不意味著開(kāi)發(fā)者可以高枕無(wú)憂。譬如,所以如果存儲(chǔ)內(nèi)存的空間太大或者說(shuō)緩存的數(shù)據(jù)過(guò)多,反而會(huì)導(dǎo)致頻繁的全量垃圾回收,降低任務(wù)執(zhí)行時(shí)的性能,因?yàn)榫彺娴? RDD 數(shù)據(jù)通常都是長(zhǎng)期駐留內(nèi)存的 [5] 。所以要想充分發(fā)揮 Spark 的性能,需要開(kāi)發(fā)者進(jìn)一步了解存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存各自的管理方式和實(shí)現(xiàn)原理。

3. 存儲(chǔ)內(nèi)存管理

3.1 RDD 的持久化機(jī)制

彈性分布式數(shù)據(jù)集(RDD)作為 Spark 最根本的數(shù)據(jù)抽象,是只讀的分區(qū)記錄(Partition)的集合,只能基于在穩(wěn)定物理存儲(chǔ)中的數(shù)據(jù)集上創(chuàng)建,或者在其他已有的 RDD 上執(zhí)行轉(zhuǎn)換(Transformation)操作產(chǎn)生一個(gè)新的 RDD。轉(zhuǎn)換后的 RDD 與原始的 RDD 之間產(chǎn)生的依賴關(guān)系,構(gòu)成了血統(tǒng)(Lineage)。憑借血統(tǒng),Spark 保證了每一個(gè) RDD 都可以被重新恢復(fù)。但 RDD 的所有轉(zhuǎn)換都是惰性的,即只有當(dāng)一個(gè)返回結(jié)果給 Driver 的行動(dòng)(Action)發(fā)生時(shí),Spark 才會(huì)創(chuàng)建任務(wù)讀取 RDD,然后真正觸發(fā)轉(zhuǎn)換的執(zhí)行。

Task 在啟動(dòng)之初讀取一個(gè)分區(qū)時(shí),會(huì)先判斷這個(gè)分區(qū)是否已經(jīng)被持久化,如果沒(méi)有則需要檢查 Checkpoint 或按照血統(tǒng)重新計(jì)算。所以如果一個(gè) RDD 上要執(zhí)行多次行動(dòng),可以在***次行動(dòng)中使用 persist 或 cache 方法,在內(nèi)存或磁盤中持久化或緩存這個(gè) RDD,從而在后面的行動(dòng)時(shí)提升計(jì)算速度。事實(shí)上,cache 方法是使用默認(rèn)的 MEMORY_ONLY 的存儲(chǔ)級(jí)別將 RDD 持久化到內(nèi)存,故緩存是一種特殊的持久化。 堆內(nèi)和堆外存儲(chǔ)內(nèi)存的設(shè)計(jì),便可以對(duì)緩存 RDD 時(shí)使用的內(nèi)存做統(tǒng)一的規(guī)劃和管 理 (存儲(chǔ)內(nèi)存的其他應(yīng)用場(chǎng)景,如緩存 broadcast 數(shù)據(jù),暫時(shí)不在本文的討論范圍之內(nèi))。

RDD 的持久化由 Spark 的 Storage 模塊 [7] 負(fù)責(zé),實(shí)現(xiàn)了 RDD 與物理存儲(chǔ)的解耦合。Storage 模塊負(fù)責(zé)管理 Spark 在計(jì)算過(guò)程中產(chǎn)生的數(shù)據(jù),將那些在內(nèi)存或磁盤、在本地或遠(yuǎn)程存取數(shù)據(jù)的功能封裝了起來(lái)。在具體實(shí)現(xiàn)時(shí) Driver 端和 Executor 端的 Storage 模塊構(gòu)成了主從式的架構(gòu),即 Driver 端的 BlockManager 為 Master,Executor 端的 BlockManager 為 Slave。Storage 模塊在邏輯上以 Block 為基本存儲(chǔ)單位,RDD 的每個(gè) Partition 經(jīng)過(guò)處理后唯一對(duì)應(yīng)一個(gè) Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID )。Master 負(fù)責(zé)整個(gè) Spark 應(yīng)用程序的 Block 的元數(shù)據(jù)信息的管理和維護(hù),而 Slave 需要將 Block 的更新等狀態(tài)上報(bào)到 Master,同時(shí)接收 Master 的命令,例如新增或刪除一個(gè) RDD。

圖 7 . Storage 模塊示意圖

圖 7 . Storage 模塊示意圖

在對(duì) RDD 持久化時(shí),Spark 規(guī)定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 種不同的 存儲(chǔ)級(jí)別 ,而存儲(chǔ)級(jí)別是以下 5 個(gè)變量的組合:

清單 3 . 存儲(chǔ)級(jí)別

  1. class StorageLevel private( 
  2. private var _useDisk: Boolean, //磁盤 
  3. private var _useMemory: Boolean, //這里其實(shí)是指堆內(nèi)內(nèi)存 
  4. private var _useOffHeap: Boolean, //堆外內(nèi)存 
  5. private var _deserialized: Boolean, //是否為非序列化 
  6. private var _replication: Int = 1 //副本個(gè)數(shù) 

通過(guò)對(duì)數(shù)據(jù)結(jié)構(gòu)的分析,可以看出存儲(chǔ)級(jí)別從三個(gè)維度定義了 RDD 的 Partition(同時(shí)也就是 Block)的存儲(chǔ)方式:

  • 存儲(chǔ)位置:磁盤/堆內(nèi)內(nèi)存/堆外內(nèi)存。如 MEMORY_AND_DISK 是同時(shí)在磁盤和堆內(nèi)內(nèi)存上存儲(chǔ),實(shí)現(xiàn)了冗余備份。OFF_HEAP 則是只在堆外內(nèi)存存儲(chǔ),目前選擇堆外內(nèi)存時(shí)不能同時(shí)存儲(chǔ)到其他位置。
  • 存儲(chǔ)形式:Block 緩存到存儲(chǔ)內(nèi)存后,是否為非序列化的形式。如 MEMORY_ONLY 是非序列化方式存儲(chǔ),OFF_HEAP 是序列化方式存儲(chǔ)。
  • 副本數(shù)量:大于 1 時(shí)需要遠(yuǎn)程冗余備份到其他節(jié)點(diǎn)。如 DISK_ONLY_2 需要遠(yuǎn)程備份 1 個(gè)副本。

3.2 RDD 緩存的過(guò)程

RDD 在緩存到存儲(chǔ)內(nèi)存之前,Partition 中的數(shù)據(jù)一般以迭代器(Iterator)的數(shù)據(jù)結(jié)構(gòu)來(lái)訪問(wèn),這是 Scala 語(yǔ)言中一種遍歷數(shù)據(jù)集合的方法。通過(guò) Iterator 可以獲取分區(qū)中每一條序列化或者非序列化的數(shù)據(jù)項(xiàng)(Record),這些 Record 的對(duì)象實(shí)例在邏輯上占用了 JVM 堆內(nèi)內(nèi)存的 other 部分的空間,同一 Partition 的不同 Record 的空間并不連續(xù)。

RDD 在緩存到存儲(chǔ)內(nèi)存之后,Partition 被轉(zhuǎn)換成 Block,Record 在堆內(nèi)或堆外存儲(chǔ)內(nèi)存中占用一塊連續(xù)的空間。將Partition由不連續(xù)的存儲(chǔ)空間轉(zhuǎn)換為連續(xù)存儲(chǔ)空間的過(guò)程,Spark稱之為”展開(kāi)”(Unroll)。Block 有序列化和非序列化兩種存儲(chǔ)格式,具體以哪種方式取決于該 RDD 的存儲(chǔ)級(jí)別。非反序列化的 Block 以一種 DeserializedMemoryEntry 的數(shù)據(jù)結(jié)構(gòu)定義,用一個(gè)數(shù)組存儲(chǔ)所有的 Java 對(duì)象,非序列化的 Block 則以 SerializedMemoryEntry 的數(shù)據(jù)結(jié)構(gòu)定義,用字節(jié)緩沖區(qū)(ByteBuffer)來(lái)存儲(chǔ)二進(jìn)制數(shù)據(jù)。每個(gè) Executor 的 Storage 模塊用一個(gè)鏈?zhǔn)?Map 結(jié)構(gòu)(LinkedHashMap)來(lái)管理堆內(nèi)和堆外存儲(chǔ)內(nèi)存中所有的 Block 對(duì)象的實(shí)例[6],對(duì)這個(gè) LinkedHashMap 新增和刪除間接記錄了內(nèi)存的申請(qǐng)和釋放。

因?yàn)椴荒鼙WC存儲(chǔ)空間可以一次容納 Iterator 中的所有數(shù)據(jù),當(dāng)前的計(jì)算任務(wù)在 Unroll 時(shí)要向 MemoryManager 申請(qǐng)足夠的 Unroll 空間來(lái)臨時(shí)占位,空間不足則 Unroll 失敗,空間足夠時(shí)可以繼續(xù)進(jìn)行。對(duì)于序列化的 Partition,其所需的 Unroll 空間可以直接累加計(jì)算,一次申請(qǐng)。而非序列化的 Partition 則要在遍歷 Record 的過(guò)程中依次申請(qǐng),即每讀取一條 Record,采樣估算其所需的 Unroll 空間并進(jìn)行申請(qǐng),空間不足時(shí)可以中斷,釋放已占用的 Unroll 空間。如果最終 Unroll 成功,當(dāng)前 Partition 所占用的 Unroll 空間被轉(zhuǎn)換為正常的緩存 RDD 的存儲(chǔ)空間。

在圖 3 和圖 5 中可以看到,在靜態(tài)內(nèi)存管理時(shí),Spark 在存儲(chǔ)內(nèi)存中專門劃分了一塊 Unroll 空間,其大小是固定的,統(tǒng)一內(nèi)存管理時(shí)則沒(méi)有對(duì) Unroll 空間進(jìn)行特別區(qū)分,當(dāng)存儲(chǔ)空間不足時(shí)會(huì)根據(jù)動(dòng)態(tài)占用機(jī)制進(jìn)行處理。

3.3 淘汰和落盤

由于同一個(gè) Executor 的所有的計(jì)算任務(wù)共享有限的存儲(chǔ)內(nèi)存空間,當(dāng)有新的 Block 需要緩存但是剩余空間不足且無(wú)法動(dòng)態(tài)占用時(shí),就要對(duì) LinkedHashMap 中的舊 Block 進(jìn)行淘汰(Eviction),而被淘汰的 Block 如果其存儲(chǔ)級(jí)別中同時(shí)包含存儲(chǔ)到磁盤的要求,則要對(duì)其進(jìn)行落盤(Drop),否則直接刪除該 Block。

存儲(chǔ)內(nèi)存的淘汰規(guī)則為:

  • 被淘汰的舊 Block 要與新 Block 的 MemoryMode 相同,即同屬于堆外或堆內(nèi)內(nèi)存
  • 新舊 Block 不能屬于同一個(gè) RDD,避免循環(huán)淘汰
  • 舊 Block 所屬 RDD 不能處于被讀狀態(tài),避免引發(fā)一致性問(wèn)題
  • 遍歷 LinkedHashMap 中 Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新 Block 所需的空間。其中 LRU 是 LinkedHashMap 的特性。

落盤的流程則比較簡(jiǎn)單,如果其存儲(chǔ)級(jí)別符合_useDisk 為 true 的條件,再根據(jù)其_deserialized 判斷是否是非序列化的形式,若是則對(duì)其進(jìn)行序列化,***將數(shù)據(jù)存儲(chǔ)到磁盤,在 Storage 模塊中更新其信息。

4. 執(zhí)行內(nèi)存管理

4.1 多任務(wù)間內(nèi)存分配

Executor 內(nèi)運(yùn)行的任務(wù)同樣共享執(zhí)行內(nèi)存,Spark 用一個(gè) HashMap 結(jié)構(gòu)保存了任務(wù)到內(nèi)存耗費(fèi)的映射。每個(gè)任務(wù)可占用的執(zhí)行內(nèi)存大小的范圍為 1/2N ~ 1/N,其中 N 為當(dāng)前 Executor 內(nèi)正在運(yùn)行的任務(wù)的個(gè)數(shù)。每個(gè)任務(wù)在啟動(dòng)之時(shí),要向 MemoryManager 請(qǐng)求申請(qǐng)最少為 1/2N 的執(zhí)行內(nèi)存,如果不能被滿足要求則該任務(wù)被阻塞,直到有其他任務(wù)釋放了足夠的執(zhí)行內(nèi)存,該任務(wù)才可以被喚醒。

4.2 Shuffle 的內(nèi)存占用

執(zhí)行內(nèi)存主要用來(lái)存儲(chǔ)任務(wù)在執(zhí)行 Shuffle 時(shí)占用的內(nèi)存,Shuffle 是按照一定規(guī)則對(duì) RDD 數(shù)據(jù)重新分區(qū)的過(guò)程,我們來(lái)看 Shuffle 的 Write 和 Read 兩階段對(duì)執(zhí)行內(nèi)存的使用:

  • Shuffle Write
  1. 若在 map 端選擇普通的排序方式,會(huì)采用 ExternalSorter 進(jìn)行外排,在內(nèi)存中存儲(chǔ)數(shù)據(jù)時(shí)主要占用堆內(nèi)執(zhí)行空間。
  2. 若在 map 端選擇 Tungsten 的排序方式,則采用 ShuffleExternalSorter 直接對(duì)以序列化形式存儲(chǔ)的數(shù)據(jù)排序,在內(nèi)存中存儲(chǔ)數(shù)據(jù)時(shí)可以占用堆外或堆內(nèi)執(zhí)行空間,取決于用戶是否開(kāi)啟了堆外內(nèi)存以及堆外執(zhí)行內(nèi)存是否足夠。
  • Shuffle Read
  1. 在對(duì) reduce 端的數(shù)據(jù)進(jìn)行聚合時(shí),要將數(shù)據(jù)交給 Aggregator 處理,在內(nèi)存中存儲(chǔ)數(shù)據(jù)時(shí)占用堆內(nèi)執(zhí)行空間。
  2. 如果需要進(jìn)行最終結(jié)果排序,則要將再次將數(shù)據(jù)交給 ExternalSorter 處理,占用堆內(nèi)執(zhí)行空間。

在 ExternalSorter 和 Aggregator 中,Spark 會(huì)使用一種叫 AppendOnlyMap 的哈希表在堆內(nèi)執(zhí)行內(nèi)存中存儲(chǔ)數(shù)據(jù),但在 Shuffle 過(guò)程中所有數(shù)據(jù)并不能都保存到該哈希表中,當(dāng)這個(gè)哈希表占用的內(nèi)存會(huì)進(jìn)行周期性地采樣估算,當(dāng)其大到一定程度,無(wú)法再?gòu)?MemoryManager 申請(qǐng)到新的執(zhí)行內(nèi)存時(shí),Spark 就會(huì)將其全部?jī)?nèi)容存儲(chǔ)到磁盤文件中,這個(gè)過(guò)程被稱為溢存(Spill),溢存到磁盤的文件***會(huì)被歸并(Merge)。

Shuffle Write 階段中用到的 Tungsten 是 Databricks 公司提出的對(duì) Spark 優(yōu)化內(nèi)存和 CPU 使用的計(jì)劃[9],解決了一些 JVM 在性能上的限制和弊端。Spark 會(huì)根據(jù) Shuffle 的情況來(lái)自動(dòng)選擇是否采用 Tungsten 排序。Tungsten 采用的頁(yè)式內(nèi)存管理機(jī)制建立在 MemoryManager 之上,即 Tungsten 對(duì)執(zhí)行內(nèi)存的使用進(jìn)行了一步的抽象,這樣在 Shuffle 過(guò)程中無(wú)需關(guān)心數(shù)據(jù)具體存儲(chǔ)在堆內(nèi)還是堆外。每個(gè)內(nèi)存頁(yè)用一個(gè) MemoryBlock 來(lái)定義,并用 Object obj 和 long offset 這兩個(gè)變量統(tǒng)一標(biāo)識(shí)一個(gè)內(nèi)存頁(yè)在系統(tǒng)內(nèi)存中的地址。堆內(nèi)的 MemoryBlock 是以 long 型數(shù)組的形式分配的內(nèi)存,其 obj 的值為是這個(gè)數(shù)組的對(duì)象引用,offset 是 long 型數(shù)組的在 JVM 中的初始偏移地址,兩者配合使用可以定位這個(gè)數(shù)組在堆內(nèi)的絕對(duì)地址;堆外的 MemoryBlock 是直接申請(qǐng)到的內(nèi)存塊,其 obj 為 null,offset 是這個(gè)內(nèi)存塊在系統(tǒng)內(nèi)存中的 64 位絕對(duì)地址。Spark 用 MemoryBlock 巧妙地將堆內(nèi)和堆外內(nèi)存頁(yè)統(tǒng)一抽象封裝,并用頁(yè)表(pageTable)管理每個(gè) Task 申請(qǐng)到的內(nèi)存頁(yè)。

Tungsten 頁(yè)式管理下的所有內(nèi)存用 64 位的邏輯地址表示,由頁(yè)號(hào)和頁(yè)內(nèi)偏移量組成:

  • 頁(yè)號(hào):占 13 位,唯一標(biāo)識(shí)一個(gè)內(nèi)存頁(yè),Spark 在申請(qǐng)內(nèi)存頁(yè)之前要先申請(qǐng)空閑頁(yè)號(hào)。
  • 頁(yè)內(nèi)偏移量:占 51 位,是在使用內(nèi)存頁(yè)存儲(chǔ)數(shù)據(jù)時(shí),數(shù)據(jù)在頁(yè)內(nèi)的偏移地址。

有了統(tǒng)一的尋址方式,Spark 可以用 64 位邏輯地址的指針定位到堆內(nèi)或堆外的內(nèi)存,整個(gè) Shuffle Write 排序的過(guò)程只需要對(duì)指針進(jìn)行排序,并且無(wú)需反序列化,整個(gè)過(guò)程非常高效,對(duì)于內(nèi)存訪問(wèn)效率和 CPU 使用效率帶來(lái)了明顯的提升[10]。

Spark 的存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存有著截然不同的管理方式:對(duì)于存儲(chǔ)內(nèi)存來(lái)說(shuō),Spark 用一個(gè) LinkedHashMap 來(lái)集中管理所有的 Block,Block 由需要緩存的 RDD 的 Partition 轉(zhuǎn)化而成;而對(duì)于執(zhí)行內(nèi)存,Spark 用 AppendOnlyMap 來(lái)存儲(chǔ) Shuffle 過(guò)程中的數(shù)據(jù),在 Tungsten 排序中甚至抽象成為頁(yè)式內(nèi)存管理,開(kāi)辟了全新的 JVM 內(nèi)存管理機(jī)制。

結(jié)束語(yǔ)

Spark 的內(nèi)存管理是一套復(fù)雜的機(jī)制,且 Spark 的版本更新比較快,筆者水平有限,難免有敘述不清、錯(cuò)誤的地方,若讀者有好的建議和更深的理解,還望不吝賜教。

責(zé)任編輯:武曉燕 來(lái)源: 36大數(shù)據(jù)
相關(guān)推薦

2018-08-09 11:06:39

Apache Spar內(nèi)存模型

2019-05-30 11:04:52

內(nèi)存Spark管理

2018-12-18 14:37:26

Spark內(nèi)存管理

2018-05-31 20:49:50

Spark堆內(nèi)內(nèi)存優(yōu)化機(jī)制

2014-02-14 15:43:16

ApacheSpark

2011-06-03 10:19:59

iphone Objective-

2019-04-17 14:44:42

Spark內(nèi)存源碼

2019-10-10 16:20:23

spark內(nèi)存管理

2011-07-19 15:37:13

Oracle 10g內(nèi)存管理PGA

2010-09-26 13:23:13

JVM內(nèi)存管理機(jī)制

2011-07-01 10:16:08

C++內(nèi)存管理

2010-12-10 15:40:58

JVM內(nèi)存管理

2011-06-29 17:20:20

Qt 內(nèi)存 QOBJECT

2020-08-18 19:15:44

Redis內(nèi)存管理

2022-06-01 13:52:11

開(kāi)源大數(shù)據(jù)

2024-03-26 00:33:59

JVM內(nèi)存對(duì)象

2013-11-29 15:41:08

解析漏洞ApacheApache解析漏洞

2018-06-06 08:28:37

Spark內(nèi)存管理

2017-06-26 15:00:17

2016-12-20 09:47:38

Apache SparLambda架構(gòu)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)