Hadoop源码分析23:MapReduce的Job提交过程

命令为:

hadoop_debugjar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount/user/admin/in/yellow.txt /user/admin/out/555

首先调用org.apache.hadoop.util.runJar.main

public static void main(String[]args){

  // 加载Jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar

 JarFile jarFile = new JarFile(fileName); 

  //根据META-INF得知主Classorg/apache/hadoop/examples/ExampleDriver

 Manifest manifest = jarFile.getManifest();

 if (manifest !=null){

     mainClassName =manifest.getMainAttributes().getValue("Main-Class");

 }

 

   //建立本地临时文件夹 /tmp/hadoop-admin

  File tmpDir = newFile(newConfiguration().get("hadoop.tmp.dir"));

  tmpDir.mkdirs();

 

   //建立本地工作文件夹 /tmp/hadoop-admin/hadoop-unjar4705742737164408087               finalFile workDir = File.createTempFile("hadoop-unjar", "", tmpDir);

   workDir.delete();

   workDir.mkdirs();

//JVM退出时将tmp/hadoop-admin/hadoop-unjar4705742737164408087删除

Runtime.getRuntime().addShutdownHook(newThread(){

       publicvoidrun(){

         try{

           FileUtil.fullyDelete(workDir);

         } catch(IOExceptione) {

         }

       }

     });

  //Jar包解压到/tmp/hadoop-admin/hadoop-unjar4705742737164408087               

   unJar(file, workDir);

 

  ///tmp/hadoop-admin/hadoop-unjar4705742737164408087/tmp/hadoop-admin/hadoop-unjar4705742737164408087/classes/,/tmp/hadoop-admin/hadoop-unjar4705742737164408087/lib全部添加到classpath

   classPath.add(newFile(workDir+"/").toURL());

   classPath.add(file.toURL());

   classPath.add(newFile(workDir,"classes/").toURL());

   File[] libs = newFile(workDir,"lib").listFiles();

   if(libs!= null){

     for(inti = 0;i libs.length;i++) {

       classPath.add(libs[i].toURL());

     }

   }

 

  //运行主函数

 main.invoke(null,newObject[]{ newArgs });

}  

设置属性:

job.setJarByClass(WordCount.class);         //mapred.jar

job.setMapperClass(WordCountMap.class);     //mapreduce.map.class

job.setReducerClass(WordCountReduce.class); //mapreduce.reduce.class

job.setCombinerClass(WordCountReduce.class);//mapreduce.combine.class

job.setMapOutputKeyClass(Text.class);       //mapred.mapoutput.key.class

job.setMapOutputValueClass(IntWritable.class);//mapred.mapoutput.value.class

job.setOutputKeyClass(Text.class);            //mapred.output.key.class

job.setOutputValueClass(IntWritable.class);   //mapred.output.value.class

job.setJobName("WordCount");                 //mapred.job.name

 

FileInputFormat.addInputPath(job,input);    //mapred.input.dir

FileOutputFormat.setOutputPath(job,output);  //mapred.output.dir

 

 

job.submit()

 

 publicvoidsubmit()throwsIOException,InterruptedException,

                             ClassNotFoundException {

   ......

   // Connect tothe JobTracker and submit the job

   connect();

   info=jobClient.submitJobInternal(conf);

   ......

  }

 

 

连接JobTracker

 

privatevoidconnect()throwsIOException,InterruptedException {

       ......

       jobClient=newJobClient((JobConf)getConfiguration());   

       ......

      

 }

 

其中:

 publicJobClient(JobConfconf) throwsIOException{

   ......

   init(conf);

 }

publicvoidinit(JobConfconf) throwsIOException{

    ......

    this.jobSubmitClient=createRPCProxy(JobTracker.getAddress(conf),conf);

 }

 privatestaticJobSubmissionProtocolcreateRPCProxy(InetSocketAddress addr,

     Configuration conf) throwsIOException{

   return(JobSubmissionProtocol)RPC.getProxy(JobSubmissionProtocol.class,

       JobSubmissionProtocol.versionID,addr,

       UserGroupInformation.getCurrentUser(), conf,

       NetUtils.getSocketFactory(conf,JobSubmissionProtocol.class));

 }

 

此时获得一个实现JobSubmissionProtocolRPC调用,即JobTracker的代理。

 

获取job StagingArea

 

PathjobStagingArea =JobSubmissionFiles.getStagingDir(JobClient.this,

           jobCopy);

RPC请求:JobSubmissionProtocol.getStagingAreaDir()

返回:hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging

 

RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)

返回:org.apache.hadoop.hdfs.protocol.HdfsFileStatus@5521691b,即存在

 

RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)

返回:org.apache.hadoop.hdfs.protocol.HdfsFileStatus@726c554,用以判断权限

 

获得 NewJobId

JobIDjobId = jobSubmitClient.getNewJobId();

 

RPC请求:JobSubmissionProtocol.getNewJobId()

返回:job_201404010621_0004

 

建立 submitJob Dir

