MapReduce Design Patterns(4. 分层,分区)(七) Chapter 4. Data Organization Patterns

MapReduce Design Patterns(4. 分层,分区)(七)

Chapter 4. Data Organization Patterns

http://blog.csdn.net/cuirong1986/article/details/8476368



与前面章节的过滤器相比,本章是关于数据重组。个别记录的价值通常靠分区,分片,排序成倍增加。特别是在分布式系统中,因为这能提高性能。

 

在很多组织结构方面,Hadoop和其它MapReduce使用案例仅仅是大数据分析平台上一片数据的处理。数据通常被转换成跟其它系统有良好接口的形式,同样,数据也可能从原来状态转成一种新的状态,从而使MapReduce分析更容易。

本章包括下面几个子模式:

·分层结构模式

·分区和装箱模式

·全局排序和混洗模式

·生成数据模式

 

本章的模式通常一起使用来解决数据的组织问题。例如,你可能想调整数据分层,装箱,然后对箱进行排序。第六章的“Job Chaining“详细介绍了怎用把各种模式组合起来解决复杂的问题。

Structured to Hierarchical

Pattern Description

结构分层模式会根据数据创建新的不同结构的记录。由于这种模式的重要性,在本章很多地方独立存在。

Intent

把基于行的数据转换成有层次的格式,例如,json xml

Motivation

当从RDBMSHadoop系统迁移数据时,首先考虑的一件事就是重组数据成为一种有意义的结构。因为hadoop不会关心数据格式,你应该充分利用分层次数据的优势避免做join。例如:我们的*数据包含一张评论表,一张发帖表。很明显这是存储在标准的sql数据库的。当你访问发帖表时,所有数据库中的数据块要整合成一个视图来展现。这使得想分析个别发帖时变得更复杂。设想用发帖的长度跟评论的长度相关联,这需要首先做一次代价较高的join操作,然后抽取有用的数据。如果换成根据发帖分组数据,使发帖跟相关联的评论,编辑,修改数据紧挨着(例如反规范化表数据),这样分析起来会更容易和直观。这种情况下保存结构化数据完全达不到目的。

不幸的是,数据不总是分组在一块。当某人回复了*的某个问题,hadoop不能够把这条记录立刻插到层次数据中。因此,用MapReduce创建非结构化记录只适用于周期性的批处理形式的业务逻辑。

 

另一种能平稳更新数据的方式是用HbaseHbase能存储半结构化和层次样式的数据。MongoDB也能很好的处理这种数据的排序。

Applicability

这种模式适合的场景:

·你的数据靠很多外键相联系

·你的数据是结构化的基于行的

Structure

4-1展示了这种模式的结构,每部分组件描述如下:

·如果你想合并多个数据源成为一个有层次的数据结构,hadoop中有个类:org.apache.hadoop.mapreduce.lib.input.MultipleInputs很适合使用(貌似很老的包)。Mutipleinputs允许你对不同的input使用不同的input path和不同的mapperDriver里完成配置。如果只有一个来源,则不需要这一步。

·mapper加载数据并解析成紧凑的格式从而使reducer更容易。输出key应该是你想要标识的每一条层次记录的根。例如,在*例子里,根是发帖id。也需要给没片数据标注来源信息。也需要标识输出记录是发帖还是评论。这样的话,就能简单连接这些标签并输出值。

·通常,这里用combiner起不了多大作用。可以用相同的key对条目分组,一起发送,但这样没有压缩所起到的好处,因为要做的知识连接字符串,所以输出大小不变。

·reducerkey从不同的源接收数据。所有的数据对指定的分组,每组都会产生一个迭代器,剩下你需要做的就是用数据条目构建有层次的数据结构。使用xmljson,可以构建一个简单的对象并输出。这部分的例子使用xml,提供了几个方便的方法处理结构化数据。如果使用其他格式,例如自定义的格式,也要使用合适的构建对象的方法和序列化方法。

 MapReduce Design Patterns(4. 分层,分区)(七)

Chapter 4. Data Organization Patterns

Figure 4-1. The structure of the structured to hierarchical pattern

Consequences

输出是一种有层次的形式,根据指定的key分组的。

注意很多格式例如xml json都有某种顶层根元素包在所有记录外面。如果想让文档从根到底部都有良好的格式,也比较容易在特定的处理阶段加上头部或尾部。

 

Known uses

Pre-joining data

数据是杂乱的结构化数据集,为了分析,也很容易把数据组合成更复杂的对象。通过这样,你设置好数据来充分利用分析nosql模型的优势。

