MapReduce Design Patterns(7、输入输出模式)(十三) CHAPTER 7.Input and Output Patterns

MapReduce Design Patterns(7、输入输出模式)(十三)

CHAPTER 7.Input and Output Patterns

http://blog.csdn.net/cuirong1986/article/details/8510162



本章关注一个最经常忽略的问题,来改进MapReduce value:自定义输入和输出。我们并不会总使用Mapreduce本身的方式加载或存储数据。有时,可以跳过在hdfs存储数据这项耗时的阶段,仅存储一些数据,不是全部的,或直接在MapReduce结束后输送给后面的处理。有时,基本的Hadoop规范,文件块和输入分割不能完成你需要的事情,所以要使用自定义的InputFormatOutputFormat

 

本章三个模式处理输入:generating dataexternal source input,和partition pruniing。都有一个有趣的属性:map阶段完全不会意识到拿到输入键值对之前会发生复杂的事情。使用自定义的输入格式抽象出你要加载数据的方法的细节,是一种有效的方式。

 

另一方面,hadoop不总是按你需要的方式存储数据。本章的模式external source output,会把数据写到hadoop以外的系统。自定义的输出格式也会不让mapreduce阶段意识到数据输出时发生的复杂的事情。

Customizing Input and Output in Hadoop

Hadoop允许你修改数据load的方式,有两个主要途径:配置输入有多少连续的块,配置记录如何出现在map阶段。相关的两个类是RecordReaderInputFormat。他们随着Hadoop框架运行,跟mapperreducer运行方式相似。也允许修改数据存储的方式,通过OutputFormatRecordWriter

InputFormat

Hadoop依赖job的输入格式做三件事:

1.校验job的输入配置,例如数据是否存在。
2.分割文件块为逻辑上的inputSplit类型的块,每一个对应一个map任务。

3.创建RecordReader的实现从inputsplit创建键值对。这些键值对一个一个发送到mapper

 

最常用的输入格式的子类是FileInputFormathadoop默认是TextInputFormat。这个类首先校验job的输入,保证输入路径的存在。然后根据文件字节数大小逻辑分割输入文件,使用块大小作为分割边界值。例如,160M的文件,块大小64M时分成三个逻辑块,0M-64M64m-128M,128M-160M.每个map任务都会对应其中一个块,然后RecordReader负责生成键值对。

 

通常,recordReader有额外的修复边界问题的责任,因为输入分割边界是任意的,很有可能不是记录的边界。例如,TextInputFormat使用LineRecordReader读取文本文件对每个map的每一文本行创建键值对,例如用换行符分割。键是读到的一行的字节数,值是整行字符串。因为它不像输入分片的字节块,会用换行符分开,LineRecordRead会负责读到行的末尾保证读到一条完整的记录。不同数据块的这点数据(一个完整行)理论上可能不在相同的节点,所以从所在主机上读。这个读由FSDataInputStream类处理,我们就不必处理去哪儿找数据块。

 

使用自己的格式时不要害怕经过了分割的边界,只需要检测没有重复或丢失数据。

 

Notice:自定义的输入格式不限于文件输入。你可以把输入表示为InputSplit对象和键值对,自定义或其它的,可以在一个MapReduce job里并行读入任何东西到map阶段。只需要记住输入分片表示什么和利用数据本地性的优势。

 

InputFormat抽象类有两个抽象方法:

getSplits

典型的实现是利用JobConText对象获取配置的输入并返回该对象的listInputsplit有个方法返回表示数据在集群中位置的机器的数组,提示TaskTracker应该执行的map任务。这个方法也是验证配置的正确性或抛出需要的异常的合适的地方,因为方法使用在前面。(例如在提交到jobTracker之前)

 

CreateRecordReader

这个方法在后面使用,用来生成RecordReader的实现,随后详细讨论。通常,一个新实例创建并立即返回,因为record reader有个初始化方法被框架调用。

 

RecordReader

RecordReader是用来根据给的InputSplit创建键值对的。因为inputsplit表示了分片的字节范围,使mapper的处理有意义。这就是为什么hadoophe MapReduce被认为是“读时模式”。模式是在RecordReader中定义的,单独的基于RecordReader的实现,而不是基于我们希望的job的输入。从输入源读取字节转换成writablecomparable key和一个writable value。创建自定义输入格式时经常使用自定义的类型,因为这是一种好的面向对象编程的方式来把信息给mapper

 

