MapReduce Design Patterns(4、分箱、全排序、混洗)(八)

MapReduce Design Patterns(4、分箱、全排序、混洗)(八)

http://blog.csdn.net/cuirong1986/article/details/8481075


Binning

Pattern Description

分箱模式,跟前面的类似,分类记录且不考虑记录的顺序。

Intent

归档数据集中的每条记录到一个或多个类别。

Motivation

分箱和分区很相似,可以用来解决相同的问题。不同点是如何用MapReduce框架建立箱或分区。有些情况下,一种比另一种好用。

分箱是在map阶段分割数据而不是在partitioner阶段。主要的优势是消除了reduce阶段的使用。通常会带来更有效的资源分配。劣势是每个mapper对每个可能的输出箱都对应一个文件。这意味着,如果有1000个箱,1000mapper,结果会有1000000个文件。这对NameNode的可扩展性和随后的分析不利。分区模式每种分类一个文件,不会有这种问题。

Structure

·这种模式的独特之处是对MultipleOutputs类的使用,它设置job的输出为多个不同的文件。

·mapper查看每条记录,然后迭代每个箱的一系列的需要满足的条件。如果条件满足,就发到这个箱。如图4-3.

·这种模式没有combinerpartitionerreducer

Consequences

每个mapper对每个箱输出一个小文件。

Notice:不应该产生大量小文件,某些时候应该做一些后续处理合并小文件。

MapReduce Design Patterns(4、分箱、全排序、混洗)(八)

Figure 4-3. The structure of the binning pattern

Resemblances

Pig

Pig中的split操作实现了这种模式。

SPLIT data INTO

eights IF col1 == 8,

bigs IF col1 > 8,

smalls IF (col1 < 8 AND col1 > 0);

Performance analysis

跟其它只有mapjob有相同的性能分析。没有排序,shufflereduce,并且大多数处理都在本地完成。

Binning Examples

Binning by Hadoop-related tags

我们想要根据标签把数据过滤到不同的箱中,便于后面的分析。只关注hadoop相关的标签,就是:hadooppighivehbase。如果发帖任何地方,包括文本,标题,提到了hadoop,也会扔到对应的箱中。

 

问题:给出*发帖数据,基于上面四个标签分到四个箱。对于文本内容或标题提到hadoop的,放在跟上面不同的箱。

 

Driver code。其它部分是模板,除了对不同的箱使用MultipleOutputs,此类使用“bins”作为名字,在mapper中使用它来写到不同的输出。所以实际上是job的输出目录。默认禁用计数器,所以确保开启它,如果你不想看到大量输出。Reduce数量被设为0

 

 
//Configure the MultipleOutputs by adding an output called "bins"
//With the proper output format and mapper key/value pairs
MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class,
Text.class, NullWritable.class);
//Enable the counters for the job
//If there are a significant number of different named outputs, this
//should be disabled
MultipleOutputs.setCountersEnabled(job, true);
//Map-only job
job.setNumReduceTasks(0);


 

mapper codeSetup阶段创建MultipleOutputs实例。Mapper由几个if-else判断组成,来检查发帖的标签。每个标签都会用我们感兴趣的标签检查一遍。帖子如果有多个标签,那就会发送到多个箱中。最后,检查帖子内容是否包含hadoop单词,如果有输出到一个新的箱中。

Cleanup阶段要关闭MultipleOutputs

Notice:一般情况下,输出文件名:part-mnnnnn,这些文件将是空文件,除非mapper中有键值对的write语句。这里,文件会命名为bin_name-mnnnnn。随后的例子,bin_name-mnnnnn可能是hadoop-tag, pig-tag, hive-tag,hbase-tag, or hadoop-post

注意job的输出格式设为NullOutputFormat,当使用maprd包(新api)下的类时将会移除空的输出文件。因为新的api里面,输出文件不是从临时目录提交到hdfs配置的输出目录。这个可能会在更新版本的hadoop中修复。

 

