passed scala/java tests
mengxr committed Mar 27, 2014
1 parent 75c83a4 commit befa592
Showing 11 changed files with 74 additions and 72 deletions.
mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala

@@ -42,7 +42,7 @@ abstract class Gradient extends Serializable {
 class LogisticGradient extends Gradient {
   override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
     val brzData = data.toBreeze
-    val brzWeights = data.toBreeze
+    val brzWeights = weights.toBreeze
     val margin: Double = -1.0 * brzWeights.dot(brzData)
     val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
     val gradient = brzData * gradientMultiplier
@@ -67,7 +67,7 @@ class LeastSquaresGradient extends Gradient {
   override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
     val brzData = data.toBreeze
     val brzWeights = weights.toBreeze
-    val diff: Double = brzWeights.dot(brzData) - label
+    val diff = brzWeights.dot(brzData) - label
     val loss = diff * diff
     val gradient = brzData * (2.0 * diff)
 
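The LogisticGradient change is the actual bug fix in this commit: the dot product must use the weight vector, not a second copy of the data vector. For reference, a self-contained sketch of the same computation written directly against Breeze (object and helper names are illustrative, not MLlib code):

```scala
import breeze.linalg.{DenseVector => BDV}

object LogisticGradientSketch {
  // For one example x with label y in {0, 1} and weights w:
  //   margin = -w.x, multiplier = 1 / (1 + exp(margin)) - y, gradient = multiplier * x
  def compute(x: BDV[Double], label: Double, w: BDV[Double]): (BDV[Double], Double) = {
    val margin = -1.0 * (w dot x) // the fixed line: the dot product uses the weights
    val gradientMultiplier = 1.0 / (1.0 + math.exp(margin)) - label
    val gradient = x * gradientMultiplier
    // Log-loss, written separately for the two label values.
    val loss =
      if (label > 0) math.log(1 + math.exp(margin))
      else math.log(1 + math.exp(margin)) - margin
    (gradient, loss)
  }

  def main(args: Array[String]): Unit = {
    println(compute(BDV(1.0, 2.0), 1.0, BDV(0.5, -0.25)))
  }
}
```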
mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala

@@ -17,11 +17,10 @@
 
 package org.apache.spark.mllib.optimization
 
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
@@ -92,16 +91,15 @@ class GradientDescent(var gradient: Gradient, var updater: Updater)
   }
 
   def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
-
-    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
-      data,
-      gradient,
-      updater,
-      stepSize,
-      numIterations,
-      regParam,
-      miniBatchFraction,
-      initialWeights)
+    val (weights, _) = GradientDescent.runMiniBatchSGD(
+      data,
+      gradient,
+      updater,
+      stepSize,
+      numIterations,
+      regParam,
+      miniBatchFraction,
+      initialWeights)
     weights
   }
 
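The import reordering and `(weights, _)` destructuring are cosmetic; the behavior lives in runMiniBatchSGD. For readers skimming the refactor, a Spark-free sketch of the idea — sample a fraction of the data each iteration, average the gradients, take a decreasing step. This simplifies heavily (no updater, no regularization, plain collections; all names illustrative):

```scala
import scala.util.Random

object MiniBatchSGDSketch {
  // gradient(w, x, y) returns the loss gradient at weights w for example (x, y).
  def run(
      data: Seq[(Double, Array[Double])],
      gradient: (Array[Double], Array[Double], Double) => Array[Double],
      stepSize: Double,
      numIterations: Int,
      miniBatchFraction: Double,
      initialWeights: Array[Double]): Array[Double] = {
    val rng = new Random(42)
    val weights = initialWeights.clone()
    for (iter <- 1 to numIterations) {
      val batch = data.filter(_ => rng.nextDouble() < miniBatchFraction)
      if (batch.nonEmpty) {
        // Average per-example gradients over the sampled batch.
        val avg = new Array[Double](weights.length)
        for ((label, features) <- batch) {
          val g = gradient(weights, features, label)
          for (i <- avg.indices) avg(i) += g(i) / batch.size
        }
        // Step size decays as stepSize / sqrt(iter), like SimpleUpdater.
        val step = stepSize / math.sqrt(iter)
        for (i <- weights.indices) weights(i) -= step * avg(i)
      }
    }
    weights
  }

  def main(args: Array[String]): Unit = {
    // Least-squares gradient: 2 * (w.x - y) * x; converges to w close to (1.0, 2.0).
    def lsGrad(w: Array[Double], x: Array[Double], y: Double): Array[Double] = {
      val diff = w.zip(x).map { case (wi, xi) => wi * xi }.sum - y
      x.map(_ * 2.0 * diff)
    }
    val data = Seq((1.0, Array(1.0, 0.0)), (2.0, Array(0.0, 1.0)))
    println(run(data, lsGrad, 0.5, 100, 1.0, Array(0.0, 0.0)).toSeq)
  }
}
```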
mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala

@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 
 import org.apache.spark.mllib.linalg.Vector
 
-trait Optimizer {
+trait Optimizer extends Serializable {
 
   /**
    * Solve the provided convex optimization problem.
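Optimizer gains Serializable because an optimizer instance is a field of the (serializable) algorithm object that Spark ships to executors inside task closures. A minimal custom implementation against the trait exactly as this diff defines it (the class itself is illustrative, not part of the commit):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization.Optimizer

// A do-nothing optimizer: returns the initial weights unchanged.
// Useful only as a placeholder, but it shows the contract to implement.
class NoOpOptimizer extends Optimizer {
  override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector =
    initialWeights
}
```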
mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala

@@ -63,8 +63,12 @@ abstract class Updater extends Serializable {
  * Uses a step-size decreasing with the square root of the number of iterations.
  */
 class SimpleUpdater extends Updater {
-  override def compute(weightsOld: Vector, gradient: Vector,
-      stepSize: Double, iter: Int, regParam: Double): (Vector, Double) = {
+  override def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double) = {
     val thisIterStepSize = stepSize / math.sqrt(iter)
     val brzWeights = weightsOld.toBreeze - gradient.toBreeze * thisIterStepSize
     (Vectors.fromBreeze(brzWeights), 0)
@@ -101,9 +105,11 @@ class L1Updater extends Updater {
     val brzWeights = weightsOld.toBreeze - gradient.toBreeze * thisIterStepSize
     // Apply proximal operator (soft thresholding)
     val shrinkageVal = regParam * thisIterStepSize
-    (0 until brzWeights.length).foreach { i =>
+    var i = 0
+    while (i < brzWeights.length) {
       val wi = brzWeights(i)
       brzWeights(i) = signum(wi) * max(0.0, abs(wi) - shrinkageVal)
+      i += 1
     }
 
     (Vectors.fromBreeze(brzWeights), brzNorm(brzWeights, 1.0) * regParam)
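The L1Updater edit replaces a Range.foreach with a manual while loop — a common JVM micro-optimization for hot inner loops — and the soft-thresholding math is unchanged. That operator in isolation, as a Breeze-only sketch with made-up names:

```scala
import breeze.linalg.{DenseVector => BDV}
import math.{abs, max, signum}

object SoftThresholdSketch {
  // L1 proximal step: pull every weight toward zero by shrinkageVal,
  // zeroing any weight whose magnitude is below the threshold.
  def apply(weights: BDV[Double], shrinkageVal: Double): BDV[Double] = {
    val w = weights.copy
    var i = 0
    while (i < w.length) {
      w(i) = signum(w(i)) * max(0.0, abs(w(i)) - shrinkageVal)
      i += 1
    }
    w
  }

  def main(args: Array[String]): Unit = {
    println(apply(BDV(0.3, -0.05, 1.2), 0.1)) // approx (0.2, 0.0, 1.1): the -0.05 is zeroed
  }
}
```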
mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala

@@ -118,12 +118,12 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
 
   /** Prepends one to the input vector. */
   private def prependOne(vector: Vector): Vector = {
-    val vectorWithIntercept = vector.toBreeze match {
+    val vector1 = vector.toBreeze match {
       case dv: BDV[Double] => BDV.vertcat(BDV.ones[Double](1), dv)
       case sv: BSV[Double] => BSV.vertcat(new BSV[Double](Array(0), Array(1.0), 1), sv)
       case v: Any => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
     }
-    Vectors.fromBreeze(vectorWithIntercept)
+    Vectors.fromBreeze(vector1)
   }
 
   /**
@@ -151,10 +151,14 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
     }
 
     val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
-    val brzWeightsWithIntercept = weightsWithIntercept.toBreeze
-    val intercept = if (addIntercept) brzWeightsWithIntercept(0) else 0.0
-    val brzWeights = if (addIntercept) brzWeightsWithIntercept(1 to -1) else brzWeightsWithIntercept
-
-    createModel(Vectors.fromBreeze(brzWeights), intercept)
+    val intercept = if (addIntercept) weightsWithIntercept(0) else 0.0
+    val weights =
+      if (addIntercept) {
+        Vectors.dense(weightsWithIntercept.toArray.slice(1, weightsWithIntercept.size))
+      } else {
+        weightsWithIntercept
+      }
+
+    createModel(weights, intercept)
   }
 }
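The rewritten intercept handling stays on MLlib's public Vector type instead of round-tripping through Breeze, and drops the easy-to-misread Breeze slice `(1 to -1)` (index 1 through the last element). The prepend/split pair in isolation, as a sketch with hypothetical helper names:

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object InterceptSketch {
  // Prepend a constant 1.0 feature so the intercept is learned as weight 0.
  def prependOne(v: Vector): Vector = Vectors.dense(1.0 +: v.toArray)

  // After optimization, split the combined vector back into (intercept, weights).
  def split(weightsWithIntercept: Vector): (Double, Vector) = {
    val intercept = weightsWithIntercept(0)
    val weights = Vectors.dense(weightsWithIntercept.toArray.slice(1, weightsWithIntercept.size))
    (intercept, weights)
  }

  def main(args: Array[String]): Unit = {
    println(prependOne(Vectors.dense(2.0, 3.0))) // [1.0,2.0,3.0]
    // Suppose the optimizer returned [0.7, -1.5, 0.01]: 0.7 is the intercept.
    println(split(Vectors.dense(0.7, -1.5, 0.01)))
  }
}
```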
mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala (8 additions, 17 deletions)
@@ -71,9 +71,9 @@ class LassoWithSGD private (
   // We don't want to penalize the intercept, so set this to false.
   super.setIntercept(false)
 
-  var yMean = 0.0
-  var xColMean: BV[Double] = _
-  var xColSd: BV[Double] = _
+  private var yMean = 0.0
+  private var xColMean: BV[Double] = _
+  private var xColSd: BV[Double] = _
 
   /**
    * Construct a Lasso object with default parameters
@@ -141,11 +141,8 @@ object LassoWithSGD {
       stepSize: Double,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeights: Vector)
-    : LassoModel =
-  {
-    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
-      initialWeights)
+      initialWeights: Vector): LassoModel = {
+    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input, initialWeights)
   }
 
   /**
@@ -165,9 +162,7 @@ object LassoWithSGD {
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
-      miniBatchFraction: Double)
-    : LassoModel =
-  {
+      miniBatchFraction: Double): LassoModel = {
     new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
   }
 
@@ -187,9 +182,7 @@
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      regParam: Double)
-    : LassoModel =
-  {
+      regParam: Double): LassoModel = {
     train(input, numIterations, stepSize, regParam, 1.0)
   }
 
@@ -205,9 +198,7 @@
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LassoModel =
-  {
+      numIterations: Int): LassoModel = {
     train(input, numIterations, 1.0, 1.0, 1.0)
   }
 
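With every overload normalized to the `...): LassoModel = {` style, a call site reads straightforwardly. A minimal local-mode usage sketch (the data and hyperparameter values are made up; only the train signature comes from this diff):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LassoWithSGD}

object LassoTrainExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lasso-example"))
    val data = sc.parallelize(Seq(
      LabeledPoint(3.0, Vectors.dense(1.0, 2.0)),
      LabeledPoint(5.0, Vectors.dense(3.0, 4.0))
    ))
    // train(input, numIterations, stepSize, regParam, miniBatchFraction)
    val model = LassoWithSGD.train(data, 40, 1.0, 0.01, 1.0)
    println(model.weights)
    sc.stop()
  }
}
```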
mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala

@@ -97,11 +97,9 @@ object LinearRegressionWithSGD {
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
-      initialWeights: Vector)
-    : LinearRegressionModel =
-  {
-    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input,
-      initialWeights)
+      initialWeights: Vector): LinearRegressionModel = {
+    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -119,9 +117,7 @@
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      miniBatchFraction: Double)
-    : LinearRegressionModel =
-  {
+      miniBatchFraction: Double): LinearRegressionModel = {
     new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
   }
 
@@ -139,9 +135,7 @@
   def train(
       input: RDD[LabeledPoint],
       numIterations: Int,
-      stepSize: Double)
-    : LinearRegressionModel =
-  {
+      stepSize: Double): LinearRegressionModel = {
     train(input, numIterations, stepSize, 1.0)
   }
 
@@ -157,9 +151,7 @@
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LinearRegressionModel =
-  {
+      numIterations: Int): LinearRegressionModel = {
     train(input, numIterations, 1.0, 1.0)
   }
 
mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala

@@ -82,13 +82,10 @@ object MLUtils {
    * xColMean - Row vector with mean for every column (or feature) of the input data
    * xColSd - Row vector standard deviation for every column (or feature) of the input data.
    */
-  def computeStats(data: RDD[LabeledPoint], numFeatures: Int, numExamples: Long)
-    : (Double, Vector, Vector) = {
-
+  def computeStats(data: RDD[LabeledPoint], numFeatures: Int, numExamples: Long): (Double, Vector, Vector) = {
     val brzData = data.map { case LabeledPoint(label, features) =>
       (label, features.toBreeze)
     }
-
     val aggStats = brzData.aggregate(
       (0L, 0.0, BDV.zeros[Double](numFeatures), BDV.zeros[Double](numFeatures))
     )(
@@ -104,9 +101,10 @@
         (n1 + n2, sumLabel1 + sumLabel2, sum1 += sum2, sumSq1 += sumSq2)
       }
     )
-
     val (nl, sumLabel, sum, sumSq) = aggStats
 
+    require(nl > 0, "Input data is empty.")
+    require(nl == numExamples)
 
     val n = nl.toDouble
     val yMean = sumLabel / n
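The added require calls fail fast on empty input or a miscounted numExamples instead of surfacing as NaNs later. Conceptually, computeStats is one pass accumulating (count, label sum, per-column sum, per-column sum of squares); a Spark-free sketch using the population-form standard deviation that the new test below expects:

```scala
object StatsSketch {
  // mean = sum / n; std = sqrt(sumSq / n - mean^2) (population form).
  def computeStats(
      data: Seq[(Double, Array[Double])],
      numFeatures: Int): (Double, Array[Double], Array[Double]) = {
    require(data.nonEmpty, "Input data is empty.")
    var sumLabel = 0.0
    val sum = new Array[Double](numFeatures)
    val sumSq = new Array[Double](numFeatures)
    for ((label, x) <- data) {
      sumLabel += label
      var i = 0
      while (i < numFeatures) {
        sum(i) += x(i)
        sumSq(i) += x(i) * x(i)
        i += 1
      }
    }
    val n = data.size.toDouble
    val mean = sum.map(_ / n)
    val std = Array.tabulate(numFeatures)(i => math.sqrt(sumSq(i) / n - mean(i) * mean(i)))
    (sumLabel / n, mean, std)
  }

  def main(args: Array[String]): Unit = {
    val (yMean, mean, std) = computeStats(Seq((1.0, Array(1.0, 2.0)), (0.0, Array(3.0, 4.0))), 2)
    println(s"yMean=$yMean mean=${mean.toSeq} std=${std.toSeq}") // 0.5, (2.0, 3.0), (1.0, 1.0)
  }
}
```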
mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala

@@ -57,8 +57,7 @@ object GradientDescentSuite {
       if (yVal > 0) 1 else 0
     }
 
-    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(Array(x1(i)))))
-    testData
+    (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(x1(i))))
   }
 }
 
@@ -86,7 +85,7 @@ class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
     }
 
     val dataRDD = sc.parallelize(data, 2).cache()
-    val initialWeightsWithIntercept = Vectors.dense(0.0, initialWeights: _*)
+    val initialWeightsWithIntercept = Vectors.dense(1.0, initialWeights: _*)
 
     val (_, loss) = GradientDescent.runMiniBatchSGD(
       dataRDD,
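The test edits also swap `Vectors.dense(Array(...))` for the varargs form. Both overloads appear in the new Vector API exercised by this diff and build the same dense vector, so these are readability changes. A quick sketch:

```scala
import org.apache.spark.mllib.linalg.Vectors

object DenseOverloads {
  def main(args: Array[String]): Unit = {
    val a = Vectors.dense(Array(1.0, 2.0)) // Array[Double] overload
    val b = Vectors.dense(1.0, 2.0)        // (Double, Double*) varargs overload
    println(s"$a vs $b")                   // identical dense vectors either way
  }
}
```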
mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala

@@ -34,7 +34,7 @@ class LassoSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Lasso local random SGD") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
@@ -46,7 +46,7 @@
     testRDD.cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD)
     val weight0 = model.weights(0)
@@ -66,23 +66,23 @@
   }
 
   test("Lasso local random SGD with initial weights") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
     val C = 1.0e-2
 
-    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
+    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, C), nPoints, 42)
 
     val initialB = -1.0
     val initialC = -1.0
-    val initialWeights = Vectors.dense(Array(initialB, initialC))
+    val initialWeights = Vectors.dense(initialB, initialC)
 
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD, initialWeights)
     val weight0 = model.weights(0)
mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala

@@ -23,8 +23,10 @@ import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNorm,
   squaredDistance => breezeSquaredDistance}
 
 import org.apache.spark.mllib.util.MLUtils._
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
 
-class MLUtilsSuite extends FunSuite {
+class MLUtilsSuite extends FunSuite with LocalSparkContext {
 
   test("epsilon computation") {
     assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
@@ -49,4 +51,16 @@
       assert((fastSquaredDist2 - squaredDist) <= precision * squaredDist, s"failed with m = $m")
     }
   }
+
+  test("compute stats") {
+    val data = Seq.fill(3)(Seq(
+      LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0)),
+      LabeledPoint(0.0, Vectors.dense(3.0, 4.0, 5.0))
+    )).flatten
+    val rdd = sc.parallelize(data, 2)
+    val (meanLabel, mean, std) = MLUtils.computeStats(rdd, 3, 6)
+    assert(meanLabel === 0.5)
+    assert(mean === Vectors.dense(2.0, 3.0, 4.0))
+    assert(std === Vectors.dense(1.0, 1.0, 1.0))
+  }
 }
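A quick check of the expected values in the new test: the six labels are three 1.0s and three 0.0s, so the label mean is 0.5; each column holds three copies each of two values that differ by 2, so the column means are the midpoints (2.0, 3.0, 4.0), and every column's population standard deviation is sqrt(((-1)^2 + 1^2) / 2) = 1.0, matching the formula computeStats uses.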
