Skip to content

Commit

Permalink
[SPARK-1212, Part II] Support sparse data in MLlib
Browse files Browse the repository at this point in the history
In PR apache#117, we added dense/sparse vector data model and updated KMeans to support sparse input. This PR is to replace all other `Array[Double]` usage by `Vector` in generalized linear models (GLMs) and Naive Bayes. Major changes:

1. `LabeledPoint` becomes `LabeledPoint(Double, Vector)`.
2. Methods that accept `RDD[Array[Double]]` now accept `RDD[Vector]`. We cannot support both in an elegant way because of type erasure.
3. Mark 'createModel' and 'predictPoint' protected because they are not for end users.
4. Add libSVMFile to MLContext.
5. NaiveBayes can accept arbitrary labels (introducing a breaking change to Python's `NaiveBayesModel`).
6. Gradient computation no longer creates temp vectors.
7. Column normalization and centering are removed from Lasso and Ridge because the operation will densify the data. Simple feature transformation can be done before training.

TODO:
1. ~~Use axpy when possible.~~
2. ~~Optimize Naive Bayes.~~

Author: Xiangrui Meng <meng@databricks.com>

Closes apache#245 from mengxr/vector and squashes the following commits:

eb6e793 [Xiangrui Meng] move libSVMFile to MLUtils and rename to loadLibSVMData
c26c4fc [Xiangrui Meng] update DecisionTree to use RDD[Vector]
11999c7 [Xiangrui Meng] Merge branch 'master' into vector
f7da54b [Xiangrui Meng] add minSplits to libSVMFile
da25e24 [Xiangrui Meng] revert the change to default addIntercept because it might change the behavior of existing code without warning
493f26f [Xiangrui Meng] Merge branch 'master' into vector
7c1bc01 [Xiangrui Meng] add a TODO to NB
b9b7ef7 [Xiangrui Meng] change default value of addIntercept to false
b01df54 [Xiangrui Meng] allow to change or clear threshold in LR and SVM
4addc50 [Xiangrui Meng] merge master
4ca5b1b [Xiangrui Meng] remove normalization from Lasso and update tests
f04fe8a [Xiangrui Meng] remove normalization from RidgeRegression and update tests
d088552 [Xiangrui Meng] use static constructor for MLContext
6f59eed [Xiangrui Meng] update libSVMFile to determine number of features automatically
3432e84 [Xiangrui Meng] update NaiveBayes to support sparse data
0f8759b [Xiangrui Meng] minor updates to NB
b11659c [Xiangrui Meng] style update
78c4671 [Xiangrui Meng] add libSVMFile to MLContext
f0fe616 [Xiangrui Meng] add a test for sparse linear regression
44733e1 [Xiangrui Meng] use in-place gradient computation
e981396 [Xiangrui Meng] use axpy in Updater
db808a1 [Xiangrui Meng] update JavaLR example
befa592 [Xiangrui Meng] passed scala/java tests
75c83a4 [Xiangrui Meng] passed test compile
1859701 [Xiangrui Meng] passed compile
834ada2 [Xiangrui Meng] optimized MLUtils.computeStats update some ml algorithms to use Vector (cont.)
135ab72 [Xiangrui Meng] merge glm
0e57aa4 [Xiangrui Meng] update Lasso and RidgeRegression to parse the weights correctly from GLM mark createModel protected mark predictPoint protected
d7f629f [Xiangrui Meng] fix a bug in GLM when intercept is not used
3f346ba [Xiangrui Meng] update some ml algorithms to use Vector
  • Loading branch information
mengxr authored and mateiz committed Apr 2, 2014
1 parent ed730c9 commit 9c65fa7
Show file tree
Hide file tree
Showing 40 changed files with 926 additions and 591 deletions.
14 changes: 4 additions & 10 deletions examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@

package org.apache.spark.mllib.examples;

import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

import java.util.Arrays;
import java.util.regex.Pattern;

/**
* Logistic regression based classification using ML Lib.
*/
Expand All @@ -47,14 +46,10 @@ public LabeledPoint call(String line) {
for (int i = 0; i < tok.length; ++i) {
x[i] = Double.parseDouble(tok[i]);
}
return new LabeledPoint(y, x);
return new LabeledPoint(y, Vectors.dense(x));
}
}

public static void printWeights(double[] a) {
System.out.println(Arrays.toString(a));
}

