自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

hadoop中mapreduce的常用類

網(wǎng)絡(luò) 網(wǎng)絡(luò)管理 網(wǎng)絡(luò)運(yùn)維 Hadoop
新舊API是同時(shí)存在于1.1.2的hadoop中的。以前還一直納悶兒為什么有時(shí)候是jobClient提交任務(wù),有時(shí)是Job...不管API是否更新,下面這些類也還是存在于API中的,經(jīng)過自己跟蹤源碼,發(fā)現(xiàn)原理還是這些。只不過進(jìn)行了重新組織,進(jìn)行了一些封裝,使得擴(kuò)展性更好。

寫這個(gè)文章的時(shí)候才意識(shí)到新舊API是同時(shí)存在于1.1.2的hadoop中的。以前還一直納悶兒為什么有時(shí)候是jobClient提交任務(wù),有時(shí)是Job...不管API是否更新,下面這些類也還是存在于API中的,經(jīng)過自己跟蹤源碼,發(fā)現(xiàn)原理還是這些。只不過進(jìn)行了重新組織,進(jìn)行了一些封裝,使得擴(kuò)展性更好。所以還是把這些東西從記事本貼進(jìn)來吧。

關(guān)于這些類的介紹以及使用,有的是在自己debug中看到的,多數(shù)為純翻譯API的注釋,但是翻譯的過程受益良多。

GenericOptionsParser

parseGeneralOptions(Options opts, Configuration conf, String[] args)解析命令行參數(shù)

GenericOptionsParser是為hadoop框架解析命令行參數(shù)的工具類。它能夠辨認(rèn)標(biāo)準(zhǔn)的命令行參數(shù),使app能夠輕松指定namenode,jobtracker,以及額外的配置資源或信息等。它支持的功能有:

-conf 指定配置文件;

-D 指定配置信息;

-fs 指定namenode

-jt 指定jobtracker

-files 指定需要copy到MR集群的文件,以逗號(hào)分隔

-libjars指定需要copy到MR集群的classpath的jar包,以逗號(hào)分隔

-archives指定需要copy到MR集群的壓縮文件,以逗號(hào)分隔,會(huì)自動(dòng)解壓縮

1. String[] otherArgs = new GenericOptionsParser(job, args)

2. .getRemainingArgs();

3. if (otherArgs.length != 2) {

4. System.err.println("Usage: wordcount ");

5. System.exit(2);

6. }

ToolRunner

用來跑實(shí)現(xiàn)Tool接口的工具。它與GenericOptionsParser合作來解析命令行參數(shù),只在此次運(yùn)行中更改configuration的參數(shù)。

Tool