RecordReader使用数据和由inputsplit创建的边界生成键值对。在基于文件的输入的环境中,“start“是文件中的RecordReader应该开始生成键值对的字节偏移量。“end”是应该停止读记录的偏移量。就api而言,没有硬性的边界:不能阻止一个开发人员把整个文件作为一个map的输入,当然这是不建议的,经常需要越过边界读数据,来保证读到一条完整的记录。

 

考虑xml的问题。当使用TextInputFormat抽取每行时,xml元素通常不在同一行,会被MapReduce input 分割。当读到输入分区边界的“end“之后,就得到一条完整记录。找到记录的末尾以后,你仅需要保证每条记录的读从xml元素的开始开始。找到inputsplit的开始之后,继续读直到开始的xml标签被读到。这允许MapReduce框架覆盖整个xml文件的内容,但不会重复任何xml记录。由于向前找xml元素的开始而跳过的xml内容会被前面的map任务处理。

 

recordReader 抽象类有几个方法要覆盖。

Initialize

map任务指定的inputSplitTaskAttemptContext作为本方法的参数。对基于文件的输入格式,这是寻找开始读文件时的字节偏移的好时机。

GetCurrentKey and getCurrentValue

这两个方法被框架使用生成键值对发送给mapper。尽可能重用这两个方法返回的对象

nextKeyValue

类似inputFormat类里的对应方法,读一个简单的键值对并返回true,直到数据读完。

GetProgress

这是个可选的方法,用于框架对度量的收集。

Close

由框架使用,在没有键值对要处理时清除资源。

 

outputFormat

hadoop依靠job的输出格式做两个主要的任务:

1.检验job的输出配置。

2.创建RecordWriter的实现写job的输出。

 

FileInputFormat相对应的,FileOutputFormat处理基于文件的输出。因为MapReduce job的大多数输出写到hdfs,很多基于文件的输出格式相应的api都能解决大部分的需求。Hadoop默认使用TextOutputFormat,存储tab分隔的键值对到配置的hdfs的输出目录。TextOutputFormat也检验开始运行job之前输出目录不能存在。

 

TextoutputFormat 使用LineRecordWriter对每个mapreduce任务写键值对,根据是否是reduce阶段。这个类使用toString方法序列化每一键值对到存储在hdfspart文件里,用tab分隔键值。这个分隔符是默认的,能通过配置改变。

 

inputFormat类似,数据不会受限于只存在hdfs上。只要能用java把键值对写到其它源,例如jdbc,就可以用MapReduce做批量写。只需要保证你要写到的地方能处理多个任务产生的连接。

outputFormat抽象类有三个抽象方法需要实现:

 

checkOutputSpecs

用于验证job指定的输出,例如保证job提交之前输出目录不能存在。否则,输出可能覆盖(看具体配置)。

GetRecordWriter

方法返回RecordWriter的实现,来序列化键值对到输出,输出可以是FileSystem对象。

GetOutputCommiter

Job的输出提交者在初始化期间设置每个任务,根据成功完成的状态提交(commit,区别于submit)任务,成功或其它状态,完成时都会清除任务。对基于文件的输出,FileOutputCommittter可以处理所有繁重的工作。它会对每个map任务创建临时输出目录,把成功的任务的输出移动到最终的输出目录。

 

RecordWriter

RecordWriter抽象类把键值对写到文件系统或另外的输出。与RecordReader不同,它没有初始化阶段。然而,可用构造器在需要的时候设置record writer。构造期间任何参数都能传入,因为record writer实例的创建是通过OutputFormat.getRecordWriter

 

此类包含两个方法:

Write

这个方法由框架对每个要写的键值对调用。这个方法的实现很大程度上取决于你的使用。下面的例子中,我们展示怎样把键值对写到外部的内存键值存储,而不是文件系统。

Close

当处理完键值对时,框架调用这个方法。可以释放文件句柄,关闭跟其它服务的连接,或清除需要清除的任务。

Generating Data

Pattern Description

生成数据模式很有趣,因为不是从外面加载数据,它快速,并行地产生数据。

Intent

你需要从零开始生成大量数据。

Motivation

这个模式最大的特点是,它不加载数据。用这种模式,你可以生成数据并存到分布式文件系统上。

生成数据不太常见。通常,你可以生成一批数据并反复使用。当需要生成数据时,MapReduce是一个很适合的工具。

这种模式最常见的使用案例是生成随机数据。构建一些具有代表性的数据集对大规模的测试非常有用,尽管这种测试针对的实际数据量很小。它对创建在一定范围内用来研究理论的证明的“toy domains”也很有用。

 

