MapReduce Design Patterns(6 、Job 链)(十二)

MapReduce Design Patterns(6 、Job 链)(十二)

http://blog.csdn.net/cuirong1986/article/details/8502460

Chain Folding

这是对job 链的一种优化。基本上是一种大体规则:每条记录都会提交给多个mapper,或者给reducer然后给mapper。这种综合处理方法会节省很多读文件和传输数据的时间。多个job的结构通常这样处理是可行的,因为map阶段是完全无共享的:看起来每条记录是单独的,数据的组织或是否分组是没有关系的。当创建大的MapReduce链时,合并这个链使多个map阶段合并会带来很大的性能提升。

 

链合并带来的主要好处是减少MapReduce管道中的数据移动量,无论是加载时的io,存到磁盘,或混洗时的网络传输。在job链中。临时数据保存在hdfs上,所以能减少访问磁盘的次数,减少了链中总的io

 

关于合并哪些有几种情况要看:

1、  看链中的map阶段,如果有多个map阶段相邻,合并成一个阶段。如果是map-onlyjob就符合这种情况,还有数值聚合。这一阶段,减少了访问磁盘次数。考虑一个两个job的链,第一个只有map,第二个既有map又有reduce。如果使用这种优化,第一个job会把输出写到分布式文件系统,然后再由第二个job load数据。

 

相反,如果我们合并这两个job,就可避免第一个job的临时目录的写,明显的减少IOTask数目也会减少,减轻了任务管理的消耗。把很多的map任务合并起来会带来更显著的优化。这样做几乎没有任何缺点,除了要修改代码。

2、  如果第二个jobmap阶段结束,就把这一阶段合并到第一个jobreduce阶段运行。这是一种特殊的情况,同样可以获得性能上的提高。这样也会减少写临时目录的数据量,同样会减少任务数量。

3、  注意链的第一个map阶段不能受益于下面的优化。尽可能的,在减少数据操作和增加数据量的操作之间分割每个map阶段。在某些情况下,这是不可能的,因为你做数据的丰富,增加,就是为了更好的过滤。在这些情况下,看看依赖的大的阶段是增加数据量还是减少数据量。把减少数据量的操作放到前一个jobreducer做,增加数据量的操作位置不动。

这一阶段有点复杂并且不同点是比较微小的。如果把极小的map阶段处理放到前一个reducer,将会减少写到临时存储的数据量,以及下一阶段从磁盘load的数据量。如果有大量数据的过滤去做,这将非常有效。

 

Notice:注意合并阶段需要很多内存,例如,合并5个重复join可能不是好的做法,因为可能超过任务的可用内存大小,这种情况下,分开执行反而更好。

 

不管job是否是链,试着尽早尽可能多的过滤数据。MapReduce代价最高的部分一般是数据在管道中的流动:加载数据,shufflesort,往磁盘写。例如,如果你只关心2012条目的数据,在map阶段就做过滤,而不是在reducer分组以后。

 

下面运行几个例子帮助解释为什么这里的做法很有用。

 

为了验证第一阶段,考虑图6-1中的链。顶部的链要被优化,让重复join合并到第二个jobmap阶段。

MapReduce Design Patterns(6 、Job 链)(十二)

Figure 6-1. Original chain and optimizing mappers

这个job执行的是来自青少年的评论的单词统计。做这个的目的是找出年轻人对什么主题感兴趣。用户的年龄不在评论里,所以要做join。这种情况下,这个map-only的重复join可以合并到第二个job

 

为了验证第二步,考虑下面图中的链。上面的链要被优化,让这个重复join合并到第二个jobreducer

 

这个job拿用户的评论数丰富用户信息。它使用了一个通用MapReduce job,然后使用了一个重复joincount添加到用户信息。这种情况下,这个map-only的重复join可以合并到reducer

MapReduce Design Patterns(6 、Job 链)(十二)

Figure 6-2. Original chain and optimizing a reducer with a mapper

 