public static class BinningMapper extends
        Mapper<Object, Text, Text, NullWritable> {

    private MultipleOutputs mos = null;

    protected void setup(Context context) {
        // Create a new MultipleOutputs using the context object
        mos = new MultipleOutputs(context);
    }

    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());
        String rawtags = parsed.get("Tags");
        // Tags are delimited by ><. i.e. <tag1><tag2><tag3>
        String[] tagTokens = StringEscapeUtils.unescapeHtml(rawtags).split("><");
        // For each tag
        for (String tag : tagTokens) {
            // Remove any > or < from the token
            String groomed = tag.replaceAll(">|<", "").toLowerCase();
            // If this tag is one of the following, write to the named bin
            if (groomed.equalsIgnoreCase("hadoop")) {
                mos.write("bins", value, NullWritable.get(), "hadoop-tag");
            }
            if (groomed.equalsIgnoreCase("pig")) {
                mos.write("bins", value, NullWritable.get(), "pig-tag");
            }
            if (groomed.equalsIgnoreCase("hive")) {
                mos.write("bins", value, NullWritable.get(), "hive-tag");
            }
            if (groomed.equalsIgnoreCase("hbase")) {
                mos.write("bins", value, NullWritable.get(), "hbase-tag");
            }
        }
        // Get the body of the post
        String post = parsed.get("Body");
        // If the post contains the word "hadoop", write it to its own bin
        if (post.toLowerCase().contains("hadoop")) {
            mos.write("bins", value, NullWritable.get(), "hadoop-post");
        }
    }

    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // Close multiple outputs!
        mos.close();
    }
}



Total Order Sorting

Pattern Description

全局排序模式关注数据中记录之间的排序。

Intent

你想根据某key对数据并行执行排序。

Motivation

排序在顺序程序中容易实现。但在MapReduce或其它并行执行的系统中是不容易的。因为典型的“分治”方法用在这里有些难度。

每个reduce都会根据key单独排序,不幸的是,这种排序在所有数据上不是全部有序的。这里要做的就是全局排序,就是把输出文件合起来看,数据也是有序的。

排序的数据有一些有用的地方。如果按时间排序,就会得到数据的时间线视图。在排序的数据上查找某数据可以用二分查找代替线性查找。在MapReduce中,我们通过查看文件的第一条和最后一条记录就能知道数据的上限和下限。这在查找记录时有用,也是hbase的主要特性。一些数据库,如果数据根据key或索引列排序,这种数据的load是很快的。不管从程序的角度,还是后续使用角度,都有很多原因要求数据的排序。然而,MapReduce对排序提供很少的支持,目的是希望保守使用这种代价高的操作。

Applicability

要求非常明显:排序key是可比较的。

Structure

全局排序可能是最难懂的模式之一。第一步要先依靠能产生相同大小数据集的值得范围决定分区集合。这个范围会决定哪个reducer会排序哪个范围的数据。也需要自定义partitioner根据分区键分区数据。最小范围的数据去到第一个reducer,下一个范围的去第二个,以此类推。

这种模式有两个阶段:分析阶段决定范围,排序阶段真正的去排序。分析阶段在某些地方是可选的。在数据变化之前或变化很小时,只需要运行一次,这个范围会一直适用。有些时候,特别是数据均匀分布时,这个范围可以自己去猜。例如,你要根据用户id排序评论数据,有100万的用户,你可以设想有一千个reducers,每个范围有一千个用户。用为用户id是均匀增长的,并知道用户的总数,可以靠简单的相除得到。

 

分析阶段就是对数据的随机取样。分区是跟据这个随机样本进行的。原理是,能把随机样本均匀分割的分区也应该把大数据集均匀分割(原文此处有单词拼写错误)。分析阶段的结构如下:

·mapper做一个简单的随机取样。当划分记录时,排序key作为输出key,使数据到reducer时看起来是排序的。这里不关心记录本身,为了节省空间使用null值。

·在这之前,确定整个数据集的记录数并算出对要分析的数据取多少百分比的样本是合理的。例如,计划跑1000reducer的排序,取样10万条记录,应该均匀分区。假设有10亿条记录,相除,采样率就是0.0001,意味着0.1%的记录会在分析阶段运行。

·这里只使用一个reducer。收集排序的key进入一个排序列表,然后精简列表数据得到数据范围的边界,形成一个分区文件。(1000reducer会有999个边界值)

 

排序阶段时相对简单的使用自定义partitionerMapReduce程序,结构如下:

·mapper抽取排序key,跟分析阶段方式相同。但value会被赋予记录本身。

·自定义的partitioner用于加载分区文件。在hadoop里,可以使用TotalOrderPartitioner,是专为此设计的类。它获取这个分区文件的数据范围,来决定每条记录被发到哪个reducer

