
Commit

* Removed the 1/2 from SquaredError. This also required updating the test suite since it effectively doubles the gradient and loss.

* Added doc for developers within RandomForest.
* Small cleanup in test suite (generating data only once).
jkbradley committed Nov 25, 2014
1 parent e57897a commit 5e52bff
Showing 3 changed files with 94 additions and 29 deletions.
RandomForest.scala
@@ -71,6 +71,47 @@ private class RandomForest (
private val seed: Int)
extends Serializable with Logging {

/*
  ALGORITHM

  This is a sketch of the algorithm to help new developers.

  The algorithm partitions data by instances (rows). On each iteration, the algorithm splits
  a set of nodes. In order to choose the best split for a given node, sufficient statistics
  are collected from the distributed data. For each node, the statistics are collected to
  some worker node, and that worker selects the best split.

  This setup requires discretization of continuous features. This binning is done in the
  findSplitsBins() method during initialization, after which each continuous feature becomes
  an ordered discretized feature with at most maxBins possible values.

  The main loop in the algorithm operates on a queue of nodes (nodeQueue). These nodes
  lie at the periphery of the tree being trained. If multiple trees are being trained at once,
  then this queue contains nodes from all of them. Each iteration works roughly as follows:
    On the master node:
      - Some number of nodes are pulled off the queue (based on the amount of memory
        required for their sufficient statistics).
      - For random forests, if featureSubsetStrategy is not "all," then a subset of candidate
        features is chosen for each node. See method selectNodesToSplit().
    On worker nodes, via method findBestSplits():
      - The worker makes one pass over its subset of instances.
      - For each (tree, node, feature, split) tuple, the worker collects statistics about
        splitting. Note that the set of (tree, node) pairs is limited to the nodes selected
        from the queue for this iteration. The set of features considered can also be limited
        based on featureSubsetStrategy.
      - For each node, the statistics for that node are aggregated to a particular worker
        via reduceByKey(). The designated worker chooses the best (feature, split) pair,
        or chooses to stop splitting if the stopping criteria are met.
    On the master node:
      - The master collects all decisions about splitting nodes and updates the model.
      - The updated model is passed to the workers on the next iteration.
  This process continues until the node queue is empty.

  Most of the methods in this implementation support the statistics aggregation, which is
  the heaviest part of the computation. In general, this implementation is bound either by
  the cost of computing statistics on the workers or by the cost of communicating the
  sufficient statistics.
*/

strategy.assertValid()
require(numTrees > 0, s"RandomForest requires numTrees > 0, but was given numTrees = $numTrees.")
require(RandomForest.supportedFeatureSubsetStrategies.contains(featureSubsetStrategy),
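The loop described in the new comment can be illustrated with a short, self-contained sketch. This is not Spark's implementation: Node is a stub type, the memory budget is reduced to a node count, and findBestSplits() fakes its decision instead of aggregating sufficient statistics via reduceByKey(); only the control flow (selectNodesToSplit() on the master, findBestSplits() on the workers, children re-enqueued until the queue empties) mirrors the description above.

import scala.collection.mutable

// Hypothetical sketch of the level-wise loop described in the ALGORITHM
// comment. Node is a stand-in type; Spark's real code works on RDDs.
object RandomForestLoopSketch {

  final case class Node(treeIndex: Int, var isLeaf: Boolean = true)

  // Stand-in for selectNodesToSplit(): pull nodes off the queue until a
  // (here, trivially simplified) memory budget is reached.
  def selectNodesToSplit(queue: mutable.Queue[Node], maxNodesPerGroup: Int): Seq[Node] =
    Seq.fill(math.min(maxNodesPerGroup, queue.size))(queue.dequeue())

  // Stand-in for findBestSplits(): in Spark, workers collect statistics for
  // each (tree, node, feature, split), aggregate them per node via
  // reduceByKey(), and a designated worker picks the best split or stops.
  // Here the split/stop decision is faked with a depth budget.
  def findBestSplits(nodes: Seq[Node], depthBudget: Int): Seq[(Node, Boolean)] =
    nodes.map(node => (node, depthBudget > 0))

  def main(args: Array[String]): Unit = {
    // One root per tree; with multiple trees the queue mixes nodes from all of them.
    val nodeQueue = mutable.Queue(Node(treeIndex = 0), Node(treeIndex = 1))
    var depthBudget = 2
    while (nodeQueue.nonEmpty) {
      val group = selectNodesToSplit(nodeQueue, maxNodesPerGroup = 8)
      for ((node, doSplit) <- findBestSplits(group, depthBudget)) {
        if (doSplit) {
          node.isLeaf = false
          // Splitting a node pushes its two children back onto the queue.
          nodeQueue.enqueue(Node(node.treeIndex))
          nodeQueue.enqueue(Node(node.treeIndex))
        }
      }
      depthBudget -= 1
    }
    println("node queue empty: training finished")
  }
}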
SquaredError.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD
* Class for squared error loss calculation.
*
* The squared (L2) error is defined as:
- *   (y - F(x))**2 / 2
+ *   (y - F(x))**2
* where y is the label and F(x) is the model prediction for features x.
*/
@DeveloperApi
@@ -36,15 +36,15 @@ object SquaredError extends Loss {
/**
* Method to calculate the gradients for the gradient boosting calculation for least
* squares error calculation.
- * The gradient with respect to F(x) is: - (y - F(x))
+ * The gradient with respect to F(x) is: - 2 (y - F(x))
* @param model Ensemble model
* @param point Instance of the training dataset
* @return Loss gradient
*/
override def gradient(
model: TreeEnsembleModel,
point: LabeledPoint): Double = {
-    model.predict(point.features) - point.label
+    2.0 * (model.predict(point.features) - point.label)
}

/**
@@ -59,6 +59,6 @@ object SquaredError extends Loss {
data.map { y =>
val err = model.predict(y.features) - y.label
err * err
-    }.mean() / 2.0
+    }.mean()
}
}
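To see why this "effectively doubles the gradient and loss": with the 1/2 removed, the loss is (y - F(x))**2 and its derivative with respect to F(x) is - 2 (y - F(x)) = 2 (F(x) - y), exactly what gradient() now returns; both quantities are twice their old values, which is also why the SquaredError regression test below loosens its tolerance from 0.03 to 0.06. A standalone sanity check (plain Scala, no Spark dependency) of the math:

// Verifies that the new gradient 2 * (F(x) - y) is the analytic derivative
// of the new loss (y - F(x))**2, using a finite-difference approximation.
object SquaredErrorCheck {
  def main(args: Array[String]): Unit = {
    val label = 1.0      // y
    val prediction = 0.4 // F(x)
    val err = prediction - label

    val loss = err * err     // 0.36 (old definition: err * err / 2 = 0.18)
    val gradient = 2.0 * err // -1.2 (old definition: err = -0.6)

    // Finite-difference estimate of d/dF (y - F)^2 at F = prediction.
    def l(f: Double): Double = (label - f) * (label - f)
    val h = 1e-6
    val numericGradient = (l(prediction + h) - l(prediction - h)) / (2 * h)

    assert(math.abs(gradient - numericGradient) < 1e-6)
    println(s"loss=$loss gradient=$gradient numericGradient=$numericGradient")
  }
}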
GradientBoostedTreesSuite.scala
@@ -35,32 +35,39 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
test("Regression with continuous features: SquaredError") {
GradientBoostedTreesSuite.testCombinations.foreach {
case (numIterations, learningRate, subsamplingRate) =>
-      val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
-      val rdd = sc.parallelize(arr, 2)
-
-      val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
-        categoricalFeaturesInfo = Map.empty, subsamplingRate = subsamplingRate)
-      val boostingStrategy =
-        new BoostingStrategy(treeStrategy, SquaredError, numIterations, learningRate)
-
-      val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)
-
-      assert(gbt.trees.size === numIterations)
-      EnsembleTestHelper.validateRegressor(gbt, arr, 0.03)
-
-      val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
-      val dt = DecisionTree.train(remappedInput, treeStrategy)
-
-      // Make sure trees are the same.
-      assert(gbt.trees.head.toString == dt.toString)
+      GradientBoostedTreesSuite.randomSeeds.foreach { randomSeed =>
+        val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)
+
+        val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
+          categoricalFeaturesInfo = Map.empty, subsamplingRate = subsamplingRate)
+        val boostingStrategy =
+          new BoostingStrategy(treeStrategy, SquaredError, numIterations, learningRate)
+
+        val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)
+
+        assert(gbt.trees.size === numIterations)
+        try {
+          EnsembleTestHelper.validateRegressor(gbt, GradientBoostedTreesSuite.data, 0.06)
+        } catch {
+          case e: java.lang.AssertionError =>
+            println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
+              s" subsamplingRate=$subsamplingRate")
+            throw e
+        }
+
+        val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
+        val dt = DecisionTree.train(remappedInput, treeStrategy)
+
+        // Make sure trees are the same.
+        assert(gbt.trees.head.toString == dt.toString)
+      }
}
}

test("Regression with continuous features: Absolute Error") {
GradientBoostedTreesSuite.testCombinations.foreach {
case (numIterations, learningRate, subsamplingRate) =>
-      val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
-      val rdd = sc.parallelize(arr, 2)
+      val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)

val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
categoricalFeaturesInfo = Map.empty, subsamplingRate = subsamplingRate)
@@ -70,7 +77,14 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)

assert(gbt.trees.size === numIterations)
-      EnsembleTestHelper.validateRegressor(gbt, arr, 0.85, "mae")
+      try {
+        EnsembleTestHelper.validateRegressor(gbt, GradientBoostedTreesSuite.data, 0.85, "mae")
+      } catch {
+        case e: java.lang.AssertionError =>
+          println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
+            s" subsamplingRate=$subsamplingRate")
+          throw e
+      }

val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
val dt = DecisionTree.train(remappedInput, treeStrategy)
@@ -83,8 +97,7 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
test("Binary classification with continuous features: Log Loss") {
GradientBoostedTreesSuite.testCombinations.foreach {
case (numIterations, learningRate, subsamplingRate) =>
-      val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
-      val rdd = sc.parallelize(arr, 2)
+      val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)

val treeStrategy = new Strategy(algo = Classification, impurity = Variance, maxDepth = 2,
numClassesForClassification = 2, categoricalFeaturesInfo = Map.empty,
@@ -95,7 +108,14 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)

assert(gbt.trees.size === numIterations)
-      EnsembleTestHelper.validateClassifier(gbt, arr, 0.9)
+      try {
+        EnsembleTestHelper.validateClassifier(gbt, GradientBoostedTreesSuite.data, 0.9)
+      } catch {
+        case e: java.lang.AssertionError =>
+          println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
+            s" subsamplingRate=$subsamplingRate")
+          throw e
+      }

val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
val ensembleStrategy = treeStrategy.copy
@@ -113,5 +133,9 @@ object GradientBoostedTreesSuite {
object GradientBoostedTreesSuite {

// Combinations for estimators, learning rates and subsamplingRate
-  val testCombinations = Array((10, 1.0, 1.0), (10, 0.1, 1.0), (10, 1.0, 0.75), (10, 0.1, 0.75))
+  val testCombinations = Array((10, 1.0, 1.0), (10, 0.1, 1.0), (10, 0.5, 0.75), (10, 0.1, 0.75))
+
+  val randomSeeds = Array(681283, 4398)
+
+  val data = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
}
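The test cleanup above follows one pattern throughout: generate the dataset once, sweep every (numIterations, learningRate, subsamplingRate) combination and seed over it, and report which combination failed before rethrowing the assertion error. A minimal standalone sketch of that pattern, independent of Spark; validate() here is a hypothetical placeholder, not EnsembleTestHelper's actual API:

import scala.util.Random

// Minimal sketch of the suite's sweep-and-report pattern; validate() is a
// hypothetical placeholder, not EnsembleTestHelper's actual API.
object SweepPatternSketch {
  val testCombinations = Array((10, 1.0, 1.0), (10, 0.1, 1.0), (10, 0.5, 0.75), (10, 0.1, 0.75))
  val randomSeeds = Array(681283, 4398)

  // Shared fixture, generated once, like GradientBoostedTreesSuite.data.
  private val rng = new Random(42)
  val data: Array[Double] = Array.fill(100)(rng.nextDouble())

  def validate(metric: Double, tolerance: Double): Unit =
    assert(metric <= tolerance, s"metric $metric exceeded tolerance $tolerance")

  def main(args: Array[String]): Unit = {
    for ((numIterations, learningRate, subsamplingRate) <- testCombinations;
         randomSeed <- randomSeeds) {
      try {
        validate(metric = data.sum / data.length * 0.1, tolerance = 0.06)
      } catch {
        case e: AssertionError =>
          // Name the failing combination before rethrowing, as the suite now does.
          println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
            s" subsamplingRate=$subsamplingRate (randomSeed=$randomSeed)")
          throw e
      }
    }
    println(s"all ${testCombinations.length * randomSeeds.length} runs passed")
  }
}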
