Skip to content

Commit

Permalink
SPARK-3278 changes after PR comments apache#3519. Change to IsotonicR…
Browse files Browse the repository at this point in the history
…egression API. The isotonic parameter now follows the API of other MLlib algorithms.
  • Loading branch information
zapletal-martin committed Jan 29, 2015
1 parent 1fff77d commit d93c8f9
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.mllib.regression

import java.io.Serializable
import java.lang.{Double => JDouble}
import java.util.Arrays.binarySearch

import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
Expand Down Expand Up @@ -53,8 +54,9 @@ class IsotonicRegressionModel (
* @param testData Features to be labeled.
* @return Predicted labels.
*/
def predict(testData: RDD[Double]): RDD[Double] =
def predict(testData: RDD[Double]): RDD[Double] = {
testData.map(predict)
}

/**
* Predict labels for provided features.
Expand All @@ -63,8 +65,9 @@ class IsotonicRegressionModel (
* @param testData Features to be labeled.
* @return Predicted labels.
*/
def predict(testData: JavaDoubleRDD): JavaDoubleRDD =
def predict(testData: JavaDoubleRDD): JavaDoubleRDD = {
JavaDoubleRDD.fromRDD(predict(testData.rdd.asInstanceOf[RDD[Double]]))
}

/**
* Predict a single label.
Expand All @@ -75,8 +78,8 @@ class IsotonicRegressionModel (
* If testData exactly matches a boundary then associated prediction is directly returned
* If testData is lower or higher than all boundaries
* then first or last prediction is returned respectively
* If testData falls between two values in boundary then predictions is treated as piecewise
* linear function and interpolated value is returned
* If testData falls between two values in the boundary array then the prediction is treated
* as a piecewise linear function and the interpolated value is returned
*/
def predict(testData: Double): Double = {

Expand All @@ -88,8 +91,8 @@ class IsotonicRegressionModel (

val normalisedInsertIndex = -insertIndex - 1

//Find if the index was lower than all values,
//higher than all values, inbetween two values or exact match.
// Find whether the index was lower than all values,
// higher than all values, in between two values, or an exact match.
if (insertIndex == -1) {
predictions.head
} else if (normalisedInsertIndex == boundaries.length){
Expand Down Expand Up @@ -121,37 +124,50 @@ class IsotonicRegressionModel (
* "An approach to parallelizing isotonic regression."
* Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
*/
class IsotonicRegression extends Serializable {
class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {

/**
* Run pool adjacent violators algorithm to obtain isotonic regression model.
* Constructs an IsotonicRegression instance with the default parameter isotonic = true.
* @return New instance of IsotonicRegression
*/
def this() = this(true)

/**
* Sets the isotonic parameter
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return The instance of IsotonicRegression
*/
def setIsotonic(isotonic: Boolean): this.type = {
this.isotonic = isotonic
this
}

/**
* Run IsotonicRegression algorithm to obtain isotonic regression model.
*
* @param input RDD of tuples (label, feature, weight) where label is dependent variable
* for which we calculate isotonic regression, feature is independent variable
* and weight represents number of measures with default 1.
*
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return Isotonic regression model.
*/
def run(
input: RDD[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel =
def run(input: RDD[(Double, Double, Double)]): IsotonicRegressionModel = {
createModel(parallelPoolAdjacentViolators(input, isotonic), isotonic)
}

/**
/**
* Run pool adjacent violators algorithm to obtain isotonic regression model.
*
* @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable
* for which we calculate isotonic regression, feature is independent variable
* and weight represents number of measures with default 1.
*
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return Isotonic regression model.
*/
def run(
input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
isotonic: Boolean): IsotonicRegressionModel =
run(input.rdd.asInstanceOf[RDD[(Double, Double, Double)]], isotonic)
input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = {
run(input.rdd.asInstanceOf[RDD[(Double, Double, Double)]])
}

/**
* Creates isotonic regression model with given parameters.
Expand All @@ -164,11 +180,7 @@ class IsotonicRegression extends Serializable {
protected def createModel(
predictions: Array[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel = {

val labels = predictions.map(_._1)
val features = predictions.map(_._2)

new IsotonicRegressionModel(features, labels)
new IsotonicRegressionModel(predictions.map(_._2), predictions.map(_._1))
}

/**
Expand Down Expand Up @@ -249,4 +261,4 @@ class IsotonicRegression extends Serializable {

poolAdjacentViolators(parallelStepResult.collect(), isotonic)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private IsotonicRegressionModel runIsotonicRegression(double[] labels) {
JavaRDD<Tuple3<Double, Double, Double>> trainRDD =
sc.parallelize(generateIsotonicInput(labels)).cache();

return new IsotonicRegression().run(trainRDD, true);
return new IsotonicRegression().run(trainRDD);
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,35 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext

class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {

private def round(d: Double) =
private def round(d: Double) = {
Math.round(d * 100).toDouble / 100
}

private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] =
private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] = {
labels.zip(1 to labels.size).map(point => (point._1, point._2.toDouble, 1d))
}

private def generateIsotonicInput(
labels: Seq[Double],
weights: Seq[Double]): Seq[(Double, Double, Double)] =
weights: Seq[Double]): Seq[(Double, Double, Double)] = {
labels.zip(1 to labels.size)
.zip(weights)
.map(point => (point._1._1, point._1._2.toDouble, point._2))
}

private def runIsotonicRegression(
labels: Seq[Double],
weights: Seq[Double],
isotonic: Boolean): IsotonicRegressionModel = {
val trainRDD = sc.parallelize(generateIsotonicInput(labels, weights)).cache()
new IsotonicRegression().run(trainRDD, isotonic)
new IsotonicRegression().setIsotonic(isotonic).run(trainRDD)
}

private def runIsotonicRegression(
labels: Seq[Double],
isotonic: Boolean): IsotonicRegressionModel =
isotonic: Boolean): IsotonicRegressionModel = {
runIsotonicRegression(labels, Array.fill(labels.size)(1d), isotonic)
}

test("increasing isotonic regression") {
val model = runIsotonicRegression(Seq(1, 2, 3, 3, 1, 6, 17, 16, 17, 18), true)
Expand Down Expand Up @@ -99,7 +103,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M

test("isotonic regression with unordered input") {
val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse).cache()
val model = new IsotonicRegression().run(trainRDD, true)
val model = new IsotonicRegression().run(trainRDD)

assert(model.predictions === Array(1, 2, 3, 4, 5))
}
Expand Down

0 comments on commit d93c8f9

Please sign in to comment.