From f1063e150f5e8a8ec4b654d708786d221295f96a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 20 Nov 2014 22:41:54 -0800 Subject: [PATCH 1/7] cache serialized java object --- python/pyspark/mllib/clustering.py | 5 ++--- python/pyspark/mllib/common.py | 4 +--- python/pyspark/mllib/regression.py | 2 +- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index fe4c4cc5094d8..3b3ac33aec60b 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -80,9 +80,8 @@ class KMeans(object): @classmethod def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"): """Train a k-means clustering model.""" - # cache serialized data to avoid objects over head in JVM - jcached = _to_java_object_rdd(rdd.map(_convert_to_vector), cache=True) - model = callMLlibFunc("trainKMeansModel", jcached, k, maxIterations, runs, + jrdd = _to_java_object_rdd(rdd.map(_convert_to_vector)) + model = callMLlibFunc("trainKMeansModel", jrdd.cache(), k, maxIterations, runs, initializationMode) centers = callJavaFunc(rdd.context, model.clusterCenters) return KMeansModel([c.toArray() for c in centers]) diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py index c6149fe391ec8..33c49e2399908 100644 --- a/python/pyspark/mllib/common.py +++ b/python/pyspark/mllib/common.py @@ -54,15 +54,13 @@ def _new_smart_decode(obj): # this will call the MLlib version of pythonToJava() -def _to_java_object_rdd(rdd, cache=False): +def _to_java_object_rdd(rdd): """ Return an JavaRDD of Object by unpickling It will convert each Python object into Java object by Pyrolite, whenever the RDD is serialized in batch or not. """ rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer())) - if cache: - rdd.cache() return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index f4f5e615fadc3..9e203c95471b8 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -129,7 +129,7 @@ def _regression_train_wrapper(train_func, modelClass, data, initial_weights): if not isinstance(first, LabeledPoint): raise ValueError("data should be an RDD of LabeledPoint, but got %s" % first) initial_weights = initial_weights or [0.0] * len(data.first().features) - weights, intercept = train_func(_to_java_object_rdd(data, cache=True), + weights, intercept = train_func(_to_java_object_rdd(data).cache(), _convert_to_vector(initial_weights)) return modelClass(weights, intercept) From c2bdfc2b11a81629e15b66cdd52793da0f0339f3 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 21 Nov 2014 00:05:47 -0800 Subject: [PATCH 2/7] refactor --- .../mllib/api/python/PythonMLLibAPI.scala | 80 +++++++------------ .../spark/mllib/clustering/KMeans.scala | 13 +-- .../GeneralizedLinearAlgorithm.scala | 13 +-- python/pyspark/mllib/clustering.py | 7 +- python/pyspark/mllib/recommendation.py | 4 +- python/pyspark/mllib/regression.py | 5 +- 6 files changed, 39 insertions(+), 83 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index b6f7618171224..dc0820a8e0033 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -74,12 +74,26 @@ class PythonMLLibAPI extends Serializable { 
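    // Why persist here: `data` arrives freshly deserialized from Python, and
    // learner.run() is iterative, so without caching every pass would redo the
    // Python-to-Java conversion. A minimal sketch of the pattern this series
    // converges on (persist, train, unpersist in a finally), assuming
    // `data: JavaRDD[LabeledPoint]` and a configured `learner`:
    //
    //   val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK)
    //   try {
    //     learner.run(cached, initialWeights)
    //   } finally {
    //     cached.unpersist(blocking = false)
    //   }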
learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel], data: JavaRDD[LabeledPoint], initialWeights: Vector): JList[Object] = { - // Disable the uncached input warning because 'data' is a deliberately uncached MappedRDD. - learner.disableUncachedWarning() - val model = learner.run(data.rdd, initialWeights) + val model = learner.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER), initialWeights) List(model.weights, model.intercept).map(_.asInstanceOf[Object]).asJava } + /** + * Return the Updater from string + */ + def getUpdateFromString(regType: String): Updater = { + if (regType == "l2") { + new SquaredL2Updater + } else if (regType == "l1") { + new L1Updater + } else if (regType == null || regType == "none") { + new SimpleUpdater + } else { + throw new java.lang.IllegalArgumentException("Invalid value for 'regType' parameter." + + " Can only be initialized using the following string values: ['l1', 'l2', None].") + } + } + /** * Java stub for Python mllib LinearRegressionWithSGD.train() */ @@ -99,16 +113,6 @@ class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) - if (regType == "l2") { - lrAlg.optimizer.setUpdater(new SquaredL2Updater) - } else if (regType == "l1") { - lrAlg.optimizer.setUpdater(new L1Updater) - } else if (regType == null) { - lrAlg.optimizer.setUpdater(new SimpleUpdater) - } else { - throw new java.lang.IllegalArgumentException("Invalid value for 'regType' parameter." - + " Can only be initialized using the following string values: ['l1', 'l2', None].") - } trainRegressionModel( lrAlg, data, @@ -178,16 +182,7 @@ class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) - if (regType == "l2") { - SVMAlg.optimizer.setUpdater(new SquaredL2Updater) - } else if (regType == "l1") { - SVMAlg.optimizer.setUpdater(new L1Updater) - } else if (regType == null) { - SVMAlg.optimizer.setUpdater(new SimpleUpdater) - } else { - throw new java.lang.IllegalArgumentException("Invalid value for 'regType' parameter." - + " Can only be initialized using the following string values: ['l1', 'l2', None].") - } + SVMAlg.optimizer.setUpdater(getUpdateFromString(regType)) trainRegressionModel( SVMAlg, data, @@ -213,16 +208,7 @@ class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) - if (regType == "l2") { - LogRegAlg.optimizer.setUpdater(new SquaredL2Updater) - } else if (regType == "l1") { - LogRegAlg.optimizer.setUpdater(new L1Updater) - } else if (regType == null) { - LogRegAlg.optimizer.setUpdater(new SimpleUpdater) - } else { - throw new java.lang.IllegalArgumentException("Invalid value for 'regType' parameter." - + " Can only be initialized using the following string values: ['l1', 'l2', None].") - } + LogRegAlg.optimizer.setUpdater(getUpdateFromString(regType)) trainRegressionModel( LogRegAlg, data, @@ -248,16 +234,7 @@ class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setNumCorrections(corrections) .setConvergenceTol(tolerance) - if (regType == "l2") { - LogRegAlg.optimizer.setUpdater(new SquaredL2Updater) - } else if (regType == "l1") { - LogRegAlg.optimizer.setUpdater(new L1Updater) - } else if (regType == null) { - LogRegAlg.optimizer.setUpdater(new SimpleUpdater) - } else { - throw new java.lang.IllegalArgumentException("Invalid value for 'regType' parameter." 
- + " Can only be initialized using the following string values: ['l1', 'l2', None].") - } + LogRegAlg.optimizer.setUpdater(getUpdateFromString(regType)) trainRegressionModel( LogRegAlg, data, @@ -289,9 +266,7 @@ class PythonMLLibAPI extends Serializable { .setMaxIterations(maxIterations) .setRuns(runs) .setInitializationMode(initializationMode) - // Disable the uncached input warning because 'data' is a deliberately uncached MappedRDD. - .disableUncachedWarning() - kMeansAlg.run(data.rdd) + kMeansAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)) } /** @@ -333,7 +308,7 @@ class PythonMLLibAPI extends Serializable { if (seed != null) als.setSeed(seed) - val model = als.run(ratingsJRDD.rdd) + val model = als.run(ratingsJRDD.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)) new MatrixFactorizationModelWrapper(model) } @@ -364,7 +339,7 @@ class PythonMLLibAPI extends Serializable { if (seed != null) als.setSeed(seed) - val model = als.run(ratingsJRDD.rdd) + val model = als.run(ratingsJRDD.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)) new MatrixFactorizationModelWrapper(model) } @@ -495,8 +470,8 @@ class PythonMLLibAPI extends Serializable { categoricalFeaturesInfo = categoricalFeaturesInfo.asScala.toMap, minInstancesPerNode = minInstancesPerNode, minInfoGain = minInfoGain) - - DecisionTree.train(data.rdd, strategy) + val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) + DecisionTree.train(cached, strategy) } /** @@ -526,10 +501,11 @@ class PythonMLLibAPI extends Serializable { numClassesForClassification = numClasses, maxBins = maxBins, categoricalFeaturesInfo = categoricalFeaturesInfo.asScala.toMap) + val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) if (algo == Algo.Classification) { - RandomForest.trainClassifier(data.rdd, strategy, numTrees, featureSubsetStrategy, seed) + RandomForest.trainClassifier(cached, strategy, numTrees, featureSubsetStrategy, seed) } else { - RandomForest.trainRegressor(data.rdd, strategy, numTrees, featureSubsetStrategy, seed) + RandomForest.trainRegressor(cached, strategy, numTrees, featureSubsetStrategy, seed) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 7443f232ec3e7..34ea0de706f08 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -113,22 +113,13 @@ class KMeans private ( this } - /** Whether a warning should be logged if the input RDD is uncached. */ - private var warnOnUncachedInput = true - - /** Disable warnings about uncached input. */ - private[spark] def disableUncachedWarning(): this.type = { - warnOnUncachedInput = false - this - } - /** * Train a K-means model on the given set of points; `data` should be cached for high * performance, because this is an iterative algorithm. */ def run(data: RDD[Vector]): KMeansModel = { - if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) { + if (data.getStorageLevel == StorageLevel.NONE) { logWarning("The input data is not directly cached, which may hurt performance if its" + " parent RDDs are also uncached.") } @@ -143,7 +134,7 @@ class KMeans private ( norms.unpersist() // Warn at the end of the run as well, for increased visibility. 
- if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) { + if (data.getStorageLevel == StorageLevel.NONE) { logWarning("The input data was not directly cached, which may hurt performance if its" + " parent RDDs are also uncached.") } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 00dfc86c9e0bd..0287f04e2c777 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -136,15 +136,6 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] this } - /** Whether a warning should be logged if the input RDD is uncached. */ - private var warnOnUncachedInput = true - - /** Disable warnings about uncached input. */ - private[spark] def disableUncachedWarning(): this.type = { - warnOnUncachedInput = false - this - } - /** * Run the algorithm with the configured parameters on an input * RDD of LabeledPoint entries. @@ -161,7 +152,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] */ def run(input: RDD[LabeledPoint], initialWeights: Vector): M = { - if (warnOnUncachedInput && input.getStorageLevel == StorageLevel.NONE) { + if (input.getStorageLevel == StorageLevel.NONE) { logWarning("The input data is not directly cached, which may hurt performance if its" + " parent RDDs are also uncached.") } @@ -241,7 +232,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] } // Warn at the end of the run as well, for increased visibility. - if (warnOnUncachedInput && input.getStorageLevel == StorageLevel.NONE) { + if (input.getStorageLevel == StorageLevel.NONE) { logWarning("The input data was not directly cached, which may hurt performance if its" + " parent RDDs are also uncached.") } diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 3b3ac33aec60b..e2492eef5bd6a 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -16,7 +16,7 @@ # from pyspark import SparkContext -from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _to_java_object_rdd +from pyspark.mllib.common import callMLlibFunc, callJavaFunc from pyspark.mllib.linalg import SparseVector, _convert_to_vector __all__ = ['KMeansModel', 'KMeans'] @@ -80,9 +80,8 @@ class KMeans(object): @classmethod def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"): """Train a k-means clustering model.""" - jrdd = _to_java_object_rdd(rdd.map(_convert_to_vector)) - model = callMLlibFunc("trainKMeansModel", jrdd.cache(), k, maxIterations, runs, - initializationMode) + model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations, + runs, initializationMode) centers = callJavaFunc(rdd.context, model.clusterCenters) return KMeansModel([c.toArray() for c in centers]) diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 2bcbf2aaf8e3e..97ec74eda0b71 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -19,7 +19,7 @@ from pyspark import SparkContext from pyspark.rdd import RDD -from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, _to_java_object_rdd +from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc __all__ = ['MatrixFactorizationModel', 'ALS', 'Rating'] @@ 
-110,7 +110,7 @@ def _prepare(cls, ratings): ratings = ratings.map(lambda x: Rating(*x)) else: raise ValueError("rating should be RDD of Rating or tuple/list") - return _to_java_object_rdd(ratings, True) + return ratings @classmethod def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False, diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 9e203c95471b8..210060140fd91 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -18,7 +18,7 @@ import numpy as np from numpy import array -from pyspark.mllib.common import callMLlibFunc, _to_java_object_rdd +from pyspark.mllib.common import callMLlibFunc from pyspark.mllib.linalg import SparseVector, _convert_to_vector __all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel', @@ -129,8 +129,7 @@ def _regression_train_wrapper(train_func, modelClass, data, initial_weights): if not isinstance(first, LabeledPoint): raise ValueError("data should be an RDD of LabeledPoint, but got %s" % first) initial_weights = initial_weights or [0.0] * len(data.first().features) - weights, intercept = train_func(_to_java_object_rdd(data).cache(), - _convert_to_vector(initial_weights)) + weights, intercept = train_func(data, _convert_to_vector(initial_weights)) return modelClass(weights, intercept) From dff33e12cc01c355fab400619cbb7d6d9534a2e7 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 21 Nov 2014 00:32:56 -0800 Subject: [PATCH 3/7] address comments --- .../apache/spark/mllib/api/python/PythonMLLibAPI.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index dc0820a8e0033..0d7baa6ee7a14 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -74,7 +74,7 @@ class PythonMLLibAPI extends Serializable { learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel], data: JavaRDD[LabeledPoint], initialWeights: Vector): JList[Object] = { - val model = learner.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER), initialWeights) + val model = learner.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK), initialWeights) List(model.weights, model.intercept).map(_.asInstanceOf[Object]).asJava } @@ -266,7 +266,7 @@ class PythonMLLibAPI extends Serializable { .setMaxIterations(maxIterations) .setRuns(runs) .setInitializationMode(initializationMode) - kMeansAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)) + kMeansAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK)) } /** @@ -470,7 +470,7 @@ class PythonMLLibAPI extends Serializable { categoricalFeaturesInfo = categoricalFeaturesInfo.asScala.toMap, minInstancesPerNode = minInstancesPerNode, minInfoGain = minInfoGain) - val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) + val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK) DecisionTree.train(cached, strategy) } @@ -501,7 +501,7 @@ class PythonMLLibAPI extends Serializable { numClassesForClassification = numClasses, maxBins = maxBins, categoricalFeaturesInfo = categoricalFeaturesInfo.asScala.toMap) - val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) + val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK) if (algo == Algo.Classification) { RandomForest.trainClassifier(cached, strategy, 
numTrees, featureSubsetStrategy, seed) } else { From 7da03321aaaf1a0409eb158ddcfa2a0fdeaa2f8c Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 21 Nov 2014 08:37:23 -0800 Subject: [PATCH 4/7] add unpersist() --- .../mllib/api/python/PythonMLLibAPI.scala | 45 ++++++++++++------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 0d7baa6ee7a14..16273a7533853 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -74,8 +74,12 @@ class PythonMLLibAPI extends Serializable { learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel], data: JavaRDD[LabeledPoint], initialWeights: Vector): JList[Object] = { - val model = learner.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK), initialWeights) - List(model.weights, model.intercept).map(_.asInstanceOf[Object]).asJava + try { + val model = learner.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK), initialWeights) + List(model.weights, model.intercept).map(_.asInstanceOf[Object]).asJava + } finally { + data.rdd.unpersist(false) + } } /** @@ -89,7 +93,7 @@ class PythonMLLibAPI extends Serializable { } else if (regType == null || regType == "none") { new SimpleUpdater } else { - throw new java.lang.IllegalArgumentException("Invalid value for 'regType' parameter." + throw new IllegalArgumentException("Invalid value for 'regType' parameter." + " Can only be initialized using the following string values: ['l1', 'l2', None].") } } @@ -266,7 +270,11 @@ class PythonMLLibAPI extends Serializable { .setMaxIterations(maxIterations) .setRuns(runs) .setInitializationMode(initializationMode) - kMeansAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK)) + try { + kMeansAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK)) + } finally { + data.rdd.unpersist(false) + } } /** @@ -308,7 +316,7 @@ class PythonMLLibAPI extends Serializable { if (seed != null) als.setSeed(seed) - val model = als.run(ratingsJRDD.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)) + val model = als.run(ratingsJRDD.rdd) new MatrixFactorizationModelWrapper(model) } @@ -339,7 +347,7 @@ class PythonMLLibAPI extends Serializable { if (seed != null) als.setSeed(seed) - val model = als.run(ratingsJRDD.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)) + val model = als.run(ratingsJRDD.rdd) new MatrixFactorizationModelWrapper(model) } @@ -400,15 +408,15 @@ class PythonMLLibAPI extends Serializable { numPartitions: Int, numIterations: Int, seed: Long): Word2VecModelWrapper = { - val data = dataJRDD.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) val word2vec = new Word2Vec() .setVectorSize(vectorSize) .setLearningRate(learningRate) .setNumPartitions(numPartitions) .setNumIterations(numIterations) .setSeed(seed) + val data = dataJRDD.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) val model = word2vec.fit(data) - data.unpersist() + data.unpersist(false) new Word2VecModelWrapper(model) } @@ -470,8 +478,11 @@ class PythonMLLibAPI extends Serializable { categoricalFeaturesInfo = categoricalFeaturesInfo.asScala.toMap, minInstancesPerNode = minInstancesPerNode, minInfoGain = minInfoGain) - val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK) - DecisionTree.train(cached, strategy) + try { + DecisionTree.train(data.rdd.persist(StorageLevel.MEMORY_AND_DISK), strategy) + } finally { + data.rdd.unpersist(false) + } 
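    // RDD.unpersist takes a single `blocking` flag; passing false frees the
    // cached blocks asynchronously, which is safe here because training has
    // already completed (patch 6 in this series rewrites these calls as
    // unpersist(blocking = false)). A hypothetical helper capturing the
    // persist/try/finally shape repeated across this file (`withCached` is
    // illustrative, not part of the patch):
    //
    //   def withCached[A, T](rdd: RDD[A])(body: RDD[A] => T): T =
    //     try body(rdd.persist(StorageLevel.MEMORY_AND_DISK))
    //     finally rdd.unpersist(blocking = false)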
} /** @@ -502,10 +513,14 @@ class PythonMLLibAPI extends Serializable { maxBins = maxBins, categoricalFeaturesInfo = categoricalFeaturesInfo.asScala.toMap) val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK) - if (algo == Algo.Classification) { - RandomForest.trainClassifier(cached, strategy, numTrees, featureSubsetStrategy, seed) - } else { - RandomForest.trainRegressor(cached, strategy, numTrees, featureSubsetStrategy, seed) + try { + if (algo == Algo.Classification) { + RandomForest.trainClassifier(cached, strategy, numTrees, featureSubsetStrategy, seed) + } else { + RandomForest.trainRegressor(cached, strategy, numTrees, featureSubsetStrategy, seed) + } + } finally { + cached.unpersist(false) } } @@ -687,7 +702,7 @@ private[spark] object SerDe extends Serializable { def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { if (obj == this) { out.write(Opcodes.GLOBAL) - out.write((module + "\n" + name + "\n").getBytes()) + out.write((module + "\n" + name + "\n").getBytes) } else { pickler.save(this) // it will be memorized by Pickler saveState(obj, out, pickler) From 63b984e042c14e6f53ebcf2a622a66021dfb0b73 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 21 Nov 2014 08:42:21 -0800 Subject: [PATCH 5/7] fix --- .../spark/mllib/api/python/PythonMLLibAPI.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 16273a7533853..7fe7fc456dacd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -117,6 +117,7 @@ class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) + lrAlg.optimizer.setUpdater(getUpdateFromString(regType)) trainRegressionModel( lrAlg, data, @@ -414,10 +415,12 @@ class PythonMLLibAPI extends Serializable { .setNumPartitions(numPartitions) .setNumIterations(numIterations) .setSeed(seed) - val data = dataJRDD.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) - val model = word2vec.fit(data) - data.unpersist(false) - new Word2VecModelWrapper(model) + try { + val model = word2vec.fit(dataJRDD.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)) + new Word2VecModelWrapper(model) + } finally { + dataJRDD.rdd.unpersist(false) + } } private[python] class Word2VecModelWrapper(model: Word2VecModel) { From 4b52edd5e16aa91b57840f505e9fb7b243f8a233 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 21 Nov 2014 08:45:12 -0800 Subject: [PATCH 6/7] using named argument --- .../apache/spark/mllib/api/python/PythonMLLibAPI.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 7fe7fc456dacd..803fbe0ba9997 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -78,7 +78,7 @@ class PythonMLLibAPI extends Serializable { val model = learner.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK), initialWeights) List(model.weights, model.intercept).map(_.asInstanceOf[Object]).asJava } finally { - data.rdd.unpersist(false) + data.rdd.unpersist(blocking = false) } } @@ -274,7 +274,7 @@ class PythonMLLibAPI 
extends Serializable { try { kMeansAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK)) } finally { - data.rdd.unpersist(false) + data.rdd.unpersist(blocking = false) } } @@ -419,7 +419,7 @@ class PythonMLLibAPI extends Serializable { val model = word2vec.fit(dataJRDD.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)) new Word2VecModelWrapper(model) } finally { - dataJRDD.rdd.unpersist(false) + dataJRDD.rdd.unpersist(blocking = false) } } @@ -484,7 +484,7 @@ class PythonMLLibAPI extends Serializable { try { DecisionTree.train(data.rdd.persist(StorageLevel.MEMORY_AND_DISK), strategy) } finally { - data.rdd.unpersist(false) + data.rdd.unpersist(blocking = false) } } @@ -523,7 +523,7 @@ class PythonMLLibAPI extends Serializable { RandomForest.trainRegressor(cached, strategy, numTrees, featureSubsetStrategy, seed) } } finally { - cached.unpersist(false) + cached.unpersist(blocking = false) } } From 7f6e6cec160b70aee48e72faab4fd84ec0e338f8 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 21 Nov 2014 12:54:30 -0800 Subject: [PATCH 7/7] Update -> Updater --- .../apache/spark/mllib/api/python/PythonMLLibAPI.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 803fbe0ba9997..f04df1c156898 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -85,7 +85,7 @@ class PythonMLLibAPI extends Serializable { /** * Return the Updater from string */ - def getUpdateFromString(regType: String): Updater = { + def getUpdaterFromString(regType: String): Updater = { if (regType == "l2") { new SquaredL2Updater } else if (regType == "l1") { @@ -117,7 +117,7 @@ class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) - lrAlg.optimizer.setUpdater(getUpdateFromString(regType)) + lrAlg.optimizer.setUpdater(getUpdaterFromString(regType)) trainRegressionModel( lrAlg, data, @@ -187,7 +187,7 @@ class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) - SVMAlg.optimizer.setUpdater(getUpdateFromString(regType)) + SVMAlg.optimizer.setUpdater(getUpdaterFromString(regType)) trainRegressionModel( SVMAlg, data, @@ -213,7 +213,7 @@ class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setStepSize(stepSize) .setMiniBatchFraction(miniBatchFraction) - LogRegAlg.optimizer.setUpdater(getUpdateFromString(regType)) + LogRegAlg.optimizer.setUpdater(getUpdaterFromString(regType)) trainRegressionModel( LogRegAlg, data, @@ -239,7 +239,7 @@ class PythonMLLibAPI extends Serializable { .setRegParam(regParam) .setNumCorrections(corrections) .setConvergenceTol(tolerance) - LogRegAlg.optimizer.setUpdater(getUpdateFromString(regType)) + LogRegAlg.optimizer.setUpdater(getUpdaterFromString(regType)) trainRegressionModel( LogRegAlg, data,
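Taken together, the seven patches replace the disableUncachedWarning() plumbing
with explicit caller-side caching. A reconstruction of trainRegressionModel as
it stands after the series (assembled from the hunks above, so treat it as a
sketch rather than a verbatim copy of the final file):

  import java.util.{List => JList}

  import scala.collection.JavaConverters._

  import org.apache.spark.api.java.JavaRDD
  import org.apache.spark.mllib.linalg.Vector
  import org.apache.spark.mllib.regression.{GeneralizedLinearAlgorithm,
    GeneralizedLinearModel, LabeledPoint}
  import org.apache.spark.storage.StorageLevel

  private def trainRegressionModel(
      learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel],
      data: JavaRDD[LabeledPoint],
      initialWeights: Vector): JList[Object] = {
    try {
      // Cache the freshly deserialized Java objects for the whole iterative
      // run; MEMORY_AND_DISK spills to disk instead of recomputing under
      // memory pressure.
      val model = learner.run(
        data.rdd.persist(StorageLevel.MEMORY_AND_DISK), initialWeights)
      List(model.weights, model.intercept).map(_.asInstanceOf[Object]).asJava
    } finally {
      // Non-blocking: training is finished, so the cached blocks can be
      // freed lazily.
      data.rdd.unpersist(blocking = false)
    }
  }

The same shape -- persist(MEMORY_AND_DISK) on entry, train inside try,
unpersist(blocking = false) in finally -- now appears in trainKMeansModel,
trainDecisionTreeModel, and trainRandomForestModel as well, while Word2Vec
keeps the serialized MEMORY_AND_DISK_SER level.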