·这里reducer比较简单。Shufflesort做了繁重的工作。Reduce只是简单的把值输出。Reducer的个数应该等于TotalOrderPartitioner 的分区的个数。

 

Notice:中间分区范围数目应该跟排序阶段reduce的数量相同。如果要改变reducer的数量,需要重建采样文件。

如果想根据两个key排序,例如,现根据name排序,再根据city排序,就用一个类似这样的keySmith^Baltimore

Hadoop中大多使用Text类型。排序数值数据时要小心,如果按字符串比较,1000要比9小,这时应该使用数值类型比较。

Consequences

输出文件包含排序的数据,并且输出文件名也是排序的。可使用hadoop fs –cat output/part-r-*用排序的方式查看数据。

Resemblances

Sql:用sql写全局排序非常简单:
SELECT FROM data ORDER BY col1;

 

Pig

Pig中的排序语句也很容易。但是一个代价高的操作。在这种场景下,会运行多个MapReduce job,首先决定分区,然后执行排序。

c = ORDER b BY col1;

Performance analysis

这种操作的代价是很高的,因为要加载和解析数据两次:首先构建分区范围文件,然后执行排序。

Job构建分区的操作比较简单和有效率,因为只有一个reducer,网络传输数据量较小。输出文件也较小,所以写操作不费力。你可能只是偶尔运行,这样更显不出它的消耗了。

排序阶段的性能特征跟其它数据重组模式类似,需要把所有数据通过网络并写所有数据。因此,应该使用相对较多的reducer

Total Order Sorting Examples

Sort users by last visit

的用户数据是按账户排序的。而我们希望根据最后访问网站的时间排序。这个例子中,使用一个特殊的驱动来跑分析和排序两个阶段。这样也会有两个MapReduce job数据集。

 

Driver code。可分解为两部分:通过采样创建分区列表,执行排序。第一部分解析命令行参数,创建输入输出对象,包括分区列表文件及临时目录。分区列表使用TotalOrderPartitioner,来保证键值对正确的排序。临时目录用于存储这两个job的中间输出。第一个job的配置没有特殊的地方,只需要保证是map-only的,并使用SequenceFileOutputFormat.

  

 public static void main(String[] args) throws Exception {
       Configuration conf = new Configuration();
       Path inputPath = new Path(args[0]);
       Path partitionFile = new Path(args[1] + "_partitions.lst");
       Path outputStage = new Path(args[1] + "_staging");
       Path outputOrder = new Path(args[1]);
       // Configure job to prepare for sampling
       Job sampleJob = new Job(conf, "TotalOrderSortingStage");
       sampleJob.setJarByClass(TotalOrderSorting.class);
       // Use the mapper implementation with zero reduce tasks
       sampleJob.setMapperClass(LastAccessDateMapper.class);
       sampleJob.setNumReduceTasks(0);
       sampleJob.setOutputKeyClass(Text.class);
       sampleJob.setOutputValueClass(Text.class);
       TextInputFormat.setInputPaths(sampleJob, inputPath);
       // Set the output format to a sequence file
       sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
       SequenceFileOutputFormat.setOutputPath(sampleJob, outputStage);
       // Submit the job and get completion code.
       int code = sampleJob.waitForCompletion(true) ? 0 : 1; 
       if (code == 0) {
           Job orderJob = new Job(conf, "TotalOrderSortingStage");
           orderJob.setJarByClass(TotalOrderSorting.class);
           // Here, use the identity mapper to output the key/value pairs in  the SequenceFile
          orderJob.setMapperClass(Mapper.class);
          orderJob.setReducerClass(ValueReducer.class);
          // Set the number of reduce tasks to an appropriate number for the  amount of data being sorted
          orderJob.setNumReduceTasks(10);
            // Use Hadoop's TotalOrderPartitioner class
           orderJob.setPartitionerClass(TotalOrderPartitioner.class);
           // Set the partition file
           TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(), partitionFile);
            orderJob.setOutputKeyClass(Text.class);
           orderJob.setOutputValueClass(Text.class);
          // Set the input to the previous job's output
           orderJob.setInputFormatClass(SequenceFileInputFormat.class);
           SequenceFileInputFormat.setInputPaths(orderJob, outputStage);
          // Set the output path to the command line parameter
           TextOutputFormat.setOutputPath(orderJob, outputOrder);
          // Set the separator to an empty string
          orderJob.getConfiguration().set( "mapred.textoutputformat.separator", "");
          // Use the InputSampler to go through the output of the previous job, sample it, and create the partition file
          InputSampler.writePartitionFile(orderJob,  new InputSampler.RandomSampler(.001, 10000));
         // Submit the job
         code = orderJob.waitForCompletion(true) ? 0 : 2;
      }
      // Clean up the partition file and the staging directory
      FileSystem.get(new Configuration()).delete(partitionFile, false);
      FileSystem.get(new Configuration()).delete(outputStage, true);
      System.exit(code)      
    }