public static void main(String[] args) {
if (args.length != 4) {
System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
Expand All @@ -80,8 +75,7 @@ public static void main(String[] args) {
LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
iterations, stepSize);

System.out.print("Final w: ");
printWeights(model.weights());
System.out.print("Final w: " + model.weights());

System.exit(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,92 +110,144 @@ class PythonMLLibAPI extends Serializable {

private def trainRegressionModel(
trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
java.util.LinkedList[java.lang.Object] = {
dataBytesJRDD: JavaRDD[Array[Byte]],
initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
LabeledPoint(x(0), x.slice(1, x.length))
LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val initialWeights = deserializeDoubleVector(initialWeightsBA)
val model = trainFunc(data, initialWeights)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleVector(model.weights))
ret.add(serializeDoubleVector(model.weights.toArray))
ret.add(model.intercept: java.lang.Double)
ret
}

/**
* Java stub for Python mllib LinearRegressionWithSGD.train()
*/
def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int, stepSize: Double, miniBatchFraction: Double,
def trainLinearRegressionModelWithSGD(
dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
trainRegressionModel((data, initialWeights) =>
LinearRegressionWithSGD.train(data, numIterations, stepSize,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
trainRegressionModel(
(data, initialWeights) =>
LinearRegressionWithSGD.train(
data,
numIterations,
stepSize,
miniBatchFraction,
Vectors.dense(initialWeights)),
dataBytesJRDD,
initialWeightsBA)
}

/**
* Java stub for Python mllib LassoWithSGD.train()
*/
def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
def trainLassoModelWithSGD(
dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
trainRegressionModel((data, initialWeights) =>
LassoWithSGD.train(data, numIterations, stepSize, regParam,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
trainRegressionModel(
(data, initialWeights) =>
LassoWithSGD.train(
data,
numIterations,
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
dataBytesJRDD,
initialWeightsBA)
}

/**
* Java stub for Python mllib RidgeRegressionWithSGD.train()
*/
def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
def trainRidgeModelWithSGD(
dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
trainRegressionModel((data, initialWeights) =>
RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
trainRegressionModel(
(data, initialWeights) =>
RidgeRegressionWithSGD.train(
data,
numIterations,
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
dataBytesJRDD,
initialWeightsBA)
}

/**
* Java stub for Python mllib SVMWithSGD.train()
*/
def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
def trainSVMModelWithSGD(
dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
trainRegressionModel((data, initialWeights) =>
SVMWithSGD.train(data, numIterations, stepSize, regParam,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
trainRegressionModel(
(data, initialWeights) =>
SVMWithSGD.train(
data,
numIterations,
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
dataBytesJRDD,
initialWeightsBA)
}

/**
* Java stub for Python mllib LogisticRegressionWithSGD.train()
*/
def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int, stepSize: Double, miniBatchFraction: Double,
def trainLogisticRegressionModelWithSGD(
dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
trainRegressionModel((data, initialWeights) =>
LogisticRegressionWithSGD.train(data, numIterations, stepSize,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
trainRegressionModel(
(data, initialWeights) =>
LogisticRegressionWithSGD.train(
data,
numIterations,
stepSize,
miniBatchFraction,
Vectors.dense(initialWeights)),
dataBytesJRDD,
initialWeightsBA)
}

/**
* Java stub for NaiveBayes.train()
*/
def trainNaiveBayes(dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double)
: java.util.List[java.lang.Object] =
{
def trainNaiveBayes(
dataBytesJRDD: JavaRDD[Array[Byte]],
lambda: Double): java.util.List[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
LabeledPoint(x(0), x.slice(1, x.length))
LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val model = NaiveBayes.train(data, lambda)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleVector(model.labels))
ret.add(serializeDoubleVector(model.pi))
ret.add(serializeDoubleMatrix(model.theta))
ret
Expand All @@ -204,9 +256,12 @@ class PythonMLLibAPI extends Serializable {
/**
* Java stub for Python mllib KMeans.train()
*/
def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
maxIterations: Int, runs: Int, initializationMode: String):
java.util.List[java.lang.Object] = {
def trainKMeansModel(
dataBytesJRDD: JavaRDD[Array[Byte]],
k: Int,
maxIterations: Int,
runs: Int,
initializationMode: String): java.util.List[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
val ret = new java.util.LinkedList[java.lang.Object]()
Expand Down Expand Up @@ -259,8 +314,12 @@ class PythonMLLibAPI extends Serializable {
* needs to be taken in the Python code to ensure it gets freed on exit; see
* the Py4J documentation.
*/
def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
def trainALSModel(
ratingsBytesJRDD: JavaRDD[Array[Byte]],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int): MatrixFactorizationModel = {
val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
ALS.train(ratings, rank, iterations, lambda, blocks)
}
Expand All @@ -271,8 +330,13 @@ class PythonMLLibAPI extends Serializable {
* Extra care needs to be taken in the Python code to ensure it gets freed on
* exit; see the Py4J documentation.
*/
def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
def trainImplicitALSModel(
ratingsBytesJRDD: JavaRDD[Array[Byte]],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int,
alpha: Double): MatrixFactorizationModel = {
val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@

package org.apache.spark.mllib.classification

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

/**
* Represents a classification model that predicts to which of a set of categories an example
* belongs. The categories are represented by double values: 0.0, 1.0, 2.0, etc.
*/
trait ClassificationModel extends Serializable {
/**
* Predict values for the given data set using the model trained.
*
* @param testData RDD representing data points to be predicted
* @return RDD[Int] where each entry contains the corresponding prediction
* @return an RDD[Double] where each entry contains the corresponding prediction
*/
def predict(testData: RDD[Array[Double]]): RDD[Double]
def predict(testData: RDD[Vector]): RDD[Double]

/**
* Predict values for a single data point using the model trained.
*
* @param testData array representing a single data point
* @return Int prediction from the trained model
* @return predicted category from the trained model
*/
def predict(testData: Array[Double]): Double
def predict(testData: Vector): Double
}
Loading

0 comments on commit 9c65fa7

Please sign in to comment.