diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
index 76ebdccfd6b67..7b0ec36424e97 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
@@ -17,32 +17,33 @@
 
 package org.apache.spark.mllib.examples;
 
+import java.util.regex.Pattern;
+
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 
 import org.apache.spark.mllib.clustering.KMeans;
 import org.apache.spark.mllib.clustering.KMeansModel;
-
-import java.util.Arrays;
-import java.util.regex.Pattern;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
 
 /**
  * Example using MLLib KMeans from Java.
  */
 public final class JavaKMeans {
 
-  static class ParsePoint implements Function<String, double[]> {
+  private static class ParsePoint implements Function<String, Vector> {
     private static final Pattern SPACE = Pattern.compile(" ");
 
     @Override
-    public double[] call(String line) {
+    public Vector call(String line) {
       String[] tok = SPACE.split(line);
       double[] point = new double[tok.length];
       for (int i = 0; i < tok.length; ++i) {
        point[i] = Double.parseDouble(tok[i]);
       }
-      return point;
+      return Vectors.dense(point);
     }
   }
 
@@ -65,15 +66,15 @@ public static void main(String[] args) {
     JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
       System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
-    JavaRDD<String> lines = sc.textFile(args[1]);
+    JavaRDD<String> lines = sc.textFile(inputFile);
 
-    JavaRDD<double[]> points = lines.map(new ParsePoint());
+    JavaRDD<Vector> points = lines.map(new ParsePoint());
 
-    KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs);
+    KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs, KMeans.K_MEANS_PARALLEL());
 
     System.out.println("Cluster centers:");
-    for (double[] center : model.clusterCenters()) {
-      System.out.println(" " + Arrays.toString(center));
+    for (Vector center : model.clusterCenters()) {
+      System.out.println(" " + center);
     }
     double cost = model.computeCost(points.rdd());
     System.out.println("Cost: " + cost);
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index efe99a31beac4..3449c698da60b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -16,14 +16,16 @@
  */
 
 package org.apache.spark.mllib.api.python
+
+import java.nio.{ByteBuffer, ByteOrder}
+
 import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.recommendation._
+import org.apache.spark.mllib.regression._
 import org.apache.spark.rdd.RDD
-import java.nio.ByteBuffer
-import java.nio.ByteOrder
 
 /**
  * The Java stubs necessary for the Python mllib bindings.
@@ -205,10 +207,10 @@ class PythonMLLibAPI extends Serializable {
   def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
       maxIterations: Int, runs: Int, initializationMode: String):
       java.util.List[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(xBytes => deserializeDoubleVector(xBytes))
+    val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
    val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
    val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleMatrix(model.clusterCenters))
+    ret.add(serializeDoubleMatrix(model.clusterCenters.map(_.toArray)))
    ret
  }
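Review note: the stub above keeps Python's array-based wire format while the Scala side moves to `Vector`. A minimal sketch of that round trip, under illustrative names (`rows` and `centersAsArrays` are not part of the patch):

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Deserialized Python rows arrive as Array[Double]; wrap each in a dense
// Vector before calling KMeans.train, as trainKMeansModel now does.
val rows: Seq[Array[Double]] = Seq(Array(1.0, 2.0), Array(3.0, 4.0))
val vectors: Seq[Vector] = rows.map(r => Vectors.dense(r))

// Centers come back as Vectors; toArray flattens them so the existing
// double-matrix serializer can ship them to Python unchanged.
val centersAsArrays: Seq[Array[Double]] = vectors.map(_.toArray)
```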
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index d09f29f882310..b412738e3f00a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.clustering
 import scala.collection.mutable.ArrayBuffer
 
 import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}
+
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
@@ -27,24 +28,6 @@ import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.random.XORShiftRandom
 
-/**
- * A breeze vector with its norm for fast distance computation.
- *
- * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]]
- */
-private[clustering]
-class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable {
-
-  def this(vector: BV[Double]) = this(vector, breezeNorm(vector, 2.0))
-
-  def this(array: Array[Double]) = this(new BDV[Double](array))
-
-  def this(v: Vector) = this(v.toBreeze)
-
-  /** Converts the vector to a dense vector. */
-  def toDense = new BreezeVectorWithNorm(vector.toDenseVector, norm)
-}
-
 /**
  * K-means clustering with support for multiple parallel runs and a k-means++ like initialization
  * mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
@@ -60,8 +43,7 @@ class KMeans private (
     var initializationMode: String,
     var initializationSteps: Int,
     var epsilon: Double)
-  extends Serializable with Logging
-{
+  extends Serializable with Logging {
   def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
 
   /** Set the number of clusters to create (k). Default: 2. */
@@ -127,15 +109,7 @@ class KMeans private (
    * Train a K-means model on the given set of points; `data` should be cached for high
    * performance, because this is an iterative algorithm.
    */
-  def run(data: RDD[Array[Double]]): KMeansModel = {
-    run(data.map(v => Vectors.dense(v)))
-  }
-
-  /**
-   * Train a K-means model on the given set of points; `data` should be cached for high
-   * performance, because this is an iterative algorithm.
-   */
-  def run(data: RDD[Vector])(implicit d: DummyImplicit): KMeansModel = {
+  def run(data: RDD[Vector]): KMeansModel = {
     // Compute squared norms and cache them.
     val norms = data.map(v => breezeNorm(v.toBreeze, 2.0))
     norms.persist()
@@ -248,9 +222,7 @@ class KMeans private (
 
     logInfo(s"The cost for the best run is $minCost.")
 
-    new KMeansModel(centers(bestRun).map { v =>
-      v.vector.toArray
-    })
+    new KMeansModel(centers(bestRun).map(c => Vectors.fromBreeze(c.vector)))
   }
 
   /**
@@ -332,36 +304,21 @@ object KMeans {
   val RANDOM = "random"
   val K_MEANS_PARALLEL = "k-means||"
 
-  def train(
-      data: RDD[Array[Double]],
-      k: Int,
-      maxIterations: Int,
-      runs: Int,
-      initializationMode: String)
-    : KMeansModel =
-  {
-    new KMeans().setK(k)
-      .setMaxIterations(maxIterations)
-      .setRuns(runs)
-      .setInitializationMode(initializationMode)
-      .run(data)
-  }
-
-  def train(data: RDD[Array[Double]], k: Int, maxIterations: Int, runs: Int): KMeansModel = {
-    train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
-  }
-
-  def train(data: RDD[Array[Double]], k: Int, maxIterations: Int): KMeansModel = {
-    train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
-  }
-
+  /**
+   * Trains a k-means model using the given set of parameters.
+   *
+   * @param data training points stored as `RDD[Vector]`
+   * @param k number of clusters
+   * @param maxIterations max number of iterations
+   * @param runs number of parallel runs, defaults to 1. The best model is returned.
+   * @param initializationMode initialization mode, either "random" or "k-means||" (default).
+   */
   def train(
       data: RDD[Vector],
       k: Int,
       maxIterations: Int,
-      runs: Int,
-      initializationMode: String
-    )(implicit d: DummyImplicit): KMeansModel = {
+      runs: Int = 1,
+      initializationMode: String = K_MEANS_PARALLEL): KMeansModel = {
     new KMeans().setK(k)
       .setMaxIterations(maxIterations)
       .setRuns(runs)
@@ -369,16 +326,6 @@ object KMeans {
       .run(data)
   }
 
-  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int)
-    (implicit d: DummyImplicit): KMeansModel = {
-    train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
-  }
-
-  def train(data: RDD[Vector], k: Int, maxIterations: Int)
-    (implicit d: DummyImplicit): KMeansModel = {
-    train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
-  }
-
   /**
    * Returns the index of the closest center to the given point, as well as the squared distance.
    */
@@ -431,14 +378,34 @@ object KMeans {
     val (master, inputFile, k, iters) = (args(0), args(1), args(2).toInt, args(3).toInt)
     val runs = if (args.length >= 5) args(4).toInt else 1
     val sc = new SparkContext(master, "KMeans")
-    val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble)).cache()
+    val data = sc.textFile(inputFile)
+      .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
+      .cache()
     val model = KMeans.train(data, k, iters, runs)
     val cost = model.computeCost(data)
     println("Cluster centers:")
     for (c <- model.clusterCenters) {
-      println(" " + c.mkString(" "))
+      println(" " + c)
     }
     println("Cost: " + cost)
     System.exit(0)
   }
 }
+
+/**
+ * A breeze vector with its norm for fast distance computation.
+ *
+ * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]]
+ */
+private[clustering]
+class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable {
+
+  def this(vector: BV[Double]) = this(vector, breezeNorm(vector, 2.0))
+
+  def this(array: Array[Double]) = this(new BDV[Double](array))
+
+  def this(v: Vector) = this(v.toBreeze)
+
+  /** Converts the vector to a dense vector. */
+  def toDense = new BreezeVectorWithNorm(vector.toDenseVector, norm)
+}
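Moving `BreezeVectorWithNorm` to the bottom of the file keeps the public `KMeans` API at the top; the class itself exists so each point's L2 norm is computed once and reused. A simplified sketch of the identity that `fastSquaredDistance` builds on (the real method also guards against loss of precision, which this sketch omits):

```scala
import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}

// ||a - b||^2 = ||a||^2 + ||b||^2 - 2 * (a dot b), so with both norms cached
// we never materialize the difference vector a - b.
def squaredDistanceWithNorms(
    a: BV[Double], aNorm: Double,
    b: BV[Double], bNorm: Double): Double = {
  aNorm * aNorm + bNorm * bNorm - 2.0 * (a dot b)
}

val a = BDV(1.0, 2.0, 3.0)
val b = BDV(4.0, 5.0, 6.0)
val d2 = squaredDistanceWithNorms(a, breezeNorm(a, 2.0), b, breezeNorm(b, 2.0))
// d2 == 27.0, matching the direct computation on a - b
```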
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 5ec263ff44fef..18abbf2758b86 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -24,16 +24,11 @@ import org.apache.spark.mllib.linalg.Vector
 /**
  * A clustering model for K-means. Each point belongs to the cluster with the closest center.
  */
-class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable {
+class KMeansModel(val clusterCenters: Array[Vector]) extends Serializable {
 
   /** Total number of clusters. */
   def k: Int = clusterCenters.length
 
-  /** Return the cluster index that a given point belongs to. */
-  def predict(point: Array[Double]): Int = {
-    KMeans.findClosest(clusterCentersWithNorm, new BreezeVectorWithNorm(point))._1
-  }
-
   /** Returns the cluster index that a given point belongs to. */
   def predict(point: Vector): Int = {
     KMeans.findClosest(clusterCentersWithNorm, new BreezeVectorWithNorm(point))._1
@@ -49,16 +44,7 @@ class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable
    * Return the K-means cost (sum of squared distances of points to their nearest center) for this
    * model on the given data.
    */
-  def computeCost(data: RDD[Array[Double]]): Double = {
-    val centersWithNorm = clusterCentersWithNorm
-    data.map(p => KMeans.pointCost(centersWithNorm, new BreezeVectorWithNorm(p))).sum()
-  }
-
-  /**
-   * Return the K-means cost (sum of squared distances of points to their nearest center) for this
-   * model on the given data.
-   */
-  def computeCost(data: RDD[Vector])(implicit d: DummyImplicit): Double = {
+  def computeCost(data: RDD[Vector]): Double = {
     val centersWithNorm = clusterCentersWithNorm
     data.map(p => KMeans.pointCost(centersWithNorm, new BreezeVectorWithNorm(p))).sum()
   }
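With centers stored as `Vector`, `predict` and `computeCost` take `Vector` arguments directly. A brief usage sketch (assumes a live `SparkContext` named `sc`; the three points mirror the test fixtures below):

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 6.0),
  Vectors.dense(1.0, 3.0, 0.0),
  Vectors.dense(1.0, 4.0, 6.0)
)).cache()  // KMeans is iterative, so cache the training data

val model = KMeans.train(data, k = 1, maxIterations = 5)
val cluster = model.predict(Vectors.dense(1.0, 3.0, 4.0))  // index of the closest center
val cost = model.computeCost(data)  // sum of squared distances to nearest centers
```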
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 16a19df4472f4..f772ff32bebe6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.linalg
 
 import java.lang.{Iterable => JavaIterable}
 
+import scala.annotation.varargs
 import scala.collection.JavaConverters._
 
-import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector,
-  SparseVector => BreezeSparseVector}
+import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV}
 
 /**
  * Represents a numeric vector, whose index type is Int and value type is Double.
@@ -34,10 +34,25 @@ trait Vector extends Serializable {
    */
   def size: Int
 
+  /**
+   * Converts the instance to a double array.
+   */
+  def toArray: Array[Double]
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case v: Vector =>
+        this.toArray.view.equals(v.toArray.view)
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = toArray.view.hashCode()
+
   /**
    * Converts the instance to a breeze vector.
    */
-  private[mllib] def toBreeze: BreezeVector[Double]
+  private[mllib] def toBreeze: BV[Double]
 }
 
 /**
@@ -53,8 +68,17 @@ trait RandomAccessVector extends Vector {
  */
 object Vectors {
 
-  /** Creates a dense vector. */
-  def dense(values: Array[Double]): Vector = new DenseVector(values)
+  /**
+   * Creates a dense vector.
+   */
+  @varargs
+  def dense(values: Double*): Vector = new DenseVector(values.toArray)
+
+  // A dummy implicit is used to avoid signature collision with the one generated by @varargs.
+  /**
+   * Creates a dense vector from a double array.
+   */
+  def dense(values: Array[Double])(implicit d: DummyImplicit): Vector = new DenseVector(values)
 
   /**
    * Creates a sparse vector providing its index array and value array.
@@ -79,7 +103,7 @@ object Vectors {
     val (indices, values) = elements.sortBy(_._1).unzip
     var prev = -1
     indices.foreach { i =>
-      require(prev < i, "Found duplicate indices: " + i)
+      require(prev < i, s"Found duplicate indices: $i.")
       prev = i
     }
     require(prev < size)
@@ -93,41 +117,39 @@ object Vectors {
    * @param size vector size.
    * @param elements vector elements in (index, value) pairs.
    */
-  def sparse(size: Int, elements: JavaIterable[(Int, Double)]): Vector =
+  def sparse(size: Int, elements: JavaIterable[(Int, Double)]): Vector = {
     sparse(size, elements.asScala.toSeq)
+  }
 
   /**
    * Creates a vector instance from a breeze vector.
    */
-  private[mllib] def fromBreeze(breezeVector: BreezeVector[Double]): Vector = {
+  private[mllib] def fromBreeze(breezeVector: BV[Double]): Vector = {
     breezeVector match {
-      case v: BreezeDenseVector[Double] => {
+      case v: BDV[Double] =>
         require(v.offset == 0, s"Do not support non-zero offset ${v.offset}.")
         require(v.stride == 1, s"Do not support stride other than 1, but got ${v.stride}.")
         new DenseVector(v.data)
-      }
-      case v: BreezeSparseVector[Double] => {
+      case v: BSV[Double] =>
         new SparseVector(v.length, v.index, v.data)
-      }
-      case v: BreezeVector[_] => {
+      case v: BV[_] =>
         sys.error("Unsupported Breeze vector type: " + v.getClass.getName)
-      }
     }
   }
 }
 
 /**
  * A dense vector represented by a value array.
- *
- * @param values
  */
-class DenseVector(var values: Array[Double]) extends RandomAccessVector {
+class DenseVector(val values: Array[Double]) extends RandomAccessVector {
 
   override def size: Int = values.length
 
-  override def toString = values.mkString("[", ",", "]")
+  override def toString: String = values.mkString("[", ",", "]")
 
-  private[mllib] override def toBreeze = new BreezeDenseVector[Double](values)
+  override def toArray: Array[Double] = values
+
+  private[mllib] override def toBreeze: BV[Double] = new BDV[Double](values)
 }
 
 /**
@@ -137,13 +159,21 @@ class DenseVector(var values: Array[Double]) extends RandomAccessVector {
  *
  * @param indices index array, assume to be strictly increasing.
  * @param values value array, must have the same length as the index array.
  */
-class SparseVector(var n: Int, var indices: Array[Int], var values: Array[Double]) extends Vector {
+class SparseVector(val n: Int, val indices: Array[Int], val values: Array[Double]) extends Vector {
 
   override def size: Int = n
 
-  override def toString = {
+  override def toString: String = {
     "(" + n + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
   }
 
-  private[mllib] override def toBreeze = new BreezeSparseVector[Double](indices, values, n)
+  override def toArray: Array[Double] = {
+    val data = new Array[Double](n)
+    indices.view.zip(values).foreach { case (i, x) =>
+      data.update(i, x)
+    }
+    data
+  }
+
+  private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, n)
 }
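A note on the two `dense` overloads: `@varargs` makes the `Double*` version callable from Java, but it also makes the compiler emit a bridge method taking `Array[Double]`, which would collide with a plain array overload. The `DummyImplicit` parameter (always supplied automatically from `scala.DummyImplicit`'s companion object) gives the array version a distinct JVM signature. How both forms resolve from Scala:

```scala
import org.apache.spark.mllib.linalg.Vectors

// Varargs form: elements listed inline.
val v1 = Vectors.dense(1.0, 0.0, 2.0)

// Array form: resolves to dense(Array[Double])(implicit DummyImplicit);
// the implicit is filled in for free, so call sites look unchanged.
val arr = Array(1.0, 0.0, 2.0)
val v2 = Vectors.dense(arr)

// The new structural equals/hashCode compare element values, not identity.
assert(v1 == v2)
```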
diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
index 33b99f4bd3bcf..49a614bd90cab 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
@@ -18,16 +18,19 @@
 package org.apache.spark.mllib.clustering;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import static org.junit.Assert.*;
+
+import com.google.common.collect.Lists;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
 
 public class JavaKMeansSuite implements Serializable {
   private transient JavaSparkContext sc;
@@ -44,72 +47,45 @@ public void tearDown() {
     System.clearProperty("spark.driver.port");
   }
 
-  // L1 distance between two points
-  double distance1(double[] v1, double[] v2) {
-    double distance = 0.0;
-    for (int i = 0; i < v1.length; ++i) {
-      distance = Math.max(distance, Math.abs(v1[i] - v2[i]));
-    }
-    return distance;
-  }
-
-  // Assert that two sets of points are equal, within EPSILON tolerance
-  void assertSetsEqual(double[][] v1, double[][] v2) {
-    double EPSILON = 1e-4;
-    Assert.assertTrue(v1.length == v2.length);
-    for (int i = 0; i < v1.length; ++i) {
-      double minDistance = Double.MAX_VALUE;
-      for (int j = 0; j < v2.length; ++j) {
-        minDistance = Math.min(minDistance, distance1(v1[i], v2[j]));
-      }
-      Assert.assertTrue(minDistance <= EPSILON);
-    }
-
-    for (int i = 0; i < v2.length; ++i) {
-      double minDistance = Double.MAX_VALUE;
-      for (int j = 0; j < v1.length; ++j) {
-        minDistance = Math.min(minDistance, distance1(v2[i], v1[j]));
-      }
-      Assert.assertTrue(minDistance <= EPSILON);
-    }
-  }
-
-
   @Test
   public void runKMeansUsingStaticMethods() {
-    List<double[]> points = new ArrayList<double[]>();
-    points.add(new double[]{1.0, 2.0, 6.0});
-    points.add(new double[]{1.0, 3.0, 0.0});
-    points.add(new double[]{1.0, 4.0, 6.0});
+    List<Vector> points = Lists.newArrayList(
+      Vectors.dense(1.0, 2.0, 6.0),
+      Vectors.dense(1.0, 3.0, 0.0),
+      Vectors.dense(1.0, 4.0, 6.0)
+    );
 
-    double[][] expectedCenter = { {1.0, 3.0, 4.0} };
+    Vector expectedCenter = Vectors.dense(1.0, 3.0, 4.0);
 
-    JavaRDD<double[]> data = sc.parallelize(points, 2);
-    KMeansModel model = KMeans.train(data.rdd(), 1, 1);
-    assertSetsEqual(model.clusterCenters(), expectedCenter);
+    JavaRDD<Vector> data = sc.parallelize(points, 2);
+    KMeansModel model = KMeans.train(data.rdd(), 1, 1, 1, KMeans.K_MEANS_PARALLEL());
+    assertEquals(1, model.clusterCenters().length);
+    assertEquals(expectedCenter, model.clusterCenters()[0]);
 
     model = KMeans.train(data.rdd(), 1, 1, 1, KMeans.RANDOM());
-    assertSetsEqual(model.clusterCenters(), expectedCenter);
+    assertEquals(expectedCenter, model.clusterCenters()[0]);
   }
 
   @Test
   public void runKMeansUsingConstructor() {
-    List<double[]> points = new ArrayList<double[]>();
-    points.add(new double[]{1.0, 2.0, 6.0});
-    points.add(new double[]{1.0, 3.0, 0.0});
-    points.add(new double[]{1.0, 4.0, 6.0});
+    List<Vector> points = Lists.newArrayList(
+      Vectors.dense(1.0, 2.0, 6.0),
+      Vectors.dense(1.0, 3.0, 0.0),
+      Vectors.dense(1.0, 4.0, 6.0)
+    );
 
-    double[][] expectedCenter = { {1.0, 3.0, 4.0} };
+    Vector expectedCenter = Vectors.dense(1.0, 3.0, 4.0);
 
-    JavaRDD<double[]> data = sc.parallelize(points, 2);
+    JavaRDD<Vector> data = sc.parallelize(points, 2);
     KMeansModel model = new KMeans().setK(1).setMaxIterations(5).run(data.rdd());
-    assertSetsEqual(model.clusterCenters(), expectedCenter);
-
-    model = new KMeans().setK(1)
-      .setMaxIterations(1)
-      .setRuns(1)
-      .setInitializationMode(KMeans.RANDOM())
-      .run(data.rdd());
-    assertSetsEqual(model.clusterCenters(), expectedCenter);
+    assertEquals(1, model.clusterCenters().length);
+    assertEquals(expectedCenter, model.clusterCenters()[0]);
+
+    model = new KMeans()
+      .setK(1)
+      .setMaxIterations(1)
+      .setInitializationMode(KMeans.RANDOM())
+      .run(data.rdd());
+    assertEquals(expectedCenter, model.clusterCenters()[0]);
  }
 }
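The Java suite spells out all five arguments to `KMeans.train` because Scala default arguments are not visible to Java callers; only Scala code gets the short forms. Reusing `data` from the sketch above, the Scala equivalent of the fully spelled-out Java call:

```scala
import org.apache.spark.mllib.clustering.KMeans

// runs and initializationMode fall back to their defaults (1 and
// K_MEANS_PARALLEL), matching KMeans.train(rdd, 1, 1, 1, KMeans.K_MEANS_PARALLEL()).
val model = KMeans.train(data, k = 1, maxIterations = 1)
```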
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index e73ea9b53918e..560a4ad71a4de 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -24,110 +24,77 @@ import org.apache.spark.mllib.linalg.Vectors
 
 class KMeansSuite extends FunSuite with LocalSparkContext {
 
-  val EPSILON = 1e-4
-
   import KMeans.{RANDOM, K_MEANS_PARALLEL}
 
-  def prettyPrint(point: Array[Double]): String = point.mkString("(", ", ", ")")
-
-  def prettyPrint(points: Array[Array[Double]]): String = {
-    points.map(prettyPrint).mkString("(", "; ", ")")
-  }
-
-  // L1 distance between two points
-  def distance1(v1: Array[Double], v2: Array[Double]): Double = {
-    v1.zip(v2).map{ case (a, b) => math.abs(a-b) }.max
-  }
-
-  // Assert that two vectors are equal within tolerance EPSILON
-  def assertEqual(v1: Array[Double], v2: Array[Double]) {
-    def errorMessage = prettyPrint(v1) + " did not equal " + prettyPrint(v2)
-    assert(v1.length == v2.length, errorMessage)
-    assert(distance1(v1, v2) <= EPSILON, errorMessage)
-  }
-
-  // Assert that two sets of points are equal, within EPSILON tolerance
-  def assertSetsEqual(set1: Array[Array[Double]], set2: Array[Array[Double]]) {
-    def errorMessage = prettyPrint(set1) + " did not equal " + prettyPrint(set2)
-    assert(set1.length == set2.length, errorMessage)
-    for (v <- set1) {
-      val closestDistance = set2.map(w => distance1(v, w)).min
-      if (closestDistance > EPSILON) {
-        fail(errorMessage)
-      }
-    }
-    for (v <- set2) {
-      val closestDistance = set1.map(w => distance1(v, w)).min
-      if (closestDistance > EPSILON) {
-        fail(errorMessage)
-      }
-    }
-  }
-
   test("single cluster") {
     val data = sc.parallelize(Array(
-      Array(1.0, 2.0, 6.0),
-      Array(1.0, 3.0, 0.0),
-      Array(1.0, 4.0, 6.0)
+      Vectors.dense(1.0, 2.0, 6.0),
+      Vectors.dense(1.0, 3.0, 0.0),
+      Vectors.dense(1.0, 4.0, 6.0)
     ))
 
+    val center = Vectors.dense(1.0, 3.0, 4.0)
+
     // No matter how many runs or iterations we use, we should get one cluster,
     // centered at the mean of the points
 
     var model = KMeans.train(data, k=1, maxIterations=1)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=2)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=5)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=1, runs=5)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=1, runs=5)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(
       data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
   }
 
   test("single cluster with big dataset") {
     val smallData = Array(
-      Array(1.0, 2.0, 6.0),
-      Array(1.0, 3.0, 0.0),
-      Array(1.0, 4.0, 6.0)
+      Vectors.dense(1.0, 2.0, 6.0),
+      Vectors.dense(1.0, 3.0, 0.0),
+      Vectors.dense(1.0, 4.0, 6.0)
     )
     val data = sc.parallelize((1 to 100).flatMap(_ => smallData), 4)
 
     // No matter how many runs or iterations we use, we should get one cluster,
     // centered at the mean of the points
 
+    val center = Vectors.dense(1.0, 3.0, 4.0)
+
     var model = KMeans.train(data, k=1, maxIterations=1)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.size === 1)
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=2)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=5)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=1, runs=5)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=1, runs=5)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
-    assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+    assert(model.clusterCenters.head === center)
   }
 
   test("single cluster with sparse data") {
@@ -150,42 +117,39 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
 
     // No matter how many runs or iterations we use, we should get one cluster,
     // centered at the mean of the points
 
-    val center = new Array[Double](n)
-    center(0) = 1.0
-    center(1) = 3.0
-    center(2) = 4.0
+    val center = Vectors.sparse(n, Seq((0, 1.0), (1, 3.0), (2, 4.0)))
 
     var model = KMeans.train(data, k=1, maxIterations=1)
-    assertSetsEqual(model.clusterCenters, Array(center))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=2)
-    assertSetsEqual(model.clusterCenters, Array(center))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=5)
-    assertSetsEqual(model.clusterCenters, Array(center))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=1, runs=5)
-    assertSetsEqual(model.clusterCenters, Array(center))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=1, runs=5)
-    assertSetsEqual(model.clusterCenters, Array(center))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
-    assertSetsEqual(model.clusterCenters, Array(center))
+    assert(model.clusterCenters.head === center)
 
     model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
-    assertSetsEqual(model.clusterCenters, Array(center))
+    assert(model.clusterCenters.head === center)
 
     data.unpersist()
   }
 
   test("k-means|| initialization") {
-    val points = Array(
-      Array(1.0, 2.0, 6.0),
-      Array(1.0, 3.0, 0.0),
-      Array(1.0, 4.0, 6.0),
-      Array(1.0, 0.0, 1.0),
-      Array(1.0, 1.0, 1.0)
+    val points = Seq(
+      Vectors.dense(1.0, 2.0, 6.0),
+      Vectors.dense(1.0, 3.0, 0.0),
+      Vectors.dense(1.0, 4.0, 6.0),
+      Vectors.dense(1.0, 0.0, 1.0),
+      Vectors.dense(1.0, 1.0, 1.0)
     )
     val rdd = sc.parallelize(points)
 
@@ -194,26 +158,26 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
     // unselected point as long as it hasn't yet selected all of them
 
     var model = KMeans.train(rdd, k=5, maxIterations=1)
-    assertSetsEqual(model.clusterCenters, points)
+    assert(Set(model.clusterCenters: _*) === Set(points: _*))
 
     // Iterations of Lloyd's should not change the answer either
     model = KMeans.train(rdd, k=5, maxIterations=10)
-    assertSetsEqual(model.clusterCenters, points)
+    assert(Set(model.clusterCenters: _*) === Set(points: _*))
 
     // Neither should more runs
     model = KMeans.train(rdd, k=5, maxIterations=10, runs=5)
-    assertSetsEqual(model.clusterCenters, points)
+    assert(Set(model.clusterCenters: _*) === Set(points: _*))
   }
 
   test("two clusters") {
-    val points = Array(
-      Array(0.0, 0.0),
-      Array(0.0, 0.1),
-      Array(0.1, 0.0),
-      Array(9.0, 0.0),
-      Array(9.0, 0.2),
-      Array(9.2, 0.0)
-    ).map(Vectors.dense)
+    val points = Seq(
+      Vectors.dense(0.0, 0.0),
+      Vectors.dense(0.0, 0.1),
+      Vectors.dense(0.1, 0.0),
+      Vectors.dense(9.0, 0.0),
+      Vectors.dense(9.0, 0.2),
+      Vectors.dense(9.2, 0.0)
+    )
     val rdd = sc.parallelize(points, 3)
 
     for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) {
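The epsilon-based set comparisons could be dropped because `Vector` now defines structural equality, and a single-cluster center is an exact mean, so `===` is deterministic here. One subtlety worth noting: equality also holds across representations, which is what the sparse-data assertions above rely on:

```scala
import org.apache.spark.mllib.linalg.Vectors

// A dense center computed by KMeans can equal a sparse expected value,
// because Vector.equals compares the densified contents via toArray.
val dense = Vectors.dense(1.0, 3.0, 4.0, 0.0, 0.0)
val sparse = Vectors.sparse(5, Seq((0, 1.0), (1, 3.0), (2, 4.0)))
assert(dense == sparse)
assert(dense.## == sparse.##)
```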
construction") { @@ -54,4 +60,25 @@ class VectorsSuite extends FunSuite { assert(vec.indices === indices) assert(vec.values === values) } + + test("vector equals") { + val dv1 = Vectors.dense(1.0, 0.0, 2.0) + val dv2 = Vectors.dense(1.0, 0.0, 2.0) + val sv1 = Vectors.sparse(3, Seq((0, 1.0), (2, 2.0))) + val sv2 = Vectors.sparse(3, Seq((0, 1.0), (2, 2.0))) + + val vectors = Seq(dv1, dv2, sv1, sv2) + + for (v <- vectors; u <- vectors) { + assert(v === u) + assert(v.## === u.##) + } + + val another = Vectors.dense(1.0, 1.0, 2.0) + + for (v <- vectors) { + assert(v != another) + assert(v.## != another.##) + } + } }