现在有多个输入文件，每个文件中的每行内容均为一个整数。要求读取所有文件中的整数，进行升序排序后，输出到一个新的文件中。输出的数据格式为每行两个整数：第一个整数为第二个整数的排序位次，第二个整数为原待排序的整数。

现在有多个输入文件，每个文件中的每行内容均为一个整数。要求读取所有文件中的整数，进行升序排序后，输出到一个新的文件中。输出的数据格式为每行两个整数：第一个整数为第二个整数的排序位次，第二个整数为原待排序的整数。

package org.apache.hadoop.examples;
import java.util.HashMap;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class B_sortInt {
	public static Integer numsum = new Integer(0);
	public B_sortInt() {
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://localhost:9000");
		String[] otherArgs = new String[]{"input","output"};
		if(otherArgs.length < 2) {
			System.err.println("Usage: wordcount <in> [<in>...] <out>");
			System.exit(2);
		}

		Job job = Job.getInstance(conf, "sort");
		job.setJarByClass(B_sortInt.class);
		job.setMapperClass(B_sortInt.TokenizerMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
		job.setReducerClass(B_sortInt.IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		for(int i = 0; i < otherArgs.length - 1; ++i) {
			FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
		}

		FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
		System.exit(job.waitForCompletion(true)?0:1);
	}

	public static class IntSumReducer extends Reducer<IntWritable, Text, Text, Text> {
		private Text word = new Text();
		public IntSumReducer() {
		}

		public void reduce(IntWritable key, Iterable<Text> values, Reducer<IntWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
			this.word.set(key.toString());
			numsum+=1;
			context.write(new Text(numsum.toString()), word);
		}
		//System.out.println(key.toString()+"
"+result.toString());
	}


	public static class TokenizerMapper extends Mapper<Object, Text, IntWritable, Text> {
		private IntWritable one = new IntWritable();
		public TokenizerMapper() {
		}

		public void map(Object key, Text value, Mapper<Object, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while(itr.hasMoreTokens()) {
				String tmpstr = itr.nextToken();
				this.one.set(Integer.parseInt(tmpstr));
				//System.out.println("["+one.toString()+"]");
				context.write(one, new Text("a"));
			}

		}
	}
}