Hadoop Source Code Analysis 34: The Map Task in Child

Submit the job:

hadoop  jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount /user/admin/in/yellow2.txt /user/admin/out/128

 

This generates 2 map tasks and 2 reduce tasks.
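The input split recorded later covers bytes 0+67108864 (64 MB), so two map tasks suggest yellow2.txt spans two such splits; the two reduces come from the job configuration. Below is a rough standalone sketch (not Hadoop source) of the split arithmetic; the 128 MB file size is an assumption, only the 64 MB split size is taken from this trace.

// Rough sketch: number of map tasks ~= number of input splits.
public class SplitCountSketch {
    public static void main(String[] args) {
        long fileSize = 134217728L;   // assumed size of yellow2.txt (~128 MB), not from the log
        long splitSize = 67108864L;   // 64 MB, matches the 0+67108864 split recorded below
        long numSplits = (fileSize + splitSize - 1) / splitSize;  // ceiling division
        System.out.println("map tasks = " + numSplits);           // 2
    }
}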

 

Execute Maps[0]:

args=[127.0.0.1, 40996, attempt_201404282305_0001_m_000000_0,/opt/hadoop-1.0.0/logs/userlogs/job_201404282305_0001/attempt_201404282305_0001_m_000000_0,-518526792]

 

cwd =/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/attempt_201404282305_0001_m_000000_0/work

 

jobTokenFile= /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/jobToken

jvmContext=JvmContext(jvmId=jvm_201404282305_0001_m_,pid=29184)

 

myTask=JvmTask{shouldDie=false, t=MapTask{taskId=attempt_201404282305_0001_m_000000_0, jobFile="/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml", jobSetup=false, jobCleanup=false, taskCleanup=false, taskStatus=MapTaskStatus{UNASSIGNED}, splitMetaInfo=JobSplit$TaskSplitIndex{splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404282305_0001/job.split", startOffset=7}}}

 

job=JobConf{Configuration:core-default.xml, core-site.xml, mapred-default.xml,mapred-site.xml,/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}

 

Task.jobContext=JobContext{job=Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml, id=job_201404282305_0001}

 

Task.taskContext=TaskAttemptContext{job=Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml, id=job_201404282305_0001, taskId=attempt_201404282305_0001_m_000000_0, reporter=org.apache.hadoop.mapred.Task$TaskReporter@67323b17}

 

Task.taskStatus=RUNNING

 

Task.outputFormat=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat@a27b2d9

 

Task.committer=FileOutputCommitter{outputFileSystem=DFS[DFSClient[clientName=DFSClient_attempt_201404282305_0001_m_000000_0, ugi=admin]], outputPath=/user/admin/out/128, workPath=hdfs://server1:9000/user/admin/out/128/_temporary/_attempt_201404282305_0001_m_000000_0}

 

Variables inside runNewMapper(job, splitMetaInfo, umbilical, reporter):

 

taskContext=TaskAttemptContext{conf=JobConf{Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}, jobId=job_201404282305_0001, taskId=attempt_201404282305_0001_m_000000_0}

 

mapper=org.apache.hadoop.examples.WordCount$TokenizerMapper@3e4a762a

 

inputFormat=org.apache.hadoop.mapreduce.lib.input.TextInputFormat@21833d8a

 

split=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864

 

input=NewTrackingRecordReader{
  inputSplit=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864
  job=JobConf{Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}
  real=org.apache.hadoop.mapreduce.lib.input.LineRecordReader@5afbee67
}

 

output=NewOutputCollector{
  collector=new MapOutputBuffer{...}
  partitions=2
  partitioner=org.apache.hadoop.mapreduce.lib.partition.HashPartitioner@5c66b7ea
}

 

The collector member (a MapOutputBuffer) of output (a NewOutputCollector):

job = JobConf{Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, /tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/job.xml}
localFs = LocalFileSystem@fdcb254
partitions = 2
rfs = rawLocalFileSystem

PARTITION = 0;  // partition offset in acct
KEYSTART = 1;   // key offset in acct
VALSTART = 2;   // val offset in acct
ACCTSIZE = 3;   // total #fields in acct
RECSIZE = (ACCTSIZE + 1) * 4 = 16;  // acct bytes per record

spillper = 0.8
recper = 0.05
sortmb = 100
sorter = org.apache.hadoop.util.QuickSort@aa9502d
maxMemUsage = sortmb << 20;  // 104857600
recordCapacity = (int)(maxMemUsage * recper);  // 5242880
recordCapacity -= recordCapacity % RECSIZE;  // 5242880
kvbuffer = new byte[maxMemUsage - recordCapacity];  // byte[99614720]
bufvoid = kvbuffer.length;  // 99614720
recordCapacity /= RECSIZE;  // 327680
kvoffsets = new int[recordCapacity];  // int[327680]
kvindices = new int[recordCapacity * ACCTSIZE];  // int[983040]
softBufferLimit = (int)(kvbuffer.length * spillper);  // 79691776
softRecordLimit = (int)(kvoffsets.length * spillper);  // 262144
comparator = job.getOutputKeyComparator();  // Text$Comparator@42b5e6a1
keyClass = job.getMapOutputKeyClass();  // org.apache.hadoop.io.Text
valClass = job.getMapOutputValueClass();  // org.apache.hadoop.io.IntWritable
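As a sanity check, the sizing values above can be re-derived from sortmb=100, recper=0.05 and spillper=0.8. The following standalone sketch (not Hadoop source) reproduces the numbers shown in the comments:

public class BufferSizingCheck {
    public static void main(String[] args) {
        final int ACCTSIZE = 3;
        final int RECSIZE = (ACCTSIZE + 1) * 4;             // 16
        int sortmb = 100;
        float recper = 0.05f;
        float spillper = 0.8f;

        int maxMemUsage = sortmb << 20;                     // 104857600
        int recordCapacity = (int) (maxMemUsage * recper);  // 5242880
        recordCapacity -= recordCapacity % RECSIZE;         // 5242880 (already a multiple of 16)
        int kvbufferLen = maxMemUsage - recordCapacity;     // 99614720
        recordCapacity /= RECSIZE;                          // 327680
        int kvoffsetsLen = recordCapacity;                  // 327680
        int kvindicesLen = recordCapacity * ACCTSIZE;       // 983040
        int softBufferLimit = (int) (kvbufferLen * spillper);   // 79691776
        int softRecordLimit = (int) (kvoffsetsLen * spillper);  // 262144

        System.out.println(kvbufferLen + " " + kvoffsetsLen + " " + kvindicesLen
                + " " + softBufferLimit + " " + softRecordLimit);
    }
}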

 

combinerRunner = CombinerRunner.create(job, getTaskID(),
                                       combineInputCounter,
                                       reporter, null);
// NewCombinerRunner{reducerClass=org.apache.hadoop.examples.WordCount$IntSumReducer, taskId=attempt_201404282305_0001_m_000000_0, keyClass=org.apache.hadoop.io.Text, valueClass=org.apache.hadoop.io.IntWritable, comparator=org.apache.hadoop.io.Text$Comparator@42b5e6a1, committer=null}
combineCollector = new CombineOutputCollector(combineOutputCounter, reporter, conf);
minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);  // 3

spillThread = new SpillThread();
spillLock = new ReentrantLock();
spillDone = spillLock.newCondition();
spillReady = spillLock.newCondition();

 

 

mapperContext=Mapper$Context{
  taskId=attempt_201404282305_0001_m_000000_0
  status=""
  split=hdfs://server1:9000/user/admin/in/yellow2.txt:0+67108864
  jobId=job_201404282305_0001
  committer=FileOutputCommitter{outputFileSystem=DFS[DFSClient[clientName=DFSClient_attempt_201404282305_0001_m_000000_0, ugi=admin]], outputPath=/user/admin/out/128, workPath=hdfs://server1:9000/user/admin/out/128/_temporary/_attempt_201404282305_0001_m_000000_0}
  output=MapTask$NewOutputCollector{...same as above...}
  reader=MapTask$NewTrackingRecordReader{...same as input above...}
}

 

input.initialize(split, mapperContext)
// Set up the LineRecordReader's members and open the file:
start = split.getStart();
end = start + split.getLength();
pos = start;
FSDataInputStream fileIn = fs.open(split.getPath());
in = new LineReader(fileIn, job);
bufferSize = 65536;
buffer = new byte[this.bufferSize];

 

mapper.run(mapperContext)
// Run the Map program:
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}

// The nextKeyValue() method reads one line. Before the read pos=0; after the read key=0, value="Yellow", pos=7.
......
// Before the read pos=20; after the read key=20, value="Look at the stars; look how they shine for you", pos=67.
......
// Before the read pos=68; after the read key=68, value="And everything you do", pos=90.

......

//
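As the trace shows, the key produced by LineRecordReader is simply the byte offset at which the line starts, and pos advances past the line plus its terminating newline. A toy illustration of that contract (not Hadoop code; the offsets below are for this short sample string, not for yellow2.txt):

import java.nio.charset.StandardCharsets;

public class OffsetLineReaderDemo {
    public static void main(String[] args) {
        String data = "Yellow\nAnd everything you do\n";
        long pos = 0;
        for (String line : data.split("\n")) {
            System.out.println("key=" + pos + " value=" + line);      // key = byte offset of the line start
            pos += line.getBytes(StandardCharsets.UTF_8).length + 1;  // advance past the line and its '\n'
        }
    }
}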

The map method:

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}

Mapper$Context.write(word, one)
--> TaskInputOutputContext.write(word, one)
--> MapTask$NewOutputCollector.write(word, one)
--> (MapTask.MapOutputCollector) collector.collect(key, value, partitioner.getPartition(key, value, partitions))

 

 

HashPartitioner.getPartition() is defined as:

 

public int getPartition(K key, V value,
                        int numReduceTasks) {
  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

That is, the remainder of the key's hashCode divided by the number of reduces determines which Reduce the record belongs to.
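For illustration only, the same rule applied to plain String keys with numReduceTasks=2 (the real job hashes org.apache.hadoop.io.Text keys, whose hashCode differs from String's, so the actual assignments may differ):

public class PartitionDemo {
    // Same rule as HashPartitioner: mask the sign bit, then take the remainder.
    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (String word : new String[] {"Yellow", "stars", "shine", "you"}) {
            System.out.println(word + " -> reduce " + getPartition(word, 2));
        }
    }
}

The & Integer.MAX_VALUE clears the sign bit so the result stays non-negative even when hashCode() is negative.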

 

 

The processing flow of collect is:

 

keySerializer.serialize(key);    // write the Key into the BlockingBuffer, i.e. kvbuffer
valSerializer.serialize(value);  // write the Value into the BlockingBuffer, i.e. kvbuffer

 

 

int ind = kvindex * ACCTSIZE;
kvoffsets[kvindex] = ind;  // first-level index: this record's position in kvindices

kvindices[ind + PARTITION] = partition;  // second-level index: which Reduce it belongs to
kvindices[ind + KEYSTART] = keystart;    // second-level index: key offset
kvindices[ind + VALSTART] = valstart;    // second-level index: value offset

kvindex = kvnext;
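A compact sketch (not Hadoop source) of this two-level bookkeeping: kvoffsets holds one int per record pointing into kvindices, and kvindices holds three ints per record (partition, key offset, value offset within kvbuffer). The class name, account() helper and the offsets are hypothetical.

import java.util.Arrays;

public class AccountingSketch {
    static final int PARTITION = 0, KEYSTART = 1, VALSTART = 2, ACCTSIZE = 3;

    static int[] kvoffsets = new int[4];             // first-level index
    static int[] kvindices = new int[4 * ACCTSIZE];  // second-level index
    static int kvindex = 0;

    // Record one (key, value) pair whose serialized bytes sit at keystart/valstart in kvbuffer.
    static void account(int partition, int keystart, int valstart) {
        int ind = kvindex * ACCTSIZE;
        kvoffsets[kvindex] = ind;                // where this record's entry starts in kvindices
        kvindices[ind + PARTITION] = partition;  // which Reduce the record belongs to
        kvindices[ind + KEYSTART] = keystart;    // key offset inside kvbuffer
        kvindices[ind + VALSTART] = valstart;    // value offset inside kvbuffer
        kvindex = kvindex + 1;                   // the real code uses kvindex = kvnext (circular buffer)
    }

    public static void main(String[] args) {
        account(1, 0, 7);   // hypothetical: key at offset 0, its value at offset 7
        account(0, 9, 14);  // hypothetical second record
        System.out.println(Arrays.toString(kvoffsets) + " " + Arrays.toString(kvindices));
    }
}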

 

 

The SpillThread thread sorts the in-memory data and writes it to local disk.

 

Handled in MapTask.MapOutputBuffer.sortAndSpill():

 

size=2276473, partitions=2, numSpills=0

filename=/tmp/hadoop-admin/mapred/local/taskTracker/admin/jobcache/job_201404282305_0001/attempt_201404282305_0001_m_000000_0/output/spill0.out

 

endPosition=262144

 

 

Sort: sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter)
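The quicksort operates on the index entries rather than on the serialized bytes: records are ordered by partition first and then by key, while the data in kvbuffer stays in place. A simplified model of that idea (not Hadoop source; names and data are placeholders):

import java.util.Arrays;

public class IndexSortSketch {
    public static void main(String[] args) {
        int[] partition = {1, 0, 1, 0};                    // per-record partition
        String[] key = {"stars", "you", "shine", "look"};  // stand-in for the key bytes in kvbuffer
        Integer[] order = {0, 1, 2, 3};                    // stand-in for kvoffsets

        Arrays.sort(order, (a, b) -> {
            int byPartition = Integer.compare(partition[a], partition[b]);     // partition first
            return byPartition != 0 ? byPartition : key[a].compareTo(key[b]);  // then key order
        });

        for (int i : order) {
            System.out.println("partition=" + partition[i] + " key=" + key[i]);
        }
    }
}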

 

Combine: combinerRunner.combine(kvIter, combineCollector);

 

Run the Combine.

 

Write the resulting keys and values to spill0.out, spill1.out, spill2.out, ...

 

keySerializer.serialize(key);

valueSerializer.serialize(value);

 

mergeParts() merges the results.

 

After the merge completes, the combine still has to run:

combinerRunner.combine(kvIter, combineCollector);

 

Finally, two files are generated:

file.out

file.out.index

 

 

The sortAndSpill and mergeParts process:

 

Map phase: during the Map phase the main thread writes all map output into the in-memory kvbuffer; the SpillThread sorts (quicksort) and combines the contents of kvbuffer block by block and writes them to the files spill0.out, spill1.out, spill2.out, ..., spill51.out. Each of these files contains data for both of the 2 reduces.

 

output.close() phase: the main thread sorts (quicksort) and combines the last block left in memory and writes it to spill52.out; again the file contains data for both of the 2 reduces.

 

mergeParts phase: the main thread adds the files spill0.out, spill1.out, ..., spill52.out into the priority queue MergeQueue (actually a min-heap) to be merged. Each merge pass uses at most 10 files and produces temporary files intermediate.0, intermediate.1, intermediate.2, intermediate.3, ..., which are themselves added back into the MergeQueue. Because spill0.out, spill1.out, ..., spill52.out are each internally sorted (locally ordered), the merge repeatedly takes from the min-heap MergeQueue the element pointed to by the root file's cursor (i.e. the smallest element), appends it to the target file, advances that cursor, and then re-adjusts the min-heap MergeQueue. The merged file is therefore also sorted.

The Combine is then run again and the result is written to file.out, which contains the data for both of the 2 reduces.
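A simplified model of that merge (not Hadoop source): because every spill file is already sorted, a min-heap of per-file cursors always yields the globally smallest remaining element, so the merged output is sorted. The strings below are placeholders; the real merge works on serialized key/value segments, and the at-most-10-files-per-pass limit corresponds to io.sort.factor.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {
    public static void main(String[] args) {
        // Stand-ins for spill0.out, spill1.out, spill2.out (each internally sorted).
        List<List<String>> spills = Arrays.asList(
                Arrays.asList("and", "look", "stars"),
                Arrays.asList("at", "how", "shine"),
                Arrays.asList("for", "the", "you"));

        // Heap entry = {spill index, position in that spill}, ordered by the element it points at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparing((int[] c) -> spills.get(c[0]).get(c[1])));
        for (int s = 0; s < spills.size(); s++) {
            heap.add(new int[] {s, 0});
        }

        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();                     // cursor pointing at the smallest element
            merged.add(spills.get(top[0]).get(top[1]));  // append it to the target "file"
            if (++top[1] < spills.get(top[0]).size()) {
                heap.add(top);                           // advance the cursor and restore heap order
            }
        }
        System.out.println(merged);                      // globally sorted
    }
}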


Note that quicksort (Quick Sort), the min-heap, and other data structures and algorithms are used here.

Note that i << 1 is equivalent to 2*i, and i >>> 1 is equivalent to i/2.
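A quick check of those identities (i >>> 1 equals i/2 only for non-negative i, which is the case for the array indices used in the heap and sort code):

public class ShiftCheck {
    public static void main(String[] args) {
        int i = 13;
        System.out.println((i << 1) + " == " + (2 * i));   // 26 == 26
        System.out.println((i >>> 1) + " == " + (i / 2));  // 6 == 6
    }
}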