Refactoring

DB Tsai authored and committed on Apr 28, 2015
1 parent 4eb078d; commit fcbaefe
Showing 3 changed files with 82 additions and 85 deletions.
src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -47,6 +47,8 @@ private[shared] object SharedParamsCodeGen {
ParamDesc[Int]("checkpointInterval", "checkpoint interval"),
ParamDesc[Boolean]("fitIntercept", "whether to fit an intercept term", Some("true")),
ParamDesc[Long]("seed", "random seed", Some("Utils.random.nextLong()")))
ParamDesc[Double]("elasticNetParam", "the ElasticNet mixing parameter"),
ParamDesc[Double]("tol", "the convergence tolerance for iterative algorithms"))

val code = genSharedParams(params)
val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala"
@@ -155,7 +157,6 @@ private[shared] object SharedParamsCodeGen {
|
|import org.apache.spark.annotation.DeveloperApi
|import org.apache.spark.ml.param._
- |import org.apache.spark.util.Utils
|
|// DO NOT MODIFY THIS FILE! It was generated by SharedParamsCodeGen.
|
src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -17,30 +17,29 @@

package org.apache.spark.ml.regression

- import org.apache.spark.mllib.linalg.BLAS.dot
- import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer

- import breeze.linalg.{norm => brzNorm, DenseVector => BDV, SparseVector => BSV}
+ import breeze.linalg.{norm => brzNorm, DenseVector => BDV}
import breeze.optimize.{LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN}
import breeze.optimize.{CachedDiffFunction, DiffFunction}

import org.apache.spark.annotation.AlphaComponent
- import org.apache.spark.ml.param.shared.{HasElasticNetParam, HasMaxIter, HasTol}
import org.apache.spark.ml.param.{Params, ParamMap}
+ import org.apache.spark.ml.param.shared.{HasElasticNetParam, HasMaxIter, HasRegParam, HasTol}
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
+ import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS._
- import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
+ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel
+ import org.apache.spark.util.StatCounter

/**
* Params for linear regression.
*/
private[regression] trait LinearRegressionParams extends RegressorParams
- with HasElasticNetParam with HasMaxIter with HasTol
+ with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol

/**
* :: AlphaComponent ::
@@ -57,7 +56,7 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
* @group setParam
*/
def setRegParam(value: Double): this.type = set(regParam, value)
- setRegParam(0.0)
+ setDefault(regParam -> 0.0)

/**
* Set the ElasticNet mixing parameter.
@@ -67,15 +66,15 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
* @group setParam
*/
def setElasticNetParam(value: Double): this.type = set(elasticNetParam, value)
- setElasticNetParam(0.0)
+ setDefault(elasticNetParam -> 0.0)

/**
* Set the maximal number of iterations.
* Default is 100.
* @group setParam
*/
def setMaxIter(value: Int): this.type = set(maxIter, value)
- setMaxIter(100)
+ setDefault(maxIter -> 100)

/**
* Set the convergence tolerance of iterations.
@@ -84,7 +83,7 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
* @group setParam
*/
def setTol(value: Double): this.type = set(tol, value)
- setTol(1E-9)
+ setDefault(tol -> 1E-9)

override protected def train(dataset: DataFrame, paramMap: ParamMap): LinearRegressionModel = {
// Extract columns from data. If dataset is persisted, do not persist instances.
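
A note on the setter-to-setDefault swap in the constructor above: setMaxIter(100) recorded the value as if the caller had set it explicitly, whereas setDefault(maxIter -> 100) records a fallback that paramMap(maxIter) returns only when nothing was explicitly set. A hypothetical usage sketch:

    val lr = new LinearRegression()
    lr.getMaxIter  // 100, served from the default installed by setDefault
    lr.setMaxIter(50)
    lr.getMaxIter  // 50, an explicit set(...) takes precedence over the default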
@@ -96,38 +95,41 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
instances.persist(StorageLevel.MEMORY_AND_DISK)
}

- // TODO: Benchmark if using two MultivariateOnlineSummarizer will be faster
- // than appending the label into the vector.
- val summary = instances.map { case (label: Double, features: Vector) =>
- Vectors.fromBreeze(features.toBreeze match {
- case dv: BDV[Double] => BDV.vertcat(dv, new BDV[Double](Array(label)))
- case sv: BSV[Double] => BSV.vertcat(sv, new BSV[Double](Array(0), Array(label), 1))
- case v: Any =>
- throw new IllegalArgumentException("Do not support vector type " + v.getClass)
- })}.treeAggregate(new MultivariateOnlineSummarizer)(
- (aggregator, data) => aggregator.add(data),
- (aggregator1, aggregator2) => aggregator1.merge(aggregator2))

- val numFeatures = summary.mean.size - 1
- val yMean = summary.mean(numFeatures)
- val yStd = math.sqrt(summary.variance(numFeatures))

+ val (summarizer, statCounter) = instances.treeAggregate(
+ (new MultivariateOnlineSummarizer, new StatCounter))( {
+ case ((summarizer: MultivariateOnlineSummarizer, statCounter: StatCounter),
+ (label: Double, features: Vector)) =>
+ (summarizer.add(features), statCounter.merge(label))
+ }, {
+ case ((summarizer1: MultivariateOnlineSummarizer, statCounter1: StatCounter),
+ (summarizer2: MultivariateOnlineSummarizer, statCounter2: StatCounter)) =>
+ (summarizer1.merge(summarizer2), statCounter1.merge(statCounter2))
+ })

+ val numFeatures = summarizer.mean.size
+ val yMean = statCounter.mean
+ val yStd = math.sqrt(statCounter.variance)

+ val featuresMean = summarizer.mean.toArray
+ val featuresStd = summarizer.variance.toArray.map(math.sqrt)

// Since we implicitly do the feature scaling when we compute the cost function
// to improve the convergence, the effective regParam will be changed.
val effectiveRegParam = paramMap(regParam) / yStd
val effectiveL1RegParam = paramMap(elasticNetParam) * effectiveRegParam
val effectiveL2RegParam = (1.0 - paramMap(elasticNetParam)) * effectiveRegParam

val costFun = new LeastSquaresCostFun(
instances,
yStd, yMean,
- summary.variance.toArray.slice(0, numFeatures).map(Math.sqrt(_)).toArray,
- summary.mean.toArray.slice(0, numFeatures).toArray,
+ featuresStd,
+ featuresMean,
effectiveL2RegParam)

val optimizer = if (paramMap(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
new BreezeLBFGS[BDV[Double]](paramMap(maxIter), 10, paramMap(tol))
} else {
- new BreezeOWLQN[Int, BDV[Double]](
- paramMap(maxIter), 10, effectiveL1RegParam, paramMap(tol))
+ new BreezeOWLQN[Int, BDV[Double]](paramMap(maxIter), 10, effectiveL1RegParam, paramMap(tol))
}

val initialWeights = Vectors.zeros(numFeatures)
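
To make the regularization bookkeeping above concrete: since the cost function works on standardized features and labels (see the comment in the diff), the user-facing regParam is rescaled by 1/yStd, and elasticNetParam then splits the result between the L1 term, which is non-smooth and therefore handed to OWLQN, and the L2 term, which stays inside the differentiable cost; plain L-BFGS suffices whenever the L1 portion is zero. A worked example with made-up numbers:

    // Hypothetical values, for illustration only.
    val regParam = 0.1
    val elasticNetParam = 0.5  // 0.0 = pure L2 (ridge), 1.0 = pure L1 (lasso)
    val yStd = 2.0

    val effectiveRegParam = regParam / yStd                                // 0.05
    val effectiveL1RegParam = elasticNetParam * effectiveRegParam          // 0.025, goes to OWLQN
    val effectiveL2RegParam = (1.0 - elasticNetParam) * effectiveRegParam  // 0.025, folded into the cost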
@@ -142,20 +144,23 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
}
lossHistory.append(state.value)

// TODO: Based on the sparsity of weights, we may convert the weights to the sparse vector.
+ // The weights are trained in the scaled space; we're converting them back to
+ // the original space.
val weights = {
- val rawWeights = state.x.toArray
- val std = summary.variance.toArray.slice(0, numFeatures).map(Math.sqrt(_)).toArray
- require(rawWeights.size == std.size)

+ val rawWeights = state.x.toArray.clone()
var i = 0
- while (i < rawWeights.size) {
- rawWeights(i) = if (std(i) != 0.0) rawWeights(i) * yStd / std(i) else 0.0
+ while (i < rawWeights.length) {
+ rawWeights(i) = if (featuresStd(i) != 0.0) rawWeights(i) * yStd / featuresStd(i) else 0.0
i += 1
}
Vectors.dense(rawWeights)
}

- val intercept = yMean - dot(weights, Vectors.dense(summary.mean.toArray.slice(0, numFeatures)))
+ // The intercept in R's GLMNET is computed using closed form after the coefficients are
+ // converged. See the following discussion for detail.
+ // http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
+ val intercept = yMean - dot(weights, Vectors.dense(featuresMean))

if (handlePersistence) {
instances.unpersist()
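
The closed-form intercept above follows from first-order optimality: with the slopes fixed, the residual must average to zero, giving intercept = yMean - weights · featuresMean (the GLMNET convention referenced in the comment). A quick numeric check in plain Scala, with made-up numbers:

    // Hypothetical one-feature example: y = 2x + 1 with mean(x) = 3.0, mean(y) = 7.0.
    val weights = Array(2.0)
    val featuresMean = Array(3.0)
    val yMean = 7.0
    val dotProduct = weights.zip(featuresMean).map { case (w, m) => w * m }.sum  // 6.0
    val intercept = yMean - dotProduct  // 1.0, recovering y = 2x + 1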
@@ -179,7 +184,7 @@ class LinearRegressionModel private[ml] (
with LinearRegressionParams {

override protected def predict(features: Vector): Double = {
- BLAS.dot(features, weights) + intercept
+ dot(features, weights) + intercept
}

override protected def copy(): LinearRegressionModel = {
@@ -204,7 +209,7 @@ private class LeastSquaresAggregator(
val weightsArray = weights.toArray.clone()
var sum = 0.0
var i = 0
- while (i < weights.size) {
+ while (i < weightsArray.length) {
if (featuresStd(i) != 0.0) {
weightsArray(i) /= featuresStd(i)
sum += weightsArray(i) * featuresMean(i)
@@ -215,9 +220,9 @@ private class LeastSquaresAggregator(
}
(weightsArray, -sum, weightsArray.length)
}

private val effectiveWeightsVector = Vectors.dense(effectiveWeightsArray)
- private var gradientSumArray: Array[Double] = Array.ofDim[Double](dim)

+ private val gradientSumArray: Array[Double] = Array.ofDim[Double](dim)

/**
* Add a new training data to this LeastSquaresAggregator, and update the loss and gradient
@@ -258,15 +263,16 @@ private class LeastSquaresAggregator(
* @return This LeastSquaresAggregator object.
*/
def merge(other: LeastSquaresAggregator): this.type = {
require(dim == other.dim, s"Dimensions mismatch when merging with another " +
s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")

if (this.totalCnt != 0 && other.totalCnt != 0) {
require(dim == other.dim, s"Dimensions mismatch when merging with another summarizer. " +
s"Expecting $dim but got ${other.dim}.")
totalCnt += other.totalCnt
lossSum += other.lossSum
diffSum += other.diffSum

var i = 0
- val localThisGradientSumArray = gradientSumArray
+ val localThisGradientSumArray = this.gradientSumArray
val localOtherGradientSumArray = other.gradientSumArray
while (i < dim) {
localThisGradientSumArray(i) += localOtherGradientSumArray(i)
Expand All @@ -276,7 +282,7 @@ private class LeastSquaresAggregator(
this.totalCnt = other.totalCnt
this.lossSum = other.lossSum
this.diffSum = other.diffSum
- this.gradientSumArray = other.gradientSumArray.clone
+ System.arraycopy(other.gradientSumArray, 0, this.gradientSumArray, 0, dim)
}
this
}
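
The add/merge pair mirrors the seqOp/combOp contract of RDD.treeAggregate, the same pattern train() uses above to fold each instance into a (MultivariateOnlineSummarizer, StatCounter) pair: add absorbs one record into a partition-local aggregator, merge combines aggregators across partitions. A self-contained toy with the same contract (illustrative only, not the Spark class above):

    // Toy aggregator obeying treeAggregate's seqOp/combOp contract.
    class SumAggregator extends Serializable {
      var count = 0L
      var total = 0.0
      def add(x: Double): this.type = { count += 1; total += x; this }
      def merge(other: SumAggregator): this.type = {
        count += other.count
        total += other.total
        this
      }
    }
    // Hypothetical call site, for some data: RDD[Double]:
    // val agg = data.treeAggregate(new SumAggregator)((a, x) => a.add(x), (a, b) => a.merge(b))

The empty-this branch of merge copies the other aggregator wholesale, which is why gradientSumArray can become a val filled via System.arraycopy rather than a reassigned var.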
@@ -286,12 +292,12 @@ private class LeastSquaresAggregator(
def loss: Double = lossSum / totalCnt

def gradient: Vector = {
- val result = Vectors.dense(gradientSumArray.clone)
+ val result = Vectors.dense(gradientSumArray.clone())

val correction = {
- val temp = effectiveWeightsArray.clone
+ val temp = effectiveWeightsArray.clone()
var i = 0
- while (i < temp.size) {
+ while (i < temp.length) {
temp(i) *= featuresMean(i)
i += 1
}
@@ -324,22 +330,16 @@ private class LeastSquaresCostFun(
case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
})

- /**
- * regVal is sum of weight squares if it's L2 updater;
- * for other updater, the same logic is followed.
- */
+ // regVal is sum of weight squares for L2 regularization
val norm = brzNorm(weights, 2.0)
val regVal = 0.5 * effectiveL2regParam * norm * norm

val loss = leastSquaresAggregator.loss + regVal
- // The following gradientTotal is actually the regularization part of gradient.
- // Will add the gradientSum computed from the data with weights in the next step.

val gradientTotal = w.copy
scal(effectiveL2regParam, gradientTotal)

- // gradientTotal = gradient + gradientTotal
axpy(1.0, leastSquaresAggregator.gradient, gradientTotal)

(loss, gradientTotal.toBreeze.asInstanceOf[BDV[Double]])
}
}
}
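
For reference, both Breeze optimizers used here consume a DiffFunction whose calculate returns the loss and its gradient at a point; LeastSquaresCostFun implements that contract over the RDD, and CachedDiffFunction memoizes the most recent evaluation so the line search does not recompute it. A minimal self-contained sketch of the contract with a toy objective (not the Spark code):

    import breeze.linalg.{DenseVector => BDV}
    import breeze.optimize.{DiffFunction, LBFGS => BreezeLBFGS}

    // f(x) = 0.5 * ||x - 3||^2 with gradient x - 3; the minimum is at x = (3, 3).
    val toyCostFun = new DiffFunction[BDV[Double]] {
      def calculate(x: BDV[Double]): (Double, BDV[Double]) = {
        val diff = x - 3.0
        (0.5 * (diff dot diff), diff)
      }
    }
    val optimum = new BreezeLBFGS[BDV[Double]](maxIter = 100, m = 10, tolerance = 1e-9)
      .minimize(toyCostFun, BDV.zeros[Double](2))  // converges to approximately (3.0, 3.0)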
src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -20,9 +20,9 @@ package org.apache.spark.ml.regression
import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.DenseVector
- import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.mllib.util.{LinearDataGenerator, MLlibTestSparkContext}
- import org.apache.spark.sql.{SQLContext, DataFrame}
+ import org.apache.spark.mllib.util.TestingUtils._
+ import org.apache.spark.sql.{Row, SQLContext, DataFrame}

class LinearRegressionSuite extends FunSuite with MLlibTestSparkContext {

@@ -73,12 +73,11 @@ class LinearRegressionSuite extends FunSuite with MLlibTestSparkContext {
assert(model.weights(0) ~== weightsR(0) relTol 1E-3)
assert(model.weights(1) ~== weightsR(1) relTol 1E-3)

model.transform(dataset).select("features", "prediction").collect().map {instance =>
val features = instance(0).asInstanceOf[DenseVector].toArray
val prediction1 = instance(1).asInstanceOf[Double]
val prediction2 =
features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
assert(prediction1 ~== prediction2 relTol 1E-5)
model.transform(dataset).select("features", "prediction").collect().foreach {
case Row(features: DenseVector, prediction1: Double) =>
val prediction2 =
features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
assert(prediction1 ~== prediction2 relTol 1E-5)
}
}

@@ -102,12 +101,11 @@ class LinearRegressionSuite extends FunSuite with MLlibTestSparkContext {
assert(model.weights(0) ~== weightsR(0) relTol 1E-3)
assert(model.weights(1) ~== weightsR(1) relTol 1E-3)

model.transform(dataset).select("features", "prediction").collect().map {instance =>
val features = instance(0).asInstanceOf[DenseVector].toArray
val prediction1 = instance(1).asInstanceOf[Double]
val prediction2 =
features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
assert(prediction1 ~== prediction2 relTol 1E-5)
model.transform(dataset).select("features", "prediction").collect().foreach {
case Row(features: DenseVector, prediction1: Double) =>
val prediction2 =
features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
assert(prediction1 ~== prediction2 relTol 1E-5)
}
}

@@ -131,12 +129,11 @@ class LinearRegressionSuite extends FunSuite with MLlibTestSparkContext {
assert(model.weights(0) ~== weightsR(0) relTol 1E-3)
assert(model.weights(1) ~== weightsR(1) relTol 1E-3)

model.transform(dataset).select("features", "prediction").collect().map {instance =>
val features = instance(0).asInstanceOf[DenseVector].toArray
val prediction1 = instance(1).asInstanceOf[Double]
val prediction2 =
features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
assert(prediction1 ~== prediction2 relTol 1E-5)
model.transform(dataset).select("features", "prediction").collect().foreach {
case Row(features: DenseVector, prediction1: Double) =>
val prediction2 =
features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
assert(prediction1 ~== prediction2 relTol 1E-5)
}
}

@@ -160,12 +157,11 @@ class LinearRegressionSuite extends FunSuite with MLlibTestSparkContext {
assert(model.weights(0) ~== weightsR(0) relTol 1E-3)
assert(model.weights(1) ~== weightsR(1) relTol 1E-3)

model.transform(dataset).select("features", "prediction").collect().map { instance =>
val features = instance(0).asInstanceOf[DenseVector].toArray
val prediction1 = instance(1).asInstanceOf[Double]
val prediction2 =
features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
assert(prediction1 ~== prediction2 relTol 1E-5)
model.transform(dataset).select("features", "prediction").collect().foreach {
case Row(features: DenseVector, prediction1: Double) =>
val prediction2 =
features(0) * model.weights(0) + features(1) * model.weights(1) + model.intercept
assert(prediction1 ~== prediction2 relTol 1E-5)
}
}
}