Preparing data for HBase or MongoDB

Hbase是很自然的存储这类数据的方式。所以可以用这种方法把数据搞到一起,作为加载到hbasemongoDB的准备工作。创建hbase表,然后通过MapReduce执行大量导入是很高效的。另一种方案是分几次导入,可能效率较低。

Resemblances

Sql

RDB中这样的事情是很少见的,因为这样存储用sql分析不是很方便。然而,这种方式可以解决RDBMS中的类似问题,比如做join然后在结果上做分析。

Pig

Pig对层次数据有适当的支持,可以取到层次的包和元组,然后就能容易的展现出层次结构和列出单条记录的对象。Pig中的cogroup方法会保存原始结构做大量的合并数据的操作。然而,使用这个关键词在复杂记录上做任何种类的实时分析都会有挑战性。所以,自定义个方法是好的选择。基本做法是使用pig创建并分组记录,然后用udf去分析数据。

data_a = LOAD '/data/comments/' AS PigStorage('|');

data_b = LOAD '/data/posts/' AS PigStorage(',');

grouped = COGROUP data_a BY $2, data_b BY $1;

analyzed = FOREACH grouped GENERATE udfs.analyze(group, $1, $2);

。。。

Performance analysis

使用这种模式时有两个性能关注点。第一个,需要知道mapper发送到reducer的数据量,第二个,要清楚reducer创建的对象的内存使用情况。

因为要使用key分组的记录可以遍布在数据集的任何地方,很多数据会通过网络传输。基于此原因,你需要注意使用适当的reducer的数量。策略跟其它模式中的一样。

 

下一个主要关注的是数据的热点问题的可能性,它可以导致记录的暴增。对大的数据集,一个特殊的输出记录会有很多数据与之关联是很可能出现的情况。设想下*上发了一个帖子,有一百万的评论。这种情况相对少见,但不是不可能的。如果你在创建某种xml对象,所有的评论某一时刻在输出之前都会存储在内存里。这可能造成jvm OOM,这种情况应该避免。

 

另一个热点问题是reducer处理的数据的倾斜问题。这跟普通MapReduce job可能遇到的问题相似。很多时候,倾斜问题可以忽略,如果倾斜很严重就写个自定义的partitioner使数据分发得更均匀。

Structured to Hierarchical Examples

Post/comment building on *

这个例子里,我们将拿到*的发帖和评论数据并分组。层次结构如下所示:

Posts

  Post

    Comment

    Comment

  Post

    Comment

    Comment

    Comment

 

问题:给出用户发帖和评论数据,创建结构化的xml层次结构,将相关的评论嵌套在对应的发帖之内。

 

Driver code。我们通常不会描述驱动代码,但这个例子中会加入新的东西:MultipleInput

创建这个对象,把评论数据和发帖数据的路径加到指定的mapper。这两个路径是通过命令行提供的,通过args数组获得。

 

public static void main(String[] args) throws Exception {
       Configuration conf = new Configuration();
       Job job = new Job(conf, "PostCommentHierarchy");
       job.setJarByClass(PostCommentBuildingDriver.class);
       MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, PostMapper.class);
       MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CommentMapper.class);
       job.setReducerClass(UserJoinReducer.class);
       job.setOutputFormatClass(TextOutputFormat.class);
       TextOutputFormat.setOutputPath(job, new Path(args[2]));
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(Text.class);
       System.exit(job.waitForCompletion(true) ? 0 : 2);
}


Mapper code。这里有两个mapper类,分别是评论和发帖类。两者都用发帖id作为输出key。输出值用字符(p代表发帖,c代表评论),我们在reduce阶段就会知道数据来自哪里。

 

public static class PostMapper extends Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();
    private Text outvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());
        // The foreign join key is the post ID
        outkey.set(parsed.get("Id"));
        // Flag this record for the reducer and then output
        outvalue.set("P" + value.toString());
        context.write(outkey, outvalue);
    }
}

public static class CommentMapper extends Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();
    private Text outvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());
        // The foreign join key is the post ID
        outkey.set(parsed.get("PostId"));
        // Flag this record for the reducer and then output
        outvalue.set("C" + value.toString());
        context.write(outkey, outvalue);
    }
}

 

Reducer codeReducer创建有层次的xml对象。所有的值被迭代,来得到发帖记录和对应的评论列表。我们知道根据标识分别记录并加到值里。标识在指派为帖子(对应为帖子)或加到list(对应为评论)时移除掉。如果帖子不为空,就按帖子为父节点,评论为子节点创建xml记录。

