
Commit

Removed MonotonicityConstraint, Isotonic and Antitonic constraints. Replaced by simple boolean.
zapletal-martin committed Dec 27, 2014
1 parent c06f88c commit 089bf86
Showing 3 changed files with 48 additions and 82 deletions.
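In short, every call site moves from the removed MonotonicityConstraint objects to a plain Boolean flag. A minimal before/after sketch of the public API change (the input and model names are illustrative, not part of this diff):

  // Before this commit (API now removed):
  //   val model = IsotonicRegression.train(input, Isotonic)
  // After this commit, a Boolean selects the direction
  // (true = isotonic/increasing, false = antitonic/decreasing):
  val model = IsotonicRegression.train(input, isotonic = true)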
@@ -18,56 +18,17 @@
package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.MonotonicityConstraint.MonotonicityConstraint._
import org.apache.spark.rdd.RDD

-/**
- * Monotonicity constrains for monotone regression
- * Isotonic (increasing)
- * Antitonic (decreasing)
- */
-object MonotonicityConstraint {
-
-  object MonotonicityConstraint {
-
-    sealed trait MonotonicityConstraint {
-      private[regression] def holds(
-        current: WeightedLabeledPoint,
-        next: WeightedLabeledPoint): Boolean
-    }
-
-    /**
-     * Isotonic monotonicity constraint. Increasing sequence
-     */
-    case object Isotonic extends MonotonicityConstraint {
-      override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
-        current.label <= next.label
-      }
-    }
-
-    /**
-     * Antitonic monotonicity constrain. Decreasing sequence
-     */
-    case object Antitonic extends MonotonicityConstraint {
-      override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
-        current.label >= next.label
-      }
-    }
-  }
-
-  val Isotonic = MonotonicityConstraint.Isotonic
-  val Antitonic = MonotonicityConstraint.Antitonic
-}
-
/**
* Regression model for Isotonic regression
*
* @param predictions Weights computed for every feature.
- * @param monotonicityConstraint specifies if the sequence is increasing or decreasing
+ * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
*/
class IsotonicRegressionModel(
val predictions: Seq[WeightedLabeledPoint],
-    val monotonicityConstraint: MonotonicityConstraint)
+    val isotonic: Boolean)
extends RegressionModel {

override def predict(testData: RDD[Vector]): RDD[Double] =
@@ -91,23 +52,23 @@ trait IsotonicRegressionAlgorithm
*
* @param predictions labels estimated using isotonic regression algorithm.
* Used for predictions on new data points.
- * @param monotonicityConstraint isotonic or antitonic
+ * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return isotonic regression model
*/
protected def createModel(
predictions: Seq[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
+      isotonic: Boolean): IsotonicRegressionModel

/**
* Run algorithm to obtain isotonic regression model
*
* @param input data
- * @param monotonicityConstraint ascending or descenting
+ * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return isotonic regression model
*/
def run(
input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
+      isotonic: Boolean): IsotonicRegressionModel
}

/**
@@ -118,16 +79,16 @@ class PoolAdjacentViolators private [mllib]

override def run(
input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
+      isotonic: Boolean): IsotonicRegressionModel = {
createModel(
-      parallelPoolAdjacentViolators(input, monotonicityConstraint),
-      monotonicityConstraint)
+      parallelPoolAdjacentViolators(input, isotonic),
+      isotonic)
}

override protected def createModel(
predictions: Seq[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
-    new IsotonicRegressionModel(predictions, monotonicityConstraint)
+      isotonic: Boolean): IsotonicRegressionModel = {
+    new IsotonicRegressionModel(predictions, isotonic)
}

/**
@@ -138,12 +99,12 @@ class PoolAdjacentViolators private [mllib]
* Method in situ mutates input array
*
* @param in input data
-   * @param monotonicityConstraint asc or desc
+   * @param isotonic asc or desc
* @return result
*/
private def poolAdjacentViolators(
in: Array[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): Array[WeightedLabeledPoint] = {
+      isotonic: Boolean): Array[WeightedLabeledPoint] = {

// Pools sub array within given bounds assigning weighted average value to all elements
def pool(in: Array[WeightedLabeledPoint], start: Int, end: Int): Unit = {
@@ -159,11 +120,17 @@ class PoolAdjacentViolators private [mllib]

var i = 0

+    val monotonicityConstrainter: (Double, Double) => Boolean = (x, y) => if(isotonic) {
+      x <= y
+    } else {
+      x >= y
+    }
+
while(i < in.length) {
var j = i

// Find monotonicity violating sequence, if any
-      while(j < in.length - 1 && !monotonicityConstraint.holds(in(j), in(j + 1))) {
+      while(j < in.length - 1 && !monotonicityConstrainter(in(j).label, in(j + 1).label)) {
j = j + 1
}

@@ -173,7 +140,7 @@ class PoolAdjacentViolators private [mllib]
} else {
// Otherwise pool the violating sequence
// And check if pooling caused monotonicity violation in previously processed points
-        while (i >= 0 && !monotonicityConstraint.holds(in(i), in(i + 1))) {
+        while (i >= 0 && !monotonicityConstrainter(in(i).label, in(i + 1).label)) {
pool(in, i, j)
i = i - 1
}
@@ -190,19 +157,19 @@ class PoolAdjacentViolators private [mllib]
* Calls Pool adjacent violators on each partition and then again on the result
*
* @param testData input
-   * @param monotonicityConstraint asc or desc
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return result
*/
private def parallelPoolAdjacentViolators(
testData: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint): Seq[WeightedLabeledPoint] = {
+      isotonic: Boolean): Seq[WeightedLabeledPoint] = {

poolAdjacentViolators(
testData
.sortBy(_.features.toArray.head)
.cache()
-        .mapPartitions(it => poolAdjacentViolators(it.toArray, monotonicityConstraint).toIterator)
-        .collect(), monotonicityConstraint)
+        .mapPartitions(it => poolAdjacentViolators(it.toArray, isotonic).toIterator)
+        .collect(), isotonic)
}
}

@@ -221,11 +188,11 @@ object IsotonicRegression {
* Each point describes a row of the data
* matrix A as well as the corresponding right hand side label y
* and weight as number of measurements
-   * @param monotonicityConstraint Isotonic (increasing) or Antitonic (decreasing) sequence
+   * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
*/
def train(
input: RDD[WeightedLabeledPoint],
-      monotonicityConstraint: MonotonicityConstraint = Isotonic): IsotonicRegressionModel = {
-    new PoolAdjacentViolators().run(input, monotonicityConstraint)
+      isotonic: Boolean = true): IsotonicRegressionModel = {
+    new PoolAdjacentViolators().run(input, isotonic)
}
}
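To make the pooling logic above easier to follow outside of Spark, here is a minimal, self-contained Scala sketch of the same pool-adjacent-violators loop driven by the new Boolean flag. Point and pava are illustrative names, not the MLlib API; the forward scan, pooling, and backward re-check mirror poolAdjacentViolators in this diff.

  object PavaSketch {
    case class Point(label: Double, weight: Double)

    def pava(in: Array[Point], isotonic: Boolean): Array[Point] = {
      // The commit replaces the MonotonicityConstraint trait with this predicate.
      val inOrder: (Double, Double) => Boolean =
        (x, y) => if (isotonic) x <= y else x >= y

      // Replace in(start..end) with its weighted average label.
      def pool(a: Array[Point], start: Int, end: Int): Unit = {
        val slice = a.slice(start, end + 1)
        val mean = slice.map(p => p.label * p.weight).sum / slice.map(_.weight).sum
        (start to end).foreach(k => a(k) = a(k).copy(label = mean))
      }

      var i = 0
      while (i < in.length) {
        var j = i
        // Walk forward through a monotonicity-violating run, if any.
        while (j < in.length - 1 && !inOrder(in(j).label, in(j + 1).label)) {
          j = j + 1
        }
        if (i == j) {
          i = i + 1
        } else {
          // Pool the run, then re-check earlier points the pooling may have broken.
          while (i >= 0 && !inOrder(in(i).label, in(i + 1).label)) {
            pool(in, i, j)
            i = i - 1
          }
          i = j
        }
      }
      in
    }

    def main(args: Array[String]): Unit = {
      val data = Array(1.0, 2.0, 3.0, 1.0, 5.0).map(Point(_, 1.0))
      println(pava(data, isotonic = true).map(_.label).mkString(", "))
      // prints: 1.0, 2.0, 2.0, 2.0, 5.0
    }
  }

The distributed version above applies the same routine once per partition via mapPartitions and once more over the collected, sorted result, as the doc comment on parallelPoolAdjacentViolators describes.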
@@ -62,7 +62,7 @@ public void runIsotonicRegressionUsingConstructor() {
new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();

IsotonicRegressionAlgorithm isotonicRegressionAlgorithm = new PoolAdjacentViolators();
-    IsotonicRegressionModel model = isotonicRegressionAlgorithm.run(testRDD.rdd(), MonotonicityConstraint.Isotonic());
+    IsotonicRegressionModel model = isotonicRegressionAlgorithm.run(testRDD.rdd(), true);

List<WeightedLabeledPoint> expected = IsotonicDataGenerator
.generateIsotonicInputAsList(
@@ -77,7 +77,7 @@ public void runIsotonicRegressionUsingStaticMethod() {
.generateIsotonicInputAsList(
new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();

-    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), MonotonicityConstraint.Isotonic());
+    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), true);

List<WeightedLabeledPoint> expected = IsotonicDataGenerator
.generateIsotonicInputAsList(
@@ -92,7 +92,7 @@ public void testPredictJavaRDD() {
.generateIsotonicInputAsList(
new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();

-    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), MonotonicityConstraint.Isotonic());
+    IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), true);

JavaRDD<Vector> vectors = testRDD.map(new Function<WeightedLabeledPoint, Vector>() {
@Override
@@ -18,7 +18,6 @@
package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.regression.MonotonicityConstraint.MonotonicityConstraint.{Antitonic, Isotonic}
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.scalatest.{Matchers, FunSuite}
import WeightedLabeledPointConversions._
@@ -37,7 +36,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
}
@@ -46,7 +45,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(List[WeightedLabeledPoint]()).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(List())
}
@@ -55,7 +54,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateIsotonicInput(1)).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(generateIsotonicInput(1))
}
@@ -64,7 +63,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5)).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
}
@@ -73,7 +72,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateIsotonicInput(5, 4, 3, 2, 1)).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(generateIsotonicInput(3, 3, 3, 3, 3))
}
@@ -82,7 +81,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 2)).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 3, 3, 3))
}
@@ -91,7 +90,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateIsotonicInput(4, 2, 3, 4, 5)).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(generateIsotonicInput(3, 3, 3, 4, 5))
}
@@ -100,7 +99,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateIsotonicInput(-1, -2, 0, 1, -1)).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(generateIsotonicInput(-1.5, -1.5, 0, 0, 0))
}
@@ -109,7 +108,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5).reverse).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
}
@@ -118,7 +117,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75,2.75), Seq(1, 1, 1, 1, 2)))
}
@@ -127,7 +126,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions.map(p => p.copy(label = round(p.label))) should be
(generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1)))
@@ -137,7 +136,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5))).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions.map(p => p.copy(label = round(p.label))) should be
(generateWeightedIsotonicInput(Seq(1, 10/6, 10/6, 10/6, 10/6), Seq(-1, 1, -3, 1, -5)))
@@ -147,7 +146,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(0, 0, 0, 1, 0))).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2, 2, 2), Seq(0, 0, 0, 1, 0)))
}
@@ -156,7 +155,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predict(Vectors.dense(0)) should be(1)
model.predict(Vectors.dense(2)) should be(2)
@@ -168,7 +167,7 @@ class IsotonicRegressionSuite
val testRDD = sc.parallelize(generateIsotonicInput(7, 5, 3, 5, 1)).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Antitonic)
+    val model = alg.run(testRDD, false)

model.predict(Vectors.dense(0)) should be(7)
model.predict(Vectors.dense(2)) should be(5)
@@ -183,7 +182,7 @@ class IsotonicRegressionSuite
LabeledPoint(1, Vectors.dense(2)))).cache()

val alg = new PoolAdjacentViolators
-    val model = alg.run(testRDD, Isotonic)
+    val model = alg.run(testRDD, true)

model.predictions should be(generateIsotonicInput(1.5, 1.5))
}
@@ -201,7 +200,7 @@ class IsotonicRegressionClusterSuite extends FunSuite with LocalClusterSparkContext {

// If we serialize data directly in the task closure, the size of the serialized task would be
// greater than 1MB and hence Spark would throw an error.
-    val model = IsotonicRegression.train(points, Isotonic)
+    val model = IsotonicRegression.train(points, true)
val predictions = model.predict(points.map(_.features))
}
}
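Putting the pieces together, end-to-end usage after this commit looks like the sketch below, assuming a live SparkContext sc, the generateIsotonicInput helper used in the suites above, and the Vectors import from this diff; names outside the diff are assumptions.

  // Build a small weighted dataset, as the suites do.
  val points = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()

  // true = isotonic (increasing); false = antitonic (decreasing)
  val model = IsotonicRegression.train(points, isotonic = true)

  model.predict(Vectors.dense(2))  // single-vector prediction, as in the suite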