PathsubmitJobDir = newPath(jobStagingArea,jobId.toString());

 

hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004

 

复制JarHDFS

copyAndConfigureFiles(jobCopy,submitJobDir);

 

RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004)

返回:null

 

RPC请求:ClientProtocol.mkdirs(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004,rwxr-xr-x)

返回:true

 

RPC请求:ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004,rwx------)

返回:null

 

RPC请求:ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar

返回:null,即不存在

 

RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864)

返回:输出流

 

RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,DFSClient_-1317833261, null)

返回:org.apache.hadoop.hdfs.protocol.LocatedBlock@1a9b701

Blockblk_6689254996395759186_2720

BlockTokenIdent: ,Pass: , Kind: , Service:

DataNode[10.1.1.103:50010,10.1.1.102:50010]

 

RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,DFSClient_-1317833261

返回:true

 

RPC请求:ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,10)

返回:true

 

RPC请求:ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar,rw-r--r--)

返回:null

 

RPC请求:ClientProtocol.renewLease(DFSClient_-1317833261)

返回:null

此后有1个守护线程会不断发送 renewLease请求

 

此时本地文件/opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar被复制到HDFS文件系统/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.xml

 

Reduce数目:

int reduces= jobCopy.getNumReduceTasks();

reduce数目为2

 

检查输出目录

RPC请求:ClientProtocol.getFileInfo(/user/admin/out/555)

返回:null,即不存在

 

获取输入分片信息:

int maps =writeSplits(context, submitJobDir);

其中:

 privateT extendsInputSplit intwriteNewSplits(JobContextjob, Path jobSubmitDir) throwsIOException,

   InterruptedException, ClassNotFoundException {

   Configuration conf = job.getConfiguration();

   InputFormat?, ? input =

     ReflectionUtils.newInstance(job.getInputFormatClass(),conf);

 

   ListInputSplit splits =input.getSplits(job);

   T[] array = (T[]) splits.toArray(newInputSplit[splits.size()]);

 

   // sort thesplits into order based on size, so that the biggest

   // gofirst

   Arrays.sort(array, newSplitComparator());

   JobSplitWriter.createSplitFiles(jobSubmitDir,conf,

       jobSubmitDir.getFileSystem(conf), array);

   returnarray.length;

 }

  

其中:

 publicListInputSplitgetSplits(JobContextjob

                                   ) throwsIOException{

  ...........

 }

 

RPC请求:ClientProtocol.getFileInfo(/user/admin/in/yellow.txt)

返回:path="hdfs://server1:9000/user/admin/in/yellow.txt",length=201000000,isdir=false,block_replication=3, blocksize=67108864, permission=rw-r--r--,owner=Admin, group=supergroup

 

RPC请求:ClientProtocol.getBlockLocations(/user/admin/in/yellow.txt,0, 201000000)

返回:3BlockLocation

offset={0},        length={67108864}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010,/default-rack/10.1.1.102:50010]}

offset={67108864}, length={67108864}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010,/default-rack/10.1.1.102:50010]}

offset={134217728},length={66782272}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]},topologyPaths={[/default-rack/10.1.1.103:50010,/default-rack/10.1.1.102:50010]}

 

最终确定的分片信息 3Filespit

Filespit file={hdfs://server1:9000/user/admin/in/yellow.txt},hosts={ [server3, server2] }, length={ 67108864 },start={0}

Filespit file={hdfs://server1:9000/user/admin/in/yellow.txt},hosts={ [server3, server2] }, length={ 67108864 },start={67108864}

Filespit file={hdfs://server1:9000/user/admin/in/yellow.txt},hosts={ [server3, server2] }, length={ 66782272}, start={134217728}

 

map数量为3

jobCopy.setNumMapTasks(maps);

 

建立分片文件:

JobSplitWriter.createSplitFiles(jobSubmitDir,conf,

       jobSubmitDir.getFileSystem(conf), array);

RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864);

返回:输出流

 

RPC请求:ClientProtocolsetPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,rw-r--r--)

返回:null

 

RPC请求:ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,10)

返回:true

 

RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,DFSClient_-1317833261, null)

返回:LocatedBlock对象为

 

Block blockid=-921399365952861077,generationStamp=2714numBytes=0

BlockTokenIdentifierIdent: ,Pass: , Kind: , Service:

DatanodeInfo[][10.1.1.103:50010,10.1.1.102:50010]

offset0

 

RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split,DFSClient_-1317833261)

返回:true

 

写入的 SplitMetaInfo

[data-size :67108864 start-offset : 7 locations :  server3  server2]

[data-size :67108864 start-offset : 116 locations:   server2 server3]

[data-size :66782272 start-offset : 225 locations : server2  server3 ]

 

 RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864)

返回:输出流

 

RPC请求: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,rw-r--r--)

返回:null

 

RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,DFSClient_-1317833261, null)

返回:LocatedBlock对象为

 

Block blockid=789965327875207186,generationStamp=2715numBytes=0

BlockTokenIdentifierIdent: ,Pass: , Kind: , Service:

