MapReduce連接:重分區(qū)連接
MapReduce的連接操作可以用于以下場景:
-
用戶的人口統(tǒng)計(jì)信息的聚合操作(例如:青少年和中年人的習(xí)慣差異)。
-
當(dāng)用戶超過一定時(shí)間沒有使用網(wǎng)站后,發(fā)郵件提醒他們。(這個(gè)一定時(shí)間的閾值是用戶自己預(yù)定義的)
-
分析用戶的瀏覽習(xí)慣。讓系統(tǒng)可以基于這個(gè)分析提示用戶有哪些網(wǎng)站特性還沒有使用到。進(jìn)而形成一個(gè)反饋循環(huán)。
所有這些場景都要求將多個(gè)數(shù)據(jù)集連接起來。
最常用的兩個(gè)連接類型是內(nèi)連接(inner join)和外連接(outer join)。如下圖所示,內(nèi)連接比較兩個(gè)關(guān)系中所有的元組,判斷是否滿足連接條件,然后生成一個(gè)滿足連接條件的結(jié)果集。與內(nèi)連接相反的是,外連接并不需要兩個(gè)關(guān)系的元組都滿足連接條件。在連接條件不滿足的時(shí)候,外連接可以將其中一方的數(shù)據(jù)保留在結(jié)果集中。
為了實(shí)現(xiàn)內(nèi)連接和外連接,MapReduce中有三種連接策略,如下所示。這三種連接策略有的在map階段,有的在reduce階段。它們都針對MapReduce的排序-合并(sort-merge)的架構(gòu)進(jìn)行了優(yōu)化。
-
重分區(qū)連接(Repartition join)—— reduce端連接。使用場景:連接兩個(gè)或多個(gè)大型數(shù)據(jù)集。
-
復(fù)制連接(Replication join)—— map端連接。使用場景:待連接的數(shù)據(jù)集中有一個(gè)數(shù)據(jù)集足夠小到可以完全放在緩存中。
-
半連接(Semi-join)—— 另一個(gè)map端連接。使用場景:待連接的數(shù)據(jù)集中有一個(gè)數(shù)據(jù)集非常大,但同時(shí)這個(gè)數(shù)據(jù)集可以被過濾成小到可以放在緩存中。
在介紹完這些連接策略之后,還會介紹另一個(gè)策略:決策樹??梢愿鶕?jù)實(shí)際情況選擇最優(yōu)策略。
4.1.1 重分區(qū)連接(Repartition join)
重分區(qū)連接是reduce端連接。它利用MapReduce的排序-合并機(jī)制來分組數(shù)據(jù)。它只使用一個(gè)單獨(dú)的MapReduce任務(wù),并支持多路連接(N-way join)。多路指的是多個(gè)數(shù)據(jù)集。
Map階段負(fù)責(zé)從多個(gè)數(shù)據(jù)集中讀取數(shù)據(jù),決定每個(gè)數(shù)據(jù)的連接值,將連接值作為輸出鍵(output key)。輸出值(output value)則包含將在reduce階段被合并的值。
Reduce階段,一個(gè)reduce接收map函數(shù)傳來的每一個(gè)輸出鍵的所有輸出值,并將數(shù)據(jù)分為多個(gè)分區(qū)。在此之后,reduce對所有的分區(qū)進(jìn)行笛卡爾積(Cartersian product)連接運(yùn)算,并生成全部的結(jié)果集。
以上MapReduce過程如圖4.2所示:
注:過濾(filtering)和投影(projection) 在MapReduce重分區(qū)連接中,最好能夠減少map階段傳輸?shù)絩educe階段的數(shù)據(jù)量。因?yàn)橥ㄟ^網(wǎng)絡(luò)在這兩個(gè)階段中排序和傳輸數(shù)據(jù)會產(chǎn)生很高的成本。如果不能避免reduce端的工作,那么一個(gè)最佳實(shí)踐就是盡可能在map階段多過濾數(shù)據(jù)和投影。過濾指的是將map極端的輸入數(shù)據(jù)中不需要的部分丟棄。投影是關(guān)系代數(shù)的概念。投影用于減少發(fā)送給reduce的字段。例如:在分析用戶數(shù)據(jù)的時(shí)候,如果只關(guān)注用戶的年齡,那么在map任務(wù)中應(yīng)該只投影(或輸出)年齡字段,不考慮用戶的其他的字段。 |
技術(shù)19:優(yōu)化重分區(qū)連接
《Hadoop in Action》給出了一個(gè)例子,說明如何使用Hadoop的社區(qū)包(contrib package)org.apache.hadoop.contrib.utils.join實(shí)現(xiàn)重分區(qū)連接。這個(gè)貢獻(xiàn)包打包了所有的處理細(xì)節(jié),僅僅需要實(shí)現(xiàn)一個(gè)非常簡單的方法。
然而,這個(gè)社區(qū)包對重分區(qū)的實(shí)現(xiàn)方法的空間效率低下。它需要將待連接的所有輸出值都讀取到內(nèi)存中,然后進(jìn)行多路連接(multiway join)。實(shí)際上,如果僅僅將小數(shù)據(jù)集讀取到內(nèi)存中,然后用小數(shù)據(jù)集遍歷大數(shù)據(jù)集來進(jìn)行連接,這樣將更加高效。
問題
需要在MapReduce中進(jìn)行重分區(qū)連接,但是不希望在reduce階段將所有的數(shù)據(jù)都放到緩存中。
解決方案
這個(gè)技術(shù)運(yùn)用了優(yōu)化后的重分區(qū)框架。它僅僅將一個(gè)待連接的數(shù)據(jù)集放在緩存中,減少了reduce需要放在緩存中的數(shù)據(jù)。
討論
附錄D中介紹了優(yōu)化后的重分區(qū)框架的實(shí)現(xiàn)。這個(gè)實(shí)現(xiàn)是根據(jù)org.apache.hadoop.contrib.utils.join社區(qū)包進(jìn)行建模。這個(gè)優(yōu)化后的框架僅僅緩存兩個(gè)數(shù)據(jù)集中比較小的那一個(gè),以減少內(nèi)存消耗。圖4.3是優(yōu)化后的重分區(qū)連接的流程圖:
圖4.4是實(shí)現(xiàn)的類圖。類圖中包含兩個(gè)部分,一個(gè)通用框架和一些類的實(shí)現(xiàn)樣例。
使用這個(gè)連接框架需要實(shí)現(xiàn)抽象類OptimizedDataJoinMapperBase和OptimizedDataJoinReducerBase。
例如,需要連接用戶詳情數(shù)據(jù)和用戶活動日志。第一步,判斷兩個(gè)數(shù)據(jù)集中那一個(gè)比較小。對于一般的網(wǎng)站來說,用戶詳情數(shù)據(jù)會比較小,用戶活動日志會比較大。
在如下示例中,用戶數(shù)據(jù)中有用戶姓名,年齡和所在州
- $ cat test-data/ch4/users.txt
- anne 22 NY
- joe 39 CO
- alison 35 NY
- mike 69 VA
- marie 27 OR
- jim 21 OR
- bob 71 CA
- mary 53 NY
- dave 36 VA
- dude 50 CA
用戶活動日志中有用戶姓名,進(jìn)行的動作,來源IP。這個(gè)文件一般都要比用戶數(shù)據(jù)要大得多。
- $ cat test-data/ch4/user-logs.txt
- jim logout 93.24.237.12
- mike new_tweet 87.124.79.252
- bob new_tweet 58.133.120.100
- mike logout 55.237.104.36
- jim new_tweet 93.24.237.12
- marie view_user 122.158.130.90
首先,必須實(shí)現(xiàn)抽象類OptimizedDataJoinMapperBase。這個(gè)將在map端被調(diào)用。這個(gè)類將創(chuàng)建map的輸出鍵和輸出值。同時(shí),它還將提示整個(gè)框架,當(dāng)前處理的文件是不是比較小的那個(gè)。
- public class SampleMap extends OptimizedDataJoinMapperBase {
- private boolean smaller;
- @Override
- protected Text generateInputTag(String inputFile) {
- // tag the row with input file name (data source)
- smaller = inputFile.contains("users.txt");
- return new Text(inputFile);
- }
- @Override
- protected String genGroupKey(Object key, OutputValue output) {
- return key.toString();
- }
- @Override
- protected boolean isInputSmaller(String inputFile) {
- return smaller;
- }
- @Override
- protected OutputValue genMapOutputValue(Object o) {
- return new TextTaggedOutputValue((Text) o);
- }
- }
下一步,你需要實(shí)現(xiàn)抽象類 OptimizedDataJoinReducerBase。它將在reduce端被調(diào)用。在這個(gè)類中,將從map端傳入不同數(shù)據(jù)集的輸出鍵和輸出值,然后返回reduce端的輸出數(shù)組。
- public class SampleReduce extends OptimizedDataJoinReducerBase {
- private TextTaggedOutputValue output = new TextTaggedOutputValue();
- private Text textOutput = new Text();
- @Override
- protected OutputValue combine(String key,
- OutputValue smallValue,
- OutputValue largeValue) {
- if(smallValue == null || largeValue == null) {
- return null;
- }
- Object[] values = {
- smallValue.getData(), largeValue.getData()
- };
- textOutput.set(StringUtils.join(values, "\t"));
- output.setData(textOutput);
- return output;
- }
最后,任務(wù)的主代碼(driver code)需要指明InputFormat類,并設(shè)置次排序(Secondary sort)。
- job.setInputFormat(KeyValueTextInputFormat.class);
- job.setMapOutputKeyClass(CompositeKey.class);
- job.setMapOutputValueClass(TextTaggedOutputValue.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.setPartitionerClass(CompositeKeyPartitioner.class);
- job.setOutputKeyComparatorClass(CompositeKeyComparator.class);
- job.setOutputValueGroupingComparator(CompositeKeyOnlyComparator.class);
現(xiàn)在連接的準(zhǔn)備工作就做完了,可以開始運(yùn)行連接:
- $ hadoop fs -put test-data/ch4/users.txt users.txt
- $ hadoop fs -put test-data/ch4/user-logs.txt user-logs.txt
- $ bin/run.sh com.manning.hip.ch4.joins.improved.SampleMain users.txt,user-logs.txt output
- $ hadoop fs -cat output/part*
- bob 71 CA new_tweet 58.133.120.100
- jim 21 OR logout 93.24.237.12
- jim 21 OR new_tweet 93.24.237.12
- jim 21 OR login 198.184.237.49
- marie 27 OR login 58.133.120.100
- marie 27 OR view_user 122.158.130.90
- mike 69 VA new_tweet 87.124.79.252
- mike 69 VA logout 55.237.104.36
如果和連接的源文件相對比,可以看到因?yàn)閷?shí)現(xiàn)了一個(gè)內(nèi)連接,輸出中不包括用戶anne,alison等不存在于日志文件中的記錄。
小結(jié):
這個(gè)連接的實(shí)現(xiàn)通過只緩存比較小的數(shù)據(jù)集來提高來Hadoop社區(qū)包的效率。但是,當(dāng)數(shù)據(jù)從map階段傳輸?shù)絩educe階段的時(shí)候,仍然產(chǎn)生了很高的網(wǎng)絡(luò)成本。
此外,Hadoop社區(qū)包支持多路連接,這里的實(shí)現(xiàn)只支持二路連接。
如果要更多地減少reduce端連接的內(nèi)存足跡(memory footprint),一個(gè)簡單的機(jī)制是在map函數(shù)中更多地進(jìn)行投影操作。投影減少了map階段的輸出中的字段。例如:在分析用戶數(shù)據(jù)的時(shí)候,如果只關(guān)注用戶的年齡,那么在map任務(wù)中應(yīng)該只投影(或輸出)年齡字段,不考慮用戶的其他的字段。這樣就減少了map和reduce之間的網(wǎng)絡(luò)負(fù)擔(dān),也減少了reduce在連接時(shí)的內(nèi)存消耗。
和原始的社區(qū)包一樣,這里的重分區(qū)的實(shí)現(xiàn)也支持過濾和投影。通過允許genMapOutputValue方法返回空值,就可以支持過濾。通過在genMapOutputValue方法中定義輸出值的內(nèi)容,就可以支持投影。
如果你既想輸出所有的數(shù)據(jù)到reduce,又想避免排序的損耗,就需要考慮另外兩種連接策略,復(fù)制連接和半連接。
附錄D 優(yōu)化后的MapReduce連接框架
在這個(gè)附錄,我們將討論在第4張中使用的兩個(gè)連接框架。第一個(gè)是重連接框架。它減少了org.apache.hadoop.contrib.utils.join包的實(shí)現(xiàn)的Hadoop連接的內(nèi)存足跡。第二個(gè)是復(fù)制連接框架。它可以將較小的數(shù)據(jù)集放在緩存中。
D.1 優(yōu)化后的重分區(qū)框架
Hadoop社區(qū)連接包需要將每個(gè)鍵的所有值都讀取到內(nèi)存中。如何才能在reduce端的連接減少內(nèi)存開銷呢?本文提供的優(yōu)化中,只需要緩存較小的數(shù)據(jù)集,然后在連接中遍歷較大數(shù)據(jù)集中的數(shù)據(jù)。這個(gè)方法中還包括針對map的輸出數(shù)據(jù)的次排序,那么reducer先接收到較小的數(shù)據(jù)集,然后接收到較大的數(shù)據(jù)集。圖D.1是這個(gè)過程的流程圖。
圖D.2是實(shí)現(xiàn)的類圖。類圖中包含兩個(gè)部分,一個(gè)通用框架和一些類的實(shí)現(xiàn)樣例。
連接框架
我們以和Hadoop社區(qū)連接包的近似的風(fēng)格編寫連接的代碼。目標(biāo)是創(chuàng)建可以處理任意數(shù)據(jù)集的通用重分區(qū)機(jī)制。為簡潔起見,我們重點(diǎn)說明主要部分。
首先是OptimizedDataJoinMapperBase類。這個(gè)類的作用是辨認(rèn)出較小的數(shù)據(jù)集,并生成輸出鍵和輸出值。Configure方法在mapper創(chuàng)建時(shí)被調(diào)用。Configure方法的作用之一是標(biāo)識每一個(gè)數(shù)據(jù)集,讓reducer可以區(qū)分?jǐn)?shù)據(jù)的源數(shù)據(jù)集。另一個(gè)作用是辨認(rèn)當(dāng)前的輸入數(shù)據(jù)是否是較小的數(shù)據(jù)集。
- protected abstract Text generateInputTag(String inputFile);
- protected abstract boolean isInputSmaller(String inputFile);
- public void configure(JobConf job) {
- this.inputFile = job.get("map.input.file");
- this.inputTag = generateInputTag(this.inputFile);
- if(isInputSmaller(this.inputFile)) {
- smaller = new BooleanWritable(true);
- outputKey.setOrder(0);
- } else {
- smaller = new BooleanWritable(false);
- outputKey.setOrder(1);
- }
- }
Map方法首先調(diào)用自定義的方法 (generateTaggedMapOutput) 來生成OutputValue對象。這個(gè)對象包含了在連接中需要使用的值(也可能包含了最終輸出的值),和一個(gè)標(biāo)識較大或較小數(shù)據(jù)集的布爾值。如果map方法可以調(diào)用自定義的方法 (generateGroupKey) 來得到可以在連接中使用的鍵,那么這個(gè)鍵就作為map的輸出鍵。
- protected abstract OptimizedTaggedMapOutput generateTaggedMapOutput(Object value);
- protected abstract String generateGroupKey(Object key, OptimizedTaggedMapOutput aRecord);
- public void map(Object key, Object value, OutputCollector output, Reporter reporter)
- throws IOException {
- OptimizedTaggedMapOutput aRecord = generateTaggedMapOutput(value);
- if (aRecord == null) {
- return;
- }
- aRecord.setSmaller(smaller);
- String groupKey = generateGroupKey(aRecord);
- if (groupKey == null) {
- return;
- }
- outputKey.setKey(groupKey);
- output.collect(outputKey, aRecord);
- }
圖D.3 說明了map輸出的組合鍵(composite 可以)和組合值。次排序?qū)鶕?jù)連接鍵(join key)進(jìn)行分區(qū),并用整個(gè)組合鍵來進(jìn)行排序。組合鍵包括一個(gè)標(biāo)識源數(shù)據(jù)集(較大或較小)的整形值,因此可以根據(jù)這個(gè)整形值來保證較小源數(shù)據(jù)集的值先于較大源數(shù)據(jù)的值被reduce接收。
下一步是深入reduce。此前已經(jīng)可以保證較小源數(shù)據(jù)集的值將會先于較大源數(shù)據(jù)集的值被接收。這里就可以將所有的較小源數(shù)據(jù)集的值放到緩存中。在開始接收較大源數(shù)據(jù)集的值的時(shí)候,就開始和緩存中的值做連接操作。
- public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter)
- throws IOException {
- CompositeKey k = (CompositeKey) key;
- List<OptimizedTaggedMapOutput> smaller = new ArrayList<OptimizedTaggedMapOutput>();
- while (values.hasNext()) {
- Object value = values.next();
- OptimizedTaggedMapOutput cloned =((OptimizedTaggedMapOutput) value).clone(job);
- if (cloned.isSmaller().get()) {
- smaller.add(cloned);
- } else {
- joinAndCollect(k, smaller, cloned, output, reporter);
- }
- }
- }
方法joinAndCollect包含了兩個(gè)數(shù)據(jù)集的值,并輸出它們。
- protected abstract OptimizedTaggedMapOutput combine(
- String key,
- OptimizedTaggedMapOutput value1,
- OptimizedTaggedMapOutput value2);
- private void joinAndCollect(CompositeKey key,
- List<OptimizedTaggedMapOutput> smaller,
- OptimizedTaggedMapOutput value,
- OutputCollector output,
- Reporter reporter)
- throws IOException {
- if (smaller.size() < 1) {
- OptimizedTaggedMapOutput combined = combine(key.getKey(), null, value);
- collect(key, combined, output, reporter);
- } else {
- for (OptimizedTaggedMapOutput small : smaller) {
- OptimizedTaggedMapOutput combined = combine(key.getKey(), small, value);
- collect(key, combined, output, reporter);
- }
- }
- }
這些就是這個(gè)框架的主要內(nèi)容。
原文鏈接:http://www.cnblogs.com/datacloud/p/3578509.html