add toArray to Vector
add Java friendly methods to Vectors

remove RDD[Array[Double]] support from KMeans

update Java KMeans API
mengxr committed Mar 21, 2014
1 parent e69b10c commit b28ba2f
Showing 8 changed files with 222 additions and 269 deletions.
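
The user-facing effect of this commit: MLlib's k-means now consumes RDD[Vector] instead of RDD[Array[Double]], with points built through the Vectors factory. A minimal before/after sketch in Scala (an editor's illustration, not part of the commit; assumes an existing SparkContext named `sc`, and the file path is hypothetical):

  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors

  // Before this commit: points were parsed into an RDD[Array[Double]].
  // val data = sc.textFile("points.txt").map(_.split(' ').map(_.toDouble)).cache()

  // After this commit: wrap each parsed array in a Vector.
  val data = sc.textFile("points.txt")
    .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
    .cache()

  val model = KMeans.train(data, 2, 20) // runs and initializationMode now have defaults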
examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
@@ -17,32 +17,33 @@

package org.apache.spark.mllib.examples;

+import java.util.regex.Pattern;
+
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;

-import java.util.Arrays;
-import java.util.regex.Pattern;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;

/**
* Example using MLLib KMeans from Java.
*/
public final class JavaKMeans {

-  static class ParsePoint implements Function<String, double[]> {
+  private static class ParsePoint implements Function<String, Vector> {
private static final Pattern SPACE = Pattern.compile(" ");

@Override
-    public double[] call(String line) {
+    public Vector call(String line) {
String[] tok = SPACE.split(line);
double[] point = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
point[i] = Double.parseDouble(tok[i]);
}
-      return point;
+      return Vectors.dense(point);
}
}

@@ -65,15 +66,15 @@ public static void main(String[] args) {

JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
-    JavaRDD<String> lines = sc.textFile(args[1]);
+    JavaRDD<String> lines = sc.textFile(inputFile);

-    JavaRDD<double[]> points = lines.map(new ParsePoint());
+    JavaRDD<Vector> points = lines.map(new ParsePoint());

-    KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs);
+    KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs, KMeans.K_MEANS_PARALLEL());

System.out.println("Cluster centers:");
-    for (double[] center : model.clusterCenters()) {
-      System.out.println(" " + Arrays.toString(center));
+    for (Vector center : model.clusterCenters()) {
+      System.out.println(" " + center);
}
double cost = model.computeCost(points.rdd());
System.out.println("Cost: " + cost);
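
A note on the extra argument above: the Java caller now passes KMeans.K_MEANS_PARALLEL() explicitly because the default arguments introduced on the Scala side (see KMeans.scala below) are not visible to Java. A small Scala sketch of the equivalence (`points`, `k`, and `iterations` stand in for any RDD[Vector] and integer settings):

  // From Scala the defaults apply: runs = 1, initializationMode = k-means||.
  val m1 = KMeans.train(points, k, iterations)
  // Equivalent to spelling everything out, as the Java example must:
  val m2 = KMeans.train(points, k, iterations, 1, KMeans.K_MEANS_PARALLEL)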
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -16,14 +16,16 @@
*/

package org.apache.spark.mllib.api.python

+import java.nio.{ByteBuffer, ByteOrder}

import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
+import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.recommendation._
+import org.apache.spark.mllib.regression._
import org.apache.spark.rdd.RDD
-import java.nio.ByteBuffer
-import java.nio.ByteOrder

/**
* The Java stubs necessary for the Python mllib bindings.
@@ -205,10 +207,10 @@ class PythonMLLibAPI extends Serializable {
def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
maxIterations: Int, runs: Int, initializationMode: String):
java.util.List[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(xBytes => deserializeDoubleVector(xBytes))
+    val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleMatrix(model.clusterCenters))
+    ret.add(serializeDoubleMatrix(model.clusterCenters.map(_.toArray)))
ret
}

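
This bridge leans on the toArray method from the commit title: trained centers come back as Vectors and are flattened to Array[Double] before being serialized for Python. A minimal sketch of the round trip it assumes:

  import org.apache.spark.mllib.linalg.Vectors

  val values = Array(1.0, 2.0, 3.0)
  val v = Vectors.dense(values)
  // toArray recovers the underlying doubles, so dense(...).toArray round-trips.
  assert(v.toArray.sameElements(values))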
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala (107 changes: 37 additions & 70 deletions)
@@ -20,31 +20,14 @@ package org.apache.spark.mllib.clustering
import scala.collection.mutable.ArrayBuffer

import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.util.random.XORShiftRandom

-/**
- * A breeze vector with its norm for fast distance computation.
- *
- * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]]
- */
-private[clustering]
-class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable {
-
-  def this(vector: BV[Double]) = this(vector, breezeNorm(vector, 2.0))
-
-  def this(array: Array[Double]) = this(new BDV[Double](array))
-
-  def this(v: Vector) = this(v.toBreeze)
-
-  /** Converts the vector to a dense vector. */
-  def toDense = new BreezeVectorWithNorm(vector.toDenseVector, norm)
-}

/**
* K-means clustering with support for multiple parallel runs and a k-means++ like initialization
* mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
@@ -60,8 +43,7 @@ class KMeans private (
var initializationMode: String,
var initializationSteps: Int,
var epsilon: Double)
-  extends Serializable with Logging
-{
+  extends Serializable with Logging {
def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)

/** Set the number of clusters to create (k). Default: 2. */
@@ -127,15 +109,7 @@
* Train a K-means model on the given set of points; `data` should be cached for high
* performance, because this is an iterative algorithm.
*/
-  def run(data: RDD[Array[Double]]): KMeansModel = {
-    run(data.map(v => Vectors.dense(v)))
-  }
-
-  /**
-   * Train a K-means model on the given set of points; `data` should be cached for high
-   * performance, because this is an iterative algorithm.
-   */
-  def run(data: RDD[Vector])(implicit d: DummyImplicit): KMeansModel = {
+  def run(data: RDD[Vector]): KMeansModel = {
// Compute squared norms and cache them.
val norms = data.map(v => breezeNorm(v.toBreeze, 2.0))
norms.persist()
@@ -248,9 +222,7 @@

logInfo(s"The cost for the best run is $minCost.")

-    new KMeansModel(centers(bestRun).map { v =>
-      v.vector.toArray
-    })
+    new KMeansModel(centers(bestRun).map(c => Vectors.fromBreeze(c.vector)))
}

/**
@@ -332,53 +304,28 @@ object KMeans {
val RANDOM = "random"
val K_MEANS_PARALLEL = "k-means||"

-  def train(
-      data: RDD[Array[Double]],
-      k: Int,
-      maxIterations: Int,
-      runs: Int,
-      initializationMode: String)
-    : KMeansModel =
-  {
-    new KMeans().setK(k)
-      .setMaxIterations(maxIterations)
-      .setRuns(runs)
-      .setInitializationMode(initializationMode)
-      .run(data)
-  }
-
-  def train(data: RDD[Array[Double]], k: Int, maxIterations: Int, runs: Int): KMeansModel = {
-    train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
-  }
-
-  def train(data: RDD[Array[Double]], k: Int, maxIterations: Int): KMeansModel = {
-    train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
-  }

/**
* Trains a k-means model using the given set of parameters.
*
* @param data training points stored as `RDD[Vector]`
* @param k number of clusters
* @param maxIterations max number of iterations
* @param runs number of parallel runs, defaults to 1. The best model is returned.
* @param initializationMode initialization model, either "random" or "k-means||" (default).
*/
def train(
data: RDD[Vector],
k: Int,
maxIterations: Int,
-      runs: Int,
-      initializationMode: String
-    )(implicit d: DummyImplicit): KMeansModel = {
+      runs: Int = 1,
+      initializationMode: String = K_MEANS_PARALLEL): KMeansModel = {
new KMeans().setK(k)
.setMaxIterations(maxIterations)
.setRuns(runs)
.setInitializationMode(initializationMode)
.run(data)
}

-  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int)
-    (implicit d: DummyImplicit): KMeansModel = {
-    train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
-  }
-
-  def train(data: RDD[Vector], k: Int, maxIterations: Int)
-    (implicit d: DummyImplicit): KMeansModel = {
-    train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
-  }

/**
* Returns the index of the closest center to the given point, as well as the squared distance.
*/
@@ -431,14 +378,34 @@ object KMeans {
val (master, inputFile, k, iters) = (args(0), args(1), args(2).toInt, args(3).toInt)
val runs = if (args.length >= 5) args(4).toInt else 1
val sc = new SparkContext(master, "KMeans")
-    val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble)).cache()
+    val data = sc.textFile(inputFile)
+      .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
+      .cache()
val model = KMeans.train(data, k, iters, runs)
val cost = model.computeCost(data)
println("Cluster centers:")
for (c <- model.clusterCenters) {
-      println(" " + c.mkString(" "))
+      println(" " + c)
}
println("Cost: " + cost)
System.exit(0)
}
}

+/**
+ * A breeze vector with its norm for fast distance computation.
+ *
+ * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]]
+ */
+private[clustering]
+class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable {
+
+  def this(vector: BV[Double]) = this(vector, breezeNorm(vector, 2.0))
+
+  def this(array: Array[Double]) = this(new BDV[Double](array))
+
+  def this(v: Vector) = this(v.toBreeze)
+
+  /** Converts the vector to a dense vector. */
+  def toDense = new BreezeVectorWithNorm(vector.toDenseVector, norm)
+}
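
BreezeVectorWithNorm (moved to the bottom of the file here) exists so squared distances can be computed from cached norms via ||a - b||^2 = ||a||^2 + ||b||^2 - 2 (a . b), rather than materializing a - b for every point/center pair. A sketch of that identity using breeze directly (an editor's illustration, not the private helper itself):

  import breeze.linalg.{norm, DenseVector => BDV}

  val a = BDV(1.0, 2.0)
  val b = BDV(3.0, 4.0)

  // Expand ||a - b||^2 into precomputable norms plus one dot product.
  val viaNorms = norm(a) * norm(a) + norm(b) * norm(b) - 2.0 * (a dot b)
  val direct = math.pow(norm(a - b), 2)
  assert(math.abs(viaNorms - direct) < 1e-9)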
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -24,16 +24,11 @@ import org.apache.spark.mllib.linalg.Vector
/**
* A clustering model for K-means. Each point belongs to the cluster with the closest center.
*/
-class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable {
+class KMeansModel(val clusterCenters: Array[Vector]) extends Serializable {

/** Total number of clusters. */
def k: Int = clusterCenters.length

-  /** Return the cluster index that a given point belongs to. */
-  def predict(point: Array[Double]): Int = {
-    KMeans.findClosest(clusterCentersWithNorm, new BreezeVectorWithNorm(point))._1
-  }
-
+  /** Returns the cluster index that a given point belongs to. */
def predict(point: Vector): Int = {
KMeans.findClosest(clusterCentersWithNorm, new BreezeVectorWithNorm(point))._1
@@ -49,16 +44,7 @@ class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable
* Return the K-means cost (sum of squared distances of points to their nearest center) for this
* model on the given data.
*/
-  def computeCost(data: RDD[Array[Double]]): Double = {
-    val centersWithNorm = clusterCentersWithNorm
-    data.map(p => KMeans.pointCost(centersWithNorm, new BreezeVectorWithNorm(p))).sum()
-  }
-
-  /**
-   * Return the K-means cost (sum of squared distances of points to their nearest center) for this
-   * model on the given data.
-   */
-  def computeCost(data: RDD[Vector])(implicit d: DummyImplicit): Double = {
+  def computeCost(data: RDD[Vector]): Double = {
val centersWithNorm = clusterCentersWithNorm
data.map(p => KMeans.pointCost(centersWithNorm, new BreezeVectorWithNorm(p))).sum()
}
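
With clusterCenters now typed Array[Vector], the model is queried with Vectors throughout. A hypothetical usage sketch (assumes the `model` and an RDD[Vector] named `points` from a training step like the ones above):

  import org.apache.spark.mllib.linalg.Vectors

  val firstCenter = model.clusterCenters.head           // a Vector, no longer Array[Double]
  val cluster = model.predict(Vectors.dense(0.5, 1.5))  // index of the closest center
  val cost = model.computeCost(points)                  // sum of squared distances to centers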
[Diffs for the remaining four changed files were not loaded in this view.]
