From 5e52bff9daf677814c34b38013d70835263d7155 Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Tue, 25 Nov 2014 13:30:33 -0800
Subject: [PATCH] * Removed the 1/2 from SquaredError. This also required
 updating the test suite since it effectively doubles the gradient and loss.

* Added doc for developers within RandomForest.
* Small cleanup in test suite (generating data only once)
---
 .../spark/mllib/tree/RandomForest.scala       | 41 ++++++++++
 .../spark/mllib/tree/loss/SquaredError.scala  |  8 +-
 .../tree/GradientBoostedTreesSuite.scala      | 74 ++++++++++++-------
 3 files changed, 94 insertions(+), 29 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala
index d30524a6e1aa5..482d3395516e7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala
@@ -71,6 +71,47 @@ private class RandomForest (
     private val seed: Int)
   extends Serializable with Logging {
 
+  /*
+   ALGORITHM
+   This is a sketch of the algorithm to help new developers.
+
+   The algorithm partitions data by instances (rows).
+   On each iteration, the algorithm splits a set of nodes. In order to choose the best split
+   for a given node, sufficient statistics are collected from the distributed data.
+   For each node, the statistics are aggregated on some worker node, and that worker selects
+   the best split.
+
+   This setup requires discretization of continuous features. This binning is done in the
+   findSplitsBins() method during initialization, after which each continuous feature becomes
+   an ordered discretized feature with at most maxBins possible values.
+
+   The main loop in the algorithm operates on a queue of nodes (nodeQueue). These nodes
+   lie at the periphery of the tree being trained. If multiple trees are being trained at once,
+   then this queue contains nodes from all of them. Each iteration works roughly as follows:
+     On the master node:
+       - Some number of nodes are pulled off the queue (based on the amount of memory
+         required for their sufficient statistics).
+       - For random forests, if featureSubsetStrategy is not "all," then a subset of the
+         candidate features is chosen for each node. See method selectNodesToSplit().
+     On worker nodes, via method findBestSplits():
+       - The worker makes one pass over its subset of instances.
+       - For each (tree, node, feature, split) tuple, the worker collects statistics about
+         splitting. Note that the set of (tree, node) pairs is limited to the nodes selected
+         from the queue for this iteration. The set of features considered can also be limited
+         based on featureSubsetStrategy.
+       - For each node, the statistics for that node are aggregated on a particular worker
+         via reduceByKey(). The designated worker chooses the best (feature, split) pair,
+         or chooses to stop splitting if the stopping criteria are met.
+     On the master node:
+       - The master collects all decisions about splitting nodes and updates the model.
+       - The updated model is passed to the workers on the next iteration.
+   This process continues until the node queue is empty.
+
+   Most of the methods in this implementation support the statistics aggregation, which is
+   the heaviest part of the computation. In general, this implementation is bound either by
+   the cost of computing statistics on the workers or by communicating the sufficient statistics.
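+
+   A rough pseudocode sketch of this main loop (illustrative only: selectNodesToSplit()
+   and findBestSplits() are the methods described above, but the parameter lists here
+   are approximate rather than exact signatures):
+
+     while (nodeQueue.nonEmpty) {
+       // Master: pull as many queued nodes as fit in memory and, for random forests,
+       // sample the candidate features for each of them.
+       val (nodesForGroup, treeToNodeToIndexInfo) =
+         RandomForest.selectNodesToSplit(nodeQueue, maxMemoryUsage, metadata, rng)
+       // Workers: one pass over the instances to collect sufficient statistics; the
+       // best split per node is chosen after the reduceByKey() aggregation.
+       DecisionTree.findBestSplits(baggedInput, metadata, topNodes, nodesForGroup,
+         treeToNodeToIndexInfo, splits, bins, nodeQueue)
+     }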
+  */
   strategy.assertValid()
   require(numTrees > 0, s"RandomForest requires numTrees > 0, but was given numTrees = $numTrees.")
   require(RandomForest.supportedFeatureSubsetStrategies.contains(featureSubsetStrategy),
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
index 29b5d34981ea4..50ecaa2f86f35 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD
  * Class for squared error loss calculation.
  *
  * The squared (L2) error is defined as:
- *   (y - F(x))**2 / 2
+ *   (y - F(x))**2
  * where y is the label and F(x) is the model prediction for features x.
  */
 @DeveloperApi
@@ -36,7 +36,7 @@ object SquaredError extends Loss {
   /**
    * Method to calculate the gradients for the gradient boosting calculation for least
    * squares error calculation.
-   * The gradient with respect to F(x) is: - (y - F(x))
+   * The gradient with respect to F(x) is: - 2 (y - F(x))
    * @param model Ensemble model
    * @param point Instance of the training dataset
    * @return Loss gradient
@@ -44,7 +44,7 @@ object SquaredError extends Loss {
   override def gradient(
       model: TreeEnsembleModel,
       point: LabeledPoint): Double = {
-    model.predict(point.features) - point.label
+    2.0 * (model.predict(point.features) - point.label)
   }
 
   /**
@@ -59,6 +59,6 @@ object SquaredError extends Loss {
     data.map { y =>
       val err = model.predict(y.features) - y.label
       err * err
-    }.mean() / 2.0
+    }.mean()
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
index f3f8eff2db300..d4d54cf4c9e2a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
@@ -35,32 +35,39 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
 
   test("Regression with continuous features: SquaredError") {
     GradientBoostedTreesSuite.testCombinations.foreach {
       case (numIterations, learningRate, subsamplingRate) =>
-        val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
-        val rdd = sc.parallelize(arr, 2)
-
-        val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
-          categoricalFeaturesInfo = Map.empty, subsamplingRate = subsamplingRate)
-        val boostingStrategy =
-          new BoostingStrategy(treeStrategy, SquaredError, numIterations, learningRate)
-
-        val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)
-
-        assert(gbt.trees.size === numIterations)
-        EnsembleTestHelper.validateRegressor(gbt, arr, 0.03)
-
-        val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
-        val dt = DecisionTree.train(remappedInput, treeStrategy)
-
-        // Make sure trees are the same.
-        assert(gbt.trees.head.toString == dt.toString)
+        GradientBoostedTreesSuite.randomSeeds.foreach { randomSeed =>
+          val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)
+
+          val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
+            categoricalFeaturesInfo = Map.empty, subsamplingRate = subsamplingRate)
+          val boostingStrategy =
+            new BoostingStrategy(treeStrategy, SquaredError, numIterations, learningRate)
+
+          val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)
+
+          assert(gbt.trees.size === numIterations)
+          try {
+            EnsembleTestHelper.validateRegressor(gbt, GradientBoostedTreesSuite.data, 0.06)
+          } catch {
+            case e: java.lang.AssertionError =>
+              println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
+                s" subsamplingRate=$subsamplingRate")
+              throw e
+          }
+
+          val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
+          val dt = DecisionTree.train(remappedInput, treeStrategy)
+
+          // Make sure trees are the same.
+          assert(gbt.trees.head.toString == dt.toString)
+        }
     }
   }
 
   test("Regression with continuous features: Absolute Error") {
     GradientBoostedTreesSuite.testCombinations.foreach {
       case (numIterations, learningRate, subsamplingRate) =>
-        val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
-        val rdd = sc.parallelize(arr, 2)
+        val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)
 
         val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
           categoricalFeaturesInfo = Map.empty, subsamplingRate = subsamplingRate)
@@ -70,7 +77,14 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
         val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)
 
         assert(gbt.trees.size === numIterations)
-        EnsembleTestHelper.validateRegressor(gbt, arr, 0.85, "mae")
+        try {
+          EnsembleTestHelper.validateRegressor(gbt, GradientBoostedTreesSuite.data, 0.85, "mae")
+        } catch {
+          case e: java.lang.AssertionError =>
+            println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
+              s" subsamplingRate=$subsamplingRate")
+            throw e
+        }
 
         val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
         val dt = DecisionTree.train(remappedInput, treeStrategy)
@@ -83,8 +97,7 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
   test("Binary classification with continuous features: Log Loss") {
     GradientBoostedTreesSuite.testCombinations.foreach {
       case (numIterations, learningRate, subsamplingRate) =>
-        val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
-        val rdd = sc.parallelize(arr, 2)
+        val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)
 
         val treeStrategy = new Strategy(algo = Classification, impurity = Variance, maxDepth = 2,
           numClassesForClassification = 2, categoricalFeaturesInfo = Map.empty,
@@ -95,7 +108,14 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
         val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)
 
         assert(gbt.trees.size === numIterations)
-        EnsembleTestHelper.validateClassifier(gbt, arr, 0.9)
+        try {
+          EnsembleTestHelper.validateClassifier(gbt, GradientBoostedTreesSuite.data, 0.9)
+        } catch {
+          case e: java.lang.AssertionError =>
+            println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
+              s" subsamplingRate=$subsamplingRate")
+            throw e
+        }
 
         val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
         val ensembleStrategy = treeStrategy.copy
@@ -113,5 +133,9 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
 object GradientBoostedTreesSuite {
 
   // Combinations for estimators, learning rates and subsamplingRate
-  val testCombinations = Array((10, 1.0, 1.0), (10, 0.1, 1.0), (10, 1.0, 0.75), (10, 0.1, 0.75))
+  val testCombinations = Array((10, 1.0, 1.0), (10, 0.1, 1.0), (10, 0.5, 0.75), (10, 0.1, 0.75))
+
+  val randomSeeds = Array(681283, 4398)
+
+  val data = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
 }
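
Reviewer note (not part of the diff above): a minimal standalone sketch of the
factor-of-2 relationship behind the SquaredError change. Dropping the 1/2 doubles
both the loss value and its gradient, which per the commit message is why the test
suite needed updating. The object and method names below are hypothetical, not the
MLlib API; f stands in for the ensemble prediction F(x):

    object SquaredErrorFactorCheck {
      // Old convention: L = (y - f)^2 / 2, so dL/df = -(y - f) = f - y
      def oldLoss(y: Double, f: Double): Double = (y - f) * (y - f) / 2.0
      def oldGradient(y: Double, f: Double): Double = f - y

      // New convention (this patch): L = (y - f)^2, so dL/df = -2 (y - f) = 2 (f - y)
      def newLoss(y: Double, f: Double): Double = (y - f) * (y - f)
      def newGradient(y: Double, f: Double): Double = 2.0 * (f - y)

      def main(args: Array[String]): Unit = {
        val (y, f) = (1.5, 0.25)
        assert(newLoss(y, f) == 2.0 * oldLoss(y, f))         // loss doubles
        assert(newGradient(y, f) == 2.0 * oldGradient(y, f)) // gradient doubles
        println("factor-of-2 checks passed")
      }
    }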
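
Reviewer note (not part of the diff above): the three try/catch blocks added to the
suite exist only to print which (numIterations, learningRate, subsamplingRate)
combination failed before rethrowing the AssertionError. A possible follow-up could
factor that pattern into a helper; the sketch below is hypothetical and not part of
this patch:

    // Hypothetical test helper: runs a validation body and, on AssertionError,
    // reports the failing parameter combination before propagating the error.
    def withCombinationContext(
        numIterations: Int,
        learningRate: Double,
        subsamplingRate: Double)(body: => Unit): Unit = {
      try {
        body
      } catch {
        case e: java.lang.AssertionError =>
          println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
            s" subsamplingRate=$subsamplingRate")
          throw e
      }
    }

    // Usage inside a test (hypothetical):
    //   withCombinationContext(numIterations, learningRate, subsamplingRate) {
    //     EnsembleTestHelper.validateRegressor(gbt, GradientBoostedTreesSuite.data, 0.06)
    //   }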