生成随机数据常用来作为基准测试的一部分,例如常用的TeraGen/TeraSort  DFSIO

不幸的是,这种模式的实现用hadoop不是很简单,因为底层的框架对一个map任务指定一个输入分片并对记录指定一个map方法。在这种模式下,没有输入分片,也没有记录的概念,所以必须欺骗框架认为是有分片,有记录。

 

Structure

hadoop实现这种模式,要实现一个自定义的inputFormat并让一个RecordReader生成随机数据。Map方法对数据源的处理完全忽略,所以构建非常迅速,而不是加载hdfs上的数据文件,多数情况下,可使用identity mapper,但如果想在map任务做一些后处理,或者立即分析它。见图7-1.

这种模式是map-only的。

·InputFormat创建虚拟分片,分片数量可以配置。

·RecordReader拿假分片生成随机记录数据。有些时候,在输入分片里可以指定一些信息来告诉record reader生成什么。例如,要生成随机的日期/时间数据,一个输入分片代表一个小时段。

 

·多数情况下使用identitymapper原样写出输入数据。

MapReduce Design Patterns(7、输入输出模式)(十三)

CHAPTER 7.Input and Output Patterns

Figure 7-1. The structure of the generating data pattern

Notice:实现这种模式比较懒的方式是把job的每个假输入文件塞进单条编造的记录。然后,就可以使用通用InputFormat RecordReadermap方法里生成数据。

Consequences

每个mapper输出一个包含随机数据的文件。

Resemblances

sqlpig有几种创建随机数据的方式,但都不够简洁和有说服力。

Performance analysis

这里主要考虑的性能方面的问题是需要多少map任务来生成数据。通常,map任务越多,生成数据量越快,因为充分利用了集群的并行性。然而,map启动数多于map slots数时意义不大,他们一直在做同样的事情。

Generating Data Examples

Generating random * comments

为了生成随机*数据,我们使用1000个单词的list生成随机短评。我们需要生成一个随机分数,row iduserid,和创建时间。

 

Driver code。解析四个命令行参数配置job。设置自定义的输入格式,然后进一步调用静态方法配置。所有输出写到给定的输出目录。使用identity mapper,设置reduce数量为0从而禁用reduce阶段。

 

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    int numMapTasks = Integer.parseInt(args[0]);
    int numRecordsPerTask = Integer.parseInt(args[1]);
    Path wordList = new Path(args[2]);
    Path outputDir = new Path(args[3]);
    Job job = new Job(conf, "RandomDataGenerationDriver");
    job.setJarByClass(RandomDataGenerationDriver.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(Random*InputFormat.class);
    Random*InputFormat.setNumMapTasks(job, numMapTasks);
    Random*InputFormat.setNumRecordPerTask(job,
           numRecordsPerTask);
    Random*InputFormat.setRandomWordList(job, wordList);
    TextOutputFormat.setOutputPath(job, outputDir);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 2);
}
 



InputSplit code.FakeInputSplit类简单的继承自InputSplit并实现writable.没有覆盖任何方法。用于欺骗框架指派一个任务生成随机数据。

 

public static class FakeInputSplit extends InputSplit implements Writable {
    publicvoid readFields(DataInput arg0) throws IOException {
    }
    publicvoid write(DataOutput arg0) throws IOException {
    }
    publiclong getLength() throws IOException, InterruptedException {
       return 0;
    }
    public String[] getLocations() throws IOException, InterruptedException {
       returnnew String[0];
    }
}

inputFormat code。输入格式有两个目的:返回框架生成map任务需要的输入分片,然后为map任务创建Random*RecordReader。覆盖getSplits方法返回一个配置的数量的FakeInputSplit分片数。这个数量是从配置取的。当框架调用createRecordReader,一个

Random*RecordReader实例化,初始化,返回。

 

