From 1fff77d54df2d1f7703b8b0048140ca7bf03c78d Mon Sep 17 00:00:00 2001
From: martinzapletal
Date: Thu, 29 Jan 2015 17:20:38 +0100
Subject: [PATCH] SPARK-3278 changes after PR comments
 https://github.com/apache/spark/pull/3519. Java API changes, test
 refactoring, comments and citations, isotonic regression model validation,
 linear interpolation for predictions

---
 .../mllib/regression/IsotonicRegression.scala | 236 ++++++++++--------
 .../JavaIsotonicRegressionSuite.java          |  78 +++---
 .../regression/IsotonicRegressionSuite.scala  | 193 +++++---------
 .../regression/LinearRegressionSuite.scala    |   2 +-
 .../mllib/util/IsotonicDataGenerator.scala    |  70 ------
 5 files changed, 239 insertions(+), 340 deletions(-)
 delete mode 100644 mllib/src/test/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 5aa2340f680d2..0b9cd4287dcc7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -24,88 +24,142 @@ import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
 import org.apache.spark.rdd.RDD
 
 /**
- * Regression model for Isotonic regression
+ * Regression model for isotonic regression.
  *
- * @param features Array of features.
- * @param labels Array of labels associated to the features at the same index.
+ * @param boundaries Array of boundaries for which predictions are known.
+ * @param predictions Array of predictions associated with the boundaries at the same index.
  */
 class IsotonicRegressionModel (
-    features: Array[Double],
-    val labels: Array[Double])
+    boundaries: Array[Double],
+    val predictions: Array[Double])
   extends Serializable {
 
+  private def isSorted(xs: Array[Double]): Boolean = {
+    var i = 1
+    while (i < xs.length) {
+      if (xs(i) < xs(i - 1)) return false
+      i += 1
+    }
+    true
+  }
+
+  assert(isSorted(boundaries))
+  assert(boundaries.length == predictions.length)
+
   /**
-   * Predict labels for provided features
-   * Using a piecewise constant function
+   * Predict labels for the provided features using a piecewise linear function.
    *
-   * @param testData features to be labeled
-   * @return predicted labels
+   * @param testData Features to be labeled.
+   * @return Predicted labels.
    */
   def predict(testData: RDD[Double]): RDD[Double] =
     testData.map(predict)
 
   /**
-   * Predict labels for provided features
-   * Using a piecewise constant function
+   * Predict labels for the provided features using a piecewise linear function.
    *
-   * @param testData features to be labeled
-   * @return predicted labels
+   * @param testData Features to be labeled.
+   * @return Predicted labels.
    */
-  def predict(testData: JavaRDD[java.lang.Double]): JavaDoubleRDD =
+  def predict(testData: JavaDoubleRDD): JavaDoubleRDD =
     JavaDoubleRDD.fromRDD(predict(testData.rdd.asInstanceOf[RDD[Double]]))
 
   /**
-   * Predict a single label
-   * Using a piecewise constant function
+   * Predict a single label using a piecewise linear function.
    *
-   * @param testData feature to be labeled
-   * @return predicted label
+   * @param testData Feature to be labeled.
+   * @return Predicted label.
+   *         If testData exactly matches a boundary, then the associated prediction is
+   *         returned directly. If testData is lower or higher than all boundaries, then
+   *         the first or last prediction is returned, respectively. If testData falls
+   *         between two boundaries, then the predictions are treated as a piecewise
+   *         linear function and the interpolated value is returned.
    */
   def predict(testData: Double): Double = {
-    val result = binarySearch(features, testData)
-    val index =
-      if (result == -1) {
-        0
-      } else if (result < 0) {
-        -result - 2
-      } else {
-        result
-      }
+    def linearInterpolation(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double = {
+      y1 + (y2 - y1) * (x - x1) / (x2 - x1)
+    }
 
-    labels(index)
+    val insertIndex = binarySearch(boundaries, testData)
+
+    val normalisedInsertIndex = -insertIndex - 1
+
+    // Find out whether the feature is lower than all boundaries, higher than all
+    // boundaries, in between two boundaries, or an exact match.
+    if (insertIndex == -1) {
+      predictions.head
+    } else if (normalisedInsertIndex == boundaries.length) {
+      predictions.last
+    } else if (insertIndex < 0) {
+      linearInterpolation(
+        boundaries(normalisedInsertIndex - 1),
+        predictions(normalisedInsertIndex - 1),
+        boundaries(normalisedInsertIndex),
+        predictions(normalisedInsertIndex),
+        testData)
+    } else {
+      predictions(insertIndex)
+    }
  }
 }
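The three prediction regimes above (exact boundary hit, out-of-range clamp, interpolation) are easiest to see on a concrete model. A minimal sketch, illustrative only and not part of the patch, constructing the model directly rather than training it:

    val model = new IsotonicRegressionModel(Array(1.0, 2.0, 3.0), Array(1.0, 2.0, 3.5))

    model.predict(2.0)  // exact boundary match -> 2.0
    model.predict(0.5)  // below all boundaries -> first prediction, 1.0
    model.predict(10.0) // above all boundaries -> last prediction, 3.5
    model.predict(2.5)  // between 2.0 and 3.0 -> 2.0 + (3.5 - 2.0) * 0.5 = 2.75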
 
 /**
- * Isotonic regression
- * Currently implemented using parallel pool adjacent violators algorithm
+ * Isotonic regression.
+ * Currently implemented using the parallelized pool adjacent violators algorithm.
+ * Currently only the univariate (single feature) algorithm is supported.
+ *
+ * Sequential PAV implementation based on:
+ * Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
+ * "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
+ *
+ * Sequential PAV parallelized as per:
+ * Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
+ * "An approach to parallelizing isotonic regression."
+ * Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
  */
-class IsotonicRegression
-  extends Serializable {
+class IsotonicRegression extends Serializable {
 
   /**
-   * Run algorithm to obtain isotonic regression model
+   * Run the pool adjacent violators algorithm to obtain an isotonic regression model.
+   *
+   * @param input RDD of tuples (label, feature, weight) where label is the dependent
+   *              variable for which we calculate isotonic regression, feature is the
+   *              independent variable and weight represents the number of measures
+   *              (default 1).
    *
-   * @param input (label, feature, weight)
-   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
-   * @return isotonic regression model
+   * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
+   * @return Isotonic regression model.
    */
   def run(
       input: RDD[(Double, Double, Double)],
-      isotonic: Boolean = true): IsotonicRegressionModel = {
-    createModel(
-      parallelPoolAdjacentViolators(input, isotonic),
-      isotonic)
-  }
+      isotonic: Boolean): IsotonicRegressionModel =
+    createModel(parallelPoolAdjacentViolators(input, isotonic), isotonic)
+
+  /**
+   * Run the pool adjacent violators algorithm to obtain an isotonic regression model.
+   *
+   * @param input JavaRDD of tuples (label, feature, weight) where label is the dependent
+   *              variable for which we calculate isotonic regression, feature is the
+   *              independent variable and weight represents the number of measures
+   *              (default 1).
+   *
+   * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
+   * @return Isotonic regression model.
+   */
+  def run(
+      input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
+      isotonic: Boolean): IsotonicRegressionModel =
+    run(input.rdd.asInstanceOf[RDD[(Double, Double, Double)]], isotonic)
 
   /**
-   * Creates isotonic regression model with given parameters
+   * Creates an isotonic regression model with the given parameters.
    *
-   * @param predictions labels estimated using isotonic regression algorithm.
+   * @param predictions Predictions calculated using the pool adjacent violators algorithm.
    *                    Used for predictions on new data points.
-   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
-   * @return isotonic regression model
+   * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
+   * @return Isotonic regression model.
    */
   protected def createModel(
       predictions: Array[(Double, Double, Double)],
@@ -118,30 +172,30 @@ class IsotonicRegression
   }
 
   /**
-   * Performs a pool adjacent violators algorithm (PAVA)
+   * Performs the pool adjacent violators algorithm (PAV).
    * Uses approach with single processing of data where violators
    * in previously processed data created by pooling are fixed immediately.
-   * Uses optimization of discovering monotonicity violating sequences (blocks)
-   * Method in situ mutates input array
+   * Uses optimization of discovering monotonicity violating sequences (blocks).
    *
-   * @param in input data
-   * @param isotonic asc or desc
-   * @return result
+   * @param input Input data of tuples (label, feature, weight).
+   * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
+   * @return Result tuples (label, feature, weight) where labels were updated
+   *         to form a monotone sequence as per isotonic regression definition.
    */
   private def poolAdjacentViolators(
-      in: Array[(Double, Double, Double)],
+      input: Array[(Double, Double, Double)],
       isotonic: Boolean): Array[(Double, Double, Double)] = {
 
-    // Pools sub array within given bounds assigning weighted average value to all elements
-    def pool(in: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
-      val poolSubArray = in.slice(start, end + 1)
+    // Pools the subarray within the given bounds, assigning the weighted average
+    // value to all of its elements.
+    def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
+      val poolSubArray = input.slice(start, end + 1)
 
       val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
       val weight = poolSubArray.map(_._3).sum
 
       var i = start
       while (i <= end) {
-        in(i) = (weightedSum / weight, in(i)._2, in(i)._3)
+        input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
         i = i + 1
       }
     }
@@ -150,22 +204,22 @@ class IsotonicRegression
       (x, y) => if (isotonic) x <= y else x >= y
 
     var i = 0
-    while (i < in.length) {
+    while (i < input.length) {
       var j = i
 
-      // Find monotonicity violating sequence, if any
-      while (j < in.length - 1 && !monotonicityConstraintHolds(in(j)._1, in(j + 1)._1)) {
+      // Find a monotonicity violating sequence, if any.
+      while (j < input.length - 1 && !monotonicityConstraintHolds(input(j)._1, input(j + 1)._1)) {
         j = j + 1
       }
 
-      // If monotonicity was not violated, move to next data point
+      // If monotonicity was not violated, move to the next data point.
       if (i == j) {
         i = i + 1
       } else {
         // Otherwise pool the violating sequence
-        // And check if pooling caused monotonicity violation in previously processed points
-        while (i >= 0 && !monotonicityConstraintHolds(in(i)._1, in(i + 1)._1)) {
-          pool(in, i, j)
+        // and check if pooling caused monotonicity violation in previously processed points.
+        while (i >= 0 && !monotonicityConstraintHolds(input(i)._1, input(i + 1)._1)) {
+          pool(input, i, j)
           i = i - 1
         }
@@ -173,16 +227,17 @@ class IsotonicRegression
       }
     }
 
-    in
+    input
   }
 
   /**
-   * Performs parallel pool adjacent violators algorithm
-   * Calls Pool adjacent violators on each partition and then again on the result
+   * Performs the parallel pool adjacent violators algorithm.
+   * Runs the pool adjacent violators algorithm on each partition and then again on
+   * the collected result.
    *
-   * @param testData input
-   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
-   * @return result
+   * @param testData Input data of tuples (label, feature, weight).
+   * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
+   * @return Result tuples (label, feature, weight) where labels were updated
+   *         to form a monotone sequence as per isotonic regression definition.
    */
   private def parallelPoolAdjacentViolators(
       testData: RDD[(Double, Double, Double)],
@@ -194,45 +249,4 @@ class IsotonicRegression
     poolAdjacentViolators(parallelStepResult.collect(), isotonic)
   }
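To make the pooling loop above concrete, here is a self-contained sequential sketch for the increasing, unit-weight case. It is illustrative only and not part of the patch; it resumes the scan at the end of the pooled block (i = j), so the back-fixing step never leaves a negative index in play:

    object PavSketch {
      // Simplified sequential PAV: unit weights, increasing constraint.
      def pav(labels: Array[Double]): Array[Double] = {
        val ys = labels.clone()
        var i = 0
        while (i < ys.length) {
          var j = i
          // Find a monotonicity violating sequence, if any.
          while (j < ys.length - 1 && ys(j) > ys(j + 1)) j += 1
          if (i == j) {
            i += 1
          } else {
            // Pool the violators, back-fixing earlier points broken by the pooling.
            while (i >= 0 && ys(i) > ys(i + 1)) {
              val avg = ys.slice(i, j + 1).sum / (j - i + 1)
              var k = i
              while (k <= j) { ys(k) = avg; k += 1 }
              i -= 1
            }
            i = j
          }
        }
        ys
      }
    }

For example, PavSketch.pav(Array[Double](1, 2, 3, 3, 1)) pools (3, 1) into (2, 2), which now violates against the preceding 3, so (3, 2, 2) is pooled again into (7/3, 7/3, 7/3), giving 1, 2, 7/3, 7/3, 7/3: exactly the values the test suites below expect.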
-}
-
-/**
- * Top-level methods for monotone regression (either isotonic or antitonic).
- */
-object IsotonicRegression {
-
-  /**
-   * Train a monotone regression model given an RDD of (label, feature, weight).
-   * Label is the dependent y value
-   * Weight of the data point is the number of measurements. Default is 1
-   *
-   * @param input RDD of (label, feature, weight).
-   *              Each point describes a row of the data
-   *              matrix A as well as the corresponding right hand side label y
-   *              and weight as number of measurements
-   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
-   */
-  def train(
-      input: RDD[(Double, Double, Double)],
-      isotonic: Boolean = true): IsotonicRegressionModel = {
-    new IsotonicRegression().run(input, isotonic)
-  }
-
-  /**
-   * Train a monotone regression model given an RDD of (label, feature, weight).
-   * Label is the dependent y value
-   * Weight of the data point is the number of measurements. Default is 1
-   *
-   * @param input RDD of (label, feature, weight).
-   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
-   * @return
-   */
-  def train(
-      input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
-      isotonic: Boolean): IsotonicRegressionModel = {
-    new IsotonicRegression()
-      .run(
-        input.rdd.asInstanceOf[RDD[(Double, Double, Double)]],
-        isotonic)
-  }
-}
+}
\ No newline at end of file
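With the static IsotonicRegression.train helpers removed above, callers now instantiate the class and call run directly. A minimal sketch of the updated Scala call site, assuming an existing SparkContext named sc and tuples of (label, feature, weight):

    val input = sc.parallelize(Seq((1.0, 1.0, 1.0), (3.0, 2.0, 1.0), (2.0, 3.0, 1.0)))
    val model = new IsotonicRegression().run(input, isotonic = true)
    model.predict(2.5) // 2.5, interpolated between the pooled predictions at features 2.0 and 3.0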
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
index b064e1aeec203..aacdf97056e0a 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
@@ -18,8 +18,11 @@
 package org.apache.spark.mllib.regression;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
+import org.apache.spark.api.java.JavaDoubleRDD;
 import scala.Tuple3;
 
 import org.junit.After;
@@ -27,29 +30,26 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.mllib.util.IsotonicDataGenerator;
 
 public class JavaIsotonicRegressionSuite implements Serializable {
   private transient JavaSparkContext sc;
 
-  @Before
-  public void setUp() {
-    sc = new JavaSparkContext("local", "JavaLinearRegressionSuite");
-  }
+  private List<Tuple3<Double, Double, Double>> generateIsotonicInput(double[] labels) {
+    List<Tuple3<Double, Double, Double>> input = new ArrayList<>();
 
-  @After
-  public void tearDown() {
-    sc.stop();
-    sc = null;
+    for (int i = 1; i <= labels.length; i++) {
+      input.add(new Tuple3<>(labels[i - 1], (double) i, 1d));
+    }
+
+    return input;
   }
 
-  double difference(List<Tuple3<Double, Double, Double>> expected, IsotonicRegressionModel model) {
+  private double difference(List<Tuple3<Double, Double, Double>> expected, IsotonicRegressionModel model) {
     double diff = 0;
 
-    for(int i = 0; i < model.labels().length; i++) {
+    for (int i = 0; i < model.predictions().length; i++) {
       Tuple3<Double, Double, Double> exp = expected.get(i);
       diff += Math.abs(model.predict(exp._2()) - exp._1());
     }
@@ -57,39 +57,49 @@ public void tearDown() {
     return diff;
   }
 
-  @Test
-  public void runIsotonicRegressionUsingStaticMethod() {
-    JavaRDD<Tuple3<Double, Double, Double>> trainRDD = sc.parallelize(
-        IsotonicDataGenerator.generateIsotonicInputAsList(
-            new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
+  private IsotonicRegressionModel runIsotonicRegression(double[] labels) {
+    JavaRDD<Tuple3<Double, Double, Double>> trainRDD =
+      sc.parallelize(generateIsotonicInput(labels)).cache();
+
+    return new IsotonicRegression().run(trainRDD, true);
+  }
 
-    IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);
+  @Before
+  public void setUp() {
+    sc = new JavaSparkContext("local", "JavaIsotonicRegressionSuite");
+  }
 
-    List<Tuple3<Double, Double, Double>> expected = IsotonicDataGenerator
-      .generateIsotonicInputAsList(
-        new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+  }
+
+  @Test
+  public void testIsotonicRegressionJavaRDD() {
+    IsotonicRegressionModel model =
+      runIsotonicRegression(new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12});
+
+    List<Tuple3<Double, Double, Double>> expected =
+      generateIsotonicInput(new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});
 
     Assert.assertTrue(difference(expected, model) == 0);
   }
 
   @Test
-  public void testPredictJavaRDD() {
-    JavaRDD<Tuple3<Double, Double, Double>> trainRDD = sc.parallelize(
-        IsotonicDataGenerator.generateIsotonicInputAsList(
-            new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
-
-    IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);
+  public void testIsotonicRegressionPredictionsJavaRDD() {
+    IsotonicRegressionModel model =
+      runIsotonicRegression(new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12});
 
-    JavaRDD<Double> testRDD = trainRDD.map(new Function<Tuple3<Double, Double, Double>, Double>() {
-      @Override
-      public Double call(Tuple3<Double, Double, Double> v) throws Exception {
-        return v._2();
-      }
-    });
+    JavaDoubleRDD testRDD =
+      sc.parallelizeDoubles(Arrays.asList(new Double[] {0.0, 1.0, 9.5, 12.0, 13.0}));
 
     List<Double> predictions = model.predict(testRDD).collect();
 
     Assert.assertTrue(predictions.get(0) == 1d);
-    Assert.assertTrue(predictions.get(11) == 12d);
+    Assert.assertTrue(predictions.get(1) == 1d);
+    Assert.assertTrue(predictions.get(2) == 10d);
+    Assert.assertTrue(predictions.get(3) == 12d);
+    Assert.assertTrue(predictions.get(4) == 12d);
   }
 }
\ No newline at end of file
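The weighted expectations in the Scala suite below all come from the pooled weighted average: pooling replaces each pooled label with sum(label * weight) / sum(weight), as in the pool function above. For instance, the 2.75 expected by the weighted test is just (illustrative one-liner, not part of the patch):

    val pooled = (3.0 * 1 + 4.0 * 1 + 2.0 * 2) / (1 + 1 + 2) // = 11 / 4 = 2.75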
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index 06f18fb17ec55..1c001b6d8e14f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -19,198 +19,143 @@ package org.apache.spark.mllib.regression
 
 import org.scalatest.{Matchers, FunSuite}
 
-import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
-import org.apache.spark.mllib.util.IsotonicDataGenerator._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
 
-class IsotonicRegressionSuite
-  extends FunSuite
-  with MLlibTestSparkContext
-  with Matchers {
+class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
 
-  private def round(d: Double): Double =
+  private def round(d: Double) =
     Math.round(d * 100).toDouble / 100
 
-  test("increasing isotonic regression") {
-    val trainRDD = sc.parallelize(
-      generateIsotonicInput(
-        1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
-
-    model.labels should be(
-      Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
-  }
+  private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] =
+    labels.zip(1 to labels.size).map(point => (point._1, point._2.toDouble, 1d))
 
-  test("increasing isotonic regression using api") {
-    val trainRDD = sc.parallelize(
-      generateIsotonicInput(
-        1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
+  private def generateIsotonicInput(
+      labels: Seq[Double],
+      weights: Seq[Double]): Seq[(Double, Double, Double)] =
+    labels.zip(1 to labels.size)
+      .zip(weights)
+      .map(point => (point._1._1, point._1._2.toDouble, point._2))
 
-    val model = IsotonicRegression.train(trainRDD, true)
+  private def runIsotonicRegression(
+      labels: Seq[Double],
+      weights: Seq[Double],
+      isotonic: Boolean): IsotonicRegressionModel = {
+    val trainRDD = sc.parallelize(generateIsotonicInput(labels, weights)).cache()
+    new IsotonicRegression().run(trainRDD, isotonic)
+  }
 
-    model.labels should be(
-      Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
+  private def runIsotonicRegression(
+      labels: Seq[Double],
+      isotonic: Boolean): IsotonicRegressionModel =
+    runIsotonicRegression(labels, Array.fill(labels.size)(1d), isotonic)
+
+  test("increasing isotonic regression") {
+    val model = runIsotonicRegression(Seq(1, 2, 3, 3, 1, 6, 17, 16, 17, 18), true)
+
+    assert(model.predictions === Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 16.5, 16.5, 17, 18))
   }
 
   test("isotonic regression with size 0") {
-    val trainRDD = sc.parallelize(List[(Double, Double, Double)]()).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(), true)
 
-    model.labels should be(Array())
+    assert(model.predictions === Array())
   }
 
   test("isotonic regression with size 1") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(1)).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(1), true)
 
-    model.labels should be(Array(1.0))
+    assert(model.predictions === Array(1.0))
   }
 
   test("isotonic regression strictly increasing sequence") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5)).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(1, 2, 3, 4, 5), true)
 
-    model.labels should be(Array(1, 2, 3, 4, 5))
+    assert(model.predictions === Array(1, 2, 3, 4, 5))
  }
 
   test("isotonic regression strictly decreasing sequence") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(5, 4, 3, 2, 1)).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(5, 4, 3, 2, 1), true)
 
-    model.labels should be(Array(3, 3, 3, 3, 3))
+    assert(model.predictions === Array(3, 3, 3, 3, 3))
   }
 
   test("isotonic regression with last element violating monotonicity") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 2)).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), true)
 
-    model.labels should be(Array(1, 2, 3, 3, 3))
+    assert(model.predictions === Array(1, 2, 3, 3, 3))
   }
 
   test("isotonic regression with first element violating monotonicity") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(4, 2, 3, 4, 5)).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(4, 2, 3, 4, 5), true)
 
-    model.labels should be(Array(3, 3, 3, 4, 5))
+    assert(model.predictions === Array(3, 3, 3, 4, 5))
   }
 
   test("isotonic regression with negative labels") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(-1, -2, 0, 1, -1)).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(-1, -2, 0, 1, -1), true)
 
-    model.labels should be(Array(-1.5, -1.5, 0, 0, 0))
+    assert(model.predictions === Array(-1.5, -1.5, 0, 0, 0))
   }
 
   test("isotonic regression with unordered input") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5).reverse).cache()
+    val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse).cache()
+    val model = new IsotonicRegression().run(trainRDD, true)
 
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
-
-    model.labels should be(Array(1, 2, 3, 4, 5))
+    assert(model.predictions === Array(1, 2, 3, 4, 5))
   }
 
   test("weighted isotonic regression") {
-    val trainRDD = sc.parallelize(
-      generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2), true)
 
-    model.labels should be(Array(1, 2, 2.75, 2.75,2.75))
+    assert(model.predictions === Array(1, 2, 2.75, 2.75, 2.75))
   }
 
   test("weighted isotonic regression with weights lower than 1") {
-    val trainRDD = sc.parallelize(
-      generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true)
 
-    model.labels.map(round) should be(Array(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2))
+    assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2))
   }
 
   test("weighted isotonic regression with negative weights") {
-    val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5))).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
 
-    model.labels should be(Array(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6))
+    assert(model.predictions === Array(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6))
   }
 
   test("weighted isotonic regression with zero weights") {
-    val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(0, 0, 0, 1, 0))).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)
 
-    model.labels should be(Array(1, 2, 2, 2, 2))
+    assert(model.predictions === Array(1, 2, 2, 2, 2))
   }
 
   test("isotonic regression prediction") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
 
-    model.predict(0) should be(1)
-    model.predict(2) should be(2)
-    model.predict(3) should be(10d/3)
-    model.predict(10) should be(10d/3)
+    assert(model.predict(-1) === 1)
+    assert(model.predict(0) === 1)
+    assert(model.predict(1.5) === 1.5)
+    assert(model.predict(1.75) === 1.75)
+    assert(model.predict(2) === 2)
+    assert(model.predict(3) === 10d/3)
+    assert(model.predict(10) === 10d/3)
   }
 
   test("isotonic regression RDD prediction") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()
-    val testRDD = sc.parallelize(List(0.0, 2.0, 3.0, 10.0)).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, true)
+    val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
+    val testRDD = sc.parallelize(List(-1.0, 0.0, 1.5, 1.75, 2.0, 3.0, 10.0)).cache()
 
-    model.predict(testRDD).collect() should be(Array(1, 2, 10.0/3, 10.0/3))
+    assert(model.predict(testRDD).collect() === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
   }
 
   test("antitonic regression prediction") {
-    val trainRDD = sc.parallelize(generateIsotonicInput(7, 5, 3, 5, 1)).cache()
-
-    val alg = new IsotonicRegression
-    val model = alg.run(trainRDD, false)
-
-    model.predict(0) should be(7)
-    model.predict(2) should be(5)
-    model.predict(3) should be(4)
-    model.predict(10) should be(1)
-  }
-}
-
-class IsotonicRegressionClusterSuite
-  extends FunSuite
-  with LocalClusterSparkContext {
-
-  test("task size should be small in both training and prediction") {
-    val n = 1000
-
-    val trainData = (0 to n).map(i => (i.toDouble, i.toDouble, 1d))
-    val points = sc.parallelize(trainData, 2)
-
-    // If we serialize data directly in the task closure, the size of the serialized task would be
-    // greater than 1MB and hence Spark would throw an error.
-    val model = IsotonicRegression.train(points)
-    val predictions = model.predict(points.map(_._2))
+    val model = runIsotonicRegression(Seq(7, 5, 3, 5, 1), false)
+
+    assert(model.predict(-1) === 7)
+    assert(model.predict(0) === 7)
+    assert(model.predict(1.5) === 6)
+    assert(model.predict(1.75) === 5.5)
+    assert(model.predict(2) === 5)
+    assert(model.predict(3) === 4)
+    assert(model.predict(10) === 1)
   }
-}
+}
\ No newline at end of file
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
index 864622a9296a6..3d5625ad41b6a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
@@ -137,7 +137,7 @@ class LinearRegressionClusterSuite extends FunSuite with LocalClusterSparkContex
     }.cache()
     // If we serialize data directly in the task closure, the size of the serialized task would be
     // greater than 1MB and hence Spark would throw an error.
-    val model = LinearRegressionWithSGD.train(points, 2)
+    val model = LinearRegressionWithSGD.train(points, 2)
     val predictions = model.predict(points.map(_.features))
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala
deleted file mode 100644
index 2aaa5b72b97d2..0000000000000
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.util
-
-import java.lang.{Double => JDouble}
-
-import scala.collection.JavaConversions._
-
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * Generate test data for Isotonic regresision.
- */
-@DeveloperApi
-object IsotonicDataGenerator {
-
-  /**
-   * Return a Java List of ordered labeled points
-   *
-   * @param labels list of labels for the data points
-   * @return Java List of input.
- */ - def generateIsotonicInputAsList( - labels: Array[Double]):java.util.List[(JDouble, JDouble, JDouble)] = { - seqAsJavaList( - generateIsotonicInput(wrapDoubleArray(labels):_*) - .map(x => (new JDouble(x._1), new JDouble(x._2), new JDouble(1)))) - } - - /** - * Return an ordered sequence of labeled data points with default weights - * - * @param labels list of labels for the data points - * @return sequence of data points - */ - def generateIsotonicInput(labels: Double*): Seq[(Double, Double, Double)] = { - labels.zip(1 to labels.size) - .map(point => (point._1, point._2.toDouble, 1d)) - } - - /** - * Return an ordered sequence of labeled weighted data points - * - * @param labels list of labels for the data points - * @param weights list of weights for the data points - * @return sequence of data points - */ - def generateWeightedIsotonicInput( - labels: Seq[Double], - weights: Seq[Double]): Seq[(Double, Double, Double)] = { - labels.zip(1 to labels.size).zip(weights) - .map(point => (point._1._1, point._1._2.toDouble, point._2)) - } -}
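The deleted IsotonicDataGenerator is not replaced by a shared utility; its helpers now live as private methods in each test suite (see generateIsotonicInput above). For ad-hoc experiments outside the suites, a standalone equivalent is a one-liner (illustrative sketch only):

    def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] =
      labels.zipWithIndex.map { case (label, i) => (label, (i + 1).toDouble, 1d) }

    generateIsotonicInput(Seq(1, 2, 3)) // Seq((1.0, 1.0, 1.0), (2.0, 2.0, 1.0), (3.0, 3.0, 1.0))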