update some ml algorithms to use Vector
mengxr committed Mar 26, 2014
1 parent 8237df8 commit 3f346ba
Showing 7 changed files with 108 additions and 92 deletions.
mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
@@ -17,7 +17,7 @@

package org.apache.spark.mllib.optimization

import org.jblas.DoubleMatrix
import org.apache.spark.mllib.linalg.{Vectors, Vector}

/**
* Class used to compute the gradient for a loss function, given a single data point.
@@ -26,38 +26,34 @@ abstract class Gradient extends Serializable {
/**
* Compute the gradient and loss given the features of a single data point.
*
* @param data - Feature values for one data point. Column matrix of size dx1
* where d is the number of features.
* @param label - Label for this data item.
* @param weights - Column matrix containing weights for every feature.
*
* @return A tuple of 2 elements. The first element is a column matrix containing the computed
* gradient and the second element is the loss computed at this data point.
* @param data features for one data point
* @param label label for this data point
* @param weights weights/coefficients corresponding to features
*
* @return (gradient: Vector, loss: Double)
*/
def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double)
def compute(data: Vector, label: Double, weights: Vector): (Vector, Double)
}
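
To make the compute contract concrete, here is a minimal, self-contained Scala sketch of how a caller could accumulate the per-point gradients and losses into a full-batch gradient. It uses plain Array[Double] instead of the MLlib Vector type, and the names PointGradient and fullBatchGradient are illustrative only.

object GradientContractSketch {
  // Per-point gradient function: (features, label, weights) => (gradient, loss).
  type PointGradient = (Array[Double], Double, Array[Double]) => (Array[Double], Double)

  // Sum per-point gradients and losses over a small in-memory dataset.
  def fullBatchGradient(
      points: Seq[(Array[Double], Double)],
      weights: Array[Double],
      pointGradient: PointGradient): (Array[Double], Double) = {
    val gradSum = new Array[Double](weights.length)
    var lossSum = 0.0
    for ((x, y) <- points) {
      val (g, loss) = pointGradient(x, y, weights)
      for (i <- g.indices) gradSum(i) += g(i)
      lossSum += loss
    }
    (gradSum, lossSum)
  }
}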

/**
* Compute gradient and loss for a logistic loss function, as used in binary classification.
* See also the documentation for the precise formulation.
*/
class LogisticGradient extends Gradient {
override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double) = {
val margin: Double = -1.0 * data.dot(weights)
override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
val brzData = data.toBreeze
val brzWeights = weights.toBreeze
val margin: Double = -1.0 * brzWeights.dot(brzData)
val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label

val gradient = data.mul(gradientMultiplier)
val gradient = brzData * gradientMultiplier
val loss =
if (label > 0) {
math.log(1 + math.exp(margin))
} else {
math.log(1 + math.exp(margin)) - margin
}

(gradient, loss)
(Vectors.fromBreeze(gradient), loss)
}
}
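
The same arithmetic as LogisticGradient.compute above can be checked without Spark or Breeze; the sketch below is only an illustration over plain arrays (the object name is made up).

object LogisticGradientSketch {
  def compute(data: Array[Double], label: Double, weights: Array[Double]): (Array[Double], Double) = {
    // margin = -(w . x)
    val margin = -1.0 * data.zip(weights).map { case (x, w) => x * w }.sum
    val multiplier = 1.0 / (1.0 + math.exp(margin)) - label
    val gradient = data.map(x => x * multiplier)
    // log(1 + exp(margin)) for label 1, log(1 + exp(margin)) - margin for label 0
    val loss =
      if (label > 0) math.log(1 + math.exp(margin))
      else math.log(1 + math.exp(margin)) - margin
    (gradient, loss)
  }
}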

@@ -68,14 +64,14 @@ class LogisticGradient {
* See also the documentation for the precise formulation.
*/
class LeastSquaresGradient extends Gradient {
override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double) = {
val diff: Double = data.dot(weights) - label

override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
val brzData = data.toBreeze
val brzWeights = weights.toBreeze
val diff: Double = brzWeights.dot(brzData) - label
val loss = diff * diff
val gradient = data.mul(2.0 * diff)
val gradient = brzData * (2.0 * diff)

(gradient, loss)
(Vectors.fromBreeze(gradient), loss)
}
}
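
As a quick numeric check of the formulas above (illustrative values only): with features (1.0, 2.0), weights (0.5, 0.5) and label 2.0, the prediction is 1.5, so diff = -0.5, loss = 0.25 and gradient = 2 * diff * x = (-1.0, -2.0).

object LeastSquaresGradientCheck {
  def main(args: Array[String]): Unit = {
    val data = Array(1.0, 2.0)
    val weights = Array(0.5, 0.5)
    val label = 2.0
    val diff = data.zip(weights).map { case (x, w) => x * w }.sum - label  // -0.5
    val loss = diff * diff                                                 // 0.25
    val gradient = data.map(x => x * 2.0 * diff)                           // (-1.0, -2.0)
    println(s"loss=$loss gradient=${gradient.mkString(", ")}")
  }
}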

@@ -85,19 +81,19 @@ class LeastSquaresGradient {
* NOTE: This assumes that the labels are {0,1}
*/
class HingeGradient extends Gradient {
override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double) = {

val dotProduct = data.dot(weights)
override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
val brzData = data.toBreeze
val brzWeights = weights.toBreeze
val dotProduct = brzWeights.dot(brzData)

// Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
// Therefore the gradient is -(2y - 1)*x
val labelScaled = 2 * label - 1.0

if (1.0 > labelScaled * dotProduct) {
(data.mul(-labelScaled), 1.0 - labelScaled * dotProduct)
(Vectors.fromBreeze(brzData * (-labelScaled)), 1.0 - labelScaled * dotProduct)
} else {
(DoubleMatrix.zeros(1, weights.length), 0.0)
(Vectors.dense(new Array[Double](weights.size)), 0.0)
}
}
}
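
The rescaling labelScaled = 2 * label - 1.0 maps {0, 1} labels to {-1, +1}, so the usual hinge loss max(0, 1 - y * f(x)) applies. Below is a plain-Scala sketch of the same branch logic, illustrative only and not the MLlib API.

object HingeGradientSketch {
  def compute(data: Array[Double], label: Double, weights: Array[Double]): (Array[Double], Double) = {
    val dotProduct = data.zip(weights).map { case (x, w) => x * w }.sum
    val labelScaled = 2 * label - 1.0  // {0, 1} -> {-1, +1}
    if (1.0 > labelScaled * dotProduct) {
      // Inside the margin: loss is 1 - y * f(x), gradient is -y * x.
      (data.map(x => x * -labelScaled), 1.0 - labelScaled * dotProduct)
    } else {
      // Correctly classified with margin: zero loss, zero gradient.
      (new Array[Double](weights.length), 0.0)
    }
  }
}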
mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -20,10 +20,10 @@ package org.apache.spark.mllib.optimization
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD

import org.jblas.DoubleMatrix

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.mllib.linalg.Vector

/**
* Class used to solve an optimization problem using Gradient Descent.
* @param gradient Gradient function to be used.
@@ -91,8 +91,7 @@ class GradientDescent(var gradient: Gradient, var updater: Updater)
this
}

def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
: Array[Double] = {
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {

val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
data,
@@ -133,22 +132,22 @@ object GradientDescent extends Logging {
* stochastic loss computed for every iteration.
*/
def runMiniBatchSGD(
data: RDD[(Double, Array[Double])],
data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
stepSize: Double,
numIterations: Int,
regParam: Double,
miniBatchFraction: Double,
initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
initialWeights: Vector): (Vector, Vector) = {

val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

val nexamples: Long = data.count()
val miniBatchSize = nexamples * miniBatchFraction

// Initialize weights as a column vector
var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
var weights = initialWeights.toBreeze.toDenseVector

/**
* For the first iteration, the regVal will be initialized as sum of sqrt of
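The body of the SGD loop is truncated in this hunk. Purely to illustrate what stepSize, numIterations and miniBatchFraction control, here is a single-machine sketch of mini-batch SGD over an in-memory Seq; the names and the sampling scheme are assumptions, and no Spark or Breeze types are used.

object MiniBatchSGDSketch {
  def run(
      points: Seq[(Double, Array[Double])],  // (label, features)
      pointGradient: (Array[Double], Double, Array[Double]) => (Array[Double], Double),
      stepSize: Double,
      numIterations: Int,
      miniBatchFraction: Double,
      initialWeights: Array[Double]): (Array[Double], Array[Double]) = {
    val weights = initialWeights.clone()
    val lossHistory = new Array[Double](numIterations)
    val rng = new scala.util.Random(42)
    for (iter <- 1 to numIterations) {
      // Sample roughly miniBatchFraction of the data for this iteration.
      val batch = points.filter(_ => rng.nextDouble() < miniBatchFraction)
      val gradSum = new Array[Double](weights.length)
      var lossSum = 0.0
      for ((label, x) <- batch) {
        val (g, loss) = pointGradient(x, label, weights)
        for (i <- g.indices) gradSum(i) += g(i)
        lossSum += loss
      }
      val batchSize = math.max(batch.size, 1)
      lossHistory(iter - 1) = lossSum / batchSize
      // Step size decreases with the square root of the iteration number.
      val thisIterStepSize = stepSize / math.sqrt(iter)
      for (i <- weights.indices) {
        weights(i) -= thisIterStepSize * gradSum(i) / batchSize
      }
    }
    (weights, lossHistory)
  }
}
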
mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
@@ -19,11 +19,12 @@ package org.apache.spark.mllib.optimization

import org.apache.spark.rdd.RDD

import org.apache.spark.mllib.linalg.Vector

trait Optimizer {

/**
* Solve the provided convex optimization problem.
*/
def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double]

def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector
}
mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
@@ -20,6 +20,10 @@ package org.apache.spark.mllib.optimization
import scala.math._
import org.jblas.DoubleMatrix

import breeze.linalg.{norm => brzNorm}

import org.apache.spark.mllib.linalg.{Vectors, Vector}

/**
* Class used to perform steps (weight update) using Gradient Descent methods.
*
@@ -47,20 +51,24 @@ abstract class Updater extends Serializable {
* @return A tuple of 2 elements. The first element is a column matrix containing updated weights,
* and the second element is the regularization value computed using updated weights.
*/
def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int,
regParam: Double): (DoubleMatrix, Double)
def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double)
}

/**
* A simple updater for gradient descent *without* any regularization.
* Uses a step-size decreasing with the square root of the number of iterations.
*/
class SimpleUpdater extends Updater {
override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
override def compute(weightsOld: Vector, gradient: Vector,
stepSize: Double, iter: Int, regParam: Double): (Vector, Double) = {
val thisIterStepSize = stepSize / math.sqrt(iter)
val step = gradient.mul(thisIterStepSize)
(weightsOld.sub(step), 0)
val brzWeights = weightsOld.toBreeze - gradient.toBreeze * thisIterStepSize
(Vectors.fromBreeze(brzWeights), 0)
}
}
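
The update above is just w' = w - (stepSize / sqrt(iter)) * gradient with no regularization term; an equivalent plain-Scala sketch (illustrative only):

object SimpleUpdaterSketch {
  def compute(weightsOld: Array[Double], gradient: Array[Double],
      stepSize: Double, iter: Int): (Array[Double], Double) = {
    val thisIterStepSize = stepSize / math.sqrt(iter)  // step size decays as 1/sqrt(iter)
    val weightsNew = weightsOld.zip(gradient).map { case (w, g) => w - thisIterStepSize * g }
    (weightsNew, 0.0)  // no regularization, so the regularization value is 0
  }
}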

@@ -83,19 +91,23 @@ class SimpleUpdater extends Updater {
* Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal)
*/
class L1Updater extends Updater {
override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
override def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
val thisIterStepSize = stepSize / math.sqrt(iter)
val step = gradient.mul(thisIterStepSize)
// Take gradient step
val newWeights = weightsOld.sub(step)
val brzWeights = weightsOld.toBreeze - gradient.toBreeze * thisIterStepSize
// Apply proximal operator (soft thresholding)
val shrinkageVal = regParam * thisIterStepSize
(0 until newWeights.length).foreach { i =>
val wi = newWeights.get(i)
newWeights.put(i, signum(wi) * max(0.0, abs(wi) - shrinkageVal))
(0 until brzWeights.length).foreach { i =>
val wi = brzWeights(i)
brzWeights(i) = signum(wi) * max(0.0, abs(wi) - shrinkageVal)
}
(newWeights, newWeights.norm1 * regParam)

(Vectors.fromBreeze(brzWeights), brzNorm(brzWeights, 1.0) * regParam)
}
}
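
Soft thresholding first takes the plain gradient step and then shrinks each component toward zero by shrinkageVal, clamping at zero, which is what drives small weights exactly to zero. A plain-Scala sketch of the same proximal step (names are illustrative):

object L1UpdaterSketch {
  def compute(weightsOld: Array[Double], gradient: Array[Double],
      stepSize: Double, iter: Int, regParam: Double): (Array[Double], Double) = {
    val thisIterStepSize = stepSize / math.sqrt(iter)
    val shrinkageVal = regParam * thisIterStepSize
    val weightsNew = weightsOld.zip(gradient).map { case (w, g) =>
      val wi = w - thisIterStepSize * g                             // gradient step
      math.signum(wi) * math.max(0.0, math.abs(wi) - shrinkageVal)  // soft threshold
    }
    (weightsNew, weightsNew.map(math.abs).sum * regParam)  // L1 regularization value
  }
}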

@@ -105,16 +117,22 @@ class L1Updater extends Updater {
* Uses a step-size decreasing with the square root of the number of iterations.
*/
class SquaredL2Updater extends Updater {
override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
val thisIterStepSize = stepSize / math.sqrt(iter)
val step = gradient.mul(thisIterStepSize)
override def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
// add up both updates from the gradient of the loss (= step) as well as
// the gradient of the regularizer (= regParam * weightsOld)
// w' = w - thisIterStepSize * (gradient + regParam * w)
// w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
val newWeights = weightsOld.mul(1.0 - thisIterStepSize * regParam).sub(step)
(newWeights, 0.5 * pow(newWeights.norm2, 2.0) * regParam)
val thisIterStepSize = stepSize / math.sqrt(iter)
val brzWeights = weightsOld.toBreeze * (1.0 - thisIterStepSize * regParam) -
(gradient.toBreeze * thisIterStepSize)
val norm = brzNorm(brzWeights, 2.0)

(Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
}
}
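
The comment above folds the gradient of the L2 regularizer into a single shrink-then-step update; the identity can be checked with a plain-Scala sketch (illustrative only):

object SquaredL2UpdaterSketch {
  def compute(weightsOld: Array[Double], gradient: Array[Double],
      stepSize: Double, iter: Int, regParam: Double): (Array[Double], Double) = {
    val thisIterStepSize = stepSize / math.sqrt(iter)
    // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
    val weightsNew = weightsOld.zip(gradient).map { case (w, g) =>
      (1.0 - thisIterStepSize * regParam) * w - thisIterStepSize * g
    }
    val norm2 = math.sqrt(weightsNew.map(x => x * x).sum)
    (weightsNew, 0.5 * regParam * norm2 * norm2)  // 0.5 * regParam * ||w'||^2
  }
}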

[The remaining changed files are not shown in this view.]
