A Join Example in Hadoop
1. The tables and data come from Oracle's scott/tiger sample schema.
The query to implement:
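(The original post shows the query as an image that is not preserved here. Judging from the reducer output below, which emits empno, empname, deptname, and deptno, it is equivalent to something like the following, with table and column names as in the scott schema:)

SELECT e.empno, e.ename, d.dname, e.deptno
FROM emp e JOIN dept d ON e.deptno = d.deptno;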
2. The approach to the join:
Use the join key as the map output key, which then becomes the reduce input key; as long as two records share the same join key, they land in the same key/value list of a single reducer after the shuffle. Design one common bean for the two tables being joined, and give it a flag field so the data from each table can be told apart: in the reduce phase, checking the flag makes it trivial to decide whether a record is an EMP row or a DEPT row. The actual join work happens in the reduce phase, as the sample below illustrates.
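For illustration, suppose the two tables were exported as comma-separated text files, with the field order of scott's DEPT and EMP (the exact export format is an assumption). Three input lines like

30,SALES,CHICAGO                                    <- DEPT: deptno,dname,loc
7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30     <- EMP: deptno is the last field
7521,WARD,SALESMAN,7698,1981-02-22,1250,500,30

all map to key 30, so after the shuffle one reduce call receives the single DEPT record (flag 1) together with both EMP records (flag 2) and can emit the joined rows.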
3. The example:
The bean that stores a record (the data must be serializable because it travels over the network, and Hadoop needs to group and sort it during processing, so the class implements the WritableComparable interface):
package com.alipay.dw.test;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * One record of either table. A single bean carries both EMP and DEPT
 * fields; the flag tells the reducer which table the record came from
 * (1 = DEPT, 2 = EMP).
 */
public class Employee implements WritableComparable {

    private String empno = "";
    private String empname = "";
    private String deptname = "";
    private String deptno = "";
    private int flag = 0;

    public Employee() {
        super();
    }

    public Employee(String empno, String empname, String deptname, String deptno, int flag) {
        this.empno = empno;
        this.empname = empname;
        this.deptname = deptname;
        this.deptno = deptno;
        this.flag = flag;
    }

    // Copy constructor: Hadoop reuses the value object while iterating,
    // so the reducer must clone any record it wants to keep.
    public Employee(Employee obj) {
        this.empno = obj.empno;
        this.empname = obj.empname;
        this.deptname = obj.deptname;
        this.deptno = obj.deptno;
        this.flag = obj.flag;
    }

    public String getEmpno() { return empno; }
    public void setEmpno(String empno) { this.empno = empno; }
    public String getEmpname() { return empname; }
    public void setEmpname(String empname) { this.empname = empname; }
    public String getDeptname() { return deptname; }
    public void setDeptname(String deptname) { this.deptname = deptname; }
    public String getDeptno() { return deptno; }
    public void setDeptno(String deptno) { this.deptno = deptno; }
    public int getFlag() { return flag; }
    public void setFlag(int flag) { this.flag = flag; }

    public String toString() {
        return this.empno + "," + this.empname + "," + this.deptname + "," + this.deptno;
    }

    // The bean is only used as a map output *value* here (the key is a
    // LongWritable), so ordering never applies and a stub is enough.
    public int compareTo(Object o) {
        return 0;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.empno);
        dataOutput.writeUTF(this.empname);
        dataOutput.writeUTF(this.deptname);
        dataOutput.writeUTF(this.deptno);
        dataOutput.writeInt(this.flag);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.empno = dataInput.readUTF();
        this.empname = dataInput.readUTF();
        this.deptname = dataInput.readUTF();
        this.deptno = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }
}
The Mapper class:
package com.alipay.dw.test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;

/**
 * Emits every input line keyed by deptno. DEPT lines (3 fields or fewer)
 * get flag = 1; EMP lines (8 fields, deptno last) get flag = 2.
 */
public class EmpMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, LongWritable, Employee> {

    public void map(LongWritable key, Text value,
                    OutputCollector<LongWritable, Employee> output,
                    Reporter reporter) throws IOException {
        String line = value.toString();
        String[] array = line.split(",");
        if (array.length <= 3) {
            // DEPT record: deptno,dname,loc
            Employee obj = new Employee();
            obj.setDeptno(array[0]);
            obj.setDeptname(array[1]);
            obj.setFlag(1);
            output.collect(new LongWritable(Long.valueOf(obj.getDeptno())), obj);
        } else {
            // EMP record: empno,ename,... with deptno as the 8th field
            Employee obj = new Employee();
            obj.setEmpno(array[0]);
            obj.setDeptno(array[7]);
            obj.setEmpname(array[1]);
            obj.setFlag(2);
            output.collect(new LongWritable(Long.valueOf(obj.getDeptno())), obj);
        }
    }
}
The Reducer class:
package com.alipay.dw.test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * All records sharing one deptno arrive in a single reduce call. Separate
 * the DEPT record from the EMP records by flag, then emit each employee
 * with the department name filled in -- this is the actual join.
 */
public class EmpReducer extends MapReduceBase
        implements Reducer<LongWritable, Employee, LongWritable, Text> {

    public void reduce(LongWritable key, Iterator<Employee> values,
                       OutputCollector<LongWritable, Text> output,
                       Reporter reporter) throws IOException {
        Employee dept = null;
        List<Employee> list = new ArrayList<Employee>();
        while (values.hasNext()) {
            Employee obj = values.next();
            if (obj.getFlag() == 1) {
                // DEPT record
                dept = new Employee(obj);
            } else {
                // EMP record; clone it, because Hadoop reuses the value object
                list.add(new Employee(obj));
            }
        }
        if (dept == null) {
            // employees whose deptno has no matching DEPT row: nothing to join
            return;
        }
        for (int i = 0; i < list.size(); i++) {
            Employee employee = list.get(i);
            employee.setDeptname(dept.getDeptname());
            output.collect(new LongWritable(0), new Text(employee.toString()));
        }
    }
}
The driver class that launches the job:
package com.alipay.dw.test;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import java.io.IOException;

public class EmpMain {
    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(EmpMain.class);
        conf.setJobName("Employee job");

        // args[0]: input directory holding both the EMP and DEPT files
        // args[1]: output directory (must not exist yet)
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(EmpMapper.class);
        conf.setReducerClass(EmpReducer.class);

        // The map output value (Employee) differs from the final output
        // value (Text), so it has to be declared separately.
        conf.setMapOutputValueClass(Employee.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        JobClient.runJob(conf);
    }
}
After compiling and packaging with ant, run the job:
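(The actual command is not preserved in the original; a typical invocation would look like the line below, where the jar name and HDFS paths are assumptions:)

hadoop jar empjoin.jar com.alipay.dw.test.EmpMain /user/diegoball/emp_input /user/diegoball/emp_output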
Verify the result set:
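(The result screenshot is not preserved either. Given the sample rows above, each output line is the constant key 0, a tab, then empno,empname,deptname,deptno, e.g.:)

0	7499,ALLEN,SALES,30
0	7521,WARD,SALES,30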
Compared with this hand-coded version, Hive can do the same join with a single SQL statement (essentially the query shown at the top), so this inefficient hard-coded approach is only meant to aid understanding and has no practical use.
Comment #1 (xm_king, 2011-08-18):
You could use MapReduce's built-in join support for this: DataJoinMapperBase, DataJoinReducerBase, and TaggedMapOutput solve exactly the problem the OP describes.
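For reference, a minimal sketch of that contrib approach. The classes live in org.apache.hadoop.contrib.utils.join (shipped as the hadoop-datajoin contrib jar); treat the exact signatures as assumptions to check against your Hadoop version, and note that TaggedWritable is a user-defined wrapper, not part of the library:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class JoinMapper extends DataJoinMapperBase {

    // Tag every record with its source table, derived from the input file name.
    protected Text generateInputTag(String inputFile) {
        return new Text(inputFile.contains("dept") ? "dept" : "emp");
    }

    // Wrap the raw line; the framework groups tagged records by the group key.
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
        TaggedWritable ret = new TaggedWritable((Text) value);
        ret.setTag(this.inputTag);
        return ret;
    }

    // The join key: deptno is field 0 of DEPT rows and field 7 of EMP rows.
    protected Text generateGroupKey(TaggedMapOutput aRecord) {
        String[] fields = ((Text) aRecord.getData()).toString().split(",");
        boolean isDept = aRecord.getTag().toString().equals("dept");
        return new Text(isDept ? fields[0] : fields[7]);
    }

    // User-defined carrier for one tagged record (not part of the library).
    public static class TaggedWritable extends TaggedMapOutput {
        private Text data;

        public TaggedWritable() {            // no-arg constructor for deserialization
            this.tag = new Text("");
            this.data = new Text();
        }

        public TaggedWritable(Text data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            this.data.readFields(in);
        }
    }
}

A matching DataJoinReducerBase subclass then implements combine(Object[] tags, Object[] values) to merge the DEPT record with each EMP record, replacing the hand-written flag logic above.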