自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

MapReduce連接:重分區(qū)連接

數(shù)據(jù)庫
連接是關(guān)系運(yùn)算,可以用于合并關(guān)系(relation)。對于數(shù)據(jù)庫中的表連接操作,可能已經(jīng)廣為人知了。在MapReduce中,連接可以用于合并兩個(gè)或多個(gè)數(shù)據(jù)集。例如,用戶基本信息和用戶活動詳情信息。用戶基本信息來自于OLTP數(shù)據(jù)庫。用戶活動詳情信息來自于日志文件。

 MapReduce的連接操作可以用于以下場景:

  • 用戶的人口統(tǒng)計(jì)信息的聚合操作(例如:青少年和中年人的習(xí)慣差異)。

  • 當(dāng)用戶超過一定時(shí)間沒有使用網(wǎng)站后,發(fā)郵件提醒他們。(這個(gè)一定時(shí)間的閾值是用戶自己預(yù)定義的)

  • 分析用戶的瀏覽習(xí)慣。讓系統(tǒng)可以基于這個(gè)分析提示用戶有哪些網(wǎng)站特性還沒有使用到。進(jìn)而形成一個(gè)反饋循環(huán)。

所有這些場景都要求將多個(gè)數(shù)據(jù)集連接起來。

最常用的兩個(gè)連接類型是內(nèi)連接(inner join)和外連接(outer join)。如下圖所示,內(nèi)連接比較兩個(gè)關(guān)系中所有的元組,判斷是否滿足連接條件,然后生成一個(gè)滿足連接條件的結(jié)果集。與內(nèi)連接相反的是,外連接并不需要兩個(gè)關(guān)系的元組都滿足連接條件。在連接條件不滿足的時(shí)候,外連接可以將其中一方的數(shù)據(jù)保留在結(jié)果集中。

為了實(shí)現(xiàn)內(nèi)連接和外連接,MapReduce中有三種連接策略,如下所示。這三種連接策略有的在map階段,有的在reduce階段。它們都針對MapReduce的排序-合并(sort-merge)的架構(gòu)進(jìn)行了優(yōu)化。

  1. 重分區(qū)連接(Repartition join)—— reduce端連接。使用場景:連接兩個(gè)或多個(gè)大型數(shù)據(jù)集。

  2. 復(fù)制連接(Replication join)—— map端連接。使用場景:待連接的數(shù)據(jù)集中有一個(gè)數(shù)據(jù)集足夠小到可以完全放在緩存中。

  3. 半連接(Semi-join)—— 另一個(gè)map端連接。使用場景:待連接的數(shù)據(jù)集中有一個(gè)數(shù)據(jù)集非常大,但同時(shí)這個(gè)數(shù)據(jù)集可以被過濾成小到可以放在緩存中。

在介紹完這些連接策略之后,還會介紹另一個(gè)策略:決策樹??梢愿鶕?jù)實(shí)際情況選擇最優(yōu)策略。

4.1.1 重分區(qū)連接(Repartition join)

重分區(qū)連接是reduce端連接。它利用MapReduce的排序-合并機(jī)制來分組數(shù)據(jù)。它只使用一個(gè)單獨(dú)的MapReduce任務(wù),并支持多路連接(N-way join)。多路指的是多個(gè)數(shù)據(jù)集。

Map階段負(fù)責(zé)從多個(gè)數(shù)據(jù)集中讀取數(shù)據(jù),決定每個(gè)數(shù)據(jù)的連接值,將連接值作為輸出鍵(output key)。輸出值(output value)則包含將在reduce階段被合并的值。

Reduce階段,一個(gè)reduce接收map函數(shù)傳來的每一個(gè)輸出鍵的所有輸出值,并將數(shù)據(jù)分為多個(gè)分區(qū)。在此之后,reduce對所有的分區(qū)進(jìn)行笛卡爾積(Cartersian product)連接運(yùn)算,并生成全部的結(jié)果集。

以上MapReduce過程如圖4.2所示:

注:過濾(filtering)和投影(projection)

在MapReduce重分區(qū)連接中,最好能夠減少map階段傳輸?shù)絩educe階段的數(shù)據(jù)量。因?yàn)橥ㄟ^網(wǎng)絡(luò)在這兩個(gè)階段中排序和傳輸數(shù)據(jù)會產(chǎn)生很高的成本。如果不能避免reduce端的工作,那么一個(gè)最佳實(shí)踐就是盡可能在map階段多過濾數(shù)據(jù)和投影。過濾指的是將map極端的輸入數(shù)據(jù)中不需要的部分丟棄。投影是關(guān)系代數(shù)的概念。投影用于減少發(fā)送給reduce的字段。例如:在分析用戶數(shù)據(jù)的時(shí)候,如果只關(guān)注用戶的年齡,那么在map任務(wù)中應(yīng)該只投影(或輸出)年齡字段,不考慮用戶的其他的字段。

