Hive Auto Map Join
The old map join implementation: Hive 0.6 and earlier.
The new map join implementation: Hive 0.7, via HIVE-1641 and HIVE-1754, which ship the small table to the mappers through the distributed cache.
Auto map join depends on the new map join implementation:
https://issues.apache.org/jira/browse/HIVE-1642
HiveConf:
hive.auto.convert.join
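For reference, the flag can be turned on per session with set hive.auto.convert.join=true; in the CLI. A minimal programmatic equivalent (HIVECONVERTJOIN is the ConfVars entry behind hive.auto.convert.join, the same flag PhysicalOptimizer checks below):

  HiveConf conf = new HiveConf();
  conf.setBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN, true); // hive.auto.convert.join = true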
SemanticAnalyzer.genMapRedTasks(QB qb) {
  PhysicalContext physicalContext = new PhysicalContext(conf,
      getParseContext(), ctx, rootTasks, fetchTask);
  PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
      physicalContext, conf);
  physicalOptimizer.optimize();
}
PhysicalOptimizer:
private void initialize(HiveConf hiveConf) {
  resolvers = new ArrayList<PhysicalPlanResolver>();
  if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
    resolvers.add(new SkewJoinResolver());
  }
  // hive.auto.convert.join
  if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
    resolvers.add(new CommonJoinResolver());
  }
  resolvers.add(new MapJoinResolver());
}
MapJoinResolver:
Walks all tasks; for any task that carries a MapredLocalWork, it creates a MapredLocalTask to run that MapredLocalWork, and wires up the dependencies between the surrounding tasks accordingly.
public class MapJoinResolver implements PhysicalPlanResolver {
  class LocalMapJoinTaskDispatcher implements Dispatcher {
    private void processCurrentTask(Task<? extends Serializable> currTask,
        ConditionalTask conditionalTask) throws SemanticException {
      if (localwork != null) {
        // get the context info and set up the shared tmp URI
        Context ctx = physicalContext.getContext();
        String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId());
        localwork.setTmpFileURI(tmpFileURI);
        String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId());
        mapredWork.setTmpHDFSFileURI(hdfsTmpURI);
      }
    }
  }
}
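A hedged sketch of the split itself, which the excerpt above leaves out (conf is a stand-in for the session HiveConf, and the real dispatcher also has to patch ConditionalTask lists and the operator tree):

  MapredWork mapredWork = (MapredWork) currTask.getWork();
  MapredLocalWork localwork = mapredWork.getMapLocalWork();
  if (localwork != null) {
    mapredWork.setMapLocalWork(null); // detach the local part from the MR task
    Task<? extends Serializable> localTask = TaskFactory.get(localwork, conf);
    // rewire: every former parent of currTask now feeds localTask instead,
    // and localTask runs before currTask
    List<Task<? extends Serializable>> parents = currTask.getParentTasks();
    if (parents != null) {
      for (Task<? extends Serializable> parent :
          new ArrayList<Task<? extends Serializable>>(parents)) {
        parent.removeDependentTask(currTask);
        parent.addDependentTask(localTask);
      }
    } // else currTask was a root task and localTask takes its place among the root tasks
    localTask.addDependentTask(currTask);
  }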
HashTableSinkOperator:
public void closeOp(boolean abort) throws HiveException {
  // get tmp file URI
  String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
  // get the tmp URI path; it will be a hdfs path if not local mode
  String tmpURIPath = Utilities.generatePath(tmpURI, tag, bigBucketFileName);
  // (excerpt; the dump of the in-memory hash table to tmpURIPath follows here)
}
MapJoinOperator:
private void loadHashTable() throws HiveException {
  // (excerpt; loads the dumped hash table files back into memory — see the sketch below)
}
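A minimal sketch of what loadHashTable does, inferred from the MapTask stack trace further down; the loop shape and the getHashTableBaseDir helper are illustrative, not verbatim Hive 0.7 source:

private void loadHashTable() throws HiveException {
  // on a cluster, baseDir points at the files shipped via the distributed cache;
  // in local mode it is the MapredLocalWork tmp dir written by HashTableSinkOperator
  String baseDir = getHashTableBaseDir(); // hypothetical helper
  for (Byte tag : mapJoinTables.keySet()) { // one dumped file per small-table alias
    String path = Utilities.generatePath(baseDir, tag, bigBucketFileName);
    // deserializes the HashMap dumped by HashTableSinkOperator.closeOp; the method
    // name really is spelled "initilizePersistentHash", as the trace shows
    mapJoinTables.get(tag).initilizePersistentHash(path);
  }
}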
CommonJoinResolver is the entry point of auto map join.
CommonJoinResolver.resolve(PhysicalContext pctx) {
  // walks the task tree with a CommonJoinTaskDispatcher
}
CommonJoinTaskDispatcher.dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) {
}
HiveConf:
// small table file size
HIVESMALLTABLESFILESIZE("hive.smalltable.filesize", 25000000L), // 25MB
CommonJoinTaskDispatcher wraps the converted map join task(s), plus the original common join as a backup, in a ConditionalTask; a branch's MapJoinOperator only runs when its small table's file size is below hive.smalltable.filesize. A sketch of that decision follows.
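A minimal sketch of that runtime size check (illustrative only; the real logic lives in the ConditionalTask's resolver, and smallTablePath, mapJoinTask and commonJoinTask are stand-in names):

  long limit = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVESMALLTABLESFILESIZE); // 25MB default
  long size = fs.getContentSummary(smallTablePath).getLength();
  Task<? extends Serializable> chosen = (size < limit) ? mapJoinTask : commonJoinTask;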
Each stage that contains a map join is replaced and split into two stages:
The first stage is a MapredLocalTask containing a HashTableSinkOperator: it builds hash table files from the small table and distributes them via the Distributed Cache.
The second stage is a MapredTask containing a MapJoinOperator: it loads the small table (the hash table) from the Distributed Cache and performs the actual map join.
So the original single MapredTask becomes MapredLocalTask + MapredTask.
MapredLocalTask:
MapredLocalTask.execute() {
  spawns a child JVM that runs ExecDriver.main() {}
}
In the child process:
ExecDriver.main() {
  MapredLocalTask.executeFromChildJVM() {
    runs TableScanOperator + HashTableSinkOperator
    HashTableSinkOperator reads the small table, builds the hash table, and writes it to a local file.
    // HashTableSinkOperator.processOp puts the rows into a HashMapWrapper.
    // HashTableSinkOperator.closeOp dumps the HashMapWrapper to a local file.
  }
}
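The dump/load pair is plain Java serialization, which is why the MapTask-side failure further down surfaces inside ObjectInputStream. An illustrative write side (not the actual Hive source; hashMapContents stands for the HashMap held by the HashMapWrapper):

  ObjectOutputStream out = new ObjectOutputStream(
      new BufferedOutputStream(new FileOutputStream(tmpURIPath))); // path from closeOp above
  out.writeObject(hashMapContents); // read back by HashMapWrapper.initilizePersistentHash
  out.close();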
Stack in the child-process MapredLocalTask (processOp path):
java.lang.Exception
at org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.processOp(HashTableSinkOperator.java:318)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:55)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.startForward(MapredLocalTask.java:330)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.executeFromChildJVM(MapredLocalTask.java:277)
at org.apache.hadoop.hive.ql.exec.ExecDriver.main(ExecDriver.java:1054)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
Stack in the child-process MapredLocalTask (closeOp path):
java.lang.Exception
at org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.closeOp(HashTableSinkOperator.java:438)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:558)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.startForward(MapredLocalTask.java:326)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.executeFromChildJVM(MapredLocalTask.java:277)
at org.apache.hadoop.hive.ql.exec.ExecDriver.main(ExecDriver.java:1054)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
MapredTask:
calls return super.execute(driverContext); which copies the local hash table files to HDFS and registers them in the distributed cache:
try {
  // the small table was dumped to local files; upload them to HDFS
  // and propagate them through the distributed cache
  MapredLocalWork localwork = work.getMapLocalWork();
  if (localwork != null) {
    boolean localMode = HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local");
    if (!localMode) {
      Path localPath = new Path(localwork.getTmpFileURI());
      Path hdfsPath = new Path(work.getTmpHDFSFileURI());
      FileSystem hdfs = hdfsPath.getFileSystem(job);
      FileSystem localFS = localPath.getFileSystem(job);
      FileStatus[] hashtableFiles = localFS.listStatus(localPath);
      for (int i = 0; i < hashtableFiles.length; i++) {
        FileStatus file = hashtableFiles[i];
        Path path = file.getPath();
        String fileName = path.getName();
        String hdfsFile = hdfsPath + Path.SEPARATOR + fileName;
        LOG.info("Upload 1 HashTable from " + path + " to: " + hdfsFile);
        Path hdfsFilePath = new Path(hdfsFile);
        hdfs.copyFromLocalFile(path, hdfsFilePath);
        // raise replication so many map tasks can fetch the file quickly
        short replication = (short) job.getInt("mapred.submit.replication", 10);
        hdfs.setReplication(hdfsFilePath, replication);
      }
      FileStatus[] hashtableRemoteFiles = hdfs.listStatus(hdfsPath);
      for (int i = 0; i < hashtableRemoteFiles.length; i++) {
        FileStatus file = hashtableRemoteFiles[i];
        Path path = file.getPath();
        DistributedCache.addCacheFile(path.toUri(), job);
        LOG.info("add 1 hashtable file to distributed cache: " + path.toUri());
      }
    }
  }
}
Then the job is submitted to the JobTracker.
Map phase:
The MapTask runs the MapJoinOperator, which calls loadHashTable from processOp.
Stack when the MapTask's MapJoinOperator reads the small table:
java.io.IOException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys.readExternal(MapJoinDoubleKeys.java:117)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at java.util.HashMap.readObject(HashMap.java:1029)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper.initilizePersistentHash(HashMapWrapper.java:127)
at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTable(MapJoinOperator.java:193)
at org.apache.hadoop.hive.ql.exec.MapJoinOperator.processOp(MapJoinOperator.java:225)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.CommonJoinOperator.genAllOneUniqueJoinObject(CommonJoinOperator.java:732)
at org.apache.hadoop.hive.ql.exec.CommonJoinOperator.checkAndGenObject(CommonJoinOperator.java:819)
at org.apache.hadoop.hive.ql.exec.MapJoinOperator.processOp(MapJoinOperator.java:265)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:55)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:402)
at org.apache.hadoop.hive.ql.exec.ExecMapper.map(ExecMapper.java:141)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
at org.apache.hadoop.mapred.Child.main(Child.java:156)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at java.util.ArrayList.RangeCheck(ArrayList.java:547)
at java.util.ArrayList.get(ArrayList.java:322)
at org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys.readExternal(MapJoinDoubleKeys.java:113)
... 50 more
Initialization:
2011-09-01 05:59:00 Starting to launch local task to process map join; maximum memory = 372834304
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper.<init>(HashMapWrapper.java:83)
at org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.initializeOp(HashTableSinkOperator.java:264)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.Operator.initializeOp(Operator.java:375)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.initializeOperators(MapredLocalTask.java:429)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.executeFromChildJVM(MapredLocalTask.java:267)
at org.apache.hadoop.hive.ql.exec.ExecDriver.main(ExecDriver.java:1054)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask