Skip to content

Commit

Permalink
Python API for IsotonicRegression
Browse files Browse the repository at this point in the history
  • Loading branch information
yanboliang committed May 4, 2015
1 parent 343d3bf commit 5c8ebe5
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,22 @@ private[python] class PythonMLLibAPI extends Serializable {
map(_.asInstanceOf[Object]).asJava
}

/**
 * Java stub for Python mllib IsotonicRegression.run()
 *
 * Converts every input vector to an array, passes its first three components as a
 * tuple to `IsotonicRegression.run`, and returns the fitted model's boundaries and
 * predictions as a two-element Java list.
 *
 * @param data RDD of vectors; each vector must have at least three components
 * @param isotonic flag forwarded to `IsotonicRegression.setIsotonic`
 * @return Java list holding `model.boundaries` followed by `model.predictions`
 */
def trainIsotonicRegressionModel(
    data: JavaRDD[Vector],
    isotonic: Boolean): JList[Object] = {
  val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic)
  // Keep a handle on the RDD that is actually persisted: `map` creates a new RDD, so
  // unpersisting only `data.rdd` would leave this cached copy behind.
  val input = data.rdd.map(_.toArray).map { x =>
    (x(0), x(1), x(2))
  }.persist(StorageLevel.MEMORY_AND_DISK)
  try {
    val model = isotonicRegressionAlg.run(input)
    List(model.boundaries, model.predictions).map(_.asInstanceOf[Object]).asJava
  } finally {
    input.unpersist(blocking = false)
    // Retained from the original implementation; a no-op when `data.rdd` is not cached.
    data.rdd.unpersist(blocking = false)
  }
}

/**
* Java stub for Python mllib KMeans.run()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.Serializable
import java.lang.{Double => JDouble}
import java.util.Arrays.binarySearch

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.json4s._
Expand Down Expand Up @@ -57,6 +58,13 @@ class IsotonicRegressionModel (
assertOrdered(boundaries)
assertOrdered(predictions)(predictionOrd)

/**
 * A Java-friendly constructor that takes two Iterable parameters and one Boolean parameter.
 *
 * Both iterables are copied into arrays (via `asScala.toArray`) and forwarded, together
 * with `isotonic`, to the primary constructor.
 *
 * @param boundaries boundary values, consumed in iteration order
 * @param predictions prediction values, consumed in iteration order
 * @param isotonic boxed flag forwarded to the primary constructor
 */
def this(boundaries: java.lang.Iterable[Double],
predictions: java.lang.Iterable[Double],
isotonic: java.lang.Boolean) = {
this(boundaries.asScala.toArray, predictions.asScala.toArray, isotonic)
}

/** Asserts the input array is monotone with the given ordering. */
private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
var i = 1
Expand Down
75 changes: 74 additions & 1 deletion python/pyspark/mllib/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
import numpy as np
from numpy import array

from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
from pyspark.mllib.util import Saveable, Loader

__all__ = ['LabeledPoint', 'LinearModel',
Expand Down Expand Up @@ -396,6 +397,78 @@ def train(rdd, i):
return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights)


class IsotonicRegressionModel(Saveable, Loader):

    """Regression model for isotonic regression.

    :param boundaries: Sequence of boundary values used as the x-coordinates
                       for interpolation in :meth:`predict`.
    :param predictions: Sequence of predictions, one per boundary, used as
                        the y-coordinates for interpolation.
    :param isotonic: Whether this is isotonic or antitonic.

    >>> data = [(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)]
    >>> irm = IsotonicRegression.train(sc.parallelize(data))
    >>> irm.predict(1.5)
    2.0
    >>> irm.predict(2.5)
    4.5
    >>> irm.predict(4)
    6.0
    >>> irm.predict(sc.parallelize([1.5, 2.5, 4])).collect()
    [2.0, 4.5, 6.0]
    >>> import shutil, tempfile
    >>> path = tempfile.mkdtemp()
    >>> irm.save(sc, path)
    >>> sameModel = IsotonicRegressionModel.load(sc, path)
    >>> sameModel.predict(1.5)
    2.0
    >>> sameModel.predict(2.5)
    4.5
    >>> sameModel.predict(4)
    6.0
    >>> try:
    ...     shutil.rmtree(path)
    ... except OSError:
    ...     pass
    """

    def __init__(self, boundaries, predictions, isotonic):
        self.boundaries = boundaries
        self.predictions = predictions
        self.isotonic = isotonic

    def predict(self, x):
        """Predict the label for a single feature value or an RDD of them.

        :param x: A feature value, or an RDD of feature values.
        :return: The value interpolated by ``np.interp`` over
                 (boundaries, predictions); for an RDD input, an RDD of
                 such values.
        """
        if isinstance(x, RDD):
            return x.map(lambda v: self.predict(v))
        return np.interp(x, self.boundaries, self.predictions)

    def save(self, sc, path):
        """Save this model to the given path through the JVM model class.

        :param sc: SparkContext used to reach the JVM.
        :param path: Destination path handed to the Java model's ``save``.
        """
        # np.asarray lets models built from plain sequences be saved too;
        # for ndarray attributes it is a no-op.
        java_boundaries = _py2java(sc, np.asarray(self.boundaries).tolist())
        java_predictions = _py2java(sc, np.asarray(self.predictions).tolist())
        java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel(
            java_boundaries, java_predictions, self.isotonic)
        java_model.save(sc._jsc.sc(), path)

    @classmethod
    def load(cls, sc, path):
        """Load a model previously written by :meth:`save`.

        :param sc: SparkContext used to reach the JVM.
        :param path: Path the model was saved to.
        :return: A new model instance holding the loaded boundaries,
                 predictions and isotonic flag.
        """
        java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load(
            sc._jsc.sc(), path)
        py_boundaries = _java2py(sc, java_model.boundaries())
        py_predictions = _java2py(sc, java_model.predictions())
        # `isotonic` must be *called*, like boundaries()/predictions() above;
        # the bare attribute is a Py4J member object, not the boolean value.
        return cls(np.array(py_boundaries), np.array(py_predictions),
                   java_model.isotonic())


class IsotonicRegression(object):
    """
    Run IsotonicRegression algorithm to obtain isotonic regression model.
    """
    @classmethod
    def train(cls, data, isotonic=True):
        """Train an isotonic regression model on the given data.

        :param data: RDD of data points; each point is converted with
                     ``_convert_to_vector`` before being sent to the JVM.
        :param isotonic: Whether this is isotonic (True, the default) or
                         antitonic (False). Any truthy/falsy value is
                         accepted and coerced to ``bool``.
        :return: An IsotonicRegressionModel built from the boundaries and
                 predictions returned by ``trainIsotonicRegressionModel``.
        """
        # Coerce once so the JVM call and the returned model agree on the
        # flag; a raw non-bool value stored on the model would later be
        # passed as-is to the Java constructor by IsotonicRegressionModel.save.
        isotonic = bool(isotonic)
        boundaries, predictions = callMLlibFunc(
            "trainIsotonicRegressionModel", data.map(_convert_to_vector), isotonic)
        return IsotonicRegressionModel(np.array(boundaries), np.array(predictions), isotonic)


def _test():
import doctest
from pyspark import SparkContext
Expand Down

0 comments on commit 5c8ebe5

Please sign in to comment.