Commit 97044cf

Fixed style issues
tgaloppo committed Dec 16, 2014
1 parent dc9c742 commit 97044cf
Showing 3 changed files with 30 additions and 32 deletions.
@@ -35,17 +35,17 @@ object DenseGmmEM {
     val ctx = new SparkContext(conf)
 
     val data = ctx.textFile(inputFile).map{ line =>
-        Vectors.dense(line.trim.split(' ').map(_.toDouble))
-      }.cache()
+      Vectors.dense(line.trim.split(' ').map(_.toDouble))
+    }.cache
 
     val clusters = new GaussianMixtureModelEM()
-                     .setK(k)
-                     .setConvergenceTol(convergenceTol)
-                     .run(data)
+      .setK(k)
+      .setConvergenceTol(convergenceTol)
+      .run(data)
 
     for (i <- 0 until clusters.k) {
       println("weight=%f mu=%s sigma=\n%s\n" format
-          (clusters.weight(i), clusters.mu(i), clusters.sigma(i)))
+        (clusters.weight(i), clusters.mu(i), clusters.sigma(i)))
     }
   }
 }
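For context, DenseGmmEM reads an input file with one whitespace-separated vector per line. A minimal sketch of just the parsing step, runnable without Spark (the sample line is hypothetical):

object ParseDemo {
  def main(args: Array[String]): Unit = {
    // hypothetical line in the assumed input format
    val line = "6.0 9.0"
    // same transformation the example applies per line:
    // trim, split on single spaces, convert each token to Double
    val values = line.trim.split(' ').map(_.toDouble)
    println(values.mkString("[", ", ", "]")) // prints [6.0, 9.0]
  }
}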
@@ -111,10 +111,11 @@ class GaussianMixtureModelEM private (
       // we start with uniform weights, a random mean from the data, and
       // diagonal covariance matrices using component variances
       // derived from the samples
-      var gaussians = (0 until k).map{ i => (1.0 / k,
-        vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)),
-        initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples)))
-      }.toArray
+      var gaussians = (0 until k).map{ i =>
+        (1.0 / k,
+          vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)),
+          initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples)))
+      }.toArray
 
       val accW = new Array[Accumulator[Double]](k)
       val accMu = new Array[Accumulator[DenseDoubleVector]](k)
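Reading the initialization: each of the k components starts with a uniform weight, and its mean and covariance come from a disjoint slice of the sampled points. As a sketch in equations (inferred from the code above, with S_i the i-th slice of nSamples samples):

\[
  w_i = \frac{1}{k}, \qquad
  \mu_i = \frac{1}{|S_i|} \sum_{x \in S_i} x, \qquad
  \Sigma_i = \operatorname{diag}\bigl(\operatorname{var}(S_i)\bigr)
\]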
@@ -129,25 +130,23 @@
       for (i <- 0 until k) {
         accW(i) = ctx.accumulator(0.0)
         accMu(i) = ctx.accumulator(
-            BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam)
+          BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam)
         accSigma(i) = ctx.accumulator(
-            BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam)
+          BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam)
       }
 
       val logLikelihood = ctx.accumulator(0.0)
 
       // broadcast the current weights and distributions to all nodes
-      val dists = ctx.broadcast((0 until k).map{ i =>
-        new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3)
-      }.toArray)
+      val dists = ctx.broadcast{
+        (0 until k).map(i => new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3)).toArray
+      }
       val weights = ctx.broadcast((0 until k).map(i => gaussians(i)._1).toArray)
 
       // calculate partial assignments for each sample in the data
       // (often referred to as the "E" step in literature)
-      breezeData.foreach(x => {
-        val p = (0 until k).map{ i =>
-          eps + weights.value(i) * dists.value(i).pdf(x)
-        }.toArray
+      breezeData.foreach{ x =>
+        val p = (0 until k).map(i => eps + weights.value(i) * dists.value(i).pdf(x)).toArray
 
         val pSum = p.sum
 
@@ -161,7 +160,7 @@
           accMu(i) += x * p(i)
           accSigma(i) += xxt * p(i)
         }
-      })
+      }
 
       // Collect the computed sums
       val W = (0 until k).map(i => accW(i).value).toArray
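The foreach above is the E step: for every point x, an eps-regularized responsibility is computed per component and folded into the accumulators. As equations (a sketch matching the visible code; the normalization by pSum and the accW update sit in the collapsed lines):

\[
  p_i(x) = \epsilon + w_i\,\mathcal{N}(x \mid \mu_i, \Sigma_i), \qquad
  \hat{p}_i(x) = \frac{p_i(x)}{\sum_{j=1}^{k} p_j(x)}
\]

so that after the pass the accumulators hold

\[
  W_i = \sum_x \hat{p}_i(x), \qquad
  MU_i = \sum_x \hat{p}_i(x)\,x, \qquad
  SIGMA_i = \sum_x \hat{p}_i(x)\,x x^{\mathsf{T}}
\]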
@@ -170,15 +169,14 @@

       // Create new distributions based on the partial assignments
       // (often referred to as the "M" step in literature)
-      gaussians = (0 until k).map{ i => {
-          val weight = W(i) / W.sum
-          val mu = MU(i) / W(i)
-          val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu)
-          (weight, mu, sigma)
-        }
-      }.toArray
+      gaussians = (0 until k).map{ i =>
+        val weight = W(i) / W.sum
+        val mu = MU(i) / W(i)
+        val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu)
+        (weight, mu, sigma)
+      }.toArray
 
-      llhp = llh; // current becomes previous
+      llhp = llh // current becomes previous
       llh = logLikelihood.value // this is the freshly computed log-likelihood
       iter += 1
     } while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol)
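The M step above turns those collected sums directly into the new parameters; written out:

\[
  w_i' = \frac{W_i}{\sum_j W_j}, \qquad
  \mu_i' = \frac{MU_i}{W_i}, \qquad
  \Sigma_i' = \frac{SIGMA_i}{W_i} - \mu_i'\,{\mu_i'}^{\mathsf{T}}
\]

The covariance update is the standard identity Cov[x] = E[x x^T] - E[x] E[x]^T under the responsibility-weighted distribution, and the do-while loop exits once the log-likelihood gain drops below convergenceTol or maxIterations is hit.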
@@ -26,10 +26,10 @@ import org.apache.spark.mllib.util.TestingUtils._
 class GMMExpectationMaximizationSuite extends FunSuite with MLlibTestSparkContext {
   test("single cluster") {
     val data = sc.parallelize(Array(
-        Vectors.dense(6.0, 9.0),
-        Vectors.dense(5.0, 10.0),
-        Vectors.dense(4.0, 11.0)
-      ))
+      Vectors.dense(6.0, 9.0),
+      Vectors.dense(5.0, 10.0),
+      Vectors.dense(4.0, 11.0)
+    ))
 
     // expectations
     val Ew = 1.0
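As a sanity check on the "single cluster" expectations (the remaining expected values are in the collapsed lines, so this is a hand computation rather than a quote of the test): with k = 1 the fit should recover the sample statistics of the three points above. The weight is 1.0, the mean is ((6+5+4)/3, (9+10+11)/3) = (5, 10), and the maximum-likelihood covariance is

\[
  \Sigma = \frac{1}{3} \sum_{j=1}^{3} (x_j - \mu)(x_j - \mu)^{\mathsf{T}}
         = \begin{pmatrix} 2/3 & -2/3 \\ -2/3 & 2/3 \end{pmatrix}
\]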

0 comments on commit 97044cf
