Spark Shuffle過程分析:Map階段處理流程
默認(rèn)配置情況下,Spark在Shuffle過程中會使用SortShuffleManager來管理Shuffle過程中需要的基本組件,以及對RDD各個Partition數(shù)據(jù)的計算。我們可以在Driver和Executor對應(yīng)的SparkEnv對象創(chuàng)建過程中看到對應(yīng)的配置,如下代碼所示:
- // Let the user specify short names for shuffle managers
- val shortShuffleMgrNames = Map(
- "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
- "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
- val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
- val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
- val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
如果需要修改ShuffleManager實現(xiàn),則只需要修改配置項spark.shuffle.manager即可,默認(rèn)支持sort和 tungsten-sort,可以指定自己實現(xiàn)的ShuffleManager類。
因為Shuffle過程中需要將Map結(jié)果數(shù)據(jù)輸出到文件,所以需要通過注冊一個ShuffleHandle來獲取到一個ShuffleWriter對象,通過它來控制Map階段記錄數(shù)據(jù)輸出的行為。其中,ShuffleHandle包含了如下基本信息:
- shuffleId:標(biāo)識Shuffle過程的唯一ID
- numMaps:RDD對應(yīng)的Partitioner指定的Partition的個數(shù),也就是ShuffleMapTask輸出的Partition個數(shù)
- dependency:RDD對應(yīng)的依賴ShuffleDependency
下面我們看下,在SortShuffleManager中是如何注冊Shuffle的,代碼如下所示:
- override def registerShuffle[K, V, C](
- shuffleId: Int,
- numMaps: Int,
- dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
- if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
- new BypassMergeSortShuffleHandle[K, V](
- shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
- } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
- new SerializedShuffleHandle[K, V](
- shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
- } else {
- new BaseShuffleHandle(shuffleId, numMaps, dependency)
- }
- }
上面代碼中,對應(yīng)如下3種ShuffleHandle可以選擇,說明如下:
- BypassMergeSortShuffleHandle
如果dependency不需要進(jìn)行Map Side Combine,并且RDD對應(yīng)的ShuffleDependency中的Partitioner設(shè)置的Partition的數(shù)量(這個不要和parent RDD的Partition個數(shù)混淆,Partitioner指定了map處理結(jié)果的Partition個數(shù),每個Partition數(shù)據(jù)會在Shuffle過程中全部被拉取而拷貝到下游的某個Executor端)小于等于配置參數(shù)spark.shuffle.sort.bypassMergeThreshold的值,則會注冊BypassMergeSortShuffleHandle。默認(rèn)情況下,spark.shuffle.sort.bypassMergeThreshold的取值是200,這種情況下會直接將對RDD的 map處理結(jié)果的各個Partition數(shù)據(jù)寫入文件,并***做一個合并處理。
- SerializedShuffleHandle
如果ShuffleDependency中的Serializer,允許對將要輸出數(shù)據(jù)對象進(jìn)行排序后,再執(zhí)行序列化寫入到文件,則會選擇創(chuàng)建一個SerializedShuffleHandle。
- BaseShuffleHandle
除了上面兩種ShuffleHandle以后,其他情況都會創(chuàng)建一個BaseShuffleHandle對象,它會以反序列化的格式處理Shuffle輸出數(shù)據(jù)。
Map階段處理流程分析
Map階段RDD的計算,對應(yīng)ShuffleMapTask這個實現(xiàn)類,它最終會在每個Executor上啟動運行,每個ShuffleMapTask處理RDD的一個Partition的數(shù)據(jù)。這個過程的核心處理邏輯,代碼如下所示:
- val manager = SparkEnv.get.shuffleManager
- writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
- writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
上面代碼中,在調(diào)用rdd的iterator()方法時,會根據(jù)RDD實現(xiàn)類的compute方法指定的處理邏輯對數(shù)據(jù)進(jìn)行處理,當(dāng)然,如果該Partition對應(yīng)的數(shù)據(jù)已經(jīng)處理過并存儲在MemoryStore或DiskStore,直接通過BlockManager獲取到對應(yīng)的Block數(shù)據(jù),而無需每次需要時重新計算。然后,write()方法會將已經(jīng)處理過的Partition數(shù)據(jù)輸出到磁盤文件。
在Spark Shuffle過程中,每個ShuffleMapTask會通過配置的ShuffleManager實現(xiàn)類對應(yīng)的ShuffleManager對象(實際上是在SparkEnv中創(chuàng)建),根據(jù)已經(jīng)注冊的ShuffleHandle,獲取到對應(yīng)的ShuffleWriter對象,然后通過ShuffleWriter對象將Partition數(shù)據(jù)寫入內(nèi)存或文件。所以,接下來我們可能關(guān)心每一種ShuffleHandle對應(yīng)的ShuffleWriter的行為,可以看到SortShuffleManager中獲取到ShuffleWriter的實現(xiàn)代碼,如下所示:
- /** Get a writer for a given partition. Called on executors by map tasks. */
- override def getWriter[K, V](
- handle: ShuffleHandle,
- mapId: Int,
- context: TaskContext): ShuffleWriter[K, V] = {
- numMapsForShuffle.putIfAbsent(
- handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
- val env = SparkEnv.get
- handle match {
- case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
- new UnsafeShuffleWriter(
- env.blockManager,
- shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
- context.taskMemoryManager(),
- unsafeShuffleHandle,
- mapId,
- context,
- env.conf)
- case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
- new BypassMergeSortShuffleWriter(
- env.blockManager,
- shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
- bypassMergeSortHandle,
- mapId,
- context,
- env.conf)
- case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
- new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
- }
- }
我們以最簡單的SortShuffleWriter為例進(jìn)行分析,在SortShuffleManager可以通過getWriter()方法創(chuàng)建一個SortShuffleWriter對象,然后在ShuffleMapTask中調(diào)用SortShuffleWriter對象的write()方法處理Map輸出的記錄數(shù)據(jù),write()方法的處理代碼,如下所示:
- /** Write a bunch of records to this task's output */
- override def write(records: Iterator[Product2[K, V]]): Unit = {
- sorter = if (dep.mapSideCombine) {
- require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
- new ExternalSorter[K, V, C](
- context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
- } else {
- // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
- // care whether the keys get sorted in each partition; that will be done on the reduce side
- // if the operation being run is sortByKey.
- new ExternalSorter[K, V, V](
- context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
- }
- sorter.insertAll(records)
- // Don't bother including the time to open the merged output file in the shuffle write time,
- // because it just opens a single file, so is typically too fast to measure accurately
- // (see SPARK-3570).
- val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
- val tmp = Utils.tempFileWith(output)
- val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
- val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
- shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
- mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
- }
從SortShuffleWriter類中的write()方法可以看到,最終調(diào)用了ExeternalSorter的insertAll()方法,實現(xiàn)了Map端RDD某個Partition數(shù)據(jù)處理并輸出到內(nèi)存或磁盤文件,這也是處理Map階段輸出記錄數(shù)據(jù)最核心、最復(fù)雜的過程。我們將其分為兩個階段進(jìn)行分析:***階段是,ExeternalSorter的insertAll()方法處理過程,將記錄數(shù)據(jù)Spill到磁盤文件;第二階段是,執(zhí)行完insertAll()方法之后的處理邏輯,創(chuàng)建Shuffle Block數(shù)據(jù)文件及其索引文件。
內(nèi)存緩沖寫記錄數(shù)據(jù)并Spill到磁盤文件
查看SortShuffleWriter類的write()方法可以看到,在內(nèi)存中緩存記錄數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)有兩種:一種是Buffer,對應(yīng)的實現(xiàn)類PartitionedPairBuffer,設(shè)置mapSideCombine=false時會使用該結(jié)構(gòu);另一種是Map,對應(yīng)的實現(xiàn)類是PartitionedAppendOnlyMap,設(shè)置mapSideCombine=false時會使用該結(jié)構(gòu)。根據(jù)是否指定mapSideCombine選項,分別對應(yīng)不同的處理流程,我們分別說明如下:
設(shè)置mapSideCombine=false時
這種情況在Map階段不進(jìn)行Combine操作,在內(nèi)存中緩存記錄數(shù)據(jù)會使用PartitionedPairBuffer這種數(shù)據(jù)結(jié)構(gòu)來緩存、排序記錄數(shù)據(jù),它是一個Append-only Buffer,僅支持向Buffer中追加數(shù)據(jù)鍵值對記錄,PartitionedPairBuffer的結(jié)構(gòu)如下圖所示:
默認(rèn)情況下,PartitionedPairBuffer初始分配的存儲容量為capacity = initialCapacity = 64,實際上這個容量是針對key的容量,因為要存儲的是鍵值對記錄數(shù)據(jù),所以實際存儲鍵值對的容量為2*initialCapacity = 128。PartitionedPairBuffer是一個能夠動態(tài)擴充容量的Buffer,內(nèi)部使用一個一維數(shù)組來存儲鍵值對,每次擴容結(jié)果為當(dāng)前Buffer容量的2倍,即2*capacity,***支持存儲2^31-1個鍵值對記錄(1073741823個)。
通過上圖可以看到,PartitionedPairBuffer存儲的鍵值對記錄數(shù)據(jù),鍵是(partition, key)這樣一個Tuple,值是對應(yīng)的數(shù)據(jù)value,而且curSize是用來跟蹤寫入Buffer中的記錄的,key在Buffer中的索引位置為2*curSize,value的索引位置為2*curSize+1,可見一個鍵值對的key和value的存儲在PartitionedPairBuffer內(nèi)部的數(shù)組中是相鄰的。
使用PartitionedPairBuffer緩存鍵值對記錄數(shù)據(jù),通過跟蹤實際寫入到Buffer內(nèi)的記錄數(shù)據(jù)的字節(jié)數(shù)來判斷,是否需要將Buffer中的數(shù)據(jù)Spill到磁盤文件,如下代碼所示:
- protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
- var shouldSpill = false
- if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
- // Claim up to double our current memory from the shuffle memory pool
- val amountToRequest = 2 * currentMemory - myMemoryThreshold
- val granted = acquireMemory(amountToRequest)
- myMemoryThreshold += granted
- // If we were granted too little memory to grow further (either tryToAcquire returned 0,
- // or we already had more memory than myMemoryThreshold), spill the current collection
- shouldSpill = currentMemory >= myMemoryThreshold
- }
- shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
- // Actually spill
- if (shouldSpill) {
- _spillCount += 1
- logSpillage(currentMemory)
- spill(collection)
- _elementsRead = 0
- _memoryBytesSpilled += currentMemory
- releaseMemory()
- }
- shouldSpill
- }
上面elementsRead表示存儲到PartitionedPairBuffer中的記錄數(shù),currentMemory是對Buffer中的總記錄數(shù)據(jù)大小(字節(jié)數(shù))的估算,myMemoryThreshold通過配置項spark.shuffle.spill.initialMemoryThreshold來進(jìn)行設(shè)置的,默認(rèn)值為5 * 1024 * 1024 = 5M。當(dāng)滿足條件elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold時,會先嘗試向MemoryManager申請2 * currentMemory – myMemoryThreshold大小的內(nèi)存,如果能夠申請到,則不進(jìn)行Spill操作,而是繼續(xù)向Buffer中存儲數(shù)據(jù),否則就會調(diào)用spill()方法將Buffer中數(shù)據(jù)輸出到磁盤文件。
向PartitionedPairBuffer中寫入記錄數(shù)據(jù),以及滿足條件Spill記錄數(shù)據(jù)到磁盤文件,具體處理流程,如下圖所示:
為了查看按照怎樣的規(guī)則進(jìn)行排序,我們看一下,當(dāng)不進(jìn)行Map Side Combine時,創(chuàng)建ExternalSorter對象的代碼如下所示:
- // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
- // care whether the keys get sorted in each partition; that will be done on the reduce side
- // if the operation being run is sortByKey.
- new ExternalSorter[K, V, V](
- context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
上面aggregator = None,ordering = None,在對PartitionedPairBuffer中的記錄數(shù)據(jù)Spill到磁盤之前,要使用默認(rèn)的排序規(guī)則進(jìn)行排序,排序的規(guī)則是只對PartitionedPairBuffer中的記錄按Partition ID進(jìn)行升序排序,可以查看WritablePartitionedPairCollection伴生對象類的代碼(其中PartitionedPairBuffer類實現(xiàn)了特質(zhì)WritablePartitionedPairCollection),如下所示:
- /**
- * A comparator for (Int, K) pairs that orders them by only their partition ID.
- */
- def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
- override def compare(a: (Int, K), b: (Int, K)): Int = {
- a._1 - b._1
- }
- }
上面圖中,引用了SortShuffleWriter.writeBlockFiles這個子序列圖,用來生成Block數(shù)據(jù)文件和索引文件,后面我們會單獨說明。通過對RDD進(jìn)行計算生成一個記錄迭代器對象,通過該迭代器迭代出的記錄會存儲到PartitionedPairBuffer中,當(dāng)滿足Spill條件時,先對PartitionedPairBuffer中記錄進(jìn)行排序,***Spill到磁盤文件,這個過程中PartitionedPairBuffer中的記錄數(shù)據(jù)的變化情況,如下圖所示:
上圖中,對內(nèi)存中PartitionedPairBuffer中的記錄按照Partition ID進(jìn)行排序,并且屬于同一個Partition的數(shù)據(jù)記錄在PartitionedPairBuffer內(nèi)部的data數(shù)組中是連續(xù)的。排序結(jié)束后,在Spill到磁盤文件時,將對應(yīng)的Partition ID去掉了,只在文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中連續(xù)存儲鍵值對數(shù)據(jù),但同時在另一個內(nèi)存數(shù)組結(jié)構(gòu)中會保存文件中每個Partition擁有的記錄數(shù),這樣就能根據(jù)Partition的記錄數(shù)來順序讀取文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中屬于同一個Partition的全部記錄數(shù)據(jù)。
ExternalSorter類內(nèi)部維護了一個SpillFile的ArrayBuffer數(shù)組,最終可能會生成多個SpillFile,SpillFile的定義如下所示:
- private[this] case class SpilledFile(
- file: File,
- blockId: BlockId,
- serializerBatchSizes: Array[Long],
- elementsPerPartition: Array[Long])
每個SpillFile包含一個blockId,標(biāo)識Map輸出的該臨時文件;serializerBatchSizes表示每次批量寫入到文件的Object的數(shù)量,默認(rèn)為10000,由配置項spark.shuffle.spill.batchSize來控制;elementsPerPartition表示每個Partition中的Object的數(shù)量。調(diào)用ExternalSorter的insertAll()方法,最終可能有如下3種情況:
- Map階段輸出記錄數(shù)較少,沒有生成SpillFile,那么所有數(shù)據(jù)都在Buffer中,直接對Buffer中記錄排序并輸出到文件
- Map階段輸出記錄數(shù)較多,生成多個SpillFile,同時Buffer中也有部分記錄數(shù)據(jù)
- Map階段輸出記錄數(shù)較多,只生成多個SpillFile
- 有關(guān)后續(xù)如何對上面3種情況進(jìn)行處理,可以想見后面對子序列圖SortShuffleWriter.writeBlockFiles的說明。
- 設(shè)置mapSideCombine=true時
這種情況在Map階段會執(zhí)行Combine操作,在Map階段進(jìn)行Combine操作能夠降低Map階段數(shù)據(jù)記錄的總數(shù),從而降低Shuffle過程中數(shù)據(jù)的跨網(wǎng)絡(luò)拷貝傳輸。這時,RDD對應(yīng)的ShuffleDependency需要設(shè)置一個Aggregator用來執(zhí)行Combine操作,可以看下Aggregator類聲明,代碼如下所示:
- /**
- * :: DeveloperApi ::
- * A set of functions used to aggregate data.
- *
- * @param createCombiner function to create the initial value of the aggregation.
- * @param mergeValue function to merge a new value into the aggregation result.
- * @param mergeCombiners function to merge outputs from multiple mergeValue function.
- */
- @DeveloperApi
- case class Aggregator[K, V, C] (
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C) {
- ... ...
- }
由于在Map階段只用到了構(gòu)造Aggregator的幾個函數(shù)參數(shù)createCombiner、mergeValue、mergeCombiners,我們對這幾個函數(shù)詳細(xì)說明如下:
- createCombiner:進(jìn)行Aggregation開始時,需要設(shè)置初始值。因為在Aggregation過程中使用了類似Map的內(nèi)存數(shù)據(jù)結(jié)構(gòu)來管理鍵值對,每次加入前會先查看Map內(nèi)存結(jié)構(gòu)中是否存在Key對應(yīng)的Value,***次肯定不存在,所以***將某個Key的Value加入到Map內(nèi)存結(jié)構(gòu)中時,Key在Map內(nèi)存結(jié)構(gòu)中***次有了Value。
- mergeValue:某個Key已經(jīng)在Map結(jié)構(gòu)中存在Value,后續(xù)某次又遇到相同的Key和一個新的Value,這時需要通過該函數(shù),將舊Value和新Value進(jìn)行合并,根據(jù)Key檢索能夠得到合并后的新Value。
- mergeCombiners:一個Map內(nèi)存結(jié)構(gòu)中Key和Value是由mergeValue生成的,那么在向Map中插入數(shù)據(jù),肯定會遇到Map使用容量達(dá)到上限,這時需要將記錄數(shù)據(jù)Spill到磁盤文件,那么多個Spill輸出的磁盤文件中可能存在同一個Key,這時需要對多個Spill輸出的磁盤文件中的Key的多個Value進(jìn)行合并,這時需要使用mergeCombiners函數(shù)進(jìn)行處理。
該類中定義了combineValuesByKey、combineValuesByKey、combineCombinersByKey,由于這些函數(shù)是在Reduce階段使用的,所以在這里先不說明,后續(xù)文章我們會單獨詳細(xì)來分析。
我們通過下面的序列圖來描述,需要進(jìn)行Map Side Combine時的處理流程,如下所示:
對照上圖,我們看一下,當(dāng)需要進(jìn)行Map Side Combine時,對應(yīng)的ExternalSorter類insertAll()方法中的處理邏輯,代碼如下所示:
- val shouldCombine = aggregator.isDefined
- if (shouldCombine) {
- // Combine values in-memory first using our AppendOnlyMap
- val mergeValue = aggregator.get.mergeValue
- val createCombiner = aggregator.get.createCombiner
- var kv: Product2[K, V] = null
- val update = (hadValue: Boolean, oldValue: C) => {
- if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
- }
- while (records.hasNext) {
- addElementsRead()
- kv = records.next()
- map.changeValue((getPartition(kv._1), kv._1), update)
- maybeSpillCollection(usingMap = true)
- }
- }
上面代碼中,map是內(nèi)存數(shù)據(jù)結(jié)構(gòu),最重要的是update函數(shù)和map的changeValue方法(這里的map對應(yīng)的實現(xiàn)類是PartitionedAppendOnlyMap)。update函數(shù)所做的工作,其實就是對createCombiner和mergeValue這兩個函數(shù)的使用,***次遇到一個Key調(diào)用createCombiner函數(shù)處理,非***遇到同一個Key對應(yīng)新的Value調(diào)用mergeValue函數(shù)進(jìn)行合并處理。map的changeValue方法主要是將Key和Value在map中存儲或者進(jìn)行修改(對出現(xiàn)的同一個Key的多個Value進(jìn)行合并,并將合并后的新Value替換舊Value)。
PartitionedAppendOnlyMap是一個經(jīng)過優(yōu)化的哈希表,它支持向map中追加數(shù)據(jù),以及修改Key對應(yīng)的Value,但是不支持刪除某個Key及其對應(yīng)的Value。它能夠支持的存儲容量是0.7 * 2 ^ 29 = 375809638。當(dāng)達(dá)到指定存儲容量或者指定限制,就會將map中記錄數(shù)據(jù)Spill到磁盤文件,這個過程和前面的類似,不再累述。
創(chuàng)建Shuffle Block數(shù)據(jù)文件及其索引文件
無論是使用PartitionedPairBuffer,還是使用PartitionedAppendOnlyMap,當(dāng)需要容量滿足Spill條件時,都會將該內(nèi)存結(jié)構(gòu)(buffer/map)中記錄數(shù)據(jù)Spill到磁盤文件,所以Spill到磁盤文件的格式是相同的。對于后續(xù)Block數(shù)據(jù)文件和索引文件的生成邏輯也是相同,如下圖所示:
假設(shè),我們生成的Shuffle Block文件對應(yīng)各個參數(shù)為:shuffleId=2901,mapId=11825,reduceId=0,這里reduceId是一個NOOP_REDUCE_ID,表示與DiskStore進(jìn)行磁盤I/O交互操作,而DiskStore期望對應(yīng)一個(map, reduce)對,但是對于排序的Shuffle輸出,通常Reducer拉取數(shù)據(jù)后只生成一個文件(Reduce文件),所以這里默認(rèn)reduceId為0。經(jīng)過上圖的處理流程,可以生成一個.data文件,也就是Block數(shù)據(jù)文件;一個.index文件,也就是包含了各個Partition在數(shù)據(jù)文件中的偏移位置的索引文件。這個過程生成的文件,示例如下所示:
- shuffle_2901_11825_0.data
- shuffle_2901_11825_0.index
這樣,對于每個RDD的多個Partition進(jìn)行處理后,都會生成對應(yīng)的數(shù)據(jù)文件和索引文件,后續(xù)在Reduce端就可以讀取這些Block文件,這些記錄數(shù)據(jù)在文件中都是經(jīng)過分區(qū)(Partitioned)的。