Class Exercise: MapReduce

Map class:

package com.lq.testt;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class Map extends Mapper<LongWritable, Text, Text, Writable>
{
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        String line = value.toString();
        // Split the record into its comma-separated fields
        String[] fields = StringUtils.split(line, ",");

        // Pick out the fields we need
        String ip = fields[0] + "." + fields[1] + "." + fields[2] + "." + fields[3];
        String time = fields[4];
        int traffic = Integer.parseInt(fields[6]);
        String article = fields[7] + "/" + fields[8];
        String video = fields[7] + "/" + fields[8];

        // Wrap the fields into a key/value pair and emit them
        context.write(new Text(ip), new Writable(ip, time, traffic, article, video));
    }
}
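The exercise does not spell out the input log layout, but the fields[] indices above imply a comma-separated record: four IP octets, a timestamp at index 4, a traffic count at index 6, and two path segments at indices 7 and 8. Here is a minimal standalone sketch of the parsing; the sample record is hypothetical and only mirrors those indices:

// Standalone sketch of the mapper's parsing logic, runnable without Hadoop.
// The sample record is hypothetical, not taken from the exercise data.
public class MapSketch
{
    public static void main(String[] args)
    {
        String line = "10,100,20,3,2019-01-01 10:00:00,GET,512,article,1001";
        String[] fields = line.split(",");
        String ip = fields[0] + "." + fields[1] + "." + fields[2] + "." + fields[3];
        String time = fields[4];
        int traffic = Integer.parseInt(fields[6]);
        String article = fields[7] + "/" + fields[8];
        // Prints: 10.100.20.3 -> 2019-01-01 10:00:00, 512, article/1001
        System.out.println(ip + " -> " + time + ", " + traffic + ", " + article);
    }
}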

Reduce class:

package com.lq.testt;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, Writable, Text, Writable>
{
    @Override
    protected void reduce(Text key, Iterable<Writable> values, Context context)
            throws IOException, InterruptedException
    {
        // Start from empty strings rather than null, so the += concatenations
        // below do not produce a leading "null"
        String time = "";
        int traffic = 0;
        String article = "";
        String video = null;
        for (Writable writable : values)
        {
            time += writable.getTime();        // concatenate all timestamps for this IP
            traffic += writable.getTraffic();  // sum the traffic
            article += writable.getArticle();  // concatenate all article paths
            video = writable.getVideo();       // keep the last video path seen
        }
        context.write(key, new Writable(key.toString(), time, traffic, article, video));
    }
}
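Run outside Hadoop, the aggregation above behaves like this minimal sketch over two hand-built values for the same key; it uses the Writable class defined below, and the sample values are hypothetical:

package com.lq.testt;

import java.util.Arrays;
import java.util.List;

// Simulates one reduce() call: traffic is summed across the values,
// while video simply keeps the last value seen in the iteration.
public class ReduceSketch
{
    public static void main(String[] args)
    {
        List<Writable> values = Arrays.asList(
                new Writable("10.100.20.3", "2019-01-01 10:00:00", 512, "article/1001", "video/2001"),
                new Writable("10.100.20.3", "2019-01-01 11:00:00", 256, "article/1002", "video/2002"));
        int traffic = 0;
        String video = null;
        for (Writable w : values)
        {
            traffic += w.getTraffic();
            video = w.getVideo();
        }
        System.out.println(traffic + " " + video);  // prints: 768 video/2002
    }
}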

Writable class:

package com.lq.testt;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Writable implements org.apache.hadoop.io.Writable
{
    private String ip;
    private String time;
    private int traffic;
    private String article;
    private String video;

    public Writable()
    {
        super();
    }

    public Writable(String ip, String time, int traffic, String article, String video)
    {
        super();
        this.ip = ip;
        this.time = time;
        this.traffic = traffic;
        this.article = article;
        this.video = video;
    }

    // Serialize the fields; readFields() must read them back in the same order
    @Override
    public void write(DataOutput out) throws IOException
    {
        out.writeInt(this.traffic);
        out.writeUTF(this.ip);
        out.writeUTF(this.time);
        out.writeUTF(this.article);
        out.writeUTF(this.video);
    }

    @Override
    public void readFields(DataInput in) throws IOException
    {
        this.traffic = in.readInt();
        this.ip = in.readUTF();
        this.time = in.readUTF();
        this.article = in.readUTF();
        this.video = in.readUTF();
    }

    // TextOutputFormat writes the reduce output value with toString(),
    // so give the record a readable form
    @Override
    public String toString()
    {
        return ip + "\t" + time + "\t" + traffic + "\t" + article + "\t" + video;
    }

    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public String getTime() {
        return time;
    }
    public void setTime(String time) {
        this.time = time;
    }
    public int getTraffic() {
        return traffic;
    }
    public void setTraffic(int traffic) {
        this.traffic = traffic;
    }
    public String getArticle() {
        return article;
    }
    public void setArticle(String article) {
        this.article = article;
    }
    public String getVideo() {
        return video;
    }
    public void setVideo(String video) {
        this.video = video;
    }
}
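Because readFields() must consume the stream in exactly the order write() produced it, a quick local round-trip is a useful sanity check. Hadoop drives these methods with its own streams, but a plain DataOutputStream/DataInputStream pair works for testing; the sample values here are hypothetical:

package com.lq.testt;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Serializes a Writable to a byte array and reads it back; if the field
// order in write() and readFields() ever drifts apart, this breaks.
public class WritableRoundTrip
{
    public static void main(String[] args) throws IOException
    {
        Writable before = new Writable("10.100.20.3", "2019-01-01 10:00:00", 512, "article/1001", "video/2001");

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        before.write(new DataOutputStream(bytes));

        Writable after = new Writable();
        after.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(after.getIp() + " " + after.getTraffic());  // prints: 10.100.20.3 512
    }
}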

Runner class:

package com.lq.testt;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class Runner
{
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
    {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);           // create the job that wraps the mapper, reducer, input and output
        job.setJarByClass(Runner.class);           // set the driver class so Hadoop can locate the jar to ship
        job.setMapperClass(Map.class);             // set the mapper class
        job.setReducerClass(Reduce.class);         // set the reducer class
        job.setMapOutputKeyClass(Text.class);      // set the map output key/value types
        job.setMapOutputValueClass(Writable.class);
        job.setOutputKeyClass(Text.class);         // set the reduce output key/value types
        job.setOutputValueClass(Writable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));   // input path to analyze; args[0] is the first command-line argument
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path for the results; it must not exist before the run
//        job.submit();  // submit without waiting; normally used when logs are not needed
        job.waitForCompletion(true);  // true means print progress logs while the job runs
    }
}
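To run the job, package the classes into a jar and pass the input and output paths as the two command-line arguments. A typical invocation looks like the line below; the jar name and paths are hypothetical:

hadoop jar mapreduce-exercise.jar com.lq.testt.Runner /input/result.txt /output/result

Remember that the output directory must not already exist: FileOutputFormat refuses to overwrite an existing path and the job fails before it starts.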