自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

大數(shù)據(jù)開發(fā):剖析Hadoop和Spark的Shuffle過程差異

大數(shù)據(jù) Hadoop Spark
本文旨在剖析Hadoop和Spark的Shuffle過程,并對比兩者Shuffle的差異。Shuffle描述的是數(shù)據(jù)從Map端到Reduce端的過程,大致分為排序(sort)、溢寫(spill)、合并(merge)、拉取拷貝(Copy)、合并排序(merge sort)這幾個過程。

一、前言

對于基于MapReduce編程范式的分布式計算來說,本質(zhì)上而言,就是在計算數(shù)據(jù)的交、并、差、聚合、排序等過程。而分布式計算分而治之的思想,讓每個節(jié)點(diǎn)只計算部分?jǐn)?shù)據(jù),也就是只處理一個分片,那么要想求得某個key對應(yīng)的全量數(shù)據(jù),那就必須把相同key的數(shù)據(jù)匯集到同一個Reduce任務(wù)節(jié)點(diǎn)來處理,那么Mapreduce范式定義了一個叫做Shuffle的過程來實(shí)現(xiàn)這個效果。

二、編寫本文的目

本文旨在剖析Hadoop和Spark的Shuffle過程,并對比兩者Shuffle的差異。

三、Hadoop的Shuffle過程

Shuffle描述的是數(shù)據(jù)從Map端到Reduce端的過程,大致分為排序(sort)、溢寫(spill)、合并(merge)、拉取拷貝(Copy)、合并排序(merge sort)這幾個過程,大體流程如下:

  1. ![image](https://yqfile.alicdn.com/e4ccedfb6ccaaa0d3c0ad5b3b7ab83d96dd9fed2.png) 

上圖的Map的輸出的文件被分片為紅綠藍(lán)三個分片,這個分片的就是根據(jù)Key為條件來分片的,分片算法可以自己實(shí)現(xiàn),例如Hash、Range等,最終Reduce任務(wù)只拉取對應(yīng)顏色的數(shù)據(jù)來進(jìn)行處理,就實(shí)現(xiàn)把相同的Key拉取到相同的Reduce節(jié)點(diǎn)處理的功能。下面分開來說Shuffle的的各個過程。

Map端做了下圖所示的操作:

  1. 1、Map端sort 

Map端的輸出數(shù)據(jù),先寫環(huán)形緩存區(qū)kvbuffer,當(dāng)環(huán)形緩沖區(qū)到達(dá)一個閥值(可以通過配置文件設(shè)置,默認(rèn)80),便要開始溢寫,但溢寫之前會有一個sort操作,這個sort操作先把Kvbuffer中的數(shù)據(jù)按照partition值和key兩個關(guān)鍵字來排序,移動的只是索引數(shù)據(jù),排序結(jié)果是Kvmeta中數(shù)據(jù)按照partition為單位聚集在一起,同一partition內(nèi)的按照key有序。 

  1. 2、spill(溢寫) 
  2.  
  3. 當(dāng)排序完成,便開始把數(shù)據(jù)刷到磁盤,刷磁盤的過程以分區(qū)為單位,一個分區(qū)寫完,寫下一個分區(qū),分區(qū)內(nèi)數(shù)據(jù)有序,最終實(shí)際上會多次溢寫,然后生成多個文件 
  4.  
  5. 3、merge(合并) 
  6.  
  7. spill會生成多個小文件,對于Reduce端拉取數(shù)據(jù)是相當(dāng)?shù)托У模敲催@時候就有了merge的過程,合并的過程也是同分片的合并成一個片段(segment),最終所有的segment組裝成一個最終文件,那么合并過程就完成了,如下圖所示 

 

大數(shù)據(jù)開發(fā):剖析Hadoop和Spark的Shuffle過程差異

至此,Map的操作就已經(jīng)完成,Reduce端操作即將登場

Reduce操作

總體過程如下圖的紅框處: 

  1. ![image](https://yqfile.alicdn.com/71a52ed4799d3dbbde4552028f3aea05bc1c98c0.png) 
  2.  
  3. 1、拉取拷貝(fetch copy) 

Reduce任務(wù)通過向各個Map任務(wù)拉取對應(yīng)分片。這個過程都是以Http協(xié)議完成,每個Map節(jié)點(diǎn)都會啟動一個常駐的HTTP server服務(wù),Reduce節(jié)點(diǎn)會請求這個Http Server拉取數(shù)據(jù),這個過程完全通過網(wǎng)絡(luò)傳輸,所以是一個非常重量級的操作。

  1. 2、合并排序 

Reduce端,拉取到各個Map節(jié)點(diǎn)對應(yīng)分片的數(shù)據(jù)之后,會進(jìn)行再次排序,排序完成,結(jié)果丟給Reduce函數(shù)進(jìn)行計算。

四、總結(jié)

至此整個shuffle過程完成,***總結(jié)幾點(diǎn):

  1. shuffle過程就是為了對key進(jìn)行全局聚合
  2. 排序操作伴隨著整個shuffle過程,所以Hadoop的shuffle是sort-based的

Spark shuffle相對來說更簡單,因?yàn)椴灰笕钟行?,所以沒有那么多排序合并的操作。Spark shuffle分為write和read兩個過程。我們先來看shuffle write。

  • 一、shuffle write

shuffle write的處理邏輯會放到該ShuffleMapStage的***(因?yàn)閟park以shuffle發(fā)生與否來劃分stage,也就是寬依賴),final RDD的每一條記錄都會寫到對應(yīng)的分區(qū)緩存區(qū)bucket,如下圖所示:

大數(shù)據(jù)開發(fā):剖析Hadoop和Spark的Shuffle過程差異