为了验证第三步,考虑下图6-3的链。要优化上面的链,让重复join合并到第二个jobreducer

 

这个job有点复杂,很明显链有点长。目的是找出每个年龄段内最受欢迎的标签。做法是:对用户计数,丰富他们的信息,过滤掉用户标签提交数少于5的记录。然后按年龄段分组,统计求和。

map阶段(丰富和过滤),重复join是增加数据,而过滤是减少数据。这里,我们要把过滤移到第一个job,而把重复join移到第二个jobmap阶段。这样就得到一个图6-3下面的链。现在第一个job写出的数据就比之前的少,第二个job加载的数据量也少了。

 

有两个主要方法用于链合并:手动分开,合并代码,还有一种更优雅的方法是使用类ChainMapper  ChainReducer。如果这是个已有的job并在逻辑上有多个map阶段,只需要手动实现一次。如果几个map阶段存在复用,就应该用这两个类处理,遵循良好的软件编程风格。

 

MapReduce Design Patterns(6 、Job 链)(十二)

Figure 6-3. Original chain and optimizing a mapper with a reducer

The ChainMapper and ChainReducer Approach

ChainMapper  ChainReducer 是特殊的mapperreducer类,允许在mapper里运行多个map阶段和在reducer以后运行多个map阶段。可以有效扩展传统的MapReduce规范成几个map阶段,紧随在一个reduce或几个map阶段之后。之前只有一个map和一个reduce阶段可以调用。

 

每个map链中的阶段提供给管道中下一阶段输入。第一个阶段的输出会由第二个处理,依次类推。Reduce后面的map阶段处理reduce的输出做附加的计算。这对后处理操作和额外的过滤操作很有用。

 

Notice:确保链上的输入类型和输出类型匹配。如果第一阶段输出<LongWritable,Text>,第二阶段的输入也要是这种类型。

 

Chain Folding Example

根据声誉值分类用户

本例对job链的例子做了轻微的改动。这里,用两个mapper初始化map阶段。首先格式化输入的xml记录,输入键值对userid和计数值1。第二个mapper用户的声誉值加到用户上,声誉值是从setup阶段的分布式缓存读的。

这两个单独的mapper就连在一起作为单reduce的输入。ReducerLongSumReducer,简单的迭代所有值计算求和。然后跟输入key一同输出。

最后,调用第三个mapper,根据用户的声誉值是否在5000以上分类记录。整个流是在一个job里完成的。

 

Notice:例子使用了旧的Api,新的api不支持。

 

问题:给出用户发帖和用户信息数据,根据声誉值在5000上或下分类用户。

Parsing mapper code

 

public static class UserIdCountMapper extends MapReduceBase implements
       Mapper<Object, Text, Text, LongWritable> {
    public static final String RECORDS_COUNTER_NAME = "Records";
    private static final LongWritable ONE = new LongWritable(1);
    private Text outkey = new Text();
 
    public void map(Object key, Text value,
           OutputCollector<Text, LongWritable> output, Reporter reporter)
           throws IOException {
       Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
               .toString());
       // Get the value for the OwnerUserId attribute
       outkey.set(parsed.get("OwnerUserId"));
       output.collect(outkey, ONE);
    }
}


 

Replicated join mapper code。这个map接收前一个mapper的输出。在setup阶段读取用户信息创建userid和声誉值的映射。

 

