MapReduce Design Patterns(5.表连接))(十)

MapReduce Design Patterns(5.表连接))(十)

http://blog.csdn.net/cuirong1986/article/details/8489248


Replicated Join

Pattern Description

复制join是一种特殊的join,用于一个大数据和许多小数据集map端执行的情况。

Intent

这种模式能够消除reduce阶段的shuffle

Motivation

复制join非常有用,除了一个大数据集外,对其它要join的数据集有严格的大小限制。除了这个大数据集外,其它数据在map任务的setup阶段都要进内存,会受到jvm 堆大小的限制。如果能适应这种限制,就能得到大大的好处,因为不存在reduce阶段,因此没有混洗和排序。在map阶段就能完成整个join,这是一个非常大的数据作为输入的job

 

当然对复制join来说,也有额外的限制:只对内连接或左外连接当左边的数据很大时有用。其它模式都需要reduce阶段用左边整个数据集分组右边的数据集。虽然一个map任务可能不会匹配内存里的数据,但另一个map可能会匹配。因为这个原因,我们限制这种模式用于内连接和左外连接。

Applicability

在以下情况可使用:

·内连接或作为连接,左边的数据非常大。

·除了一个大数据集,其它数据都能放进物理内存。

Structure

·mapper负责在setup阶段从分布式缓存读所有的文件并排序成内存查询表。Setup阶段完成后,mapper执行join操作。如果外键没找到,这条记录或者忽略或者输出,分别对应两种join类型。

·只有map

 

Consequences

输出的各个部分文件的数量等于map 任务个数。如果是左外连接,可能存在null值。

Resemblances

Pig

Pig对这种模式有原生支持,通过对标准join做一些简单的修改。基于相同的原因,也只支持内连接和左外连接。注意代码的先后顺序:

huge = LOAD 'huge_data' AS (h1,h2);

smallest = LOAD 'smallest_data' AS (ss1,ss2);

small = LOAD 'small_data' AS (s1,s2);

A = JOIN huge BY h1, small BY s1, smallest BY ss1 USING 'replicated';

 

 

MapReduce Design Patterns(5.表连接))(十)

Figure 5-2. The structure of the replicated join pattern

Performance analysis

复制join是最快的join类型,因为没有reducer,但也会有一些代价。有能安全的存储在jvm里的数据量的限制,取决于能给map分配多少内存。用这种模式之前应该检测下你的环境能放多少数据到内存。注意这里的内存指的是物理内存。使用了java对象,数据会变大。幸运的是,你可以忽略你不需要的数据。

Replicated Join Examples

Replicated user comment example

这个例子跟前面用bloom filterjoin的例子有紧密的关系。分布式缓存用来把文件推送到所有的map 任务端。不像bloom filter那样,这里把数据本身读入内存,并只有map阶段的join

 

问题:给出少量的用户信息数据和一个大的评论数据,用用户数据丰富评论数据。

 

Mapper codeSetup阶段,从分布式缓存读用户数据并存进内存。解析记录,userid 和记录放入HashMap。这里可能出现OOM错误,因为存储了数据及结构。如果错误发生了,就增大jvm 大小或使用reduce join

 

Setup之后,执行map方法。从评论数据获取user id,用它从setup阶段构建的hashmap中获取值。找到就输出,找不到,视join类型而做相应的处理。这就是全部了。

 

public static class ReplicatedJoinMapper extends
        Mapper<Object, Text, Text, Text> {

    private static final Text EMPTY_TEXT = new Text("");
    private HashMap<String, String> userIdToInfo = new HashMap<String, String>();
    private Text outvalue = new Text();
    private String joinType = null;

    public void setup(Context context) throws IOException,
            InterruptedException {
        Path[] files = DistributedCache.getLocalCacheFiles(context
                .getConfiguration());
        // Read all files in the DistributedCache
        for (Path p : files) {
            BufferedReader rdr = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(new File(
                    p.toString())))));
            String line = null;
            // For each record in the user file
            while ((line = rdr.readLine()) != null) {
                // Get the user ID for this record
                Map<String, String> parsed = transformXmlToMap(line);
                String userId = parsed.get("Id");
                // Map the user ID to the record
                userIdToInfo.put(userId, line);
            }
        }
        // Get the join type from the configuration
        joinType = context.getConfiguration().get("join.type");
    }

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parsed = transformXmlToMap(value.toString());
        String userId = parsed.get("UserId");
        String userInformation = userIdToInfo.get(userId);
        // If the user information is not null, then output
        if (userInformation != null) {
            outvalue.set(userInformation);
            context.write(value, outvalue);
        } else if (joinType.equalsIgnoreCase("leftouter")) {
           // If we are doing a left outer join,
            // output the record with an empty value
            context.write(value, EMPTY_TEXT);
        }
    }
}


