From 7668124d7d774b04ac99d683781acb2f4d9ea2c6 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Sun, 18 Jan 2015 17:38:24 -0800
Subject: [PATCH] minor updates

---
 .../apache/spark/mllib/clustering/KMeans.scala     | 18 +++++++++---------
 .../spark/mllib/clustering/KMeansSuite.scala       | 14 +++++++-------
 python/pyspark/mllib/tests.py                      | 16 +++++++---------
 3 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index a167c26598bfb..6b5c934f015ba 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -19,14 +19,14 @@ package org.apache.spark.mllib.clustering
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.annotation.Experimental
 import org.apache.spark.Logging
-import org.apache.spark.SparkContext._
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.linalg.BLAS.{axpy, scal}
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 import org.apache.spark.util.random.XORShiftRandom
 
 /**
@@ -48,9 +48,9 @@ class KMeans private (
 
   /**
    * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
-   * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, System.nanoTime()}.
+   * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, seed: random}.
    */
-  def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, System.nanoTime())
+  def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, Utils.random.nextLong())
 
   /** Set the number of clusters to create (k). Default: 2. */
   def setK(k: Int): this.type = {
@@ -345,17 +345,20 @@ object KMeans {
    * @param maxIterations max number of iterations
    * @param runs number of parallel runs, defaults to 1. The best model is returned.
    * @param initializationMode initialization model, either "random" or "k-means||" (default).
+   * @param seed random seed value for cluster initialization
    */
   def train(
       data: RDD[Vector],
       k: Int,
       maxIterations: Int,
       runs: Int,
-      initializationMode: String): KMeansModel = {
+      initializationMode: String,
+      seed: Long): KMeansModel = {
     new KMeans().setK(k)
       .setMaxIterations(maxIterations)
       .setRuns(runs)
       .setInitializationMode(initializationMode)
+      .setSeed(seed)
       .run(data)
   }
 
@@ -367,20 +370,17 @@ object KMeans {
    * @param maxIterations max number of iterations
    * @param runs number of parallel runs, defaults to 1. The best model is returned.
    * @param initializationMode initialization model, either "random" or "k-means||" (default).
-   * @param seed random seed value for cluster initialization
    */
   def train(
       data: RDD[Vector],
       k: Int,
       maxIterations: Int,
       runs: Int,
-      initializationMode: String,
-      seed: Long): KMeansModel = {
+      initializationMode: String): KMeansModel = {
     new KMeans().setK(k)
       .setMaxIterations(maxIterations)
       .setRuns(runs)
       .setInitializationMode(initializationMode)
-      .setSeed(seed)
       .run(data)
   }
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index 5948aa1085ebc..caee5917000aa 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -97,17 +97,17 @@ class KMeansSuite extends FunSuite with MLlibTestSparkContext {
 
     for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) {
       // Create three deterministic models and compare cluster means
-      val model1 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1, initializationMode = initMode, seed = 42)
+      val model1 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1,
+        initializationMode = initMode, seed = 42)
       val centers1 = model1.clusterCenters
 
-      val model2 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1, initializationMode = initMode, seed = 42)
+      val model2 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1,
+        initializationMode = initMode, seed = 42)
       val centers2 = model2.clusterCenters
 
-      val model3 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1, initializationMode = initMode, seed = 42)
-      val centers3 = model3.clusterCenters
-
-      assert(centers1.deep == centers2.deep)
-      assert(centers1.deep == centers3.deep)
+      centers1.zip(centers2).foreach { case (c1, c2) =>
+        assert(c1 ~== c2 absTol 1E-14)
+      }
     }
   }
 
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 20e451a7f93f8..fc575671139e2 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -117,7 +117,7 @@ class ListTests(PySparkTestCase):
     as NumPy arrays.
     """
 
-    def test_clustering(self):
+    def test_kmeans(self):
        from pyspark.mllib.clustering import KMeans
         data = [
             [0, 1.1],
@@ -129,7 +129,7 @@ def test_clustering(self):
         self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
         self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
 
-    def test_clustering_deterministic(self):
+    def test_kmeans_deterministic(self):
         from pyspark.mllib.clustering import KMeans
         X = range(0, 100, 10)
         Y = range(0, 100, 10)
@@ -138,13 +138,11 @@ def test_clustering_deterministic(self):
                                  3, initializationMode="k-means||", seed=42)
         clusters2 = KMeans.train(self.sc.parallelize(data),
                                  3, initializationMode="k-means||", seed=42)
-        clusters3 = KMeans.train(self.sc.parallelize(data),
-                                 3, initializationMode="k-means||", seed=42)
-        centers1 = array(clusters1.centers).flatten().tolist()
-        centers2 = array(clusters2.centers).flatten().tolist()
-        centers3 = array(clusters3.centers).flatten().tolist()
-        self.assertListEqual(centers1, centers2)
-        self.assertListEqual(centers1, centers3)
+        centers1 = clusters1.centers
+        centers2 = clusters2.centers
+        for c1, c2 in zip(centers1, centers2):
+            # TODO: Allow small numeric difference.
+            self.assertTrue(array_equal(c1, c2))
 
     def test_classification(self):
         from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
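
Reviewer note (illustrative, not part of the patch): a minimal sketch of how the seeded KMeans.train overload added above can be used to get reproducible clustering. The local SparkContext, toy data, and the object/variable names are assumptions for this example; the tolerance check mirrors the suite's absTol 1E-14 comparison without depending on the test-only ~== operator.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    object SeededKMeansExample {
      def main(args: Array[String]): Unit = {
        // Local context and toy data; both are assumptions for this sketch.
        val conf = new SparkConf().setAppName("SeededKMeansExample").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val data = sc.parallelize(
          Seq.tabulate(100)(i => Vectors.dense(i.toDouble, (i % 10).toDouble)))

        // Fixing the seed makes cluster initialization, and hence the fitted
        // centers, repeatable across runs.
        val modelA = KMeans.train(data, k = 3, maxIterations = 10, runs = 1,
          initializationMode = "k-means||", seed = 42L)
        val modelB = KMeans.train(data, k = 3, maxIterations = 10, runs = 1,
          initializationMode = "k-means||", seed = 42L)

        // Compare centers pairwise with a small absolute tolerance, as the
        // updated KMeansSuite does.
        val reproducible = modelA.clusterCenters.zip(modelB.clusterCenters).forall {
          case (c1, c2) =>
            c1.toArray.zip(c2.toArray).forall { case (x, y) => math.abs(x - y) <= 1e-14 }
        }
        println(s"same seed => same centers: $reproducible")

        sc.stop()
      }
    }

A likely motivation for switching the default seed from System.nanoTime() to Utils.random.nextLong(): nanoTime values obtained in quick succession are nearly identical, so models constructed back-to-back could start from correlated seeds, whereas draws from a shared random generator are well spread. Passing an explicit seed, as above, is what makes results repeatable.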