[SPARK-1212, Part II] Support sparse data in MLlib #245

Closed · wants to merge 30 commits

Commits (30):
3f346ba  update some ml algorithms to use Vector (mengxr, Mar 26, 2014)
d7f629f  fix a bug in GLM when intercept is not used (mengxr, Mar 26, 2014)
0e57aa4  update Lasso and RidgeRegression to parse the weights correctly from GLM (mengxr, Mar 26, 2014)
135ab72  merge glm (mengxr, Mar 26, 2014)
834ada2  optimized MLUtils.computeStats (mengxr, Mar 26, 2014)
1859701  passed compile (mengxr, Mar 26, 2014)
75c83a4  passed test compile (mengxr, Mar 26, 2014)
befa592  passed scala/java tests (mengxr, Mar 27, 2014)
db808a1  update JavaLR example (mengxr, Mar 27, 2014)
e981396  use axpy in Updater (mengxr, Mar 27, 2014)
44733e1  use in-place gradient computation (mengxr, Mar 27, 2014)
f0fe616  add a test for sparse linear regression (mengxr, Mar 27, 2014)
78c4671  add libSVMFile to MLContext (mengxr, Mar 27, 2014)
b11659c  style update (mengxr, Mar 27, 2014)
0f8759b  minor updates to NB (mengxr, Mar 30, 2014)
3432e84  update NaiveBayes to support sparse data (mengxr, Mar 31, 2014)
6f59eed  update libSVMFile to determine number of features automatically (mengxr, Mar 31, 2014)
d088552  use static constructor for MLContext (mengxr, Mar 31, 2014)
f04fe8a  remove normalization from RidgeRegression and update tests (mengxr, Mar 31, 2014)
4ca5b1b  remove normalization from Lasso and update tests (mengxr, Mar 31, 2014)
4addc50  merge master (mengxr, Mar 31, 2014)
b01df54  allow to change or clear threshold in LR and SVM (mengxr, Mar 31, 2014)
b9b7ef7  change default value of addIntercept to false (mengxr, Mar 31, 2014)
7c1bc01  add a TODO to NB (mengxr, Mar 31, 2014)
493f26f  Merge branch 'master' into vector (mengxr, Mar 31, 2014)
da25e24  revert the change to default addIntercept because it might change the… (mengxr, Mar 31, 2014)
f7da54b  add minSplits to libSVMFile (mengxr, Apr 1, 2014)
11999c7  Merge branch 'master' into vector (mengxr, Apr 2, 2014)
c26c4fc  update DecisionTree to use RDD[Vector] (mengxr, Apr 2, 2014)
eb6e793  move libSVMFile to MLUtils and rename to loadLibSVMData (mengxr, Apr 2, 2014)
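
Taken together, these commits move MLlib's public API from Array[Double] to the new org.apache.spark.mllib.linalg.Vector type, so dense and sparse data share one code path. A minimal sketch of the resulting user-facing API, based on the diffs below (Vectors.dense appears in this PR; the Vectors.sparse(size, indices, values) constructor comes from Part I of SPARK-1212 and is shown here only for illustration):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// A dense 3-dimensional vector: every value is stored.
val dense = Vectors.dense(1.0, 0.0, 3.0)

// The same vector stored sparsely: size, nonzero indices, nonzero values.
val sparse = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))

// LabeledPoint now wraps a Vector instead of an Array[Double], so the
// same training methods accept dense or sparse features.
val p1 = LabeledPoint(1.0, dense)
val p2 = LabeledPoint(0.0, sparse)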
Changes from 12 commits:
14 changes: 4 additions & 10 deletions examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -17,18 +17,17 @@

package org.apache.spark.mllib.examples;

import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

import java.util.Arrays;
import java.util.regex.Pattern;

/**
* Logistic regression based classification using ML Lib.
*/
@@ -47,14 +47,10 @@ public LabeledPoint call(String line) {
for (int i = 0; i < tok.length; ++i) {
x[i] = Double.parseDouble(tok[i]);
}
return new LabeledPoint(y, x);
return new LabeledPoint(y, Vectors.dense(x));
}
}

public static void printWeights(double[] a) {
System.out.println(Arrays.toString(a));
}

public static void main(String[] args) {
if (args.length != 4) {
System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
@@ -80,8 +75,7 @@ public static void main(String[] args) {
LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
iterations, stepSize);

System.out.print("Final w: ");
printWeights(model.weights());
System.out.print("Final w: " + model.weights());

System.exit(0);
}
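A note on the printWeights removal above: the new Vector type renders its values in toString, so the helper became unnecessary. A minimal sketch (the exact output format is my assumption, not shown in this diff):

// Vector.toString prints the stored values, so the old
// printWeights(double[]) helper is no longer needed.
val w = org.apache.spark.mllib.linalg.Vectors.dense(0.25, 1.5, -0.75)
println("Final w: " + w)   // expected output: Final w: [0.25,1.5,-0.75]
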
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -114,74 +114,125 @@ class PythonMLLibAPI extends Serializable {
java.util.LinkedList[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
LabeledPoint(x(0), x.slice(1, x.length))
LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val initialWeights = deserializeDoubleVector(initialWeightsBA)
val model = trainFunc(data, initialWeights)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleVector(model.weights))
ret.add(serializeDoubleVector(model.weights.toArray))
ret.add(model.intercept: java.lang.Double)
ret
}

/**
* Java stub for Python mllib LinearRegressionWithSGD.train()
*/
def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int, stepSize: Double, miniBatchFraction: Double,
def trainLinearRegressionModelWithSGD(
dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
trainRegressionModel((data, initialWeights) =>
LinearRegressionWithSGD.train(data, numIterations, stepSize,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
trainRegressionModel(
(data, initialWeights) =>
LinearRegressionWithSGD.train(
data,
numIterations,
stepSize,
miniBatchFraction,
Vectors.dense(initialWeights)),
dataBytesJRDD,
initialWeightsBA)
}

/**
* Java stub for Python mllib LassoWithSGD.train()
*/
def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
def trainLassoModelWithSGD(
dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
trainRegressionModel((data, initialWeights) =>
LassoWithSGD.train(data, numIterations, stepSize, regParam,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
trainRegressionModel(
(data, initialWeights) =>
LassoWithSGD.train(
data,
numIterations,
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
dataBytesJRDD,
initialWeightsBA)
}

/**
* Java stub for Python mllib RidgeRegressionWithSGD.train()
*/
def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
def trainRidgeModelWithSGD(
dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
trainRegressionModel((data, initialWeights) =>
RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
trainRegressionModel(
(data, initialWeights) =>
RidgeRegressionWithSGD.train(
data,
numIterations,
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
dataBytesJRDD,
initialWeightsBA)
}

/**
* Java stub for Python mllib SVMWithSGD.train()
*/
def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
def trainSVMModelWithSGD(
dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
trainRegressionModel((data, initialWeights) =>
SVMWithSGD.train(data, numIterations, stepSize, regParam,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
trainRegressionModel(
(data, initialWeights) =>
SVMWithSGD.train(
data,
numIterations,
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
dataBytesJRDD,
initialWeightsBA)
}

/**
* Java stub for Python mllib LogisticRegressionWithSGD.train()
*/
def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int, stepSize: Double, miniBatchFraction: Double,
def trainLogisticRegressionModelWithSGD(
dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
trainRegressionModel((data, initialWeights) =>
LogisticRegressionWithSGD.train(data, numIterations, stepSize,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
trainRegressionModel(
(data, initialWeights) =>
LogisticRegressionWithSGD.train(
data,
numIterations,
stepSize,
miniBatchFraction,
Vectors.dense(initialWeights)),
dataBytesJRDD,
initialWeightsBA)
}

/**
@@ -192,7 +243,7 @@ class PythonMLLibAPI extends Serializable {
{
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
LabeledPoint(x(0), x.slice(1, x.length))
LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val model = NaiveBayes.train(data, lambda)
val ret = new java.util.LinkedList[java.lang.Object]()
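All of the stubs above share one wire format: each record arrives from Python as an Array[Double] whose first element is the label and whose remainder is the feature vector, and initial weights are wrapped with Vectors.dense before training. A small sketch of that convention (the values are made up):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Wire format used by the Python stubs above: [label, f0, f1, ...].
val record = Array(1.0, 0.5, -0.25, 2.0)
val point = LabeledPoint(record(0), Vectors.dense(record.slice(1, record.length)))
// point.label == 1.0; point.features holds [0.5, -0.25, 2.0]
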
mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.classification

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vector

trait ClassificationModel extends Serializable {
/**
@@ -26,13 +27,13 @@ trait ClassificationModel extends Serializable {
* @param testData RDD representing data points to be predicted
* @return RDD[Double] where each entry contains the corresponding prediction
*/
def predict(testData: RDD[Array[Double]]): RDD[Double]
def predict(testData: RDD[Vector]): RDD[Double]

/**
* Predict values for a single data point using the model trained.
*
* @param testData vector representing a single data point
* @return Double prediction from the trained model
*/
def predict(testData: Array[Double]): Double
def predict(testData: Vector): Double
}
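
To make the trait's contract concrete, here is a toy implementation; it is a sketch, not code from this PR, and assumes only the two predict methods above plus the Vector.toArray seen later in this diff:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Toy classifier: predicts 1.0 when the first feature is positive.
// The RDD variant defers to the single-point variant.
class FirstFeatureModel extends ClassificationModel {
  override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)
  override def predict(testData: Vector): Double =
    if (testData.toArray.head > 0.0) 1.0 else 0.0
}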
mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -25,8 +25,7 @@ import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.util.DataValidators

import org.jblas.DoubleMatrix
import org.apache.spark.mllib.linalg.Vector

/**
* Classification model trained using Logistic Regression.
@@ -35,14 +34,14 @@ import org.jblas.DoubleMatrix
* @param intercept Intercept computed for this model.
*/
class LogisticRegressionModel(
override val weights: Array[Double],
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept)
with ClassificationModel with Serializable {

override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
intercept: Double) = {
val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
round(1.0/ (1.0 + math.exp(margin * -1)))
}
}
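
The rewritten predictPoint computes the margin with a Breeze dot product instead of a jblas mmul, then applies the usual logistic rule. A worked numeric sketch in plain Scala (hypothetical weights and features):

// margin = w . x + b; the sigmoid exceeds 0.5 exactly when margin > 0.
val w = Array(0.5, -0.25)
val x = Array(2.0, 4.0)
val b = 0.1
val margin = (w, x).zipped.map(_ * _).sum + b   // 1.0 - 1.0 + 0.1 = 0.1
val prob = 1.0 / (1.0 + math.exp(-margin))      // ~0.525
val label = math.round(prob)                    // 1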
@@ -73,7 +72,7 @@ class LogisticRegressionWithSGD private (
*/
def this() = this(1.0, 100, 0.0, 1.0)

def createModel(weights: Array[Double], intercept: Double) = {
def createModel(weights: Vector, intercept: Double) = {
new LogisticRegressionModel(weights, intercept)
}
}
@@ -105,11 +104,9 @@ object LogisticRegressionWithSGD {
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeights: Array[Double])
: LogisticRegressionModel =
{
new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
input, initialWeights)
initialWeights: Vector): LogisticRegressionModel = {
new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
.run(input, initialWeights)
}

/**
@@ -128,11 +125,9 @@
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double)
: LogisticRegressionModel =
{
new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
input)
miniBatchFraction: Double): LogisticRegressionModel = {
new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
.run(input)
}

/**
@@ -150,9 +145,7 @@
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double)
: LogisticRegressionModel =
{
stepSize: Double): LogisticRegressionModel = {
train(input, numIterations, stepSize, 1.0)
}

@@ -168,9 +161,7 @@
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int)
: LogisticRegressionModel =
{
numIterations: Int): LogisticRegressionModel = {
train(input, numIterations, 1.0, 1.0)
}

@@ -183,7 +174,7 @@
val sc = new SparkContext(args(0), "LogisticRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
println("Weights: " + model.weights.mkString("[", ", ", "]"))
println("Weights: " + model.weights)
println("Intercept: " + model.intercept)

sc.stop()
mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -25,6 +25,7 @@ import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.linalg.Vector

/**
* Model for Naive Bayes Classifiers.
@@ -39,9 +40,11 @@ class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
private val _pi = new DoubleMatrix(pi.length, 1, pi: _*)
private val _theta = new DoubleMatrix(theta)

def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict)
override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)

def predict(testData: Array[Double]): Double = {
override def predict(testData: Vector): Double = predict(testData.toArray)

private def predict(testData: Array[Double]): Double = {
val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*)
val result = _pi.add(_theta.mmul(dataMatrix))
result.argmax()
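
The prediction above is an argmax over per-class scores pi(c) + theta(c) . x, with pi and theta already in log space. The same computation in plain Scala, with made-up values:

// Two classes, two features; pi holds log-priors, theta per-class weights.
val pi = Array(math.log(0.6), math.log(0.4))
val theta = Array(Array(0.2, 0.8), Array(0.7, 0.3))
val x = Array(1.0, 2.0)

// score(c) = pi(c) + theta(c) . x; predict the index of the largest score.
val scores = pi.indices.map(c => pi(c) + (theta(c), x).zipped.map(_ * _).sum)
val prediction = scores.indexOf(scores.max)   // 0 for these values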
@@ -70,17 +73,26 @@ class NaiveBayes private (var lambda: Double)
/**
* Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
*
* @param data RDD of (label, array of features) pairs.
* @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
*/
def run(data: RDD[LabeledPoint]) = {
runRaw(data.map(v => (v.label, v.features.toArray)))
}

/**
* Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
*
* @param data RDD of (label, array of features) pairs.
*/
private def runRaw(data: RDD[(Double, Array[Double])]) = {
// Aggregates all sample points to driver side to get sample count and summed feature vector
// for each label. The shape of `zeroCombiner` & `aggregated` is:
//
// label: Int -> (count: Int, featuresSum: DoubleMatrix)
val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) =>
point match {
case LabeledPoint(label, features) =>
case (label, features) =>
val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
val fs = new DoubleMatrix(features.length, 1, features: _*)
combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
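
The aggregate call above folds every (label, features) pair into a per-label (count, featureSum) combiner; the rest of the method, truncated here, presumably turns those sums into pi and theta. The combiner logic, sketched with plain Scala collections instead of RDD.aggregate:

// Per-label combiner: label -> (sample count, elementwise feature sum).
val data = Seq((0.0, Array(1.0, 0.0)), (1.0, Array(0.0, 2.0)), (0.0, Array(3.0, 1.0)))
val aggregated = data.foldLeft(Map.empty[Int, (Int, Array[Double])]) {
  case (combiner, (label, features)) =>
    val (count, sum) = combiner.getOrElse(label.toInt, (0, Array.fill(features.length)(0.0)))
    combiner + (label.toInt -> (count + 1, (sum, features).zipped.map(_ + _)))
}
// aggregated: 0 -> (2, [4.0, 1.0]), 1 -> (1, [0.0, 2.0])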