Mahout Source Code Analysis: DistributedLanczosSolver (Part 7, Summary)

Mahout version: 0.7, Hadoop version: 1.0.4, JDK: 1.7.0_25 64-bit.

The SVD example on the official Mahout site is run on Amazon's cloud platform, but it does show how to invoke the svd algorithm. Once the eigenVectors have been computed, what should you do with them? Suppose the original data is a 600*60 matrix (600 rows, 60 columns) and the computed eigenVectors form a 24*60 matrix (where 24 is some value no larger than rank). The final result is then original_data multiplied by the transpose of eigenVectors, which yields a 600*24 matrix and thereby achieves the dimensionality reduction (a small in-memory shape check is sketched below).
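A minimal sketch of that shape check, assuming only that Mahout's math module is on the classpath; the two matrices here are zero-filled placeholders standing in for the real data and eigenvectors:

import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.Matrix;

public class ShapeCheck {
	public static void main(String[] args) {
		Matrix originalData = new DenseMatrix(600, 60);   // 600 rows, 60 columns of input
		Matrix eigenVectors = new DenseMatrix(24, 60);    // 24 kept eigenvectors of length 60
		// original_data * eigenVectors^T  ->  600 x 24 reduced matrix
		Matrix reduced = originalData.times(eigenVectors.transpose());
		System.out.println(reduced.rowSize() + " x " + reduced.columnSize());  // prints 600 x 24
	}
}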

This post introduces a ready-to-use svd utility class, which can be downloaded from http://download.csdn.net/detail/fansy1990/6479451;

The download contains three files: the synthetic_control.data data file, the svd.jar file, and the crunch-0.5.0-incubating.jar file (which has to be placed under the cluster's lib directory);

How to run: 1) put crunch-0.5.0-incubating.jar under Hadoop's /lib directory, then restart the cluster;

2) upload the synthetic_control.data file to HDFS;

3) run svd.jar with a command like the following:

~/hadoop-1.0.4/bin$ ./hadoop jar ../../mahout_jar/svd.jar mahout.fansy.svd.SvdRunner -i /svd/input/ -o /svd/output1 -nr 600 -nc 60 -r 3 -sym square --cleansvd true --tempDir /svd/temp
The design idea and the source code are given below:

1. Since the raw data is plain text, it is first converted into VectorWritable format and written as a temporary input file, prepareVector;

2. The svd algorithm is then run on the converted data to compute the eigenVectors;

3. Finally, prepareVector and the eigenVectors are used together to compute the transformed result, transVector;

The final output of the algorithm is written to output/transformedVector;

Main class:

package mahout.fansy.svd;

import java.io.IOException;
import java.util.List;
import java.util.Map;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * SVD runner used to run the svd algorithm on input like:<br>
 * 1.1,2.4,1,4.3,...<br>
 * ...<br>
 * and reduce its dimension.<br>
 * Three jobs are chained in this class:<br>
 * (1) prepare the text input as the vectors the second job needs;<br>
 * (2) run the DistributedLanczosSolver job, which is the same as in mahout;<br>
 * (3) dimension reduction: transform the input to the reduced output;<br>
 * 
 * @author fansy
 *
 */
public class SvdRunner extends AbstractJob {

	private static final String PREPAREVECTORPATH="/prepareVector";
	private static final String TRANSFORMEDVECTOR="/transformedVector";
	private Map<String, List<String>> parsedArgs;
	
	private static final Logger log = LoggerFactory.getLogger(SvdRunner.class);
	@Override
	public int run(String[] args) throws Exception {
		if(prepareArgs(args)!=0){
			return -1;
		}
		
		/*
		 *  prepare vectors job
		 */
		log.info("prepare Vector job begins...");
		String inputPath = AbstractJob.getOption(parsedArgs, "--input");
	    String outputPath =AbstractJob.getOption(parsedArgs, "--tempDir")+SvdRunner.PREPAREVECTORPATH ;
	    String regex=",";
	    if(AbstractJob.getOption(parsedArgs, "--splitterPattern")!=null){
	    	regex=AbstractJob.getOption(parsedArgs, "--splitterPattern");
	    }
	    String column=AbstractJob.getOption(parsedArgs, "--numCols");
	    String[] job1Args=new String[]{"-i",inputPath,"-o",outputPath,"-regex",regex,"-nc",column};
	    int job1Result=ToolRunner.run(getConf(), new PrePareSvdVector(), job1Args);
	    if(job1Result!=0){
	    	return -1;
	    }
		
	    log.info("svd algorithm job begins...");
	    // replace the input
	    for(int i=0;i<args.length;i++){
	    	if(args[i].equals("-i")||args[i].equals("--input")){
	    		args[i+1]=AbstractJob.getOption(parsedArgs, "--tempDir")+SvdRunner.PREPAREVECTORPATH ;
	    		break;
	    	}
	    }
	    int job2Result=ToolRunner.run(new DistributedLanczosSolver().job(), args);
	    if(job2Result!=0){
	    	return -1;
	    }
	    
	    log.info("transform job begins...");
	    inputPath=outputPath;
	    outputPath=AbstractJob.getOption(parsedArgs, "--output")+SvdRunner.TRANSFORMEDVECTOR;
	    String eigenPath=AbstractJob.getOption(parsedArgs, "--output")+"/"+EigenVerificationJob.CLEAN_EIGENVECTORS;
	    String[] job3Args=new String[]{"-i",inputPath,"-o",outputPath,"-nc",column,"-e",eigenPath};
	    int job3Result=ToolRunner.run(getConf(), new SvdReductionTranform(), job3Args);
	    if(job3Result!=0){
	    	return -1;
	    }
		return 0;
	}

	
	/**
	 * prepare arguments
	 * @param args: input arguments
	 * @return 0 if nothing wrong;
	 * @throws IOException
	 */
	private int prepareArgs(String[] args) throws IOException{
		addInputOption();
		addOutputOption();
		addOption("numRows", "nr", "Number of rows of the input matrix");
		addOption("numCols", "nc", "Number of columns of the input matrix");
		addOption("rank", "r", "Desired decomposition rank (note: only roughly 1/4 to 1/3 "
				+ "of these will have the top portion of the spectrum)");
		addOption("symmetric", "sym", "Is the input matrix square and symmetric?");
		addOption("workingDir", "wd", "Working directory path to store Lanczos basis vectors "
                                    + "(to be used on restarts, and to avoid too much RAM usage)");
		// options required to run cleansvd job
		addOption("cleansvd", "cl", "Run the EigenVerificationJob to clean the eigenvectors after SVD", false);
		addOption("maxError", "err", "Maximum acceptable error", "0.05");
		addOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector for", "0.0");
		addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have enough!)", "false");

		addOption("splitterPattern", "regex", "the char used to split the input text   Default Value:"
	            + " \",\" ",false);
	    this.parsedArgs = parseArguments(args);
	    if (this.parsedArgs == null) {
	    	return -1;
	    } else {
	    	return 0;
	    }
	}
	
	/**
	 * SvdRunner main  
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(),new SvdRunner(), args);
	}
}
Initial data conversion:

package mahout.fansy.svd;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
/**
 * 
 * prepare the text input as the vectors the svd algorithm needs
 * @author fansy
 *
 */
public class PrePareSvdVector extends AbstractJob {
	private final static String VECTORREGEX="vectorRegex";
	private final static String VECTORCOLUMN="vectorColumn";
	
	@Override
	public int run(String[] args) throws Exception {
		addInputOption();
	    addOutputOption();
	    addOption("numCols", "nc", "Number of columns of the input matrix");
	    addOption("splitterPattern", "regex", "the char used to split the input text   Default Value:"
	            + " \",\" ",false);
	    
	    if (parseArguments(args) == null) {
	        return -1;
	      }
	    Path input=getInputPath();
	    Path output=getOutputPath();
	    String regex=getOption("splitterPattern");
	    if(regex==null){  // splitterPattern is optional; fall back to the default ","
	    	regex=",";
	    }
	    String column=getOption("numCols");
	    
	    Configuration conf=new Configuration(getConf() != null ? getConf() : new Configuration());
	    conf.set(VECTORREGEX, regex);
	    try{
	    	int col=Integer.parseInt(column);
	    	conf.setInt(VECTORCOLUMN, col);
	    }catch(Exception e){
	    	return -2;  // format exception:-2
	    }
	    HadoopUtil.delete(conf, output);  // delete output
	    
	    Job job=new Job(conf,"prepare svd vector from "+input.toUri());
	    job.setJarByClass(PrePareSvdVector.class);
	    job.setOutputFormatClass(SequenceFileOutputFormat.class);
	    job.setInputFormatClass(TextInputFormat.class);
	    
	    job.setOutputKeyClass(LongWritable.class);
	    job.setOutputValueClass(VectorWritable.class);
	    
	    FileInputFormat.addInputPath(job, input);
	    SequenceFileOutputFormat.setOutputPath(job, output);
	    
	    job.setMapperClass(PrePareMapper.class);
	    job.setNumReduceTasks(0);
	    
	    boolean succeeded = job.waitForCompletion(true);
	    if (!succeeded) {
	      throw new IllegalStateException("Job failed!");
	    }
	    
		return 0;
	}
	public static class PrePareMapper extends Mapper<LongWritable,Text,LongWritable,VectorWritable>{
		Pattern pattern;
		int column;
		@Override
		public void setup(Context cxt){
			String regex=cxt.getConfiguration().get(VECTORREGEX);
			pattern=Pattern.compile(regex, 0);
			column=cxt.getConfiguration().getInt(VECTORCOLUMN, -1);
		}
		 @Override
		 public void map(LongWritable key,Text value,Context cxt) throws IOException, InterruptedException{
			 String[] values=pattern.split(value.toString());
			 int len=values.length;
			 if(column!=len){  // skip rows whose column count does not match --numCols
				 return ; 
			 }
			 Vector v=new DenseVector(column);
			 for(int i=0;i<len;i++){
				 try{
					 v.setQuick(i, Double.parseDouble(values[i]));
				 }catch(Exception e){
					 return;  // skip rows containing non-numeric values
				 }
			 }
			 VectorWritable vector=new VectorWritable(v);
			 cxt.write(key, vector);
		 }
	}
	public static void main(String[] args) throws Exception{
		ToolRunner.run(new Configuration(), new PrePareSvdVector(), args);
	}
}

Class computing the final result:

package mahout.fansy.svd;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import mahout.fansy.utils.read.ReadArbiKV;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

/**
 * Dimension Reduction<br>
 * 
 * the last job: it transforms the prepared input into the reduced output
 * @author fansy
 *
 */
public class SvdReductionTranform extends AbstractJob {
	private final static String EIGENPATH="/eigenPath";
	private final static String VECTORCOLUMN="vectorColumn";
	private static final Logger log = LoggerFactory.getLogger(SvdReductionTranform.class);
	@Override
	public int run(String[] args) throws Exception {
		addInputOption();
	    addOutputOption();
	    addOption("numCols", "nc", "Number of columns of the input matrix");
	    addOption("eigenPath","e","eigen vectors path");
	    
	    if (parseArguments(args) == null) {
	        return -1;
	      }
	    Path input=getInputPath();
	    Path output=getOutputPath();
	    String eigenPath=getOption("eigenPath");
	    String column=getOption("numCols");
	    
	    Configuration conf=new Configuration(getConf() != null ? getConf() : new Configuration());
	    conf.set(EIGENPATH, eigenPath);
	    try{
	    	int col=Integer.parseInt(column);
	    	conf.setInt(VECTORCOLUMN, col);
	    }catch(Exception e){
	    	return -2;  // format exception:-2
	    }
	    
	    log.info("delete file "+output);
	    HadoopUtil.delete(conf, output);  // delete output
	    
	    Job job=new Job(conf,"svd dimension reduction from "+input.toUri());
	    job.setJarByClass(SvdReductionTranform.class);
	    job.setInputFormatClass(SequenceFileInputFormat.class);
	    job.setOutputFormatClass(SequenceFileOutputFormat.class);
	    
	    job.setOutputKeyClass(NullWritable.class);
	    job.setOutputValueClass(VectorWritable.class);
	    
	    SequenceFileInputFormat.addInputPath(job, input);
	    SequenceFileOutputFormat.setOutputPath(job, output);
	    
	    job.setMapperClass(TransMapper.class);
	    job.setNumReduceTasks(0);
	    
	    boolean succeeded = job.waitForCompletion(true);
	    if (!succeeded) {
	      throw new IllegalStateException("Job failed!");
	    }
		return 0;
	}
	public static class TransMapper extends Mapper<LongWritable,VectorWritable,NullWritable,VectorWritable>{
		List<Vector> list=Lists.newArrayList();
		int column;
		int transCol;
		@Override
		public void setup(Context cxt) throws IOException{
			log.info("in the first row in setup()");
			column=cxt.getConfiguration().getInt(VECTORCOLUMN, -1);
			String eigenPath=cxt.getConfiguration().get(EIGENPATH);
			log.info("eigenPath:"+eigenPath);
			log.info("cxt.getConfiguration().get(\"mapred.job.tracker\")"+cxt.getConfiguration().get("mapred.job.tracker"));
			Map<Writable,Writable> eigenMap=null;
			try {
				eigenMap=ReadArbiKV.readFromFile(eigenPath,cxt.getConfiguration().get("mapred.job.tracker"));
			} catch (Exception e) {
				log.info("读取不到数据?");
			//	e.printStackTrace();
			}
			Iterator<Entry<Writable, Writable>> eigenIter=eigenMap.entrySet().iterator();
			// initial eigen vectors
			while(eigenIter.hasNext()){
				Map.Entry<Writable, Writable> set=eigenIter.next();
				VectorWritable eigenV=(VectorWritable) set.getValue();
				if(eigenV.get().size()==column){
					list.add(eigenV.get());	
				}
			}
			log.info("the last row in setup()"+list.size());
			transCol=list.size();
		}
		 @Override
		 public void map(LongWritable key,VectorWritable value,Context cxt) throws IOException, InterruptedException{
			
			 Vector transVector=new DenseVector(transCol);
			 for(int i=0;i<transCol;i++){
				 double d=value.get().dot(list.get(i));  //  dot multiply
				 transVector.setQuick(i, d);
			 }
			 VectorWritable vector=new VectorWritable(transVector);
			 cxt.write(NullWritable.get(), vector);
		 }
	}
	
	public static void main(String[] args) throws Exception{
		ToolRunner.run(new Configuration(), new SvdReductionTranform(), args);
	}
}
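To check the result, the sequence file under output/transformedVector can be read back with a plain Hadoop SequenceFile reader. Below is a hedged sketch (it is not part of svd.jar); the /svd/output1 path and the part-m-00000 file name are assumptions based on the sample command above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.math.VectorWritable;

public class ReadTransformedVector {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// assumed location, matching the sample command: -o /svd/output1
		Path part = new Path("/svd/output1/transformedVector/part-m-00000");
		SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(conf), part, conf);
		NullWritable key = NullWritable.get();
		VectorWritable value = new VectorWritable();
		while (reader.next(key, value)) {
			System.out.println(value.get());  // one reduced row per record
		}
		reader.close();
	}
}

Each printed vector has as many entries as there are kept eigenvectors, which is exactly the reduced dimensionality produced by the transform job.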


Finally, one point to summarize is the rank value: it should be set to a value that is smaller than, but close to, the number of columns of the original data, which should give a reasonably good result. The number of rows of the resulting eigenVectors becomes the number of columns after the reduction, and that row count is an integer no larger than rank, so rank has to be chosen carefully;


Share, grow, be happy.

Please credit this blog when reposting: http://blog.csdn.net/fansy1990