From 77dbd3fe9ec025868f4a89936afca3b76f53fbf8 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:14:28 -0700 Subject: [PATCH] Make initialization check an assertion --- .../mllib/clustering/StreamingKMeans.scala | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 1cb2b55a7fc8e..2520b89ac0569 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -214,7 +214,7 @@ class StreamingKMeans( * @param data DStream containing vector data */ def trainOn(data: DStream[Vector]) { - this.isInitialized + this.assertInitialized() data.foreachRDD { (rdd, time) => model = model.update(rdd, this.a, this.units) } @@ -227,7 +227,7 @@ class StreamingKMeans( * @return DStream containing predictions */ def predictOn(data: DStream[Vector]): DStream[Int] = { - this.isInitialized + this.assertInitialized() data.map(model.predict) } @@ -239,21 +239,14 @@ class StreamingKMeans( * @return DStream containing the input keys and the predictions as values */ def predictOnValues[K: ClassTag](data: DStream[(K, Vector)]): DStream[(K, Int)] = { - this.isInitialized + this.assertInitialized() data.mapValues(model.predict) } - /** - * Check whether cluster centers have been initialized. - * - * @return Boolean, True if cluster centrs have been initialized - */ - def isInitialized: Boolean = { + /** Check whether cluster centers have been initialized.*/ + def assertInitialized(): Unit = { if (Option(model.clusterCenters) == None) { - logError("Initial cluster centers must be set before starting predictions") - throw new IllegalArgumentException - } else { - true + throw new IllegalStateException("Initial cluster centers must be set before starting predictions") } }