public static class UserIdReputationEnrichmentMapper extends MapReduceBase
        implements Mapper<Text, LongWritable, Text, LongWritable> {

    private Text outkey = new Text();
    private HashMap<String, String> userIdToReputation = new HashMap<String, String>();

    public void configure(JobConf job) {
        Path[] files = DistributedCache.getLocalCacheFiles(job);
// Read all files in the DistributedCache
        for (Path p : files) {
            BufferedReader rdr = new BufferedReader(
                    new InputStreamReader(
                            new GZIPInputStream(new FileInputStream( new File(p.toString())))));
            String line;
// For each record in the user file
            while ((line = rdr.readLine()) != null) {
// Get the user ID and reputation
                Map<String, String> parsed = MRDPUtils .transformXmlToMap(line);
// Map the user ID to the reputation
                userIdToReputation.put(parsed.get("Id",  parsed.get("Reputation"));
            }
        }
    }

    public void map(Text key, LongWritable value,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        String reputation = userIdToReputation.get(key.toString());
        if (reputation != null) {
            outkey.set(value.get() + "	" + reputation);
            output.collect(outkey, value);
        }
    }
}


Reducer code。Reducer负责计算求和,并作为输出值,输出key是输入key:user id

和声誉值。

 

public static class LongSumReducer extends MapReduceBase implements
       Reducer<Text, LongWritable, Text, LongWritable> {
    private LongWritable outvalue = new LongWritable();
 
    public void reduce(Text key, Iterator<LongWritable> values,
           OutputCollector<Text, LongWritable> output, Reporter reporter)
           throws IOException {
       int sum = 0;
       while (values.hasNext()) {
           sum += values.next().get();
       }
       outvalue.set(sum);
       output.collect(key, outvalue);
    }
}


 

Binning mapper code。这个mapper使用MultipleOutputs把用户分成两个数据集。解析输入key拿出声誉值,跟5000做比较,完成分类。

 

public static class UserIdBinningMapper extends MapReduceBase implements
       Mapper<Text, LongWritable, Text, LongWritable> {
    private MultipleOutputs mos = null;
    public void configure(JobConf conf) {
       mos = new MultipleOutputs(conf);
    }
    public void map(Text key, LongWritable value,
       OutputCollector<Text, LongWritable> output, Reporter reporter)
           throws IOException {
       if (Integer.parseInt(key.toString().split("	")[1]) < 5000) {
           mos.getCollector(MULTIPLE_OUTPUTS_BELOW_5000, reporter)
                  .collect(key, value);
       } else {
           mos.getCollector(MULTIPLE_OUTPUTS_ABOVE_5000, reporter)
                  .collect(key, value);
       }
    }
 
    public void close() {
       mos.close();
    }
}


Driver code。驱动处理ChainMapper 和 ChainReducer的配置。这里最有趣的地方是增加mappers和设置reducer。增加的顺序会影响不同实现的mapper的执行。ChainMapper第一个加入。然后,chainReducer的静态方法用于设置reducer的实现,最后是一个mapper。

注意不是用ChainMapperreducer后增加mapper,而是用chainReducer

每个方法就收的参数有,mapperreducer类的jobconf,输入输出键值对类型,另一个jobconf。这个jobconf可用于mapperreducer覆盖配置参数时,不需要特殊的配置,只需要传入一个空的jobconf对象。第7个参数是一个标志位:传递所有values时是值传递还是引用传递。这是一种增加的优化措施,在mapperreducercollector没有修改keyvalue时可以使用。这里,使用引用传递(byvalue=false)。

 

public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf("ChainMapperReducer");
    conf.setJarByClass(ChainMapperDriver.class);
    Path postInput = new Path(args[0]);
    Path userInput = new Path(args[1]);
    Path outputDir = new Path(args[2]);
    ChainMapper.addMapper(conf, UserIdCountMapper.class,  LongWritable.class, Text.class, Text.class, LongWritable.class,  false, new JobConf(false));
    ChainMapper.addMapper(conf, UserIdReputationEnrichmentMapper.class, Text.class, LongWritable.class, Text.class, LongWritable.class, false, new JobConf(false));
    ChainReducer.setReducer(conf, LongSumReducer.class, Text.class,  LongWritable.class, Text.class, LongWritable.class, false,   new JobConf(false));
    ChainReducer.addMapper(conf, UserIdBinningMapper.class, Text.class, LongWritable.class, Text.class, LongWritable.class, false,   new JobConf(false));
    conf.setCombinerClass(LongSumReducer.class);
    conf.setInputFormat(TextInputFormat.class);
    TextInputFormat.setInputPaths(conf, postInput);
    // Configure multiple outputs
    conf.setOutputFormat(NullOutputFormat.class);
    FileOutputFormat.setOutputPath(conf, outputDir);
    MultipleOutputs.addNamedOutput(conf, MULTIPLE_OUTPUTS_ABOVE_5000,   TextOutputFormat.class, Text.class, LongWritable.class);
    MultipleOutputs.addNamedOutput(conf, MULTIPLE_OUTPUTS_BELOW_5000,    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);
    // Add the user files to the DistributedCache
    FileStatus[] userFiles = FileSystem.get(conf).listStatus(userInput);
    for (FileStatus status : userFiles) {
              DistributedCache.addCacheFile(status.getPath().toUri(), conf);
    }
    RunningJob job = JobClient.runJob(conf);
    while (!job.isComplete()) {
         Thread.sleep(5000);
    }
    System.exit(job.isSuccessful() ? 0 : 1);
}


