利用 MapReduce 分析明星微博數據實戰
互聯網時代的到來,使得名人的形象變得更加鮮活,也拉近了明星和粉絲之間的距離。歌星、影星、體育明星、作家等名人通過互聯網能夠輕易實現和粉絲的互動,賺錢也變得前所未有的簡單。同時,互聯網的飛速發展本身也造就了一批互聯網明星,這些人借助新的手段,最大程度發揮了粉絲經濟的能量和作用,在互聯網時代賺得盆滿缽滿。
正是基于這樣一個大背景,今天我們做一個分析明星微博數據的小項目。
1、項目需求
自定義輸入格式,將明星微博數據排序後,按粉絲數、關注數、微博數分別輸出到不同文件中。
2、數據集
明星 明星微博名稱 粉絲數 關注數 微博數
俞灝明 俞灝明 10591367 206 558
李敏鎬 李敏鎬 22898071 11 268
林心如 林心如 57488649 214 5940
黃曉明 黃曉明 22616497 506 2011
張靚穎 張靚穎 27878708 238 3846
李娜 李娜 23309493 81 631
徐小平 徐小平 11659926 1929 13795
唐嫣 唐嫣 24301532 200 2391
有斐君 有斐君 8779383 577 4251
3、分析
自定義InputFormat讀取明星微博數據,通過自定義getSortedHashtableByValue方法分別對明星的fan、followers、microblogs數據進行排序,然後利用MultipleOutputs輸出不同項到不同的文件中。
4、實現
1)、定義WeiBo實體類,實現WritableComparable接口
- package com.buaa;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import org.apache.hadoop.io.WritableComparable;
- /**
- * @ProjectName MicroblogStar
- * @PackageName com.buaa
- * @ClassName WeiBo
- * @Description TODO
- * @Author 劉吉超
- * @Date 2016-05-07 14:54:29
- */
- public class WeiBo implements WritableComparable<Object> {
- // 粉絲
- private int fan;
- // 關(guān)注
- private int followers;
- // 微博數(shù)
- private int microblogs;
- public WeiBo(){};
- public WeiBo(int fan,int followers,int microblogs){
- this.fan = fan;
- this.followers = followers;
- this.microblogs = microblogs;
- }
- public void set(int fan,int followers,int microblogs){
- this.fan = fan;
- this.followers = followers;
- this.microblogs = microblogs;
- }
- // 實(shí)現(xiàn)WritableComparable的readFields()方法,以便該數(shù)據(jù)能被序列化后完成網(wǎng)絡(luò)傳輸或文件輸入
- @Override
- public void readFields(DataInput in) throws IOException {
- fan = in.readInt();
- followers = in.readInt();
- microblogs = in.readInt();
- }
- // 實(shí)現(xiàn)WritableComparable的write()方法,以便該數(shù)據(jù)能被序列化后完成網(wǎng)絡(luò)傳輸或文件輸出
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(fan);
- out.writeInt(followers);
- out.writeInt(microblogs);
- }
- @Override
- public int compareTo(Object o) {
- // TODO Auto-generated method stub
- return 0;
- }
- public int getFan() {
- return fan;
- }
- public void setFan(int fan) {
- this.fan = fan;
- }
- public int getFollowers() {
- return followers;
- }
- public void setFollowers(int followers) {
- this.followers = followers;
- }
- public int getMicroblogs() {
- return microblogs;
- }
- public void setMicroblogs(int microblogs) {
- this.microblogs = microblogs;
- }
- }
2)、自定義WeiboInputFormat,繼承FileInputFormat抽象類
- package com.buaa;
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.util.LineReader;
- /**
- * @ProjectName MicroblogStar
- * @PackageName com.buaa
- * @ClassName WeiboInputFormat
- * @Description TODO
- * @Author 劉吉超
- * @Date 2016-05-07 10:23:28
- */
- public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{
- @Override
- public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
- // 自定義WeiboRecordReader類,按行讀取
- return new WeiboRecordReader();
- }
- public class WeiboRecordReader extends RecordReader<Text, WeiBo>{
- public LineReader in;
- // 聲明key類型
- public Text lineKey = new Text();
- // 聲明 value類型
- public WeiBo lineValue = new WeiBo();
- @Override
- public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {
- // 獲取split
- FileSplit split = (FileSplit)input;
- // 獲取配置
- Configuration job = context.getConfiguration();
- // 分片路徑
- Path file = split.getPath();
- FileSystem fs = file.getFileSystem(job);
- // 打開(kāi)文件
- FSDataInputStream filein = fs.open(file);
- in = new LineReader(filein,job);
- }
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- // 一行數(shù)據(jù)
- Text line = new Text();
- int linesize = in.readLine(line);
- if(linesize == 0)
- return false;
- // 通過(guò)分隔符'\t',將每行的數(shù)據(jù)解析成數(shù)組
- String[] pieces = line.toString().split("\t");
- if(pieces.length != 5){
- throw new IOException("Invalid record received");
- }
- int a,b,c;
- try{
- // 粉絲
- a = Integer.parseInt(pieces[2].trim());
- // 關(guān)注
- b = Integer.parseInt(pieces[3].trim());
- // 微博數(shù)
- c = Integer.parseInt(pieces[4].trim());
- }catch(NumberFormatException nfe){
- throw new IOException("Error parsing floating poing value in record");
- }
- //自定義key和value值
- lineKey.set(pieces[0]);
- lineValue.set(a, b, c);
- return true;
- }
- @Override
- public void close() throws IOException {
- if(in != null){
- in.close();
- }
- }
- @Override
- public Text getCurrentKey() throws IOException, InterruptedException {
- return lineKey;
- }
- @Override
- public WeiBo getCurrentValue() throws IOException, InterruptedException {
- return lineValue;
- }
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return 0;
- }
- }
- }
3)、編寫mr程序
- package com.buaa;
- import java.io.IOException;
- import java.util.Arrays;
- import java.util.Comparator;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Map.Entry;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- /**
- * @ProjectName MicroblogStar
- * @PackageName com.buaa
- * @ClassName WeiboCount
- * @Description TODO
- * @Author 劉吉超
- * @Date 2016-05-07 09:07:36
- */
- public class WeiboCount extends Configured implements Tool {
- // tab分隔符
- private static String TAB_SEPARATOR = "\t";
- // 粉絲
- private static String FAN = "fan";
- // 關(guān)注
- private static String FOLLOWERS = "followers";
- // 微博數(shù)
- private static String MICROBLOGS = "microblogs";
- public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> {
- @Override
- protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException {
- // 粉絲
- context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan()));
- // 關(guān)注
- context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers()));
- // 微博數(shù)
- context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs()));
- }
- }
- public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {
- private MultipleOutputs<Text, IntWritable> mos;
- protected void setup(Context context) throws IOException, InterruptedException {
- mos = new MultipleOutputs<Text, IntWritable>(context);
- }
- protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException {
- Map<String,Integer> map = new HashMap< String,Integer>();
- for(Text value : Values){
- // value = 名稱 + (粉絲數(shù) 或 關(guān)注數(shù) 或 微博數(shù))
- String[] records = value.toString().split(TAB_SEPARATOR);
- map.put(records[0], Integer.parseInt(records[1].toString()));
- }
- // 對(duì)Map內(nèi)的數(shù)據(jù)進(jìn)行排序
- Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(map);
- for(int i = 0; i < entries.length;i++){
- mos.write(Key.toString(),entries[i].getKey(), entries[i].getValue());
- }
- }
- protected void cleanup(Context context) throws IOException, InterruptedException {
- mos.close();
- }
- }
- @SuppressWarnings("deprecation")
- @Override
- public int run(String[] args) throws Exception {
- // 配置文件對(duì)象
- Configuration conf = new Configuration();
- // 判斷路徑是否存在,如果存在,則刪除
- Path mypath = new Path(args[1]);
- FileSystem hdfs = mypath.getFileSystem(conf);
- if (hdfs.isDirectory(mypath)) {
- hdfs.delete(mypath, true);
- }
- // 構(gòu)造任務(wù)
- Job job = new Job(conf, "weibo");
- // 主類
- job.setJarByClass(WeiboCount.class);
- // Mapper
- job.setMapperClass(WeiBoMapper.class);
- // Mapper key輸出類型
- job.setMapOutputKeyClass(Text.class);
- // Mapper value輸出類型
- job.setMapOutputValueClass(Text.class);
- // Reducer
- job.setReducerClass(WeiBoReducer.class);
- // Reducer key輸出類型
- job.setOutputKeyClass(Text.class);
- // Reducer value輸出類型
- job.setOutputValueClass(IntWritable.class);
- // 輸入路徑
- FileInputFormat.addInputPath(job, new Path(args[0]));
- // 輸出路徑
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- // 自定義輸入格式
- job.setInputFormatClass(WeiboInputFormat.class) ;
- //自定義文件輸出類別
- MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class);
- MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class);
- MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class);
- // 去掉job設(shè)置outputFormatClass,改為通過(guò)LazyOutputFormat設(shè)置
- LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
- //提交任務(wù)
- return job.waitForCompletion(true)?0:1;
- }
- // 對(duì)Map內(nèi)的數(shù)據(jù)進(jìn)行排序(只適合小數(shù)據(jù)量)
- @SuppressWarnings("unchecked")
- public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {
- Entry<String, Integer>[] entries = (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);
- // 排序
- Arrays.sort(entries, new Comparator<Entry<String, Integer>>() {
- public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {
- return entry2.getValue().compareTo(entry1.getValue());
- }
- });
- return entries;
- }
- public static void main(String[] args) throws Exception {
- String[] args0 = {
- "hdfs://ljc:9000/buaa/microblog/weibo.txt",
- "hdfs://ljc:9000/buaa/microblog/out/"
- };
- int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0);
- System.exit(ec);
- }
- }
5、運行結果