Skip to content

Commit

Permalink
SPARK-3278 Isotonic regression java api
Browse files Browse the repository at this point in the history
  • Loading branch information
zapletal-martin committed Jan 11, 2015
1 parent a24e29f commit 941fd1f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.mllib.regression

import org.apache.spark.api.java.{JavaRDD, JavaPairRDD}
import org.apache.spark.rdd.RDD

/**
Expand All @@ -30,9 +31,30 @@ class IsotonicRegressionModel (
val isotonic: Boolean)
extends Serializable {

/**
* Predict labels for provided features
*
* @param testData features to be labeled
* @return predicted labels
*/
def predict(testData: RDD[Double]): RDD[Double] =
testData.map(predict)

/**
* Predict labels for provided features
*
* @param testData features to be labeled
* @return predicted labels
*/
def predict(testData: JavaRDD[java.lang.Double]): RDD[java.lang.Double] =
testData.rdd.map(x => x.doubleValue()).map(predict)

/**
* Predict a single label
*
* @param testData feature to be labeled
* @return predicted label
*/
def predict(testData: Double): Double =
// Take the highest of data points smaller than our feature or data point with lowest feature
(predictions.head +: predictions.filter(y => y._2 <= testData)).last._1
Expand All @@ -59,7 +81,7 @@ trait IsotonicRegressionAlgorithm
/**
* Run algorithm to obtain isotonic regression model
*
* @param input data
* @param input (label, feature, weight)
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return isotonic regression model
*/
Expand Down Expand Up @@ -115,12 +137,11 @@ class PoolAdjacentViolators private [mllib]
}
}

def monotonicityConstraint(isotonic: Boolean): (Double, Double) => Boolean =
(x, y) => if(isotonic) {
x <= y
} else {
x >= y
}
val isotonicConstraint: (Double, Double) => Boolean = (x, y) => x <= y
val antitonicConstraint: (Double, Double) => Boolean = (x, y) => x >= y

def monotonicityConstraint(isotonic: Boolean) =
if(isotonic) isotonicConstraint else antitonicConstraint

val monotonicityConstraintHolds = monotonicityConstraint(isotonic)

Expand Down Expand Up @@ -179,12 +200,11 @@ class PoolAdjacentViolators private [mllib]
object IsotonicRegression {

/**
* Train a monotone regression model given an RDD of (label, features, weight).
* Currently only one dimensional algorithm is supported (features.length is one)
* Train a monotone regression model given an RDD of (label, feature, weight).
* Label is the dependent y value
* Weight of the data point is the number of measurements. Default is 1
*
* @param input RDD of (label, array of features, weight).
* @param input RDD of (label, feature, weight).
* Each point describes a row of the data
* matrix A as well as the corresponding right hand side label y
* and weight as number of measurements
Expand All @@ -195,4 +215,20 @@ object IsotonicRegression {
isotonic: Boolean = true): IsotonicRegressionModel = {
new PoolAdjacentViolators().run(input, isotonic)
}

/**
* Train a monotone regression model given an RDD of (label, feature).
* Label is the dependent y value
* Weight defaults to 1
*
* @param input RDD of (label, feature).
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return
*/
def train(
input: JavaPairRDD[java.lang.Double, java.lang.Double],
isotonic: Boolean): IsotonicRegressionModel = {
new PoolAdjacentViolators()
.run(input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), 1d)), isotonic)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.mllib.util

import scala.collection.JavaConversions._
import java.lang.{Double => JDouble}