技術(shù)19:優(yōu)化重分區(qū)連接

《Hadoop in Action》給出了一個(gè)例子,說明如何使用Hadoop的社區(qū)包(contrib package)org.apache.hadoop.contrib.utils.join實(shí)現(xiàn)重分區(qū)連接。這個(gè)貢獻(xiàn)包打包了所有的處理細(xì)節(jié),僅僅需要實(shí)現(xiàn)一個(gè)非常簡單的方法。

然而,這個(gè)社區(qū)包對重分區(qū)的實(shí)現(xiàn)方法的空間效率低下。它需要將待連接的所有輸出值都讀取到內(nèi)存中,然后進(jìn)行多路連接(multiway join)。實(shí)際上,如果僅僅將小數(shù)據(jù)集讀取到內(nèi)存中,然后用小數(shù)據(jù)集遍歷大數(shù)據(jù)集來進(jìn)行連接,這樣將更加高效。

問題

需要在MapReduce中進(jìn)行重分區(qū)連接,但是不希望在reduce階段將所有的數(shù)據(jù)都放到緩存中。

解決方案

這個(gè)技術(shù)運(yùn)用了優(yōu)化后的重分區(qū)框架。它僅僅將一個(gè)待連接的數(shù)據(jù)集放在緩存中,減少了reduce需要放在緩存中的數(shù)據(jù)。

討論

附錄D中介紹了優(yōu)化后的重分區(qū)框架的實(shí)現(xiàn)。這個(gè)實(shí)現(xiàn)是根據(jù)org.apache.hadoop.contrib.utils.join社區(qū)包進(jìn)行建模。這個(gè)優(yōu)化后的框架僅僅緩存兩個(gè)數(shù)據(jù)集中比較小的那一個(gè),以減少內(nèi)存消耗。圖4.3是優(yōu)化后的重分區(qū)連接的流程圖:

圖4.4是實(shí)現(xiàn)的類圖。類圖中包含兩個(gè)部分,一個(gè)通用框架和一些類的實(shí)現(xiàn)樣例。

使用這個(gè)連接框架需要實(shí)現(xiàn)抽象類OptimizedDataJoinMapperBase和OptimizedDataJoinReducerBase。

例如,需要連接用戶詳情數(shù)據(jù)和用戶活動日志。第一步,判斷兩個(gè)數(shù)據(jù)集中那一個(gè)比較小。對于一般的網(wǎng)站來說,用戶詳情數(shù)據(jù)會比較小,用戶活動日志會比較大。

在如下示例中,用戶數(shù)據(jù)中有用戶姓名,年齡和所在州

  1. $ cat test-data/ch4/users.txt 
  2. anne 22 NY 
  3. joe 39 CO 
  4. alison 35 NY 
  5. mike 69 VA 
  6. marie 27 OR 
  7. jim 21 OR 
  8. bob 71 CA 
  9. mary 53 NY 
  10. dave 36 VA 
  11. dude 50 CA 

用戶活動日志中有用戶姓名,進(jìn)行的動作,來源IP。這個(gè)文件一般都要比用戶數(shù)據(jù)要大得多。

  1. $ cat test-data/ch4/user-logs.txt 
  2. jim logout 93.24.237.12 
  3. mike new_tweet 87.124.79.252 
  4. bob new_tweet 58.133.120.100 
  5. mike logout 55.237.104.36 
  6. jim new_tweet 93.24.237.12 
  7. marie view_user 122.158.130.90 

