MapReduce Design Patterns(3. 反向索引、计数器)(四)

http://blog.csdn.net/cuirong1986/article/details/8456923


Inverted Index Summarizations

Pattern Description

反向索引模式在MapReduce分析中经常作为一个例子。我们将会讨论我们要创建的term跟标识符之间映射的一般情况。

 

Intent

根据数据集生成索引,用于快速搜索或数据的富集能力。

Motivation

根据关键词索引大数据非常方便,搜索能追踪term找到包含指定值的记录。创建索引需要之前进行额外的处理,花时间去做这项工作能有效减少我们寻找东西的时间。

 

搜索引擎为了提高搜索性能创建索引。设想键入一个关键词,让引擎去互联网抓取信息并创建一个页面的列表返回给你。这样的查询会耗费巨大的时间。如果创建一个反向索引,搜索引擎会在之前就知道跟关键词相关的互联网页面,结果很简单的展现给用户。这种索引也经常注入数据库中,为了更快的查询响应。使用MapReduce创建反向索引是相对简单的任务,因为框架处理绝大多数工作。

Applicability

反向索引用在搜索查询时需要快速响应的情况下。查询的结果能被预先处理并放入数据库。

Structure

2-5展示了MapReduce中执行反向索引的组织结构。下面详细介绍MapReduce组件各部分的功能:

 

·mapper输出需要索引的字段作为key,唯一标识符作为value

·如果使用identity reducer,可以不用combiner,因为这种情况下combiner仅仅执行一些不需要的处理。很多实现在输出到文件系统之前将组和值关联起来。这种情况下,combiner就能使用。这里对字节计数没有有益的影响,不像其它模式里的combiner,但也会有一种改进。

·partitioner负责决定含有相同key的值能够拷贝到同一个reducer中。如果中间键不是平均分发的,为了更有效的负载均衡,可以重新定义partitioner

·reducer将接收一系列唯一的记录标识跟输入key关联。标识符可以是一些唯一的分隔符连接起来,使输出为每组一个键值对,也可以是输入key里的value,正如identity reducer

MapReduce Design Patterns(3. 反向索引、计数器)(四)

Figure 2-5. The structure of the inverted index pattern

 

Consequences

最终的输出是包含字段到一系列包含相关字段值的记录的标识符的映射的分片文件的集合。

Performance analysis

创建反向索引的性能主要取决于mapper中解析内容的计算代价,索引键的基数,和每个key中内容标识符的数量。

 

Mapper中解析文本或其他类型的内容,有时是计算紧张的操作。这个问题对半结构化的数据特别突出,例如xmljson,因为这种类型可能需要解析任意的信息量,到一个可用的对象中。尽可能高效的解析传入的记录来提高整个job的性能,是非常重要的。

 

如果唯一键的数量和标识符的数量非常巨大,将会发送到reducer更多的数据。越多的数据发送到reducer,你应该增加reducer的数量来提高reduce阶段的并行度。

 

反向索引对索引键中的热点非常敏感,因为索引键很少是均匀分布的。例如,reducer在文本搜索中处理单词”the”,将会变得非常繁忙,因为文本中有大量的单词“the”。这将因为个别执行时间较长的reducers而影响到整个job的执行。为了避免这个问题,你可能需要实现一个自定义的partitioner,或忽略这个关键词,不给值。

Inverted Index Example

Wikipedia reference inverted index

创建反向索引对MapReduce来说是一项简单的工作,它经常作为初学者继word count之后的第二个例子。与word count很像,大多数的工作由MapReduce框架来做。

 

假设要在每个引用到*评论的wikipedia页面添加*的链接。下面的例子分析每一个*的评论来找出是否从wikipedia链接过来的。如果是,链接跟评论的id一同输出,生成反向索引。到reduce阶段时,引用到相同链接的评论id会分组到一起。然后通过空格分隔符把分组连接起来,输出到文件系统。至此,可以用这些数据文件中引用到wikipedia的所有评论更新wikipedia页面。

 

问题:给出用户评论数据,在一系列回答问题的id上创建wikipedia的反向索引

 

Mapper codeMapper解析*帖子数据输出wikipedia url和回帖记录的id。从xml属性中抽取内容,提交类型(发帖or回帖),记录id。如果提交类型是提问,不是回答,标识为“2”(代码中明明写的是1,我的钛合金狗眼。。。)(本人认为有误),然后解析内容,找到wikipedia url。使用getWikipediaURL方法,传入非转义的html文本,找到就返回,否则返回空。这段代码就不在这里列出了。如果url找到,就把它当做key,记录id作为值一并输出。

 

public static class WikipediaExtractor extends
        Mapper<Object, Text, Text, Text> {

    private Text link = new Text();
    private Text outkey = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());
// Grab the necessary XML attributes
        String txt = parsed.get("Body");
        String posttype = parsed.get("PostTypeId");
        String row_id = parsed.get("Id");
// if the body is null, or the post is a question (1), skip
        if (txt == null || (posttype != null && posttype.equals("1"))) {
            return;
        }
// Unescape the HTML because the SO data is escaped.
        txt = StringEscapeUtils.unescapeHtml(txt.toLowerCase());
        link.set(getWikipediaURL(txt));
        outkey.set(row_id);
        context.write(link, outkey);
    }
}


 

Reduce codeReducer会迭代所有值,以string类型,空格为分隔符,把记录id连接起来。输入key作为输出keyvalue是连接起来的string

 

public static class Concatenator extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (Text id : values) {
            if (first) {
                first = false;
            } else {
                sb.append(" ");
            }
            sb.append(id.toString());
        }
        result.set(sb.toString());
        context.write(key, result);
    }
}


 

Combiner optimizationCombiner可以在reduce阶段之前做一些预连接。因为所有的记录id简单连接在一起,需要拷贝到reducer的字节量比数值聚合模式中的要多。Reduce代码可以用作combiner

 

Counting with Counters

Pattern Description

这种模式利用了MapReduce框架本身的计数器功能在map端做全局的计算,不做任何输出。

Intent

一种获取大数据量下总体计数值得有效手段。

Motivation

一个计数或总和能告诉你数据某个字段的信息,或整个数据信息。根据每个小时的计数值就能得到有用的直方图。这种模式也可以用类似word count的程序计算:这种情况下,对每个输入记录,输出相同记录作为key,表示这一小时处理了这条记录,并给计数1。唯一的reduce会对所有输入值求和,输出这一小时内最终的记录条数。这种使用起来很简单,但用计数器会更高效。不会写任何的键值对,只利用框架的计数机制跟踪输入的记录条数。这样就不需要reduce阶段并不需要求和。框架会监控计数器的名字和它们相关的值,并根据所有tasks聚合这些值,包括任何失败的task attempts

 

例如你想找到每天你的员工大量访问站点的次数。假设你有若干员工,可以对web日志解析时用条件过滤。不用输出员工的姓名和计数1,你可以创建一个计数器,包含员工id,自增1。在job的最后,简单的从框架获取计数器并保存到任何你想要的的地方。

 

许多计数器是内建在框架里的,例如,输入输出记录数和字节数。Hadoop允许程序猿自定义任何可能需要的计数器。这种模式描述了怎样利用这种自定义计数器从数据集收集计数或合计指标。使用计数器的主要好处就是所有的计数都在map阶段完成。

 

Notice:使用计数器需要清楚的是它们都存储在jobTracker的内存里。Map任务序列化它们,连同更新状态被发送。为了运行正常且jobTracker不会出问题,计数器的数量应该在10-100个,计数器不仅仅只用来聚合MapReduce job的统计值。新版本的hadoop限制了计数器的数量,以防给jobTracker带来损害。你最不想看到的事情就是由于定义上百个计数器而使jobTracker宕机。

 

Applicability

用计数器计数适用的情况:

·在大数据集上收集计数或求和。

·创建的计数器个数较小,两位数以内。

Structure

2-6展示了这种模式的组织结构。

·mapper每次处理每条输入记录根据某一条件自增计数器。计数器可以是自增1的计数,也可以是自增某数值的求和计算。这些计数器在TaskTracker聚合以后加到jobTracker上,直到job结束。失败任务的计数器在jobTracker最终求和时不会计算在内。

·因为job只有map,所以没有combinerpartitionerreducer

Consequences

最终的输出是从job框架获取的计数器的集合。分析本身没有实际的输出。但是job需要一个输出目录。这个目录将会产生几个空文件风别对应几个map任务。Job完成时目录应该被删掉。

 

Known uses

统计记录的条数:

简单的统计给定时间段内记录条数是很常见的,这是框架提供的典型的计数器。

统计数量较小的唯一事件:

计数器也可以在程序运行中创建,用字符串变量。你可能现在就知道值是什么,但计数器不必提前创建。简单的使用字段值创建一个计数器并自增,足够解决这类问题。只需要保证计数器数量要小。

求和:

计数器能用来做数据字段的求和。但不是在reduce端执行求和,仅仅创建并使用它求字段值的和。

 MapReduce Design Patterns(3. 反向索引、计数器)(四)

Figure 2-6. The structure of the counting with counters pattern

Performance analysis

使用计数器能很快的完成计算,因为数据仅仅在map中处理,没有输出要写。性能主要取决于执行的map的个数和处理每条记录花费的时间。

Counting with Counters Example

Number of users per state

对于这个例子,我们只用map来统计每个州下用户的数量。位置属性是用户键入的值,不需要任何具体的输入。由于此,会存在大量的空字段,还有编造的位置数据。我们需要处理这个问题,处理每条输入记录时也要保证不要创建太多的计数器。我们创建计数器之前要检验位置数据是否包含状态缩写码。这样创建最多52个计数器—50个州(美帝),2nullempty。这对于jobTracker来说是很容易管理的计数器的数量,你的程序不能比这个多很多。

 

问题:用hadoop自定义计数器统计每个州的用户数。

 

Mapper codeMapper读取每条记录并得到这个用户的位置。位置是用空壳分隔的,用代表州的信息搜索。我们把所有州的缩写放进内存,来防止过多的计数器被创建。位置数据仅仅是用户设置的字符串,并不是其他数据结构。如果州被识别出来,计数器递增1。计数器通过组和名字标识。这里,组是州,一个公共的string类型变量,名字是州的缩写代码。

 

public static class CountNumUsersByStateMapper extends
        Mapper<Object, Text, NullWritable, NullWritable> {

    public static final String STATE_COUNTER_GROUP = "State";
    public static final String UNKNOWN_COUNTER = "Unknown";
    public static final String NULL_OR_EMPTY_COUNTER = "Null or Empty";
    private String[] statesArray = new String[]{"AL", "AK", "AZ", "AR",
        "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN",
        "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS",
        "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND",
        "OH", "OK", "OR", "PA", "RI", "SC", "SF", "TN", "TX", "UT",
        "VT", "VA", "WA", "WV", "WI", "WY"};
    private HashSet<String> states = new HashSet<String>(
            Arrays.asList(statesArray));

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());
        String location = parsed.get("Location");

        if (location != null && !location.isEmpty()) {
            String[] tokens = location.toUpperCase().split("\s");
            boolean unknown = true;
            for (String state : tokens) {
                if (states.contains(state)) {
                    context.getCounter(STATE_COUNTER_GROUP, state)
                            .increment(1);
                    unknown = false;
                    break;
                }
            }
            if (unknown) {
                context.getCounter(STATE_COUNTER_GROUP, UNKNOWN_COUNTER)
                        .increment(1);
            }
        } else {
            context.getCounter(STATE_COUNTER_GROUP,
                    NULL_OR_EMPTY_COUNTER).increment(1);
        }
    }
}


 

Driver code。驱动代码大部分都是样板,不用动,这个例子要改一些:job完成后获取计数器。Job成功以后就把结果打印到标准输出。这些计数器的值也会在job完成后输出的指定目录,所以把他们写到标准输出可能是多余的,如果你会通过查看日志文件获取他们。输出目录随后被删掉,不管成功与否,因为这样的job创建的不是有形产出,基本没意义。

 

 

        int code = job.waitForCompletion(true) ? 0 : 1;
        if (code == 0) {
            for (Counter counter : job.getCounters().getGroup(
                    CountNumUsersByStateMapper.STATE_COUNTER_GROUP)) {
                System.out.println(counter.getDisplayName() + "	"
                        + counter.getValue());
            }
        }
        FileSystem.get(conf).delete(outputDir, true);
        System.exit(code);
    }