diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 0b9cd4287dcc7..8ecedd74b93c1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.regression
 
 import java.io.Serializable
+import java.lang.{Double => JDouble}
 import java.util.Arrays.binarySearch
 
 import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
@@ -53,8 +54,9 @@ class IsotonicRegressionModel (
    * @param testData Features to be labeled.
    * @return Predicted labels.
    */
-  def predict(testData: RDD[Double]): RDD[Double] =
+  def predict(testData: RDD[Double]): RDD[Double] = {
     testData.map(predict)
+  }
 
   /**
    * Predict labels for provided features.
@@ -63,8 +65,9 @@ class IsotonicRegressionModel (
    * @param testData Features to be labeled.
    * @return Predicted labels.
    */
-  def predict(testData: JavaDoubleRDD): JavaDoubleRDD =
+  def predict(testData: JavaDoubleRDD): JavaDoubleRDD = {
     JavaDoubleRDD.fromRDD(predict(testData.rdd.asInstanceOf[RDD[Double]]))
+  }
 
   /**
    * Predict a single label.
@@ -75,8 +78,8 @@ class IsotonicRegressionModel (
    * If testData exactly matches a boundary then associated prediction is directly returned
    * If testData is lower or higher than all boundaries
    * then first or last prediction is returned respectively
-   * If testData falls between two values in boundary then predictions is treated as piecewise
-   * linear function and interpolated value is returned
+   * If testData falls between two values in boundary then predictions is treated
+   * as piecewise linear function and interpolated value is returned
    */
   def predict(testData: Double): Double = {
 
@@ -88,8 +91,8 @@ class IsotonicRegressionModel (
 
     val normalisedInsertIndex = -insertIndex - 1
 
-    //Find if the index was lower than all values,
-    //higher than all values, inbetween two values or exact match.
+    // Find if the index was lower than all values,
+    // higher than all values, inbetween two values or exact match.
     if (insertIndex == -1) {
       predictions.head
     } else if (normalisedInsertIndex == boundaries.length){
@@ -121,37 +124,50 @@ class IsotonicRegressionModel (
  * "An approach to parallelizing isotonic regression."
  * Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
  */
-class IsotonicRegression extends Serializable {
+class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {
 
   /**
-   * Run pool adjacent violators algorithm to obtain isotonic regression model.
+   * Constructs IsotonicRegression instance with default parameter isotonic = true
+   * @return New instance of IsotonicRegression
+   */
+  def this() = this(true)
+
+  /**
+   * Sets the isotonic parameter
+   * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
+   * @return The instance of IsotonicRegression
+   */
+  def setIsotonic(isotonic: Boolean): this.type = {
+    this.isotonic = isotonic
+    this
+  }
+
+  /**
+   * Run IsotonicRegression algorithm to obtain isotonic regression model.
    *
    * @param input RDD of tuples (label, feature, weight) where label is dependent variable
    *              for which we calculate isotonic regression, feature is independent variable
    *              and weight represents number of measures with default 1.
    *
-   * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
    * @return Isotonic regression model.
   */
-  def run(
-      input: RDD[(Double, Double, Double)],
-      isotonic: Boolean): IsotonicRegressionModel =
+  def run(input: RDD[(Double, Double, Double)]): IsotonicRegressionModel = {
     createModel(parallelPoolAdjacentViolators(input, isotonic), isotonic)
+  }
 
-/**
+  /**
    * Run pool adjacent violators algorithm to obtain isotonic regression model.
    *
    * @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable
    *              for which we calculate isotonic regression, feature is independent variable
    *              and weight represents number of measures with default 1.
    *
-   * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
    * @return Isotonic regression model.
    */
   def run(
-      input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
-      isotonic: Boolean): IsotonicRegressionModel =
-    run(input.rdd.asInstanceOf[RDD[(Double, Double, Double)]], isotonic)
+      input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = {
+    run(input.rdd.asInstanceOf[RDD[(Double, Double, Double)]])
+  }
 
   /**
    * Creates isotonic regression model with given parameters.
@@ -164,11 +180,7 @@ class IsotonicRegression extends Serializable {
   protected def createModel(
       predictions: Array[(Double, Double, Double)],
       isotonic: Boolean): IsotonicRegressionModel = {
-
-    val labels = predictions.map(_._1)
-    val features = predictions.map(_._2)
-
-    new IsotonicRegressionModel(features, labels)
+    new IsotonicRegressionModel(predictions.map(_._2), predictions.map(_._1))
   }
 
   /**
@@ -249,4 +261,4 @@ class IsotonicRegression extends Serializable {
     poolAdjacentViolators(parallelStepResult.collect(), isotonic)
   }
 
-}
\ No newline at end of file
+}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
index aacdf97056e0a..046b359ea3eb6 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
@@ -61,7 +61,7 @@ private IsotonicRegressionModel runIsotonicRegression(double[] labels) {
     JavaRDD<Tuple3<Double, Double, Double>> trainRDD =
       sc.parallelize(generateIsotonicInput(labels)).cache();
 
-    return new IsotonicRegression().run(trainRDD, true);
+    return new IsotonicRegression().run(trainRDD);
   }
 
   @Before
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index 1c001b6d8e14f..24dca73d42cb4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -23,31 +23,35 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
 
 class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
 
-  private def round(d: Double) =
+  private def round(d: Double) = {
     Math.round(d * 100).toDouble / 100
+  }
 
-  private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] =
+  private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] = {
     labels.zip(1 to labels.size).map(point => (point._1, point._2.toDouble, 1d))
+  }
 
   private def generateIsotonicInput(
       labels: Seq[Double],
-      weights: Seq[Double]): Seq[(Double, Double, Double)] =
+      weights: Seq[Double]): Seq[(Double, Double, Double)] = {
     labels.zip(1 to labels.size)
       .zip(weights)
       .map(point => (point._1._1, point._1._2.toDouble, point._2))
+  }
 
   private def runIsotonicRegression(
       labels: Seq[Double],
       weights: Seq[Double],
       isotonic: Boolean): IsotonicRegressionModel = {
     val trainRDD = sc.parallelize(generateIsotonicInput(labels, weights)).cache()
-    new IsotonicRegression().run(trainRDD, isotonic)
+    new IsotonicRegression().setIsotonic(isotonic).run(trainRDD)
   }
 
   private def runIsotonicRegression(
       labels: Seq[Double],
-      isotonic: Boolean): IsotonicRegressionModel =
+      isotonic: Boolean): IsotonicRegressionModel = {
     runIsotonicRegression(labels, Array.fill(labels.size)(1d), isotonic)
+  }
 
   test("increasing isotonic regression") {
     val model = runIsotonicRegression(Seq(1, 2, 3, 3, 1, 6, 17, 16, 17, 18), true)
@@ -99,7 +103,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
 
   test("isotonic regression with unordered input") {
     val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse).cache()
-    val model = new IsotonicRegression().run(trainRDD, true)
+    val model = new IsotonicRegression().run(trainRDD)
 
     assert(model.predictions === Array(1, 2, 3, 4, 5))
   }
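
Usage note (not part of the patch): the isotonic flag moves from run(input, isotonic) onto the builder-style setIsotonic setter. A minimal sketch of the new call pattern, assuming an existing SparkContext named sc and illustrative (label, feature, weight) data:

  import org.apache.spark.mllib.regression.IsotonicRegression

  // Illustrative training data as (label, feature, weight) tuples; `sc` is an assumed SparkContext.
  val trainData = sc.parallelize(Seq(
    (1.0, 1.0, 1.0), (2.0, 2.0, 1.0), (3.0, 3.0, 1.0), (1.0, 4.0, 1.0), (6.0, 5.0, 1.0)))

  // Old API: new IsotonicRegression().run(trainData, true)
  // New API: configure the sequence type first, then call run(input).
  val model = new IsotonicRegression()
    .setIsotonic(true)
    .run(trainData)

  // Predict a single feature or a whole RDD of features; values between two
  // boundaries are interpolated piecewise linearly, per the predict() scaladoc above.
  val one = model.predict(2.5)
  val many = model.predict(sc.parallelize(Seq(0.5, 2.5, 4.5)))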