Skip to content

Commit

Permalink
SPARK-3278 changes after PR comments apache#3519. Java api changes, t…
Browse files Browse the repository at this point in the history
…est refactoring, comments and citations, isotonic regression model validations, linear interpolation for predictions
  • Loading branch information
zapletal-martin committed Jan 29, 2015
1 parent 12151e6 commit 1fff77d
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 340 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,88 +24,142 @@ import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
import org.apache.spark.rdd.RDD

/**
* Regression model for Isotonic regression
* Regression model for isotonic regression.
*
* @param features Array of features.
* @param labels Array of labels associated to the features at the same index.
* @param boundaries Array of boundaries for which predictions are known.
* @param predictions Array of predictions associated to the boundaries at the same index.
*/
class IsotonicRegressionModel (
features: Array[Double],
val labels: Array[Double])
boundaries: Array[Double],
val predictions: Array[Double])
extends Serializable {

private def isSorted(xs: Array[Double]): Boolean = {
var i = 1
while (i < xs.length) {
if (xs(i) < xs(i - 1)) false
i += 1
}
true
}

assert(isSorted(boundaries))
assert(boundaries.length == predictions.length)

/**
* Predict labels for provided features
* Using a piecewise constant function
* Predict labels for provided features.
* Using a piecewise linear function.
*
* @param testData features to be labeled
* @return predicted labels
* @param testData Features to be labeled.
* @return Predicted labels.
*/
def predict(testData: RDD[Double]): RDD[Double] =
testData.map(predict)

/**
* Predict labels for provided features
* Using a piecewise constant function
* Predict labels for provided features.
* Using a piecewise linear function.
*
* @param testData features to be labeled
* @return predicted labels
* @param testData Features to be labeled.
* @return Predicted labels.
*/
def predict(testData: JavaRDD[java.lang.Double]): JavaDoubleRDD =
def predict(testData: JavaDoubleRDD): JavaDoubleRDD =
JavaDoubleRDD.fromRDD(predict(testData.rdd.asInstanceOf[RDD[Double]]))

/**
* Predict a single label
* Using a piecewise constant function
* Predict a single label.
* Using a piecewise linear function.
*
* @param testData feature to be labeled
* @return predicted label
* @param testData Feature to be labeled.
* @return Predicted label.
* If testData exactly matches a boundary then associated prediction is directly returned
* If testData is lower or higher than all boundaries
* then first or last prediction is returned respectively
* If testData falls between two values in boundary then predictions is treated as piecewise
* linear function and interpolated value is returned
*/
def predict(testData: Double): Double = {
val result = binarySearch(features, testData)

val index =
if (result == -1) {
0
} else if (result < 0) {
-result - 2
} else {
result
}
def linearInterpolation(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double = {
y1 + (y2 - y1) * (x - x1) / (x2 - x1)
}

labels(index)
val insertIndex = binarySearch(boundaries, testData)

val normalisedInsertIndex = -insertIndex - 1

//Find if the index was lower than all values,
//higher than all values, inbetween two values or exact match.
if (insertIndex == -1) {
predictions.head
} else if (normalisedInsertIndex == boundaries.length){
predictions.last
} else if (insertIndex < 0) {
linearInterpolation(
boundaries(normalisedInsertIndex - 1),
predictions(normalisedInsertIndex - 1),
boundaries(normalisedInsertIndex),
predictions(normalisedInsertIndex),
testData)
} else {
predictions(insertIndex)
}
}
}

/**
* Isotonic regression
* Currently implemented using parallel pool adjacent violators algorithm
* Isotonic regression.
* Currently implemented using parallelized pool adjacent violators algorithm.
* Currently only univariate (single feature) algorithm supported.
*
* Sequential PAV implementation based on:
* Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
* "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
*
* Sequential PAV parallelized as per:
* Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
* "An approach to parallelizing isotonic regression."
* Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
*/
class IsotonicRegression
extends Serializable {
class IsotonicRegression extends Serializable {

/**
* Run algorithm to obtain isotonic regression model
* Run pool adjacent violators algorithm to obtain isotonic regression model.
*
* @param input RDD of tuples (label, feature, weight) where label is dependent variable
* for which we calculate isotonic regression, feature is independent variable
* and weight represents number of measures with default 1.
*
* @param input (label, feature, weight)
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return isotonic regression model
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return Isotonic regression model.
*/
def run(
input: RDD[(Double, Double, Double)],
isotonic: Boolean = true): IsotonicRegressionModel = {
createModel(
parallelPoolAdjacentViolators(input, isotonic),
isotonic)
}
isotonic: Boolean): IsotonicRegressionModel =
createModel(parallelPoolAdjacentViolators(input, isotonic), isotonic)

/**
* Run pool adjacent violators algorithm to obtain isotonic regression model.
*
* @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable
* for which we calculate isotonic regression, feature is independent variable
* and weight represents number of measures with default 1.
*
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return Isotonic regression model.
*/
def run(
input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
isotonic: Boolean): IsotonicRegressionModel =
run(input.rdd.asInstanceOf[RDD[(Double, Double, Double)]], isotonic)

/**
* Creates isotonic regression model with given parameters
* Creates isotonic regression model with given parameters.
*
* @param predictions labels estimated using isotonic regression algorithm.
* @param predictions Predictions calculated using pool adjacent violators algorithm.
* Used for predictions on new data points.
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return isotonic regression model
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return Isotonic regression model.
*/
protected def createModel(
predictions: Array[(Double, Double, Double)],
Expand All @@ -118,30 +172,30 @@ class IsotonicRegression
}

/**
* Performs a pool adjacent violators algorithm (PAVA)
* Performs a pool adjacent violators algorithm (PAV).
* Uses approach with single processing of data where violators
* in previously processed data created by pooling are fixed immediatelly.
* Uses optimization of discovering monotonicity violating sequences (blocks)
* Method in situ mutates input array
* Uses optimization of discovering monotonicity violating sequences (blocks).
*
* @param in input data
* @param isotonic asc or desc
* @return result
* @param input Input data of tuples (label, feature, weight).
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return Result tuples (label, feature, weight) where labels were updated
* to form a monotone sequence as per isotonic regression definition.
*/
private def poolAdjacentViolators(
in: Array[(Double, Double, Double)],
input: Array[(Double, Double, Double)],
isotonic: Boolean): Array[(Double, Double, Double)] = {

// Pools sub array within given bounds assigning weighted average value to all elements
def pool(in: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
val poolSubArray = in.slice(start, end + 1)
// Pools sub array within given bounds assigning weighted average value to all elements.
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
val poolSubArray = input.slice(start, end + 1)

val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
val weight = poolSubArray.map(_._3).sum

var i = start
while (i <= end) {
in(i) = (weightedSum / weight, in(i)._2, in(i)._3)
input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
i = i + 1
}
}
Expand All @@ -150,39 +204,40 @@ class IsotonicRegression
(x, y) => if (isotonic) x <= y else x >= y

var i = 0
while (i < in.length) {
while (i < input.length) {
var j = i

// Find monotonicity violating sequence, if any
while (j < in.length - 1 && !monotonicityConstraintHolds(in(j)._1, in(j + 1)._1)) {
// Find monotonicity violating sequence, if any.
while (j < input.length - 1 && !monotonicityConstraintHolds(input(j)._1, input(j + 1)._1)) {
j = j + 1
}

// If monotonicity was not violated, move to next data point
// If monotonicity was not violated, move to next data point.
if (i == j) {
i = i + 1
} else {
// Otherwise pool the violating sequence
// And check if pooling caused monotonicity violation in previously processed points
while (i >= 0 && !monotonicityConstraintHolds(in(i)._1, in(i + 1)._1)) {
pool(in, i, j)
// and check if pooling caused monotonicity violation in previously processed points.
while (i >= 0 && !monotonicityConstraintHolds(input(i)._1, input(i + 1)._1)) {
pool(input, i, j)
i = i - 1
}

i = j
}
}

in
input
}

/**
* Performs parallel pool adjacent violators algorithm
* Calls Pool adjacent violators on each partition and then again on the result
* Performs parallel pool adjacent violators algorithm.
* Performs Pool adjacent violators algorithm on each partition and then again on the result.
*
* @param testData input
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return result
* @param testData Input data of tuples (label, feature, weight).
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return Result tuples (label, feature, weight) where labels were updated
* to form a monotone sequence as per isotonic regression definition.
*/
private def parallelPoolAdjacentViolators(
testData: RDD[(Double, Double, Double)],
Expand All @@ -194,45 +249,4 @@ class IsotonicRegression

poolAdjacentViolators(parallelStepResult.collect(), isotonic)
}
}

/**
* Top-level methods for monotone regression (either isotonic or antitonic).
*/
object IsotonicRegression {

/**
* Train a monotone regression model given an RDD of (label, feature, weight).
* Label is the dependent y value
* Weight of the data point is the number of measurements. Default is 1
*
* @param input RDD of (label, feature, weight).
* Each point describes a row of the data
* matrix A as well as the corresponding right hand side label y
* and weight as number of measurements
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
*/
def train(
input: RDD[(Double, Double, Double)],
isotonic: Boolean = true): IsotonicRegressionModel = {
new IsotonicRegression().run(input, isotonic)
}

/**
* Train a monotone regression model given an RDD of (label, feature, weight).
* Label is the dependent y value
* Weight of the data point is the number of measurements. Default is 1
*
* @param input RDD of (label, feature, weight).
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return
*/
def train(
input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
isotonic: Boolean): IsotonicRegressionModel = {
new IsotonicRegression()
.run(
input.rdd.asInstanceOf[RDD[(Double, Double, Double)]],
isotonic)
}
}
}
Loading

0 comments on commit 1fff77d

Please sign in to comment.