處理命令行參數(shù)的接口。Tool是MR的任何tool/app的標(biāo)準(zhǔn)。這些實(shí)現(xiàn)應(yīng)該代理對(duì)標(biāo)準(zhǔn)命令行參數(shù)的處理。下面是典型實(shí)現(xiàn):

  1. public class MyApp extends Configured implements Tool {   
  2.            
  3. public int run(String[] args) throws Exception {   
  4. // 即將被ToolRunner執(zhí)行的Configuration   
  5. Configuration conf = getConf();   
  6.             
  7. // 使用conf建立JobConf   
  8. JobConf job = new JobConf(conf, MyApp.class);   
  9.         
  10. // 執(zhí)行客戶端參數(shù)   
  11. Path in = new Path(args[1]);   
  12. Path out = new Path(args[2]);   
  13.             
  14. // 指定job相關(guān)的參數(shù)        
  15. job.setJobName("my-app");   
  16. job.setInputPath(in);   
  17. job.setOutputPath(out);   
  18. job.setMapperClass(MyApp.MyMapper.class);   
  19. job.setReducerClass(MyApp.MyReducer.class);   
  20. *   
  21. // 提交job,然后監(jiān)視進(jìn)度直到j(luò)ob完成   
  22. JobClient.runJob(job);   
  23. }   
  24.           
  25.  public static void main(String[] args) throws Exception {   
  26. // 讓ToolRunner 處理命令行參數(shù)    
  27. int res = ToolRunner.run(new Configuration(), new Sort(), //這里封裝了GenericOptionsParser解析args   
  28.             
  29. System.exit(res);   
  30. }   
  31. }   

MultipleOutputFormat

自定義輸出文件名稱或者說名稱格式。在jobconf中setOutputFormat(MultipleOutputFormat的子類)就行了。而不是那種part-r-00000啥的了。。。并且可以分配結(jié)果到多個(gè)文件中。

MultipleOutputFormat繼承了FileOutputFormat, 允許將輸出數(shù)據(jù)寫進(jìn)不同的輸出文件中。有三種應(yīng)用場景:

a. 最少有一個(gè)reducer的mapreduce任務(wù)。這個(gè)reducer想要根據(jù)實(shí)際的key將輸出寫進(jìn)不同的文件中。假設(shè)一個(gè)key編碼了實(shí)際的key和為實(shí)際的key指定的位置

b. 只有map的任務(wù)。這個(gè)任務(wù)想要把輸入文件或者輸入內(nèi)容的部分名稱設(shè)為輸出文件名。

c. 只有map的任務(wù)。這個(gè)任務(wù)為輸出命名時(shí),需要依賴keys和輸入文件名。 

  1. //這里是根據(jù)key生成多個(gè)文件的地方,可以看到還有value,name等參數(shù)   
  2. @Override   
  3. protected String generateFileNameForKeyValue(Text key,   
  4. IntWritable value, String name) {   
  5. char c = key.toString().toLowerCase().charAt(0);   
  6. if (c >= 'a' && c <= 'z') {   
  7. return c + ".txt";   
  8. }   
  9. return "result.txt";   
  10. }   

DistributedCache

在集群中快速分發(fā)大的只讀文件。DistributedCache是MR用來緩存app需要的諸如text,archive,jar等的文件的。app通過jobconf中的url來指定需要緩存的文件。它會(huì)假定指定的這個(gè)文件已經(jīng)在url指定的對(duì)應(yīng)位置上了。在job在node上執(zhí)行之前,DistributedCache會(huì)copy必要的文件到這個(gè)slave node。它的功效就是為每個(gè)job只copy一次,而且copy到指定位置,能夠自動(dòng)解壓縮。

DistributedCache可以用來分發(fā)簡單的只讀文件,或者一些復(fù)雜的例如archive,jar文件等。archive文件會(huì)自動(dòng)解壓縮,而jar文件會(huì)被自動(dòng)放置到任務(wù)的classpath中(lib)。分發(fā)壓縮archive時(shí),可以指定解壓名稱如:dict.zip#dict。這樣就會(huì)解壓到dict中,否則默認(rèn)是dict.zip中。

文件是有執(zhí)行權(quán)限的。用戶可以選擇在任務(wù)的工作目錄下建立指向DistributedCache的軟鏈接。

  1. DistributedCache.createSymlink(conf);     
  2. DistributedCache.addCacheFile(new Path("hdfs://host:port/absolute-path#link-name").toUri(), conf);      

DistributedCache.createSymlink(Configuration)方法讓DistributedCache 在當(dāng)前工作目錄下創(chuàng)建到緩存文件的符號(hào)鏈接。則在task的當(dāng)前工作目錄會(huì)有l(wèi)ink-name的鏈接,相當(dāng)于快捷方法,鏈接到expr.txt文件,在setup方法使用的情況則要簡單許多?;蛘咄ㄟ^設(shè)置配置文件屬性mapred.create.symlink為yes。 分布式緩存會(huì)截取URI的片段作為鏈接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 則在task當(dāng)前工作目錄會(huì)有名為lib.so的鏈接, 它會(huì)鏈接分布式緩存中的lib.so.1#p#

DistributedCache會(huì)跟蹤修改緩存文件的timestamp。

下面是使用的例子, 為應(yīng)用app設(shè)置緩存

  1. $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat     
  2. $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip     
  3. $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar   
  4. $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar   
  5. $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz   
  6. $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz   

2. 設(shè)置app的jobConf:

  1. JobConf job = new JobConf();   
  2. DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),    
  3.  job);   
  4. DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);  
  5. DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);   
  6. DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);   
  7. DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);   
  8. DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);   

3. 在mapper或者reducer中使用緩存文件:

  1. public static class MapClass extends MapReduceBase     
  2. implements Mapper<K, V, K, V> {   
  3.      
  4. private Path[] localArchives;   
  5. private Path[] localFiles;   
  6.            
  7. public void configure(JobConf job) {   
  8. // 得到剛剛緩存的文件   
  9. localArchives = DistributedCache.getLocalCacheArchives(job);   
  10. localFiles = DistributedCache.getLocalCacheFiles(job);   
  11. }   
  12.            
  13. public void map(K key, V value,    
  14.  OutputCollector<K, V>; output, Reporter reporter)    
  15. throws IOException {   
  16. // 使用緩存文件   
  17. // ...   
  18. // ...   
  19. output.collect(k, v);   
  20. }   
  21. }   