紧随其后的是nestElements。使用xml api创建xml记录,你可以在需要时尽情使用。

   

public static class PostCommentHierarchyReducer extends
        Reducer<Text, Text, Text, NullWritable> {

    private ArrayList<String> comments = new ArrayList<String>();
    private DocumentBuilderFactory dbf = DocumentBuilderFactory
            .newInstance();
    private String post = null;

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Reset variables
        post = null;
        comments.clear();
        // For each input value
        for (Text t : values) {
            // If this is the post record, store it, minus the flag
            if (t.charAt(0) == 'P') {
                post = t.toString().substring(1, t.toString().length())
                        .trim();
            } else {
                // Else, it is a comment record. Add it to the list, minus
                // the flag
                comments.add(t.toString()
                        .substring(1, t.toString().length()).trim());
            }
        }
        // If there are no comments, the comments list will simply be empty.
        // If post is not null, combine post with its comments.
        if (post != null) {
            // nest the comments underneath the post element
            String postWithCommentChildren = nestElements(post, comments);
            // write out the XML
            context.write(new Text(postWithCommentChildren),
                    NullWritable.get());
        }
    }

}

 

nestElements方法拿到帖子和评论的列表,创建一个xml字符串并输出。使用DocumentBuilder和一些额外的帮助方法拷贝Element对象为新的,还有他们的属性。这种拷贝发生在数据行转为帖子或评论时的重命名标签时。最终的document被转换成了xml

 

private String nestElements(String post, List<String> comments) {
        // Create the new document to build the XML
        DocumentBuilder bldr = dbf.newDocumentBuilder();
        Document doc = bldr.newDocument();
        // Copy parent node to document
        Element postEl = getXmlElementFromString(post);
        Element toAddPostEl = doc.createElement("post");
        // Copy the attributes of the original post element to the new one
        copyAttributesToElement(postEl.getAttributes(), toAddPostEl);
        // For each comment, copy it to the "post" node
        for (String commentXml : comments) {
            Element commentEl = getXmlElementFromString(commentXml);
            Element toAddCommentEl = doc.createElement("comments");
            // Copy the attributes of the original comment element to
            // the new one
            copyAttributesToElement(commentEl.getAttributes(),
                    toAddCommentEl);
            // Add the copied comment to the post element
            toAddPostEl.appendChild(toAddCommentEl);
        }
        // Add the post element to the document
        doc.appendChild(toAddPostEl);
        // Transform the document into a String of XML and return
        return transformDocumentToString(doc);
    }

    private Element getXmlElementFromString(String xml) {
        // Create a new document builder
        DocumentBuilder bldr = dbf.newDocumentBuilder();
        return bldr.parse(new InputSource(new StringReader(xml)))
                .getDocumentElement();
    }

    private void copyAttributesToElement(NamedNodeMap attributes,
            Element element) {
        // For each attribute, copy it to the element
        for (int i = 0; i < attributes.getLength(); ++i) {
            Attr toCopy = (Attr) attributes.item(i);
            element.setAttribute(toCopy.getName(), toCopy.getValue());
        }
    }

    private String transformDocumentToString(Document doc) {
        TransformerFactory tf = TransformerFactory.newInstance();
        Transformer transformer = tf.newTransformer();
        transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION,
                "yes");
        StringWriter writer = new StringWriter();
        transformer.transform(new DOMSource(doc), new StreamResult(writer));
        // Replace all new line characters with an empty string to have
        // one record per line.
        return writer.getBuffer().toString().replaceAll("
|
", "");
    }



Question/answer building on *

本例是前面例子的后续处理,会使用前面例子的输出作为本例的输入数据。现在,我们已经得到了帖子和跟帖子关联的评论,现在我们要关联帖子的提问和帖子的回答。这是需要做的,因为帖子由回答帖和提问帖构成,并根据postTypeId区分。我们将根据提问的id和回答的父id分组到一起。

 

问题:使用前面例子的输出,执行自关联操作,创建问题帖,回答帖,和评论的层次结构。

 

Mapper code。首先决定记录是问题还是回答,因为这两种记录的行为不同。对于问题,抽取它的id作为key,并标记为问题。对于回答,抽取父id作为key,并标记为回答。

 

public class QuestionAnswerBuildingDriver {