public static class Random*InputFormat extends
       InputFormat<Text, NullWritable> {
    public static final String NUM_MAP_TASKS = "random.generator.map.tasks";
    public static final String NUM_RECORDS_PER_TASK = "random.generator.num.records.per.map.task";
    public static final String RANDOM_WORD_LIST = "random.generator.random.word.file";
    public List<InputSplit> getSplits(JobContext job) throws IOException {
       // Get the number of map tasks configured for
       int numSplits = job.getConfiguration().getInt(NUM_MAP_TASKS, -1);
       // Create a number of input splits equivalent to the number of tasks
       ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
       for (int i = 0; i < numSplits; ++i) {
           splits.add(new FakeInputSplit());
       }
       return splits;
    }
    public RecordReader<Text, NullWritable> createRecordReader(
           InputSplit split, TaskAttemptContext context)
           throws IOException, InterruptedException {
       // Create a new Random*RecordReader and initialize it
       Random*RecordReader rr = new Random*RecordReader();
       rr.initialize(split, context);
       return rr;
    }
    public static void setNumMapTasks(Job job, int i) {
       job.getConfiguration().setInt(NUM_MAP_TASKS, i);
    }
    public static void setNumRecordPerTask(Job job, int i) {
       job.getConfiguration().setInt(NUM_RECORDS_PER_TASK, i);
    }
    public static void setRandomWordList(Job job, Path file) {
       DistributedCache.addCacheFile(file.toUri(), job.getConfiguration());
    }
}
 



recordReader codeRecord reader是数据真正生成的地方。在初始化FakeInputSplit时给出,但简单的忽视它。从job配置中抽取要创建的记录数,从分布式缓存获得随机单词的list。对每次nextKeyValue调用,使用简单的随机数生成器创建一条随机记录。评论体由一个帮助方法从list随机选择单词来生成。单词数量从130个。计数器也自增,为了跟踪生成了多少条记录。一旦所有的记录生成完成,record reader返回false,给框架mapper已经没有输入信息的信号。

 

public static class Random*RecordReader extends
       RecordReader<Text, NullWritable> {
    private int numRecordsToCreate = 0;
    private int createdRecords = 0;
    private Text key = new Text();
    private NullWritable value = NullWritable.get();
    private Random rndm = new Random();
    private ArrayList<String> randomWords = new ArrayList<String>();
    // This object will format the creation date string into a Date
    // object
    private SimpleDateFormat frmt = new SimpleDateFormat(
           "yyyy-MM-dd'T'HH:mm:ss.SSS");
    public void initialize(InputSplit split, TaskAttemptContext context)
           throws IOException, InterruptedException {
       // Get the number of records to create from the configuration
       this.numRecordsToCreate = context.getConfiguration().getInt(
              NUM_RECORDS_PER_TASK, -1);
       // Get the list of random words from the DistributedCache
       URI[] files = DistributedCache.getCacheFiles(context
              .getConfiguration());
       // Read the list of random words into a list
       BufferedReader rdr = new BufferedReader(new FileReader(
              files[0].toString()));
       String line;
       while ((line = rdr.readLine()) != null) {
           randomWords.add(line);
       }
       rdr.close();
    }
    public boolean nextKeyValue() throws IOException, InterruptedException {
       // If we still have records to create
       if (createdRecords < numRecordsToCreate) {
           // Generate random data
           int score = Math.abs(rndm.nextInt()) % 15000;
           int rowId = Math.abs(rndm.nextInt()) % 1000000000;
           int postId = Math.abs(rndm.nextInt()) % 100000000;
           int userId = Math.abs(rndm.nextInt()) % 1000000;
           String creationDate = frmt.format(Math.abs(rndm.nextLong()));
           // Create a string of text from the random words
           String text = getRandomText();
           String randomRecord = "<row Id="" + rowId + "" PostId=""
                  + postId + "" Score="" + score + "" Text="" + text
                  + "" CreationDate="" + creationDate + "" UserId"="
                  + userId + "" />";
           key.set(randomRecord);
           ++createdRecords;
           return true;
       } else {
           // We are done creating records
           return false;
       }
    }
    private String getRandomText() {
       StringBuilder bldr = new StringBuilder();
       int numWords = Math.abs(rndm.nextInt()) % 30 + 1;
       for (int i = 0; i < numWords; ++i) {
           bldr.append(randomWords.get(Math.abs(rndm.nextInt())
                  % randomWords.size())
                  + " ");
       }
       return bldr.toString();
    }
    public Text getCurrentKey() throws IOException, InterruptedException {
       return key;
    }
    public NullWritable getCurrentValue() throws IOException,
           InterruptedException {
       return value;
    }
    public float getProgress() throws IOException, InterruptedException {
       return (float) createdRecords / (float) numRecordsToCreate;
    }
    public void close() throws IOException {
       // nothing to do here...
    }
}


 

External Source Output

Pattern Description

正如本章早些时候说的,外部源输出模式写到hadoop系统之外。

Intent

你想把MapReduce的输出写到远程地点。

Motivation

