diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 1a7965a7c04b4..ca2ac820fab13 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -21,6 +21,8 @@ import java.io.Serializable import java.lang.{Double => JDouble} import java.util.Arrays.binarySearch +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD} import org.apache.spark.rdd.RDD @@ -208,9 +210,13 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali private def poolAdjacentViolators( input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = { + if (input.isEmpty) { + return Array.empty + } + // Pools sub array within given bounds assigning weighted average value to all elements. def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = { - val poolSubArray = input.view.slice(start, end + 1) + val poolSubArray = input.slice(start, end + 1) val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum val weight = poolSubArray.map(_._3).sum @@ -246,7 +252,35 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali } } - input + // For points having the same prediction, we only keep two boundary points. + val compressed = ArrayBuffer.empty[(Double, Double, Double)] + + var (curLabel, curFeature, curWeight) = input.head + var rightBound = curFeature + def merge(): Unit = { + compressed += ((curLabel, curFeature, curWeight)) + if (rightBound > curFeature) { + compressed += ((curLabel, rightBound, 0.0)) + } + } + i = 1 + while (i < input.length) { + val (label, feature, weight) = input(i) + if (label == curLabel) { + curWeight += weight + rightBound = feature + } else { + merge() + curLabel = label + curFeature = feature + curWeight = weight + rightBound = curFeature + } + i += 1 + } + merge() + + compressed.toArray } /** diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 0e8025c1e0e2e..9a516402267a9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -29,15 +29,13 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M } private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] = { - labels.zip(1 to labels.size).map(point => (point._1, point._2.toDouble, 1d)) + Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, 1d)) } private def generateIsotonicInput( labels: Seq[Double], weights: Seq[Double]): Seq[(Double, Double, Double)] = { - labels.zip(1 to labels.size) - .zip(weights) - .map(point => (point._1._1, point._1._2.toDouble, point._2)) + Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, weights(i))) } private def runIsotonicRegression( @@ -55,87 +53,123 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M } test("increasing isotonic regression") { - val model = runIsotonicRegression(Seq(1, 2, 3, 3, 1, 6, 17, 16, 17, 18), true) - assert(model.predictions === Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 16.5, 16.5, 17, 18)) + /* + The following result could be re-produced with sklearn. + + > from sklearn.isotonic import IsotonicRegression + > x = range(9) + > y = [1, 2, 3, 1, 6, 17, 16, 17, 18] + > ir = IsotonicRegression(x, y) + > print ir.predict(x) + + array([ 1. , 2. , 2. , 2. , 6. , 16.5, 16.5, 17. , 18. ]) + */ + val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true) + + assert(Array.tabulate(9)(x => model.predict(x)) === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18)) + + assert(model.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8)) + assert(model.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0)) + assert(model.isotonic) } test("isotonic regression with size 0") { val model = runIsotonicRegression(Seq(), true) + assert(model.predictions === Array()) } test("isotonic regression with size 1") { val model = runIsotonicRegression(Seq(1), true) + assert(model.predictions === Array(1.0)) } test("isotonic regression strictly increasing sequence") { val model = runIsotonicRegression(Seq(1, 2, 3, 4, 5), true) + assert(model.predictions === Array(1, 2, 3, 4, 5)) } test("isotonic regression strictly decreasing sequence") { val model = runIsotonicRegression(Seq(5, 4, 3, 2, 1), true) - assert(model.predictions === Array(3, 3, 3, 3, 3)) + + assert(model.boundaries === Array(0, 4)) + assert(model.predictions === Array(3, 3)) } test("isotonic regression with last element violating monotonicity") { val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), true) - assert(model.predictions === Array(1, 2, 3, 3, 3)) + + assert(model.boundaries === Array(0, 1, 2, 4)) + assert(model.predictions === Array(1, 2, 3, 3)) } test("isotonic regression with first element violating monotonicity") { val model = runIsotonicRegression(Seq(4, 2, 3, 4, 5), true) - assert(model.predictions === Array(3, 3, 3, 4, 5)) + + assert(model.boundaries === Array(0, 2, 3, 4)) + assert(model.predictions === Array(3, 3, 4, 5)) } test("isotonic regression with negative labels") { val model = runIsotonicRegression(Seq(-1, -2, 0, 1, -1), true) - assert(model.predictions === Array(-1.5, -1.5, 0, 0, 0)) + + assert(model.boundaries === Array(0, 1, 2, 4)) + assert(model.predictions === Array(-1.5, -1.5, 0, 0)) } test("isotonic regression with unordered input") { - val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse).cache() + val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2) + val model = new IsotonicRegression().run(trainRDD) assert(model.predictions === Array(1, 2, 3, 4, 5)) } test("weighted isotonic regression") { val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2), true) - assert(model.predictions === Array(1, 2, 2.75, 2.75,2.75)) + + assert(model.boundaries === Array(0, 1, 2, 4)) + assert(model.predictions === Array(1, 2, 2.75, 2.75)) } test("weighted isotonic regression with weights lower than 1") { val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true) - assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2)) + + assert(model.boundaries === Array(0, 1, 2, 4)) + assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2)) } test("weighted isotonic regression with negative weights") { val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true) - assert(model.predictions === Array(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6)) + + assert(model.boundaries === Array(0.0, 1.0, 4.0)) + assert(model.predictions === Array(1.0, 10.0/6, 10.0/6)) } test("weighted isotonic regression with zero weights") { val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true) - assert(model.predictions === Array(1, 2, 2, 2, 2)) + + assert(model.boundaries === Array(0.0, 1.0, 4.0)) + assert(model.predictions === Array(1, 2, 2)) } test("isotonic regression prediction") { val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true) + assert(model.predict(-2) === 1) assert(model.predict(-1) === 1) - assert(model.predict(0) === 1) - assert(model.predict(1.5) === 1.5) - assert(model.predict(1.75) === 1.75) - assert(model.predict(2) === 2) - assert(model.predict(3) === 10d/3) - assert(model.predict(10) === 10d/3) + assert(model.predict(0.5) === 1.5) + assert(model.predict(0.75) === 1.75) + assert(model.predict(1) === 2) + assert(model.predict(2) === 10d/3) + assert(model.predict(9) === 10d/3) } test("isotonic regression prediction with duplicate features") { val trainRDD = sc.parallelize( Seq[(Double, Double, Double)]( - (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1))).cache() + (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2) val model = new IsotonicRegression().run(trainRDD) assert(model.predict(0) === 1) @@ -147,7 +181,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M test("antitonic regression prediction with duplicate features") { val trainRDD = sc.parallelize( Seq[(Double, Double, Double)]( - (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1))).cache() + (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2) val model = new IsotonicRegression().setIsotonic(false).run(trainRDD) assert(model.predict(0) === 6) @@ -158,21 +192,22 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M test("isotonic regression RDD prediction") { val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true) - val testRDD = sc.parallelize(List(-1.0, 0.0, 1.5, 1.75, 2.0, 3.0, 10.0)).cache() - assert(model.predict(testRDD).collect() === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3)) + val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2) + val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2) + assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3)) } test("antitonic regression prediction") { val model = runIsotonicRegression(Seq(7, 5, 3, 5, 1), false) + assert(model.predict(-2) === 7) assert(model.predict(-1) === 7) - assert(model.predict(0) === 7) - assert(model.predict(1.5) === 6) - assert(model.predict(1.75) === 5.5) - assert(model.predict(2) === 5) - assert(model.predict(3) === 4) - assert(model.predict(10) === 1) + assert(model.predict(0.5) === 6) + assert(model.predict(0.75) === 5.5) + assert(model.predict(1) === 5) + assert(model.predict(2) === 4) + assert(model.predict(9) === 1) } test("model construction") {