Spark机器学习库MLib分门别类和回归文档V1.4.1(翻译)
Spark MLib分类与回归
MLlib支持各种方法二分类,多分类,和回归分析。下表概述了每个类型的问题支持的算法。
问题类型 |
支持的方法 |
二分类 |
线性支持向量机,Logistic回归,决策树,朴素贝叶斯 |
多类分类 |
决策树,朴素贝叶斯 |
回归 |
线性最小二乘,Lasso,岭回归,决策树 |
这些方法的更多细节可以在这里找到:
· 线性模型
o 二分类(SVM,Logistic回归)
o 线性回归(最小二乘,Lasso,ridge)
· 决策树
· 朴素贝叶斯
MLib线性方法
· 数学公式
o 损失函数
o 正则化因子
o 优化
· 分类
o 线性支持向量机(SVM)
o Logistic回归
· 回归
o 线性最小二乘,Lasso,岭回归
o 流的线性回归
· 实现(开发者)
数学公式
许多标准机器学习方法可以被转换为凸优化问题,即,一个为凸函数 f 找到最小值的任务,这个函数 f 依赖于一个有 d 个值的向量变量 w(代码中的 weights)。更正式点,这是一个 优化问题,其目标函数 f 具有下面形式:
向量 xi∈Rd是训练数据样本,其中 1≤i≤n。 yi∈R 是相应的类标签,也是我们想要预测的目标。如果 L(w;x,y)能被表述为wTx和 y的一个函数,我们称该方法是线性的。有几个MLlib分类和回归算法属于该范畴,我们在此一一讨论。
目标函数f包括两部分:控制模型复杂度的正则化因子和度量模型误差的损失函数。损失函数 L(w;.)是典型关于w的凸函数。事先锁定的正则化参数 λ≥0(代码中的regParam)承载了我们在最小化损失量(训练误差)和最小化模型复杂度(避免过渡拟合)两个目标之间的权衡取舍。
损失函数
下表概述了 MLlib 支持的损失函数及其梯度和子梯度:
|
损失函数 L(w;x,y) |
梯度或子梯度 |
hinge loss |
max{0,1−ywTx},y∈{−1,+1} |
|
logistic loss |
log(1+exp(−ywTx)),y∈{−1,+1} |
|
Squared loss |
|
|
正则化因子
正则化因子的目标是获得简单模型和避免过度拟合。在 MLlib 中,支持下面正则化因子:
|
正则化因子 R(w) |
梯度或子梯度 |
零(未正则化) |
0 |
O |
L2 范数 |
|
W |
L1 范数 |
|
sign(w) |
在这里,sign(w)是一个代表w中所有实体的类标签(signs(±1))的向量。
与 L1 正则化问题比较,由于 L2的平滑性,L2 正则化问题一般较容易解决。但是,由于可以强化权重的稀疏性,L1 正则化更能产生较小的和更容易解释的模型,而后者在特征选择是非常有用的。不正则化而去训练模型是不恰当的,尤其是在训练样本数量较小的时候。
优化
线性方法使用凸优化函数优化目标函数。MLib使用两个方法,SGD和L-BFGS,在优化部分有描述。目前,大多数算法API支持随机梯度下降(SGD),一部分支持L-BFGS。参考优化部分中选择优化方法的指南。
分类
分类的目的是将项分为类别。最常见的分类类型是二元分类,二元分类将数据项划分为两类:正例和反例。如果有不止两个类别,那它就叫做多元分类。MLib支持两种线性分类方法:线性支持向量机(SVMs)和Logistic回归。线性支持向量机只支持二元分类,而Logistic回归同时支持二元和多元的分类问题。对两种方法来说,MLlib 都支持 L1、 L2 正则化。在 MLlib 中,训练数据集用一个 LabeledPoint 格式的 RDD 来表示。其中标签是从0开始的类指标:0,1,2······需要注意,本指南中的数学公式里,约定二元标签 y 为+1(正例)或-1(反例),这对公式来说是方便的。然而,在 MLlib 中,为了与多类标签保持一致,反例标签是 0,而不是-1。
线性支持向量机(SVMs)
对于大规模的分类任务来说,线性支持向量机是标准方法。它是上面“数学公式”一节中所描述的线性方法,其损失函数是 hinge loss:
默认配置下,线性 SVM 使用 L2 正则化训练。我们也支持L1 正则化。通过这种方式,问题变为线性规划问题。线性支持向量机算法的产出是一个SVM模型。给定新数据点X,该模型基于 wTx的值来预测。默认情形下,wTx≥0时为正例,否则为反例。
例子
Scala
下面代码片段演示了如何加载一个简单的数据集、运用算法对象的静态方法执行训练算法、以及运用模型预测来计算训练误差。
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD} import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.mllib.util.MLUtils // Load training data in LIBSVM format. val data = MLUtils.loadLibSVMFile(sc,"data/mllib/sample_libsvm_data.txt") // Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) // Run training algorithm to build the model val numIterations = 100 val model = SVMWithSGD.train(training, numIterations) // Clear the default threshold. model.clearThreshold() // Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score =model.predict(point.features) (score, point.label) } // Get evaluation metrics. val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC) // Save and load model model.save(sc, "myModelPath") val sameModel = SVMModel.load(sc, "myModelPath")
默认配置下,SVMWithSGD.train()将正则化参数设置为 1.0来进行 L2 正则化。如果我们想配置算法参数,我们可以直接生成一个 SVMWithSGD 对象然后调用 setter 方法。所有其他 MLlib 算法都支持这种自定义化方法。举例来说,下面代码产生了一个用于 SVM 的L1 正则化变量,其正则化参数为 0.1,且迭代次数为 200。
import org.apache.spark.mllib.optimization.L1Updater val svmAlg = new SVMWithSGD() svmAlg.optimizer. setNumIterations(200). setRegParam(0.1). setUpdater(new L1Updater) val modelL1 = svmAlg.run(training)
Java
所有 MLlib 方法都使用 Java 友好的类型,所以您可以像scala 中那样导入和调用。唯一要说明的是那些使用 Scala RDD 对象的方法,因为在 spark 的 java API 中使用的是
JavaRDD 类。对 JavaRDD 对象,您能够通过调用.rdd()方法将其转换为对应的Scala 对象。
与 Scala 示例等效的应用示例如下:
import scala.Tuple2; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.classification.*; import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.util.MLUtils; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; public class SVMClassifier { public static voidmain(String[] args) { SparkConf conf = newSparkConf().setAppName("SVM Classifier Example"); SparkContext sc = newSparkContext(conf); String path ="data/mllib/sample_libsvm_data.txt"; JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc,path).toJavaRDD(); // Split initial RDD intotwo... [60% training data, 40% testing data]. JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L); training.cache(); JavaRDD<LabeledPoint> test = data.subtract(training); // Run training algorithmto build the model. int numIterations = 100; final SVMModel model =SVMWithSGD.train(training.rdd(), numIterations); // Clear the defaultthreshold. model.clearThreshold(); // Compute raw scores onthe test set. JavaRDD<Tuple2<Object,Object>> scoreAndLabels = test.map( newFunction<LabeledPoint, Tuple2<Object, Object>>() { publicTuple2<Object, Object> call(LabeledPoint p) { Double score =model.predict(p.features()); return newTuple2<Object, Object>(score, p.label()); } } ); // Get evaluation metrics. BinaryClassificationMetrics metrics = newBinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels)); double auROC =metrics.areaUnderROC(); System.out.println("Areaunder ROC = " + auROC); // Save and load model model.save(sc,"myModelPath"); SVMModel sameModel =SVMModel.load(sc, "myModelPath"); } }
默认配置下,SVMWithSGD.train()将正则化参数设置为 1.0来进行 L2 正则化。如果我们想配置算法参数,我们可以直接生成一个 SVMWithSGD 对象然后调用 setter 方法。所有其他 MLlib 算法都支持这种客户化方法。举例来说,下面代码产生了一个用于 SVM 的L1 正则化变量,其正则化参数为 0.1,且迭代次数为 200。
import org.apache.spark.mllib.optimization.L1Updater; SVMWithSGD svmAlg = new SVMWithSGD(); svmAlg.optimizer() .setNumIterations(200) .setRegParam(0.1) .setUpdater(newL1Updater()); final SVMModel modelL1 = svmAlg.run(training.rdd());
为了运行上面的程序,请参考Spark快速开始指南中的Self-Contained Applications章节中提供的介绍。务必将spark-mllib 作为编译依赖库。
Python
下面代码片段演示了如何加载一个简单的数据集、运用算法对象的静态方法执行训练算法、以及运用模型预测来计算训练误差。
from pyspark.mllib.classification import SVMWithSGD, SVMModel from pyspark.mllib.regression import LabeledPoint # Load and parse the data def parsePoint(line): values = [float(x) for xin line.split(' ')] returnLabeledPoint(values[0], values[1:]) data = sc.textFile("data/mllib/sample_svm_data.txt") parsedData = data.map(parsePoint) # Build the model model = SVMWithSGD.train(parsedData, iterations=100) # Evaluating the model on training data labelsAndPreds = parsedData.map(lambda p: (p.label,model.predict(p.features))) trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() /float(parsedData.count()) print("Training Error = " + str(trainErr)) # Save and load model model.save(sc, "myModelPath") sameModel = SVMModel.load(sc, "myModelPath")
逻辑回归
逻辑回归广泛运用于二元因变量预测。它是上面“数学公式”一节中所描述的线性方法,其损失函数是logistic loss:
逻辑回归算法的产出是一个逻辑回归模型。给定新数据点 X,该模型运用下面逻辑函数来预测:
在这里,z=wTx。默认情况下,若 f(wTx)>0.5,输出是正例,否则是反例。与线性支持向量机不同之处在于,线性回归模型 f(z)的输出含有一个概率解释(即,x是正例的概率)。
二元逻辑回归可以推广到多项逻辑回归来训练和预测多类函数分类问题。例如,对于K个可能的输出,输出之一可以被作为一个“枢纽”,剩余的其他K-1个输出可以相对于枢纽输出进行分别回归。在MLib中,第一个0类被选为“枢纽”类。参考统计学习元素的4.4章节。详细的数学推导在这里。
对于多元分类问题,该算法将输出一个多项Logistic回归模型,其中包含K-1个与第一类不同的二元逻辑回归模型。给定一个新的数据点,K-1个模型将会运行,概率最大的类将会被选择为预测的类。
我们实现了两种算法来解决Logistic回归分析,小批量梯度下降和L-BFGS。比起小批量梯度下降我们更推荐L-BFGS,因为它能更快地收敛。
例子
Scala
下面的代码演示了如何加载一个简单的多类数据集,将其分为训练数据集和测试数据集,并使用LogisticRegressionWithLBFGS来适应Logistic回归模型。然后将该模型与测试数据集进行评价,并保存到磁盘。
import org.apache.spark.SparkContext importorg.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS,LogisticRegressionModel} import org.apache.spark.mllib.evaluation.MulticlassMetrics import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLUtils // Load training data in LIBSVM format. val data = MLUtils.loadLibSVMFile(sc,"data/mllib/sample_libsvm_data.txt") // Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) // Run training algorithm to build the model val model = new LogisticRegressionWithLBFGS() .setNumClasses(10) .run(training) // Compute raw scores on the test set. val predictionAndLabels = test.map { case LabeledPoint(label,features) => val prediction =model.predict(features) (prediction, label) } // Get evaluation metrics. val metrics = new MulticlassMetrics(predictionAndLabels) val precision = metrics.precision println("Precision = " + precision) // Save and load model model.save(sc, "myModelPath") val sameModel = LogisticRegressionModel.load(sc,"myModelPath")
Java
下面的代码演示了如何加载一个简单的多类数据集,将其分为训练数据集和测试数据集,并使用LogisticRegressionWithLBFGS来适应Logistic回归模型。然后将该模型与测试数据集进行评价,并保存到磁盘。
import scala.Tuple2; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function; importorg.apache.spark.mllib.classification.LogisticRegressionModel; importorg.apache.spark.mllib.classification.LogisticRegressionWithLBFGS; import org.apache.spark.mllib.evaluation.MulticlassMetrics; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.util.MLUtils; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; public class MultinomialLogisticRegressionExample { public static voidmain(String[] args) { SparkConf conf = newSparkConf().setAppName("SVM Classifier Example"); SparkContext sc = newSparkContext(conf); String path ="data/mllib/sample_libsvm_data.txt"; JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); // Split initial RDD intotwo... [60% training data, 40% testing data]. JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.6, 0.4}, 11L); JavaRDD<LabeledPoint> training = splits[0].cache(); JavaRDD<LabeledPoint> test = splits[1]; // Run training algorithmto build the model. finalLogisticRegressionModel model = new LogisticRegressionWithLBFGS() .setNumClasses(10) .run(training.rdd()); // Compute raw scores onthe test set. JavaRDD<Tuple2<Object,Object>> predictionAndLabels = test.map( newFunction<LabeledPoint, Tuple2<Object, Object>>() { publicTuple2<Object, Object> call(LabeledPoint p) { Double prediction =model.predict(p.features()); return new Tuple2<Object,Object>(prediction, p.label()); } } ); // Get evaluation metrics. MulticlassMetrics metrics= new MulticlassMetrics(predictionAndLabels.rdd()); double precision =metrics.precision(); System.out.println("Precision = " + precision); // Save and load model model.save(sc,"myModelPath"); LogisticRegressionModelsameModel = LogisticRegressionModel.load(sc, "myModelPath"); } }
Python
下面的例子展示了如何加载一个简单的数据集,构建Logistic回归模型,以及运用模型预测来计算训练误差。
注意Python API现在还不支持多元分类和模型的save/load,但将来会支持。
from pyspark.mllib.classification import LogisticRegressionWithLBFGS from pyspark.mllib.regression import LabeledPoint from numpy import array # Load and parse the data def parsePoint(line): values = [float(x) for xin line.split(' ')] returnLabeledPoint(values[0], values[1:]) data = sc.textFile("data/mllib/sample_svm_data.txt") parsedData = data.map(parsePoint) # Build the model model = LogisticRegressionWithLBFGS.train(parsedData) # Evaluating the model on training data labelsAndPreds = parsedData.map(lambda p: (p.label,model.predict(p.features))) trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() /float(parsedData.count()) print("Training Error = " + str(trainErr))
回归
线性最小二乘,Lasso,岭回归
线性最小二乘法是回归问题中最常用的的公式。它是上面“数学公式”一节中所描述的线性方法,其损失函数是平方损失(squared loss):
根据正则化参数类型的不同,将相关算法分为不同回归算法:普通最小二乘或线性最小二乘,不进行正则化;岭回归算法,使用L2正则化;Lasso回归,使用L1正则化。对于所有这些模型,其平均损失或训练误差计算公式为,即著名的均方误差。
例子
Scala
下面示例演示了如何加载训练数据、将其解析为一个LabeledPoint 格式的RDD。然后使用LinearRegressionWithSGD 建立一个用于预测类标签的模型。最后我们计算均方误差来评估拟合度。
importorg.apache.spark.mllib.regression.LabeledPoint importorg.apache.spark.mllib.regression.LinearRegressionModel importorg.apache.spark.mllib.regression.LinearRegressionWithSGD importorg.apache.spark.mllib.linalg.Vectors // Load and parse the data val data =sc.textFile("data/mllib/ridge-data/lpsa.data") val parsedData = data.map { line => valparts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split('').map(_.toDouble))) }.cache() // Building the model val numIterations = 100 val model =LinearRegressionWithSGD.train(parsedData, numIterations) // Evaluate model on training examples andcompute training error val valuesAndPreds = parsedData.map { point=> valprediction = model.predict(point.features) (point.label, prediction) } val MSE = valuesAndPreds.map{case(v, p)=> math.pow((v - p), 2)}.mean() println("training Mean Squared Error =" + MSE) // Save and load model model.save(sc, "myModelPath") val sameModel =LinearRegressionModel.load(sc, "myModelPath")
RidgeRegressionWithSGD 和 LassoWithSGD 的 使 用 方 法 与LinearRegressionWithSGD相似。
Java
所有 MLlib 方法都使用 Java 友好的类型,所以您可以像scala 中那样导入和调用。唯一要说明的是那些使用 Scala RDD 对象的方法,因为在 spark 的 java API 中使用的是JavaRDD 类。对 JavaRDD 对象,您能够通过调用.rdd()方法将其转换为对应的 Scala 对象。与 Scala 示例等效的 Java 示例如下:
import scala.Tuple2; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.regression.LinearRegressionModel; import org.apache.spark.mllib.regression.LinearRegressionWithSGD; import org.apache.spark.SparkConf; public class LinearRegression { public static voidmain(String[] args) { SparkConf conf = newSparkConf().setAppName("Linear Regression Example"); JavaSparkContext sc = newJavaSparkContext(conf); // Load and parse the data String path ="data/mllib/ridge-data/lpsa.data"; JavaRDD<String> data= sc.textFile(path); JavaRDD<LabeledPoint> parsedData = data.map( new Function<String,LabeledPoint>() { public LabeledPointcall(String line) { String[] parts =line.split(","); String[] features =parts[1].split(" "); double[] v = newdouble[features.length]; for (int i = 0; i< features.length - 1; i++) v[i] = Double.parseDouble(features[i]); return newLabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); } } ); parsedData.cache(); // Building the model int numIterations = 100; finalLinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations); // Evaluate model ontraining examples and compute training error JavaRDD<Tuple2<Double, Double>> valuesAndPreds =parsedData.map( newFunction<LabeledPoint, Tuple2<Double, Double>>() { publicTuple2<Double, Double> call(LabeledPoint point) { double prediction =model.predict(point.features()); return newTuple2<Double, Double>(prediction, point.label()); } } ); double MSE = newJavaDoubleRDD(valuesAndPreds.map( newFunction<Tuple2<Double, Double>, Object>() { public Objectcall(Tuple2<Double, Double> pair) { returnMath.pow(pair._1() - pair._2(), 2.0); } } ).rdd()).mean(); System.out.println("training MeanSquared Error = " + MSE); // Save and load model model.save(sc.sc(),"myModelPath"); LinearRegressionModelsameModel = LinearRegressionModel.load(sc.sc(), "myModelPath"); } }
Python
下面示例演示了如何加载训练数据、将其解析为一个LabeledPoint 格式的RDD。然后使用LinearRegressionWithSGD 建立一个用于预测类标签的模型。最后我们计算均方误差来评估拟合度。
注意Python API现在还不支持模型的save/load,但将来会支持。
from pyspark.mllib.regression import LabeledPoint,LinearRegressionWithSGD from numpy import array # Load and parse the data def parsePoint(line): values = [float(x) for xin line.replace(',', ' ').split(' ')] returnLabeledPoint(values[0], values[1:]) data = sc.textFile("data/mllib/ridge-data/lpsa.data") parsedData = data.map(parsePoint) # Build the model model = LinearRegressionWithSGD.train(parsedData) # Evaluate the model on training data valuesAndPreds = parsedData.map(lambda p: (p.label,model.predict(p.features))) MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x,y: x + y) / valuesAndPreds.count() print("Mean Squared Error = " + str(MSE))
为了运行上面的程序,请参考Spark快速开始指南中提供的Self-ContainedApplications章节。务必将spark-mllib作为编译依赖库。
流的线性回归
当数据以流的形式传入,在线拟合回归模型是有用的,当收到新数据时更新模型参数。MLlib 目前使用普通最小二乘法实现流的线性回归。这种拟合的处理机制与离线方法相似,但其拟合发生于每一数据块到达时之外,目的是为了持续更新以反应流中数据。
例子
下面示例演示了如何从两个文本流中加载训练数据和验证数据、将其解析为LabeledPoint 流、基于第一个流在线拟合线性回归模型、然后在第二个流上进行预测。
Scala
首先,我们导入用来解析输入数据和创建模型的必要的类。
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint importorg.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
然后创建训练集和测试集的输入流。我们假定一个StreamingContext ssc已经被创建,请参见 Spark Streaming Programming Guide 获取更多信息。对这个例子来说,我们在流中使用含类标签的点,但在实际引用中,您更可能使用不含有类标签的数据作为测试集。
val trainingData =ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse).cache() val testData =ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
我们将权重初始化为 0 来创建模型
val numFeatures = 3 val model = new StreamingLinearRegressionWithSGD() .setInitialWeights(Vectors.zeros(numFeatures))
接下来我们注册用于训练和测试的流并开始任务。并打印其正确的类标签结果来观察结果。
model.trainOn(trainingData) model.predictOnValues(testData.map(lp => (lp.label,lp.features))).print()
ssc.start() ssc.awaitTermination()
我们现在可以在训练目录和测试目录中存入文本数据来模拟流事件。每一行数据应该是一个(y,[x1,x2,x3])格式的数据点,其中 y 是类标签,而 x1,x2,x3 是特征。每当一个文本文件放入/training/data/dir 目录时模型会更新。每当一个文本文件放入到/testing/data/dir目录时您将观察到预测结果。在训练目录中放入越多数据,预测越好。
实现(开发者)
在具体场景之外,MLlib 还实现了随机梯度下降(stochastic gradient descent (SGD))的一个简单分布式版本,该实现基于底层的梯度下降功能单元(请参见“优化”章节)。所有提 供 的 算 法 接 收 一 个 正 则 化 参 数 (regParam) 和 不 同 的 随 机 梯 度 下 降 相 关 参 数(stepSize,numIterations, miniBatchFraction)作为输入。对每一个算法来说,我们都支持三种可能正则化方法(不正则化,L1 和 L2)。
Scala 中实现了如下算法:
1) SVMWithSGD
2) LogisticRegressionWithLBFGS
3) LogisticRegressionWithSGD
4) LinearRegressionWithSGD
5) RidgeRegressionWithSGD
6) LassoWithSGD
Python 通过 PythonMLLibAPI 来调用 Scala 实现。