hadoop瀛︿範涔嬭矾_一銆佽嚜瀹氫箟搴忓垪鍖栫被

hadoop瀛︿範涔嬭矾_1銆佽嚜瀹氫箟搴忓垪鍖栫被
涓€銆丠adoop搴忓垪鍖?
1銆佸簭鍒楀寲锛圫erialization锛夋槸鎸囨妸缁撴瀯鍖栧璞¤浆鍖栦负瀛楄妭娴併€?
2銆佸弽搴忓垪鍖栵紙Deserialization锛夋槸搴忓垪鍖栫殑閫嗚繃绋嬨€傚嵆鎶婂瓧鑺傛祦杞洖缁撴瀯鍖栧璞°€?
3銆丣ava搴忓垪鍖栵紙java.io.Serializable锛?

浜屻€佸簭鍒楀寲鏍煎紡鐗圭偣锛?
1銆佺揣鍑戯細楂樻晥浣跨敤瀛樺偍绌洪棿銆?
2銆佸揩閫燂細璇诲啓鏁版嵁鐨勯澶栧紑閿€灏?
3銆佸彲鎵╁睍锛氬彲閫忔槑鍦拌鍙栬€佹牸寮忕殑鏁版嵁
4銆佷簰鎿嶄綔锛氭敮鎸佸璇█鐨勪氦浜?

涓夈€丠adoop鐨勫簭鍒楀寲鏍煎紡锛歐ritable

鍥涖€丠adoop搴忓垪鍖栫殑浣滅敤
1銆佸簭鍒楀寲鍦ㄥ垎甯冨紡鐜鐨勪袱澶т綔鐢細杩涚▼闂撮€氫俊锛屾案涔呭瓨鍌ㄣ€?
2銆丠adoop鑺傜偣闂撮€氫俊銆?

浜斻€佷娇鐢╤adoop鍐呯疆鐨勫簭鍒楀寲绫?涓嶄娇鐢ㄨ嚜瀹氫箟搴忓垪鍖栫被)锛屽疄鐜版祦閲忕粺璁$殑鍔熻兘銆?


public class TrafficApp1 {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Job job = Job.getInstance(new Configuration(), TrafficApp1.class.getSimpleName());
		job.setJarByClass(TrafficApp1.class);
		FileInputFormat.setInputPaths(job, args[0]);
		
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		
		job.setReducerClass(MyReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.waitForCompletion(true);
		
	}
	
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
		
		Text k2 = new Text();
		Text v2 = new Text();
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, Text>.Context context)
						throws IOException, InterruptedException {
			String line = value.toString();
			String[] splited = line.split("\t");
			k2.set(splited[1]);
			//灏嗗緱鍒扮殑鏁版嵁鎷兼帴鎴怱tring瀛楃涓诧紝鐢ㄤ簬reduce杈撳叆浣跨敤
			v2.set(splited[6]+"\t"+splited[7]+"\t"+ splited[8]+"\t"+ splited[9]);
			context.write(k2, v2);
			
		}
	}
	
	public static class MyReduce extends Reducer<Text, Text, Text, Text>{
		Text v3 = new Text();
		@Override
		protected void reduce(Text k2, Iterable<Text> v2s,
				Reducer<Text, Text, Text, Text>.Context context)
						throws IOException, InterruptedException {
			long t1 = 0L;
			long t2 = 0L;
			long t3 = 0L;
			long t4 = 0L;
			String[] splited = null;
			for(Text v2 : v2s){
				//灏唌ap杈撳叆鐨勫瓧绗︿覆鍒嗗壊瑙f瀽骞惰绠?
				splited = v2.toString().split("\t");
				t1 += Long.parseLong(splited[0]);
				t2 += Long.parseLong(splited[1]);
				t3 += Long.parseLong(splited[2]);
				t4 += Long.parseLong(splited[3]);
			}
			//杈撳嚭鏍煎紡鍖栫殑瀛楃涓?
			v3.set(t1+"\t"+t2+"\t"+t3+"\t"+t4);
			context.write(k2, v3);
		}
		
	}
	
}





鍏€佽嚜瀹氫箟搴忓垪鍖栫被
public class TrafficApp {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Job job = Job.getInstance(new Configuration(), TrafficApp.class.getSimpleName());
		job.setJarByClass(TrafficApp.class);
		
		FileInputFormat.setInputPaths(job, args[0]);
		
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TrafficWritable.class);
		
		
		job.setReducerClass(MyReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TrafficWritable.class);
		
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.waitForCompletion(true);
		
	}
	
	public static class MyMapper extends Mapper<LongWritable, Text, Text, TrafficWritable>{
		
		Text k2 = new Text();
		TrafficWritable v2 = new TrafficWritable();
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, TrafficWritable>.Context context)
						throws IOException, InterruptedException {
			String line = value.toString();
			String[] splited = line.split("\t");
			k2.set(splited[1]);
			v2.set(splited[6], splited[7], splited[8], splited[9]);
			
			context.write(k2, v2);
			
		}
	}
	
	
	public static class MyReduce extends Reducer<Text, TrafficWritable, Text, TrafficWritable>{
		TrafficWritable v3 = new TrafficWritable();
		@Override
		protected void reduce(Text k2, Iterable<TrafficWritable> v2s,
				Reducer<Text, TrafficWritable, Text, TrafficWritable>.Context context)
						throws IOException, InterruptedException {
			long t1 = 0L;
			long t2 = 0L;
			long t3 = 0L;
			long t4 = 0L;
			for(TrafficWritable v2 : v2s){
				t1 += v2.t1;
				t2 += v2.t2;
				t3 += v2.t3;
				t4 += v2.t4;
			}
			
			v3.set(t1, t2, t3, t4);
			context.write(k2, v3);
			
		}
		
	}
	
	static class TrafficWritable implements Writable{

		long t1;
		long t2;
		long t3;
		long t4;
		
		public TrafficWritable(){}
		
		public void set(long t1,long t2,long t3,long t4){
			this.t1 = t1;
			this.t2 = t2;
			this.t3 = t3;
			this.t4 = t4;
		}
		
		public void set(String t1,String t2,String t3, String t4){
			this.t1 = Long.parseLong(t1);
			this.t2 = Long.parseLong(t2);
			this.t3 = Long.parseLong(t3);
			this.t4 = Long.parseLong(t4);
		}
		
		public void readFields(DataInput in) throws IOException {
			this.t1 = in.readLong();
			this.t2 = in.readLong();
			this.t3 = in.readLong();
			this.t4 = in.readLong();
			
		}

		public void write(DataOutput out) throws IOException {

			out.writeLong(t1);
			out.writeLong(t2);
			out.writeLong(t3);
			out.writeLong(t4);
		}
		
		@Override
		public String toString() {
			return this.t1+"\t"+t2+"\t"+t3+"\t"+t4+"\t";
		}
		
	} 
}