Composite Join

Pattern Description

复合join是一种特殊的join操作用来执行map端的很多大数据集的join

Intent

这种模式能消除reduce阶段的混洗和排序。然而,需要输入数据已被组织好,或准备成一种指定的格式。

Motivation

复合join在做多个很大数据的join时特别有用。要求数据满足条件:按外键排序,分区,用特殊的方式读数据。就是说,你的数据能用这种方式读,或你能准备成这种方式,复合join能起到很大的作用,跟其他类型相比。

 

Hadoop本身有CompositeInputFormat 类支持复合join。这种模式的限制类型为:内连接和全外连接。Mapper的输入必须用指定的方式分区和排序,并且每个输入数据集必须分成相同数量的分区。另外,相同的外键必须在两个数据集相同的分区。这通常发生在几个job有相同的reducer个数并输出键也相同,并且输出文件时不能分割的。例如,比hdfsblock小或是一种不支持split的压缩格式。也许本章中其它模式可能更适用。如果你发现用复合join之前必须格式化数据。你最好使用reducejoin除非这个输出用于很多分析。

Applicability

使用条件:

·内连接或全外连接。

·所有数据集都非常大。

·所有数据集都能把外键作为mapper的输入key

·所有数据集有相同数量的分区。

·每个分区都根据外键排序,所有的外键都位于每个数据集响应的分区。就是说,数据集ab的分区x包含相同的外键,而且这些外键只存在于分区x。可参考图5-3.

·数据集不经常变动。

MapReduce Design Patterns(5.表连接))(十)

Figure 5-3. Data sets that are sorted and partitioned on the same key

 

Structure

·job配置阶段,驱动代码处理了大多数的工作。设置了输入格式,join 类型。然后框架执行join操作。见图5-4.

·mapper较简单,从输入里获取两个值,输出到文件系统。

·map-only

MapReduce Design Patterns(5.表连接))(十)

Figure 5-4. The structure of the composite join pattern

Consequences

输出文件数跟map个数相同。文件内容可能有空值。

Performance analysis

复合join在大数据集上执行相对较快。然而,MapReduce 框架只能设置两个数据集的其中一个做到数据本地化。含有相同key的两部分文件不能假想在同一个节点上。

 

任何种类的数据准备需要考虑它的性能。数据准备job通常是个MapReduce job,如果数据很少改变,那么排序和分区数据集就能重复使用。这样,这种准备数据消耗的代价分到所有跑的工作上,显得没那么耗时间。

Composite Join Examples

Composite user comment join

为了满足复合join的前提条件,用户数据和评论数据都用MapReduce做预处理。两个数据集的keyuseridvaluexml记录本身。Hadoop正好有需要的KeyValueTextOutputFormat类来解析这些格式化的数据。

 

每个数据集都按外键排序,需要提醒你的仍然是,他们按文本排序,而不是数值。就是说,用户12345要比2小。这是因为joinCompositeInputFormat使用Text作为key来比较。每个输出数据用gzip压缩防止分片。驱动类描述了怎样配置MapReduce处理join

 

问题:给出两个大数据集:用户信息和评论数据,使用用户数据丰富评论数据。

 

Driver code。解析参数:用户数据输入路径,评论数据输入路径,输出路径,join类型。CompositeInputFormat 使用了旧的mapred api,但配置跟新的mapreduce相似。最终要的是配置输入格式和join表达式。

 

有个静态帮助方法来创建join表达式。用到了join类型,输入格式类用于解析所有数据集,然后是一系列用到的路径。剩下需要的参数配置好后,job就可以跑了。

 

 public static void main(String[] args) throws Exception {
        Path userPath = new Path(args[0]);
        Path commentPath = new Path(args[1]);
        Path outputDir = new Path(args[2]);
        String joinType = args[3];
        JobConf conf = new JobConf("CompositeJoin");
        conf.setJarByClass(CompositeJoinDriver.class);
        conf.setMapperClass(CompositeMapper.class);
        conf.setNumReduceTasks(0);
    // Set the input format class to a CompositeInputFormat class.
        // The CompositeInputFormat will parse all of our input files and output
        // records to our mapper.
        conf.setInputFormat(CompositeInputFormat.class);
    // The composite input format join expression will set how the records
        // are going to be read in, and in what input format.
        conf.set("mapred.join.expr", CompositeInputFormat.compose(joinType,
                KeyValueTextInputFormat.class, userPath, commentPath));
        TextOutputFormat.setOutputPath(conf, outputDir);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            Thread.sleep(1000);
        }
        System.exit(job.isSuccessful() ? 0 : 1);
    }