object IsotonicDataGenerator {

Expand All @@ -26,13 +27,11 @@ object IsotonicDataGenerator {
* @param labels list of labels for the data points
* @return Java List of input.
*/
def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[(Double, Double, Double)] = {
seqAsJavaList(generateIsotonicInput(wrapDoubleArray(labels):_*))
def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[(JDouble, JDouble)] = {
seqAsJavaList(generateIsotonicInput(wrapDoubleArray(labels):_*).map(x => (new JDouble(x._1), new JDouble(x._2))))
//.map(d => new Tuple3(new java.lang.Double(d._1), new java.lang.Double(d._2), new java.lang.Double(d._3))))
}

def bam(d: Option[Double]): Double = d.get

/**
* Return an ordered sequence of labeled data points with default weights
* @param labels list of labels for the data points
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,22 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*//*
*/

package org.apache.spark.mllib.regression;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.util.IsotonicDataGenerator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;
import scala.Tuple3;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class JavaIsotonicRegressionSuite implements Serializable {
Expand All @@ -50,52 +45,26 @@ public void tearDown() {
sc = null;
}

double difference(List<Tuple3<Double, Double, Double>> expected, IsotonicRegressionModel model) {
double difference(List<Tuple2<Double, Double>> expected, IsotonicRegressionModel model) {
double diff = 0;

for(int i = 0; i < model.predictions().length(); i++) {
Tuple3<Double, Double, Double> exp = expected.get(i);
Tuple2<Double, Double> exp = expected.get(i);
diff += Math.abs(model.predict(exp._2()) - exp._1());
}

return diff;
}

*/
/*@Test
public void runIsotonicRegressionUsingConstructor() {
JavaRDD<Tuple3<Double, Double, Double>> testRDD = sc.parallelize(IsotonicDataGenerator
.generateIsotonicInputAsList(
new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
IsotonicRegressionAlgorithm isotonicRegressionAlgorithm = new PoolAdjacentViolators();
IsotonicRegressionModel model = isotonicRegressionAlgorithm.run(testRDD.rdd(), true);
List<Tuple3<Double, Double, Double>> expected = IsotonicDataGenerator
.generateIsotonicInputAsList(
new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});
Assert.assertTrue(difference(expected, model) == 0);
}*//*
@Test
public void runIsotonicRegressionUsingStaticMethod() {
*/
/*JavaRDD<Tuple3<Double, Double, Double>> testRDD = sc.parallelize(IsotonicDataGenerator
.generateIsotonicInputAsList(
new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();*//*
*/
/*JavaRDD<Tuple3<Double, Double, Double>> testRDD = sc.parallelize(Arrays.asList(new Tuple3(1.0, 1.0, 1.0)));*//*
JavaPairRDD<Double, Double> trainRDD = sc.parallelizePairs(
IsotonicDataGenerator.generateIsotonicInputAsList(
new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();

IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);

JavaPairRDD<Double, Double> testRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<Double, Double>(1.0, 1.0)));
IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), true);
List<Tuple3<Double, Double, Double>> expected = IsotonicDataGenerator
List<Tuple2<Double, Double>> expected = IsotonicDataGenerator
.generateIsotonicInputAsList(
new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});

Expand All @@ -104,23 +73,23 @@ public void runIsotonicRegressionUsingStaticMethod() {

@Test
public void testPredictJavaRDD() {
JavaRDD<Tuple3<Double, Double, Double>> testRDD = sc.parallelize(IsotonicDataGenerator
.generateIsotonicInputAsList(
new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
JavaPairRDD<Double, Double> trainRDD = sc.parallelizePairs(
IsotonicDataGenerator.generateIsotonicInputAsList(
new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();

IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), true);
IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);

JavaRDD<Vector> vectors = testRDD.map(new Function<Tuple3<Double, Double, Double>, Vector>() {
JavaRDD<Double> testRDD = trainRDD.map(new Function<Tuple2<Double, Double>, Double>() {
@Override
public Vector call(Tuple3<Double, Double, Double> v) throws Exception {
return Vectors.dense(v._2());
public Double call(Tuple2<Double, Double> v) throws Exception {
return v._2();
}
});

List<Double> predictions = model.predict(vectors).collect();
Double[] predictions = model.predict(testRDD).collect();

Assert.assertTrue(predictions.get(0) == 1d);
Assert.assertTrue(predictions.get(11) == 12d);
Assert.assertTrue(predictions[0] == 1d);
Assert.assertTrue(predictions[11] == 12d);
}
}
*/

Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,13 @@ class IsotonicRegressionClusterSuite
extends FunSuite
with LocalClusterSparkContext {

//TODO: FIX
test("task size should be small in both training and prediction") {
val n = 5

val trainData = (0 to n).map(i => (i.toDouble, i.toDouble, 1.toDouble))
val n = 135000

val trainData = (0 to n).map(i => (i.toDouble, i.toDouble, 1d))
val points = sc.parallelize(trainData, 1)

/*val points = sc.parallelize(0 until n, 2).mapPartitionsWithIndex { (idx, iter) =>
val random = new Random(idx)
iter.map(i => (random.nextDouble(), random.nextDouble(), 1))
}.cache()*/

// If we serialize data directly in the task closure, the size of the serialized task would be
// greater than 1MB and hence Spark would throw an error.
val model = IsotonicRegression.train(points, true)
Expand Down

0 comments on commit 941fd1f

Please sign in to comment.