diff --git a/NOTICE b/NOTICE index 7cbb114b2ae2d..33e19418f4e56 100644 --- a/NOTICE +++ b/NOTICE @@ -3,3 +3,18 @@ Copyright 2013 The Apache Software Foundation. This product includes software developed at The Apache Software Foundation (http://www.apache.org/). + +Numerical linear algebra support in MLlib is provided by the breeze package, +which depends on the following packages that are not distributed under +Apache authorized licenses: + +- netlib-core, which is open source software written by Samuel Halliday, + and copyright by the University of Tennessee, the University of Tennessee + Research Foundation, the University of California at Berkeley, and the + University of Colorado at Denver. The original software is available from + https://github.com/fommil/netlib-java + +- JTransforms, which is open source software written by Piotr Wendykier, + and distributed under the the terms of the MPL/LGPL/GPL tri-license. + The original software is available from + https://sites.google.com/site/piotrwendykier/software/jtransforms diff --git a/mllib/pom.xml b/mllib/pom.xml index 760a2a85d5ffa..44dfb7dfb11c1 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -60,6 +60,11 @@ jblas 1.2.3 + + org.scalanlp + breeze_${scala.binary.version} + 0.7-SNAPSHOT + org.scalatest scalatest_${scala.binary.version} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index e508b76c3f8c5..a6ecf64922713 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -19,16 +19,15 @@ package org.apache.spark.mllib.clustering import scala.collection.mutable.ArrayBuffer -import org.jblas.DoubleMatrix +import breeze.linalg.{DenseVector => BDV, Vector => BV, squaredDistance => breezeSquaredDistance} -import org.apache.spark.SparkContext +import org.apache.spark.{Logging, SparkContext} import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD -import org.apache.spark.Logging +import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD import org.apache.spark.util.random.XORShiftRandom - /** * K-means clustering with support for multiple parallel runs and a k-means++ like initialization * mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested, @@ -46,8 +45,6 @@ class KMeans private ( var epsilon: Double) extends Serializable with Logging { - private type ClusterCenters = Array[Array[Double]] - def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4) /** Set the number of clusters to create (k). Default: 2. */ @@ -114,6 +111,23 @@ class KMeans private ( * performance, because this is an iterative algorithm. */ def run(data: RDD[Array[Double]]): KMeansModel = { + val breezeData = data.map(v => new BDV[Double](v).asInstanceOf[BV[Double]]) + runBreeze(breezeData) + } + + /** + * Train a K-means model on the given set of points; `data` should be cached for high + * performance, because this is an iterative algorithm. + */ + def run(data: RDD[Vector])(implicit d: DummyImplicit): KMeansModel = { + val breezeData = data.map(v => v.toBreeze) + runBreeze(breezeData) + } + + /** + * Implementation using Breeze. 
+ */ + private def runBreeze(data: RDD[BV[Double]]): KMeansModel = { // TODO: check whether data is persistent; this needs RDD.storageLevel to be publicly readable val sc = data.sparkContext @@ -132,9 +146,9 @@ class KMeans private ( // Execute iterations of Lloyd's algorithm until all runs have converged while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (DoubleMatrix, Long) + type WeightedPoint = (BDV[Double], Long) def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = { - (p1._1.addi(p2._1), p1._2 + p2._2) + (p1._1 += p2._1, p1._2 + p2._2) } val activeCenters = activeRuns.map(r => centers(r)).toArray @@ -146,13 +160,13 @@ class KMeans private ( val k = activeCenters(0).length val dims = activeCenters(0)(0).length - val sums = Array.fill(runs, k)(new DoubleMatrix(dims)) + val sums = Array.fill(runs, k)(BDV.zeros[Double](dims)) val counts = Array.fill(runs, k)(0L) for (point <- points; (centers, runIndex) <- activeCenters.zipWithIndex) { val (bestCenter, cost) = KMeans.findClosest(centers, point) costAccums(runIndex) += cost - sums(runIndex)(bestCenter).addi(new DoubleMatrix(point)) + sums(runIndex)(bestCenter) += point counts(runIndex)(bestCenter) += 1 } @@ -168,8 +182,9 @@ class KMeans private ( for (j <- 0 until k) { val (sum, count) = totalContribs((i, j)) if (count != 0) { - val newCenter = sum.divi(count).data - if (MLUtils.squaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { + sum /= count.toDouble + val newCenter = sum + if (breezeSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { changed = true } centers(run)(j) = newCenter @@ -187,16 +202,20 @@ class KMeans private ( } val bestRun = costs.zipWithIndex.min._2 - new KMeansModel(centers(bestRun)) + new KMeansModel(centers(bestRun).map { v => + v.toArray + }) } /** * Initialize `runs` sets of cluster centers at random. */ - private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = { + private def initRandom(data: RDD[BV[Double]]): Array[Array[BV[Double]]] = { // Sample all the cluster centers in one pass to avoid repeated scans val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq - Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).toArray) + Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v => + v.toDenseVector + }.toArray) } /** @@ -208,41 +227,39 @@ class KMeans private ( * * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. 
*/ - private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = { + private def initKMeansParallel(data: RDD[BV[Double]]): Array[Array[BV[Double]]] = { // Initialize each run's center to a random point val seed = new XORShiftRandom().nextInt() val sample = data.takeSample(true, runs, seed).toSeq - val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r))) + val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDenseVector)) // On each step, sample 2 * k points on average for each run with probability proportional // to their squared distance from that run's current centers for (step <- 0 until initializationSteps) { - val centerArrays = centers.map(_.toArray) val sumCosts = data.flatMap { point => - for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays(r), point)) + for (r <- 0 until runs) yield (r, KMeans.pointCost(centers(r), point)) }.reduceByKey(_ + _).collectAsMap() val chosen = data.mapPartitionsWithIndex { (index, points) => val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) for { p <- points r <- 0 until runs - if rand.nextDouble() < KMeans.pointCost(centerArrays(r), p) * 2 * k / sumCosts(r) + if rand.nextDouble() < KMeans.pointCost(centers(r), p) * 2 * k / sumCosts(r) } yield (r, p) }.collect() for ((r, p) <- chosen) { - centers(r) += p + centers(r) += p.toDenseVector } } // Finally, we might have a set of more than k candidate centers for each run; weigh each // candidate by the number of points in the dataset mapping to it and run a local k-means++ // on the weighted centers to pick just k of them - val centerArrays = centers.map(_.toArray) val weightMap = data.flatMap { p => - for (r <- 0 until runs) yield ((r, KMeans.findClosest(centerArrays(r), p)._1), 1.0) + for (r <- 0 until runs) yield ((r, KMeans.findClosest(centers(r), p)._1), 1.0) }.reduceByKey(_ + _).collectAsMap() val finalCenters = (0 until runs).map { r => - val myCenters = centers(r).toArray + val myCenters = centers(r).toArray.asInstanceOf[Array[BV[Double]]] val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30) } @@ -256,6 +273,7 @@ class KMeans private ( * Top-level methods for calling K-means clustering. 
*/ object KMeans { + // Initialization mode names val RANDOM = "random" val K_MEANS_PARALLEL = "k-means||" @@ -268,6 +286,28 @@ object KMeans { initializationMode: String) : KMeansModel = { + new KMeans().setK(k) + .setMaxIterations(maxIterations) + .setRuns(runs) + .setInitializationMode(initializationMode) + .run(data) + } + + def train(data: RDD[Array[Double]], k: Int, maxIterations: Int, runs: Int): KMeansModel = { + train(data, k, maxIterations, runs, K_MEANS_PARALLEL) + } + + def train(data: RDD[Array[Double]], k: Int, maxIterations: Int): KMeansModel = { + train(data, k, maxIterations, 1, K_MEANS_PARALLEL) + } + + def train( + data: RDD[Vector], + k: Int, + maxIterations: Int, + runs: Int, + initializationMode: String + )(implicit d: DummyImplicit): KMeansModel = { new KMeans().setK(k) .setMaxIterations(maxIterations) .setRuns(runs) @@ -275,11 +315,13 @@ object KMeans { .run(data) } - def train(data: RDD[Array[Double]], k: Int, maxIterations: Int, runs: Int): KMeansModel = { + def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int) + (implicit d: DummyImplicit): KMeansModel = { train(data, k, maxIterations, runs, K_MEANS_PARALLEL) } - def train(data: RDD[Array[Double]], k: Int, maxIterations: Int): KMeansModel = { + def train(data: RDD[Vector], k: Int, maxIterations: Int) + (implicit d: DummyImplicit): KMeansModel = { train(data, k, maxIterations, 1, K_MEANS_PARALLEL) } @@ -301,6 +343,25 @@ object KMeans { (bestIndex, bestDistance) } + /** + * Returns the index of the closest center to the given point, as well as the squared distance. + */ + private[mllib] def findClosest(centers: TraversableOnce[BV[Double]], point: BV[Double]) + : (Int, Double) = { + var bestDistance = Double.PositiveInfinity + var bestIndex = 0 + var i = 0 + centers.foreach { v => + val distance: Double = breezeSquaredDistance(v, point) + if (distance < bestDistance) { + bestDistance = distance + bestIndex = i + } + i += 1 + } + (bestIndex, bestDistance) + } + /** * Return the K-means cost of a given point against the given cluster centers. */ @@ -315,6 +376,12 @@ object KMeans { bestDistance } + /** + * Returns the K-means cost of a given point against the given cluster centers. + */ + private[mllib] def pointCost(centers: TraversableOnce[BV[Double]], point: BV[Double]): Double = + findClosest(centers, point)._2 + def main(args: Array[String]) { if (args.length < 4) { println("Usage: KMeans []") diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 980be931576dc..06cf2e3ceb36a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -17,13 +17,21 @@ package org.apache.spark.mllib.clustering +import breeze.linalg.{DenseVector => BreezeDenseVector} + import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vector /** * A clustering model for K-means. Each point belongs to the cluster with the closest center. */ class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable { + + private val breezeClusterCenters = clusterCenters.map { v => + new BreezeDenseVector[Double](v) + } + /** Total number of clusters. 
*/ def k: Int = clusterCenters.length @@ -32,6 +40,10 @@ class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable KMeans.findClosest(clusterCenters, point)._1 } + def predict(point: Vector): Int = { + KMeans.findClosest(breezeClusterCenters, point.toBreeze)._1 + } + /** * Return the K-means cost (sum of squared distances of points to their nearest center) for this * model on the given data. @@ -39,4 +51,12 @@ class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable def computeCost(data: RDD[Array[Double]]): Double = { data.map(p => KMeans.pointCost(clusterCenters, p)).sum() } + + /** + * Return the K-means cost (sum of squared distances of points to their nearest center) for this + * model on the given data. + */ + def computeCost(data: RDD[Vector])(implicit d: DummyImplicit): Double = { + data.map(p => KMeans.pointCost(breezeClusterCenters, p.toBreeze)).sum() + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala index baf8251d8fc53..9226538fac3ad 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala @@ -19,31 +19,43 @@ package org.apache.spark.mllib.clustering import scala.util.Random -import org.jblas.{DoubleMatrix, SimpleBlas} +import breeze.linalg.{Vector => BV, DenseVector => BDV} /** * An utility object to run K-means locally. This is private to the ML package because it's used * in the initialization of KMeans but not meant to be publicly exposed. */ private[mllib] object LocalKMeans { + + def kMeansPlusPlus( + seed: Int, + points: Array[Array[Double]], + weights: Array[Double], + k: Int, + maxIterations: Int + ): Array[Array[Double]] = { + val breezePoints = points.map(v => new BDV[Double](v).asInstanceOf[BV[Double]]) + val breezeCenters = kMeansPlusPlus(seed, breezePoints, weights, k, maxIterations) + breezeCenters.map(_.toArray) + } + /** * Run K-means++ on the weighted point set `points`. This first does the K-means++ - * initialization procedure and then roudns of Lloyd's algorithm. + * initialization procedure and then rounds of Lloyd's algorithm. */ def kMeansPlusPlus( seed: Int, - points: Array[Array[Double]], + points: Array[BV[Double]], weights: Array[Double], k: Int, - maxIterations: Int) - : Array[Array[Double]] = - { + maxIterations: Int + )(implicit d: DummyImplicit): Array[BV[Double]] = { val rand = new Random(seed) val dimensions = points(0).length - val centers = new Array[Array[Double]](k) + val centers = new Array[BV[Double]](k) - // Initialize centers by sampling using the k-means++ procedure - centers(0) = pickWeighted(rand, points, weights) + // Initialize centers by sampling using the k-means++ procedure. 
+ centers(0) = (pickWeighted(rand, points, weights)).toDenseVector for (i <- 1 until k) { // Pick the next center with a probability proportional to cost under current centers val curCenters = centers.slice(0, i) @@ -57,7 +69,7 @@ private[mllib] object LocalKMeans { cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j)) j += 1 } - centers(i) = points(j-1) + centers(i) = points(j-1).toDenseVector } // Run up to maxIterations iterations of Lloyd's algorithm @@ -66,11 +78,13 @@ private[mllib] object LocalKMeans { var moved = true while (moved && iteration < maxIterations) { moved = false - val sums = Array.fill(k)(new DoubleMatrix(dimensions)) + val sums = Array.fill(k)( + new BDV[Double](new Array[Double](dimensions)).asInstanceOf[BV[Double]] + ) val counts = Array.fill(k)(0.0) for ((p, i) <- points.zipWithIndex) { val index = KMeans.findClosest(centers, p)._1 - SimpleBlas.axpy(weights(i), new DoubleMatrix(p), sums(index)) + breeze.linalg.axpy(weights(i), p, sums(index)) counts(index) += weights(i) if (index != oldClosest(i)) { moved = true @@ -81,9 +95,10 @@ private[mllib] object LocalKMeans { for (i <- 0 until k) { if (counts(i) == 0.0) { // Assign center to a random point - centers(i) = points(rand.nextInt(points.length)) + centers(i) = points(rand.nextInt(points.length)).toDenseVector } else { - centers(i) = sums(i).divi(counts(i)).data + sums(i) /= counts(i) + centers(i) = sums(i) } } iteration += 1 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala new file mode 100644 index 0000000000000..b95889f9a44ff --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector, + SparseVector => BreezeSparseVector} + +/** + * Represents a numeric vector, whose index type is Int and value type is Double. + */ +trait Vector extends Serializable { + + /** + * Size of the vector. + */ + def size: Int + + /** + * Converts the instance to a Breeze vector. + */ + private[mllib] def toBreeze: BreezeVector[Double] +} + +/** + * Represents a vector with random access to its elements. + * + */ +trait RandomAccessVector extends Vector { + // empty +} + +/** + * Factory methods for [[org.apache.spark.mllib.linalg.Vector]]. + */ +object Vectors { + + /** Creates a dense vector. */ + def dense(values: Array[Double]): Vector = new DenseVector(values) + + /** + * Creates a sparse vector providing its index array and value array. + * + * @param size vector size. + * @param indices index array, must be strictly increasing.
+ * @param values value array, must have the same length as indices. + */ + def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector = + new SparseVector(size, indices, values) + + /** + * Creates a sparse vector using unordered (index, value) pairs. + * + * @param size vector size. + * @param elements vector elements in (index, value) pairs. + */ + def sparse(size: Int, elements: Iterable[(Int, Double)]): Vector = { + + require(size > 0) + + val (indices, values) = elements.toArray.sortBy(_._1).unzip + var prev = -1 + indices.foreach { i => + require(prev < i, "Found duplicate indices: " + i) + prev = i + } + require(prev < size) + + new SparseVector(size, indices.toArray, values.toArray) + } + + /** + * Creates a vector instance from a breeze vector. + */ + private[mllib] def fromBreeze(breezeVector: BreezeVector[Double]): Vector = { + breezeVector match { + case v: BreezeDenseVector[Double] => { + require(v.offset == 0) + require(v.stride == 1) + new DenseVector(v.data) + } + case v: BreezeSparseVector[Double] => { + new SparseVector(v.length, v.index, v.data) + } + case v: BreezeVector[_] => { + sys.error("Unsupported Breeze vector type: " + v.getClass.getName) + } + } + } +} + +/** + * A dense vector represented by a value array. + * + * @param values value array. + */ +class DenseVector(var values: Array[Double]) extends RandomAccessVector { + + override def size: Int = values.length + + override def toString = values.mkString("[", ",", "]") + + private[mllib] override def toBreeze = new BreezeDenseVector[Double](values) +} + +/** + * A sparse vector represented by an index array and a value array. + * + * @param n size of the vector. + * @param indices index array, assumed to be strictly increasing. + * @param values value array, must have the same length as the index array.
+ */ +class SparseVector(var n: Int, var indices: Array[Int], var values: Array[Double]) extends Vector { + + override def size: Int = n + + override def toString = { + "(" + n + "," + indices.zip(values).mkString("[", "," ,"]") + ")" + } + + private[mllib] override def toBreeze = new BreezeSparseVector[Double](indices, values, n) +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala index 4ef1d1f64ff06..cabadd15731cb 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -17,11 +17,10 @@ package org.apache.spark.mllib.clustering - -import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.linalg.{Vectors, Vector} class KMeansSuite extends FunSuite with LocalSparkContext { @@ -131,6 +130,45 @@ class KMeansSuite extends FunSuite with LocalSparkContext { assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) } + test("single cluster with sparse data") { + val n = 1000 + val smallData = Array( + Vectors.sparse(n, Seq((0, 1.0), (1, 2.0), (2, 6.0))), + Vectors.sparse(n, Seq((0, 1.0), (1, 3.0))), + Vectors.sparse(n, Seq((0, 1.0), (1, 4.0), (2, 6.0))) + ) + val data = sc.parallelize((1 to 100).flatMap(_ => smallData), 4) + + // No matter how many runs or iterations we use, we should get one cluster, + // centered at the mean of the points + + val center = new Array[Double](n) + center(0) = 1.0 + center(1) = 3.0 + center(2) = 4.0 + + var model = KMeans.train(data, k=1, maxIterations=1) + assertSetsEqual(model.clusterCenters, Array(center)) + + model = KMeans.train(data, k=1, maxIterations=2) + assertSetsEqual(model.clusterCenters, Array(center)) + + model = KMeans.train(data, k=1, maxIterations=5) + assertSetsEqual(model.clusterCenters, Array(center)) + + model = KMeans.train(data, k=1, maxIterations=1, runs=5) + assertSetsEqual(model.clusterCenters, Array(center)) + + model = KMeans.train(data, k=1, maxIterations=1, runs=5) + assertSetsEqual(model.clusterCenters, Array(center)) + + model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM) + assertSetsEqual(model.clusterCenters, Array(center)) + + model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL) + assertSetsEqual(model.clusterCenters, Array(center)) + } + test("k-means|| initialization") { val points = Array( Array(1.0, 2.0, 6.0), diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeVectorConversionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeVectorConversionSuite.scala new file mode 100644 index 0000000000000..aacaa300849aa --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeVectorConversionSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.scalatest.FunSuite + +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV} + +/** + * Test Breeze vector conversions. + */ +class BreezeVectorConversionSuite extends FunSuite { + + val arr = Array(0.1, 0.2, 0.3, 0.4) + val n = 20 + val indices = Array(0, 3, 5, 10, 13) + val values = Array(0.1, 0.5, 0.3, -0.8, -1.0) + + test("dense to breeze") { + val vec = Vectors.dense(arr) + assert(vec.toBreeze === new BDV[Double](arr)) + } + + test("sparse to breeze") { + val vec = Vectors.sparse(n, indices, values) + assert(vec.toBreeze === new BSV[Double](indices, values, n)) + } + + test("dense breeze to vector") { + val breeze = new BDV[Double](arr) + val vec = Vectors.fromBreeze(breeze).asInstanceOf[DenseVector] + assert(vec.size === arr.length) + assert(vec.values.eq(arr), "should not copy data") + } + + test("sparse breeze to vector") { + val breeze = new BSV[Double](indices, values, n) + val vec = Vectors.fromBreeze(breeze).asInstanceOf[SparseVector] + assert(vec.size === n) + assert(vec.indices.eq(indices), "should not copy data") + assert(vec.values.eq(values), "should not copy data") + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorSuite.scala new file mode 100644 index 0000000000000..e3ee97121f822 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorSuite.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.linalg + +import org.scalatest.FunSuite + +class VectorSuite extends FunSuite { + + val arr = Array(0.1, 0.2, 0.3, 0.4) + val n = 20 + val indices = Array(0, 3, 5, 10, 13) + val values = Array(0.1, 0.5, 0.3, -0.8, -1.0) + + test("dense vector construction") { + val vec = Vectors.dense(arr).asInstanceOf[DenseVector] + assert(vec.size === arr.length) + assert(vec.values.eq(arr)) + } + + test("sparse vector construction") { + val vec = Vectors.sparse(n, indices, values).asInstanceOf[SparseVector] + assert(vec.size === n) + assert(vec.indices.eq(indices)) + assert(vec.values.eq(values)) + } + + test("sparse vector construction with unordered elements") { + val vec = Vectors.sparse(n, indices.zip(values).reverse).asInstanceOf[SparseVector] + assert(vec.size === n) + assert(vec.indices === indices) + assert(vec.values === values) + } +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 138aad7561043..83dd439720172 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -350,7 +350,8 @@ object SparkBuild extends Build { def mllibSettings = sharedSettings ++ Seq( name := "spark-mllib", libraryDependencies ++= Seq( - "org.jblas" % "jblas" % "1.2.3" + "org.jblas" % "jblas" % "1.2.3", + "org.scalanlp" %% "breeze" % "0.7-SNAPSHOT" ) )
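Note for reviewers (not part of the patch): a minimal sketch of how the factory methods added in Vectors.scala are meant to be called; the sizes and values below are arbitrary illustrations.

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Dense vector backed directly by the given value array.
val dv: Vector = Vectors.dense(Array(1.0, 0.0, 3.0))

// Sparse vector of size 5 from a strictly increasing index array and a value array of equal length.
val sv1: Vector = Vectors.sparse(5, Array(0, 3), Array(1.0, 3.0))

// Sparse vector of size 5 from unordered (index, value) pairs; duplicate indices are rejected.
val sv2: Vector = Vectors.sparse(5, Seq((3, 3.0), (0, 1.0)))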
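A similar sketch of the new RDD[Vector] entry points (KMeans.train, KMeansModel.predict, KMeansModel.computeCost), assuming an existing SparkContext named sc; the data mirrors the sparse-input test added in KMeansSuite, and the DummyImplicit parameter that disambiguates the overloads is filled in by the compiler.

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Cache the input because k-means is iterative.
val points = sc.parallelize(Seq(
  Vectors.sparse(1000, Seq((0, 1.0), (1, 2.0), (2, 6.0))),
  Vectors.sparse(1000, Seq((0, 1.0), (1, 3.0))),
  Vectors.sparse(1000, Seq((0, 1.0), (1, 4.0), (2, 6.0)))
)).cache()

val model = KMeans.train(points, k = 1, maxIterations = 5)

val cost = model.computeCost(points)                              // sum of squared distances to the nearest centers
val cluster = model.predict(Vectors.sparse(1000, Seq((0, 1.0))))  // index of the closest center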