首先,必須實(shí)現(xiàn)抽象類OptimizedDataJoinMapperBase。這個(gè)將在map端被調(diào)用。這個(gè)類將創(chuàng)建map的輸出鍵和輸出值。同時(shí),它還將提示整個(gè)框架,當(dāng)前處理的文件是不是比較小的那個(gè)。

  1. public class SampleMap extends OptimizedDataJoinMapperBase { 
  2.  
  3.   private boolean smaller; 
  4.  
  5.   @Override 
  6.   protected Text generateInputTag(String inputFile) { 
  7.     // tag the row with input file name (data source) 
  8.     smaller = inputFile.contains("users.txt"); 
  9.     return new Text(inputFile); 
  10.   } 
  11.  
  12.   @Override 
  13.   protected String genGroupKey(Object key, OutputValue output) { 
  14.     return key.toString(); 
  15.   } 
  16.  
  17.   @Override 
  18.   protected boolean isInputSmaller(String inputFile) { 
  19.     return smaller; 
  20.   } 
  21.  
  22.   @Override 
  23.   protected OutputValue genMapOutputValue(Object o) { 
  24.     return new TextTaggedOutputValue((Text) o); 
  25.   } 

下一步,你需要實(shí)現(xiàn)抽象類 OptimizedDataJoinReducerBase。它將在reduce端被調(diào)用。在這個(gè)類中,將從map端傳入不同數(shù)據(jù)集的輸出鍵和輸出值,然后返回reduce端的輸出數(shù)組。

  1. public class SampleReduce extends OptimizedDataJoinReducerBase { 
  2.  
  3.   private TextTaggedOutputValue output = new TextTaggedOutputValue(); 
  4.   private Text textOutput = new Text(); 
  5.  
  6.   @Override 
  7.   protected OutputValue combine(String key, 
  8.                                 OutputValue smallValue, 
  9.                                 OutputValue largeValue) { 
  10.     if(smallValue == null || largeValue == null) { 
  11.       return null; 
  12.     } 
  13.     Object[] values = { 
  14.         smallValue.getData(), largeValue.getData() 
  15.     }; 
  16.     textOutput.set(StringUtils.join(values, "\t")); 
  17.     output.setData(textOutput); 
  18.     return output; 
  19.   } 

最后,任務(wù)的主代碼(driver code)需要指明InputFormat類,并設(shè)置次排序(Secondary sort)。

  1. job.setInputFormat(KeyValueTextInputFormat.class); 
  2.  
  3.     job.setMapOutputKeyClass(CompositeKey.class); 
  4.     job.setMapOutputValueClass(TextTaggedOutputValue.class); 
  5.     job.setOutputKeyClass(Text.class); 
  6.     job.setOutputValueClass(Text.class); 
  7.  
  8.     job.setPartitionerClass(CompositeKeyPartitioner.class); 
  9.     job.setOutputKeyComparatorClass(CompositeKeyComparator.class); 
  10.     job.setOutputValueGroupingComparator(CompositeKeyOnlyComparator.class); 

現(xiàn)在連接的準(zhǔn)備工作就做完了,可以開始運(yùn)行連接:

  1. $ hadoop fs -put test-data/ch4/users.txt users.txt 
  2. $ hadoop fs -put test-data/ch4/user-logs.txt user-logs.txt 
  3. $ bin/run.sh com.manning.hip.ch4.joins.improved.SampleMain users.txt,user-logs.txt output 
  4. $ hadoop fs -cat output/part* 
  5. bob 71 CA new_tweet 58.133.120.100 
  6. jim 21 OR logout 93.24.237.12 
  7. jim 21 OR new_tweet 93.24.237.12 
  8. jim 21 OR login 198.184.237.49 
  9. marie 27 OR login 58.133.120.100 
  10. marie 27 OR view_user 122.158.130.90 
  11. mike 69 VA new_tweet 87.124.79.252 
  12. mike 69 VA logout 55.237.104.36 

如果和連接的源文件相對比,可以看到因?yàn)閷?shí)現(xiàn)了一個(gè)內(nèi)連接,輸出中不包括用戶anne,alison等不存在于日志文件中的記錄。

小結(jié):

這個(gè)連接的實(shí)現(xiàn)通過只緩存比較小的數(shù)據(jù)集來提高來Hadoop社區(qū)包的效率。但是,當(dāng)數(shù)據(jù)從map階段傳輸?shù)絩educe階段的時(shí)候,仍然產(chǎn)生了很高的網(wǎng)絡(luò)成本。

此外,Hadoop社區(qū)包支持多路連接,這里的實(shí)現(xiàn)只支持二路連接。

如果要更多地減少reduce端連接的內(nèi)存足跡(memory footprint),一個(gè)簡單的機(jī)制是在map函數(shù)中更多地進(jìn)行投影操作。投影減少了map階段的輸出中的字段。例如:在分析用戶數(shù)據(jù)的時(shí)候,如果只關(guān)注用戶的年齡,那么在map任務(wù)中應(yīng)該只投影(或輸出)年齡字段,不考慮用戶的其他的字段。這樣就減少了map和reduce之間的網(wǎng)絡(luò)負(fù)擔(dān),也減少了reduce在連接時(shí)的內(nèi)存消耗。

