[SPARK-1484][MLLIB] Warn when running an iterative algorithm on uncached data.
staple committed Sep 25, 2014
1 parent 3b6c511 commit c77e939
Showing 5 changed files with 35 additions and 3 deletions.
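The same guard appears in each of the three Scala source files below: check the input RDD's storage level on entry and warn if it is uncached, then warn again on exit, once the iterative job has actually paid the recomputation cost. A minimal standalone sketch of that pattern (the CachingGuard object and warnIfUncached helper are illustrative, not part of this commit):

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CachingGuard extends Logging {
  // Illustrative helper: wraps an iterative computation with the
  // before/after uncached-input warnings this commit adds inline.
  def warnIfUncached[T, R](data: RDD[T], name: String)(body: => R): R = {
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning(s"$name called with uncached input data.")
    }
    val result = body
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning(s"$name ran with uncached input data.")
    }
    result
  }
}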
docs/mllib-clustering.md (3 changes: 2 additions & 1 deletion)

@@ -52,7 +52,7 @@ import org.apache.spark.mllib.linalg.Vectors
 
 // Load and parse the data
 val data = sc.textFile("data/mllib/kmeans_data.txt")
-val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
+val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
 
 // Cluster the data into two classes using KMeans
 val numClusters = 2
@@ -100,6 +100,7 @@ public class KMeansExample {
     }
   }
 );
+parsedData.cache();
 
 // Cluster the data into two classes using KMeans
 int numClusters = 2;
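End to end, the updated Scala example reads as below; a sketch assuming an existing SparkContext sc, with an explicit unpersist added here (not in the docs) to release the cached partitions after training:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// KMeans makes one full pass over parsedData per iteration, so caching
// avoids re-reading and re-parsing the text file on every pass.
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
val clusters = KMeans.train(parsedData, 2, 20)
parsedData.unpersist()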
docs/mllib-linear-methods.md (5 changes: 3 additions & 2 deletions)

@@ -396,7 +396,7 @@ val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
 val parsedData = data.map { line =>
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
-}
+}.cache()
 
 // Building the model
 val numIterations = 100
@@ -455,6 +455,7 @@ public class LinearRegression {
     }
   }
 );
+parsedData.cache();
 
 // Building the model
 int numIterations = 100;
@@ -553,7 +554,7 @@ but in practice you will likely want to use unlabeled vectors for test data.
 
 {% highlight scala %}
 
-val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)
+val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse).cache()
 val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
 
 {% endhighlight %}
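The linear-regression change follows the same reasoning; a sketch assuming an existing SparkContext sc and the lpsa.data file from the Spark distribution:

import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.mllib.linalg.Vectors

// SGD takes numIterations passes over the parsed points, so the .cache()
// added above keeps them in memory instead of re-parsing on each pass.
val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()
val model = LinearRegressionWithSGD.train(parsedData, 100)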
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala

@@ -27,6 +27,7 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.random.XORShiftRandom
 
 /**
@@ -117,6 +118,12 @@ class KMeans private (
    * performance, because this is an iterative algorithm.
    */
   def run(data: RDD[Vector]): KMeansModel = {
+
+    if (data.getStorageLevel == StorageLevel.NONE) {
+      // Warn when running an iterative algorithm on uncached data. SPARK-1484
+      logWarning("KMeans.run called with uncached input data.")
+    }
+
     // Compute squared norms and cache them.
     val norms = data.map(v => breezeNorm(v.toBreeze, 2.0))
     norms.persist()
@@ -125,6 +132,10 @@
     }
     val model = runBreeze(breezeData)
     norms.unpersist()
+
+    if (data.getStorageLevel == StorageLevel.NONE) {
+      logWarning("KMeans.run ran with uncached input data.")
+    }
     model
   }
 
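From the caller's side, both KMeans warnings stay silent as long as the input is persisted before run() is called; a sketch, assuming parsedData: RDD[Vector] already exists:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.storage.StorageLevel

// Persisting first means data.getStorageLevel != StorageLevel.NONE inside
// run(), so neither the entry nor the exit warning fires.
val cached = parsedData.persist(StorageLevel.MEMORY_AND_DISK)
val model = new KMeans().setK(2).setMaxIterations(20).run(cached)
cached.unpersist()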
mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.Logging
 import org.apache.spark.mllib.rdd.RDDFunctions._
 import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
+import org.apache.spark.storage.StorageLevel
 
 /**
  * :: Experimental ::
@@ -231,6 +232,10 @@ class RowMatrix(
         val brzSvd.SVD(uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G)
         (sigmaSquaresFull, uFull)
       case SVDMode.DistARPACK =>
+        if (rows.getStorageLevel == StorageLevel.NONE) {
+          // Warn when running an iterative algorithm on uncached data. SPARK-1484
+          logWarning("RowMatrix.computeSVD DistARPACK called with uncached input rows.")
+        }
         require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.")
         EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter)
     }
@@ -256,6 +261,10 @@
       logWarning(s"Requested $k singular values but only found $sk nonzeros.")
     }
 
+    if (computeMode == SVDMode.DistARPACK && rows.getStorageLevel == StorageLevel.NONE) {
+      logWarning("RowMatrix.computeSVD DistARPACK ran with uncached input rows.")
+    }
+
     val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk))
     val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
 
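The RowMatrix check is limited to DistARPACK because that is the mode that iterates over the RDD: each ARPACK iteration calls multiplyGramianMatrixBy, and in distributed mode each call is one aggregate pass over rows. A usage sketch, assuming rows: RDD[Vector] is defined:

import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Caching the rows keeps each per-iteration Gramian multiply from
// recomputing the row RDD from scratch.
val mat = new RowMatrix(rows.cache())
val svd = mat.computeSVD(5, computeU = true)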
mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala

@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
 import org.apache.spark.mllib.util.MLUtils._
+import org.apache.spark.storage.StorageLevel
 
 /**
  * :: DeveloperApi ::
@@ -149,6 +150,11 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
    */
  def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
 
+    if (input.getStorageLevel == StorageLevel.NONE) {
+      // Warn when running an iterative algorithm on uncached data. SPARK-1484
+      logWarning("GeneralizedLinearAlgorithm.run called with uncached input data.")
+    }
+
     // Check the data properties before running the optimizer
     if (validateData && !validators.forall(func => func(input))) {
       throw new SparkException("Input validation failed.")
@@ -223,6 +229,10 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
       weights = scaler.transform(weights)
     }
 
+    if (input.getStorageLevel == StorageLevel.NONE) {
+      logWarning("GeneralizedLinearAlgorithm.run ran with uncached input data.")
+    }
+
     createModel(weights, intercept)
   }
 }
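Because the guard sits in the shared run() method, every GeneralizedLinearAlgorithm subclass (linear regression, lasso, ridge, logistic regression, SVM) inherits it without further changes. For example, assuming points: RDD[LabeledPoint] is defined:

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// The train helper funnels into GeneralizedLinearAlgorithm.run, so an
// uncached input now triggers the same warning here as well.
val cachedPoints = points.cache()
val lrModel = LogisticRegressionWithSGD.train(cachedPoints, 100)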
