Skip to content

Commit

Permalink
[SPARK-6267] [MLLIB] Python API for IsotonicRegression
Browse files Browse the repository at this point in the history
https://issues.apache.org/jira/browse/SPARK-6267

Author: Yanbo Liang <ybliang8@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes apache#5890 from yanboliang/spark-6267 and squashes the following commits:

f20541d [Yanbo Liang] Merge pull request #3 from mengxr/SPARK-6267
7f202f9 [Xiangrui Meng] use Vector to have the best Python 2&3 compatibility
4bccfee [Yanbo Liang] fix doctest
ec09412 [Yanbo Liang] fix typos
8214bbb [Yanbo Liang] fix code style
5c8ebe5 [Yanbo Liang] Python API for IsotonicRegression
  • Loading branch information
yanboliang authored and mengxr committed May 6, 2015
1 parent ba2b566 commit 7b14578
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,24 @@ private[python] class PythonMLLibAPI extends Serializable {
map(_.asInstanceOf[Object]).asJava
}

/**
* Java stub for Python mllib IsotonicRegression.run()
*/
def trainIsotonicRegressionModel(
data: JavaRDD[Vector],
isotonic: Boolean): JList[Object] = {
val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic)
val input = data.rdd.map { x =>
(x(0), x(1), x(2))
}.persist(StorageLevel.MEMORY_AND_DISK)
try {
val model = isotonicRegressionAlg.run(input)
List[AnyRef](model.boundaryVector, model.predictionVector).asJava
} finally {
data.rdd.unpersist(blocking = false)
}
}

/**
* Java stub for Python mllib KMeans.run()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ import java.io.Serializable
import java.lang.{Double => JDouble}
import java.util.Arrays.binarySearch

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.SQLContext

/**
* :: Experimental ::
Expand All @@ -57,6 +59,13 @@ class IsotonicRegressionModel (
assertOrdered(boundaries)
assertOrdered(predictions)(predictionOrd)

/** A Java-friendly constructor that takes two Iterable parameters and one Boolean parameter. */
def this(boundaries: java.lang.Iterable[Double],
predictions: java.lang.Iterable[Double],
isotonic: java.lang.Boolean) = {
this(boundaries.asScala.toArray, predictions.asScala.toArray, isotonic)
}

/** Asserts the input array is monotone with the given ordering. */
private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
var i = 1
Expand Down Expand Up @@ -132,6 +141,12 @@ class IsotonicRegressionModel (
}
}

/** A convenient method for boundaries called by the Python API. */
private[mllib] def boundaryVector: Vector = Vectors.dense(boundaries)

/** A convenient method for boundaries called by the Python API. */
private[mllib] def predictionVector: Vector = Vectors.dense(predictions)

override def save(sc: SparkContext, path: String): Unit = {
IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic)
}
Expand Down
73 changes: 71 additions & 2 deletions python/pyspark/mllib/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
import numpy as np
from numpy import array

from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
from pyspark.mllib.util import Saveable, Loader

__all__ = ['LabeledPoint', 'LinearModel',
'LinearRegressionModel', 'LinearRegressionWithSGD',
'RidgeRegressionModel', 'RidgeRegressionWithSGD',
'LassoModel', 'LassoWithSGD']
'LassoModel', 'LassoWithSGD', 'IsotonicRegressionModel',
'IsotonicRegression']


class LabeledPoint(object):
Expand Down Expand Up @@ -396,6 +398,73 @@ def train(rdd, i):
return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights)


class IsotonicRegressionModel(Saveable, Loader):

"""Regression model for isotonic regression.
>>> data = [(1, 0, 1), (2, 1, 1), (3, 2, 1), (1, 3, 1), (6, 4, 1), (17, 5, 1), (16, 6, 1)]
>>> irm = IsotonicRegression.train(sc.parallelize(data))
>>> irm.predict(3)
2.0
>>> irm.predict(5)
16.5
>>> irm.predict(sc.parallelize([3, 5])).collect()
[2.0, 16.5]
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> irm.save(sc, path)
>>> sameModel = IsotonicRegressionModel.load(sc, path)
>>> sameModel.predict(3)
2.0
>>> sameModel.predict(5)
16.5
>>> try:
... os.removedirs(path)
... except OSError:
... pass
"""

def __init__(self, boundaries, predictions, isotonic):
self.boundaries = boundaries
self.predictions = predictions
self.isotonic = isotonic

def predict(self, x):
if isinstance(x, RDD):
return x.map(lambda v: self.predict(v))
return np.interp(x, self.boundaries, self.predictions)

def save(self, sc, path):
java_boundaries = _py2java(sc, self.boundaries.tolist())
java_predictions = _py2java(sc, self.predictions.tolist())
java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel(
java_boundaries, java_predictions, self.isotonic)
java_model.save(sc._jsc.sc(), path)

@classmethod
def load(cls, sc, path):
java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load(
sc._jsc.sc(), path)
py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray()
py_predictions = _java2py(sc, java_model.predictionVector()).toArray()
return IsotonicRegressionModel(py_boundaries, py_predictions, java_model.isotonic)


class IsotonicRegression(object):
"""
Run IsotonicRegression algorithm to obtain isotonic regression model.
:param data: RDD of (label, feature, weight) tuples.
:param isotonic: Whether this is isotonic or antitonic.
"""
@classmethod
def train(cls, data, isotonic=True):
"""Train a isotonic regression model on the given data."""
boundaries, predictions = callMLlibFunc("trainIsotonicRegressionModel",
data.map(_convert_to_vector), bool(isotonic))
return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)


def _test():
import doctest
from pyspark import SparkContext
Expand Down

0 comments on commit 7b14578

Please sign in to comment.