和原始的社區(qū)包一樣,這里的重分區(qū)的實(shí)現(xiàn)也支持過濾和投影。通過允許genMapOutputValue方法返回空值,就可以支持過濾。通過在genMapOutputValue方法中定義輸出值的內(nèi)容,就可以支持投影。

如果你既想輸出所有的數(shù)據(jù)到reduce,又想避免排序的損耗,就需要考慮另外兩種連接策略,復(fù)制連接和半連接。

附錄D 優(yōu)化后的MapReduce連接框架

在這個(gè)附錄,我們將討論在第4張中使用的兩個(gè)連接框架。第一個(gè)是重連接框架。它減少了org.apache.hadoop.contrib.utils.join包的實(shí)現(xiàn)的Hadoop連接的內(nèi)存足跡。第二個(gè)是復(fù)制連接框架。它可以將較小的數(shù)據(jù)集放在緩存中。

D.1 優(yōu)化后的重分區(qū)框架

Hadoop社區(qū)連接包需要將每個(gè)鍵的所有值都讀取到內(nèi)存中。如何才能在reduce端的連接減少內(nèi)存開銷呢?本文提供的優(yōu)化中,只需要緩存較小的數(shù)據(jù)集,然后在連接中遍歷較大數(shù)據(jù)集中的數(shù)據(jù)。這個(gè)方法中還包括針對map的輸出數(shù)據(jù)的次排序,那么reducer先接收到較小的數(shù)據(jù)集,然后接收到較大的數(shù)據(jù)集。圖D.1是這個(gè)過程的流程圖。

圖D.2是實(shí)現(xiàn)的類圖。類圖中包含兩個(gè)部分,一個(gè)通用框架和一些類的實(shí)現(xiàn)樣例。

連接框架

我們以和Hadoop社區(qū)連接包的近似的風(fēng)格編寫連接的代碼。目標(biāo)是創(chuàng)建可以處理任意數(shù)據(jù)集的通用重分區(qū)機(jī)制。為簡潔起見,我們重點(diǎn)說明主要部分。

首先是OptimizedDataJoinMapperBase類。這個(gè)類的作用是辨認(rèn)出較小的數(shù)據(jù)集,并生成輸出鍵和輸出值。Configure方法在mapper創(chuàng)建時(shí)被調(diào)用。Configure方法的作用之一是標(biāo)識每一個(gè)數(shù)據(jù)集,讓reducer可以區(qū)分?jǐn)?shù)據(jù)的源數(shù)據(jù)集。另一個(gè)作用是辨認(rèn)當(dāng)前的輸入數(shù)據(jù)是否是較小的數(shù)據(jù)集。

  1. protected abstract Text generateInputTag(String inputFile); 
  2.  
  3. protected abstract boolean isInputSmaller(String inputFile); 
  4.  
  5. public void configure(JobConf job) { 
  6.  
  7.     this.inputFile = job.get("map.input.file"); 
  8.     this.inputTag = generateInputTag(this.inputFile); 
  9.      
  10.     if(isInputSmaller(this.inputFile)) { 
  11.         smaller = new BooleanWritable(true); 
  12.         outputKey.setOrder(0); 
  13.     } else { 
  14.         smaller = new BooleanWritable(false); 
  15.         outputKey.setOrder(1); 
  16.     } 

Map方法首先調(diào)用自定義的方法 (generateTaggedMapOutput) 來生成OutputValue對象。這個(gè)對象包含了在連接中需要使用的值(也可能包含了最終輸出的值),和一個(gè)標(biāo)識較大或較小數(shù)據(jù)集的布爾值。如果map方法可以調(diào)用自定義的方法 (generateGroupKey) 來得到可以在連接中使用的鍵,那么這個(gè)鍵就作為map的輸出鍵。

  1. protected abstract OptimizedTaggedMapOutput generateTaggedMapOutput(Object value); 
  2.  
  3. protected abstract String generateGroupKey(Object key, OptimizedTaggedMapOutput aRecord); 
  4.  
  5. public void map(Object key, Object value, OutputCollector output, Reporter reporter) 
  6.     throws IOException { 
  7.      
  8.     OptimizedTaggedMapOutput aRecord = generateTaggedMapOutput(value); 
  9.      
  10.     if (aRecord == null) { 
  11.         return
  12.     } 
  13.      
  14.     aRecord.setSmaller(smaller); 
  15.     String groupKey = generateGroupKey(aRecord); 
  16.      
  17.     if (groupKey == null) { 
  18.         return
  19.     } 
  20.      
  21.     outputKey.setKey(groupKey); 
  22.     output.collect(outputKey, aRecord); 

圖D.3 說明了map輸出的組合鍵(composite 可以)和組合值。次排序?qū)鶕?jù)連接鍵(join key)進(jìn)行分區(qū),并用整個(gè)組合鍵來進(jìn)行排序。組合鍵包括一個(gè)標(biāo)識源數(shù)據(jù)集(較大或較小)的整形值,因此可以根據(jù)這個(gè)整形值來保證較小源數(shù)據(jù)集的值先于較大源數(shù)據(jù)的值被reduce接收。

