大數(shù)據(jù)開發(fā):剖析Hadoop和Spark的Shuffle過程差異
一、前言
對于基于MapReduce編程范式的分布式計算來說,本質(zhì)上而言,就是在計算數(shù)據(jù)的交、并、差、聚合、排序等過程。而分布式計算分而治之的思想,讓每個節(jié)點(diǎn)只計算部分?jǐn)?shù)據(jù),也就是只處理一個分片,那么要想求得某個key對應(yīng)的全量數(shù)據(jù),那就必須把相同key的數(shù)據(jù)匯集到同一個Reduce任務(wù)節(jié)點(diǎn)來處理,那么Mapreduce范式定義了一個叫做Shuffle的過程來實(shí)現(xiàn)這個效果。
二、編寫本文的目的
本文旨在剖析Hadoop和Spark的Shuffle過程,并對比兩者Shuffle的差異。
三、Hadoop的Shuffle過程
Shuffle描述的是數(shù)據(jù)從Map端到Reduce端的過程,大致分為排序(sort)、溢寫(spill)、合并(merge)、拉取拷貝(Copy)、合并排序(merge sort)這幾個過程,大體流程如下:
- 
上圖的Map的輸出的文件被分片為紅綠藍(lán)三個分片,這個分片的就是根據(jù)Key為條件來分片的,分片算法可以自己實(shí)現(xiàn),例如Hash、Range等,最終Reduce任務(wù)只拉取對應(yīng)顏色的數(shù)據(jù)來進(jìn)行處理,就實(shí)現(xiàn)把相同的Key拉取到相同的Reduce節(jié)點(diǎn)處理的功能。下面分開來說Shuffle的的各個過程。
Map端做了下圖所示的操作:
- 1、Map端sort
Map端的輸出數(shù)據(jù),先寫環(huán)形緩存區(qū)kvbuffer,當(dāng)環(huán)形緩沖區(qū)到達(dá)一個閥值(可以通過配置文件設(shè)置,默認(rèn)80),便要開始溢寫,但溢寫之前會有一個sort操作,這個sort操作先把Kvbuffer中的數(shù)據(jù)按照partition值和key兩個關(guān)鍵字來排序,移動的只是索引數(shù)據(jù),排序結(jié)果是Kvmeta中數(shù)據(jù)按照partition為單位聚集在一起,同一partition內(nèi)的按照key有序。
- 2、spill(溢寫)
- 當(dāng)排序完成,便開始把數(shù)據(jù)刷到磁盤,刷磁盤的過程以分區(qū)為單位,一個分區(qū)寫完,寫下一個分區(qū),分區(qū)內(nèi)數(shù)據(jù)有序,最終實(shí)際上會多次溢寫,然后生成多個文件
- 3、merge(合并)
- spill會生成多個小文件,對于Reduce端拉取數(shù)據(jù)是相當(dāng)?shù)托У模敲催@時候就有了merge的過程,合并的過程也是同分片的合并成一個片段(segment),最終所有的segment組裝成一個最終文件,那么合并過程就完成了,如下圖所示
至此,Map的操作就已經(jīng)完成,Reduce端操作即將登場
Reduce操作
總體過程如下圖的紅框處:
- 
- 1、拉取拷貝(fetch copy)
Reduce任務(wù)通過向各個Map任務(wù)拉取對應(yīng)分片。這個過程都是以Http協(xié)議完成,每個Map節(jié)點(diǎn)都會啟動一個常駐的HTTP server服務(wù),Reduce節(jié)點(diǎn)會請求這個Http Server拉取數(shù)據(jù),這個過程完全通過網(wǎng)絡(luò)傳輸,所以是一個非常重量級的操作。
- 2、合并排序
Reduce端,拉取到各個Map節(jié)點(diǎn)對應(yīng)分片的數(shù)據(jù)之后,會進(jìn)行再次排序,排序完成,結(jié)果丟給Reduce函數(shù)進(jìn)行計算。
四、總結(jié)
至此整個shuffle過程完成,***總結(jié)幾點(diǎn):
- shuffle過程就是為了對key進(jìn)行全局聚合
- 排序操作伴隨著整個shuffle過程,所以Hadoop的shuffle是sort-based的
Spark shuffle相對來說更簡單,因?yàn)椴灰笕钟行?,所以沒有那么多排序合并的操作。Spark shuffle分為write和read兩個過程。我們先來看shuffle write。
- 一、shuffle write
shuffle write的處理邏輯會放到該ShuffleMapStage的***(因?yàn)閟park以shuffle發(fā)生與否來劃分stage,也就是寬依賴),final RDD的每一條記錄都會寫到對應(yīng)的分區(qū)緩存區(qū)bucket,如下圖所示:
說明:
- 上圖有2個CPU,可以同時運(yùn)行兩個ShuffleMapTask
- 每個task將寫一個buket緩沖區(qū),緩沖區(qū)的數(shù)量和reduce任務(wù)的數(shù)量相等
- 每個buket緩沖區(qū)會生成一個對應(yīng)ShuffleBlockFile
- ShuffleMapTask 如何決定數(shù)據(jù)被寫到哪個緩沖區(qū)呢?這個就是跟partition算法有關(guān)系,這個分區(qū)算法可以是hash的,也可以是range的
- 最終產(chǎn)生的ShuffleBlockFile會有多少呢?就是ShuffleMapTask 數(shù)量乘以reduce的數(shù)量,這個是非常巨大的
那么有沒有辦法解決生成文件過多的問題呢?有,開啟FileConsolidation即可,開啟FileConsolidation之后的shuffle過程如下:
在同一核CPU執(zhí)行先后執(zhí)行的ShuffleMapTask可以共用一個bucket緩沖區(qū),然后寫到同一份ShuffleFile里去,上圖所示的ShuffleFile實(shí)際上是用多個ShuffleBlock構(gòu)成,那么,那么每個worker最終生成的文件數(shù)量,變成了cpu核數(shù)乘以reduce任務(wù)的數(shù)量,大大縮減了文件量。
- 二、Shuffle read
Shuffle write過程將數(shù)據(jù)分片寫到對應(yīng)的分片文件,這時候萬事具備,只差去拉取對應(yīng)的數(shù)據(jù)過來計算了。
那么Shuffle Read發(fā)送的時機(jī)是什么?是要等所有ShuffleMapTask執(zhí)行完,再去fetch數(shù)據(jù)嗎?理論上,只要有一個 ShuffleMapTask執(zhí)行完,就可以開始fetch數(shù)據(jù)了,實(shí)際上,spark必須等到父stage執(zhí)行完,才能執(zhí)行子stage,所以,必須等到所有 ShuffleMapTask執(zhí)行完畢,才去fetch數(shù)據(jù)。fetch過來的數(shù)據(jù),先存入一個Buffer緩沖區(qū),所以這里一次性fetch的FileSegment不能太大,當(dāng)然如果fetch過來的數(shù)據(jù)大于每一個閥值,也是會spill到磁盤的。
fetch的過程過來一個buffer的數(shù)據(jù),就可以開始聚合了,這里就遇到一個問題,每次fetch部分?jǐn)?shù)據(jù),怎么能實(shí)現(xiàn)全局聚合呢?以word count的reduceByKey(《Spark RDD操作之ReduceByKey 》)為例,假設(shè)單詞hello有十個,但是一次fetch只拉取了2個,那么怎么全局聚合呢?Spark的做法是用HashMap,聚合操作實(shí)際上是map.put(key,map.get(key)+1),將map中的聚合過的數(shù)據(jù)get出來相加,然后put回去,等到所有數(shù)據(jù)fetch完,也就完成了全局聚合。
- 三、總結(jié)
Hadoop的MapReduce Shuffle和Spark Shuffle差別總結(jié)如下:
- Hadoop的有一個Map完成,Reduce便可以去fetch數(shù)據(jù)了,不必等到所有Map任務(wù)完成,而Spark的必須等到父stage完成,也就是父stage的map操作全部完成才能去fetch數(shù)據(jù)。
- Hadoop的Shuffle是sort-base的,那么不管是Map的輸出,還是Reduce的輸出,都是partion內(nèi)有序的,而spark不要求這一點(diǎn)。
- Hadoop的Reduce要等到fetch完全部數(shù)據(jù),才將數(shù)據(jù)傳入reduce函數(shù)進(jìn)行聚合,而spark是一邊f(xié)etch一邊聚合。