From cff73e0de3c7dd68fe7a23db385459678a646099 Mon Sep 17 00:00:00 2001 From: Travis Galoppo Date: Wed, 17 Dec 2014 15:22:49 -0500 Subject: [PATCH] Replaced accumulators with RDD.aggregate --- .../clustering/GaussianMixtureModelEM.scala | 125 +++++++++--------- 1 file changed, 60 insertions(+), 65 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala index 752615dde576b..5ab654f609bd2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala @@ -52,6 +52,50 @@ class GaussianMixtureModelEM private ( private type DenseDoubleVector = BreezeVector[Double] private type DenseDoubleMatrix = BreezeMatrix[Double] + private type ExpectationSum = ( + Array[Double], // log-likelihood in index 0 + Array[Double], // array of weights + Array[DenseDoubleVector], // array of means + Array[DenseDoubleMatrix]) // array of cov matrices + + // create a zero'd ExpectationSum instance + private def zeroExpectationSum(k: Int, d: Int): ExpectationSum = { + (Array(0.0), + new Array[Double](k), + (0 until k).map(_ => BreezeVector.zeros[Double](d)).toArray, + (0 until k).map(_ => BreezeMatrix.zeros[Double](d,d)).toArray) + } + + // add two ExpectationSum objects (allowed to use modify m1) + // (U, U) => U for aggregation + private def addExpectationSums(m1: ExpectationSum, m2: ExpectationSum): ExpectationSum = { + m1._1(0) += m2._1(0) + for (i <- 0 until m1._2.length) { + m1._2(i) += m2._2(i) + m1._3(i) += m2._3(i) + m1._4(i) += m2._4(i) + } + m1 + } + + // compute cluster contributions for each input point + // (U, T) => U for aggregation + private def computeExpectation(weights: Array[Double], dists: Array[MultivariateGaussian]) + (model: ExpectationSum, x: DenseDoubleVector): ExpectationSum = { + val k = model._2.length + val p = (0 until k).map(i => eps + weights(i) * dists(i).pdf(x)).toArray + val pSum = p.sum + model._1(0) += math.log(pSum) + val xxt = x * new Transpose(x) + for (i <- 0 until k) { + p(i) /= pSum + model._2(i) += p(i) + model._3(i) += x * p(i) + model._4(i) += xxt * p(i) + } + model + } + // number of samples per cluster to use when initializing Gaussians private val nSamples = 5 @@ -115,7 +159,7 @@ class GaussianMixtureModelEM private ( val ctx = data.sparkContext // we will operate on the data as breeze data - val breezeData = data.map( u => u.toBreeze.toDenseVector ).cache() + val breezeData = data.map(u => u.toBreeze.toDenseVector).cache() // Get length of the input vectors val d = breezeData.first.length @@ -143,55 +187,28 @@ class GaussianMixtureModelEM private ( } } - val accW = new Array[Accumulator[Double]](k) - val accMu = new Array[Accumulator[DenseDoubleVector]](k) - val accSigma = new Array[Accumulator[DenseDoubleMatrix]](k) - var llh = Double.MinValue // current log-likelihood var llhp = 0.0 // previous log-likelihood var iter = 0 do { - // reset accumulators - for (i <- 0 until k) { - accW(i) = ctx.accumulator(0.0) - accMu(i) = ctx.accumulator( - BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam) - accSigma(i) = ctx.accumulator( - BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam) - } + // pivot gaussians into weight and distribution arrays + val weights = (0 until k).map(i => gaussians(i)._1).toArray + val dists = (0 until k).map{ i => + new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3) + }.toArray - val logLikelihood = ctx.accumulator(0.0) - - // broadcast the current weights and distributions to all nodes - val dists = ctx.broadcast{ - (0 until k).map(i => new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3)).toArray - } - val weights = ctx.broadcast((0 until k).map(i => gaussians(i)._1).toArray) + // create and broadcast curried cluster contribution function + val compute = ctx.broadcast(computeExpectation(weights, dists)_) - // calculate partial assignments for each sample in the data - // (often referred to as the "E" step in literature) - breezeData.foreach{ x => - val p = (0 until k).map(i => eps + weights.value(i) * dists.value(i).pdf(x)).toArray - - val pSum = p.sum - - logLikelihood += math.log(pSum) - - // accumulate weighted sums - val xxt = x * new Transpose(x) - for (i <- 0 until k) { - p(i) /= pSum - accW(i) += p(i) - accMu(i) += x * p(i) - accSigma(i) += xxt * p(i) - } - } + // aggregate the cluster contribution for all sample points + val sums = breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums) - // Collect the computed sums - val W = (0 until k).map(i => accW(i).value).toArray - val MU = (0 until k).map(i => accMu(i).value).toArray - val SIGMA = (0 until k).map(i => accSigma(i).value).toArray + // Assignments to make the code more readable + val logLikelihood = sums._1(0) + val W = sums._2 + val MU = sums._3 + val SIGMA = sums._4 // Create new distributions based on the partial assignments // (often referred to as the "M" step in literature) @@ -203,7 +220,7 @@ class GaussianMixtureModelEM private ( }.toArray llhp = llh // current becomes previous - llh = logLikelihood.value // this is the freshly computed log-likelihood + llh = logLikelihood // this is the freshly computed log-likelihood iter += 1 } while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol) @@ -264,26 +281,4 @@ class GaussianMixtureModelEM private ( } p } - - /** AccumulatorParam for Dense Breeze Vectors */ - private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] { - def zero(initialVector: DenseDoubleVector): DenseDoubleVector = { - BreezeVector.zeros[Double](initialVector.length) - } - - def addInPlace(a: DenseDoubleVector, b: DenseDoubleVector): DenseDoubleVector = { - a += b - } - } - - /** AccumulatorParam for Dense Breeze Matrices */ - private object DenseDoubleMatrixAccumulatorParam extends AccumulatorParam[DenseDoubleMatrix] { - def zero(initialMatrix: DenseDoubleMatrix): DenseDoubleMatrix = { - BreezeMatrix.zeros[Double](initialMatrix.rows, initialMatrix.cols) - } - - def addInPlace(a: DenseDoubleMatrix, b: DenseDoubleMatrix): DenseDoubleMatrix = { - a += b - } - } }