它跟GenericOptionsParser的部分功能有異曲同工之妙。

PathFilter + 通配符。accept(Path path)篩選path是否通過。

NullWritable

不想輸出的時(shí)候,把它當(dāng)做key。NullWritable是Writable的一個(gè)特殊類,序列化的長度為0,實(shí)現(xiàn)方法為空實(shí)現(xiàn),不從數(shù)據(jù)流中讀數(shù)據(jù),也不寫入數(shù)據(jù),只充當(dāng)占位符,如在MapReduce中,如果你不需要使用鍵或值,你就可以將鍵或值聲明為NullWritable,NullWritable是一個(gè)不可變的單實(shí)例類型。

FileInputFormat繼承于InputFormat

InputFormat的作用:

驗(yàn)證輸入規(guī)范;

切分輸入文件為InputSpilts;

提供RecordReader來收集InputSplit中的輸入記錄,給Mapper進(jìn)行執(zhí)行。

RecordReader

將面向字節(jié)的InputSplit轉(zhuǎn)換為面向記錄的視圖,供Mapper或者Reducer使用運(yùn)行。因此假定處理記錄的責(zé)任界限,為任務(wù)呈現(xiàn)key-value。

SequenceFile:

SequenceFile是包含二進(jìn)制kv的扁平文件(序列化)。它提供Writer、Reader、Sorter來進(jìn)行寫、讀、排序功能?;贑ompressionType,SequenceFile有三種對(duì)于kv的壓縮方式:

●Writer:不壓縮records;

RecordCompressWriter: 只壓縮values;

BlockCompressWriter: 壓縮records,keys和values都被分開壓縮在block中,block的大小可以配置;

壓縮方式由合適的CompressionCodec指定。推薦使用此類的靜態(tài)方法createWriter來選擇格式。Reader作為橋接可以讀取以上任何一種壓縮格式。

CompressionCodec:

封裝了關(guān)于流式壓縮/解壓縮的相關(guān)方法。

Mapper

Mapper 將輸入的kv對(duì)映射成中間數(shù)據(jù)kv對(duì)集合。Maps 將輸入記錄轉(zhuǎn)變?yōu)橹虚g記錄,其中被轉(zhuǎn)化后的記錄不必和輸入記錄類型相同。一個(gè)給定的輸入對(duì)可以映射為0或者多個(gè)輸出對(duì)。

在MRJob執(zhí)行過程中,MapReduce框架根據(jù)提前指定的InputFormat(輸入格式對(duì)象)產(chǎn)生InputSplit(輸入分片),而每個(gè)InputSplit將會(huì)由一個(gè)map任務(wù)處理。

總起來講,Mapper實(shí)現(xiàn)類通過JobConfigurable.configure(JobConf)方法傳入JobConf對(duì)象來初始化,然后在每個(gè)map任務(wù)中調(diào)用map(WritableComparable,Writable,OutputCollector,Reporter)方法處理InputSplit的每個(gè)kv對(duì)。MR應(yīng)用可以覆蓋Closeable.close方法去處理一些必須的清理工作。

輸出對(duì)不一定和輸入對(duì)類型相同。一個(gè)給定的輸入對(duì)可能映射成0或者很多的輸出對(duì)。輸出對(duì)是框架通過調(diào)用OutputCollector.colect(WritableComparable,Writable)得到。

MR應(yīng)用可以使用Reporter匯報(bào)進(jìn)度,設(shè)置應(yīng)用層級(jí)的狀態(tài)信息,更新計(jì)數(shù)器或者只是顯示應(yīng)用處于運(yùn)行狀態(tài)等。

所有和給定的輸出key關(guān)聯(lián)的中間數(shù)據(jù)都會(huì)隨后被框架分組處理,并傳給Reducer處理以產(chǎn)生最終的輸出。用戶可以通過JobConf.setOutputKeyComparatorClass(Class)指定一個(gè)Comparator控制分組處理過程。

Mapper輸出都被排序后根據(jù)Reducer數(shù)量進(jìn)行分區(qū),分區(qū)數(shù)量等于reduce任務(wù)數(shù)量。用戶可以通過實(shí)現(xiàn)自定義的Partitioner來控制哪些keys(記錄)到哪個(gè)Reducer中去。

此外,用戶還可以指定一個(gè)Combiner,調(diào)用JobConf.setCombinerClass(Class)來實(shí)現(xiàn)。這個(gè)可以來對(duì)map輸出做本地的聚合,有助于減少從mapper到reducer的數(shù)據(jù)量。

