Hadoop Study Notes 31: HBase and MapReduce Integration Issues on Win7

I. Code

  1. HBase in Action and HBase: The Definitive Guide both contain plenty of introductory code, and you can check out whatever interests you. The repositories are at https://github.com/HBaseinaction and https://github.com/larsgeorge/hbase-book respectively.
  2. Running the code from the HBase-and-MapReduce integration chapter under Win7 produced an error, for example with https://github.com/larsgeorge/hbase-book/blob/master/ch07/src/main/java/mapreduce/ParseJson.java (a sketch of the equivalent job setup follows this list).
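For orientation, here is a minimal sketch of the kind of job setup ParseJson performs. It is not the book's verbatim code: the class name ParseJsonSketch is hypothetical, ParseMapper is a stub standing in for the book's mapper, and the table name "testtable" follows the book's examples.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class ParseJsonSketch {
  // Stub standing in for the book's ParseMapper (see the ParseJson link above).
  static class ParseMapper extends TableMapper<ImmutableBytesWritable, Put> {}

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "ParseJson");
    Scan scan = new Scan();
    // These overloads default addDependencyJars to true, which is what
    // triggers the exception shown in the next section when the client
    // runs on Windows.
    TableMapReduceUtil.initTableMapperJob("testtable", scan, ParseMapper.class,
        ImmutableBytesWritable.class, Put.class, job);
    TableMapReduceUtil.initTableReducerJob("testtable",
        IdentityTableReducer.class, job);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}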


II. The Error

Exception in thread "main" java.lang.IllegalArgumentException: Pathname /D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-client-0.96.1.1-hadoop2.jar from hdfs://192.168.1.200:9000/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-client-0.96.1.1-hadoop2.jar is not a valid DFS filename.
	at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:184)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:92)
	at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
	at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57)
	at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:264)
	at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:300)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:387)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
	at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1286)
	at com.jyz.study.hadoop.hbase.mapreduce.AnalyzeData.main(AnalyzeData.java:249)
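The message itself points at the root cause: a local Windows jar path ("/D:/...") has been put into tmpjars, and because it starts with a slash the job client resolves it against fs.defaultFS, yielding an HDFS URI that DistributedFileSystem rejects. A minimal sketch that reproduces the same exception (the namenode address and jar path are taken from the trace above; illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReproduceInvalidDfsName {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://192.168.1.200:9000");
    FileSystem fs = FileSystem.get(conf);
    // "/D:/..." starts with a slash, so it is treated as an absolute path on
    // the default (HDFS) filesystem; the ":" in "D:" then fails the DFS
    // filename check and throws IllegalArgumentException.
    fs.getFileStatus(new Path("/D:/GoogleCode/platform-components/trunk/"
        + "SourceCode/study-hadoop/lib/hbase-client-0.96.1.1-hadoop2.jar"));
  }
}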


III. Tracing the Code

Stepping into org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil:

  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
    addDependencyJars(conf,
      // explicitly pull a class from each module
      org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
      org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
      org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
      org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-server
      // pull necessary dependencies
      org.apache.zookeeper.ZooKeeper.class,
      org.jboss.netty.channel.ChannelFactory.class,
      com.google.protobuf.Message.class,
      com.google.common.collect.Lists.class,
      org.cloudera.htrace.Trace.class);
  }


public static void addDependencyJars(Configuration conf,
    Class<?>... classes) throws IOException {
  // ... (abridged) for each clazz in classes, locate the jar containing it:
  Path path = findOrCreateJar(clazz, localFs, packagedClasses);
  // ... the resulting paths are collected into the "jars" set, then published
  // as the comma-separated "tmpjars" property, which the MapReduce job
  // client later copies into the distributed cache:
  conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
}

The findOrCreateJar method turns a class into the path of the jar that contains it ("Find a jar that contains a class of the same name"); for example, org.apache.hadoop.hbase.client.Put.class is converted to /D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-client-0.96.1.1-hadoop2.jar. The problem lies in this method: it does not account for Windows paths and only works correctly on Linux.

private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
      throws IOException {
    ClassLoader loader = my_class.getClassLoader();
    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    // first search the classpath
    for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
      URL url = itr.nextElement();
      if ("jar".equals(url.getProtocol())) {
        String toReturn = url.getPath();
        if (toReturn.startsWith("file:")) {
          toReturn = toReturn.substring("file:".length());
        }
        // URLDecoder is a misnamed class, since it actually decodes
        // x-www-form-urlencoded MIME type rather than actual
        // URL encoding (which the file path has). Therefore it would
        // decode +s to ' 's which is incorrect (spaces are actually
        // either unencoded or encoded as "%20"). Replace +s first, so
        // that they are kept sacred during the decoding process.
        toReturn = toReturn.replaceAll("\\+", "%2B");
        toReturn = URLDecoder.decode(toReturn, "UTF-8");
        return toReturn.replaceAll("!.*$", "");
      }
    }
    // ... (remainder of the method, which falls back to the packagedClasses
    // map, is elided)
  }

If you only need to run this under Windows for debugging, you can change toReturn = toReturn.substring("file:".length()); to toReturn = toReturn.substring("file:".length() + 1);, recompile, and replace the original class file with the patched one.
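To see what that one-character change does, here is an illustrative trace of the path handling on Windows (the URL is assembled from the jar path in the error above):

// Illustrative trace (values taken from the error above) of what the patch changes:
String path = "file:/D:/GoogleCode/platform-components/trunk/SourceCode/study-hadoop/lib/hbase-client-0.96.1.1-hadoop2.jar!/org/apache/hadoop/hbase/client/Put.class";
String original = path.substring("file:".length());
// -> "/D:/...jar!/..."  the leading slash survives and is later resolved to
//    hdfs://192.168.1.200:9000/D:/... by the job client (the error above)
String patched = path.substring("file:".length() + 1);
// -> "D:/...jar!/..."   dropping the slash leaves a usable local path
// In both cases the trailing "!/..." is removed by replaceAll("!.*$", "")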


IV. Another Solution

TableMapReduceUtil offers many overloads of initTableMapperJob and initTableReducerJob, some of which let you specify the parameter

   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).

It is precisely because addDependencyJars defaults to true that the error above is triggered:

if (addDependencyJars) {
  addDependencyJars(job);
}
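The overloads used in the fix below take the flag as their last parameter. For reference, their signatures in the 0.96 TableMapReduceUtil look roughly like this (abbreviated; check your exact HBase version):

public static void initTableMapperJob(String table, Scan scan,
    Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
    Class<?> outputValueClass, Job job, boolean addDependencyJars)
    throws IOException

public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job, Class partitioner,
    String quorumAddress, String serverClass, String serverImpl,
    boolean addDependencyJars) throws IOException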

So we can set the flag to false; change the code in https://github.com/larsgeorge/hbase-book/blob/master/ch07/src/main/java/mapreduce/ParseJson.java to:

TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, // co ParseJson-3-SetMap Setup map phase details using the utility method.
      ImmutableBytesWritable.class, Put.class, job, false);
    TableMapReduceUtil.initTableReducerJob(output, // co ParseJson-4-SetReduce Configure an identity reducer to store the parsed data.
      IdentityTableReducer.class, job, null, null, null, null, false);

The job now runs correctly. Checking the result, the JSON in testtable's data:json column is split into data:column1, data:column2, ..., as expected. Note that with addDependencyJars set to false the HBase jars are no longer shipped through the distributed cache, so they must already be available on the cluster nodes' classpath (for example, through a local HBase installation on each node).