    public static class PostCommentMapper extends
            Mapper<Object, Text, Text, Text> {

        private DocumentBuilderFactory dbf = DocumentBuilderFactory
                .newInstance();
        private Text outkey = new Text();
        private Text outvalue = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Parse the post/comment XML hierarchy into an Element
            Element post = getXmlElementFromString(value.toString());
            int postType = Integer.parseInt(post.getAttribute("PostTypeId"));
            // If postType is 1, it is a question
            if (postType == 1) {
                outkey.set(post.getAttribute("Id"));
                outvalue.set("Q" + value.toString());
            } else {
                // Else, it is an answer
                outkey.set(post.getAttribute("ParentId"));
                outvalue.set("A" + value.toString());
            }
            context.write(outkey, outvalue);
        }

        private Element getXmlElementFromString(String xml) {
            // same as previous example, “Mapper code” (page 80)
        }
    }
} 


Reducer codeReduce代码跟前面例子相似。迭代输入值,抽取问题和回答,并移除标记。然后将回答嵌套在问题里面。不同点只是xml的标签不同。

 

public static class QuestionAnswerReducer extends
        Reducer<Text, Text, Text, NullWritable> {

    private ArrayList<String> answers = new ArrayList<String>();
    private DocumentBuilderFactory dbf = DocumentBuilderFactory
            .newInstance();
    private String question = null;

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Reset variables
        question = null;
        answers.clear();
        // For each input value
        for (Text t : values) {
            // If this is the post record, store it, minus the flag
            if (t.charAt(0) == 'Q') {
                question = t.toString().substring(1, t.toString().length())
                        .trim();
            } else {
                  // Else, it is a comment record. Add it to the list, minus
                // the flag
                answers.add(t.toString()
                        .substring(1, t.toString().length()).trim());
            }
        }
        // If post is not null
        if (question != null) {
            // nest the comments underneath the post element
            String postWithCommentChildren = nestElements(question, answers);
            // write out the XML
            context.write(new Text(postWithCommentChildren),
                    NullWritable.get());
        }
    }
}


Partitioning

Pattern Description

分区模式是把记录移动到目录里(分片,分区,或箱)。但不关心记录的顺序。

Intent

获取数据集中相似的记录并把它们分区放入不重复的,更小的数据集。

Motivation

如果你想查看一个特殊的数据集例如根据日期检索条数需要检索的数据通常遍布在整个数据集。所以查看一个子集也要scan整个数据集。分区的意思是,根据分析的相关性,把数据分成小的子集。为了提高性能,可以跑一个job根据分区把数据分到不同的目录。需要分析时,只需查看某个目录下的数据。

根据日期分区是最常见的方案。对分析一定时间段数据比较有用,因为数据已经按这种条件分组了。例如,假设hadoop集群上的某数据跨度为3年,由于某种原因数据不是按日期排序的。如果只想取当年127号到23号的数据,只能扫描全部数据。但如果能按月份分区,只需要在1月和2月分区上跑MapReduce job,这样比按天分区要好一点。

 

分区能帮助解决数据集有几种不同类型记录的问题,NoSQL中越来越常见。例如,http服务器日志有getpost两种请求,系统消息和错误消息。分析可能只针对某一类数据,所以分区能使MapReduce job跑的数据量降低。

RDBMS中,典型的分区是where条件中用得最多的字段。例如,按国家过滤数据,就可以按国家分区。MapReduce也可以应用这个。如果你发现经常根据相同的条件过滤数据,就应该考虑分区了。

除了要构建分区,这种模式基本没负面影响。如果需要的话,MapReduce job也可以在所有的分区上跑数据。

Applicability

这种模式最主要的是要预知分区的数量。例如,如果对一周的每天分区,可以得到7个分区。

可以通过运行一个分析获得能决定分区数量的信息。例如,有一段时间戳范围的数据,但不知道具体范围,跑个job计算出数据的具体范围。

Structure

这种模式利用partitioner分割数据。没有确切的分区逻辑。需要做的只是通过自定义partitioner决定一条记录去哪个分区。

4-2展示了这种模式的结构。

 

·大多数情况下可以使用identity 作为mapper

·自定义partitioner是这种模式的关键点。它决定哪个reducer接收哪条记录,每个reduce对应一个特定的分区。

·大多数情况下,可以使用identity作为reducer。但是如果需要的话,这种模式能在reducer中做额外的工作。数据仍然会分组和排序,所以可以去重,聚合,求和,针对每一个分区。

 

Consequences

Job的输出目录每个分区一个分区文件。

Notice:因为每个目录会写入一个大文件,所以用块压缩的sequenceFiles存储数据较好,这种方式已被证明最有效并且是一种容易使用的格式。

MapReduce Design Patterns(4. 分层,分区)(七)

