
[SPARK-4583] [mllib] LogLoss for GradientBoostedTrees fix + doc updates
Currently, the LogLoss used by GradientBoostedTrees has 2 issues:
* the gradient (and therefore loss) does not match that used by Friedman (1999)
* the error computation uses 0/1 accuracy, not log loss

This PR updates LogLoss.
It also adds some doc for boosting and forests.
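For reference, the corrected per-instance loss and its gradient (with label y in {-1, 1} and model prediction F(x)), as documented in the updated LogLoss below, are:
  loss: 2 log(1 + exp(-2 y F(x)))
  gradient w.r.t. F(x): -4 y / (1 + exp(2 y F(x)))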

I tested it on sample data and made sure the log loss is monotonically decreasing with each boosting iteration.

CC: mengxr manishamde codedeft

Author: Joseph K. Bradley <joseph@databricks.com>

Closes #3439 from jkbradley/gbt-loss-fix and squashes the following commits:

cfec17e [Joseph K. Bradley] removed forgotten temp comments
a27eb6d [Joseph K. Bradley] corrections to last log loss commit
ed5da2c [Joseph K. Bradley] updated LogLoss (boosting) for numerical stability
5e52bff [Joseph K. Bradley]
  * Removed the 1/2 from SquaredError.  This also required updating the test suite since it effectively doubles the gradient and loss.
  * Added doc for developers within RandomForest.
  * Small cleanup in test suite (generating data only once)
e57897a [Joseph K. Bradley] Fixed LogLoss for GradientBoostedTrees, and updated doc for losses, forests, and boosting

(cherry picked from commit c251fd7)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
jkbradley authored and mengxr committed Nov 26, 2014
1 parent e866972 commit 6b5564a
Showing 6 changed files with 146 additions and 72 deletions.
mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
@@ -31,18 +31,20 @@ import org.apache.spark.storage.StorageLevel

/**
* :: Experimental ::
* A class that implements Stochastic Gradient Boosting for regression and binary classification.
* A class that implements
* [[http://en.wikipedia.org/wiki/Gradient_boosting Stochastic Gradient Boosting]]
* for regression and binary classification.
*
* The implementation is based upon:
* J.H. Friedman. "Stochastic Gradient Boosting." 1999.
*
* Notes:
* - This currently can be run with several loss functions. However, only SquaredError is
* fully supported. Specifically, the loss function should be used to compute the gradient
* (to re-label training instances on each iteration) and to weight weak hypotheses.
* Currently, gradients are computed correctly for the available loss functions,
* but weak hypothesis weights are not computed correctly for LogLoss or AbsoluteError.
* Running with those losses will likely behave reasonably, but lacks the same guarantees.
* Notes on Gradient Boosting vs. TreeBoost:
* - This implementation is for Stochastic Gradient Boosting, not for TreeBoost.
* - Both algorithms learn tree ensembles by minimizing loss functions.
* - TreeBoost (Friedman, 1999) additionally modifies the outputs at tree leaf nodes
* based on the loss function, whereas the original gradient boosting method does not.
* - When the loss is SquaredError, these methods give the same result, but they could differ
* for other loss functions.
*
* @param boostingStrategy Parameters for the gradient boosting algorithm.
*/
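To make the TreeBoost note above concrete (editor's sketch, not part of the patch): plain stochastic gradient boosting fits tree m to the pseudo-residuals r_i = -dL(y_i, F(x_i))/dF(x_i) and updates F_m(x) = F_{m-1}(x) + learningRate * h_m(x), keeping the leaf values produced by the regression tree. TreeBoost additionally re-solves each leaf value of h_m so that it minimizes the actual loss L over the instances falling in that leaf. For SquaredError the loss-minimizing leaf value is just the mean residual, which is exactly what a variance-based regression tree predicts anyway, so the two methods coincide for that loss; for LogLoss or AbsoluteError they can differ.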
mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala
@@ -37,7 +37,8 @@ import org.apache.spark.util.Utils

/**
* :: Experimental ::
* A class which implements a random forest learning algorithm for classification and regression.
* A class that implements a [[http://en.wikipedia.org/wiki/Random_forest Random Forest]]
* learning algorithm for classification and regression.
* It supports both continuous and categorical features.
*
* The settings for featureSubsetStrategy are based on the following references:
@@ -70,6 +71,47 @@ private class RandomForest (
private val seed: Int)
extends Serializable with Logging {

/*
ALGORITHM
This is a sketch of the algorithm to help new developers.
The algorithm partitions data by instances (rows).
On each iteration, the algorithm splits a set of nodes. In order to choose the best split
for a given node, sufficient statistics are collected from the distributed data.
For each node, the statistics are collected to some worker node, and that worker selects
the best split.
This setup requires discretization of continuous features. This binning is done in the
findSplitsBins() method during initialization, after which each continuous feature becomes
an ordered discretized feature with at most maxBins possible values.
The main loop in the algorithm operates on a queue of nodes (nodeQueue). These nodes
lie at the periphery of the tree being trained. If multiple trees are being trained at once,
then this queue contains nodes from all of them. Each iteration works roughly as follows:
On the master node:
- Some number of nodes are pulled off of the queue (based on the amount of memory
required for their sufficient statistics).
- For random forests, if featureSubsetStrategy is not "all," then a subset of candidate
features is chosen for each node. See method selectNodesToSplit().
On worker nodes, via method findBestSplits():
- The worker makes one pass over its subset of instances.
- For each (tree, node, feature, split) tuple, the worker collects statistics about
splitting. Note that the set of (tree, node) pairs is limited to the nodes selected
from the queue for this iteration. The set of features considered can also be limited
based on featureSubsetStrategy.
- For each node, the statistics for that node are aggregated to a particular worker
via reduceByKey(). The designated worker chooses the best (feature, split) pair,
or chooses to stop splitting if the stopping criteria are met.
On the master node:
- The master collects all decisions about splitting nodes and updates the model.
- The updated model is passed to the workers on the next iteration.
This process continues until the node queue is empty.
Most of the methods in this implementation support the statistics aggregation, which is
the heaviest part of the computation. In general, this implementation is bound by either
the cost of statistics computation on workers or by communicating the sufficient statistics.
*/
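For new developers, a toy, single-machine rendering of the queue-driven loop sketched above may help (editor's sketch, not part of the patch; names such as NodeRef and the batching rule are illustrative stand-ins, not MLlib internals):

```scala
import scala.collection.mutable

object NodeQueueSketch {

  // A node waiting to be split: which tree it belongs to, its id, and its depth.
  case class NodeRef(treeIndex: Int, nodeId: Int, depth: Int)

  def main(args: Array[String]): Unit = {
    val numTrees = 2
    val maxDepth = 2
    val nodesPerIteration = 3 // stand-in for the memory-based batch size

    // Seed the queue with the root node of every tree being trained.
    val nodeQueue = mutable.Queue[NodeRef]()
    (0 until numTrees).foreach(t => nodeQueue.enqueue(NodeRef(t, nodeId = 1, depth = 0)))

    while (nodeQueue.nonEmpty) {
      // "Master": pull a batch of nodes off the queue.
      val batch = Seq.fill(math.min(nodesPerIteration, nodeQueue.size))(nodeQueue.dequeue())

      // "Workers": in MLlib this is where sufficient statistics are aggregated and the
      // best (feature, split) is chosen per node; here we fake the decision by depth.
      val decisions: Seq[(NodeRef, Boolean)] = batch.map(node => (node, node.depth < maxDepth))

      // "Master": update the model and enqueue the children of every node that was split.
      decisions.foreach {
        case (node, true) =>
          nodeQueue.enqueue(NodeRef(node.treeIndex, 2 * node.nodeId, node.depth + 1))
          nodeQueue.enqueue(NodeRef(node.treeIndex, 2 * node.nodeId + 1, node.depth + 1))
        case (_, false) =>
          () // leaf node: nothing more to enqueue
      }
    }
    println(s"Finished growing $numTrees toy trees.")
  }
}
```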

strategy.assertValid()
require(numTrees > 0, s"RandomForest requires numTrees > 0, but was given numTrees = $numTrees.")
require(RandomForest.supportedFeatureSubsetStrategies.contains(featureSubsetStrategy),
mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
@@ -17,27 +17,27 @@

package org.apache.spark.mllib.tree.loss

import org.apache.spark.SparkContext._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.model.TreeEnsembleModel
import org.apache.spark.rdd.RDD

/**
* :: DeveloperApi ::
* Class for least absolute error loss calculation.
* The features x and the corresponding label y is predicted using the function F.
* For each instance:
* Loss: |y - F|
* Negative gradient: sign(y - F)
* Class for absolute error loss calculation (for regression).
*
* The absolute (L1) error is defined as:
* |y - F(x)|
* where y is the label and F(x) is the model prediction for features x.
*/
@DeveloperApi
object AbsoluteError extends Loss {

/**
* Method to calculate the gradients for the gradient boosting calculation for least
* absolute error calculation.
* @param model Model of the weak learner
* The gradient with respect to F(x) is: sign(F(x) - y)
* @param model Ensemble model
* @param point Instance of the training dataset
* @return Loss gradient
*/
@@ -48,19 +48,17 @@ object AbsoluteError extends Loss {
}

/**
* Method to calculate error of the base learner for the gradient boosting calculation.
* Method to calculate loss of the base learner for the gradient boosting calculation.
* Note: This method is not used by the gradient boosting algorithm but is useful for debugging
* purposes.
* @param model Model of the weak learner.
* @param model Ensemble model
* @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* @return
* @return Mean absolute error of model on data
*/
override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
val sumOfAbsolutes = data.map { y =>
data.map { y =>
val err = model.predict(y.features) - y.label
math.abs(err)
}.sum()
sumOfAbsolutes / data.count()
}.mean()
}

}
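A quick check of the sign convention above (editor's note, not part of the patch): for F(x) != y, d/dF |y - F(x)| = -sign(y - F(x)) = sign(F(x) - y), so a gradient-descent step on the value returned by gradient() moves F(x) toward the label.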
34 changes: 22 additions & 12 deletions mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
@@ -24,40 +24,50 @@ import org.apache.spark.rdd.RDD

/**
* :: DeveloperApi ::
* Class for least squares error loss calculation.
* Class for log loss calculation (for classification).
* This uses twice the binomial negative log likelihood, called "deviance" in Friedman (1999).
*
* The features x and the corresponding label y is predicted using the function F.
* For each instance:
* Loss: log(1 + exp(-2yF)), y in {-1, 1}
* Negative gradient: 2y / ( 1 + exp(2yF))
* The log loss is defined as:
* 2 log(1 + exp(-2 y F(x)))
* where y is a label in {-1, 1} and F(x) is the model prediction for features x.
*/
@DeveloperApi
object LogLoss extends Loss {

/**
* Method to calculate the loss gradients for the gradient boosting calculation for binary
* classification
* @param model Model of the weak learner
* The gradient with respect to F(x) is: - 4 y / (1 + exp(2 y F(x)))
* @param model Ensemble model
* @param point Instance of the training dataset
* @return Loss gradient
*/
override def gradient(
model: TreeEnsembleModel,
point: LabeledPoint): Double = {
val prediction = model.predict(point.features)
1.0 / (1.0 + math.exp(-prediction)) - point.label
- 4.0 * point.label / (1.0 + math.exp(2.0 * point.label * prediction))
}

/**
* Method to calculate error of the base learner for the gradient boosting calculation.
* Method to calculate loss of the base learner for the gradient boosting calculation.
* Note: This method is not used by the gradient boosting algorithm but is useful for debugging
* purposes.
* @param model Model of the weak learner.
* @param model Ensemble model
* @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* @return
* @return Mean log loss of model on data
*/
override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
val wrongPredictions = data.filter(lp => model.predict(lp.features) != lp.label).count()
wrongPredictions / data.count
data.map { case point =>
val prediction = model.predict(point.features)
val margin = 2.0 * point.label * prediction
// The following are equivalent to 2.0 * log(1 + exp(-margin)) but are more numerically
// stable.
if (margin >= 0) {
2.0 * math.log1p(math.exp(-margin))
} else {
2.0 * (-margin + math.log1p(math.exp(margin)))
}
}.mean()
}
}
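Two quick checks on the formulas above (editor's notes, not part of the patch): differentiating the loss gives d/dF [ 2 log(1 + exp(-2 y F)) ] = -4 y exp(-2 y F) / (1 + exp(-2 y F)) = -4 y / (1 + exp(2 y F)), which is the gradient implemented in gradient(). For the margin < 0 branch of computeError, 2 log(1 + exp(-margin)) = 2 (-margin + log(1 + exp(margin))), since log(1 + exp(-m)) = -m + log(exp(m) + 1); taking the branch on the sign of margin keeps the argument of exp non-positive in both cases and so avoids overflow.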
mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
@@ -17,50 +17,48 @@

package org.apache.spark.mllib.tree.loss

import org.apache.spark.SparkContext._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.model.TreeEnsembleModel
import org.apache.spark.rdd.RDD

/**
* :: DeveloperApi ::
* Class for least squares error loss calculation.
* Class for squared error loss calculation.
*
* The features x and the corresponding label y is predicted using the function F.
* For each instance:
* Loss: (y - F)**2/2
* Negative gradient: y - F
* The squared (L2) error is defined as:
* (y - F(x))**2
* where y is the label and F(x) is the model prediction for features x.
*/
@DeveloperApi
object SquaredError extends Loss {

/**
* Method to calculate the gradients for the gradient boosting calculation for least
* squares error calculation.
* @param model Model of the weak learner
* The gradient with respect to F(x) is: - 2 (y - F(x))
* @param model Ensemble model
* @param point Instance of the training dataset
* @return Loss gradient
*/
override def gradient(
model: TreeEnsembleModel,
point: LabeledPoint): Double = {
model.predict(point.features) - point.label
2.0 * (model.predict(point.features) - point.label)
}

/**
* Method to calculate error of the base learner for the gradient boosting calculation.
* Method to calculate loss of the base learner for the gradient boosting calculation.
* Note: This method is not used by the gradient boosting algorithm but is useful for debugging
* purposes.
* @param model Model of the weak learner.
* @param model Ensemble model
* @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* @return
* @return Mean squared error of model on data
*/
override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
data.map { y =>
val err = model.predict(y.features) - y.label
err * err
}.mean()
}

}
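A note on the factor of two (editor's note, not part of the patch): with the 1/2 removed, the loss is (y - F(x))**2 and d/dF (y - F(x))**2 = -2 (y - F(x)) = 2 (F(x) - y), which is what gradient() returns. Dropping the 1/2 doubles both the loss and the gradient relative to the old (y - F)**2 / 2 form, which is why the test-suite expectations were updated in commit 5e52bff above.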
mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
@@ -35,32 +35,39 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
test("Regression with continuous features: SquaredError") {
GradientBoostedTreesSuite.testCombinations.foreach {
case (numIterations, learningRate, subsamplingRate) =>
val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
val rdd = sc.parallelize(arr, 2)

val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
categoricalFeaturesInfo = Map.empty, subsamplingRate = subsamplingRate)
val boostingStrategy =
new BoostingStrategy(treeStrategy, SquaredError, numIterations, learningRate)

val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)

assert(gbt.trees.size === numIterations)
EnsembleTestHelper.validateRegressor(gbt, arr, 0.03)

val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
val dt = DecisionTree.train(remappedInput, treeStrategy)

// Make sure trees are the same.
assert(gbt.trees.head.toString == dt.toString)
GradientBoostedTreesSuite.randomSeeds.foreach { randomSeed =>
val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)

val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
categoricalFeaturesInfo = Map.empty, subsamplingRate = subsamplingRate)
val boostingStrategy =
new BoostingStrategy(treeStrategy, SquaredError, numIterations, learningRate)

val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)

assert(gbt.trees.size === numIterations)
try {
EnsembleTestHelper.validateRegressor(gbt, GradientBoostedTreesSuite.data, 0.06)
} catch {
case e: java.lang.AssertionError =>
println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
s" subsamplingRate=$subsamplingRate")
throw e
}

val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
val dt = DecisionTree.train(remappedInput, treeStrategy)

// Make sure trees are the same.
assert(gbt.trees.head.toString == dt.toString)
}
}
}

test("Regression with continuous features: Absolute Error") {
GradientBoostedTreesSuite.testCombinations.foreach {
case (numIterations, learningRate, subsamplingRate) =>
val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
val rdd = sc.parallelize(arr, 2)
val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)

val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
categoricalFeaturesInfo = Map.empty, subsamplingRate = subsamplingRate)
@@ -70,7 +77,14 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)

assert(gbt.trees.size === numIterations)
EnsembleTestHelper.validateRegressor(gbt, arr, 0.85, "mae")
try {
EnsembleTestHelper.validateRegressor(gbt, GradientBoostedTreesSuite.data, 0.85, "mae")
} catch {
case e: java.lang.AssertionError =>
println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
s" subsamplingRate=$subsamplingRate")
throw e
}

val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
val dt = DecisionTree.train(remappedInput, treeStrategy)
@@ -83,8 +97,7 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
test("Binary classification with continuous features: Log Loss") {
GradientBoostedTreesSuite.testCombinations.foreach {
case (numIterations, learningRate, subsamplingRate) =>
val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
val rdd = sc.parallelize(arr, 2)
val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)

val treeStrategy = new Strategy(algo = Classification, impurity = Variance, maxDepth = 2,
numClassesForClassification = 2, categoricalFeaturesInfo = Map.empty,
@@ -95,7 +108,14 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)

assert(gbt.trees.size === numIterations)
EnsembleTestHelper.validateClassifier(gbt, arr, 0.9)
try {
EnsembleTestHelper.validateClassifier(gbt, GradientBoostedTreesSuite.data, 0.9)
} catch {
case e: java.lang.AssertionError =>
println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
s" subsamplingRate=$subsamplingRate")
throw e
}

val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
val ensembleStrategy = treeStrategy.copy
@@ -113,5 +133,9 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
object GradientBoostedTreesSuite {

// Combinations for estimators, learning rates and subsamplingRate
val testCombinations = Array((10, 1.0, 1.0), (10, 0.1, 1.0), (10, 1.0, 0.75), (10, 0.1, 0.75))
val testCombinations = Array((10, 1.0, 1.0), (10, 0.1, 1.0), (10, 0.5, 0.75), (10, 0.1, 0.75))

val randomSeeds = Array(681283, 4398)

val data = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
}
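The commit message mentions checking that the training loss decreases monotonically across boosting iterations. A rough way to script such a check (editor's sketch, not part of the patch; shown with SquaredError for simplicity and assuming a SparkContext named sc plus this suite's imports) could look like:

```scala
val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)
val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
  categoricalFeaturesInfo = Map.empty)

// Retrain with 1..10 iterations; with the default subsamplingRate the first k trees are
// deterministic, so each run extends the previous ensemble and the training error
// should not increase.
val trainingErrors = (1 to 10).map { numIterations =>
  val boostingStrategy = new BoostingStrategy(treeStrategy, SquaredError, numIterations, 0.1)
  val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)
  SquaredError.computeError(gbt, rdd)
}
assert(trainingErrors.zip(trainingErrors.tail).forall { case (prev, next) => next <= prev + 1e-9 })
```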