Mapper codeMapper的输入是外键和一个TupleWritable.这个tuple包含几个Text对象,数量等于数据集个数。这两个Text 对象的顺序跟配置的一样。下标从0开始。仅仅从tuple中获取这两个对象并输出。由于只有两个数据集,所以简单的把两个对象当做键和值输出,如果多了,需要连接某些值并输出。

 

public static class CompositeMapper extends MapReduceBase implements
            Mapper<Text, TupleWritable, Text, Text> {

        public void map(Text key, TupleWritable value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // Get the first two elements in the tuple and output them
            output.collect((Text) value.get(0), (Text) value.get(1));
        }
    }


Reducer and combiner。无。

Cartesian Product

Pattern Description

笛卡尔积模式是一种把一个数据集每条记录跟另一个数据集每条记录配对的有效手段。这种功能有代价问题,可能会有相当长的运行时间。

Intent

跟另一个数据集的每条记录配对并比较。

Motivation

笛卡尔积能使一个或多个数据集之间的每一对记录的关系都能被分析。不是靠外键配对,而是把几个数据集的每一条记录跟其它数据集的每条记录配对。所以,笛卡尔积并不很适合MapReduce 规范,因为这种操作不能直观的分割,不能并行执行得很好。并需要很多的计算时间和大量网络传输。适当做一些预处理来减少执行时间和字节传输量。

 

做笛卡尔积不太常用,有时没有外键用来join并且比较关系很复杂就要用它。笛卡尔积用得最多的是对文档或媒体数据做某些相似的分析。

Applicability

使用条件:

·需要分析每一对记录之间的关系。

·没有找到其它解决这个问题的方法。

·在执行时间上不太顾虑。

Structure

·setup和配置阶段决定笛卡尔积的输入分片。之后record reader负责通过两个分片生成笛卡尔积。Record reader把成对的记录给mapper,简单的写到文件系统。见图5-5.

·map-only

 

 MapReduce Design Patterns(5.表连接))(十)

Figure 5-5. The structure of the Cartesian product pattern

Consequences

最终的数据集个数等于两个数据集个数的乘积。输入记录的每一种可能的组合都会出现在最终的输出

Resemblances

Sql:很少见,在语法上是最简单的join,从多个表select ,不写where条件。

SELECT FROM tablea, tableb;

 

Pigpig可以使用CROSS执行笛卡尔积。会有提示是一个代价高的操作,应该保守使用。

A = LOAD 'data1' AS (a1, a2, a3);

DUMP A;

(1,2,3)

(4,5,6)

B = LOAD 'data2' AS (b1, b2);

DUMP B;

(1,2)

(3,4)

(5,6)

C = CROSS A, B;

DUMP C;

(1,2,3,1,2)

(1,2,3,3,4)

(1,2,3,5,6)

(4,5,6,1,2)

(4,5,6,3,4)

(4,5,6,5,6)

Performance Analysis

笛卡尔积会使数据集的大小暴增,甚至是一个100万数据的自连接也会产生1万亿条记录。会用掉很多map slot 并运行很长时间,所以要谨慎使用。这也会增加其它分析的执行时间,因为它占用的slot在它完成之前是不能被其它job使用的。如果它的任务数等于或大于集群中map slot的个数,其它工作将不能做。

 

每个输入分片都会跟另一个一一配对,创建新数据集的复杂度是On*n)。n是字节数。从左边分片取一条记录,然后把右边所有记录取出执行,然而再从左边取第二条记录。。。。如果左边的分片有1000条记录,则右边的分片要读1000次。这是相当长的执行时间。如果单任务失败,整个job需要重新运行。你可以想象为什么笛卡尔积对MapReduce来说是件可怕的事情。

Cartesian Product Examples

Comment Comparison

这个例子展示了如何用评论数据做自连接。自连接可以基于之间共同的单词检查一对评论数据之间的相似度。如果足够相似,就输出。这样预处理阶段就能把共同的单词和其它额外数据删除掉。

 

本例的不同点在于,特别关注数据的读入。创建自定义的input format来生成输入分片的笛卡尔积。如果数据集有11个分片,输出会有121个。因为121对是交叉相乘得到的。每个map任务的record reader执行笛卡尔积并把没对记录交给mapper处理。

 

问题:给出处理过的评论数据,根据每对之间的相似单词的数量找出评论之间的相似程度。

 

