diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 84cff9bbf61d1..e03b784a368f7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -289,10 +289,12 @@ private[python] class PythonMLLibAPI extends Serializable { data: JavaRDD[Vector], isotonic: Boolean): JList[Object] = { val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic) + val input = data.rdd.map { x => + (x(0), x(1), x(2)) + }.persist(StorageLevel.MEMORY_AND_DISK) try { - val model = isotonicRegressionAlg.run(data.rdd.map(_.toArray).map { - x => (x(0), x(1), x(2)) }.persist(StorageLevel.MEMORY_AND_DISK)) - List(model.boundaries, model.predictions).map(_.asInstanceOf[Object]).asJava + val model = isotonicRegressionAlg.run(input) + List[AnyRef](model.boundaryVector, model.predictionVector).asJava } finally { data.rdd.unpersist(blocking = false) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 15a0760f5e6c1..be2a00c2dfea4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -28,12 +28,13 @@ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ +import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD} +import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.SQLContext /** * :: 
Experimental :: @@ -140,6 +141,12 @@ class IsotonicRegressionModel ( } } + /** A convenient method for boundaries called by the Python API. */ + private[mllib] def boundaryVector: Vector = Vectors.dense(boundaries) + + /** A convenient method for predictions called by the Python API. */ + private[mllib] def predictionVector: Vector = Vectors.dense(predictions) + override def save(sc: SparkContext, path: String): Unit = { IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic) } diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 8d6e3d85a4341..41bde2ce3e60b 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -445,17 +445,16 @@ def save(self, sc, path): def load(cls, sc, path): java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load( sc._jsc.sc(), path) - py_boundaries = _java2py(sc, java_model.boundaries()) - py_predictions = _java2py(sc, java_model.predictions()) - return IsotonicRegressionModel(np.array(py_boundaries), - np.array(py_predictions), java_model.isotonic) + py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray() + py_predictions = _java2py(sc, java_model.predictionVector()).toArray() + return IsotonicRegressionModel(py_boundaries, py_predictions, java_model.isotonic) class IsotonicRegression(object): """ Run IsotonicRegression algorithm to obtain isotonic regression model. - :param data: RDD of data points + :param data: RDD of (label, feature, weight) tuples. :param isotonic: Whether this is isotonic or antitonic. 
""" @classmethod @@ -463,7 +462,7 @@ def train(cls, data, isotonic=True): """Train a isotonic regression model on the given data.""" boundaries, predictions = callMLlibFunc("trainIsotonicRegressionModel", data.map(_convert_to_vector), bool(isotonic)) - return IsotonicRegressionModel(np.array(boundaries), np.array(predictions), isotonic) + return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic) def _test():