自己写的第一个map-reduce程序
自己写的第一个map-reduce程序
今天尝试自己写了一个map-reduce程序,感觉不错.程序完成的是wordcount的功能,代码贴出来纪念一下:
package com.wjy.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Word-count MapReduce job: counts how many times each whitespace-separated
 * token occurs in the input and writes one {@code token<TAB>count} pair per
 * distinct token.
 *
 * <p>Usage: {@code MyFirstHadoopTest <input path> <output path>}.
 */
public class MyFirstHadoopTest extends Configured implements Tool {

    /** Emits {@code (token, 1)} for every token of every input line. */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Writables are reused across calls to avoid allocating two objects per token.
        private final IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        /**
         * Splits one input line into tokens and emits each with a count of 1.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the emitted {@code (token, 1)} pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // StringTokenizer splits on runs of whitespace and never yields empty
            // tokens, so blank lines emit nothing and every word on the line is
            // counted (not just the first two).
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, one);
            }
        }
    }

    /** Sums the partial counts received for each token. */
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable total = new IntWritable();

        /**
         * Adds up all counts for {@code key} and emits the total.
         *
         * @param key     the token being counted
         * @param values  partial counts for the token
         * @param context sink for the emitted {@code (token, total)} pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the values instead of counting them so the result stays correct
            // even when the incoming values are already partial sums.
            int sum = 0;
            for (IntWritable partial : values) {
                sum += partial.get();
            }
            total.set(sum);
            context.write(key, total);
        }
    }

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param arg0 {@code arg0[0]} is the input path, {@code arg0[1]} the output path
     * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are missing
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] arg0) throws Exception {
        if (arg0 == null || arg0.length < 2) {
            System.err.println("Usage: MyFirstHadoopTest <input path> <output path>");
            return 2;
        }

        Configuration conf = getConf();
        Job job = new Job(conf, "com.wjy.hadoop.MyFirstHadoopTest");
        job.setJarByClass(MyFirstHadoopTest.class);

        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point; the process exit code mirrors the job result.
     *
     * @param args input path followed by output path
     */
    public static void main(String args[]) {
        int res;
        try {
            res = ToolRunner.run(new Configuration(), new MyFirstHadoopTest(), args);
        } catch (Exception e) {
            e.printStackTrace();
            res = 1;
        }
        // Propagate the result so shell scripts and schedulers can detect failure.
        System.exit(res);
    }
}
ps:在run configuration中设置一下hdfs的路径作为程序的输入和输出:
hdfs://localhost:9000/user/root/in hdfs://localhost:9000/user/root/out
在arguments中填上上面这两个路径(第一个是输入目录,第二个是输出目录).切记要保证程序运行之前out文件夹不存在,否则作业会因为输出目录已存在而报错退出.
当然也可以将程序打包成jar文件,执行:
root@wjy-Lenovo:/usr/local/hadoop-0.20.2# bin/hadoop jar /usr/local/FirstHadoopTest.jar in myout
root@wjy-Lenovo:/usr/local/hadoop-0.20.2# bin/hadoop dfs -cat ./myout/*
注意:在eclipse中配置hadoop的时候,DFS Master的端口号设置为core-site.xml文件中fs.default.name指定的端口号.
Map/Reduce Master的端口号设置为:mapred-site.xml文件中mapred.job.tracker指定的端口号.