A Join Example in Hadoop


1. The sample tables and data are taken from Oracle's scott/tiger schema:

 

  
[Screenshot: the EMP and DEPT sample tables]

 

The query implemented in this example:

   
[Screenshot: the query]
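A sketch of the join being implemented, inferred from the columns the Employee bean emits (empno, ename, dname, deptno); this may not be the exact original statement:

SELECT e.empno, e.ename, d.dname, e.deptno
  FROM emp e
  JOIN dept d ON e.deptno = d.deptno;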

 

2. The approach to the join:

Use the join key as the map output key, which is also the reduce input key. That way, after the shuffle, all records sharing the same join key land in the same reduce key/value list. Design one common bean for the two tables being joined and give it a flag field, so the reduce phase can easily tell whether a record came from EMP or from DEPT. The actual join work then happens in the reduce phase; the sketch below illustrates the grouping.
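For example, with the standard scott data, the reduce group for join key 20 is shaped roughly like this (field names follow the Employee bean defined below):

key 20 -> [ {flag=1, deptname=RESEARCH},
            {flag=2, empno=7369, empname=SMITH},
            {flag=2, empno=7566, empname=JONES}, ... ]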

 

3. The example:

 

The bean that stores the data (it must be serialized to travel across the network, and Hadoop needs to group and sort it during processing, so it implements the WritableComparable interface):

 

package com.alipay.dw.test;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * A common bean that carries fields from both EMP and DEPT.
 * The flag field marks the source table: 1 = DEPT, 2 = EMP.
 */
public class Employee implements WritableComparable<Employee> {
    private String empno="";
    private String empname="";
    private String deptname="";
    private String deptno="";
    private int flag=0;

    public String getEmpno() {
        return empno;
    }

    public void setEmpno(String empno) {
        this.empno = empno;
    }

    public String getEmpname() {
        return empname;
    }

    public void setEmpname(String empname) {
        this.empname = empname;
    }

    public String getDeptname() {
        return deptname;
    }

    public void setDeptname(String deptname) {
        this.deptname = deptname;
    }

    public String getDeptno() {
        return deptno;
    }

    public void setDeptno(String deptno) {
        this.deptno = deptno;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }


    public Employee(String empno, String empname, String deptname, String deptno, int flag) {
        this.empno = empno;
        this.empname = empname;
        this.deptname = deptname;
        this.deptno = deptno;
        this.flag = flag;
    }

    public Employee() {
        super();
    }

    public Employee(Employee obj) {
        this.empno = obj.empno;
        this.empname = obj.empname;
        this.deptname = obj.deptname;
        this.deptno = obj.deptno;
        this.flag = obj.flag;
    }

    @Override
    public String toString() {
        return this.empno + "," + this.empname + "," + this.deptname + "," + this.deptno;
    }

    // Employee is used only as a map output value in this job, so the
    // ordering defined here is never exercised; a fuller implementation
    // would compare a meaningful field such as empno.
    public int compareTo(Employee o) {
        return 0;
    }

    public void write(DataOutput dataOutput) throws IOException {
        // Write the fields in a fixed order; readFields must read them back in the same order.
        dataOutput.writeUTF(this.empno);
        dataOutput.writeUTF(this.empname);
        dataOutput.writeUTF(this.deptname);
        dataOutput.writeUTF(this.deptno);
        dataOutput.writeInt(this.flag);
    }

    public void readFields(DataInput dataInput) throws IOException {
        // Read the fields back in exactly the order write() emitted them.
        this.empno = dataInput.readUTF();
        this.empname = dataInput.readUTF();
        this.deptname = dataInput.readUTF();
        this.deptno = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }
}
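Note the copy constructor: Hadoop reuses a single Writable instance while iterating over a reducer's values, so any record that must survive beyond the current iteration has to be copied into a fresh object. The reducer below relies on this constructor for exactly that purpose.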
 

 

 

The Mapper class:

 

 

package com.alipay.dw.test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;

/**
 * Splits each comma-separated input line, wraps it in an Employee bean
 * tagged with its source table, and emits it keyed by deptno so that
 * matching EMP and DEPT records meet in the same reduce group.
 */
public class EmpMapper extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Employee> {

    public void map(LongWritable key, Text value, OutputCollector<LongWritable, Employee> output, Reporter reporter) throws IOException {
        String line = value.toString();
        String[] array = line.split(",");
        if (array.length <= 3) {  // DEPT row: DEPTNO,DNAME,LOC (3 columns)
            Employee obj = new Employee();
            obj.setDeptno(array[0]);
            obj.setDeptname(array[1]);
            obj.setFlag(1);
            output.collect(new LongWritable(Long.parseLong(obj.getDeptno())), obj);

        } else {  // EMP row: EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO (8 columns)
            Employee obj = new Employee();
            obj.setEmpno(array[0]);
            obj.setDeptno(array[7]);
            obj.setEmpname(array[1]);
            obj.setFlag(2);
            output.collect(new LongWritable(Long.parseLong(obj.getDeptno())), obj);

        }

    }
}
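The mapper assumes both tables were exported as comma-separated text: DEPT rows carry 3 fields (DEPTNO, DNAME, LOC) and EMP rows carry 8, with DEPTNO last. Illustrative lines in that assumed format, using the standard scott data:

10,ACCOUNTING,NEW YORK
7369,SMITH,CLERK,7902,17-DEC-80,800,,20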

 

The Reducer class:

 

 

package com.alipay.dw.test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * For each deptno, caches the single DEPT record and all EMP records,
 * then emits one joined line per employee.
 */
public class EmpReducer extends MapReduceBase implements Reducer<LongWritable, Employee, LongWritable, Text> {

    public void reduce(LongWritable key, Iterator<Employee> values,
                       OutputCollector<LongWritable, Text> output,
                       Reporter reporter) throws IOException {
        Employee dept = null;
        List<Employee> list = new ArrayList<Employee>();
        while (values.hasNext()) {
            // Hadoop reuses one Employee instance across calls to values.next(),
            // so every record we keep must be copied into a fresh object.
            Employee obj = values.next();
            if (obj.getFlag() == 1) {    // DEPT record
                dept = new Employee(obj);
            } else {                     // EMP record
                list.add(new Employee(obj));
            }
        }
        if (dept == null) {
            // No DEPT record arrived for this deptno; nothing to join against.
            return;
        }
        for (int i = 0; i < list.size(); i++) {
            Employee employee = list.get(i);
            employee.setDeptname(dept.getDeptname());
            output.collect(new LongWritable(0), new Text(employee.toString()));
        }
    }
}

 

The driver class that launches the job:

 

 

package com.alipay.dw.test;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import java.io.IOException;

/**
 * Configures and submits the join job using the old mapred API.
 */
public class EmpMain {
    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(EmpMain.class);
        conf.setJobName("Employee job");
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(EmpMapper.class);
        conf.setReducerClass(EmpReducer.class);
        conf.setMapOutputValueClass(Employee.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        JobClient.runJob(conf);
    }
}
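One detail worth noting: setMapOutputValueClass(Employee.class) is required because the map output value (Employee) differs from the job's final output value (Text), while the map output key needs no separate setting because JobConf defaults it to the job output key class (LongWritable).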

 

After compiling and packaging with ant, run it:


[Screenshot: console output of the job run]
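A typical invocation looks like this (the jar name is hypothetical; use whatever name your ant build produces, plus real HDFS paths):

hadoop jar join-example.jar com.alipay.dw.test.EmpMain <input-dir> <output-dir>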

 

Verify the result set:


[Screenshot: the joined result set]
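Each output line is the constant key 0, a tab, and the joined record from Employee.toString() (empno, empname, deptname, deptno). With the standard scott data, the lines should look roughly like:

0	7369,SMITH,RESEARCH,20
0	7499,ALLEN,SALES,30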

 

Hive, by contrast, can do all of this with a single SQL statement (essentially the query shown at the top), so this inefficient hand-coding is purely an aid to understanding and has no practical use.

Comment #1, xm_king, 2011-08-18:
You can do this with MapReduce's data join classes: DataJoinMapperBase, DataJoinReducerBase, and TaggedMapOutput will solve the OP's problem.