Input format codeCartesianImportFormat类代替了前面章节的CompositeInputFormat类。它用来支持两个数据集做笛卡尔积,保证代码更简单。用一个数据集可以当左边的也可以当右边的数据集,本例就是这样。Setup阶段,getInputSplits创建笛卡尔积并放入CompositeInputSplits.依靠对每个数据集创建的输入格式,拿到分片,然后计算交叉乘积。

 

 public static class CartesianInputFormat extends FileInputFormat {

        public static final String LEFT_INPUT_FORMAT = "cart.left.inputformat";
        public static final String LEFT_INPUT_PATH = "cart.left.path";
        public static final String RIGHT_INPUT_FORMAT = "cart.right.inputformat";
        public static final String RIGHT_INPUT_PATH = "cart.right.path";

        public static void setLeftInputInfo(JobConf job,
                Class<? extends FileInputFormat> inputFormat, String inputPath) {
            job.set(LEFT_INPUT_FORMAT, inputFormat.getCanonicalName());
            job.set(LEFT_INPUT_PATH, inputPath);
        }

        public static void setRightInputInfo(JobConf job,
                Class<? extends FileInputFormat> inputFormat, String inputPath) {
            job.set(RIGHT_INPUT_FORMAT, inputFormat.getCanonicalName());
            job.set(RIGHT_INPUT_PATH, inputPath);
        }

        public InputSplit[] getSplits(JobConf conf, int numSplits)
                throws IOException {
            // Get the input splits from both the left and right data sets
            InputSplit[] leftSplits = getInputSplits(conf,
                    conf.get(LEFT_INPUT_FORMAT), conf.get(LEFT_INPUT_PATH),
                    numSplits);
            InputSplit[] rightSplits = getInputSplits(conf,
                    conf.get(RIGHT_INPUT_FORMAT), conf.get(RIGHT_INPUT_PATH),
                    numSplits);
            // Create our CompositeInputSplits, size equal to
            // left.length * right.length
            CompositeInputSplit[] returnSplits = new CompositeInputSplit[leftSplits.length
                    * rightSplits.length];
            int i = 0;
            // For each of the left input splits
            for (InputSplit left : leftSplits) {
                // For each of the right input splits
                for (InputSplit right : rightSplits) {
                    // Create a new composite input split composing of the two
                    returnSplits[i] = new CompositeInputSplit(2);
                    returnSplits[i].add(left);
                    returnSplits[i].add(right);
                    ++i;
                }
            }
            // Return the composite splits
            LOG.info("Total splits to process: " + returnSplits.length);
            return returnSplits;
        }

        public RecordReader getRecordReader(InputSplit split, JobConf conf,
                Reporter reporter) throws IOException {
            // Create a new instance of the Cartesian record reader
            return new CartesianRecordReader((CompositeInputSplit) split, conf,
                    reporter);
        }

        private InputSplit[] getInputSplits(JobConf conf,
                String inputFormatClass, String inputPath, int numSplits)
                throws ClassNotFoundException, IOException {
            // Create a new instance of the input format
            FileInputFormat inputFormat = (FileInputFormat) ReflectionUtils
                    .newInstance(Class.forName(inputFormatClass), conf);
            // Set the input path for the left data set
            inputFormat.setInputPaths(conf, inputPath);
            // Get the left input splits
            return inputFormat.getSplits(conf, numSplits);
        }
    }

 

Driver code。驱动类设置参数。两个数据集输入路径相同,因为我们执行的是比较评论数据本身之间的记录。

 

 public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        // Configure the join type
        JobConf conf = new JobConf("Cartesian Product");
        conf.setJarByClass(CartesianProduct.class);
        conf.setMapperClass(CartesianMapper.class);
        conf.setNumReduceTasks(0);
        conf.setInputFormat(CartesianInputFormat.class);
        // Configure the input format
        CartesianInputFormat.setLeftInputInfo(conf, TextInputFormat.class,
                args[0]);
        CartesianInputFormat.setRightInputInfo(conf, TextInputFormat.class,
                args[0]);
        TextOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            Thread.sleep(1000);
        }
        System.exit(job.isSuccessful() ? 0 : 1);
    }

 

Record reader code。这里是执行笛卡尔积奇迹发生的地方。Setup阶段,框架调用getRecordReader返回CartesianRecordReader.这个类的构造器会创建两个单独的记录读对象,每个分片各读一个。

 

第一次调用next方法从左边读入第一条记录作为mapper的输入key,右边的第一条记录作为mapper的输入值。然后由框架把这个键值对交给mapper处理。

 