Job Merging

Job merging是通过MapReduce管道减少io量的另一种优化。能使两个不相关的但load相同数据的job共享MapReduce管道。主要好处是数据只需要加载解析一次。对于一些大的job,那个任务可能是整个操作最耗资源的部分。Load时模式和存储原来的格式的不好影响是,要一次有一次解析数据,如果解析比较复杂,这很影响性能。

假设我们有两个job需要运行相同大小的数据量。都加载和解析数据,然后执行计算。用job merging,我们会使用一个MapReduce job在逻辑上执行两个job,而不是像以前那样的混合。见图6-4.优化上面的链让这两个mapperreducer都分别运行在相同数据上。

也可以对两个以上的job应用merging。越多越好。通过job之间分担的负荷越多,就会对你的集群得到更多的计算资源。

MapReduce Design Patterns(6 、Job 链)(十二)

Figure 6-4. Original jobs and merged jobs

这种处理很可能只跟生产集群上重要且已经存在的job有作用。开发组成员花时间去做出他们核心分析就会看到在减少集群资源利用方面的重要性。当jobmerged掉,他们会一起运行,代码也会放到一块。这可能不太适合以特定方式执行的或环境下新的job

 

不幸的是,用这种模式之前,你必须要满足一些前提条件。最明显的就是,两个job需要有相同的中间键和输出格式,因为他们将共享管道,而这样做需要使用相同的数据类型。如果这真的成为问题,可以使用序列化或多态性,但会增加复杂度。

 

Job merging是一个脏处理过程。要想使它更好的工作,需用到一些技巧。从软件工程的角度看,这增加了代码组织的复杂度,因为不相关的job共享了相同的代码。在更高一层,这个新的map方法将要执行旧的map方法原来的职责,而reduce方法会执行一种动作或另一种基于key的标签来告诉数据集来自哪里的动作,merging两个job的步骤如下:

 

1.合并两个mapper的代码。

有几种做这个的方式。拷贝粘贴代码,但可能使哪段代码做什么更复杂。好的代码注解可以帮你避免这种情况。另一种方法是把两部分代码放到两个帮助方法,对应处理每种逻辑的输入。

2.在mapper里,改变keyvalue的写为标记key和映射来源。

标记key表明它来自于哪个map是很重要的,让来自不同map的数据不会混淆。这里有几种针对不同原数据类型的方式。如果是striing,可以简单使用第一个字符标记,例如可以把从第一个map来的“parks”改为“Aparks”,从第二个map来的改为“Bparks”。

通用的标记方式是使用自定义的复合组类型key对来源不容的数据分开存储标记。这显然一种清晰的方式,但有一点工作量。

3.reducer里,解析出标签,使用if语句段决定执行哪段reducer代码。跟mapper一样,你可以拷贝粘贴代码到if语句段,也可以封装成几个帮助方法。If代码段基于标签控制执行。

4.使用MultipleOutputs分离两个job的输出目录。

这个类可以使同一个reducer把输出写到不同的目录,而不是同一个目录。