經(jīng)過排序的中間輸出數(shù)據(jù)通常以一種簡單的格式(key-len,key,value-len,value)存儲(chǔ)在SequenceFile中。應(yīng)用可以決定是否或者怎樣被壓縮以及壓縮格式,可以通過JobConf來指定CompressionCodec.

如果job沒有reducer,那么mapper的輸出結(jié)果會(huì)不經(jīng)過分組排序,直接寫進(jìn)FileSystem.#p#

Map數(shù)

通常map數(shù)由輸入數(shù)據(jù)總大小決定,也就是所有輸入文件的blocks數(shù)目決定。

每個(gè)節(jié)點(diǎn)并行的運(yùn)行的map數(shù)正常在10到100個(gè)。由于Map任務(wù)初始化本身需要一段時(shí)間所以map運(yùn)行時(shí)間至少在1分鐘為好。

如此,如果有10T的數(shù)據(jù)文件,每個(gè)block大小128M,***使用為82000map數(shù),除非使用setNumMapTasks(int)(這個(gè)方法僅僅對(duì)MR框架提供一個(gè)建議值)將map數(shù)值設(shè)置到更高。

Reducer

Reducer根據(jù)key將中間數(shù)據(jù)集合處理合并為更小的數(shù)據(jù)結(jié)果集。

用戶可以通過JobConf.setNumReduceTasks(int)設(shè)置作業(yè)的reducer數(shù)目。

整體而言,Reducer實(shí)現(xiàn)類通過JobConfigurable.configure(JobConf)方法將JobConf對(duì)象傳入,并為Job設(shè)置和初始化Reducer。MR框架調(diào)用 reduce(WritableComparable, Iterator, OutputCollector, Reporter) 來處理以key被分組的輸入數(shù)據(jù)。應(yīng)用可以覆蓋Closeable.close()處理必要的清理操作。

Reducer由三個(gè)主要階段組成:shuffle,sort,reduce。

❈shuffle

 

輸入到Reducer的輸入數(shù)據(jù)是Mapper已經(jīng)排過序的數(shù)據(jù).在shuffle階段,根據(jù)partition算法獲取相關(guān)的mapper地址,并通過Http協(xié)議將mapper的相應(yīng)輸出數(shù)據(jù)由reducer拉取到reducer機(jī)器上處理。

sort

 

框架在這個(gè)階段會(huì)根據(jù)key對(duì)reducer的輸入進(jìn)行分組(因?yàn)椴煌膍apper輸出的數(shù)據(jù)中可能含有相同的key)。

shuffle和sort是同時(shí)進(jìn)行的,同時(shí)reducer仍然在拉取map的輸出。

Secondary Sort

 

如果對(duì)中間數(shù)據(jù)key進(jìn)行分組的規(guī)則和在處理化簡階段前對(duì)key分組規(guī)則不一致時(shí),可以通過 JobConf.setOutputValueGroupingComparator(Class)設(shè)置一個(gè)Comparator。因?yàn)橹虚g數(shù)據(jù)的分組策略是通過 JobConf.setOutputKeyComparatorClass(Class) 設(shè)置的,可以控制中間數(shù)據(jù)根據(jù)哪些key進(jìn)行分組。而JobConf.setOutputValueGroupingComparator(Class)則可用于在數(shù)據(jù)連接情況下對(duì)value進(jìn)行二次排序。

Reduce(化簡)

這個(gè)階段框架循環(huán)調(diào)用 reduce(WritableComparable, Iterator, OutputCollector, Reporter) 方法處理被分組的每個(gè)kv對(duì)。

reduce 任務(wù)一般通過 OutputCollector.collect(WritableComparable, Writable)將輸出數(shù)據(jù)寫入文件系統(tǒng)FileSystem。應(yīng)用可以使用Reporter匯報(bào)作業(yè)執(zhí)行進(jìn)度、設(shè)置應(yīng)用層級(jí)的狀態(tài)信息并更新計(jì)數(shù)器(Counter),或者只是提示作業(yè)在運(yùn)行。

注意,Reducer的輸出不會(huì)再進(jìn)行排序。

Reducer數(shù)目

