mapreduce top n實現(xiàn)方式實例
在最初接觸mapreduce時,top n 問題的解決辦法是將mapreduce輸出(排序后)放入一個集合中,取前n個,但這種寫法過于簡單,內(nèi)存能夠加載的集合的大小是有上限的,一旦數(shù)據(jù)量大,很容易出現(xiàn)內(nèi)存溢出。
今天在這里介紹另一種實現(xiàn)方式,當然這也不是***的方式,不過正所謂一步一個腳印,邁好每一步,以后的步伐才能更堅定,哈哈說了點題外話。恩恩,以后還會有更好的方式需求,得到top ***的前n條記錄。
這里只給出一些核心的代碼,其他job等配置的代碼略
。Configuration conf = new Configuration();
conf.setInt("N", 5);
初始化job之前需要 conf.setInt("N",5); 意在在mapreduce階段讀取N,N就代表著top N。
以下是map
- package com.lzz.one;
- import java.io.IOException;
- import java.util.Arrays;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- /**
- * topN
- * #orderid,userid,payment,productid
- * [root@x00 hd]# cat seventeen_a.txt
- * 1,9819,100,121
- * 2,8918,2000,111
- * 3,2813,1234,22
- * 4,9100,10,1101
- * 5,3210,490,111
- * 6,1298,28,1211
- * 7,1010,281,90
- * 8,1818,9000,20
- * [root@x00 hd]# cat seventeen_b.txt
- * 100,3333,10,100
- * 101,9321,1000,293
- * 102,3881,701,20
- * 103,6791,910,30
- * 104,8888,11,39
- * 預測結果:(求 Top N=5 的結果)
- * 1 9000
- * 2 2000
- * 3 1234
- * 4 1000
- * 5 910
- * @author Administrator
- *
- */
- public class TopNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{
- int len;
- int top[];
- @Override
- public void setup(Context context) throws IOException,InterruptedException {
- len = context.getConfiguration().getInt("N", 10);
- top = new int[len+1];
- }
- @Override
- public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
- String line = value.toString();
- String arr []= line.split(",");
- if(arr != null && arr.length == 4){
- int pay = Integer.parseInt(arr[2]);
- add(pay);
- }
- }
- public void add(int pay){
- top[0] = pay;
- Arrays.sort(top);
- }
- @Override
- public void cleanup(Context context) throws IOException,InterruptedException {
- for(int i=1;i<=len;i++){
- <span></span>context.write(new IntWritable(top[i]),new IntWritable(top[i]));
- <span></span>}
- }
- }
- <div>
- </div>
接下來是reduce
- package com.lzz.one;
- import java.io.IOException;
- import java.util.Arrays;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.mapreduce.Reducer;
- public class TopNReduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
- int len;
- int top[];
- @Override
- public void setup(Context context)
- throws IOException, InterruptedException {
- len = context.getConfiguration().getInt("N", 10);
- top = new int[len+1];
- }
- @Override
- public void reduce(IntWritable key, Iterable<IntWritable> values,
- Context context)
- throws IOException, InterruptedException {
- for(IntWritable val : values){
- add(val.get());
- }
- }
- public void add(int pay){
- top[0] = pay;
- Arrays.sort(top);
- }
- @Override
- public void cleanup(Context context)
- throws IOException, InterruptedException {
- for(int i=len;i>0;i--){
- context.write(new IntWritable(len-i+1),new IntWritable(top[i]));
- }
- }
- }
說一下邏輯,雖然畫圖比較清晰,但是時間有限,畫圖水平有限,只用語言來描述吧,希望能說的明白。
如果要取top 5,則應該定義一個長度為為6的數(shù)組,map所要做的事情就是將每條日志的那個需要排序的字段放入數(shù)組***個元素中,調(diào)用Arrays.sort(Array[])方法可以將數(shù)組按照正序,從數(shù)字角度說是從小到大排序,比如***條記錄是9000,那么排序結果是[0,0,0,0,0,9000],第二條日志記錄是8000,排序結果是[0,0,0,0,8000,9000],第三條日志記錄是8500,排序結果是[0,0,0,8000,8500,9000],以此類推,每次放進去一個數(shù)字如果大于數(shù)組里面最小的元素,相當于將最小的覆蓋掉了,也就是說數(shù)組中元素永遠是拿到日志中***的那些個記錄。
ok,map將數(shù)組原封不動按照順序輸出,reduce接收到從每個map拿到的五個排好序的元素,在進行跟map一樣的排序,排序后數(shù)組里面就是按照從小到大排好序的元素,將這些元素倒序輸出就是最終我們要的結果了。
與之前的方式做個比較,之前的map做的事情很少,在reduce中排序后哪前5條,reduce的壓力是很大的,要把所有的數(shù)據(jù)都處理一遍,而一般設置reduce的個數(shù)較少,一旦數(shù)據(jù)較多,reduce就會承受不了,悲劇了。而現(xiàn)在的方式巧妙的將reduce的壓力轉移到了map,而map是集群效應的,很多臺服務器來做這件事情,減少了一臺機器上的負擔,每個map其實只是輸出了5個元素而已,如果有5個map,其實reduce才對5*5個數(shù)據(jù)進行了操作,也就不會出現(xiàn)內(nèi)存溢出等問題了。