分布式計(jì)算MapReduce究竟是怎么一回事?
?前言
如果要對(duì)文件中的內(nèi)容進(jìn)行統(tǒng)計(jì),大家覺得怎么做呢?一般的思路都是將不同地方的文件數(shù)據(jù)讀取到內(nèi)存中,最后集中進(jìn)行統(tǒng)計(jì)。如果數(shù)據(jù)量少還好,但是面對(duì)海量數(shù)據(jù)、大數(shù)據(jù)的場景這樣真的合適嗎?不合適的話,那有什么比較好的方式進(jìn)行計(jì)算呢?不急,看完本文給你答案。
分布式計(jì)算思想
我們打開思路,既然文件數(shù)據(jù)遍布在各個(gè)節(jié)點(diǎn)上,那么我們就不把文件從各個(gè)節(jié)點(diǎn)加載過來,而是把算法分到各個(gè)節(jié)點(diǎn)進(jìn)行計(jì)算,最后統(tǒng)一進(jìn)行合并處理。這就是所謂的分布式計(jì)算。
? 分布式計(jì)算將該應(yīng)用分解成許多小的部分,分配給多臺(tái)計(jì)算機(jī)進(jìn)行處理。這樣可以節(jié)約整體計(jì)算時(shí)間,大大提高計(jì)算效率。
整個(gè)思想的核心就是“先分再合,分而治之”。所謂“分而治之”就是把一個(gè)復(fù)雜的問題,按照一定的“分解”方法分為等價(jià)的規(guī)模較小的若干部分,然后逐個(gè)解決,分別找出各部分的結(jié)果,然后把各部分的結(jié)果組成整個(gè)問題的最終結(jié)果。 ?
那么Hadoop也借鑒了這樣的思想,設(shè)計(jì)出了MapReduce計(jì)算框架。那么MapReduce框架具體設(shè)計(jì)上有什么亮點(diǎn)呢?
MapReduce設(shè)計(jì)思想
Hadoop在設(shè)計(jì)MapReduce的時(shí)候,吸取了分布式計(jì)算中分而治之的思想,同時(shí)需要考慮更多細(xì)節(jié)的問題。
(1)如何對(duì)付大數(shù)據(jù)處理場景
對(duì)相互間不具有計(jì)算依賴關(guān)系的大數(shù)據(jù)計(jì)算任務(wù),實(shí)現(xiàn)并行最自然的辦法就是采取MapReduce分而治之的策略。
首先Map階段進(jìn)行拆分,把大數(shù)據(jù)拆分成若干份小數(shù)據(jù),多個(gè)程序同時(shí)并行計(jì)算產(chǎn)生中間結(jié)果;然后是Reduce聚
合階段,通過程序?qū)Σ⑿械慕Y(jié)果進(jìn)行最終的匯總計(jì)算,得出最終的結(jié)果。 ?
不可拆分的計(jì)算任務(wù)或相互間有依賴關(guān)系的數(shù)據(jù)無法進(jìn)行并行計(jì)算。
(2)構(gòu)建抽象編程模型
MapReduce借鑒了函數(shù)式語言中的思想,用Map和Reduce兩個(gè)函數(shù)提供了高層的并行編程抽象模型。
map: 對(duì)一組數(shù)據(jù)元素進(jìn)行某種重復(fù)式的處理;
reduce: 對(duì)Map的中間結(jié)果進(jìn)行某種進(jìn)一步的結(jié)果整理。
MapReduce中定義了如下的Map和Reduce兩個(gè)抽象的編程接口,由用戶去編程實(shí)現(xiàn):
通過以上兩個(gè)編程接口,大家可以看出MapReduce?處理的數(shù)據(jù)類型是<key,value>鍵值對(duì)。
(3)統(tǒng)一架構(gòu)、隱藏底層細(xì)節(jié)
如何提供統(tǒng)一的計(jì)算框架,如果沒有統(tǒng)一封裝底層細(xì)節(jié),那么程序員則需要考慮諸如數(shù)據(jù)存儲(chǔ)、劃分、分發(fā)、結(jié)果
收集、錯(cuò)誤恢復(fù)等諸多細(xì)節(jié);為此,MapReduce設(shè)計(jì)并提供了統(tǒng)一的計(jì)算框架,為程序員隱藏了絕大多數(shù)系統(tǒng)層
面的處理細(xì)節(jié)。
MapReduce最大的亮點(diǎn)在于通過抽象模型和計(jì)算框架把需要做什么(what need to do)?與具體怎么做(how to do)分開了,為程序員提供一個(gè)抽象和高層的編程接口和框架。
程序員僅需要關(guān)心其應(yīng)用層的具體計(jì)算問題,僅需編寫少量的處理應(yīng)用本身計(jì)算問題的業(yè)務(wù)程序代碼。 ?
至于如何具體完成這個(gè)并行計(jì)算任務(wù)所相關(guān)的諸多系統(tǒng)層細(xì)節(jié)被隱藏起來,交給計(jì)算框架去處理: 從分布代碼的執(zhí)行,到大到數(shù)千小到單個(gè)節(jié)點(diǎn)集群的自動(dòng)調(diào)度使用。
MapReduce介紹
Hadoop MapReduce是一個(gè)分布式計(jì)算框架,用于輕松編寫分布式應(yīng)用程序,這些應(yīng)用程序以可靠,容錯(cuò)的方式并行處理大型硬件集群(數(shù)千個(gè)節(jié)點(diǎn))上的大量數(shù)據(jù)(多TB數(shù)據(jù)集)。
MapReduce是一種面向海量數(shù)據(jù)處理的一種指導(dǎo)思想,也是一種用于對(duì)大規(guī)模數(shù)據(jù)進(jìn)行分布式計(jì)算的編程模型。
MapReduce特點(diǎn)
易于編程
Mapreduce框架提供了用于二次開發(fā)的接口;簡單地實(shí)現(xiàn)一些接口,就可以完成一個(gè)分布式程序。任務(wù)計(jì)算交給計(jì)算框架去處理,將分布式程序部署到hadoop集群上運(yùn)行,集群節(jié)點(diǎn)可以擴(kuò)展到成百上千個(gè)等。
良好的擴(kuò)展性
當(dāng)計(jì)算機(jī)資源不能得到滿足的時(shí)候,可以通過增加機(jī)器來擴(kuò)展它的計(jì)算能力。基于MapReduce的分布式計(jì)算得特點(diǎn)可以隨節(jié)點(diǎn)數(shù)目增長保持近似于線性的增長,這個(gè)特點(diǎn)是MapReduce處理海量數(shù)據(jù)的關(guān)鍵,通過將計(jì)算節(jié)點(diǎn)增至幾百或者幾千可以很容易地處理數(shù)百TB甚至PB級(jí)別的離線數(shù)據(jù)。
高容錯(cuò)性
Hadoop集群是分布式搭建和部署得,任何單一機(jī)器節(jié)點(diǎn)宕機(jī)了,它可以把上面的計(jì)算任務(wù)轉(zhuǎn)移到另一個(gè)節(jié)點(diǎn)上運(yùn)行,不影響整個(gè)作業(yè)任務(wù)得完成,過程完全是由Hadoop內(nèi)部完成的。
適合海量數(shù)據(jù)的離線處理
可以處理GB、TB和PB級(jí)別得數(shù)據(jù)量。
MapReduce局限性
MapReduce雖然有很多的優(yōu)勢(shì),也有相對(duì)得局限性,局限性不代表不能做,而是在有些場景下實(shí)現(xiàn)的效果比較差,并不適合用MapReduce來處理,主要表現(xiàn)在以下結(jié)果方面:
實(shí)時(shí)計(jì)算性能差
MapReduce主要應(yīng)用于離線作業(yè),無法作到秒級(jí)或者是亞秒級(jí)得數(shù)據(jù)響應(yīng)。
不能進(jìn)行流式計(jì)算
流式計(jì)算特點(diǎn)是數(shù)據(jù)是源源不斷得計(jì)算,并且數(shù)據(jù)是動(dòng)態(tài)的;而MapReduce作為一個(gè)離線計(jì)算框架,主要是針對(duì)靜態(tài)數(shù)據(jù)集得,數(shù)據(jù)是不能動(dòng)態(tài)變化得。
MapReduce實(shí)戰(zhàn)
WordCount算是大數(shù)據(jù)計(jì)算領(lǐng)域經(jīng)典的入門案例,相當(dāng)于Hello World。主要是統(tǒng)計(jì)指定文件中,每個(gè)單詞出現(xiàn)的總次數(shù)。
雖然WordCount業(yè)務(wù)極其簡單,但是希望能夠通過案例感受背后MapReduce的執(zhí)行流程和默認(rèn)的行為機(jī)制,這才是關(guān)鍵。
Map階段代碼實(shí)現(xiàn)
實(shí)現(xiàn)了map接口,把輸入的數(shù)據(jù)經(jīng)過切割,全部標(biāo)記1,因此輸出就是<單詞,1>。
Reduce階段代碼實(shí)現(xiàn)
實(shí)現(xiàn)了reduce接口,對(duì)所有的1進(jìn)行累加求和,就是單詞的總次數(shù)
啟動(dòng)代碼
可以參考官方例子:https://github.com/apache/hadoop/blob/branch-3.3.0/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java
運(yùn)行
- 第一個(gè)參數(shù):wordcount表示執(zhí)行單詞統(tǒng)計(jì)任務(wù);
- 第二個(gè)參數(shù):指定輸入文件的路徑;
- 第三個(gè)參數(shù):指定輸出結(jié)果的路徑(該路徑不能已存在)
查看運(yùn)行結(jié)果
最終可以在/output目錄下看到輸出的結(jié)果
MapReduce執(zhí)行流程
從資源運(yùn)行層面,一個(gè)完整的MapReduce程序在分布式運(yùn)行時(shí)有三類程序,如下所示:
- MRAppMaster:負(fù)責(zé)整個(gè)MR程序的過程調(diào)度及狀態(tài)協(xié)調(diào)
- MapTask:負(fù)責(zé)map階段的整個(gè)數(shù)據(jù)處理流程
- ReduceTask:負(fù)責(zé)reduce階段的整個(gè)數(shù)據(jù)處理流程
MapReduce任務(wù)優(yōu)先會(huì)提交到Y(jié)arn組件上,這個(gè)主要是用來管理資源的,因?yàn)橛?jì)算需要CPU、內(nèi)存等資源。首先會(huì)運(yùn)行1個(gè)MRAppMaster?程序,主要負(fù)責(zé)整個(gè)MR程序的過程調(diào)度及狀態(tài)協(xié)調(diào)。然后運(yùn)行多個(gè)MapTask?,最后運(yùn)行ReduceTask。
從業(yè)務(wù)邏輯層面上,以上面的wordCount為例,它的運(yùn)行流程如下圖所示:
Map階段執(zhí)行流程
- 第一階段:把輸入目錄下文件按照一定的標(biāo)準(zhǔn)逐個(gè)進(jìn)行邏輯切片,形成切片規(guī)劃。默認(rèn)Split size = Block size(128M)?,每一個(gè)切片由一個(gè)MapTask處理。
- 第二階段:對(duì)切片中的數(shù)據(jù)按照一定的規(guī)則讀取解析返回<key,value>對(duì)。默認(rèn)是按行讀取數(shù)據(jù)。key是每一行的起始位置偏移量,value是本行的文本內(nèi)容。
- 第三階段:調(diào)用Mapper類中的map方法處理數(shù)據(jù)。每讀取解析出來的一個(gè)<key,value>,調(diào)用一次map方法。
- 第四階段:按照一定的規(guī)則對(duì)Map輸出的鍵值對(duì)進(jìn)行分區(qū)partition。默認(rèn)不分區(qū),因?yàn)橹挥幸粋€(gè)reducetask。分區(qū)的數(shù)量就是reducetask運(yùn)行的數(shù)量。
- 第五階段:Map輸出數(shù)據(jù)寫入內(nèi)存緩沖區(qū),達(dá)到比例溢出到磁盤上。溢出spill的時(shí)候根據(jù)key進(jìn)行排序sort。默認(rèn)根據(jù)key字典序排序。
- 第六階段:對(duì)所有溢出文件進(jìn)行最終的merge合并,成為一個(gè)文件。
Reduce階段執(zhí)行過程
- 第一階段:ReduceTask會(huì)主動(dòng)從MapTask復(fù)制拉取屬于需要自己處理的數(shù)據(jù)。
- 第二階段:把拉取來數(shù)據(jù),全部進(jìn)行合并merge,即把分散的數(shù)據(jù)合并成一個(gè)大的數(shù)據(jù)。再對(duì)合并后的數(shù)據(jù)排序。 ?
- 第三階段是對(duì)排序后的鍵值對(duì)調(diào)用reduce方法。鍵相等的鍵值對(duì)調(diào)用一次reduce方法。最后把這些輸出的鍵值對(duì)寫入到HDFS文件中。
shuffle階段
- Shuffle的本意是洗牌、混洗的意思,把一組有規(guī)則的數(shù)據(jù)盡量打亂成無規(guī)則的數(shù)據(jù)。
- 而在MapReduce中,Shuffle更像是洗牌的逆過程,指的是將map端的無規(guī)則輸出按指定的規(guī)則“打亂”成具有一定規(guī)則的數(shù)據(jù),以便reduce端接收處理。 ?
- 一般把從Map產(chǎn)生輸出開始到Reduce取得數(shù)據(jù)作為輸入之前的過程稱作shuffle。
以上就是整個(gè)MapReduce執(zhí)行的整個(gè)流程。
總結(jié)
MapReduce是Hadoop提供的一個(gè)分布式計(jì)算框架,對(duì)于大數(shù)據(jù)開發(fā)人員來說,只要關(guān)注于自己的業(yè)務(wù),實(shí)現(xiàn)他們提供的Map和Reduce接口,接下來底層都交給Hadoop來處理。但是MapReduce已經(jīng)日薄西山,企業(yè)用的也越來越少了,慢慢被Spark、Flink等計(jì)算引擎代替,主要原因還是太慢,比如shuffle階段中頻繁涉及到數(shù)據(jù)在內(nèi)存、磁盤之間的多次往復(fù),但是這種計(jì)算思想還是很值得一學(xué)的。