addressed feedback and fixed a bug. TODO: documentation and build another
synthetic dataset which can catch the bug fixed in this commit.

DB Tsai committed Apr 28, 2015
1 parent fcbaefe commit 9988ca8
Showing 2 changed files with 46 additions and 53 deletions.
@@ -17,7 +17,7 @@

 package org.apache.spark.ml.regression

-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable

 import breeze.linalg.{norm => brzNorm, DenseVector => BDV}
 import breeze.optimize.{LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN}
@@ -79,11 +79,11 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
 /**
  * Set the convergence tolerance of iterations.
  * Smaller value will lead to higher accuracy with the cost of more iterations.
- * Default is 1E-9.
+ * Default is 1E-6.
  * @group setParam
  */
 def setTol(value: Double): this.type = set(tol, value)
-setDefault(tol -> 1E-9)
+setDefault(tol -> 1E-6)

 override protected def train(dataset: DataFrame, paramMap: ParamMap): LinearRegressionModel = {
 // Extract columns from data. If dataset is persisted, do not persist instances.
@@ -119,12 +119,8 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
 val effectiveL1RegParam = paramMap(elasticNetParam) * effectiveRegParam
 val effectiveL2RegParam = (1.0 - paramMap(elasticNetParam)) * effectiveRegParam

-val costFun = new LeastSquaresCostFun(
-instances,
-yStd, yMean,
-featuresStd,
-featuresMean,
-effectiveL2RegParam)
+val costFun = new LeastSquaresCostFun(instances, yStd, yMean,
+featuresStd, featuresMean, effectiveL2RegParam)

 val optimizer = if (paramMap(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
 new BreezeLBFGS[BDV[Double]](paramMap(maxIter), 10, paramMap(tol))
@@ -137,12 +133,13 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
 optimizer.iterations(new CachedDiffFunction(costFun), initialWeights.toBreeze.toDenseVector)

 var state = states.next()
-val lossHistory = new ArrayBuffer[Double](paramMap(maxIter))
-while(states.hasNext) {
-lossHistory.append(state.value)
+val lossHistory = mutable.ArrayBuilder.make[Double]
+
+while (states.hasNext) {
+lossHistory += state.value
 state = states.next()
 }
-lossHistory.append(state.value)
+lossHistory += state.value

 // TODO: Based on the sparsity of weights, we may convert the weights to the sparse vector.
 // The weights are trained in the scaled space; we're converting them back to
@@ -151,7 +148,7 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
 val rawWeights = state.x.toArray.clone()
 var i = 0
 while (i < rawWeights.length) {
-rawWeights(i) = if (featuresStd(i) != 0.0) rawWeights(i) * yStd / featuresStd(i) else 0.0
+rawWeights(i) *= { if (featuresStd(i) != 0.0) yStd / featuresStd(i) else 0.0 }
 i += 1
 }
 Vectors.dense(rawWeights)
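Note: the rewrite above keeps the same un-scaling but does it as an in-place multiply. A weight learned on standardized data maps back to the original scale as rawWeights(i) * yStd / featuresStd(i), and constant (zero-variance) columns are zeroed out. A minimal standalone sketch with made-up values (not part of the commit):

    object UnscaleWeightsSketch {
      def main(args: Array[String]): Unit = {
        val rawWeights = Array(0.5, 1.2, 0.0)   // weights in the scaled space
        val featuresStd = Array(2.0, 0.0, 4.0)  // zero std marks a constant column
        val yStd = 3.0

        var i = 0
        while (i < rawWeights.length) {
          rawWeights(i) *= { if (featuresStd(i) != 0.0) yStd / featuresStd(i) else 0.0 }
          i += 1
        }
        println(rawWeights.mkString(", "))      // 0.75, 0.0, 0.0
      }
    }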
@@ -218,7 +215,7 @@ private class LeastSquaresAggregator(
 }
 i += 1
 }
-(weightsArray, -sum, weightsArray.length)
+(weightsArray, -sum + labelMean / labelStd, weightsArray.length)
 }
 private val effectiveWeightsVector = Vectors.dense(effectiveWeightsArray)

@@ -237,7 +234,7 @@ private class LeastSquaresAggregator(
 require(dim == data.size, s"Dimensions mismatch when adding new sample." +
 s" Expecting $dim but got ${data.size}.")

-val diff = dot(data, effectiveWeightsVector) - (label - labelMean) / labelStd + offset
+val diff = dot(data, effectiveWeightsVector) - label / labelStd + offset

 if (diff != 0) {
 val localGradientSumArray = gradientSumArray
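The two hunks above are an equivalent refactoring rather than a behavior change: the labelMean / labelStd term that used to be subtracted from every sample's label is now folded once into the precomputed offset, since -(label - labelMean) / labelStd = -label / labelStd + labelMean / labelStd. A quick standalone check with toy numbers (names and values are illustrative only):

    object OffsetRefactorSketch {
      def main(args: Array[String]): Unit = {
        val (label, labelMean, labelStd) = (3.0, 1.5, 2.0)
        val dotProduct = 0.7   // stands in for dot(data, effectiveWeightsVector)
        val oldOffset = -0.4   // stands in for the old precomputed -sum term

        val oldDiff = dotProduct - (label - labelMean) / labelStd + oldOffset
        val newDiff = dotProduct - label / labelStd + (oldOffset + labelMean / labelStd)
        assert(math.abs(oldDiff - newDiff) < 1e-12)
        println(s"both forms give $oldDiff")
      }
    }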
@@ -266,7 +263,7 @@ private class LeastSquaresAggregator(
 require(dim == other.dim, s"Dimensions mismatch when merging with another " +
 s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")

-if (this.totalCnt != 0 && other.totalCnt != 0) {
+if (other.totalCnt != 0) {
 totalCnt += other.totalCnt
 lossSum += other.lossSum
 diffSum += other.diffSum
@@ -278,11 +275,6 @@ private class LeastSquaresAggregator(
 localThisGradientSumArray(i) += localOtherGradientSumArray(i)
 i += 1
 }
-} else if (totalCnt == 0 && other.totalCnt != 0) {
-this.totalCnt = other.totalCnt
-this.lossSum = other.lossSum
-this.diffSum = other.diffSum
-System.arraycopy(other.gradientSumArray, 0, this.gradientSumArray, 0, dim)
 }
 this
 }
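The dropped else-if branch was presumably redundant: a freshly created aggregator has totalCnt, lossSum, diffSum and gradientSumArray all at zero, so accumulating another aggregator into it with the surviving path already yields a copy of that aggregator. A toy illustration (simplified fields, not the real class):

    class ToyAggregator(dim: Int) {
      var totalCnt = 0L
      var lossSum = 0.0
      val gradientSumArray = new Array[Double](dim)

      def merge(other: ToyAggregator): this.type = {
        if (other.totalCnt != 0) {
          totalCnt += other.totalCnt   // 0 + n == n when this side is empty
          lossSum += other.lossSum     // 0.0 + x == x
          var i = 0
          while (i < dim) {
            gradientSumArray(i) += other.gradientSumArray(i)
            i += 1
          }
        }
        this
      }
    }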
@@ -304,7 +296,7 @@ private class LeastSquaresAggregator(
 Vectors.dense(temp)
 }

-axpy(-diffSum, result, correction)
+axpy(-diffSum, correction, result)
 scal(1.0 / totalCnt, result)
 result
 }
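The one-line change above appears to be the bug fix mentioned in the commit message: a BLAS-style axpy(a, x, y) accumulates a * x into y in place, so the old call folded -diffSum * result into the throwaway correction vector and left the gradient sum untouched, while the new call applies the correction to result before it is scaled. A standalone sketch of the convention using a local axpy (illustrative only, not Spark's BLAS class):

    object AxpyOrderSketch {
      def axpy(a: Double, x: Array[Double], y: Array[Double]): Unit = {
        var i = 0
        while (i < x.length) { y(i) += a * x(i); i += 1 }
      }

      def main(args: Array[String]): Unit = {
        val diffSum = 2.0
        val correction = Array(1.0, 1.0)   // temporary correction vector
        val result = Array(10.0, 20.0)     // gradient sum being assembled

        axpy(-diffSum, correction, result) // fixed order: result += -diffSum * correction
        println(result.mkString(", "))     // 8.0, 18.0
      }
    }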
@@ -319,27 +311,25 @@ private class LeastSquaresCostFun(
 effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {

 override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
-val w = Vectors.fromBreeze(weights)
-
-val leastSquaresAggregator = data.treeAggregate(
-new LeastSquaresAggregator(w, labelStd, labelMean, featuresStd, featuresMean))(
-seqOp = (c, v) => (c, v) match {
-case (aggregator, (label, features)) => aggregator.add(label, features)
-},
-combOp = (c1, c2) => (c1, c2) match {
-case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
-})
-
-// regVal is sum of weight squares for L2 regularization
-val norm = brzNorm(weights, 2.0)
-val regVal = 0.5 * effectiveL2regParam * norm * norm
-
-val loss = leastSquaresAggregator.loss + regVal
-
-val gradientTotal = w.copy
-scal(effectiveL2regParam, gradientTotal)
-axpy(1.0, leastSquaresAggregator.gradient, gradientTotal)
-
-(loss, gradientTotal.toBreeze.asInstanceOf[BDV[Double]])
-}
+val w = Vectors.fromBreeze(weights)
+
+val leastSquaresAggregator = data.treeAggregate(new LeastSquaresAggregator(w, labelStd,
+labelMean, featuresStd, featuresMean))(
+seqOp = (c, v) => (c, v) match {
+case (aggregator, (label, features)) => aggregator.add(label, features)
+},
+combOp = (c1, c2) => (c1, c2) match {
+case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
+})
+
+// regVal is sum of weight squares for L2 regularization
+val norm = brzNorm(weights, 2.0)
+val regVal = 0.5 * effectiveL2regParam * norm * norm
+
+val loss = leastSquaresAggregator.loss + regVal
+val gradient = leastSquaresAggregator.gradient
+axpy(effectiveL2regParam, w, gradient)
+
+(loss, gradient.toBreeze.asInstanceOf[BDV[Double]])
+}
 }

@@ -17,6 +17,7 @@

 package org.apache.spark.mllib.optimization

+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer

 import breeze.linalg.{DenseVector => BDV}
@@ -164,7 +165,7 @@ object LBFGS extends Logging {
 regParam: Double,
 initialWeights: Vector): (Vector, Array[Double]) = {

-val lossHistory = new ArrayBuffer[Double](maxNumIterations)
+val lossHistory = mutable.ArrayBuilder.make[Double]

 val numExamples = data.count()

@@ -181,24 +182,26 @@ object LBFGS extends Logging {
 * and regVal is the regularization value computed in the previous iteration as well.
 */
 var state = states.next()
-while(states.hasNext) {
-lossHistory.append(state.value)
+while (states.hasNext) {
+lossHistory += state.value
 state = states.next()
 }
-lossHistory.append(state.value)
+lossHistory += state.value
 val weights = Vectors.fromBreeze(state.x)

+val lossHistoryArray = lossHistory.result()
+
 logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format(
-lossHistory.takeRight(10).mkString(", ")))
+lossHistoryArray.takeRight(10).mkString(", ")))

-(weights, lossHistory.toArray)
+(weights, lossHistoryArray)
 }

 /**
 * CostFun implements Breeze's DiffFunction[T], which returns the loss and gradient
 * at a particular point (weights). It's used in Breeze's convex optimization routines.
 */
-private[spark] class CostFun(
+private class CostFun(
 data: RDD[(Double, Vector)],
 gradient: Gradient,
 updater: Updater,
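Both files switch the loss history from a pre-sized ArrayBuffer[Double] to mutable.ArrayBuilder, whose Double specialization appends to a primitive array without boxing and is converted to an Array[Double] once via result(). A minimal standalone sketch of the pattern (the iterator values are made up):

    import scala.collection.mutable

    object LossHistorySketch {
      def main(args: Array[String]): Unit = {
        val lossHistory = mutable.ArrayBuilder.make[Double]

        val states = Iterator(10.0, 4.0, 1.5, 0.9)  // stand-in for the optimizer states
        var state = states.next()
        while (states.hasNext) {
          lossHistory += state
          state = states.next()
        }
        lossHistory += state

        val lossHistoryArray = lossHistory.result()
        println(lossHistoryArray.takeRight(10).mkString(", "))  // 10.0, 4.0, 1.5, 0.9
      }
    }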
