
Hadoop 2.5.2: exporting data from HDFS to multiple HBase tables with MapReduce

When Hadoop and HBase are configured and running normally, jps shows the following processes:

60559 HRegionServer
7329 Main
20653 Jps
29355 HQuorumPeer
16221 ResourceManager
29417 HMaster
16538 NodeManager
15750 NameNode
15880 DataNode
16046 SecondaryNameNode

 

Many examples found online are based on Hadoop 0.9x, and the relevant APIs have changed in newer Hadoop releases.

This example reads files from HDFS, runs a MapReduce job, and writes the results to multiple HBase tables.

The example was therefore re-tested in the following environment:

Hadoop 2.5.2

HBase 1.1.4

A common scenario: you need to analyze logs, aggregate the results, and store them in an HBase result table plus an index table (a hypothetical reducer for that layout is sketched right below).
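
For instance, the reducer could write one row per word into the result table (row key = word) and one row into an index table keyed by frequency, so rows sort by count. The sketch below is only an illustration of how MultiTableOutputFormat routes each Put by the table name carried in the output key; the table name "wordcount_idx" and the index-row layout (zero-padded count plus word) are assumptions, not part of the tested code further down.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Hypothetical reducer: the output key carries the target table name, so one
// reduce() call can feed both the result table and the index table.
public class ResultAndIndexReducer
        extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    private static final byte[] RESULT_TABLE = Bytes.toBytes("wordcount");     // assumed result table
    private static final byte[] INDEX_TABLE  = Bytes.toBytes("wordcount_idx"); // assumed index table
    private static final byte[] FAMILY = Bytes.toBytes("content");

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }

        // Result table: row key = word, content:count = total occurrences.
        Put resultPut = new Put(Bytes.toBytes(key.toString()));
        resultPut.add(FAMILY, Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
        context.write(new ImmutableBytesWritable(RESULT_TABLE), resultPut);

        // Index table: row key = zero-padded count + word, so rows sort by frequency.
        Put indexPut = new Put(Bytes.toBytes(String.format("%010d-%s", sum, key.toString())));
        indexPut.add(FAMILY, Bytes.toBytes("word"), Bytes.toBytes(key.toString()));
        context.write(new ImmutableBytesWritable(INDEX_TABLE), indexPut);
    }
}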

The example sticks to the older HBase client API; if you want the newer API, adjust accordingly (a sketch of the 1.x-style calls follows the link below).

http://bobboy007.iteye.com/admin/blogs/2289537
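
For reference, here is a minimal sketch of the same table creation and a single Put using the HBase 1.x client API: Connection/Admin obtained from ConnectionFactory instead of HBaseAdmin, and Put.addColumn instead of the deprecated Put.add. The row value "hello" is only a placeholder; this is an illustration under those assumptions, not part of the tested example below.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class NewApiSketch {
    public static void main(String[] args) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("wordcount");
            // Create the table with a single "content" family if it does not exist yet.
            if (!admin.tableExists(name)) {
                HTableDescriptor htd = new HTableDescriptor(name);
                htd.addFamily(new HColumnDescriptor("content"));
                admin.createTable(htd);
            }
            // Put.addColumn replaces the deprecated Put.add used in the example below.
            try (Table table = conn.getTable(name)) {
                Put put = new Put(Bytes.toBytes("hello"));
                put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes("1"));
                table.put(put);
            }
        }
    }
}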

package jyw.test;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; 
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.io.Writable;
/*
 * Test: write from the reducer to multiple HBase tables.
 */
public class HBaseMultiTableOutputReduce {

	// Mapper implementation: tokenize each line and emit (word, 1)
	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	/*
	 * Reducer implementation.
	 * Input key/value types match the map output (Text, IntWritable);
	 * the output key names the target HBase table and the output value is the Put.
	 * TODO: check whether setup()/cleanup() are needed; also test writing to MySQL.
	 */
	public static class Reduce extends Reducer<Text, IntWritable, Writable, Put> {

		public void reduce(Text key, Iterable<IntWritable> values, Context context) {
			ImmutableBytesWritable putTable1 = new ImmutableBytesWritable(Bytes.toBytes("wordcount"));
			ImmutableBytesWritable putTable2 = new ImmutableBytesWritable(Bytes.toBytes("wordcount1"));
			int sum = 0;

			Iterator<IntWritable> iterator = values.iterator();
			while (iterator.hasNext()) {
				sum += iterator.next().get();
			}

			// Create one Put per word; the word itself is the row key
			Put put = new Put(Bytes.toBytes(key.toString()));
			// Column family "content", qualifier "count", value = the count
			put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));

			try {
				context.write(putTable1, put);
				context.write(putTable2, put);
			} catch (Exception e) {
				e.printStackTrace();
			}
			// context.write(NullWritable.get(), put);
		}
	}

	// Create an HBase table with a single "content" column family (if it does not already exist)
	public static void createHBaseTable(String tableName) throws IOException {
		// Table descriptor
		HTableDescriptor htd = new HTableDescriptor(tableName);
		// Column family descriptor
		HColumnDescriptor col = new HColumnDescriptor("content");
		htd.addFamily(col);

		// Load the HBase configuration (hbase-site.xml on the classpath)
		Configuration conf = HBaseConfiguration.create();

		// conf.set("hbase.zookeeper.quorum","127.0.0.1");
		// conf.set("hbase.zookeeper.property.clientPort", "2181");
		HBaseAdmin hAdmin = new HBaseAdmin(conf);

		if (hAdmin.tableExists(tableName)) {
			System.out.println("该数据表已经存在,正在重新创建。");
			// hAdmin.disableTable(tableName);
			// hAdmin.deleteTable(tableName);
		} else {

			System.out.println("创建表:" + tableName);
			hAdmin.createTable(htd);
		}
	}

	public static void main(String[] args) throws Exception {
		String tableName1 = "wordcount";
		String tableName2 = "wordcount1";
		// Step 1: create the HBase tables
		HBaseMultiTableOutputReduce.createHBaseTable(tableName1);
		HBaseMultiTableOutputReduce.createHBaseTable(tableName2);
		// Step 2: run the MapReduce job
		// Configure MapReduce
		Configuration conf = new Configuration();
		// These settings matter when running against a remote cluster; uncomment and adjust as needed
		// conf.set("mapred.job.tracker", "master:9001");
		// conf.set("hbase.zookeeper.quorum","master");
		// conf.set("hbase.zookeeper.property.clientPort", "2181");
		// conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);

		Job job = Job.getInstance(conf, "multi output Count");
		job.setJarByClass(HBaseMultiTableOutputReduce.class);

		// Set the Mapper and Reducer classes
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);

		// Set the map output key/value types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// Set the input and output formats; MultiTableOutputFormat routes each Put to the table named by the reduce output key
		job.setInputFormatClass(TextInputFormat.class);
		// job.setOutputFormatClass(TableOutputFormat.class);
		job.setOutputFormatClass(MultiTableOutputFormat.class);

		// Set the HDFS input directory
		FileInputFormat.addInputPath(job, new Path("hdfs://192.168.0.42:9000/user/jiayongwei/input/"));
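		// Note: no FileOutputFormat.setOutputPath() is needed here;
		// MultiTableOutputFormat writes each Put directly to the HBase table
		// named by the reduce output key.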
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}
}
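
Once the job completes, the counts can be checked from the HBase shell (scan 'wordcount') or with a small client program. The sketch below uses the 1.x Connection/Table API to scan the "wordcount" table and print each word with its count; it assumes hbase-site.xml is on the classpath and is only a verification aid, not part of the job itself.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class WordCountScan {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("wordcount"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                // Row key is the word; content:count holds the count written by the job.
                String word = Bytes.toString(result.getRow());
                String count = Bytes.toString(
                        result.getValue(Bytes.toBytes("content"), Bytes.toBytes("count")));
                System.out.println(word + "\t" + count);
            }
        }
    }
}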