使用这种模式,我们能够把从MapReduce框架输出的数据直接给一个外部源。这对直接加载数据到系统而不是通过中转数据的方式极其有用。这种模式跳过存储到文件系统这一步而直接发送到要去的地方。使用MapReduce大量并行导入外部源的方式有使用的地方。

 

数据是并行写出的。由于使用外部源做输出,你需要保证目标系统能够处理这种并行度,承受所有打开的连接。

Structure

7-2展示了外部源输出结构,解释如下:

·OutputFormatjob提交前验证job配置指定的输出。这也是保证外部源完全可用的好时机。如果当把数据提交给外部系统时发现它并不能用,这是不好的。这个方法也负责RecordWriter方法的创建。

·RecordWriter把所有的键值对写到外部源。它的实现根据不同的外部源而不同。对象构建时,使用外部源的api建立需要的连接。这些连接用于mapreduce任务写数据。

MapReduce Design Patterns(7、输入输出模式)(十三)

CHAPTER 7.Input and Output Patterns

Figure 7-2. The structure of the external source output pattern

Consequences

输出数据已经发送到外部源,并且外部源成功加载数据。

 

Notice:注意任务失败可能发生,如果发生了,任何write方法里的键值对都不能恢复。典型的MapReduce job里,临时输出写到文件系统里。在失败的情况下,输出被丢弃。当写到一个外部目录时,会在流中接收数据。如果任务失败,外部源不会自动识别并丢弃所有从这个任务接收的数据。如果这是不可接受的,考虑使用自定义的OutputCommitter写临时输出到文件系统。

Performance analysis

MapReduce角度看,没什么可担心的,因为mapreduce都是通用的。不过要注意数据的接收方要能处理平行连接。运行1000个任务写到一个关系数据库里不是很好。要避免这种情况,你可能要让每个reducer处理多一点的数据。如果目标系统对并行支持的很好,这也可能不是问题。例如,写到一个分区数据库,可以把每个reducer写到指定的数据库实例。(oracle RAC?)

 

External Source Output Example

Writing to Redis instances

这个例子是从MapReduce并行写入多个redis实例的基本方式。Redis是一个开源内存键值存储数据库。通常作为数据结构服务器,键可以是stringhashlistset,和sorted setRedis用标准c写的,能在多数posix系统下工作,例如linux,不需要外部的依赖。

 

为了跟hadoop框架一起工作,jedis用于跟redis的交流。Jedis是开源的“blazingly small and sane Redis java 客户端”。还有其它语言编写的redis客户端可以在网上找到。

 

这个例子没有实际的分析业务,本章剩下的都是这样。关注于用自定义的fileOutputFormat怎样把数据存储到hdfs并存到外部数据系统。这个例子里,*用户数据写到数量可配置的redis集群,数据是用户到声誉值的映射。这些映射数据根据redishash平均的随机分发。

 

Redishash是一种stirng 字段到string值的映射,跟javahashmap类似。每个hash都有一个key标识它。每个hash可保存超过40亿的键值对。

 

问题:给出用户信息数据,并行随机分发用户-声誉值的映射数据到一个数量可配置的redis集群。

 

outputFormat codeRedisHashOutputFormat类负责在提交到jobtracker之前创建和验证job配置。也会创建RecordWriter序列化输出键值对。通常写到hdfs,但我们这里不是,一会会看到。

 

输出格式包含了必须被驱动代码设置的配置变量,来保证已经有了job运行需要的所有信息。

这里,有几个推测开发人员需要用到的静态方法。这个输出格式接受一些redis实例主机作为csv结构和一个写所有输出的redis hash keycheckOutputSpecs 方法里,运行之前要保证两个参数已被设置,因为没有他们job会失败。这也是你要验证配置的地方。

 

getRecordWriter方法用于后面为mapreduce任务创建RecordWriter实例。这里我们靠RedisHashRecordWriter得到需要的配置变量并返回一个新的实例。这个Record writerRedisHashOutputFormat的子类,不需要但是约定的东西。

 

这个输出格式的最后一个方法是getOutputCommitter。在任务失败需要重跑之前框架用它管理任何临时输出。对于这个实现,我们通常不关心任务是否失败和需要重新执行。只要job完成就可以。输出提交者是框架需要的,但NullOutputFormat包含的输出提交者的实现什么也不做。

 