Job Merging Examples

隐藏评论并对用户去重

这个例子联合“隐藏*评论信息(英文版101页)”和“根据用户id去重(引文版68页)”。两部分都是用了评论数据作为输入,输出却有很大不同。一个是创建去过重的用户数据集,另一个对每条记录隐藏信息。评论数据是很大的,所以合并这两个job会显著减少处理时间。这样,数据集只需要读一次。

 

问题:给出用户评论数据,生成数据的隐藏版本和去重的用户数据集。

 

TaggedText WritableComparable.需自定义WritableComparable对象用string标记Text。这是一种从逻辑上分离两个job的清晰的方式,且节省了reducer阶段的string解析。

这个对象有两个私有成员变量和对应的构造器。Mapper使用string标记这个对象的每个Text值。然后reducer检查标签找出要执行哪个逻辑reduce。这里用到了compareTo方法,也可是comparable或其它MapReduce框架中可以作为key使用的类型。这个方法首先检查标签,如果相等,对象里的text被比较并且立刻返回值。如果不相等,返回比较结果。条目现根据标签排序,然后根据text值排序。例如下面的:

A:100004122

A:120019879

D:10

D:22

D:23

 

 

public static class TaggedText implements WritableComparable<TaggedText> {
    private String tag = "";
    private Text text = new Text();
 
    public TaggedText() {
    }
    public void setTag(String tag) {
       this.tag = tag;
    }
    public String getTag() {
       return tag;
    }
    public void setText(Text text) {
       this.text.set(text);
    }
    publicvoid setText(String text) {
       this.text.set(text);
    }
    public Text getText() {
       return text;
    }
    public void readFields(DataInput in) throws IOException {
       tag = in.readUTF();
       text.readFields(in);
    }
    public void write(DataOutput out) throws IOException {
       out.writeUTF(tag);
       text.write(out);
    }
    public int compareTo(TaggedText obj) {
       int compare = tag.compareTo(obj.getTag());
       if (compare == 0) {
           return text.compareTo(obj.getText());
       } else {
           return compare;
       }
    }
    public String toString() {
       return tag.toString() + ":" + text.toString();
    }
}


 

Merged mapper codemap方法只需要简单的把参数传递给两个帮助方法,每一个方法单独处理map逻辑并写到contextMap方法稍微改变,为了两个输出的Text对象作为keyvalue。这是需要改变的,以便我们在分开的map逻辑中能得到相同类型的中间键值对。anonymizeMap方法从输入值生成隐藏记录,而distinctMap方法抽取userid并输出。被写出的每一中间键值对被标记为A表示要隐藏数据,D表示要去重数据。

 

Notice:每个数学帮助方法解析输入记录,但这种解析应该在实际map方法里完成,这种结果map<string,string>能直接传给两个帮助方法。任何像这样小的优化点都会给长时间运行的任务带来好处且应该运用。

 

public static class AnonymizeDistinctMergedMapper extends
       Mapper<Object, Text, TaggedText, Text> {
    private static final Text DISTINCT_OUT_VALUE = new Text();
    private Random rndm = new Random();
    private TaggedText anonymizeOutkey = new TaggedText(),
           distinctOutkey = new TaggedText();
    private Text anonymizeOutvalue = new Text();
    public void map(Object key, Text value, Context context)
           throws IOException, InterruptedException {
       anonymizeMap(key, value, context);
       distinctMap(key, value, context);
    }
    private void anonymizeMap(Object key, Text value, Context context)
           throws IOException, InterruptedException {
       Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
              .toString());
       if (parsed.size() > 0) {
           StringBuilder bldr = new StringBuilder();
           bldr.append("<row ");
           for (Entry<String, String> entry : parsed.entrySet()) {
              if (entry.getKey().equals("UserId")
                     || entry.getKey().equals("Id")) {
                  // ignore these fields
              } elseif (entry.getKey().equals("CreationDate")) {
                  // Strip out the time, anything after the 'T'
                  // in the value
                  bldr.append(entry.getKey()
                         + "=""
                         + entry.getValue().substring(0,
       entry.getValue().indexOf('T')) + "" ");
              } else {
                  // Otherwise, output this.
                  bldr.append(entry.getKey() + "="" + entry.getValue()
                         + "" ");
              }
           }
           bldr.append(">");
           anonymizeOutkey.setTag("A");
           anonymizeOutkey.setText(Integer.toString(rndm.nextInt()));
           anonymizeOutvalue.set(bldr.toString());
           context.write(anonymizeOutkey, anonymizeOutvalue);
       }
    }
    private void distinctMap(Object key, Text value, Context context)
           throws IOException, InterruptedException {
       Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
              .toString());
       // Otherwise, set our output key to the user's id,
       // tagged with a "D"
       distinctOutkey.setTag("D");
       distinctOutkey.setText(parsed.get("UserId"));
       // Write the user's id with a null value
       context.write(distinctOutkey, DISTINCT_OUT_VALUE);
    }
}