随后的next方法的调用,把右边的每条记录读入,执行上面的操作,直到右边记录取完。这里会设置一个标记让循环跳出,然后从做左边读取第二条记录,重置record reader,继续执行。

 

Record  reader返回false时处理完成,说明没有键值对要处理了。此时,record reader就把所有笛卡尔积传给了map任务。

Notice:一些附着在RecordReader接口的方法,例如closegetPos等,为了简洁,在这里省略了。当然还可以做进一步的优化,例如跳过某些左边的记录。本例中要求最少3个共同的单词,但如果左边记录只有一个单词,就可以直接跳过去。

 

public static class CartesianRecordReader<K1, V1, K2, V2> implements
        RecordReader<Text, Text> {

    // Record readers to get key value pairs
    private RecordReader leftRR = null, rightRR = null;
    // Store configuration to re-create the right record reader
    private FileInputFormat rightFIF;
    private JobConf rightConf;
    private InputSplit rightIS;
    private Reporter rightReporter;
    // Helper variables
    private K1 lkey;
    private V1 lvalue;
    private K2 rkey;
    private V2 rvalue;
    private boolean goToNextLeft = true, alldone = false;

    public CartesianRecordReader(CompositeInputSplit split, JobConf conf,
            Reporter reporter) throws IOException {
        this.rightConf = conf;
        this.rightIS = split.get(1);
        this.rightReporter = reporter;
        // Create left record reader
        FileInputFormat leftFIF = (FileInputFormat) ReflectionUtils
                .newInstance(Class.forName(conf
                                .get(CartesianInputFormat.LEFT_INPUT_FORMAT)), conf);
        leftRR = leftFIF.getRecordReader(split.get(0), conf, reporter);
        // Create right record reader
        rightFIF = (FileInputFormat) ReflectionUtils.newInstance(
                Class.forName(conf
                        .get(CartesianInputFormat.RIGHT_INPUT_FORMAT)),
                conf);
        rightRR = rightFIF.getRecordReader(rightIS, rightConf,
                rightReporter);
        // Create key value pairs for parsing
        lkey = (K1) this.leftRR.createKey();
        lvalue = (V1) this.leftRR.createValue();
        rkey = (K2) this.rightRR.createKey();
        rvalue = (V2) this.rightRR.createValue();
    }

    public boolean next(Text key, Text value) throws IOException {
        do {
            // If we are to go to the next left key/value pair
            if (goToNextLeft) {
                // Read the next key value pair, false means no more pairs
                if (!leftRR.next(lkey, lvalue)) {
                    // If no more, then this task is nearly finished
                    alldone = true;
                    break;
                } else {
                  // If we aren't done, set the value to the key and set
                    // our flags
                    key.set(lvalue.toString());
                    goToNextLeft = alldone = false;
                    // Reset the right record reader
                    this.rightRR = this.rightFIF.getRecordReader(
                            this.rightIS, this.rightConf,
                            this.rightReporter);
                }
            }
            // Read the next key value pair from the right data set
            if (rightRR.next(rkey, rvalue)) {
                // If success, set the value
                value.set(rvalue.toString());
            } else {
              // Otherwise, this right data set is complete
                // and we should go to the next left pair
                goToNextLeft = true;
            }
           // This loop will continue if we finished reading key/value
            // pairs from the right data set
        } while (goToNextLeft);
        // Return true if a key/value pair was read, false otherwise
        return !alldone;
    }
}


Mapper codeMapper接收笛卡尔积对,对每个Text对象,把单词读入set。然后迭代set决定这两条记录有多少共同的单词。超过10个,输出到文件系统。


 

public static class CartesianMapper extends MapReduceBase implements
        Mapper<Text, Text, Text, Text> {

    private Text outkey = new Text();

    public void map(Text key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        // If the two comments are not equal
        if (!key.toString().equals(value.toString())) {
            String[] leftTokens = key.toString().split("\s");
            String[] rightTokens = value.toString().split("\s");
            HashSet<String> leftSet = new HashSet<String>(
                    Arrays.asList(leftTokens));
            HashSet<String> rightSet = new HashSet<String>(
                    Arrays.asList(rightTokens));
            int sameWordCount = 0;
            StringBuilder words = new StringBuilder();
            for (String s : leftSet) {
                if (rightSet.contains(s)) {
                    words.append(s + ",");
                    ++sameWordCount;
                }
            }
            // If there are at least three words, output
            if (sameWordCount > 2) {
                outkey.set(words + "	" + key);
                output.collect(outkey, value);
            }
        }
    }
}