大数据学习day10-----zookeeper--------1.小文件合并,2 输入和输出 3 多路径输出 4.zookeeper(选举机制,安装,zk的shell客户端、java客户端)
1. 小文件合并
HDFS中不适合存储大量的小文件,原因如下;
- 无论文件大小,namenode记录的元数据大小几乎是一致的(1KB的文件与120M的文件在namenode中的元数据都是一样的)
- namenode的内存有限,记录的元数据条数有限,集群的存储容量受限,所以HDFS不能无限添加datanode扩容
- 增加namenode管理元数据的压力
- MR程序默认的是使用TextInputFormat类,计算任务的时候是以文件数量为基准的,大量的小文件会启动大量的maptask,而maptask内部处理数据是比较复杂的,这会降低处理数据的效率
所以
尽量别在hdfs上存储小文件,如果有大量的小文件产生,最好将小文件合并以后再上传
假如小文件真存储在了HDFS中,这是需要避免处理数据时大量maptask的产生
案例:
将E:wcfilemergeinput中的所有文件中的内容读取到一个文件中,文件中内容的格式为文件名:内容
A. 默认设置(产生多个map任务(文件的个数)处理的情况),代码如下:
public class Merge1 { static class MergeMapper extends Mapper<LongWritable, Text, Text,Text >{ // 获取文件名 String fileName = null; @Override protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { FileSplit fs = (FileSplit)context.getInputSplit(); fileName = fs.getPath().getName(); } // 处理每行数据 StringBuilder sb = new StringBuilder(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String line = value.toString(); sb.append(line+" "); } // 将数据以filename为key,文件内容为value写出 @Override protected void cleanup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { context.write(new Text(fileName), new Text(sb.toString().trim())); } } static class MergeReducer extends Reducer<Text, Text, Text, NullWritable>{ @Override protected void reduce(Text key, Iterable<Text> iters, Reducer<Text, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { String fileName = key.toString(); Text next = iters.iterator().next(); // 迭代器中只有一条数据 String content = next.toString(); context.write(new Text(fileName+":"+content), NullWritable.get()); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setMapperClass(MergeMapper.class); job.setReducerClass(MergeReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 输出好输入的数据路径 FileInputFormat.setInputPaths(job, new Path("E:/wc/filemerge/input/")); FileOutputFormat.setOutputPath(job, new Path("E:/wc/filemerge/output1/")); // true 执行成功 boolean b = job.waitForCompletion(true); // 退出程序的状态码 404 200 500 System.exit(b ? 0 : -1); } }
此处的代码需要注意的点:
(1)StringBuilder的使用(节省资源,若直接使用String会在常量池创建多个空间)
(2)迭代器中只有一个值时,直接获取值,不需要遍历
运行部分结果如下:
B. 比较少mapTask个数的情况(此种情况不能得到文件名)
代码如下
public class Merge2 { static class MergeMapper2 extends Mapper<LongWritable, Text, Text, NullWritable>{ StringBuilder sb = new StringBuilder(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { try { String line = value.toString(); sb.append(line+" "); } catch (Exception e) { e.printStackTrace(); } } @Override protected void cleanup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { context.write(new Text(sb.toString().trim()), NullWritable.get()); } } // 可以不需要reduce static class MergeReducer2 extends Reducer<Text, NullWritable, Text, NullWritable>{ @Override protected void reduce(Text key, Iterable<NullWritable> iters, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public static void main(String[] args) throws Exception { // 获取mr程序运行时的初始化配置 Configuration conf = new Configuration(); //一个maptask处理的最小数据大小 // 参数一 name 参数二 处理数据量的最小值 单位字节 conf.setLong("mapreduce.input.fileinputformat.split.minsize", 1024*2);//2M Job job = Job.getInstance(conf); // 设置map和reduce类 调用类中自定义的map reduce方法的业务逻辑 job.setMapperClass(MergeMapper2.class); job.setReducerClass(MergeReducer2.class); // 设置map端输出的key-value的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 设置reduce的key-value的类型 结果的最终输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //设置reducetask的个数 默认1个 //job.setNumReduceTasks(3); // 修改默认的输入对象 job.setInputFormatClass(CombineTextInputFormat.class); // 处理的文件的路径 FileInputFormat.setInputPaths(job, new Path("E:/wc/filemerge/input")); // 结果输出路径 FileOutputFormat.setOutputPath(job, new Path("E:/wc/filemerge/output3/")); // 提交任务 参数等待执行 job.waitForCompletion(true) ; } }