public static class RedisHashOutputFormat extends OutputFormat<Text, Text> {
    public static final String REDIS_HOSTS_CONF = "mapred.redishashoutputformat.hosts";
    public static final String REDIS_HASH_KEY_CONF = "mapred.redishashinputformat.key";
    public static void setRedisHosts(Job job, String hosts) {
       job.getConfiguration().set(REDIS_HOSTS_CONF, hosts);
    }
    public static void setRedisHashKey(Job job, String hashKey) {
       job.getConfiguration().set(REDIS_HASH_KEY_CONF, hashKey);
    }
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
           throws IOException, InterruptedException {
       return new RedisHashRecordWriter(job.getConfiguration().get(
              REDIS_HASH_KEY_CONF), job.getConfiguration().get(
              REDIS_HOSTS_CONF));
    }
    public void checkOutputSpecs(JobContext job) throws IOException {
       String hosts = job.getConfiguration().get(REDIS_HOSTS_CONF);
       if (hosts == null || hosts.isEmpty()) {
           throw new IOException(REDIS_HOSTS_CONF
                  + " is not set in configuration.");
       }
       String hashKey = job.getConfiguration().get(REDIS_HASH_KEY_CONF);
       if (hashKey == null || hashKey.isEmpty()) {
           throw new IOException(REDIS_HASH_KEY_CONF
                  + " is not set in configuration.");
       }
    }
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
           throws IOException, InterruptedException {
       return (new NullOutputFormat<Text, Text>())
              .getOutputCommitter(context);
    }
    public static class RedisHashRecordWriter extends
           RecordWriter<Text, Text> {
       // code in next section
    }
}


RecordReader code RedisHashRecordWriter类通过jedis客户端处理redis的连接并写数据。每个键值对随机写到redis实例,这种写是在整个集群内平均分发的,构造器保存要写的hash key并创建新的jedis实例。

然后连接上jedis实例并跟一个整数做映射。Write方法会用这个映射得到指定的jedis实例。用keyhash码对redis实例个数取模。这个模值决定了键值对要发送的jedis实例。Jedis实例在close方法里关闭连接。

 

public static class RedisHashRecordWriter extends RecordWriter<Text, Text> {
    private HashMap<Integer, Jedis> jedisMap = new HashMap<Integer, Jedis>();
    private String hashKey = null;
    public RedisHashRecordWriter(String hashKey, String hosts) {
       this.hashKey = hashKey;
       // Create a connection to Redis for each host
       // Map an integer 0-(numRedisInstances - 1) to the instance
       int i = 0;
       for (String host : hosts.split(",")) {
           Jedis jedis = new Jedis(host);
           jedis.connect();
           jedisMap.put(i, jedis);
           ++i;
       }
    }
    public void write(Text key, Text value) throws IOException,
           InterruptedException {
       // Get the Jedis instance that this key/value pair will be
       // written to
       Jedis j = jedisMap.get(Math.abs(key.hashCode()) % jedisMap.size());
       // Write the key/value pair
       j.hset(hashKey, key.toString(), value.toString());
    }
    public void close(TaskAttemptContext context) throws IOException,
           InterruptedException {
       // For each jedis instance, disconnect it
       for (Jedis jedis : jedisMap.values()) {
           jedis.disconnect();
       }
    }
}


Mapper code。较简单。Userid和声誉值从记录获取然后输出。Outputformat会做大部分的工作,允许mapper重用多次去写任何你想要的东西到redis hash里。

 

public static class RedisOutputMapper extends
       Mapper<Object, Text, Text, Text> {
    private Text outkey = new Text();
    private Text outvalue = new Text();
 
    public void map(Object key, Text value, Context context)
           throws IOException, InterruptedException {
       Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
              .toString());
       String userId = parsed.get("Id");
       String reputation = parsed.get("Reputation");
       // Set our output key and values
       outkey.set(userId);
       outvalue.set(reputation);
       context.write(outkey, outvalue);
    }
}


Driver code。驱动代码解析命令行参数,调用静态方法设置要写到redis的数据。

 

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path inputPath = new Path(args[0]);
    String hosts = args[1];
    String hashName = args[2];
    Job job = new Job(conf, "Redis Output");
    job.setJarByClass(RedisOutputDriver.class);
    job.setMapperClass(RedisOutputMapper.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.setInputPaths(job, inputPath);
    job.setOutputFormatClass(RedisHashOutputFormat.class);
    RedisHashOutputFormat.setRedisHosts(job, hosts);
    RedisHashOutputFormat.setRedisHashKey(job, hashName);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    int code = job.waitForCompletion(true) ? 0 : 2;
    System.exit(code);
}