Hadoop: Chaining a MapReduce Job with the Latest ChainMapper API (with an Analysis of How It Works)
The code is very simple: it just filters out the students numbered 100 and 101. The point is to demonstrate chaining mappers inside a MapReduce job with ChainMapper. Note that the examples floating around online are all written against the old API, so I wrote a new one for everyone's reference.
package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// First mapper in the chain: reads raw text lines and drops the record
// whose student ID (the first space-separated field) is 100.
public class MapClass1 extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable ikey, Text ivalue, Context context)
            throws IOException, InterruptedException {
        String[] citation = ivalue.toString().split(" ");
        if (!citation[0].equals("100")) {
            // Emit the student ID as key and the whole line as value.
            context.write(new Text(citation[0]), new Text(ivalue));
        }
    }
}
package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Second mapper in the chain: consumes MapClass1's (ID, line) output
// and drops the record whose ID is 101. (The original version re-split
// the value into an unused array; that dead code is removed here.)
public class MapClass2 extends Mapper<Text, Text, Text, Text> {

    @Override
    public void map(Text ikey, Text ivalue, Context context)
            throws IOException, InterruptedException {
        if (!ikey.toString().equals("101")) {
            context.write(ikey, ivalue);
        }
    }
}
package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Identity-style reducer: writes every surviving line to the output.
// The key is passed as null so TextOutputFormat emits only the value
// (NullWritable would be the more idiomatic choice).
public class Reduce extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text _key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            context.write(null, val);
        }
    }
}
package com.zzg.test1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Drive {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(Drive.class);

        // Chain the two mappers; each gets its own (empty) Configuration.
        // They run in the order they are added: MapClass1 first, then
        // MapClass2, with MapClass1's output feeding MapClass2's input.
        Configuration map1Conf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass1.class,
                LongWritable.class, Text.class, Text.class, Text.class,
                map1Conf);

        Configuration map2Conf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass2.class,
                Text.class, Text.class, Text.class, Text.class,
                map2Conf);

        job.setReducerClass(Reduce.class);

        // Declare the key/value types explicitly; ChainMapper records the
        // per-mapper types in its own config but does not set the job-level
        // types, whose defaults would not match the Text keys emitted here.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job,
                new Path("hdfs://localhost:9000/input/ChainMapper.txt"));
        FileOutputFormat.setOutputPath(job,
                new Path("hdfs://localhost:9000/output/test5"));

        if (!job.waitForCompletion(true))
            return;
    }
}
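Incidentally, instead of job.setReducerClass the reducer could also be wired in through ChainReducer (org.apache.hadoop.mapreduce.lib.chain.ChainReducer, already part of the new API). This is only a sketch of that alternative, not what the run below used; its advantage is that further ChainReducer.addMapper calls can append mappers that post-process the reducer's output inside the same job:

// Sketch: attach the reducer via ChainReducer instead of setReducerClass.
Configuration reduceConf = new Configuration(false);
ChainReducer.setReducer(job, Reduce.class,
        Text.class, Text.class,    // reducer input key/value types
        Text.class, Text.class,    // reducer output key/value types
        reduceConf);
// After setReducer, ChainReducer.addMapper(...) could append more
// mappers that run on the reducer's output within this same job.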
ChainMapper.txt:

100 jack 90 23 78
101 zzg 85 21 32
102 qw 60 12 36
WARN [main] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO [main] - session.id is deprecated. Instead, use dfs.metrics.session-id
INFO [main] - Initializing JVM Metrics with processName=JobTracker, sessionId=
WARN [main] - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
WARN [main] - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
INFO [main] - Total input paths to process : 1
INFO [main] - number of splits:1
INFO [main] - Submitting tokens for job: job_local1466046461_0001
INFO [main] - The url to track the job: http://localhost:8080/
INFO [main] - Running job: job_local1466046461_0001
INFO [Thread-12] - OutputCommitter set in config null
INFO [Thread-12] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
INFO [Thread-12] - Waiting for map tasks
INFO [LocalJobRunner Map Task Executor #0] - Starting task: attempt_local1466046461_0001_m_000000_0
INFO [LocalJobRunner Map Task Executor #0] - Using ResourceCalculatorProcessTree : [ ]
INFO [LocalJobRunner Map Task Executor #0] - Processing split: hdfs://localhost:9000/input/ChainMapper.txt:0+35
INFO [LocalJobRunner Map Task Executor #0] - (EQUATOR) 0 kvi 26214396(104857584)
INFO [LocalJobRunner Map Task Executor #0] - mapreduce.task.io.sort.mb: 100
INFO [LocalJobRunner Map Task Executor #0] - soft limit at 83886080
INFO [LocalJobRunner Map Task Executor #0] - bufstart = 0; bufvoid = 104857600
INFO [LocalJobRunner Map Task Executor #0] - kvstart = 26214396; length = 6553600
INFO [LocalJobRunner Map Task Executor #0] - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
INFO [main] - Job job_local1466046461_0001 running in uber mode : false
INFO [main] - map 0% reduce 0%
INFO [LocalJobRunner Map Task Executor #0] -
INFO [LocalJobRunner Map Task Executor #0] - Starting flush of map output
INFO [LocalJobRunner Map Task Executor #0] - Spilling map output
INFO [LocalJobRunner Map Task Executor #0] - bufstart = 0; bufend = 16; bufvoid = 104857600
INFO [LocalJobRunner Map Task Executor #0] - kvstart = 26214396(104857584); kvend = 26214396(104857584); length = 1/6553600
INFO [LocalJobRunner Map Task Executor #0] - Finished spill 0
INFO [LocalJobRunner Map Task Executor #0] - Task:attempt_local1466046461_0001_m_000000_0 is done. And is in the process of committing
INFO [LocalJobRunner Map Task Executor #0] - map
INFO [LocalJobRunner Map Task Executor #0] - Task 'attempt_local1466046461_0001_m_000000_0' done.
INFO [LocalJobRunner Map Task Executor #0] - Finishing task: attempt_local1466046461_0001_m_000000_0
INFO [Thread-12] - map task executor complete.
INFO [Thread-12] - Waiting for reduce tasks
INFO [pool-6-thread-1] - Starting task: attempt_local1466046461_0001_r_000000_0
INFO [pool-6-thread-1] - Using ResourceCalculatorProcessTree : [ ]
INFO [pool-6-thread-1] - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@2be55f70
INFO [pool-6-thread-1] - MergerManager: memoryLimit=503893184, maxSingleShuffleLimit=125973296, mergeThreshold=332569504, ioSortFactor=10, memToMemMergeOutputsThreshold=10
INFO [EventFetcher for fetching Map Completion Events] - attempt_local1466046461_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
INFO [localfetcher#1] - localfetcher#1 about to shuffle output of map attempt_local1466046461_0001_m_000000_0 decomp: 20 len: 24 to MEMORY
INFO [localfetcher#1] - Read 20 bytes from map-output for attempt_local1466046461_0001_m_000000_0
INFO [localfetcher#1] - closeInMemoryFile -> map-output of size: 20, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->20
INFO [EventFetcher for fetching Map Completion Events] - EventFetcher is interrupted.. Returning
INFO [pool-6-thread-1] - 1 / 1 copied.
INFO [pool-6-thread-1] - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
INFO [main] - map 100% reduce 0%
INFO [pool-6-thread-1] - Merging 1 sorted segments
INFO [pool-6-thread-1] - Down to the last merge-pass, with 1 segments left of total size: 14 bytes
INFO [pool-6-thread-1] - Merged 1 segments, 20 bytes to disk to satisfy reduce memory limit
INFO [pool-6-thread-1] - Merging 1 files, 24 bytes from disk
INFO [pool-6-thread-1] - Merging 0 segments, 0 bytes from memory into reduce
INFO [pool-6-thread-1] - Merging 1 sorted segments
INFO [pool-6-thread-1] - Down to the last merge-pass, with 1 segments left of total size: 14 bytes
INFO [pool-6-thread-1] - 1 / 1 copied.
INFO [pool-6-thread-1] - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
INFO [pool-6-thread-1] - Task:attempt_local1466046461_0001_r_000000_0 is done. And is in the process of committing
INFO [pool-6-thread-1] - 1 / 1 copied.
INFO [pool-6-thread-1] - Task attempt_local1466046461_0001_r_000000_0 is allowed to commit now
INFO [pool-6-thread-1] - Saved output of task 'attempt_local1466046461_0001_r_000000_0' to hdfs://localhost:9000/output/test5/_temporary/0/task_local1466046461_0001_r_000000
INFO [pool-6-thread-1] - reduce > reduce
INFO [pool-6-thread-1] - Task 'attempt_local1466046461_0001_r_000000_0' done.
INFO [pool-6-thread-1] - Finishing task: attempt_local1466046461_0001_r_000000_0
INFO [Thread-12] - reduce task executor complete.
INFO [main] - map 100% reduce 100%
INFO [main] - Job job_local1466046461_0001 completed successfully
INFO [main] - Counters: 38
    File System Counters
        FILE: Number of bytes read=388
        FILE: Number of bytes written=498456
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=70
        HDFS: Number of bytes written=12
        HDFS: Number of read operations=13
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=3
        Map output records=1
        Map output bytes=16
        Map output materialized bytes=24
        Input split bytes=108
        Combine input records=0
        Combine output records=0
        Reduce input groups=1
        Reduce shuffle bytes=24
        Reduce input records=1
        Reduce output records=1
        Spilled Records=2
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=606
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=344981504
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=35
    File Output Format Counters
        Bytes Written=12
At first I got the import wrong: the correct class is org.apache.hadoop.mapreduce.lib.chain.ChainMapper (the old API lives in org.apache.hadoop.mapred.lib). Once you import the right package, the IDE's hints are enough to lead you to the new API.
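To make the two easy to tell apart, here are the imports side by side (the commented-out line is the old-API class most older tutorials use):

// New API -- this is the one the code above needs:
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;

// Old API -- common in older tutorials; its addMapper signature differs
// and will not compile against the driver above:
// import org.apache.hadoop.mapred.lib.ChainMapper;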
What differs from the old version is mainly the parameter list: the (boolean byValue) parameter is gone, and (JobConf mapperConf) has been replaced by a plain Configuration.
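For comparison, the two addMapper declarations look roughly like this (paraphrased from the Hadoop javadocs, so treat the exact generics as approximate):

// Old mapred API (org.apache.hadoop.mapred.lib.ChainMapper):
public static <K1, V1, K2, V2> void addMapper(JobConf job,
        Class<? extends Mapper<K1, V1, K2, V2>> klass,
        Class<? extends K1> inputKeyClass, Class<? extends V1> inputValueClass,
        Class<? extends K2> outputKeyClass, Class<? extends V2> outputValueClass,
        boolean byValue, JobConf mapperConf);

// New mapreduce API (org.apache.hadoop.mapreduce.lib.chain.ChainMapper):
// Job replaces JobConf, the per-mapper conf is a plain Configuration,
// and the byValue flag is gone.
public static void addMapper(Job job, Class<? extends Mapper> klass,
        Class<?> inputKeyClass, Class<?> inputValueClass,
        Class<?> outputKeyClass, Class<?> outputValueClass,
        Configuration mapperConf) throws IOException;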
Now, a few words on how it works:
ChainMapper/ChainReducer were introduced to support a linear chain of Mappers. That is, the Map phase (or the Reduce phase) may contain several Mappers that behave like a Linux pipe: the output of one Mapper is redirected straight into the input of the next, forming a pipeline. In this article, MapClass1's output becomes MapClass2's input, which is why the key/value types of adjacent stages must line up exactly. One more reminder: within a single MapReduce job, the Map and Reduce phases may hold any number of Mappers, but there can be only one Reducer. For reference, the pipeline here looks like: input -> MapClass1 -> MapClass2 -> (shuffle) -> Reduce -> output.
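Another way to see the "pipe" idea: the two chained mappers compute exactly what one hand-written mapper applying both filters in sequence would. A hypothetical fused version (FusedMapClass is my name for illustration, not part of the article's code):

package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// What the MapClass1 -> MapClass2 chain computes, collapsed into a
// single mapper: a record survives only if it passes both filters,
// in the same order as the chain.
public class FusedMapClass extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable ikey, Text ivalue, Context context)
            throws IOException, InterruptedException {
        String id = ivalue.toString().split(" ")[0];
        if (!id.equals("100") && !id.equals("101")) {
            context.write(new Text(id), new Text(ivalue));
        }
    }
}

ChainMapper simply lets you keep the filters as separate, reusable classes instead of fusing them by hand.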
Thought about carefully, it is really just ordinary MapReduce with the added ability to chain several Mappers.
The mappers execute in the order you add them in the driver, so rearranging the chain is just a matter of reordering the addMapper calls, as the short sketch below shows. That's all for today; keep an eye on my blog for more.
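For instance, with two hypothetical mappers MapA and MapB whose types line up, the chain is exactly the order of the calls; swapping the two lines swaps the execution order:

// MapA runs first here; exchange the two addMapper calls and MapB
// would run first instead.
ChainMapper.addMapper(job, MapA.class, LongWritable.class, Text.class,
        Text.class, Text.class, new Configuration(false));
ChainMapper.addMapper(job, MapB.class, Text.class, Text.class,
        Text.class, Text.class, new Configuration(false));

The one constraint is that each mapper's input types must match the previous mapper's output types, and the first mapper's input types must match the job's InputFormat.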
Copyright notice: this is an original post by the author and may not be reproduced without the author's permission.