hbase coprocessor小实践引发的对coprocessor错误处理机制的探究

hbase coprocessor小实践引发的对coprocessor异常处理机制的探究


转载请注明出处:http://blog.csdn.net/lonelytrooper/article/details/12970321

最近又翻回头看了看hbase cp相关的东西,就思考写一个实例,其中包含masterobserver、regionobserver和endpoint。最终大致定了这么一些场景:

a.对于表名符合相关规则的表在建表的同时再建一个与之相关的sibling table;

b.自定义InternalPut,只允许使用InternalPut来插入数据,并且给该InternalPut增加一个名为heapsize的列,列的值为InternalPut上heapsize()方法的返回值,put数据之后给sibling table用相同rowkey插入一行数据;

c.客户端对该表某rowkey范围内的heapsize的总和做统计(totalheapsize)。

以上三种场景分别对应masterobserver、regionobserver和endpoint。

实现并不复杂,略过,说一下踩到的两个坑。

1.  prePut的时候做了

if (!(put instanceof InternalPut))

的验证,自定义抛出一个运行时异常。然后运行,确实按预期抛出异常,同样代码再次运行,数据居然插入成功...hbase coprocessor小实践引发的对coprocessor错误处理机制的探究

解决: 第一次执行了,说明协处理加上了,所以就怀疑是不是协处理出什么问题了,因为协处理的管理都在CoprocessorHost类上,所以去看这个类,注意到了这个方法 

	protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
看下代码:

	if (e instanceof IOException) {
	throw (IOException) e;
	}
如果是IOException,抛出。我抛的是运行时异常,怎么处理呢?

if (env.getConfiguration().getBoolean("hbase.coprocessor.abortonerror", false)) {
			// server is configured to abort.
		abortServer(env, e);
	} else {
		LOG.error("Removing coprocessor '" + env.toString() + "' from "
				+ "environment because it threw:  " + e, e);
		coprocessors.remove(env);
		throw new DoNotRetryIOException("Coprocessor: '" + env.toString() + "' threw: '" + e
				+ "' and has been removed" + "from the active " + "coprocessor set.", e);
	}
如果hbase.coprocessor.abortonerror这个参数开启了,ok,退出。 否则,该协处理会被从加载的环境中删除! 这也是为什么我第二次运行数据居然插入了!!

另外,在看下这个方法的描述信息,关于hbase协处理对IOException的一些处理机制,我就不翻译了,如下:

/**
	 * This is used by coprocessor hooks which are declared to throw
	 * IOException (or its subtypes). For such hooks, we should handle
	 * throwable objects depending on the Throwable's type. Those which are
	 * instances of IOException should be passed on to the client. This is
	 * in conformance with the HBase idiom regarding IOException: that it
	 * represents a circumstance that should be passed along to the client
	 * for its own handling. For example, a coprocessor that implements
	 * access controls would throw a subclass of IOException, such as
	 * AccessDeniedException, in its preGet() method to prevent an
	 * unauthorized client's performing a Get on a particular table.
	 * /
2. 严格意义上第二个问题也可以不算做坑...  在做
if (!(put instanceof InternalPut))
验证之后,抛出异常,我自定义了一个MyAccessDeniedException,异常信息为“Put denied! use InternalPut!”, 正常情况,我用普通Put去插入数据时,该Put被拦截,并且抛出异常。 确实抛出异常了,也是MyAccessDeniedException这个类,但是并没有包含我期望的异常信息,并且这段代码的执行会卡很长的时间。 异常信息大致如下:(我懒得再布包去重现了..hbase coprocessor小实践引发的对coprocessor错误处理机制的探究)

RetriesExhaustedWithDetailsException:Failed 1 action:MyAccessDeniedException 

.....

后边并没有出现我预期的异常信息及异常栈。

解决:没好办法,debug...  在跟到HConnectionManager的这个方法中时,出现了些问题的端倪,

public <R> void processBatchCallback(
        List<? extends Row> list,
        byte[] tableName,
        ExecutorService pool,
        Object[] results,
        Batch.Callback<R> callback)
    throws IOException, InterruptedException 

我发现在RPC调用的返回结果 

 MultiResponse resp = future.get();
resp中已经包含我要的正确的异常信息! 为什么没有打印出来呢? 继续往下走。。 有一段代码引起了我的注意:

 if (results[i] == null ||
              (results[i] instanceof Throwable &&
                  !(results[i] instanceof DoNotRetryIOException))) {

            retry = true;
            actionCount++;
            Row row = list.get(i);
            workingList.add(row);
            deleteCachedLocation(tableName, row.getRow());
          } else {
此时,resp中的内容已经放在了result数组中, 
  !(results[i] instanceof DoNotRetryIOException)))
如果不是DoNotRetryIOException,那么就会重试... 多少次呢?如果你没有修改过配置,默认十次!  这就是为什么执行之前的代码卡很久的原因!重试了十次! 那重试十次我也忍了,为什么异常信息没给我打出来呢? 继续往下看... 看到了这里..

if (!exceptions.isEmpty()) {
        throw new RetriesExhaustedWithDetailsException(exceptions,
            actions,
            addresses);
      }
发现了这货...  RetriesExhaustedWithDetailsException...并且我的异常信息被包在里边了.. 进去看看...
public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
                                              List<Row> actions,
                                              List<String> hostnameAndPort) {
    super("Failed " + exceptions.size() + " action" +
        pluralize(exceptions) + ": " +
        getDesc(exceptions, actions, hostnameAndPort));

    this.exceptions = exceptions;
    this.actions = actions;
    this.hostnameAndPort = hostnameAndPort;
  }
getDesc方法比较可疑...

public static String getDesc(List<Throwable> exceptions,
                               List<Row> actions,
                               List<String> hostnamePort) {
    String s = getDesc(classifyExs(exceptions));
    s += "servers with issues: ";
    Set<String> uniqAddr = new HashSet<String>();
    uniqAddr.addAll(hostnamePort);
    for(String addr : uniqAddr) {
      s += addr + ", ";
    }
    return s;
  }
这里看到在获取异常信息的描述时是会对异常进行分类处理的!classifyExs做的这个事情!
 public static Map<String, Integer> classifyExs(List<Throwable> ths) {
    Map<String, Integer> cls = new HashMap<String, Integer>();
    for (Throwable t : ths) {
      if (t == null) continue;
      String name = "";
      if (t instanceof DoNotRetryIOException) {
        name = t.getMessage();
      } else {
        name = t.getClass().getSimpleName();
      }
看到这里明白了...  对于DoNotRetryIOException类型的异常,获取异常信息。其他类型的只是记个类名字。。。为毛啊这是! 坑爹呢!hbase coprocessor小实践引发的对coprocessor错误处理机制的探究

至此,问题算是找到原因了,所以在prePut报异常那里用DoNotRetryIOException把我自己的异常信息包一下就行了。

然后重新布包,运行,问题解决。

这里有一个问题要说明,重试机制本身没有任何问题,但是对于没必要重试或者其他不愿意重试的场景,重试机制就会让用户无法接受,用上述的方式可以作为一种解决办法。