說明:

  1. 上圖有2個CPU,可以同時運(yùn)行兩個ShuffleMapTask
  2. 每個task將寫一個buket緩沖區(qū),緩沖區(qū)的數(shù)量和reduce任務(wù)的數(shù)量相等
  3. 每個buket緩沖區(qū)會生成一個對應(yīng)ShuffleBlockFile
  4. ShuffleMapTask 如何決定數(shù)據(jù)被寫到哪個緩沖區(qū)呢?這個就是跟partition算法有關(guān)系,這個分區(qū)算法可以是hash的,也可以是range的
  5. 最終產(chǎn)生的ShuffleBlockFile會有多少呢?就是ShuffleMapTask 數(shù)量乘以reduce的數(shù)量,這個是非常巨大的

那么有沒有辦法解決生成文件過多的問題呢?有,開啟FileConsolidation即可,開啟FileConsolidation之后的shuffle過程如下:

大數(shù)據(jù)開發(fā):剖析Hadoop和Spark的Shuffle過程差異

在同一核CPU執(zhí)行先后執(zhí)行的ShuffleMapTask可以共用一個bucket緩沖區(qū),然后寫到同一份ShuffleFile里去,上圖所示的ShuffleFile實(shí)際上是用多個ShuffleBlock構(gòu)成,那么,那么每個worker最終生成的文件數(shù)量,變成了cpu核數(shù)乘以reduce任務(wù)的數(shù)量,大大縮減了文件量。

  • 二、Shuffle read

Shuffle write過程將數(shù)據(jù)分片寫到對應(yīng)的分片文件,這時候萬事具備,只差去拉取對應(yīng)的數(shù)據(jù)過來計算了。

那么Shuffle Read發(fā)送的時機(jī)是什么?是要等所有ShuffleMapTask執(zhí)行完,再去fetch數(shù)據(jù)嗎?理論上,只要有一個 ShuffleMapTask執(zhí)行完,就可以開始fetch數(shù)據(jù)了,實(shí)際上,spark必須等到父stage執(zhí)行完,才能執(zhí)行子stage,所以,必須等到所有 ShuffleMapTask執(zhí)行完畢,才去fetch數(shù)據(jù)。fetch過來的數(shù)據(jù),先存入一個Buffer緩沖區(qū),所以這里一次性fetch的FileSegment不能太大,當(dāng)然如果fetch過來的數(shù)據(jù)大于每一個閥值,也是會spill到磁盤的。

fetch的過程過來一個buffer的數(shù)據(jù),就可以開始聚合了,這里就遇到一個問題,每次fetch部分?jǐn)?shù)據(jù),怎么能實(shí)現(xiàn)全局聚合呢?以word count的reduceByKey(《Spark RDD操作之ReduceByKey 》)為例,假設(shè)單詞hello有十個,但是一次fetch只拉取了2個,那么怎么全局聚合呢?Spark的做法是用HashMap,聚合操作實(shí)際上是map.put(key,map.get(key)+1),將map中的聚合過的數(shù)據(jù)get出來相加,然后put回去,等到所有數(shù)據(jù)fetch完,也就完成了全局聚合。

  • 三、總結(jié)

Hadoop的MapReduce Shuffle和Spark Shuffle差別總結(jié)如下:

  1. Hadoop的有一個Map完成,Reduce便可以去fetch數(shù)據(jù)了,不必等到所有Map任務(wù)完成,而Spark的必須等到父stage完成,也就是父stage的map操作全部完成才能去fetch數(shù)據(jù)。
  2. Hadoop的Shuffle是sort-base的,那么不管是Map的輸出,還是Reduce的輸出,都是partion內(nèi)有序的,而spark不要求這一點(diǎn)。
  3. Hadoop的Reduce要等到fetch完全部數(shù)據(jù),才將數(shù)據(jù)傳入reduce函數(shù)進(jìn)行聚合,而spark是一邊f(xié)etch一邊聚合。
責(zé)任編輯:未麗燕 來源: 阿里云棲社區(qū)
相關(guān)推薦

2013-05-06 10:22:28

大數(shù)據(jù)Hadoop

2021-12-14 09:56:51

HadoopSparkKafka

2016-10-12 09:41:45

Hadoop+Spar大數(shù)據(jù)開發(fā)

2015-07-23 14:29:28

大數(shù)據(jù)sparkhadoop

2017-02-14 13:11:23

HadoopStormSamza

2017-06-07 12:25:37

Shuffle代碼Map階段處理

2019-07-22 10:45:31

2017-10-19 08:28:15

大數(shù)據(jù)HadoopSpark

2019-04-24 13:07:16

HadoopSpark分布式架構(gòu)

2021-03-15 14:02:21

大數(shù)據(jù)數(shù)據(jù)開發(fā)Spark

2012-10-09 10:51:51

大數(shù)據(jù)數(shù)據(jù)中心大數(shù)據(jù)應(yīng)用

2017-02-10 09:00:03

HadoopSparkStorm

2017-07-13 11:13:18

大數(shù)據(jù)數(shù)據(jù)存儲

2022-07-20 15:10:38

Docker大數(shù)據(jù)平臺

2018-01-22 08:33:28

SparkHadoop計算

2015-03-04 11:19:59

2017-10-25 14:15:55

大數(shù)據(jù)Hadoop維度建模

2017-11-27 14:12:34

大數(shù)據(jù)Hadoop數(shù)據(jù)分析

2023-03-30 09:06:20

HiveSpark大數(shù)據(jù)

2017-07-21 14:22:17

大數(shù)據(jù)大數(shù)據(jù)平臺數(shù)據(jù)處理
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號