Merged reducer codeReducer调用setupcleanup处理Multipleoutputs实例的创建和关闭。检查输入键的标签,调用对应的帮助方法。Reduce方法把text对象封装在TaggedText里传递。

 

对于隐藏数据的处理,迭代所有输入值写到输出目录anonymize/part。增加斜杠符号和“part”创建配置的输出目录,里面包含的part文件数等于reduce任务数。

对于去重处理,输入键和空的值写到目录distinct/part。这也是job配置里面需要创建的输出目录。

 

Notice:在这个例子中,从每个reduce调用部分输出了相同的基本格式---一个Text对象和一个nullwritable对象---。不总是这样的情况,如果你的job输出键值对类型存在冲突,你可以使用Text对象让输出规范化。

 

public static class AnonymizeDistinctMergedReducer extends
       Reducer<TaggedText, Text, Text, NullWritable> {
    private MultipleOutputs<Text, NullWritable> mos = null;
    protected void setup(Context context) throws IOException,
           InterruptedException {
       mos = new MultipleOutputs<Text, NullWritable>(context);
    }
    protected void reduce(TaggedText key, Iterable<Text> values,
           Context context) throws IOException, InterruptedException {
       if (key.getTag().equals("A")) {
           anonymizeReduce(key.getText(), values, context);
       } else {
           distinctReduce(key.getText(), values, context);
       }
    }
    private void anonymizeReduce(Text key, Iterable<Text> values,
           Context context) throws IOException, InterruptedException {
       for (Text value : values) {
           mos.write(MULTIPLE_OUTPUTS_ANONYMIZE, value,
                  NullWritable.get(), MULTIPLE_OUTPUTS_ANONYMIZE
                         + "/part");
       }
    }
    private void distinctReduce(Text key, Iterable<Text> values,
           Context context) throws IOException, InterruptedException {
       mos.write(MULTIPLE_OUTPUTS_DISTINCT, key, NullWritable.get(),
              MULTIPLE_OUTPUTS_DISTINCT + "/part");
    }
    protected void cleanup(Context context) throws IOException,
           InterruptedException {
       mos.close();
    }
}


Driver code。使用MultipleOutputs的通用驱动。

 

public static void main(String[] args) throws Exception {
    // Configure the merged job
    Job job = new Job(new Configuration(), "MergedJob");
    job.setJarByClass(MergedJobDriver.class);
    job.setMapperClass(AnonymizeDistinctMergedMapper.class);
    job.setReducerClass(AnonymizeDistinctMergedReducer.class);
    job.setNumReduceTasks(10);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    MultipleOutputs.addNamedOutput(job, MULTIPLE_OUTPUTS_ANONYMIZE, TextOutputFormat.class, Text.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, MULTIPLE_OUTPUTS_DISTINCT, TextOutputFormat.class, Text.class, NullWritable.class);
    job.setOutputKeyClass(TaggedText.class);
    job.setOutputValueClass(Text.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}