Skip to content

Commit

Permalink
[SPARK-41008][MLLIB] Follow-up isotonic regression features deduplica…
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

A follow-up on #38966 to update relevant documentation and remove redundant sort key.

### Why are the changes needed?

For isotonic regression, another method for breaking ties of repeated features was introduced in #38966. This will aggregate points having the same feature value by computing the weighted average of the labels.
- This only requires points to be sorted by features instead of features and labels. So, we should remove label as a secondary sorting key.
- Isotonic regression documentation needs to be updated to reflect the new behavior.

### Does this PR introduce _any_ user-facing change?

Isotonic regression documentation update. The documentation described the behavior of the algorithm when there are points in the input with repeated features. Since this behavior has changed, documentation needs to describe the new behavior.

### How was this patch tested?

Existing tests passed. No need to add new tests since existing tests are already comprehensive.

srowen

Closes #38996 from ahmed-mahran/ml-isotonic-reg-dups-follow-up.

Authored-by: Ahmed Mahran <ahmed.mahran@mashin.io>
Signed-off-by: Sean Owen <srowen@gmail.com>
  • Loading branch information
ahmed-mahran authored and srowen committed Dec 11, 2022
1 parent af33722 commit f92c827
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 62 deletions.
18 changes: 10 additions & 8 deletions docs/mllib-isotonic-regression.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ best fitting the original data points.
which uses an approach to
[parallelizing isotonic regression](https://doi.org/10.1007/978-3-642-99789-1_10).
The training input is an RDD of tuples of three double values that represent
label, feature and weight in this order. Additionally, IsotonicRegression algorithm has one
label, feature and weight in this order. In case there are multiple tuples with
the same feature then these tuples are aggregated into a single tuple as follows:

* Aggregated label is the weighted average of all labels.
* Aggregated feature is the unique feature value.
* Aggregated weight is the sum of all weights.

Additionally, IsotonicRegression algorithm has one
optional parameter called $isotonic$ defaulting to true.
This argument specifies if the isotonic regression is
isotonic (monotonically increasing) or antitonic (monotonically decreasing).
Expand All @@ -53,17 +60,12 @@ labels for both known and unknown features. The result of isotonic regression
is treated as piecewise linear function. The rules for prediction therefore are:

* If the prediction input exactly matches a training feature
then associated prediction is returned. In case there are multiple predictions with the same
feature then one of them is returned. Which one is undefined
(same as java.util.Arrays.binarySearch).
then associated prediction is returned.
* If the prediction input is lower or higher than all training features
then prediction with lowest or highest feature is returned respectively.
In case there are multiple predictions with the same feature
then the lowest or highest is returned respectively.
* If the prediction input falls between two training features then prediction is treated
as piecewise linear function and interpolated value is calculated from the
predictions of the two closest features. In case there are multiple values
with the same feature then the same rules as in previous point are used.
predictions of the two closest features.

### Examples

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import java.util.Arrays.binarySearch
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.math3.util.Precision
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
Expand Down Expand Up @@ -272,8 +271,8 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
* @param input RDD of tuples (label, feature, weight) where label is dependent variable
* for which we calculate isotonic regression, feature is independent variable
* and weight represents number of measures with default 1.
* If multiple labels share the same feature value then they are ordered before
* the algorithm is executed.
* If multiple labels share the same feature value then they are aggregated using
* the weighted average before the algorithm is executed.
* @return Isotonic regression model.
*/
@Since("1.3.0")
Expand All @@ -298,8 +297,8 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
* @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable
* for which we calculate isotonic regression, feature is independent variable
* and weight represents number of measures with default 1.
* If multiple labels share the same feature value then they are ordered before
* the algorithm is executed.
* If multiple labels share the same feature value then they are aggregated using
* the weighted average before the algorithm is executed.
* @return Isotonic regression model.
*/
@Since("1.3.0")
Expand All @@ -310,21 +309,14 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
/**
* Aggregates points of duplicate feature values into a single point using as label the weighted
* average of the labels of the points with duplicate feature values. All points for a unique
* feature values are aggregated as:
* feature value are aggregated as:
*
* - Aggregated label is the weighted average of all labels
* - Aggregated feature is the weighted average of all equal features[1]
* - Aggregated weight is the sum of all weights
* - Aggregated label is the weighted average of all labels.
* - Aggregated feature is the unique feature value.
* - Aggregated weight is the sum of all weights.
*
* [1] Note: It is possible that feature values to be equal up to a resolution due to
* representation errors, since we cannot know which feature value to use in that case, we
* compute the weighted average of the features. Ideally, all feature values will be equal and
* the weighted average is just the value at any point.
*
* @param input
* Input data of tuples (label, feature, weight). Weights must be non-negative.
* @return
* Points with unique feature values.
* @param input Input data of tuples (label, feature, weight). Weights must be non-negative.
* @return Points with unique feature values.
*/
private[regression] def makeUnique(
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
Expand All @@ -339,28 +331,28 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
if (cleanInput.length <= 1) {
cleanInput
} else {
// whether or not two double features are equal up to a precision
@inline def areEqual(a: Double, b: Double): Boolean = Precision.equals(a, b)

val pointsAccumulator = new IsotonicRegression.PointsAccumulator
var (_, prevFeature, _) = cleanInput.head

// Go through input points, merging all points with approximately equal feature values into
// a single point. Equality of features is defined by areEqual method. The label of the
// accumulated points is the weighted average of the labels of all points of equal feature
// value. It is possible that feature values to be equal up to a resolution due to
// representation errors, since we cannot know which feature value to use in that case,
// we compute the weighted average of the features.
cleanInput.foreach { case point @ (_, feature, _) =>
if (areEqual(feature, prevFeature)) {

// Go through input points, merging all points with equal feature values into a single point.
// Equality of features is defined by shouldAccumulate method. The label of the accumulated
// points is the weighted average of the labels of all points of equal feature value.

// Initialize with first point
pointsAccumulator := cleanInput.head
// Accumulate the rest
cleanInput.tail.foreach { case point @ (_, feature, _) =>
if (pointsAccumulator.shouldAccumulate(feature)) {
// Still on a duplicate feature, accumulate
pointsAccumulator += point
} else {
// A new unique feature encountered:
// - append the last accumulated point to unique features output
pointsAccumulator.appendToOutput()
// - and reset
pointsAccumulator := point
}
prevFeature = feature
}
// Append the last accumulated point
// Append the last accumulated point to unique features output
pointsAccumulator.appendToOutput()
pointsAccumulator.getOutput
}
Expand Down Expand Up @@ -488,14 +480,14 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
// Points with same or adjacent features must collocate within the same partition.
.partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
.values
// Lexicographically sort points by features then labels.
.mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1))))
// Lexicographically sort points by features.
.mapPartitions(p => Iterator(p.toArray.sortBy(_._2)))
// Aggregate points with equal features into a single point.
.map(makeUnique)
.flatMap(poolAdjacentViolators)
.collect()
// Sort again because collect() doesn't promise ordering.
.sortBy(x => (x._2, x._1))
.sortBy(_._2)
poolAdjacentViolators(parallelStepResult)
}
}
Expand All @@ -511,30 +503,32 @@ object IsotonicRegression {
private var (currentLabel: Double, currentFeature: Double, currentWeight: Double) =
(0d, 0d, 0d)

/** Whether or not this feature exactly equals the current accumulated feature. */
@inline def shouldAccumulate(feature: Double): Boolean = currentFeature == feature

/** Resets the current value of the point accumulator using the provided point. */
def :=(point: (Double, Double, Double)): Unit = {
@inline def :=(point: (Double, Double, Double)): Unit = {
val (label, feature, weight) = point
currentLabel = label * weight
currentFeature = feature * weight
currentFeature = feature
currentWeight = weight
}

/** Accumulates the provided point into the current value of the point accumulator. */
def +=(point: (Double, Double, Double)): Unit = {
val (label, feature, weight) = point
@inline def +=(point: (Double, Double, Double)): Unit = {
val (label, _, weight) = point
currentLabel += label * weight
currentFeature += feature * weight
currentWeight += weight
}

/** Appends the current value of the point accumulator to the output. */
def appendToOutput(): Unit =
@inline def appendToOutput(): Unit =
output += ((
currentLabel / currentWeight,
currentFeature / currentWeight,
currentFeature,
currentWeight))

/** Returns all accumulated points so far. */
def getOutput: Array[(Double, Double, Double)] = output.toArray
@inline def getOutput: Array[(Double, Double, Double)] = output.toArray
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.mllib.regression

import org.apache.commons.math3.util.Precision
import org.scalatest.matchers.must.Matchers

import org.apache.spark.{SparkException, SparkFunSuite}
Expand Down Expand Up @@ -225,12 +224,18 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w

test("SPARK-41008 isotonic regression with duplicate features differs from sklearn") {
val model = runIsotonicRegressionOnInput(
Seq((2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1)),
Seq((1, 0.6, 1), (0, 0.6, 1),
(0, 1.0 / 3, 1), (1, 1.0 / 3, 1), (0, 1.0 / 3, 1),
(1, 0.2, 1), (0, 0.2, 1), (0, 0.2, 1), (0, 0.2, 1)),
true,
2)

assert(model.boundaries === Array(1.0, 3.0))
assert(model.predictions === Array(0.75, 0.75))
assert(model.boundaries === Array(0.2, 1.0 / 3, 0.6))
assert(model.predictions === Array(0.25, 1.0 / 3, 0.5))

assert(model.predict(0.6) === 0.5)
assert(model.predict(1.0 / 3) === 1.0 / 3)
assert(model.predict(0.2) === 0.25)
}

test("isotonic regression prediction") {
Expand Down Expand Up @@ -327,9 +332,8 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
test("makeUnique: handle duplicate features") {
val regressor = new IsotonicRegression()
import regressor.makeUnique
import Precision.EPSILON

// Note: input must be lexicographically sorted by (feature, label)
// Note: input must be lexicographically sorted by feature

// empty
assert(makeUnique(Array.empty) === Array.empty)
Expand Down Expand Up @@ -373,9 +377,14 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
(10.0 * 0.3 + 20.0 * 0.3 + 30.0 * 0.4, 2.0, 1.0),
(10.0, 3.0, 1.0)))

// duplicate up to resolution error
assert(
makeUnique(Array((1.0, 1.0, 1.0), (1.0, 1.0 - EPSILON, 1.0), (1.0, 1.0 + EPSILON, 1.0))) ===
Array((1.0, 1.0, 3.0)))
// don't handle tiny representation errors
// e.g. infinitely adjacent doubles are already unique
val adjacentDoubles = {
// i-th next representable double to 1.0 is java.lang.Double.longBitsToDouble(base + i)
val base = java.lang.Double.doubleToRawLongBits(1.0)
(0 until 10).map(i => java.lang.Double.longBitsToDouble(base + i))
.map((1.0, _, 1.0)).toArray
}
assert(makeUnique(adjacentDoubles) === adjacentDoubles)
}
}

0 comments on commit f92c827

Please sign in to comment.