From 2086bdc56a29f63a1c2143f88303e1296df45260 Mon Sep 17 00:00:00 2001
From: freeman
Date: Sat, 25 Oct 2014 04:04:31 -0400
Subject: [PATCH] Log cluster center updates

---
 .../spark/mllib/clustering/StreamingKMeans.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
index 92564eb0f535b..706b4f1d187dc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -76,7 +76,7 @@ import org.apache.spark.streaming.StreamingContext._
 @DeveloperApi
 class StreamingKMeansModel(
     override val clusterCenters: Array[Vector],
-    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) {
+    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
 
   // do a sequential KMeans update on a batch of data
   def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
@@ -113,8 +113,14 @@ class StreamingKMeansModel(
       // store the new counts and centers
       counts(newP._1) = oldCount + newCount
       centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
-    }
 
+      // display the updated cluster center, truncating high-dimensional vectors
+      val display = centers(newP._1).size match {
+        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
+        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
+      }
+      logInfo("Cluster %d updated: %s".format(newP._1, display))
+    }
 
     new StreamingKMeansModel(centers, counts)
   }
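
Note: below is a minimal, self-contained Scala sketch (not part of the patch)
of the truncated-display logic this commit introduces, so the behavior can be
checked without a Spark build. The object name DisplaySketch and the helper
formatCenter are hypothetical; the match logic mirrors the added lines above.

object DisplaySketch {
  // Mirrors the patch: centers with more than 100 dimensions are abbreviated
  // to their first 100 values so a single logInfo line stays readable.
  def formatCenter(center: Array[Double]): String = center.length match {
    case x if x > 100 => center.take(100).mkString("[", ",", "...")
    case _ => center.mkString("[", ",", "]")
  }

  def main(args: Array[String]): Unit = {
    println(formatCenter(Array(1.0, 2.0, 3.0))) // full vector: [1.0,2.0,3.0]
    println(formatCenter(Array.fill(150)(0.5))) // first 100 values, then "..."
  }
}

Pasted into the Scala REPL, a 150-dimensional center prints with a trailing
"..." rather than a closing "]", matching what the new logInfo line emits.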