Solving the input path filtering problem in Hadoop

Requirement:

The input data is laid out as follows:

/data/input/news/old.dat

/data/input/news/current.dat

/data/input/news/20131001.txt

......

/data/input/news/20131030.txt

/data/input/news/20131031.txt

We only need to analyze the October 2013 news; everything else can be skipped for now, so a filtering step is required. The input path is the news root directory, i.e. /data/input/news/.

 

Approach:

Implement the filtering with a custom PathFilter.
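For reference, org.apache.hadoop.fs.PathFilter is a single-method interface, so all the work happens in accept():

public interface PathFilter {
	// Return true if the given path should be included in the input.
	boolean accept(Path path);
}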

 

Implementation:

The filter:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/**
 * Accepts only paths whose full string matches the regex configured
 * under "org.test.filter.regex". Extending Configured makes the class
 * Configurable, so the framework hands it the job Configuration via
 * setConf() when the filter is instantiated.
 */
public class TextPathFilter extends Configured implements PathFilter {
	private Configuration conf = null;

	@Override
	public Configuration getConf() {
		return conf;
	}

	@Override
	public void setConf(Configuration conf) {
		this.conf = conf;
	}

	@Override
	public boolean accept(Path path) {
		String regex = conf.get("org.test.filter.regex");
		if (regex == null) {
			// No regex configured: accept every path.
			return true;
		}
		// matches() must cover the whole path string,
		// e.g. hdfs://namenode/data/input/news/20131001.txt
		return path.toString().matches(regex);
	}
}
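The reason the filter extends Configured (and is therefore Configurable) is that FileInputFormat creates it through ReflectionUtils.newInstance(filterClass, conf), which injects the job Configuration via setConf(). The filter can be exercised standalone the same way; the sample paths below are only illustrations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.util.ReflectionUtils;

public class TextPathFilterCheck {
	public static void main(String[] args) {
		Configuration conf = new Configuration();
		conf.set("org.test.filter.regex", ".*/201310\\d{2}\\.txt");

		// Instantiate the filter the same way FileInputFormat does:
		// ReflectionUtils calls setConf() because the filter is Configurable.
		PathFilter filter = ReflectionUtils.newInstance(TextPathFilter.class, conf);

		System.out.println(filter.accept(new Path("/data/input/news/20131001.txt"))); // true
		System.out.println(filter.accept(new Path("/data/input/news/old.dat")));      // false
	}
}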

Usage:

Configuration conf = new Configuration();
// The regex must match the full path string; this selects the October 2013 files (201310xx.txt).
conf.set("org.test.filter.regex", ".*/201310\\d{2}\\.txt");
........
TextInputFormat.setInputPathFilter(job, TextPathFilter.class);
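
For completeness, here is a minimal driver sketch showing how the pieces fit together. The class name NewsFilterJob, the output path, and the use of the default identity mapper/reducer are illustrative assumptions, not part of the original setup; see also the caveat under "Additional notes" below about the filter being applied to the input directory itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewsFilterJob {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Only the October 2013 files pass the filter.
		conf.set("org.test.filter.regex", ".*/201310\\d{2}\\.txt");

		Job job = new Job(conf, "october-news");
		job.setJarByClass(NewsFilterJob.class);

		job.setInputFormatClass(TextInputFormat.class);
		TextInputFormat.setInputPathFilter(job, TextPathFilter.class);
		FileInputFormat.addInputPath(job, new Path("/data/input/news/"));
		FileOutputFormat.setOutputPath(job, new Path("/data/output/news-201310"));

		// No mapper/reducer set: the default identity classes are used,
		// so TextInputFormat's (LongWritable, Text) pairs are written out as-is.
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}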

 

Additional notes:

On versions before 0.21.0, the job fails with an error like the following:

Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://your path
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231)
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:248)
        at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:950)
        at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:967)
        at org.apache.hadoop.mapred.JobClient.access$500(JobClient.java:170)
        at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:880)
        at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:833)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:833)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:476)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:506)

Cause of the error:

When the glob match finds no files, null is returned where an empty FileStatus[] should be; the code in question is in FileSystem.java.

This bug has been fixed in 0.21.0 and later, so upgrading is enough.
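
One additional caveat, independent of the 0.21.0 fix: in the Hadoop releases I have looked at, the configured input path filter is also applied to the input directory itself while the input paths are globbed and listed, so a regex that only matches the data files can still leave the listing with zero matches and produce a very similar error. A common workaround is to let directories pass through the filter unconditionally. Below is a minimal sketch, assuming a 0.21+ API (FileStatus.isDirectory()) and accepting one extra NameNode lookup per path; the class name DirAwareTextPathFilter is made up for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// Variant of TextPathFilter that lets directories pass through, so the
// input directory itself is not filtered out during glob/listing.
public class DirAwareTextPathFilter extends Configured implements PathFilter {

	@Override
	public boolean accept(Path path) {
		Configuration conf = getConf();
		try {
			FileSystem fs = path.getFileSystem(conf);
			// Always accept directories; only data files are matched against the regex.
			if (fs.getFileStatus(path).isDirectory()) {
				return true;
			}
		} catch (IOException e) {
			// If the status lookup fails, fall through to the regex check.
		}
		String regex = conf.get("org.test.filter.regex");
		return regex == null || path.toString().matches(regex);
	}
}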