合適的reducer數(shù)目可以這樣估算:(節(jié)點(diǎn)數(shù)目mapred.tasktracker.reduce.tasks.maximum)乘以0.95 或 乘以1.75。因子為0.95時(shí),當(dāng)所有map任務(wù)完成時(shí)所有reducer可以立即啟動(dòng),并開始從map機(jī)器上拉取數(shù)據(jù)。因子為1.75時(shí),最快的一些節(jié)點(diǎn)將完成***輪reduce處理,此時(shí)框架開始啟動(dòng)第二輪reduce任務(wù),這樣可以達(dá)到比較好的作業(yè)負(fù)載均衡。提高reduce數(shù)目會(huì)增加框架的運(yùn)行負(fù)擔(dān),但有利于提升作業(yè)的負(fù)載均衡并降低失敗的成本。上述的因子使用***在作業(yè)執(zhí)行時(shí)框架仍然有reduce槽為前提,畢竟框架還需要對(duì)作業(yè)進(jìn)行可能的推測(cè)執(zhí)行和失敗任務(wù)的處理。

不使用Reducer

如果不需要進(jìn)行化簡處理,可以將reduce數(shù)目設(shè)為0。這種情況下,map的輸出會(huì)直接寫入到文件系統(tǒng)。輸出路徑通過setOutputPath(Path)指定??蚣茉趯懭霐?shù)據(jù)到文件系統(tǒng)之前不再對(duì)map結(jié)果進(jìn)行排序。

Partitioner

Partitioner對(duì)數(shù)據(jù)按照key進(jìn)行分區(qū),從而控制map的輸出傳輸?shù)侥膫€(gè)reducer上。默認(rèn)的Partitioner算法是hash(哈希。分區(qū)數(shù)目由作業(yè)的reducer數(shù)目決定。HashPartitioner 是默認(rèn)的Partitioner。

Reporter

Reporter為MR應(yīng)用提供了進(jìn)度報(bào)告、應(yīng)用狀態(tài)信息設(shè)置,和計(jì)數(shù)器(Counter)更新等功能.

Mapper和Reducer實(shí)現(xiàn)可以使用Reporter匯報(bào)進(jìn)度或者提示作業(yè)在正常運(yùn)行。在一些場景下,應(yīng)用在處理一些特殊的kv對(duì)時(shí)耗費(fèi)了過多時(shí)間,這個(gè)可能會(huì)因?yàn)榭蚣芗俣ㄈ蝿?wù)超時(shí)而強(qiáng)制停止了這些作業(yè)。為避免該情況,可以設(shè)置mapred.task.timeout 為一個(gè)比較高的值或者將其設(shè)置為0以避免超時(shí)發(fā)生。

應(yīng)用也可以使用Reporter來更新計(jì)數(shù)(Counter)。

OutputCollector

OutputCollector是MR框架提供的通用工具來收集Mapper或者Reducer輸出數(shù)據(jù)(中間數(shù)據(jù)或者最終結(jié)果數(shù)據(jù))。

Hadoop MapReduce提供了一些經(jīng)常使用的mapper、reducer和partioner的實(shí)現(xiàn)類供我們進(jìn)行學(xué)習(xí)。

 

以上有關(guān)configuration和job的部分在新的API中有所改變,簡單說就是在Mapper和Reducer中引入了MapContext和ReduceContext,它們封裝了configuration和outputcollector,以及reporter。

責(zé)任編輯:守望幸福 來源: 51CTO.com
相關(guān)推薦

2017-04-19 11:17:48

SparkHadoopMapReduce

2010-06-07 13:35:16

Hadoop簡介

2010-06-03 16:32:09

Hadoop MapR

2014-12-29 09:59:03

Spark 1.2MapReduce

2013-11-27 09:21:18

YARNMapReduceHadoop

2010-06-03 16:18:07

Hadoop MapR

2013-04-24 10:47:48

Hadoop集群

2013-01-21 13:22:56

IBMdW

2014-11-10 15:02:21

大數(shù)據(jù)云計(jì)算Hadoop

2019-10-31 09:52:02

HadoopJava大數(shù)據(jù)

2012-04-23 10:30:38

Hadoop

2010-06-07 11:12:52

Hadoop-0.20

2014-10-15 16:32:43

MapReducehadoop

2013-12-17 09:52:13

pythonhadoopmapreduce

2014-01-07 14:04:13

HadoopMapReduce

2014-03-18 10:19:55

Hadoop部署hadoop集群腳本

2024-06-03 10:07:22

Vector類元素向量

2010-06-07 11:30:24

Hadoop源代碼

2012-05-09 09:13:29

IDCHadoopMapReduce

2021-07-22 09:53:34

Vector類Java添加元素
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)