Hadoop: How to use two mappers to do different things

In my work, I ran into a situation where I wanted mapper A to read a file with two fields (questionId, questionTags) and emit output like key: questionId, value: questionTags, while mapper B reads a directory containing many files, each named after a questionId with that question's content as the file body, and emits output like key: questionId (the file name), value: questionContent. A reducer then does some string operations on the joined values.
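To make this concrete, the two mappers might look roughly like the sketch below. The class names and the QuestionTagsWritable type (a custom Writable with a boolean isTags flag and a content string, inferred from the snippets later in this post) are my own illustration, not the original code; the point is that both mappers emit the same (Text, QuestionTagsWritable) pair.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    // Mapper A: each line of the tags file is "questionId<TAB>questionTags".
    class QuestionTagsMapper
            extends Mapper<LongWritable, Text, Text, QuestionTagsWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t", 2);
            context.write(new Text(fields[0]),
                    new QuestionTagsWritable(true, fields[1]));
        }
    }

    // Mapper B: the file name is the questionId; each line is content.
    class QuestionContentMapper
            extends Mapper<LongWritable, Text, Text, QuestionTagsWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Recover the questionId from the file name (problem b below
            // shows why this cast breaks under MultipleInputs).
            FileSplit split = (FileSplit) context.getInputSplit();
            String questionId = split.getPath().getName();
            context.write(new Text(questionId),
                    new QuestionTagsWritable(false, value.toString()));
        }
    }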

 

The framework is:

A mapper \
          >  reducer
B mapper /

The problem can't be solved by ChainMapper: ChainMapper runs mappers in a pipeline, feeding the output of one mapper into the next, while here the two mappers must run side by side over different inputs.

I found that the two mappers' output format is the same. So the other way is to let one job read both the questions dir and the tags file, binding each input path to its own mapper with MultipleInputs and sending everything to a single reducer.
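A minimal driver sketch of that wiring, assuming the illustrative mapper classes above plus a JoinReducer (shown under problem a below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class QuestionJoinDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "question join");
            job.setJarByClass(QuestionJoinDriver.class);

            // Each input path gets its own mapper; both mappers must emit
            // the same key/value types so they can share one reducer.
            MultipleInputs.addInputPath(job, new Path(args[0]),
                    TextInputFormat.class, QuestionTagsMapper.class);
            MultipleInputs.addInputPath(job, new Path(args[1]),
                    TextInputFormat.class, QuestionContentMapper.class);

            job.setReducerClass(JoinReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(QuestionTagsWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job, new Path(args[2]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }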


Two problems came up:

a. In the reducer, saving references to values from the iterator does not work:

    QuestionTagsWritable e1 = null, e2 = null;

    for (QuestionTagsWritable e : values) {
        System.out.println("xx = " + e.toString());
        if (e.isTags) {
            e1 = e;   // BUG: saves a reference, not a copy
        } else {
            e2 = e;   // BUG: the same object is rewritten on the next iteration
        }
    }
    // After the loop, e1 and e2 point at the same reused object.

Solution: Hadoop reuses a single Writable instance for the whole values iterator, so copy the fields into a fresh object instead of saving the reference:

    e1 = new QuestionTagsWritable(true, tmp.content);   // pass the value, not the address
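Put together, a fixed reduce() might look like this sketch (again assuming QuestionTagsWritable exposes public isTags/content fields and a (boolean, String) constructor):

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class JoinReducer
            extends Reducer<Text, QuestionTagsWritable, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<QuestionTagsWritable> values,
                Context context) throws IOException, InterruptedException {
            QuestionTagsWritable tags = null, content = null;

            for (QuestionTagsWritable e : values) {
                // Copy the fields out: Hadoop overwrites 'e' in place
                // as the iterator advances to the next value.
                if (e.isTags) {
                    tags = new QuestionTagsWritable(true, e.content);
                } else {
                    content = new QuestionTagsWritable(false, e.content);
                }
            }

            if (tags != null && content != null) {
                // ...the actual string operations go here...
                context.write(key, new Text(tags.content + "\t" + content.content));
            }
        }
    }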

b. Getting the input file name inside the mapper:

FileSplit fileSplit = (FileSplit) context.getInputSplit();

 

java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit cannot be cast to org.apache.hadoop.mapreduce.lib.input.FileSplit

Solution: check the split's class and, when it is the package-private TaggedInputSplit, unwrap the real FileSplit via reflection:

    // Imports needed: java.io.IOException, java.lang.reflect.Method,
    // org.apache.hadoop.mapreduce.InputSplit,
    // org.apache.hadoop.mapreduce.lib.input.FileSplit.
    InputSplit split = context.getInputSplit();
    Class<? extends InputSplit> splitClass = split.getClass();

    FileSplit fileSplit = null;
    if (splitClass.equals(FileSplit.class)) {
        fileSplit = (FileSplit) split;
    } else if (splitClass.getName().equals(
            "org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
        // TaggedInputSplit is package-private, so call its
        // getInputSplit() accessor through reflection.
        try {
            Method getInputSplitMethod =
                    splitClass.getDeclaredMethod("getInputSplit");
            getInputSplitMethod.setAccessible(true);
            fileSplit = (FileSplit) getInputSplitMethod.invoke(split);
        } catch (Exception e) {
            // wrap and re-throw as an IOException
            throw new IOException(e);
        }
    }
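This works because MultipleInputs routes every split through DelegatingInputFormat, which wraps the real split in a TaggedInputSplit recording which mapper and input format to use for it; the wrapped FileSplit is only reachable through that private accessor. Once unwrapped, fileSplit.getPath().getName() recovers the questionId file name as usual.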

See: http://stackoverflow.com/questions/11130145/hadoop-multipleinputs-fails-with-classcastexception