optimized MLUtils.computeStats
update some ml algorithms to use Vector (cont.)
mengxr committed Mar 26, 2014
1 parent 135ab72 commit 834ada2
Showing 7 changed files with 84 additions and 81 deletions.
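The gist of the refactor: model weights, feature vectors, and the statistics returned by MLUtils.computeStats move from Array[Double] and jblas DoubleMatrix to org.apache.spark.mllib.linalg.Vector, with Breeze handling the arithmetic internally. A minimal sketch of how the updated API reads from the caller's side (class and method names come from the diff below; the sample data, master URL, and parameter values are invented):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

object VectorApiSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "VectorApiSketch")
    // Features are now mllib Vectors instead of Array[Double]
    val data = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
      LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0))))
    val model = LinearRegressionWithSGD.train(data, 20, 0.1)
    // model.weights is a Vector, so it prints directly (no mkString needed)
    println("Weights: " + model.weights + ", intercept: " + model.intercept)
    sc.stop()
  }
}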
mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -22,7 +22,7 @@ import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.{Vectors, Vector}

/**
* Class used to solve an optimization problem using Gradient Descent.
@@ -139,47 +139,46 @@ object GradientDescent extends Logging {
numIterations: Int,
regParam: Double,
miniBatchFraction: Double,
initialWeights: Vector): (Vector, Vector) = {
initialWeights: Vector): (Vector, Array[Double]) = {

val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

val nexamples: Long = data.count()
val miniBatchSize = nexamples * miniBatchFraction

// Initialize weights as a column vector
var weights = initialWeights.toBreeze.toDenseVector
var weights = Vectors.dense(initialWeights.toArray)

/**
* For the first iteration, regVal will be initialized as the sum of the squares of
* the weights if it's an L2 update; for an L1 update, the same logic is followed.
*/
var regVal = updater.compute(
weights, new DoubleMatrix(initialWeights.length, 1), 0, 1, regParam)._2
weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

for (i <- 1 to numIterations) {
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i).map {
case (y, features) =>
val featuresCol = new DoubleMatrix(features.length, 1, features:_*)
val (grad, loss) = gradient.compute(featuresCol, y, weights)
(grad, loss)
}.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
val (grad, loss) = gradient.compute(features, y, weights)
(grad.toBreeze, loss)
}.reduce((a, b) => (a._1 += b._1, a._2 + b._2))

/**
* NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
*/
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(
weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
weights = update._1
regVal = update._2
}

logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
stochasticLossHistory.takeRight(10).mkString(", ")))

(weights.toArray, stochasticLossHistory.toArray)
(weights, stochasticLossHistory.toArray)
}
}
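In the mini-batch loop above, each point's gradient comes back as an mllib Vector, is converted to a Breeze vector, and the subgradients are summed with the in-place += inside reduce before being divided by the mini-batch size. A self-contained sketch of that accumulation pattern using plain Breeze (no Spark; the gradients are invented):

import breeze.linalg.DenseVector

object GradientSumSketch {
  def main(args: Array[String]): Unit = {
    // Pretend these are the per-example (sub)gradients of one mini-batch
    val grads = Seq(DenseVector(1.0, 2.0), DenseVector(3.0, 4.0), DenseVector(5.0, 6.0))
    val miniBatchSize = grads.size.toDouble
    // += mutates and returns its left operand, so the reduce reuses one buffer
    // instead of allocating a new vector per pair (note: this mutates grads.head)
    val gradientSum = grads.reduce((a, b) => a += b)
    val avgGradient = gradientSum / miniBatchSize
    println(avgGradient) // DenseVector(3.0, 4.0)
  }
}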
mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
@@ -18,7 +18,6 @@
package org.apache.spark.mllib.optimization

import scala.math._
import org.jblas.DoubleMatrix

import breeze.linalg.{norm => brzNorm}

@@ -122,7 +121,7 @@ class SquaredL2Updater extends Updater {
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (DoubleMatrix, Double) = {
regParam: Double): (Vector, Double) = {
// add up both updates from the gradient of the loss (= step) as well as
// the gradient of the regularizer (= regParam * weightsOld)
// w' = w - thisIterStepSize * (gradient + regParam * w)
@@ -132,7 +131,7 @@ class SquaredL2Updater extends Updater {
(gradient.toBreeze * thisIterStepSize)
val norm = brzNorm(brzWeights, 2.0)

(Vectors.fromBreeze(newWeights), 0.5 * regParam * norm * norm)
(Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
}
}
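The SquaredL2Updater hunk above only changes the return type to Vector (and the vector that is returned), but the surrounding comments spell out the update itself: w' = w - thisIterStepSize * (gradient + regParam * w), with the second element of the returned pair being the regularization value 0.5 * regParam * ||w'||^2. A Breeze-only sketch of that arithmetic; the 1/sqrt(iter) step-size decay is an assumption, since that line sits outside the shown hunk:

import breeze.linalg.{DenseVector, norm}

object L2UpdateSketch {
  def main(args: Array[String]): Unit = {
    val weights = DenseVector(0.5, -0.25)
    val gradient = DenseVector(0.1, 0.2)
    val (stepSize, iter, regParam) = (1.0, 2, 0.01)
    // Assumed schedule: step size shrinks with the square root of the iteration number
    val thisIterStepSize = stepSize / math.sqrt(iter)
    // w' = w - thisIterStepSize * (gradient + regParam * w)
    val newWeights = weights * (1.0 - thisIterStepSize * regParam) - gradient * thisIterStepSize
    val l2 = norm(newWeights, 2.0)
    // regVal returned alongside the new weights: 0.5 * regParam * ||w'||^2
    println((newWeights, 0.5 * regParam * l2 * l2))
  }
}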

mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -118,8 +118,8 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]

/** Prepends one to the input vector. */
private def prependOne(vector: Vector): Vector = {
val vectorWithIntercept = vector match {
case dv: BDV[Double] => BDV.vertcat(BDV.ones(1), dv)
val vectorWithIntercept = vector.toBreeze match {
case dv: BDV[Double] => BDV.vertcat(BDV.ones[Double](1), dv)
case sv: BSV[Double] => BSV.vertcat(new BSV[Double](Array(0), Array(1.0), 1), sv)
case v: Any => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
}
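prependOne folds the intercept into the weight vector by sticking a leading 1.0 onto every feature vector; the dense and sparse branches each concatenate a one-element vector in front. A short Breeze-only illustration of both branches (the values are invented):

import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}

object PrependOneSketch {
  def main(args: Array[String]): Unit = {
    // Dense: [2.0, 3.0] becomes [1.0, 2.0, 3.0]
    val dense = BDV.vertcat(BDV.ones[Double](1), BDV(2.0, 3.0))
    // Sparse: a length-1 vector holding 1.0 at index 0 is concatenated in front,
    // shifting the original indices up by one
    val sv = new BSV[Double](Array(1), Array(5.0), 2)
    val sparse = BSV.vertcat(new BSV[Double](Array(0), Array(1.0), 1), sv)
    println(dense)  // DenseVector(1.0, 2.0, 3.0)
    println(sparse) // length-3 sparse vector with 1.0 at index 0 and 5.0 at index 2
  }
}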
mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
@@ -21,8 +21,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.util.MLUtils

import org.jblas.DoubleMatrix
import org.apache.spark.mllib.linalg.{Vector, Vectors}

/**
* Regression model trained using LinearRegression.
@@ -31,15 +30,15 @@ import org.jblas.DoubleMatrix
* @param intercept Intercept computed for this model.
*/
class LinearRegressionModel(
override val weights: Array[Double],
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

override protected def predictPoint(
dataMatrix: DoubleMatrix,
weightMatrix: DoubleMatrix,
dataMatrix: Vector,
weightMatrix: Vector,
intercept: Double): Double = {
dataMatrix.dot(weightMatrix) + intercept
weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
}
}

@@ -69,7 +68,7 @@ class LinearRegressionWithSGD private (
*/
def this() = this(1.0, 100, 1.0)

override protected def createModel(weights: Array[Double], intercept: Double) = {
override protected def createModel(weights: Vector, intercept: Double) = {
new LinearRegressionModel(weights, intercept)
}
}
@@ -98,7 +97,7 @@ object LinearRegressionWithSGD {
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeights: Array[Double])
initialWeights: Vector)
: LinearRegressionModel =
{
new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input,
@@ -172,7 +171,7 @@ object LinearRegressionWithSGD {
val sc = new SparkContext(args(0), "LinearRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
println("Weights: " + model.weights.mkString("[", ", ", "]"))
println("Weights: " + model.weights)
println("Intercept: " + model.intercept)

sc.stop()
mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
@@ -21,7 +21,9 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.{Vectors, Vector}

import breeze.linalg.{Vector => BV, DenseVector => BDV}

/**
* Regression model trained using RidgeRegression.
@@ -36,10 +38,10 @@ class RidgeRegressionModel(
with RegressionModel with Serializable {

override protected def predictPoint(
dataMatrix: DoubleMatrix,
weightMatrix: DoubleMatrix,
dataMatrix: Vector,
weightMatrix: Vector,
intercept: Double): Double = {
dataMatrix.dot(weightMatrix) + intercept
weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
}
}

@@ -71,8 +73,8 @@ class RidgeRegressionWithSGD private (
super.setIntercept(false)

var yMean = 0.0
var xColMean: DoubleMatrix = _
var xColSd: DoubleMatrix = _
var xColMean: BV[Double] = _
var xColSd: BV[Double] = _

/**
* Construct a RidgeRegression object with default parameters
@@ -85,33 +87,33 @@
this
}

override protected def createModel(weights: Array[Double], intercept: Double) = {
val weightsMat = new DoubleMatrix(weights.length, 1, weights: _*)
val weightsScaled = weightsMat.div(xColSd)
val interceptScaled = yMean - weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)
override protected def createModel(weights: Vector, intercept: Double) = {
val weightsMat = weights.toBreeze
val weightsScaled = weightsMat :/ xColSd
val interceptScaled = yMean - weightsMat.dot(xColMean :/ xColSd)

new RidgeRegressionModel(weightsScaled.data, interceptScaled)
new RidgeRegressionModel(Vectors.fromBreeze(weightsScaled), interceptScaled)
}

override def run(
input: RDD[LabeledPoint],
initialWeights: Array[Double])
initialWeights: Vector)
: RidgeRegressionModel =
{
val nfeatures: Int = input.first().features.length
val nfeatures: Int = input.first().features.size
val nexamples: Long = input.count()

// To avoid penalizing the intercept, we center and scale the data.
val stats = MLUtils.computeStats(input, nfeatures, nexamples)
yMean = stats._1
xColMean = stats._2
xColSd = stats._3
xColMean = stats._2.toBreeze
xColSd = stats._3.toBreeze

val normalizedData = input.map { point =>
val yNormalized = point.label - yMean
val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*)
val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
LabeledPoint(yNormalized, featuresNormalized.toArray)
val featuresMat = point.features.toBreeze
val featuresNormalized = (featuresMat - xColMean) :/ xColSd
LabeledPoint(yNormalized, Vectors.fromBreeze(featuresNormalized))
}

super.run(normalizedData, initialWeights)
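The run method above trains on centered and scaled features (and on labels with yMean subtracted), and createModel maps the learned weights back to the raw scale: yHat = yMean + w . ((x - mean) / std) = (w / std) . x + (yMean - w . (mean / std)). A small Breeze check of that identity (all numbers invented):

import breeze.linalg.DenseVector

object RidgeRescaleSketch {
  def main(args: Array[String]): Unit = {
    val xColMean = DenseVector(2.0, 4.0)
    val xColSd = DenseVector(1.0, 2.0)
    val yMean = 3.0
    val w = DenseVector(0.5, -1.0) // weights learned on the normalized data
    val x = DenseVector(3.0, 2.0)  // a raw, unnormalized point
    // Prediction in normalized space: yMean + w . ((x - mean) / std)
    val direct = yMean + w.dot((x - xColMean) :/ xColSd)
    // Equivalent model on raw features: (w / std) . x + (yMean - w . (mean / std))
    val wScaled = w :/ xColSd
    val intercept = yMean - w.dot(xColMean :/ xColSd)
    val rescaled = wScaled.dot(x) + intercept
    println(s"$direct == $rescaled") // both evaluate to 4.5
  }
}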
@@ -143,7 +145,7 @@ object RidgeRegressionWithSGD {
stepSize: Double,
regParam: Double,
miniBatchFraction: Double,
initialWeights: Array[Double])
initialWeights: Vector)
: RidgeRegressionModel =
{
new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(
@@ -220,7 +222,8 @@ object RidgeRegressionWithSGD {
val data = MLUtils.loadLabeledData(sc, args(1))
val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble,
args(3).toDouble)
println("Weights: " + model.weights.mkString("[", ", ", "]"))

println("Weights: " + model.weights)
println("Intercept: " + model.intercept)

sc.stop()
73 changes: 39 additions & 34 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -17,15 +17,13 @@

package org.apache.spark.mllib.util

import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,
squaredDistance => breezeSquaredDistance}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

import org.jblas.DoubleMatrix

import org.apache.spark.mllib.regression.LabeledPoint

import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => breezeSquaredDistance}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

/**
* Helper methods to load, save and pre-process data used in ML Lib.
@@ -54,7 +52,7 @@ object MLUtils {
sc.textFile(dir).map { line =>
val parts = line.split(',')
val label = parts(0).toDouble
val features = parts(1).trim().split(' ').map(_.toDouble)
val features = Vectors.dense(parts(1).trim().split(' ').map(_.toDouble))
LabeledPoint(label, features)
}
}
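loadLabeledData expects one example per line in the form "label,feature1 feature2 ...", now parsed into a dense mllib Vector. A tiny standalone illustration of the parsing logic above (the sample line is invented):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object LabeledDataFormatSketch {
  def main(args: Array[String]): Unit = {
    val line = "1.0,2.5 3.0 1.5" // label, then space-separated features
    val parts = line.split(',')
    val point = LabeledPoint(parts(0).toDouble,
      Vectors.dense(parts(1).trim().split(' ').map(_.toDouble)))
    println(point) // the label 1.0 with the dense vector [2.5,3.0,1.5]
  }
}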
@@ -68,52 +66,59 @@ object MLUtils {
* @param dir Directory to save the data.
*/
def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
val dataStr = data.map(x => x.label + "," + x.features.mkString(" "))
val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" "))
dataStr.saveAsTextFile(dir)
}

/**
* Utility function to compute mean and standard deviation on a given dataset.
*
* @param data - input data set whose statistics are computed
* @param nfeatures - number of features
* @param nexamples - number of examples in input dataset
* @param numFeatures - number of features
* @param numExamples - number of examples in input dataset
*
* @return (yMean, xColMean, xColSd) - Tuple consisting of
* yMean - mean of the labels
* xColMean - Row vector with mean for every column (or feature) of the input data
* xColSd - Row vector standard deviation for every column (or feature) of the input data.
*/
def computeStats(data: RDD[LabeledPoint], nfeatures: Int, nexamples: Long):
(Double, DoubleMatrix, DoubleMatrix) = {
val yMean: Double = data.map { labeledPoint => labeledPoint.label }.reduce(_ + _) / nexamples
def computeStats(data: RDD[LabeledPoint], numFeatures: Int, numExamples: Long)
: (Double, Vector, Vector) = {

// NOTE: We shuffle X by column here to compute column sum and sum of squares.
val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { labeledPoint =>
val nCols = labeledPoint.features.length
// Traverse over every column and emit (col, value, value^2)
Iterator.tabulate(nCols) { i =>
(i, (labeledPoint.features(i), labeledPoint.features(i)*labeledPoint.features(i)))
}
}.reduceByKey { case(x1, x2) =>
(x1._1 + x2._1, x1._2 + x2._2)
val brzData = data.map { case LabeledPoint(label, features) =>
(label, features.toBreeze)
}
val xColSumsMap = xColSumSq.collectAsMap()

val xColMean = DoubleMatrix.zeros(nfeatures, 1)
val xColSd = DoubleMatrix.zeros(nfeatures, 1)
val aggStats = brzData.aggregate(
(0L, 0.0, BDV.zeros[Double](numFeatures), BDV.zeros[Double](numFeatures))
)(
seqOp = (c , v) => (c, v) match {
case ((n, sumLabel, sum, sumSq), (label, features)) =>
features.activeIterator.foreach { case (i, x) =>
sumSq(i) += x * x
}
(n + 1L, sumLabel + label, sum += features, sumSq)
},
combOp = (c1, c2) => (c1, c2) match {
case ((n1, sumLabel1, sum1, sumSq1), (n2, sumLabel2, sum2, sumSq2)) =>
(n1 + n2, sumLabel1 + sumLabel2, sum1 += sum2, sumSq1 += sumSq2)
}
)

val (nl, sumLabel, sum, sumSq) = aggStats
require(nl > 0, "Input data is empty.")

// Compute the per-column mean and (population) variance from the column sums
var col = 0
while (col < nfeatures) {
xColMean.put(col, xColSumsMap(col)._1 / nexamples)
val variance =
(xColSumsMap(col)._2 - (math.pow(xColSumsMap(col)._1, 2) / nexamples)) / nexamples
xColSd.put(col, math.sqrt(variance))
col += 1
val n = nl.toDouble
val yMean = sumLabel / n
val mean: BDV[Double] = sum / n
val std = new Array[Double](sum.length)
var i = 0
while (i < numFeatures) {
std(i) = sumSq(i) / n - mean(i) * mean(i)
i += 1
}

(yMean, xColMean, xColSd)
(yMean, Vectors.fromBreeze(mean), Vectors.dense(std))
}
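The rewritten computeStats makes a single pass over the data with aggregate, carrying (count, label sum, per-column sum, per-column sum of squares) and then deriving the label mean, the column means, and the per-column variance as E[x^2] - E[x]^2. A Breeze-only sketch of the same one-pass pattern on a local collection (no Spark; the rows and names are invented):

import breeze.linalg.DenseVector

object ColumnStatsSketch {
  def main(args: Array[String]): Unit = {
    val rows = Seq(
      (1.0, DenseVector(1.0, 2.0)),
      (2.0, DenseVector(3.0, 6.0)),
      (3.0, DenseVector(5.0, 10.0)))
    // One pass: (count, label sum, column sums, column sums of squares)
    val (cnt, sumLabel, sum, sumSq) = rows.foldLeft(
      (0L, 0.0, DenseVector.zeros[Double](2), DenseVector.zeros[Double](2))) {
      case ((n, sl, s, sq), (label, features)) =>
        (n + 1L, sl + label, s + features, sq + (features :* features))
    }
    val n = cnt.toDouble
    val yMean = sumLabel / n
    val mean = sum / n
    // Population variance per column: E[x^2] - E[x]^2
    val variance = (sumSq / n) - (mean :* mean)
    println(s"yMean=$yMean, mean=$mean, variance=$variance")
  }
}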

/**
mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -17,14 +17,12 @@

package org.apache.spark.mllib.regression

import org.scalatest.FunSuite

import org.jblas.DoubleMatrix
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite

import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}


class RidgeRegressionSuite extends FunSuite with LocalSparkContext {

def predictionError(predictions: Seq[Double], input: Seq[LabeledPoint]) = {
