大数据Hadoop第七周——Eclipse环境下java语言mapreduce程序开发环境配置+WordCount.java详解... 1 Eclipse环境下java语言mapreduce程序开发环境配置 2 新建Mapreduce项目 3 WordCount程序解释
大数据第七周
1.1 拷贝eclipse软件
先拷贝到根目录下,然后解压
tar -zxvf eclipse-java-2020-03-R-linux-gtk-x86_64.tar.gz
打开eclipse:进入eclipse 文件夹,输入:./eclipse
1.2 下载hadoop-eclipse插件
下载hadoop-eclipse-plugin-2.6.0.jar本插件与我们使用的hadoop版本2.7.7不对应,使用过程中有可能出现警告提示。实际开发过程中应该使用对应版本的插件。当前之所以使用是因为时间问题。
eclipse下hdfs配置
将hadoop-eclipse-plugin-2.6.0.jar拷贝到eclipse的路径下的dropins文件夹下(老版本放plugins,现在时dropins)。重启eclipse,点击windows,选择Preferences,出现下面界面。
点击左边的"Hadoop/Mapreduce",出现如下界面。
点击右边的"Browse",选择hadoop软件的安装目录(注意不是bin的),点击"Apply and Close"。
1.4 创建并配置Mapreduce Locations
如下图,从window菜单进入,选择"Other"。
选择Mapreduce Locations,点击"Open"。
出现如下界面:
点击下方的图标,打开如下界面:
在"Location name"栏中填入一个名字,任意字符串都行。
Map/Reduce(V2)Maseter标签页的Host,需要填入master节点的IP地址,填入其它内容(比如域名)会出现问题。
端口号,Map Reduce的Port填写yarn.site.xml里的yarn.resourcemanager.address。网上有多种说法,我们填8032。
DFS Master标签页中,端口号Port填写core-site.xml中的fs.defaultF5。
填好后结果如下:
点击"finish"。
2 新建Mapreduce项目
2.1 从主菜单file开始,如下图点击"Other"。
选择"Map/Reduce Project"。
点击next,输入名字,注意建立的位置。
填入名字后,点击"Finish",出现如下界面:
在左侧可以看到HDFS文件存储情况。
输入:hadoop fs -mkdir /aaa新建一个文件夹存放hadoop fs -put abc.txt/上传的文件。
接着新建一个Java Package,使用默认名字。
接着新建一个class,要把main加上。
从如下路径,将WordCount.java(官方demo)文件里的内容拷贝到新建的class文件里。
3 WordCount程序解释
WorldCount相当于MapReduce中的helloworld,它主要完成的任务是单词计数,即统计一系列文本文件中每个单词出现的次数。在Map端进行文章单词拆分,并且以k-v即 word-1这种形式发送出去,接着MapReduce在reduce端,相同的key即相同的word会把所有的1放在Iterable<Text> values中,只需要统计Iterable中Text有多少个1即可,最后输出。
从main开始。
3.1 main的参数
public static void main(String[] args) throws Exception { /*main函数有形参,所以运行时需要有一个参数传入,类型是String[], * 数组元素数量没有指定,即数组大小可以任意。 * 数组:[5,6,7,8]索引是:0,1,2,3,4数组的长度length=5。 * For(i=0;i<数组.length-1;i++)数组[i]数组最后一个元素没有使用。 */ Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) {//输入的参数数量不小于2,否则会报错 System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); for (int i = 0; i < otherArgs.length - 1; ++i) { //输入参数中,除最后一个元素外,其他都是输入文件路径。 FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } //输入参数的最后一个元素是输出文件的路径。 FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
}
3.2 main的流程
Hadoop都是以作业(job)形式运行用户程序。首先要告诉框架调用哪个类,指定本业务job要用的是mapper/Reducer业务类。
//job是一个对象。类实例化以后才是一个对象。getInstance用反射的方式获得一个实例,传入两个参数。其中,conf是输入路径和输出路径,另一个参数是字串,即这个job任务的名称。 Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class);//job是用哪一个class进行操作的。 job.setMapperClass(TokenizerMapper.class);//用TokenizerMapper分解任务 job.setCombinerClass(IntSumReducer.class);//将任务复合 job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class);//设置输出键值对的键的数据类型是Text.class job.setOutputValueClass(IntWritable.class); //设置输出键值对的值的数据类型是IntWritable.class,即int型
3.3 TokenizerMapper程序
map端主要任务是拆分单词。输入为一个split中的数据,对split中的数据进行拆分,并以 < key, value> 对的格式保存数据,其中 key 的值为一个单词,value的值固定为 1。如 < I , 1>、< wish, 1> …
输入的参数含义分别为:
Object:输入< key, value >对的 key 值,此处为文本数据的起始位置。
Text:输入< key, value >对的 value 值,此处为一段具体的文本数据。
Text:输出< key, value >对的 key 值,此处为一个单词。
IntWritable:输出< key, value >对的 value 值,此处固定为 1。
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ // 从hadoop程序包Mapper中继承过来并在这个类的继承类中至少自定义实现 Map() 方法,其中 org.apache.hadoop.mapreduce.Mapper
//要求的参数有四个(keyIn、valueIn、keyOut、valueOut),即Map()任务的输入和输出都是< key,value >对的形式。 private final static IntWritable one = new IntWritable(1); //私有静态常量Int数据类型,对象的值是1,即one始终是1 private Text word = new Text();//word是text数据类型,即string类型。 // map通过参数指定重写了Mapper里的方法,Object是最原始的数据类型,说明key可以是任意的数据类型。Value是一个string。 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // itr是个分词器。先将传进来的文件内容value转为string类型,然后对它进行分词,结果保存在itr。 StringTokenizer itr = new StringTokenizer(value.toString()); //把词变成一个个,每个词作为一个键值对输出一次,写入到context上下文里。 //键是词,值一直是1 while (itr.hasMoreTokens()) {//判断是否还有词 word.set(itr.nextToken());//如果有就送给word将其写入word context.write(word, one); } } }
把输入数据分成一个一个的词,然后输出一个一个的键值对。每个键值对的值都是1。有多少个词就有多少个键值对,键是一个词,值是1。
3.4 IntSumReducer程序
统计单词个数。在map到reduce 的过程中,系统会自动进行combine、shuffle、sort等过程对map task的输出进行处理,因此reduce端的输入数据已经不仅仅是简单的< key, value >对的形式,而是一个一系列key值相同的序列化结构,如:< hello,1,1,2,2,3…>。因此,代码中value的值就是单词后面出现的序列化的结构:(1,1,1,2,2,3…….)
输入的四个参数含义分别为:
Text:输入< key, value >对的key值,此处为一个单词
IntWritable:输入< key, value >对的value值。
Text:输出< key, value >对的key值,此处为一个单词
IntWritable:输出< key, value >对,此处为相同单词词频累加之后的值。
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { //IntSumReducer继承Reducer,重写这些对象与方法,Reducer 类的参数也是四个(keyIn、valueIn、keyOut、valueOut),即Reduce()任务的输入和输出都是< key,value >对的形式。 private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; //检测val的个数 for (IntWritable val : values) {// val是从values里一个个遍历得到 sum += val.get();//sum把val里面的值取出来加起来 } result.set(sum);//把result的值赋成sum context.write(key, result); } }
所有代码:
package testHadoop1; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { /*main函数有形参,所以运行时需要有一个参数传入,类型是String[], * 数组元素数量没有指定,即数组大小可以任意。 */ Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) {//输入的参数数量不小于2,否则会报错 System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) {// FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
大数据第七周
1.1 拷贝eclipse软件
先拷贝到根目录下,然后解压
tar -zxvf eclipse-java-2020-03-R-linux-gtk-x86_64.tar.gz
打开eclipse:进入eclipse 文件夹,输入:./eclipse
1.2 下载hadoop-eclipse插件
下载hadoop-eclipse-plugin-2.6.0.jar本插件与我们使用的hadoop版本2.7.7不对应,使用过程中有可能出现警告提示。实际开发过程中应该使用对应版本的插件。当前之所以使用是因为时间问题。
eclipse下hdfs配置
将hadoop-eclipse-plugin-2.6.0.jar拷贝到eclipse的路径下的dropins文件夹下(老版本放plugins,现在时dropins)。重启eclipse,点击windows,选择Preferences,出现下面界面。
点击左边的"Hadoop/Mapreduce",出现如下界面。
点击右边的"Browse",选择hadoop软件的安装目录(注意不是bin的),点击"Apply and Close"。
1.4 创建并配置Mapreduce Locations
如下图,从window菜单进入,选择"Other"。
选择Mapreduce Locations,点击"Open"。
出现如下界面:
点击下方的图标,打开如下界面:
在"Location name"栏中填入一个名字,任意字符串都行。
Map/Reduce(V2)Maseter标签页的Host,需要填入master节点的IP地址,填入其它内容(比如域名)会出现问题。
端口号,Map Reduce的Port填写yarn.site.xml里的yarn.resourcemanager.address。网上有多种说法,我们填8032。
DFS Master标签页中,端口号Port填写core-site.xml中的fs.defaultF5。
填好后结果如下:
点击"finish"。
2 新建Mapreduce项目
2.1 从主菜单file开始,如下图点击"Other"。
选择"Map/Reduce Project"。
点击next,输入名字,注意建立的位置。
填入名字后,点击"Finish",出现如下界面:
在左侧可以看到HDFS文件存储情况。
输入:hadoop fs -mkdir /aaa新建一个文件夹存放hadoop fs -put abc.txt/上传的文件。
接着新建一个Java Package,使用默认名字。
接着新建一个class,要把main加上。
从如下路径,将WordCount.java(官方demo)文件里的内容拷贝到新建的class文件里。
3 WordCount程序解释
WorldCount相当于MapReduce中的helloworld,它主要完成的任务是单词计数,即统计一系列文本文件中每个单词出现的次数。在Map端进行文章单词拆分,并且以k-v即 word-1这种形式发送出去,接着MapReduce在reduce端,相同的key即相同的word会把所有的1放在Iterable<Text> values中,只需要统计Iterable中Text有多少个1即可,最后输出。
从main开始。
3.1 main的参数
public static void main(String[] args) throws Exception { /*main函数有形参,所以运行时需要有一个参数传入,类型是String[], * 数组元素数量没有指定,即数组大小可以任意。 * 数组:[5,6,7,8]索引是:0,1,2,3,4数组的长度length=5。 * For(i=0;i<数组.length-1;i++)数组[i]数组最后一个元素没有使用。 */ Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) {//输入的参数数量不小于2,否则会报错 System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); for (int i = 0; i < otherArgs.length - 1; ++i) { //输入参数中,除最后一个元素外,其他都是输入文件路径。 FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } //输入参数的最后一个元素是输出文件的路径。 FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
}
3.2 main的流程
Hadoop都是以作业(job)形式运行用户程序。首先要告诉框架调用哪个类,指定本业务job要用的是mapper/Reducer业务类。
//job是一个对象。类实例化以后才是一个对象。getInstance用反射的方式获得一个实例,传入两个参数。其中,conf是输入路径和输出路径,另一个参数是字串,即这个job任务的名称。 Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class);//job是用哪一个class进行操作的。 job.setMapperClass(TokenizerMapper.class);//用TokenizerMapper分解任务 job.setCombinerClass(IntSumReducer.class);//将任务复合 job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class);//设置输出键值对的键的数据类型是Text.class job.setOutputValueClass(IntWritable.class); //设置输出键值对的值的数据类型是IntWritable.class,即int型
3.3 TokenizerMapper程序
map端主要任务是拆分单词。输入为一个split中的数据,对split中的数据进行拆分,并以 < key, value> 对的格式保存数据,其中 key 的值为一个单词,value的值固定为 1。如 < I , 1>、< wish, 1> …
输入的参数含义分别为:
Object:输入< key, value >对的 key 值,此处为文本数据的起始位置。
Text:输入< key, value >对的 value 值,此处为一段具体的文本数据。
Text:输出< key, value >对的 key 值,此处为一个单词。
IntWritable:输出< key, value >对的 value 值,此处固定为 1。
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ // 从hadoop程序包Mapper中继承过来并在这个类的继承类中至少自定义实现 Map() 方法,其中 org.apache.hadoop.mapreduce.Mapper
//要求的参数有四个(keyIn、valueIn、keyOut、valueOut),即Map()任务的输入和输出都是< key,value >对的形式。 private final static IntWritable one = new IntWritable(1); //私有静态常量Int数据类型,对象的值是1,即one始终是1 private Text word = new Text();//word是text数据类型,即string类型。 // map通过参数指定重写了Mapper里的方法,Object是最原始的数据类型,说明key可以是任意的数据类型。Value是一个string。 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // itr是个分词器。先将传进来的文件内容value转为string类型,然后对它进行分词,结果保存在itr。 StringTokenizer itr = new StringTokenizer(value.toString()); //把词变成一个个,每个词作为一个键值对输出一次,写入到context上下文里。 //键是词,值一直是1 while (itr.hasMoreTokens()) {//判断是否还有词 word.set(itr.nextToken());//如果有就送给word将其写入word context.write(word, one); } } }
把输入数据分成一个一个的词,然后输出一个一个的键值对。每个键值对的值都是1。有多少个词就有多少个键值对,键是一个词,值是1。
3.4 IntSumReducer程序
统计单词个数。在map到reduce 的过程中,系统会自动进行combine、shuffle、sort等过程对map task的输出进行处理,因此reduce端的输入数据已经不仅仅是简单的< key, value >对的形式,而是一个一系列key值相同的序列化结构,如:< hello,1,1,2,2,3…>。因此,代码中value的值就是单词后面出现的序列化的结构:(1,1,1,2,2,3…….)
输入的四个参数含义分别为:
Text:输入< key, value >对的key值,此处为一个单词
IntWritable:输入< key, value >对的value值。
Text:输出< key, value >对的key值,此处为一个单词
IntWritable:输出< key, value >对,此处为相同单词词频累加之后的值。
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { //IntSumReducer继承Reducer,重写这些对象与方法,Reducer 类的参数也是四个(keyIn、valueIn、keyOut、valueOut),即Reduce()任务的输入和输出都是< key,value >对的形式。 private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; //检测val的个数 for (IntWritable val : values) {// val是从values里一个个遍历得到 sum += val.get();//sum把val里面的值取出来加起来 } result.set(sum);//把result的值赋成sum context.write(key, result); } }
所有代码:
package testHadoop1; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { /*main函数有形参,所以运行时需要有一个参数传入,类型是String[], * 数组元素数量没有指定,即数组大小可以任意。 */ Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) {//输入的参数数量不小于2,否则会报错 System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) {// FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }