MapReduce Design Patterns(3. Top Ten))(六)

MapReduce Design Patterns(3. Top Ten))(六)

http://blog.csdn.net/cuirong1986/article/details/8469448


Top Ten

Pattern Description

Top ten模式跟前面的有很大的不同,跟输入数据大小无关,最终得到的记录数量是确定的。而在通用filtering中,输出的规模取决于输入数据。

Intent

根据数据集的排名,获取相对较小的前K条记录,不管数据量多大。

Motivation

在数据分析中,找出离群值是很重要的工作,因为这些记录是典型的最引人关注的独特的数据片。这种模式的关键点是根据指定的规则找到最具代表性的记录,根据这些记录,可能就会找出导致这些记录特殊的原因。如果定义了能决定两条记录之间排名的排名方法或比较方法,就能用这种模式,用MapReduce找出整个数据集的排名最高的值。

 

这种模式特别引人关注的原因是,比较器是在MapReduce上下文之外实现的。在hql中,你可能更倾向于用排名值排序数据,然后取top k。在MapReduce中,正如下一章要讲的,全局排序是非常复杂的,会使用集群大量的资源。这种模式着眼于不通过排序而去取到有限的high-value 记录。

 

找出top ten 是一件有趣的事情。*上哪些发帖得分最高?谁是最老的会员?你的网站最大的订单是?哪个中帖子“meow”出现次数最多?

Applicability

·这种模式需要一个能用于两条记录的比较方法。就是说,我们必须能够拿一条记录和另一条记录比较来决定那一条更大。

·输出记录的数量应该明显比输入少,其中一个重要原因是,这样应该比全局排序更有意义。

Structure

这种模式mapprer reducer都有。Mapper找到本地top K,然后都发送到reducer来找出最终的top K。因为mapper输出的记录最多K条,相对较小,所以仅需要一个reducer。可以看图3-3.

 

 

class mapper:

setup():

initialize top ten sorted list

map(key, record):

insert record into top ten sorted list

if length of array is greater-than 10 then

    truncate list to a length of 10

cleanup():

for record in top sorted ten list:

    emit null,record


class reducer:

setup():

initialize top ten sorted list

reduce(key, records):

sort records

truncate records to top 10

for record in records:

     emit record

MapReduce Design Patterns(3. Top Ten))(六)

Figure 3-3. The structure of the top ten pattern

Mapper读每条输入记录,用一个大小为K的数组对象收集值。在mappercleanup阶段,把存在数组里的K条记录作为值,keynull,发送到reducer。这是map任务最低需求的K

 

我们会在reducer得到K*M条记录,M代表map任务数。在reduce方法,做跟mapper同样的事情。

我们在每个mapper中要选出top k是要考虑最极端的情况。

Consequences

Top k条记录被返回。

Known uses

Outlier analysis

离群值通常需要关注。可能是用户使用你的系统很困难造成的,或者网站的高级用户。用过滤和分组,也可能给你另一种数据集的视图。

Select interesting data

如果你能根据某种排序给记录评分,就能找出最有价值的数据。如果你打算跟踪后续处理过程,这就非常有用。例如BI工具或RDB,不能处理大规模的数据。评分规则可以用一些高级的算法设置的很复杂,例如给文本评分,根据语法和拼写的精确度,以此达到删除垃圾数据的目的。

Catchy dashboards

这不是一本心理学的书,所以你认为消费者感兴趣的top ten数据,他们就是。这种模式也可以用于网站的一些有趣的top ten的统计,并且可能让用户对数据有更多的思考,或者甚至带来竞争。

Resemblances

Sql

在传统的小的RDB中,排序可能不算什么。这种情况下,可以根据排序的条件获得top tenMapReduce中也可以做同样的事情,但你会在后面的模式中看到,排序是一种代价很高的操作。

SELECT FROM table ORDER BY col4 DESC LIMIT 10;

 

Pig

Pig无论用任何最优的排序,在执行这种查询时会有一些问题。最简单的样式跟sql 查询一样,但仅仅为了取得几条记录,排序代价高。改进这种情形的方法是用java MapReduce代替pig

B = ORDER A BY col4 DESC;

C = LIMIT B 10;

Performance analysis

Top ten模式的性能是很好的,但有几个重要的局限性。大多数局限性来自于单reducer,不管处理的数据量的大小。

 

使用这种模式时,我们需要关注的是reducer会接收到多少数据。每个map任务输出K条记录,job会由Mmap任务组成,所以reducer要处理M*K条记录,这可能有点多。

reducer处理大量数据是不合适的,原因如下:

·记录很多时排序代价高,需要用到本地磁盘做大部分的排序,而不是在内存里。

·Reducer运行的主机会通过网络接收大量的数据,会造成网络资源热点。

·如果有太多的记录要处理,reduce scan数据花费很长的时间。

·reducer中排序需要的内存的增长可能导致jvmOOM。例如,在中间过程中把所有的值收集到arrayList里,会导致很大。对于top ten问题,这可能不是特殊的问题,但如果数量太大就会超出内存的限制。

·写到输出文件不是并行的。Reduce阶段往本地磁盘写数据是代价高的操作。如果只有一个reducer,就不能利用往多个主机写数据的并行优势,或相同主机上的不同磁盘。这不是top ten本身的问题,但会在数据很大时变成一种制约因素。

 

如果K值很大,这种模式变得低效。考虑一种极端情况,K被设为五百万,整个数据集为十百万。500万超过了输入分片的大小,所以每个mapper会发送所有的数据到reducerReducer会处理整个数据集,问题是数据到reducer不会并行加载。

 

一种优化方式是过滤掉一些数据,当你知道某些规则时。设想数据中有个值是只增长,你想找到top 100条记录。第100条记录这个值是52485,然后你就可以过滤掉比这个值小的记录。

 

基于以上原因,这种模式只适用于K值较小的情况。最多几十,几百。做全局排序是否看起来更有效,这个K的界限是模糊的。

Top Ten Examples

Top ten users by reputation

MapReduce计算top ten记录是有意义的实践。每个mapper决定各自分片的top ten并输出到reducer阶段,来计算最终的top ten。记得配置你的job1reducer

问题:给出用户信息数据,根据用户的声誉值输出top ten

 

Mapper codeMapper处理所有输入记录存在treeMap里。TreeMap是按key排序的。Integer的默认排序时升序。如果treeMap超过10个记录,第一条就被移除。所有记录处理后,在cleanup方法里输出给reducer

 

public static class TopTenMapper extends
        Mapper<Object, Text, NullWritable, Text> {
// Stores a map of user reputation to the record

    private TreeMap<Integer, Text> repToRecordMap = new TreeMap<Integer, Text>();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parsed = transformXmlToMap(value.toString());
        String userId = parsed.get("Id");
        String reputation = parsed.get("Reputation");
// Add this record to our map with the reputation as the key
        repToRecordMap.put(Integer.parseInt(reputation), new Text(value));
// If we have more than ten records, remove the one with the lowest rep
// As this tree map is sorted in descending order, the user with
// the lowest reputation is the last key.
        if (repToRecordMap.size() > 10) {
            repToRecordMap.remove(repToRecordMap.firstKey());
        }
    }

    protected void cleanup(Context context) throws IOException {
    }
}


 

reducer code。整体上,reducer计算top ten的方式跟mapper相同。使用job.setNumReduceTasks(1),来配置jobreducer1个,并使用nullwritable作为key,这将在reducer只产生一个输入组,包含了所有的top ten 记录。计算完后,把降序排序的结果刷到文件系统中。由于只有一个输入组,所以这一步可以reduce方法完成,也可以在cleanup里做。

 

public static class TopTenReducer extends
        Reducer<NullWritable, Text, NullWritable, Text> {
// Stores a map of user reputation to the record
// Overloads the comparator to order the reputations in descending order

    private TreeMap<Integer, Text> repToRecordMap = new TreeMap<Integer, Text>();

    public void reduce(NullWritable key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            Map<String, String> parsed = transformXmlToMap(value.toString());
            repToRecordMap.put(Integer.parseInt(parsed.get("Reputation")),
                    new Text(value));
// If we have more than ten records, remove the one with the lowest rep
// As this tree map is sorted in descending order, the user with
// the lowest reputation is the last key.
            if (repToRecordMap.size() > 10) {
                repToRecordMap.remove(repToRecordMap.firstKey());
            }
        }
        for (Text t : repToRecordMap.descendingMap().values()) {
// Output our ten records to the file system with a null key
            context.write(NullWritable.get(), t);
        }
    }
}



Notice:这个job也不需要combiner,虽然从技术上reduce代码可以用作combiner。但是多余的处理。这段代码是找出top ten的固定的代码,但很容易通过在setup方法捕获变量的方式改成求top K记录的程序。只要保证K满足前面讨论的问题,别太大。

 

 

Distinct

Pattern Description

这种模式过滤整个数据集,在处理相似的记录时面临挑战。最终的输出是剔重后的所有记录。

Intent

在包含很多相似记录的数据中做剔重。

Motivation

减少数据集使其成为一个有唯一值得集合有几种用法。一种是剔重。在大数据集里,重复或极其相似的记录会带来很麻烦的问题。重复记录占用大量的空间或倾斜top-level分析的结果。例如,每次某人访问你的网站,你收集他所用的浏览器和设备来进行营销分析。如果用户访问次数超过一次,你会记录多次。如果你要计算各种浏览器在用户中使用所占的百分比,用户的访问次数会扭曲真实的结果。因此,首先要对数据去重,保证一台设备一个日志事件一条记录。

 

记录没有必要再原生形式上精确的认为相同。只需要翻译成一种能认为相同的形式即可认为相同。例如,根据http server 日志做web浏览分析,抽取用户名,设备,和用户使用的浏览器。我们并不关心浏览时间,或者来自哪台http server

Applicability

只需要你的数据集有重复的值的数据。当然不是必须的,如果本身没有重复值,傻瓜才会用这种模式。

Structure

这种模式在MapReduce的使用上显得非常平滑。利用MapReduce能把key按分组聚到一起的功能来删除重复数据。Mapper转换数据,reducer做的工作也不多。如果重复数据太多,也可以考虑使用combiner。重复的记录经常挨着,所以combiner可以在map阶段去重。

map(key, record):

emit record,null

reduce(key, records):

emit key

 

mapper在每个分片上去重,去重的原则只根据那三个字段。然后输出到reducer,记录作为keynull作为value

Reducer根据key分组valuevaluenull,不分析。因为是按key分组的,所以只需要简单的输出key就可保证结果唯一。

这种模式一个好的特性是reducer的数量按计算量自己决定,设置reducers的数量相对较多一点,因为mapper几乎把所有数据都发送到reducer

Notice:这是调整数据文件大小的最好时机。想让输出文件大一点,就减少reduce个数,反之一样。不管怎样输出文件的大小是相同的,归功于partitioner的随机hashing

Consequences

输出记录保证是唯一的,没使用任何相关命令是因为使用了框架默认配置。

Known uses

Deduplicate data

如果你从几个数据源采集数据,并出现相同的事件存在两次,你就能用这种模式去重。

Getting distinct values

如果你的原生记录可能不存在重复值,但提取出来的信息可能有重复的。也可以使用。

Protecting from an inner join explosion

如果你要做两个数据集之间的inner join,外键还不是唯一的,后果可能会获取巨大数量的记录。例如,在一个数据集有3000个相同的key,另一个数据集有2000个也是这个相同的key,最后你会得到600000条记录,都会发送到reducer。在用这种模式时,要处理连接key保证它们唯一来减轻这个问题。

Resemblances

SqlSELECT DISTINCT FROM table;

Pigb = DISTINCT a;

Performance analysis

理解这种模式的性能分析对高效的使用很重要。主要考虑的问题是如何设置MapReduce job reduce的个数。这主要看mapper输出的记录条数和字节大小。这也能看出combiner能剔除多少数据。如果在一个分片内重复值很少(combiner不会有太大的作用),大多数数据都会发到reduce阶段。

当一个程序在跑时,通过jobTrackerjob的信息可以看到输出的字节数和记录数。可以用字节数除以reducer的个数计算相应值。这个值决定了每个reducer接受的字节大小,倾斜情况不计算。一个reducer能处理的数据量依不同的调度而不同,但通常不要超过几百兆。当然也不能太小,出现过多的小文件,并不会加速reduce的执行。这个数据量一般要比分片大。

由于所有的数据都要发送到reduceer,要使用相对较多的reducer个数运行job。不管是一个reducer对应100mapper,还是一个reducer对应2mapperjob都会完成。要理论联系实践找到最佳的reducer个数。通常,要想reducer时间减半,就double reducer的数量,但要小心文件不要太小。

Notice:选择job reducer数量时,要考虑到集群配置的slots数量。开始用distinct模式可以在数据量合理时设置为跟slots数量相近,数据量大时配成两倍slots数量。

Distinct Examples

Distinct user IDs

去重集合数据是一个展示MapReduce性能的很好的例子。因为每个reducer面对一个唯一key和与其相关的值得集合,为了去重,只需要输出key

 

问题:给出用户评论数据,根据用户id去重。

Mapper codeMapper从每条输入记录拿到用户id,作为keyvaluenull,输出到reducer

 

public static class DistinctUserMapper extends
        Mapper<Object, Text, Text, NullWritable> {

    private Text outUserId = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parsed = transformXmlToMap(value.toString());
// Get the value for the UserId attribute
        String userId = parsed.get("UserId");
// Set our output key to the user's id
        outUserId.set(userId);
// Write the user's id with a null value
        context.write(outUserId, NullWritable.get());
    }
}


 

Reducer code。大量的体力活都由MapReduce框架来完成。

 

public static class DistinctUserReducer extends
        Reducer<Text, NullWritable, Text, NullWritable> {

    public void reduce(Text key, Iterable<NullWritable> values,
            Context context) throws IOException, InterruptedException {
// Write the user's id with a null value
        context.write(key, NullWritable.get());
    }
}

Combiner optimizationCombiner可以使用,现在map本地去重,减少网络ioreduce代码可用于combiner