聊聊MapReduce處理過程中的數(shù)據(jù)類型與數(shù)據(jù)格式
MapReduce處理過程總覽
對于MP的處理過程我想大部分人都已經(jīng)知道了其原理,思路不難,這是肯定的,但是整個(gè)過程中需要考慮的細(xì)枝末節(jié)的點(diǎn)還挺多的,MP的輸入輸出格式就是其中的一點(diǎn),那本文就帶領(lǐng)大家看看MP中的格式設(shè)置問題。
map函數(shù),起到了如下的轉(zhuǎn)換作用:map:(k1,v1)—>list(k2,v2)
reduce 函數(shù),則起到了這樣的格式轉(zhuǎn)換作用:reduce:(k2,list(v2))—>list(k3,v3)
怎么,你說你看不懂?那看來你還是沒有對mapreduce的過程有所理解,看看這幅圖,不需要解釋,你就明白上面的格式轉(zhuǎn)化是什么意思了:
上面這幅圖出自我的另外一篇博文:編寫自己的***個(gè)Hadoop實(shí)例,如果你設(shè)置了combiner函數(shù),那么中間的格式轉(zhuǎn)化將會(huì)是這個(gè)樣子:
- map:(k1,v1)—>list(k2,v2)
- combiner:(k2,list(v2))—>list(k2,v2)
- reduce:(k2,list(v2))—>list(k3,v3)
是的,沒錯(cuò),combiner從功能上來講就是一個(gè)reducer,它的存在大大減小了reducer的壓力。
partition函數(shù)對中間結(jié)果(k2,v2)進(jìn)行處理,返回一個(gè)索引值,即分區(qū)號(hào)
- partition:(k2,v2)—>integer
在前面的文章中,也就是我的博客:MapReduce輸入分片詳解中,我提到了分片是與map函數(shù)數(shù)量相等,同時(shí)它不是咱們想的那樣是一個(gè)實(shí)物分片,在程序上輸入分片在java中表現(xiàn)為InputSplit接口
- public abstract class InputSpilt{
- public abstract long getLength();
- public abstract String[] getLoacations();
- }
存儲(chǔ)位置供mapreduce使用,以便使map任務(wù)盡量在分片附近。分片大小是用來對分片進(jìn)行排序,以便優(yōu)先處理***的***分片,從而最小化時(shí)間。InputSplit不需要MR開發(fā)人直接處理,而是由InputFormat創(chuàng)建。
客戶端通過調(diào)用InputFormat的getSplits()計(jì)算分片,然后將他們送到application master(或jobtracker),am使用存儲(chǔ)位置信息調(diào)度map任務(wù)在tasktracker上處理這些分片數(shù)據(jù)。map任務(wù)把輸入分片傳遞給InputFormat的getRecordReader()方法來獲得這個(gè)分片的RecordReader。RecordReader類似迭代器,對map任務(wù)進(jìn)行迭代,來生成鍵/值對,然后傳遞給map函數(shù)。也就是說InputFormat不僅僅可以計(jì)算分片,進(jìn)行數(shù)據(jù)分割,還可以對分片進(jìn)行迭代,也就是說獲得分片的迭代器,所有有關(guān)分片的操作都由InputFormat來支持,可見其強(qiáng)大性。
輸入格式
那既然InputFormat這么牛逼,那我們就來看看這個(gè)接口到底包含了什么,先來看看下面這張圖:
FileInputFormat類
FileInputFormat是所有文件作為數(shù)據(jù)源的InputFormat的實(shí)現(xiàn)類,主要有兩個(gè)功能:指定輸入文件位置和輸入文件生成分片的實(shí)現(xiàn)代碼段。換句話說,它并不生成分片,只是返回文件位置,并且實(shí)現(xiàn)了分片算法。
FileInputFormat指定輸入路徑
- addInputPath(Job job,Path path);
- addInputPaths(Job job,String paths);
- setInputPaths(Job job,Path ...inputPaths);
可以添加一個(gè)路徑或者多個(gè)路徑,其中setInputPaths是以此設(shè)定完成的路徑列表。其中路徑可以是一個(gè)文件、一個(gè)目錄、或者一個(gè)glob(通配,通過通配符來獲取路徑),當(dāng)路徑是一個(gè)目錄的時(shí)候表示包含目錄下的所有文件。當(dāng)目錄中包含目錄的時(shí)候,這個(gè)目錄也會(huì)被解釋稱文件,所以會(huì)報(bào)錯(cuò)??梢酝ㄟ^使用一個(gè)文件glob或者一個(gè)過濾器根據(jù)命名模式限定選擇目錄中的文件。還可以通過設(shè)置屬性mapred.input.dir.recursive為true強(qiáng)制對目錄進(jìn)行遞歸讀取。如果需要排除目錄中的個(gè)別文件,可以通過setInputPathFileter()設(shè)置一個(gè)過濾器來進(jìn)行過濾,如果不設(shè)置過濾器,也會(huì)有默認(rèn)的過濾器排除隱藏文件(以.和_開頭的)。路徑和過濾器業(yè)可以使用配置文件進(jìn)行配置:mapred.input.dir和mapred.input.path.Fileter.class
小文件處理
(小文件是指比HDFS塊小很多)在Hadoop中使用小文件的弊端:
(1)、增加map開銷,因?yàn)槊總€(gè)分片都要執(zhí)行一次map任務(wù),map操作會(huì)造成額外的開銷
(2)、MapReduce處理數(shù)據(jù)的***速度就是和集群中的傳輸速度相同,而處理小文件將增加作業(yè)的尋址次數(shù)
(3)、浪費(fèi)namenode的內(nèi)存
解決方法:
使用SequenceFile將這些小文件合并成一個(gè)大文件或多個(gè)大文件:將文件名作為鍵,文本內(nèi)容作為值。
但是如果HDFS中已經(jīng)存在的大批小文件,可以使用CombinerFileInputFormat。
CombinerFileInputFormat把多個(gè)文件打包成一個(gè)文件以便每個(gè)mapper能夠處理更過的數(shù)據(jù)
避免切分
有時(shí)候不需要將文件進(jìn)行切分,mapper完整處理每個(gè)輸入文件。例如檢查一個(gè)文件的所有記錄是否有序。
可以通過設(shè)置最小分片大小大于要處理的文件。第二種就是使用FileInputFormat的具體子類,并且重載isSplitable()方法,把返回值設(shè)置為false。
mapper中的信息
通過調(diào)用Mapper中的Context的getInputSolit()返回一個(gè)InputSplit,如果使用的是FileInputFormat,則可以強(qiáng)轉(zhuǎn)為FileSplit,然后用此訪問正在輸入文件的路徑getPath(),分片開始處的字節(jié)偏移量,getLength()分片的長度。
TextInputFormat
文本輸入是默認(rèn)的InputFormat,每條記錄是一行輸入,鍵是LongWritable類型,存儲(chǔ)該記錄在整個(gè)文件的字節(jié)偏移量。值是該行的內(nèi)容,不包括終止符(回車、換行等),它被打包成Text對象。
KeyValueTextInputFormat
當(dāng)文件中的每一行是一個(gè)鍵/值對,使用某個(gè)分界符進(jìn)行分割,如制表符。可以通過mapreduce.input.keyvaluelinerecordreader.key.value.seperator屬性來指定分隔符。默認(rèn)是一個(gè)制表符。其中這個(gè)鍵是分隔符前的文本,值是分隔符后的文本,其類型都是Text類型。如:
- line1:this is line1 text
- line2:this is line2 text
則被分為兩條記錄,分別是:
- (line1,this is line1 text)
- (line2,this is line2 text)
NLineInputFormat
在TextInputFormat和KeyValueTextInputFormat中,每個(gè)mapper收到的輸入行數(shù)并不確定,行數(shù)取決于輸入分片的大小和行的長度。如果希望mapper收到固定行數(shù)的輸入,可以使用NLineInputFormat作為InputFormat。與TextInputFormat奕揚(yáng),鍵是文件中行的字節(jié)偏移量,值是行的內(nèi)容。
N是每個(gè)mapper收到的輸入行數(shù),默認(rèn)是1??梢酝ㄟ^mapreduce.input.lineinputformat.linespermap屬性設(shè)置。如:
On the top of the Crumetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
當(dāng)N為2的時(shí)候,每個(gè)輸入分片包含兩行。
(0,On the top of the Crumetty Tree)
(33,The Quangle Wangle sat,)
另一個(gè)mapper則收到后兩行
(57,But his face you could not see,)
(89,On account of his Beaver Hat.)
StreamInputFormat
當(dāng)解析XMl文件的時(shí)候可以使用StreamInputFormat,將stream.recordreader.class屬性設(shè)置為org.apache.Hadoop.Streaming.StreamXmlRecordReader使用StreamXmlRecordReader類。具體實(shí)現(xiàn)(沒用過)可以查看該類官方文檔
SequenceFileInputFormat
Hadoop順序文件格式存儲(chǔ)二進(jìn)制的鍵/值對的序列。當(dāng)需要使用順序文件作為MapReduce的輸入時(shí),應(yīng)該使用SequenceFileInputFormat。鍵和值由順序文件指定,只需要保證map輸入的類型匹配。
SequenceFileAsTextInputFormat
SequenceFileAsTextInputFormat是SequenceFileInputFormat的變體,將順序文件的鍵和值轉(zhuǎn)化為Text對象。
SequenceFileAsBinaryInputFormat
SequenceFileAsBinaryInputFormat是SequenceFileInputFormat的一種變體,獲取順序文件的鍵和值作為二進(jìn)制對象。
MutipleInputs
一個(gè)MapReduce作業(yè)可能由多個(gè)輸入文件,但所有文件都由同一個(gè)InputFormat和同一個(gè)mapper來處理。但是數(shù)據(jù)格式卻有所不同,需要對不同的數(shù)據(jù)集進(jìn)行連接操作。可以使用MutipleInputs類處理
DBInputFormat
DBInputFormat用于使用JDBC從關(guān)系數(shù)據(jù)庫中讀取數(shù)據(jù)。需要注意在數(shù)據(jù)庫中運(yùn)行太多mapper讀取數(shù)據(jù),可能會(huì)使數(shù)據(jù)庫受不了,所以一般使用DBInputFormat加載少量數(shù)據(jù)??梢袁F(xiàn)將數(shù)據(jù)導(dǎo)入到HDFS中,一般將關(guān)系性數(shù)據(jù)庫中數(shù)據(jù)導(dǎo)入到HDFS中可以使用Sqoop
輸出格式
TextOutputFormat
默認(rèn)的輸出模式TextOutputFormat,每條記錄寫為一行。鍵和值是任意的,因?yàn)門extOutputFormat都要將其toString()轉(zhuǎn)換為字符串。鍵值默認(rèn)使用制表符分割,可以使用mapreduce.output.textoutputformat.separator屬性改變分割符
SequenceFileOutputFormat
將輸出寫為一個(gè)順序文件,當(dāng)輸出需要作為后續(xù)的MapReduce輸入的時(shí)候,這種輸出非常合適,因?yàn)樗袷骄o湊,容易被壓縮。
SequenceFileAsBinaryOutputFormat
SequenceFileAsBinaryOutputForamt與SequenceFileAsBinaryInputFormat對應(yīng),將輸出的鍵和值作為二進(jìn)制格式寫到SequenceFile容器中。
MultipleOutputFormat
有時(shí)候可能需要將每個(gè)reduce輸出多個(gè)文件,可以使用MutltipleOutputFormat。
LazyOutputFormat
延遲輸出,他是封裝輸出格式,可以保證指定分區(qū)***條記錄輸出時(shí)才真正創(chuàng)建文件。