hive0.11的hive server实现kerberos认证和impersonation中碰到的有关问题
hive0.11的hive server实现kerberos认证和impersonation中碰到的问题
具体定位语句可以分为三种情况:

3. 类似”select count(1) from tblname“这种会起MR Job的语句,会先在HDFS上创建scratch目录(由hive.exec.scratchdir配置),计算结果写到hdfs scratch目录下,再通过FetchTask读出来。
获取和创建Scratch dir的时候将scratch dir path加入filesystem instance内部的deleteOnExit集合中
filesystem close的时候会先删除所有mark为deleteOnExit的files
同时我们知道FileSystem抽象类内部有个静态final成员变量Cache,以schema, authority, ugi, unique的组合为key缓存了filesystem instance,内部还有一个ClientFinalizer对象(实现了Runnable),注册到JVM的shutdown hook中,在JVM关闭的时候,会启动ClientFinalizer线程,依次关闭所有Cache中的filesystem,通过这种方式来清理删除与filesystem挂钩的资源文件,在Hive中这些挂钩的文件就是local/hdfs scratch dir
回到前面的问题,第二第三种类型语句通过在hive server执行端execute方法中打log能够确认数据是已经dump到scratch目录下了,但是在一进入fetchN方法的时候,发现这些文件就莫名奇妙消失了,从而导致读出来的是空数据。排查了很久发现是由于HIVE 0.10中引入了JIRA HIVE-3098(解决FileSystem Cache内存泄露的问题)所引起的。
正是由于第一个execute方法在finally中调用FileSystem.closeAllForUGI(clientUgi),close掉相关filesystem对象,同时也删除了绑定的scratch目录,第二个fetchN方法才没有数据可读。但是为什么同样实现了kerberos认证和impersonation的hive server 2没有碰到这个问题呢? 其实hive server 2在开启impersonation(set hive.server2.enable.doAs=true)后并不是在thrift processor level而是在hive session level做impersonation的,从而不会在process finally中清理filesystem
在HiveSessionProxy(代理HiveSessionImplwithUGI)中用ugi doAs执行
client端调用HiveConnection.close后,最终server端会调用HiveSessionImplwithUGI.close();关闭UGI相对应的filesystem对象
解决方法
同时在HiveServerHandler的clean方法中(即关闭一个Hive Coonection的时候)加入对于filesystem清理的逻辑
修改上述代码后重新编译,之前三种case语句都能正常返回结果了,就这个问题折腾了一天,hive的bug不是一般的多啊,所以时不时会踩到坑,不过在发现问题到debug再到解决问题的过程中也学习到了很多。
背景
最近在做hive0.9升级到0.11的工作,其中一个步骤就是将之前apply到0.9的patch re-apply到0.11中,有一个patch(https://github.com/lalaguozhe/hive/commit/f2892f9e4706f3ea04117cbc7e7f54ff6af1e415)参考了hive
metastore service的实现,对hive server增加了sasl kerberos认证,支持impersonate成client ugi的身份来启动作业(默认方式会以起hive service daemon的用户身份来执行,导致所有query共用一个用户启动作业)。
发现的问题
不过在re-apply这个patch后发现,用jdbc client访问hive server对于某些语句返回的是空结果集(HiveQueryResultSet中的fetchedRows是个空集合),中间也没有任何报错。非常奇怪,通过多次尝试定位出只有一种case的语句会正常返回结果,即类似“select * from xxx where yyy”这种不会起MapReduce Job的语句,其他“show tables/databases”,“select a from xxx”等语句都返回为空结果集。
调研
Hive jdbc client(底层是thrift client)在提交一条语句的时候会经历如下过程:
1. 构建execute_args对象,里面封装了要执行的语句,发送远程方法名execute和execute_args对象,接收execute返回结果,这时候client已经获取了column names和column types信息。
public void send_execute(String query) throws org.apache.thrift.TException { execute_args args = new execute_args(); args.setQuery(query); sendBase("execute", args); }server端由HiveServerHandler来处理,对应的execute方法会new一个driver,调用driver.run(cmd)来执行具体的语句
2.
多次发送远程方法名fetchN和最大返回记录数numRows,返回的结果集会放在List<String> fetchedRows中。比如一共要返回90条record,每次fetchN最多返回50条,则一共调用了两次fetchN
public void send_fetchN(int numRows) throws org.apache.thrift.TException { fetchN_args args = new fetchN_args(); args.setNumRows(numRows); sendBase("fetchN", args); }server端HiveServerHandler中的fetchN会调用driver.getResult(), 由QueryPlan的FetchTask中的FetchOperator获取存放结果集的文件路径,得到InputFormat信息,有了InputFormat就可以调用getSplits方法获取一串InputSplit,依次获取每一个InputSplit的RecordReader,迭代next获取key/value值(value代表每行结果record)
3.
HiveConnection close,发送clean远程方法名
public void send_clean() throws org.apache.thrift.TException { clean_args args = new clean_args(); sendBase("clean", args); }server端执行clean方法,close掉driver并对context做一些清理工作,删除语句产生的scratch directories (local file system和hdfs上的都会清除)
Context.java的removeScratchDir方法
private void removeScratchDir() { for (Map.Entry<String, String> entry : fsScratchDirs.entrySet()) { try { Path p = new Path(entry.getValue()); p.getFileSystem(conf).delete(p, true); } catch (Exception e) { LOG.warn("Error Removing Scratch: " + StringUtils.stringifyException(e)); } } fsScratchDirs.clear(); }
具体定位语句可以分为三种情况:
1. 针对”select * from xxx“这种不起MR Job的语句,server端是直接通过MetastoreClient拿到了表对应hdfs的存放路径用FetchTask读取出来的。这边有点要注意的是hive
0.11中新增加了一个配置项”hive.fetch.task.conversion“,由jira HIVE-887引入,默认值是minimal,此外可以设置成more,minimal模式下对于”SELECT
STAR, FILTER on partition columns, LIMIT only“不会起MR Job,more模式下对于”SELECT, FILTER, LIMIT only (TABLESAMPLE, virtual columns)“这种没有子查询,聚合操作和distinct的语句也不会起MR
Job,大大降低了query latency,观察实现代码,其实它是将TableScanOperator, FilterOperator, SelectOperator作为FetchOperator的子Operator将数据拿到client端(即hive
server端)来做filter或者projection
FetchOperator中保存了Operator Tree信息,类似深度遍历调用operator.process()方法。
FetchOperator.java
public boolean pushRow() throws IOException, HiveException { InspectableObject row = getNextRow(); if (row != null) { operator.process(row.o, 0); } return row != null; }比如对于语句”select c1 from abc where c1 = 1;“,会依次调用Fetch Operator -> TableScanOperator -> FilterOperator -> SelectOperator -> ListSinkOperator
2. 类似”show tables/databases“这种DDL/DML语句,这种语句会先在本地FileSystem上创建一个scratch目录(由hive.exec.local.scratchdir配置),将计算结果写到本地scratch目录下,再通过FetchTask读取
尝试多次后发现第一种类型的语句能返回结果,第二种第三种类型语句返回为空集合,而两者区别就在于是直接读取原表数据路径还是从scratch目录中读取。
HIVE在Compile环节会设置环境Context,创建local/hdfs scratch目录。在0.10版本之前,会存在一个问题,如果用户强制kill掉正在执行的语句,那么这些scratch dir就变成orphaned dir,未被清理。HIVE在0.10中加入了HIVE-3251来解决这个问题。
Driver中设置Context的HDFSCleanUp为true
command = new VariableSubstitution().substitute(conf,command); ctx = new Context(conf); ctx.setTryCount(getTryCount()); ctx.setCmd(command); ctx.setHDFSCleanup(true);
private String getScratchDir(String scheme, String authority, boolean mkdir, String scratchDir) { String fileSystem = scheme + ":" + authority; String dir = fsScratchDirs.get(fileSystem); if (dir == null) { Path dirPath = new Path(scheme, authority, scratchDir); if (mkdir) { try { FileSystem fs = dirPath.getFileSystem(conf); dirPath = new Path(fs.makeQualified(dirPath).toString()); if (!fs.mkdirs(dirPath)) { throw new RuntimeException("Cannot make directory: " + dirPath.toString()); } if (isHDFSCleanup) { fs.deleteOnExit(dirPath); } } catch (IOException e) { throw new RuntimeException (e); } } dir = dirPath.toString(); fsScratchDirs.put(fileSystem, dir); }
filesystem close的时候会先删除所有mark为deleteOnExit的files
public void close() throws IOException { // delete all files that were marked as delete-on-exit. processDeleteOnExit(); CACHE.remove(this.key, this); }
同时我们知道FileSystem抽象类内部有个静态final成员变量Cache,以schema, authority, ugi, unique的组合为key缓存了filesystem instance,内部还有一个ClientFinalizer对象(实现了Runnable),注册到JVM的shutdown hook中,在JVM关闭的时候,会启动ClientFinalizer线程,依次关闭所有Cache中的filesystem,通过这种方式来清理删除与filesystem挂钩的资源文件,在Hive中这些挂钩的文件就是local/hdfs scratch dir
private class ClientFinalizer implements Runnable { @Override public synchronized void run() { try { closeAll(true); } catch (IOException e) { LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e); } } }
回到前面的问题,第二第三种类型语句通过在hive server执行端execute方法中打log能够确认数据是已经dump到scratch目录下了,但是在一进入fetchN方法的时候,发现这些文件就莫名奇妙消失了,从而导致读出来的是空数据。排查了很久发现是由于HIVE 0.10中引入了JIRA HIVE-3098(解决FileSystem Cache内存泄露的问题)所引起的。
每一个function call都会由一个HadoopThriftAuthBridge20S中的TUGIAssumingProcessor来处理,在process方法中会先创建一个proxyUser UGI,用clientUgi.doAs来执行具体的逻辑,这样daemon user会impersonate成client user,具体逻辑代码里面如果要创建一个filesystem对象,会通过UserGroupInformation.getCurrentUser()(即clientUgi)来作为FileSystem Cache
Key的一部分加入Cache中。
HIVE-3098增加了process方法的finally中清除clientUGI在FileSystem.Cache中对应的filesystem
instance
finally { if (clientUgi != null) { // 清除与此clientUgi相关的filesystem try { FileSystem.closeAllForUGI(clientUgi); } catch(IOException exception) { LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception); } }
正是由于第一个execute方法在finally中调用FileSystem.closeAllForUGI(clientUgi),close掉相关filesystem对象,同时也删除了绑定的scratch目录,第二个fetchN方法才没有数据可读。但是为什么同样实现了kerberos认证和impersonation的hive server 2没有碰到这个问题呢? 其实hive server 2在开启impersonation(set hive.server2.enable.doAs=true)后并不是在thrift processor level而是在hive session level做impersonation的,从而不会在process finally中清理filesystem
// hive server 2中useProxy = false; if (useProxy) { clientUgi = UserGroupInformation.createProxyUser( endUser, UserGroupInformation.getLoginUser()); remoteUser.set(clientUgi.getShortUserName()); returnCode = clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() { public Boolean run() { try { return wrapped.process(inProt, outProt); } catch (TException te) { throw new RuntimeException(te); } } }); } else { remoteUser.set(endUser); return wrapped.process(inProt, outProt); }
在HiveSessionProxy(代理HiveSessionImplwithUGI)中用ugi doAs执行
public Object invoke(Object arg0, final Method method, final Object[] args) throws Throwable { try { return ShimLoader.getHadoopShims().doAs(ugi, new PrivilegedExceptionAction<Object> () { @Override public Object run() throws HiveSQLException { try { return method.invoke(base, args); } catch (InvocationTargetException e) { if (e.getCause() instanceof HiveSQLException) { throw (HiveSQLException)e.getCause(); } else { throw new RuntimeException(e.getCause()); } } catch (IllegalArgumentException e) { throw new RuntimeException(e); } catch (IllegalAccessException e) { throw new RuntimeException(e); } } }); } catch (UndeclaredThrowableException e) { Throwable innerException = e.getCause(); if (innerException instanceof PrivilegedActionException) { throw innerException.getCause(); } else { throw e.getCause(); } } }
client端调用HiveConnection.close后,最终server端会调用HiveSessionImplwithUGI.close();关闭UGI相对应的filesystem对象
public void close() throws HiveSQLException { try { acquire(); ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi); cancelDelegationToken(); } finally { release(); super.close(); } }
解决方法
了解实现原理后,解决的方式有三种:
1. 启动Hive Server的时候关闭FileSystem Cache
$HIVE_HOME/bin/hive --service hiveserver2 --hiveconf fs.hdfs.impl.disable.cache=true --hiveconf fs.file.impl.disable.cache=true2. Hive Context中设置setHDFSCleanup(false),从而不会自动清除scratch目录,但是会有orphaned files问题,需要另外部署一个定时脚本去主动删除
3. thrift processor中根据每个function call的返回值来判断是否close filesystem,并且在最后connection close的时候,主动close filesystem
我们最终采用了第三种方案:
对hive改动如下https://github.com/lalaguozhe/hive/commit/b32eeee2498b679d3792f06640dd0a187cf506d1
将clientUgi.doAs返回的结果保存下来,在finally环节判断如果返回值为false,也就是执行结果fail的时候可以closeAllForUGI
finally { if (!returnCode) { if (clientUgi != null) { LOG.info("Start to close filesystem for clientUgi:" + clientUgi.getUserName()); try { FileSystem.closeAllForUGI(clientUgi); } catch(IOException exception) { LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception); } } } }
同时在HiveServerHandler的clean方法中(即关闭一个Hive Coonection的时候)加入对于filesystem清理的逻辑
public void clean() { if (driver != null) { driver.close(); driver.destroy(); } SessionState session = SessionState.get(); if (session.getTmpOutputFile() != null) { session.getTmpOutputFile().delete(); } pipeIn = null; try { LOG.info("Start to close filesystem for ugi:" + UserGroupInformation.getCurrentUser().getUserName()); ShimLoader.getHadoopShims().closeAllForUGI(UserGroupInformation.getCurrentUser()); } catch (IOException ioe) { ioe.printStackTrace(); } }
修改上述代码后重新编译,之前三种case语句都能正常返回结果了,就这个问题折腾了一天,hive的bug不是一般的多啊,所以时不时会踩到坑,不过在发现问题到debug再到解决问题的过程中也学习到了很多。
本文链接http://blog.****.net/lalaguozhe/article/details/12969379,转载请注明