第二个job使用identity mapper和自定义的reducer。第一个job的输出作为这个job的输入的一部分,所以只需要identity mapper简单的输出原来格式的键值对。Job被配置成10reducer,当然也可以是其它合理的数量。下一步,配置分区文件。

 

接下来比较重要的是使用InputSampler功能。采样器会根据job配置的目录里的数据写一个分区文件。使用RandomSampler,它会取前面job输出的样本的一部分,这个数量可以配置。

这是一个昂贵的操作,整个输出都用构造器读。另一种RandomSampler的构造器可以设定要取样的输入分片号码,这会增加执行时间,但获得不了分布式带来的好处。

 

分区文件被创建后,job就可以执行。随后分区文件和临时目录被删掉,因为已不再需要。

 

如果数据分布将来不太可能改变,可以把分区文件保存起来,重复利用。

 

if (code == 0) {
    Job orderJob = new Job(conf, "TotalOrderSortingStage");
    orderJob.setJarByClass(TotalOrderSorting.class);
    // Here, use the identity mapper to output the key/value pairs in
    // the SequenceFile
    orderJob.setMapperClass(Mapper.class);
    orderJob.setReducerClass(ValueReducer.class);
    // Set the number of reduce tasks to an appropriate number for the
    // amount of data being sorted
    orderJob.setNumReduceTasks(10);
    // Use Hadoop's TotalOrderPartitioner class
    orderJob.setPartitionerClass(TotalOrderPartitioner.class);
    // Set the partition file
    TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(),
           partitionFile);
    orderJob.setOutputKeyClass(Text.class);
    orderJob.setOutputValueClass(Text.class);
    // Set the input to the previous job's output
    orderJob.setInputFormatClass(SequenceFileInputFormat.class);
    SequenceFileInputFormat.setInputPaths(orderJob, outputStage);
    // Set the output path to the command line parameter
    TextOutputFormat.setOutputPath(orderJob, outputOrder);
    // Set the separator to an empty string
    orderJob.getConfiguration().set(
           "mapred.textoutputformat.separator", "");
    // Use the InputSampler to go through the output of the previous
    // job, sample it, and create the partition file
    InputSampler.writePartitionFile(orderJob,  new InputSampler.RandomSampler(.001, 10000));
    // Submit the job
    code = orderJob.waitForCompletion(true) ? 0 : 2;
}
// Clean up the partition file and the staging directory
FileSystem.get(new Configuration()).delete(partitionFile, false);
FileSystem.get(new Configuration()).delete(outputStage, true);
System.exit(code)

Analyze mapper code。这个mapper只抽取记录的上次访问日期字段,并设置为排序key。输入值一并输出。这些键值对被写到SequenceFileTotalOrderPartitioner可以根据其创建分区列表。

 

 public static class LastAccessDateMapper extends
            Mapper<Object, Text, Text, Text> {

        private Text outkey = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                    .toString());
            outkey.set(parsed.get("LastAccessDate"));
            context.write(outkey, value);
        }
    }


Order mapper code。使用identity mapper,略。

Order reducer code。因为TotalOrderPartitioner关注整体排序,所有的reducer只需要输出NullWritable类型的值。每个reducer产生的产生的文件都是按最后访问日期排序的。Partitioner保证所有这些文件合起来仍然是全局有序的数据集。

 

 public static class ValueReducer extends
            Reducer<Text, Text, Text, NullWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text t : values) {
                context.write(t, NullWritable.get());
            }
        }
    }


Shuffling

Pattern Description

全局排序和混洗模式在效果上是相反的,但接下来都会关注记录的排序。

Intent

随机合并数据集。

Motivation

本章其它模式的要做排序,而这个模式是要打乱排序。

