Hadoop custom types
Hadoop provides interfaces for user-defined types: a type used as a key must implement WritableComparable, while a type used only as a value just needs to implement Writable. However, when such a custom type is the reduce output and the output format is TextOutputFormat, it is not displayed in a readable form by default, so a custom OutputFormat is also needed.
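The rest of this post builds a value type. For completeness, here is a minimal sketch of what a key type implementing WritableComparable could look like; the class name IntPairWritable and its two int fields are purely illustrative and not part of the code below.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type: two ints, ordered by "first", then by "second".
public class IntPairWritable implements WritableComparable<IntPairWritable> {
    private int first;
    private int second;

    public IntPairWritable() {}          // no-arg constructor required for deserialization

    public IntPairWritable(int first, int second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void write(DataOutput out) throws IOException {    // serialization
        out.writeInt(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // deserialization
        first = in.readInt();
        second = in.readInt();
    }

    @Override
    public int compareTo(IntPairWritable o) {                 // defines the sort order for keys
        int cmp = Integer.compare(first, o.first);
        return cmp != 0 ? cmp : Integer.compare(second, o.second);
    }

    @Override
    public int hashCode() {                                    // used by the default HashPartitioner
        return 31 * first + second;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof IntPairWritable)) return false;
        IntPairWritable other = (IntPairWritable) obj;
        return first == other.first && second == other.second;
    }
}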
A few commonly used member functions of the custom value type (here, a VectorWritable that wraps a double[]):
private double[] values = null;

public VectorWritable() {}          // no-arg constructor required by the framework for deserialization

public VectorWritable(double[] a) {
    this.values = a;
}

public double get(int index) {
    return this.values[index];
}

public int getLen() {
    return this.values.length;
}

public void set(double[] a) {
    this.values = a;
}
The methods that must be overridden (declared by the Writable interface):
@Override
public void readFields(DataInput in) throws IOException {
    // Deserialize: read the length first, then each element.
    this.values = new double[in.readInt()];
    for (int i = 0; i < this.values.length; i++) {
        this.values[i] = in.readDouble();
    }
}

@Override
public void write(DataOutput out) throws IOException {
    // Serialize: write the length first, then each element.
    out.writeInt(this.values.length);
    for (int i = 0; i < this.values.length; i++) {
        out.writeDouble(this.values[i]);
    }
}
In other words, what has to be overridden is just the serialization (write) and deserialization (readFields) logic.
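As for the display problem mentioned above: TextOutputFormat writes each record by calling toString() on the key and value objects, so besides writing a full custom OutputFormat, a lighter-weight option is simply to override toString() on the value type. A minimal sketch for VectorWritable follows; the comma delimiter is an arbitrary choice here.

@Override
public String toString() {
    // Render the vector as comma-separated values so TextOutputFormat prints something readable.
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < this.values.length; i++) {
        if (i > 0) {
            sb.append(",");
        }
        sb.append(this.values[i]);
    }
    return sb.toString();
}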
Map:
public static class TestMap extends Mapper<Object, Text, Text, VectorWritable> {

    private static Text outputkey = new Text("1");           // every record is emitted under the same key
    private static VectorWritable Vector = new VectorWritable();

    @Override
    public void map(Object key, Text rawLine, Context context)
            throws IOException, InterruptedException {
        // Parse a comma-separated line of numbers into a double[] and emit it as a VectorWritable.
        String[] parts = rawLine.toString().trim().split(",");
        int len = parts.length;
        double[] tmp = new double[len];
        for (int i = 0; i < len; i++) {
            tmp[i] = Double.valueOf(parts[i]);
        }
        Vector.set(tmp);
        context.write(outputkey, Vector);
    }
}
Reduce:
public static class TestReduce extends Reducer<Text, VectorWritable, Text, DoubleWritable> {

    private static Text outputkey = new Text("1");
    private static DoubleWritable output = new DoubleWritable();

    @Override
    public void reduce(Text key, Iterable<VectorWritable> values, Context context)
            throws IOException, InterruptedException {
        // For each incoming vector, emit every element as a separate (key, value) pair.
        for (VectorWritable vm : values) {
            int len = vm.getLen();
            for (int i = 0; i < len; i++) {
                output.set(vm.get(i));
                context.write(outputkey, output);
            }
        }
    }
}
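Driver:

To tie the pieces together, the job driver has to register the custom value class explicitly, because the map output value type (VectorWritable) differs from the final output value type (DoubleWritable). Below is a minimal driver sketch using the Hadoop 2.x mapreduce API; the enclosing class name TestJob and the command-line paths are assumptions, not part of the original code.

// Required imports for the driver (new org.apache.hadoop.mapreduce API):
// import org.apache.hadoop.conf.Configuration;
// import org.apache.hadoop.fs.Path;
// import org.apache.hadoop.io.DoubleWritable;
// import org.apache.hadoop.io.Text;
// import org.apache.hadoop.mapreduce.Job;
// import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
// import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "vector writable demo");
    job.setJarByClass(TestJob.class);            // hypothetical enclosing class

    job.setMapperClass(TestMap.class);
    job.setReducerClass(TestReduce.class);

    // The map output types differ from the final output types, so set both explicitly.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(VectorWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));    // input path from the command line
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path from the command line

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}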