下一步是深入reduce。此前已經(jīng)可以保證較小源數(shù)據(jù)集的值將會先于較大源數(shù)據(jù)集的值被接收。這里就可以將所有的較小源數(shù)據(jù)集的值放到緩存中。在開始接收較大源數(shù)據(jù)集的值的時(shí)候,就開始和緩存中的值做連接操作。

  1. public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) 
  2.     throws IOException {  
  3.  
  4.     CompositeKey k = (CompositeKey) key; 
  5.     List<OptimizedTaggedMapOutput> smaller = new ArrayList<OptimizedTaggedMapOutput>(); 
  6.      
  7.     while (values.hasNext()) { 
  8.         Object value = values.next(); 
  9.         OptimizedTaggedMapOutput cloned =((OptimizedTaggedMapOutput) value).clone(job); 
  10.          
  11.         if (cloned.isSmaller().get()) { 
  12.             smaller.add(cloned); 
  13.         } else { 
  14.             joinAndCollect(k, smaller, cloned, output, reporter); 
  15.         } 
  16.     } 

方法joinAndCollect包含了兩個(gè)數(shù)據(jù)集的值,并輸出它們。

 

  1. protected abstract OptimizedTaggedMapOutput combine( 
  2.                         String key, 
  3.                         OptimizedTaggedMapOutput value1, 
  4.                         OptimizedTaggedMapOutput value2); 
  5.                          
  6. private void joinAndCollect(CompositeKey key, 
  7.                             List<OptimizedTaggedMapOutput> smaller, 
  8.                             OptimizedTaggedMapOutput value, 
  9.                             OutputCollector output, 
  10.                             Reporter reporter) 
  11.     throws IOException { 
  12.      
  13.     if (smaller.size() < 1) { 
  14.         OptimizedTaggedMapOutput combined = combine(key.getKey(), null, value); 
  15.         collect(key, combined, output, reporter); 
  16.     } else { 
  17.         for (OptimizedTaggedMapOutput small : smaller) { 
  18.             OptimizedTaggedMapOutput combined = combine(key.getKey(), small, value); 
  19.             collect(key, combined, output, reporter); 
  20.         } 
  21.     } 

這些就是這個(gè)框架的主要內(nèi)容。

原文鏈接:http://www.cnblogs.com/datacloud/p/3578509.html

責(zé)任編輯:彭凡 來源: 博客園
相關(guān)推薦

2014-03-20 09:49:51

MapReduce

2015-08-21 13:50:49

Oracle連接

2009-07-22 10:53:42

MySQL左連接

2021-03-24 09:06:01

MySQL長連接短連接

2010-05-10 15:48:37

Unix連接

2011-03-28 14:04:10

SQL左連接右連接

2018-06-06 11:01:25

HTTP長連接短連接

2011-06-01 13:54:10

MySQL

2022-01-11 08:46:56

Oracle 在線重定義數(shù)據(jù)庫

2015-04-23 18:46:38

TCPTCP協(xié)議

2010-01-04 09:51:52

ADO連接對象

2010-11-08 15:47:01

SQL Server外

2010-11-11 13:51:36

SQL Server內(nèi)

2019-09-16 09:29:01

TCP全連接隊(duì)列半連接隊(duì)列

2010-06-07 15:24:34

Java連接MYSQL

2014-01-02 14:04:39

PostgreSQLPerl

2014-01-02 15:41:24

PostgreSQLPHP

2014-01-02 13:22:01

PythonPostgreSQL

2023-01-31 18:09:12

物聯(lián)網(wǎng)移動物聯(lián)網(wǎng)

2010-08-24 09:29:37

內(nèi)連接全連接
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號