这种使用相对较少,较深入。但两种问题也摆到桌面上了。一种是混洗数据达到隐藏的目的。另一种是把数据随机分布,用于可重复的随机采样。

 

隐藏数据最近变得重要,可以获得用户隐私方面的信息。数据的顺序可以提供暴露用户身份的信息。依靠混洗整个数据,可以隐藏数据信息。

 

另一种混洗数据的原因是执行某种可重复的随机取样。例如,前100条记录就是简单随机取样。每次取出前100条记录,我们会得到相同的样本。这允许分析能够通过跑随机取样得到重复的结果集。也不必每次都去跑job产生随机样本。

Structure

·mapper输出记录作为值,key为一个随机数。

·reducer排序随机数的key,进一步打乱数据。

 

换句话说,每条记录随机发送到reducer,每个reducer根据随机产生的key排序,产生那个reducer上的随机顺序。

Notice:混洗模式的mapper阶段做的事情不多,这是进一步处理数据的好时机,可以转成一种隐藏格式。

Consequences

每个Reducer输出的文件都包含随机的记录。

Resemblances

Sql

等价此模式的sql写法是根据随机值排序,而不根据某列。这样一来,记录之间的比较就根据产生的随机数比较,从而产生随机的排序。不必像上一个模式那样用MapReduce做全局排序,因为发送记录到一个随机的reducer已经足够了。

SELECT FROM data ORDER BY RAND()

 

Pig

Pig中的混洗可以像sql中那样做:按随机列排序。这种情况下做全局排序是不需要的。相反,我们可以根据随机key分组,让分组顺序单调。这样就有效的实现了混洗模式。

c = GROUP b BY RANDOM();

d = FOREACH c GENERATE FLATTEN(b);

Performance analysis

混洗有几个很好的性能点。因为每条记录所去的reducer是随机的,数据通过reducer分发会比较平衡。使用更多的reducer,数据的伸展性更好。文件的大小也可以被预测:数据集大小除以reducer的个数。这很容易得到想要的大小的数据文件。

 

除此之外,本章其它模式的性能点同样适用。这种模式会通过网络发送所有数据,并写到hdfs,所以应该使用相对较多的reducer

Shuffle Examples

Anonymizing * comments

为了隐藏评论信息,这个例子剔除用户id和行id,然后截断日期时间,仅保留日期。然后混洗。

问题:给出大量评论数据,隐藏评论的某些信息:移除id,移除时间。然后随机混洗数据。

 

Mapper codeMapper用通用方法转换和解析数据。浏览xml的属性,基于属性创建xml的行。如果是用户id或行id,忽略。如果是创建时间,该属性内容中字符“T”后面的内容去掉。其它的简单的输出属性和值。生成随机key和构建的新记录作为值一并输出。

 

public static class AnonymizeMapper extends
        Mapper<Object, Text, IntWritable, Text> {

    private IntWritable outkey = new IntWritable();
    private Random rndm = new Random();
    private Text outvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());
        if (parsed.size() > 0) {
            StringBuilder bldr = new StringBuilder();
            // Create the start of the record
            bldr.append("<row ");
            // For each XML attribute
            for (Entry<String, String> entry : parsed.entrySet()) {
                // If it is a user ID or row ID, ignore it
                if (entry.getKey().equals("UserId")
                        || entry.getKey().equals("Id")) {
                } else if (entry.getKey().equals("CreationDate")) {
                  // If it is a CreationDate, remove the time from the
                    // date
                    // i.e., anything after the 'T' in the value
                    bldr.append(entry.getKey()
                            + "=""
                            + entry.getValue().substring(0,
                                    entry.getValue().indexOf('T')) + "" ");
                } else {
                    // Otherwise, output the attribute and value as is
                    bldr.append(entry.getKey() + "="" + entry.getValue()
                            + "" ");
                }
            }
            // Add the /> to finish the record
            bldr.append("/>");
            // Set the sort key to a random value and output
            outkey.set(rndm.nextInt());
            outvalue.set(bldr.toString());
            context.write(outkey, outvalue);
        }
    }
}


Reducer code。按顺序输出值,随机key忽略。

 

public static class ValueReducer extends
        Reducer<IntWritable, Text, Text, NullWritable> {

    protected void reduce(IntWritable key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        for (Text t : values) {
            context.write(t, NullWritable.get());
        }
    }
}