hadoop中mapreduce的常用類
寫這個(gè)文章的時(shí)候才意識(shí)到新舊API是同時(shí)存在于1.1.2的hadoop中的。以前還一直納悶兒為什么有時(shí)候是jobClient提交任務(wù),有時(shí)是Job...不管API是否更新,下面這些類也還是存在于API中的,經(jīng)過自己跟蹤源碼,發(fā)現(xiàn)原理還是這些。只不過進(jìn)行了重新組織,進(jìn)行了一些封裝,使得擴(kuò)展性更好。所以還是把這些東西從記事本貼進(jìn)來吧。
關(guān)于這些類的介紹以及使用,有的是在自己debug中看到的,多數(shù)為純翻譯API的注釋,但是翻譯的過程受益良多。
GenericOptionsParser
parseGeneralOptions(Options opts, Configuration conf, String[] args)解析命令行參數(shù)
GenericOptionsParser是為hadoop框架解析命令行參數(shù)的工具類。它能夠辨認(rèn)標(biāo)準(zhǔn)的命令行參數(shù),使app能夠輕松指定namenode,jobtracker,以及額外的配置資源或信息等。它支持的功能有:
-conf 指定配置文件;
-D 指定配置信息;
-fs 指定namenode
-jt 指定jobtracker
-files 指定需要copy到MR集群的文件,以逗號(hào)分隔
-libjars指定需要copy到MR集群的classpath的jar包,以逗號(hào)分隔
-archives指定需要copy到MR集群的壓縮文件,以逗號(hào)分隔,會(huì)自動(dòng)解壓縮
1. String[] otherArgs = new GenericOptionsParser(job, args)
2. .getRemainingArgs();
3. if (otherArgs.length != 2) {
4. System.err.println("Usage: wordcount
5. System.exit(2);
6. }
ToolRunner
用來跑實(shí)現(xiàn)Tool接口的工具。它與GenericOptionsParser合作來解析命令行參數(shù),只在此次運(yùn)行中更改configuration的參數(shù)。
Tool
處理命令行參數(shù)的接口。Tool是MR的任何tool/app的標(biāo)準(zhǔn)。這些實(shí)現(xiàn)應(yīng)該代理對(duì)標(biāo)準(zhǔn)命令行參數(shù)的處理。下面是典型實(shí)現(xiàn):
- public class MyApp extends Configured implements Tool {
- public int run(String[] args) throws Exception {
- // 即將被ToolRunner執(zhí)行的Configuration
- Configuration conf = getConf();
- // 使用conf建立JobConf
- JobConf job = new JobConf(conf, MyApp.class);
- // 執(zhí)行客戶端參數(shù)
- Path in = new Path(args[1]);
- Path out = new Path(args[2]);
- // 指定job相關(guān)的參數(shù)
- job.setJobName("my-app");
- job.setInputPath(in);
- job.setOutputPath(out);
- job.setMapperClass(MyApp.MyMapper.class);
- job.setReducerClass(MyApp.MyReducer.class);
- *
- // 提交job,然后監(jiān)視進(jìn)度直到j(luò)ob完成
- JobClient.runJob(job);
- }
- public static void main(String[] args) throws Exception {
- // 讓ToolRunner 處理命令行參數(shù)
- int res = ToolRunner.run(new Configuration(), new Sort(), //這里封裝了GenericOptionsParser解析args
- System.exit(res);
- }
- }
MultipleOutputFormat
自定義輸出文件名稱或者說名稱格式。在jobconf中setOutputFormat(MultipleOutputFormat的子類)就行了。而不是那種part-r-00000啥的了。。。并且可以分配結(jié)果到多個(gè)文件中。
MultipleOutputFormat繼承了FileOutputFormat, 允許將輸出數(shù)據(jù)寫進(jìn)不同的輸出文件中。有三種應(yīng)用場景:
a. 最少有一個(gè)reducer的mapreduce任務(wù)。這個(gè)reducer想要根據(jù)實(shí)際的key將輸出寫進(jìn)不同的文件中。假設(shè)一個(gè)key編碼了實(shí)際的key和為實(shí)際的key指定的位置
b. 只有map的任務(wù)。這個(gè)任務(wù)想要把輸入文件或者輸入內(nèi)容的部分名稱設(shè)為輸出文件名。
c. 只有map的任務(wù)。這個(gè)任務(wù)為輸出命名時(shí),需要依賴keys和輸入文件名。
- //這里是根據(jù)key生成多個(gè)文件的地方,可以看到還有value,name等參數(shù)
- @Override
- protected String generateFileNameForKeyValue(Text key,
- IntWritable value, String name) {
- char c = key.toString().toLowerCase().charAt(0);
- if (c >= 'a' && c <= 'z') {
- return c + ".txt";
- }
- return "result.txt";
- }
DistributedCache
在集群中快速分發(fā)大的只讀文件。DistributedCache是MR用來緩存app需要的諸如text,archive,jar等的文件的。app通過jobconf中的url來指定需要緩存的文件。它會(huì)假定指定的這個(gè)文件已經(jīng)在url指定的對(duì)應(yīng)位置上了。在job在node上執(zhí)行之前,DistributedCache會(huì)copy必要的文件到這個(gè)slave node。它的功效就是為每個(gè)job只copy一次,而且copy到指定位置,能夠自動(dòng)解壓縮。
DistributedCache可以用來分發(fā)簡單的只讀文件,或者一些復(fù)雜的例如archive,jar文件等。archive文件會(huì)自動(dòng)解壓縮,而jar文件會(huì)被自動(dòng)放置到任務(wù)的classpath中(lib)。分發(fā)壓縮archive時(shí),可以指定解壓名稱如:dict.zip#dict。這樣就會(huì)解壓到dict中,否則默認(rèn)是dict.zip中。
文件是有執(zhí)行權(quán)限的。用戶可以選擇在任務(wù)的工作目錄下建立指向DistributedCache的軟鏈接。
- DistributedCache.createSymlink(conf);
- DistributedCache.addCacheFile(new Path("hdfs://host:port/absolute-path#link-name").toUri(), conf);
DistributedCache.createSymlink(Configuration)方法讓DistributedCache 在當(dāng)前工作目錄下創(chuàng)建到緩存文件的符號(hào)鏈接。則在task的當(dāng)前工作目錄會(huì)有l(wèi)ink-name的鏈接,相當(dāng)于快捷方法,鏈接到expr.txt文件,在setup方法使用的情況則要簡單許多?;蛘咄ㄟ^設(shè)置配置文件屬性mapred.create.symlink為yes。 分布式緩存會(huì)截取URI的片段作為鏈接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 則在task當(dāng)前工作目錄會(huì)有名為lib.so的鏈接, 它會(huì)鏈接分布式緩存中的lib.so.1#p#
DistributedCache會(huì)跟蹤修改緩存文件的timestamp。
下面是使用的例子, 為應(yīng)用app設(shè)置緩存
- $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
- $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
- $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
- $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
- $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
- $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
2. 設(shè)置app的jobConf:
- JobConf job = new JobConf();
- DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
- job);
- DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
- DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
- DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
- DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
- DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
3. 在mapper或者reducer中使用緩存文件:
- public static class MapClass extends MapReduceBase
- implements Mapper<K, V, K, V> {
- private Path[] localArchives;
- private Path[] localFiles;
- public void configure(JobConf job) {
- // 得到剛剛緩存的文件
- localArchives = DistributedCache.getLocalCacheArchives(job);
- localFiles = DistributedCache.getLocalCacheFiles(job);
- }
- public void map(K key, V value,
- OutputCollector<K, V>; output, Reporter reporter)
- throws IOException {
- // 使用緩存文件
- // ...
- // ...
- output.collect(k, v);
- }
- }
它跟GenericOptionsParser的部分功能有異曲同工之妙。
PathFilter + 通配符。accept(Path path)篩選path是否通過。
NullWritable
不想輸出的時(shí)候,把它當(dāng)做key。NullWritable是Writable的一個(gè)特殊類,序列化的長度為0,實(shí)現(xiàn)方法為空實(shí)現(xiàn),不從數(shù)據(jù)流中讀數(shù)據(jù),也不寫入數(shù)據(jù),只充當(dāng)占位符,如在MapReduce中,如果你不需要使用鍵或值,你就可以將鍵或值聲明為NullWritable,NullWritable是一個(gè)不可變的單實(shí)例類型。
FileInputFormat繼承于InputFormat
InputFormat的作用:
驗(yàn)證輸入規(guī)范;
切分輸入文件為InputSpilts;
提供RecordReader來收集InputSplit中的輸入記錄,給Mapper進(jìn)行執(zhí)行。
RecordReader
將面向字節(jié)的InputSplit轉(zhuǎn)換為面向記錄的視圖,供Mapper或者Reducer使用運(yùn)行。因此假定處理記錄的責(zé)任界限,為任務(wù)呈現(xiàn)key-value。
SequenceFile:
SequenceFile是包含二進(jìn)制kv的扁平文件(序列化)。它提供Writer、Reader、Sorter來進(jìn)行寫、讀、排序功能?;贑ompressionType,SequenceFile有三種對(duì)于kv的壓縮方式:
●Writer:不壓縮records;
●RecordCompressWriter: 只壓縮values;
●BlockCompressWriter: 壓縮records,keys和values都被分開壓縮在block中,block的大小可以配置;
壓縮方式由合適的CompressionCodec指定。推薦使用此類的靜態(tài)方法createWriter來選擇格式。Reader作為橋接可以讀取以上任何一種壓縮格式。
CompressionCodec:
封裝了關(guān)于流式壓縮/解壓縮的相關(guān)方法。
Mapper
Mapper 將輸入的kv對(duì)映射成中間數(shù)據(jù)kv對(duì)集合。Maps 將輸入記錄轉(zhuǎn)變?yōu)橹虚g記錄,其中被轉(zhuǎn)化后的記錄不必和輸入記錄類型相同。一個(gè)給定的輸入對(duì)可以映射為0或者多個(gè)輸出對(duì)。
在MRJob執(zhí)行過程中,MapReduce框架根據(jù)提前指定的InputFormat(輸入格式對(duì)象)產(chǎn)生InputSplit(輸入分片),而每個(gè)InputSplit將會(huì)由一個(gè)map任務(wù)處理。
總起來講,Mapper實(shí)現(xiàn)類通過JobConfigurable.configure(JobConf)方法傳入JobConf對(duì)象來初始化,然后在每個(gè)map任務(wù)中調(diào)用map(WritableComparable,Writable,OutputCollector,Reporter)方法處理InputSplit的每個(gè)kv對(duì)。MR應(yīng)用可以覆蓋Closeable.close方法去處理一些必須的清理工作。
輸出對(duì)不一定和輸入對(duì)類型相同。一個(gè)給定的輸入對(duì)可能映射成0或者很多的輸出對(duì)。輸出對(duì)是框架通過調(diào)用OutputCollector.colect(WritableComparable,Writable)得到。
MR應(yīng)用可以使用Reporter匯報(bào)進(jìn)度,設(shè)置應(yīng)用層級(jí)的狀態(tài)信息,更新計(jì)數(shù)器或者只是顯示應(yīng)用處于運(yùn)行狀態(tài)等。
所有和給定的輸出key關(guān)聯(lián)的中間數(shù)據(jù)都會(huì)隨后被框架分組處理,并傳給Reducer處理以產(chǎn)生最終的輸出。用戶可以通過JobConf.setOutputKeyComparatorClass(Class)指定一個(gè)Comparator控制分組處理過程。
Mapper輸出都被排序后根據(jù)Reducer數(shù)量進(jìn)行分區(qū),分區(qū)數(shù)量等于reduce任務(wù)數(shù)量。用戶可以通過實(shí)現(xiàn)自定義的Partitioner來控制哪些keys(記錄)到哪個(gè)Reducer中去。
此外,用戶還可以指定一個(gè)Combiner,調(diào)用JobConf.setCombinerClass(Class)來實(shí)現(xiàn)。這個(gè)可以來對(duì)map輸出做本地的聚合,有助于減少從mapper到reducer的數(shù)據(jù)量。
經(jīng)過排序的中間輸出數(shù)據(jù)通常以一種簡單的格式(key-len,key,value-len,value)存儲(chǔ)在SequenceFile中。應(yīng)用可以決定是否或者怎樣被壓縮以及壓縮格式,可以通過JobConf來指定CompressionCodec.
如果job沒有reducer,那么mapper的輸出結(jié)果會(huì)不經(jīng)過分組排序,直接寫進(jìn)FileSystem.#p#
Map數(shù)
通常map數(shù)由輸入數(shù)據(jù)總大小決定,也就是所有輸入文件的blocks數(shù)目決定。
每個(gè)節(jié)點(diǎn)并行的運(yùn)行的map數(shù)正常在10到100個(gè)。由于Map任務(wù)初始化本身需要一段時(shí)間所以map運(yùn)行時(shí)間至少在1分鐘為好。
如此,如果有10T的數(shù)據(jù)文件,每個(gè)block大小128M,***使用為82000map數(shù),除非使用setNumMapTasks(int)(這個(gè)方法僅僅對(duì)MR框架提供一個(gè)建議值)將map數(shù)值設(shè)置到更高。
Reducer
Reducer根據(jù)key將中間數(shù)據(jù)集合處理合并為更小的數(shù)據(jù)結(jié)果集。
用戶可以通過JobConf.setNumReduceTasks(int)設(shè)置作業(yè)的reducer數(shù)目。
整體而言,Reducer實(shí)現(xiàn)類通過JobConfigurable.configure(JobConf)方法將JobConf對(duì)象傳入,并為Job設(shè)置和初始化Reducer。MR框架調(diào)用 reduce(WritableComparable, Iterator, OutputCollector, Reporter) 來處理以key被分組的輸入數(shù)據(jù)。應(yīng)用可以覆蓋Closeable.close()處理必要的清理操作。
Reducer由三個(gè)主要階段組成:shuffle,sort,reduce。
❈shuffle
輸入到Reducer的輸入數(shù)據(jù)是Mapper已經(jīng)排過序的數(shù)據(jù).在shuffle階段,根據(jù)partition算法獲取相關(guān)的mapper地址,并通過Http協(xié)議將mapper的相應(yīng)輸出數(shù)據(jù)由reducer拉取到reducer機(jī)器上處理。
❈sort
框架在這個(gè)階段會(huì)根據(jù)key對(duì)reducer的輸入進(jìn)行分組(因?yàn)椴煌膍apper輸出的數(shù)據(jù)中可能含有相同的key)。
shuffle和sort是同時(shí)進(jìn)行的,同時(shí)reducer仍然在拉取map的輸出。
❈Secondary Sort
如果對(duì)中間數(shù)據(jù)key進(jìn)行分組的規(guī)則和在處理化簡階段前對(duì)key分組規(guī)則不一致時(shí),可以通過 JobConf.setOutputValueGroupingComparator(Class)設(shè)置一個(gè)Comparator。因?yàn)橹虚g數(shù)據(jù)的分組策略是通過 JobConf.setOutputKeyComparatorClass(Class) 設(shè)置的,可以控制中間數(shù)據(jù)根據(jù)哪些key進(jìn)行分組。而JobConf.setOutputValueGroupingComparator(Class)則可用于在數(shù)據(jù)連接情況下對(duì)value進(jìn)行二次排序。
Reduce(化簡)
這個(gè)階段框架循環(huán)調(diào)用 reduce(WritableComparable, Iterator, OutputCollector, Reporter) 方法處理被分組的每個(gè)kv對(duì)。
reduce 任務(wù)一般通過 OutputCollector.collect(WritableComparable, Writable)將輸出數(shù)據(jù)寫入文件系統(tǒng)FileSystem。應(yīng)用可以使用Reporter匯報(bào)作業(yè)執(zhí)行進(jìn)度、設(shè)置應(yīng)用層級(jí)的狀態(tài)信息并更新計(jì)數(shù)器(Counter),或者只是提示作業(yè)在運(yùn)行。
注意,Reducer的輸出不會(huì)再進(jìn)行排序。
Reducer數(shù)目
合適的reducer數(shù)目可以這樣估算:(節(jié)點(diǎn)數(shù)目mapred.tasktracker.reduce.tasks.maximum)乘以0.95 或 乘以1.75。因子為0.95時(shí),當(dāng)所有map任務(wù)完成時(shí)所有reducer可以立即啟動(dòng),并開始從map機(jī)器上拉取數(shù)據(jù)。因子為1.75時(shí),最快的一些節(jié)點(diǎn)將完成***輪reduce處理,此時(shí)框架開始啟動(dòng)第二輪reduce任務(wù),這樣可以達(dá)到比較好的作業(yè)負(fù)載均衡。提高reduce數(shù)目會(huì)增加框架的運(yùn)行負(fù)擔(dān),但有利于提升作業(yè)的負(fù)載均衡并降低失敗的成本。上述的因子使用***在作業(yè)執(zhí)行時(shí)框架仍然有reduce槽為前提,畢竟框架還需要對(duì)作業(yè)進(jìn)行可能的推測(cè)執(zhí)行和失敗任務(wù)的處理。
不使用Reducer
如果不需要進(jìn)行化簡處理,可以將reduce數(shù)目設(shè)為0。這種情況下,map的輸出會(huì)直接寫入到文件系統(tǒng)。輸出路徑通過setOutputPath(Path)指定??蚣茉趯懭霐?shù)據(jù)到文件系統(tǒng)之前不再對(duì)map結(jié)果進(jìn)行排序。
Partitioner
Partitioner對(duì)數(shù)據(jù)按照key進(jìn)行分區(qū),從而控制map的輸出傳輸?shù)侥膫€(gè)reducer上。默認(rèn)的Partitioner算法是hash(哈希。分區(qū)數(shù)目由作業(yè)的reducer數(shù)目決定。HashPartitioner 是默認(rèn)的Partitioner。
Reporter
Reporter為MR應(yīng)用提供了進(jìn)度報(bào)告、應(yīng)用狀態(tài)信息設(shè)置,和計(jì)數(shù)器(Counter)更新等功能.
Mapper和Reducer實(shí)現(xiàn)可以使用Reporter匯報(bào)進(jìn)度或者提示作業(yè)在正常運(yùn)行。在一些場景下,應(yīng)用在處理一些特殊的kv對(duì)時(shí)耗費(fèi)了過多時(shí)間,這個(gè)可能會(huì)因?yàn)榭蚣芗俣ㄈ蝿?wù)超時(shí)而強(qiáng)制停止了這些作業(yè)。為避免該情況,可以設(shè)置mapred.task.timeout 為一個(gè)比較高的值或者將其設(shè)置為0以避免超時(shí)發(fā)生。
應(yīng)用也可以使用Reporter來更新計(jì)數(shù)(Counter)。
OutputCollector
OutputCollector是MR框架提供的通用工具來收集Mapper或者Reducer輸出數(shù)據(jù)(中間數(shù)據(jù)或者最終結(jié)果數(shù)據(jù))。
Hadoop MapReduce提供了一些經(jīng)常使用的mapper、reducer和partioner的實(shí)現(xiàn)類供我們進(jìn)行學(xué)習(xí)。
以上有關(guān)configuration和job的部分在新的API中有所改變,簡單說就是在Mapper和Reducer中引入了MapContext和ReduceContext,它們封裝了configuration和outputcollector,以及reporter。