DatanodeInfo[][10.1.1.103:50010,10.1.1.102:50010]

offset0

 

RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo,DFSClient_-1317833261)

返回:true

 

设置AccessControl

RPC请求:JobSubmissionProtocol.getQueueAdmins(default)

返回:All usersare allowed

 

Write jobfile to JobTracker'sfs  

RPC请求:ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml,rwxr-xr-x, DFSClient_-1317833261, true, true, 3,67108864)

返回:输出流

 

RPC请求:ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml,rw-r--r--)

返回:null

 

RPC请求:ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xmlDFSClient_-1317833261,null)

返回:LocatedBlock对象为

 

Block blockid= -7725157033540829125,generationStamp= 2716numBytes=0

BlockTokenIdentifierIdent: ,Pass: , Kind: , Service:

DatanodeInfo[][10.1.1.103:50010,10.1.1.102:50010]

offset0

 

RPC请求:ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml,DFSClient_-1317833261)

返回:true

 

此时"/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/"下生成文件 job.xml,包含了所有的配置信息.

此时HDFS目录"/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/"下面文件为:

-rw-r--r--  10 admin supergroup    142465 2014-04-08 00:20  job.jar

-rw-r--r--  10 admin supergroup       334 2014-04-08 00:45    job.split

-rw-r--r--  3 admin supergroup        80 2014-04-08 00:50      job.splitmetainfo

-rw-r--r--  3 admin supergroup  20416 2014-04-08 00:55job.xml

job.jar 为运行的Jar,  job.split内容 为(FileSplit 对象), job.splitmetainfo内容 为(SplitMetaInfo对象),job.xml job的配置文件

 

提交作业:

status= jobSubmitClient.submitJob(

             jobId, submitJobDir.toString(),jobCopy.getCredentials());

 

RPC请求:JobSubmissionProtocol.submitJob(job_201404010621_0004,hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004,org.apache.hadoop.security.Credentials@70677770)

返回:JobStatus setProgress=0mapProgress=0reduceProgress=0cleanProgress=0runstate=4priority=NOMAL..

 

RPC请求:JobSubmissionProtocol.getJobProfile(job_201404010621_0004

返回:JobProfilejobFile=hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xmljobID=job_201404010621_0004name=WordCountqueue=defaulturl=http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004user=Admin

 

综合JobStatusJobProfile

Job:job_201404010621_0004

file:hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.xml

tracking URL:http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004

map()completion: 0.0

reduce()completion: 0.0

 

监控Job状态:

jobClient.monitorAndPrintJob(conf,info);

 

RPC请求:JobSubmissionProtocol.getJobStatus(job_201404010621_0004)

返回:   setProgress=1mapProgress=1reduceProgress=0.22222224cleanProgress=1runstate=1priority=NOMAL

 

RPC请求:JobSubmissionProtocol.getJobStatus(job_201404010621_0004)

返回:   setProgress=1mapProgress=1reduceProgress=1cleanProgress=1runstate=2priority=NOMAL

 

map 100%reduce 100%

之后会多次发送JobSubmissionProtocol.getJobStatus(job_201404010621_0004)请求

 

RPC请求:JobSubmissionProtocol.getTaskCompletionEvents(job_201404010621_0004,0, 10)

返回: [Task Id :attempt_201404010621_0004_m_000004_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000002_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000000_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000001_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000000_1, Status : KILLED, Task Id :attempt_201404010621_0004_r_000000_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_r_000001_0, Status : SUCCEEDED, Task Id :attempt_201404010621_0004_m_000003_0, Status :SUCCEEDED]

 

 

RPC请求:JobSubmissionProtocol.getJobCounters(job_201404010621_0004)

返回:OW[class=classorg.apache.hadoop.mapred.Counters,value=Counters: 29

      Job Counters

             Launched reduce tasks=2

             SLOTS_MILLIS_MAPS=293879

             Total time spent by all reduces waiting after reserving slots(ms)=0

             Total time spent by all maps waiting after reserving slots(ms)=0

             Launched map tasks=4

             Data-local map tasks=4

             SLOTS_MILLIS_REDUCES=74342

      File Output Format Counters

             Bytes Written=933

      FileSystemCounters

             FILE_BYTES_READ=316152

             HDFS_BYTES_READ=201008521

             FILE_BYTES_WRITTEN=370366

             HDFS_BYTES_WRITTEN=933

      File Input Format Counters

             Bytes Read=201008194

      Map-Reduce Framework

             Map output materialized bytes=2574

             Map input records=15600000

             Reduce shuffle bytes=2574

             Spilled Records=23025

             Map output bytes=356000000

             Total committed heap usage (bytes)=378023936

             CPU time spent (ms)=158350

             Combine input records=41011850

             SPLIT_RAW_BYTES=327

             Reduce input records=225

             Reduce input groups=75

             Combine output records=12075

             Physical memory (bytes) snapshot=650371072

             Reduce output records=75

             Virtual memory (bytes) snapshot=5300277248

             Map output records=41000000]