Skip to content

Commit

Permalink
SPARK-3278 changes after PR feedback #3519. Binary search used for is…
Browse files Browse the repository at this point in the history
…otonic regression model predictions
  • Loading branch information
zapletal-martin committed Jan 23, 2015
1 parent fad4bf9 commit 9ae9d53
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,25 @@
package org.apache.spark.mllib.regression

import java.io.Serializable
import java.util.Arrays.binarySearch

import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
import org.apache.spark.rdd.RDD

/**
* Regression model for Isotonic regression
*
* @param predictions Weights computed for every feature.
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @param features Array of features.
* @param labels Array of labels associated with the features at the same index.
*/
class IsotonicRegressionModel (
val predictions: Seq[(Double, Double, Double)],
val isotonic: Boolean)
features: Array[Double],
val labels: Array[Double])
extends Serializable {

/**
* Predict labels for provided features
* Using a piecewise constant function
*
* @param testData features to be labeled
* @return predicted labels
Expand All @@ -44,6 +46,7 @@ class IsotonicRegressionModel (

/**
* Predict labels for provided features
* Using a piecewise constant function
*
* @param testData features to be labeled
* @return predicted labels
Expand All @@ -53,13 +56,25 @@ class IsotonicRegressionModel (

/**
* Predict a single label
* Using a piecewise constant function
*
* @param testData feature to be labeled
* @return predicted label
*/
def predict(testData: Double): Double =
// Take the highest of data points smaller than our feature or data point with lowest feature
(predictions.head +: predictions.filter(y => y._2 <= testData)).last._1
def predict(testData: Double): Double = {
  // An empty model has no label to return; fail with a clear message instead of letting
  // labels(0) throw an ArrayIndexOutOfBoundsException further down.
  require(features.nonEmpty, "Cannot predict with an empty isotonic regression model")

  // binarySearch returns the index of an exact match, or -(insertionPoint) - 1 when
  // testData is not present in features.
  val result = binarySearch(features, testData)

  val index =
    if (result >= 0) {
      // Exact match on a known feature.
      result
    } else {
      // The closest feature below testData sits just before the insertion point.
      // Clamp to 0 so that inputs smaller than every feature use the first label.
      math.max(0, -result - 2)
    }

  labels(index)
}
}

/**
Expand Down Expand Up @@ -93,9 +108,13 @@ class IsotonicRegression
* @return isotonic regression model
*/
protected def createModel(
predictions: Seq[(Double, Double, Double)],
predictions: Array[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel = {
new IsotonicRegressionModel(predictions, isotonic)

val labels = predictions.map(_._1)
val features = predictions.map(_._2)

new IsotonicRegressionModel(features, labels)
}

/**
Expand Down Expand Up @@ -167,7 +186,7 @@ class IsotonicRegression
*/
private def parallelPoolAdjacentViolators(
testData: RDD[(Double, Double, Double)],
isotonic: Boolean): Seq[(Double, Double, Double)] = {
isotonic: Boolean): Array[(Double, Double, Double)] = {

val parallelStepResult = testData
.sortBy(_._2)
Expand Down Expand Up @@ -213,7 +232,7 @@ object IsotonicRegression {
isotonic: Boolean): IsotonicRegressionModel = {
new IsotonicRegression()
.run(
input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), x._3.doubleValue())),
input.rdd.asInstanceOf[RDD[(Double, Double, Double)]],
isotonic)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void tearDown() {
double difference(List<Tuple3<Double, Double, Double>> expected, IsotonicRegressionModel model) {
double diff = 0;

for(int i = 0; i < model.predictions().length(); i++) {
for(int i = 0; i < model.labels().length; i++) {
Tuple3<Double, Double, Double> exp = expected.get(i);
diff += Math.abs(model.predict(exp._2()) - exp._1());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(
generateIsotonicInput(
1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
model.labels should be(
Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
}

test("increasing isotonic regression using api") {
Expand All @@ -50,9 +49,8 @@ class IsotonicRegressionSuite

val model = IsotonicRegression.train(trainRDD, true)

model.predictions should be(
generateIsotonicInput(
1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
model.labels should be(
Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
}

test("isotonic regression with size 0") {
Expand All @@ -61,7 +59,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(List())
model.labels should be(Array())
}

test("isotonic regression with size 1") {
Expand All @@ -70,7 +68,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(1))
model.labels should be(Array(1.0))
}

test("isotonic regression strictly increasing sequence") {
Expand All @@ -79,7 +77,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
model.labels should be(Array(1, 2, 3, 4, 5))
}

test("isotonic regression strictly decreasing sequence") {
Expand All @@ -88,7 +86,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(3, 3, 3, 3, 3))
model.labels should be(Array(3, 3, 3, 3, 3))
}

test("isotonic regression with last element violating monotonicity") {
Expand All @@ -97,7 +95,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 3, 3, 3))
model.labels should be(Array(1, 2, 3, 3, 3))
}

test("isotonic regression with first element violating monotonicity") {
Expand All @@ -106,7 +104,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(3, 3, 3, 4, 5))
model.labels should be(Array(3, 3, 3, 4, 5))
}

test("isotonic regression with negative labels") {
Expand All @@ -115,7 +113,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(-1.5, -1.5, 0, 0, 0))
model.labels should be(Array(-1.5, -1.5, 0, 0, 0))
}

test("isotonic regression with unordered input") {
Expand All @@ -124,7 +122,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
model.labels should be(Array(1, 2, 3, 4, 5))
}

test("weighted isotonic regression") {
Expand All @@ -134,8 +132,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(
generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75,2.75), Seq(1, 1, 1, 1, 2)))
model.labels should be(Array(1, 2, 2.75, 2.75,2.75))
}

test("weighted isotonic regression with weights lower than 1") {
Expand All @@ -145,8 +142,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions.map(p => p.copy(_1 = round(p._1))) should be(
generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1)))
model.labels.map(round) should be(Array(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2))
}

test("weighted isotonic regression with negative weights") {
Expand All @@ -155,8 +151,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(
generateWeightedIsotonicInput(Seq(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6), Seq(-1, 1, -3, 1, -5)))
model.labels should be(Array(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6))
}

test("weighted isotonic regression with zero weights") {
Expand All @@ -165,7 +160,7 @@ class IsotonicRegressionSuite
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2, 2, 2), Seq(0, 0, 0, 1, 0)))
model.labels should be(Array(1, 2, 2, 2, 2))
}

test("isotonic regression prediction") {
Expand Down

0 comments on commit 9ae9d53

Please sign in to comment.