Chapter 4. Data Organization Patterns

Figure 4-2. The structure of the partitioning pattern

Known uses

Partition pruning by continuous value

如果有某连续变量的排序,例如日期或数值,每次只会关注某一条件的数据的子集。把数据分成一块一块可以使job只加载相关的数据。

Partition pruning by category

这里不是根据连续的变量,而是某些已知类型,例如国家,地区号码,或语言。

Sharding

系统架构要分割数据,例如不同的磁盘,你需要把数据放进这些存在的分片中。

Resemblances

Sql

有些sql数据库可以自动对表分区。在跑sql之前就能拍出不需要的数据。

Other patterns

这种模式跟本章的分箱模式类似。

Performance analysis

主要的性能关注点是最终的分区之间记录数量可能不相近。可能出现一个分区拥有整个数据50%的数据量。如果实现得比较烂,所有数据发送到了一个reducer会降低处理效率。

避免这种情况很容易。把大的分区分成几个小的分区,甚至只需要随即分发。对一个分区指定多个reducer,然后随机分发会更好一点。

例如,考虑*用户的最后访问时间。如果根据这个时间的月份分区,那么最近一个月的数据会比其它月份的大得多。为了防止这种倾斜,把最近一个月数据按天分区,或只是随机就好。

Partitioning Examples

Partitioning users by last access date

数据中,用户是按注册时间排序的。我们想根据一年内最后一次访问日期重组数据到分区里。可以根据时间创建自定义的partitioner分发记录。

 

问题:给出用户信息数据,根据一年内最后访问日期分区数据,一年一个大分区。

 

Driver codeJob需要配置使用自定义的partitioner。最小的上次访问年份需要配置,2008.原因在partitioner代码部分解释。Reducer的个数很重要,保证覆盖要计算的分区的范围。作者在2012年跑这个例子,最后访问日期的年份跨度是2008-2011.意味着job要有4reducer

 

// Set custom partitioner and min last access date
    job.setPartitionerClass(LastAccessDatePartitioner.class);
    LastAccessDatePartitioner.setMinLastAccessDate(job, 2008);
    // Last access dates span between 2008-2011, or 4 years
    job.setNumReduceTasks(4);

Mapper code。处理输入记录,日期作为key,整条记录作为值输出。Partitioner可以发挥分发的作用,随后的reduce输出阶段,key可以忽略掉。

 

public static class LastAccessDateMapper extends
        Mapper<Object, Text, IntWritable, Text> {

    // This object will format the creation date string into a Date object
    private final static SimpleDateFormat frmt = new SimpleDateFormat(
            "yyyy-MM-dd'T'HH:mm:ss.SSS");
    private IntWritable outkey = new IntWritable();

    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());
        // Grab the last access date
        String strDate = parsed.get("LastAccessDate");
        // Parse the string into a Calendar object
        Calendar cal = Calendar.getInstance();
        cal.setTime(frmt.parse(strDate));
        outkey.set(cal.get(Calendar.YEAR));
        context.write(outkey, value);
    }
}


Partitioner code。这个类做它该做的事情。实现了configurable接口。Setconf方法在配置partitioner时使用。从配置里取最后一次访问的最小年份。Driver会调用DatePartitioner.setMinLastAccessDate取到job配置阶段配置的这个日期。最小的上次访问日期是2008,所以这些用户会写到0分区。

 

public static class LastAccessDatePartitioner extends
        Partitioner<IntWritable, Text> implements Configurable {

    private static final String MIN_LAST_ACCESS_DATE_YEAR = "min.last.access.date.year";
    private Configuration conf = null;
    private int minLastAccessDateYear = 0;

    public int getPartition(IntWritable key, Text value, int numPartitions) {
        return key.get() - minLastAccessDateYear;
    }

    public Configuration getConf() {
        return conf;
    }

    public void setConf(Configuration conf) {
        this.conf = conf;
        minLastAccessDateYear = conf.getInt(MIN_LAST_ACCESS_DATE_YEAR, 0);
    }

    public static void setMinLastAccessDate(Job job, int minLastAccessDateYear) {
        job.getConfiguration().setInt(MIN_LAST_ACCESS_DATE_YEAR,
                minLastAccessDateYear);
    }
}


Reduce code。这一阶段很简单,因为只需要输出值。

 

public static class ValueReducer extends
        Reducer<IntWritable, Text, Text, NullWritable> {

    protected void reduce(IntWritable key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        for (Text t : values) {
            context.write(t, NullWritable.get());
        }
    }
}