hadoop 二次排序 安插数据库
hadoop 二次排序 插入数据库
二次排序:根据自定义对象的compareTo 方法排序
由下面的代码实现可以看出 二次排序的实质是 先根据第一个字段排完序后再排第二个字段
若还有第三个字段参与进来是否可以叫作三次排序呢 (?_ ?)
另:根据程序断点初步判断
设置job的sort 会在mapper 至combiner阶段执行
设置job的group会在combiner至reduce 阶段执行
不过在从combiner到reduce的时候若传递的key为自定义的对象即使重写了hashcode 和equals 方法也不会当成相同的key来处理 不得已在本程序中传输key为一个空Text()
不知是否有别的方法可以实现 ?
插入数据库的操作在 附件中有详细的实现.
package hdfs.demo2.final_indb; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class Demo2_3Mapp { /** * 用户自定义对象 保存 * @author Administrator * */ public static class TopTenPair implements WritableComparable<TopTenPair>, DBWritable, Writable { int prodid; //商品编码 int price; //商品价格 int count; //商品销售数量 @Override public void write(PreparedStatement statement) throws SQLException { statement.setInt(1, prodid); statement.setInt(2, price); statement.setInt(3, count); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.prodid = resultSet.getInt(1); this.price = resultSet.getInt(2); this.count = resultSet.getInt(3); } /** * Set the prodId and price and count values. */ public void set(int prodid, int price, int count) { this.prodid = prodid; this.price = price; this.count = count; } public int getProdid() { return prodid; } public int getPrice() { return price; } public int getCount() { return count; } @Override // 反序列化,从流中的二进制转换成IntPair public void readFields(DataInput in) throws IOException { prodid = in.readInt(); price = in.readInt(); count = in.readInt(); } @Override // 序列化,将IntPair转化成使用流传送的二进制 public void write(DataOutput out) throws IOException { out.writeInt(prodid); out.writeInt(price); out.writeInt(count); } @Override // key的比较 public int compareTo(TopTenPair o) { if ( o.count ==count) { if( o.count==0){ return o.prodid - prodid; } return o.price-price; } return o.count-count; } // 新定义类应该重写的两个方法 @Override public int hashCode() { return count+prodid*3 ; } @Override public boolean equals(Object right) { if (right == null) return false; if (this == right) return true; if (right instanceof TopTenPair) { TopTenPair r = (TopTenPair) right; return r.prodid == prodid && r.price == price&& r.count == count; } else { return false; } } @Override public String toString(){ return getProdid()+"\t"+getPrice()+"\t"+getCount(); } } public static class TopTenPairS extends TopTenPair{ public TopTenPairS(){ } // key的比较 @Override public int compareTo(TopTenPair o) { return o.price-price; } } /** * 分区函数类。根据first确定Partition。 */ public static class FirstPartitioner extends Partitioner<TopTenPair, Text> { @Override public int getPartition(TopTenPair key, Text value, int numPartitions) { return Math.abs(key.getProdid() ) % numPartitions; } } /** * 分组函数类。只要first相同就属于同一个组。 */ public static class GroupingComparator extends WritableComparator { protected GroupingComparator() { super(TopTenPair.class, true); } @Override // Compare two WritableComparables. public int compare(WritableComparable w1, WritableComparable w2) { TopTenPair ip1 = (TopTenPair) w1; TopTenPair ip2 = (TopTenPair) w2; if (ip1.count == ip2.count) { if(ip1.count==0){ return ip1.prodid - ip2.prodid; } return ip1.price - ip2.price ; } return ip1.count-ip2.count; } } public static class Map extends Mapper<LongWritable, Text, Text, Text> { private final Text intkey= new Text(); private final Text intvalue = new Text(); //商品ID 售价 数量 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); int prodid = 0; int price = 0; if (tokenizer.hasMoreTokens()) prodid = Integer.parseInt(tokenizer.nextToken()); if (tokenizer.hasMoreTokens()) price = Integer.parseInt(tokenizer.nextToken()); intkey.set(prodid+""); intvalue.set(price+""); // intvalue.set(0, price, 0); context.write(intkey, intvalue); } } public static class Demo2_3Combiner extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int count=0; int maxPrice=0; for (Text value : values) { int v=Integer.parseInt(value.toString()); maxPrice=v<maxPrice?maxPrice:v; count++; } //key :prodId context.write(new Text(),new Text(key+"-"+maxPrice+"-"+count)); } } public static class Reduce extends Reducer<Text, Text, TopTenPairS, Text> { TopTenPair pair = new TopTenPair(); public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String [] strs=null; TopTenPair pair ; List<TopTenPair> list=new ArrayList<Demo2_3Mapp.TopTenPair>(); for (Text val : values) { pair = new TopTenPair(); strs=val.toString().split("-"); pair.set(Integer.parseInt(strs[0]), Integer.parseInt(strs[1]), Integer.parseInt(strs[2])); list.add(pair); } //按 count属性排序 Collections.sort(list); List<TopTenPairS> lists=new ArrayList<Demo2_3Mapp.TopTenPairS>(); //取前4个对象 for(int i =0;i<4&& i<list.size();i++){ TopTenPair ttp=list.get(i); TopTenPairS ttps=new TopTenPairS(); ttps.set(ttp.getProdid(), ttp.getPrice(), ttp.getCount()); lists.add(ttps); } //按 price 属性排序 Collections.sort(lists); for(TopTenPairS ttps:lists){ System.out.println(ttps); //参考 DBRecordWriter //key 为数据类型, value:无用 context.write( ttps , new Text()); //输出到数据中 } } } }