Hadoop Source Code Analysis 29: split and splitmetainfo

Input file: hdfs://server1:9000/user/admin/in/yellow.txt

1. splits

formatMinSplitSize = 1;
minSplitSize = conf.getLong("mapred.min.split.size", 1);            // = 1 (default)
minSize = Math.max(formatMinSplitSize, minSplitSize);               // = 1
maxSize = conf.getLong("mapred.max.split.size", Long.MAX_VALUE);    // = Long.MAX_VALUE (default)

 

fileLength=201000000;

 

blkLocations = [ {offset=0,         length=67108864, hosts=[server3, server2]},
                 {offset=67108864,  length=67108864, hosts=[server2, server3]},
                 {offset=134217728, length=66782272, hosts=[server2, server3]} ];

 

 

blockSize=67108864;

 

splitSize = Math.max(minSize, Math.min(maxSize, blockSize));        // = 67108864

 

SPLIT_SLOP=1.1;
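Neither configuration key is set for this job, so the formula collapses to the block size. A hedged standalone sketch (the class name and the 32 MB cap are made up for illustration) of how the two keys steer splitSize:

import org.apache.hadoop.conf.Configuration;

// Illustrative only: shows how mapred.min/max.split.size feed the
// Math.max(minSize, Math.min(maxSize, blockSize)) formula above.
public class SplitSizeKnobs {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setLong("mapred.max.split.size", 32L * 1024 * 1024);   // cap splits at 32 MB

    long blockSize = 67108864L;                                 // 64 MB, as above
    long minSize = Math.max(1L, conf.getLong("mapred.min.split.size", 1L));
    long maxSize = conf.getLong("mapred.max.split.size", Long.MAX_VALUE);

    // Same formula as above: the block size clamped into [minSize, maxSize].
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
    System.out.println("splitSize = " + splitSize);             // prints 33554432
  }
}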

 

Split generation code (in FileInputFormat.getSplits()):

long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
  splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                           blkLocations[blkIndex].getHosts()));
  bytesRemaining -= splitSize;
}

       

if (bytesRemaining != 0) {
  splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                           blkLocations[blkLocations.length - 1].getHosts()));
}
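Replaying this loop with the job's concrete numbers (201000000 / 67108864 ≈ 2.995) shows where the three splits below come from. A standalone sketch, not Hadoop code:

// Replays the loop above with this job's numbers. The loop stops once
// 66782272 / 67108864 ≈ 0.995 <= SPLIT_SLOP, so the remainder becomes a
// single tail split instead of a fourth undersized one.
public class SplitSim {
  public static void main(String[] args) {
    long length = 201000000L;       // fileLength
    long splitSize = 67108864L;     // one HDFS block
    final double SPLIT_SLOP = 1.1;

    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.printf("start=%d length=%d%n", length - bytesRemaining, splitSize);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      System.out.printf("start=%d length=%d%n", length - bytesRemaining, bytesRemaining);
    }
  }
}

Running it prints exactly the three (start, length) pairs listed below.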

 

splits contents:

FileSplit = {file=hdfs://server1:9000/user/admin/in/yellow.txt, hosts=[server3, server2], length=67108864, start=0}
FileSplit = {file=hdfs://server1:9000/user/admin/in/yellow.txt, hosts=[server3, server2], length=67108864, start=67108864}
FileSplit = {file=hdfs://server1:9000/user/admin/in/yellow.txt, hosts=[server3, server2], length=66782272, start=134217728}

 

The splits are written to the file: hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200127_0001/job.split

 

splits file header:

out.write(SPLIT_FILE_HEADER);   // "SPL".getBytes("UTF-8") = [83, 80, 76]
out.writeInt(splitVersion);     // 1
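The header therefore occupies 3 + 4 = 7 bytes, which is why the first split record begins at offset 7 (see the start-offset values in section 2). A hypothetical verification sketch that reads the header back from a local copy of job.split:

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;

// Hypothetical sketch: reads the 3-byte magic and 4-byte version back
// from a locally copied job.split (pass the path as args[0]).
public class SplitHeaderCheck {
  public static void main(String[] args) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(args[0]))) {
      byte[] magic = new byte[3];
      in.readFully(magic);               // expect [83, 80, 76] == "SPL"
      int version = in.readInt();        // expect 1
      System.out.println(Arrays.toString(magic) + " version=" + version);
    }
  }
}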

 

2. SplitMetaInfo

SplitMetaInfo generation code (in JobSplitWriter.writeNewSplits()):

   SplitMetaInfo[] info = new SplitMetaInfo[array.length];
   if (array.length != 0) {
     SerializationFactory factory = new SerializationFactory(conf);
     int i = 0;
     long offset = out.size();
     for (T split : array) {
       int prevCount = out.size();
       Text.writeString(out, split.getClass().getName());
       Serializer<T> serializer =
         factory.getSerializer((Class<T>) split.getClass());
       serializer.open(out);
       serializer.serialize(split);
       int currCount = out.size();
       String[] locations = split.getLocations();
       final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
       if (locations.length > max_loc) {
         LOG.warn("Max block location exceeded for split: "
             + split + " splitsize: " + locations.length +
             " maxsize: " + max_loc);
         locations = Arrays.copyOf(locations, max_loc);
       }
       info[i++] =
         new JobSplit.SplitMetaInfo(
             locations, offset,
             split.getLength());
       offset += currCount - prevCount;
     }
   }
   return info;

 

SplitMetaInfo contents:

JobSplit$SplitMetaInfo = {data-size: 67108864, start-offset: 7,   locations: [server3, server2]}
JobSplit$SplitMetaInfo = {data-size: 67108864, start-offset: 116, locations: [server3, server2]}
JobSplit$SplitMetaInfo = {data-size: 66782272, start-offset: 225, locations: [server3, server2]}
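The start-offsets follow directly from the job.split layout: the header takes 3 + 4 = 7 bytes, so the first record begins at offset 7, and each serialized record of this job (class name plus FileSplit fields) takes 116 - 7 = 225 - 116 = 109 bytes, giving offsets 7, 116, and 225.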

 

The SplitMetaInfo records are written to the file: hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200127_0001/job.splitmetainfo

 

 

Comparing the splits and SplitMetaInfo contents:

The SplitMetaInfo data-size is the FileSplit length.

The SplitMetaInfo locations are the FileSplit hosts.

The SplitMetaInfo start-offset is the offset at which the corresponding FileSplit record begins inside the job.split file.

 

SplitMetaInfo file header:

out.write(JobSplit.META_SPLIT_FILE_HEADER);             // "META-SPL".getBytes("UTF-8")
WritableUtils.writeVInt(out, splitMetaInfoVersion);     // splitVersion = 1
WritableUtils.writeVInt(out, allSplitMetaInfo.length);  // number of splits, 3 here
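A matching hypothetical sketch that parses just this header from a local copy of job.splitmetainfo (only the fields written above):

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableUtils;

// Hypothetical sketch: reads the 8-byte "META-SPL" magic, then the two
// vints written above (pass a local copy of job.splitmetainfo as args[0]).
public class MetaHeaderCheck {
  public static void main(String[] args) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(args[0]))) {
      byte[] magic = new byte[8];
      in.readFully(magic);                        // "META-SPL"
      int version = WritableUtils.readVInt(in);   // 1
      int numSplits = WritableUtils.readVInt(in); // 3 for this job
      System.out.println(new String(magic, "UTF-8") + " version=" + version
          + " numSplits=" + numSplits);
    }
  }
}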

 

3. Using the splits

Used in the Task; details to be added later.

4. Using SplitMetaInfo

In the JobTracker process, the SplitMetaInfo records are read and converted into TaskSplitMetaInfo:

TaskSplitMetaInfo[0] = {inputDataLength=67108864, locations=[server3, server2], splitIndex=JobSplit$TaskSplitIndex{splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200521_0001/job.split", startOffset=7}}

TaskSplitMetaInfo[1] = {inputDataLength=67108864, locations=[server3, server2], splitIndex=JobSplit$TaskSplitIndex{splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200521_0001/job.split", startOffset=116}}

TaskSplitMetaInfo[2] = {inputDataLength=66782272, locations=[server3, server2], splitIndex=JobSplit$TaskSplitIndex{splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200521_0001/job.split", startOffset=225}}
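Each TaskSplitIndex is the pointer a map task needs to find its own record: the job.split path plus a start-offset. A hedged sketch (illustrative only; the class name is made up, and the real logic lives inside the map task) of how such a record is located:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

// Illustrative sketch: seeks to one start-offset inside job.split and reads
// the class name that JobSplitWriter wrote there (args: path, offset).
public class ReadOneSplit {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path splitFile = new Path(args[0]);           // .../job.split
    long startOffset = Long.parseLong(args[1]);   // 7, 116, or 225 here
    FileSystem fs = splitFile.getFileSystem(conf);
    try (FSDataInputStream in = fs.open(splitFile)) {
      in.seek(startOffset);                       // jump straight to the record
      String className = Text.readString(in);     // e.g. ...FileSplit
      System.out.println("split class = " + className);
      // a real task would now deserialize the split itself via a
      // SerializationFactory deserializer for className
    }
  }
}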

Then the TaskInProgress objects for the map tasks are created (in JobInProgress.initTasks()):

   maps = new TaskInProgress[numMapTasks];
   for (int i = 0; i < numMapTasks; ++i) {
     inputLength += splits[i].getInputDataLength();
     maps[i] = new TaskInProgress(jobId, jobFile,
                                  splits[i],
                                  jobtracker, conf, this, i, numSlotsPerMap);
   }

Here jobFile is hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200521_0001/job.xml, and splits[i] is the TaskSplitMetaInfo for map task i.