From 61d60dfb2f6a614a97d327bed3183410a8a65700 Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Tue, 28 Apr 2015 20:28:03 -0700
Subject: [PATCH] Minor cleanups:

* Update *Concentration parameter documentation
* EM Optimizer: createVertices() does not need to be a function
* OnlineLDAOptimizer: typos in doc
* Clean up the core code for online LDA (Scala style)
---
 .../apache/spark/mllib/clustering/LDA.scala   | 37 ++++----
 .../spark/mllib/clustering/LDAOptimizer.scala | 89 +++++++++----------
 2 files changed, 63 insertions(+), 63 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index 70120b9d0192c..597f17b0972a0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -90,14 +90,15 @@ class LDA private (
    * If set to -1, then docConcentration is set automatically.
    *  (default = -1 = automatic)
    *
-   * Automatic setting of parameter:
-   *  - For EM: default = (50 / k) + 1.
-   *     - The 50/k is common in LDA libraries.
-   *     - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
-   *  - For Online: default = (1.0 / k).
-   *     - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]].
-   *
-   * Note: For EM optimizer, This value should be > 1.0.
+   * Optimizer-specific parameter settings:
+   *  - EM
+   *     - Value should be > 1.0
+   *     - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
+   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+   *  - Online
+   *     - Value should be >= 0
+   *     - default = (1.0 / k), following the implementation from
+   *       [[https://github.com/Blei-Lab/onlineldavb]].
    */
   def setDocConcentration(docConcentration: Double): this.type = {
     this.docConcentration = docConcentration
@@ -117,8 +118,7 @@ class LDA private (
    * This is the parameter to a symmetric Dirichlet distribution.
    *
    * Note: The topics' distributions over terms are called "beta" in the original LDA paper
-   * by Blei et al., but are ca
-   * lled "phi" in many later papers such as Asuncion et al., 2009.
+   * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
    */
   def getTopicConcentration: Double = this.topicConcentration
@@ -134,14 +134,15 @@ class LDA private (
    * If set to -1, then topicConcentration is set automatically.
    *  (default = -1 = automatic)
    *
-   * Automatic setting of parameter:
-   *  - For EM: default = 0.1 + 1.
-   *     - The 0.1 gives a small amount of smoothing.
-   *     - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
-   *  - For Online: default = (1.0 / k).
-   *     - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]].
-   *
-   * Note: For EM optimizer, This value should be > 1.0.
+   * Optimizer-specific parameter settings:
+   *  - EM
+   *     - Value should be > 1.0
+   *     - default = 0.1 + 1, where 0.1 gives a small amount of smoothing and +1 follows
+   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+   *  - Online
+   *     - Value should be >= 0
+   *     - default = (1.0 / k), following the implementation from
+   *       [[https://github.com/Blei-Lab/onlineldavb]].
    */
   def setTopicConcentration(topicConcentration: Double): this.type = {
     this.topicConcentration = topicConcentration
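As a reading aid for the doc changes above: a minimal sketch of how the documented automatic
defaults would resolve when the user leaves the value at -1. The helper name and the
optimizer-string switch are hypothetical, for illustration only; they are not part of this patch.

    // Hypothetical helper, not in the patch: resolves the documented defaults.
    def resolveDocConcentration(docConcentration: Double, k: Int, optimizer: String): Double = {
      if (docConcentration != -1) {
        docConcentration            // an explicit user setting wins
      } else optimizer match {
        case "em" => 50.0 / k + 1   // 50/k is common in LDA libraries; +1 per Asuncion et al. (2009)
        case "online" => 1.0 / k    // follows Blei-Lab/onlineldavb
        case other => throw new IllegalArgumentException(s"Unknown optimizer: $other")
      }
    }
    // topicConcentration resolves analogously: 0.1 + 1 for EM, 1.0 / k for Online.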
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 6d2d93a525a9c..f8b386ed5e2e3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -81,7 +81,7 @@ class EMLDAOptimizer extends LDAOptimizer {
   import LDA._

   /**
-   * Following fields will only be initialized through initialize method
+   * The following fields will only be initialized through the initialize() method
    */
   private[clustering] var graph: Graph[TopicCounts, TokenCount] = null
   private[clustering] var k: Int = 0
@@ -94,7 +94,7 @@ class EMLDAOptimizer extends LDAOptimizer {
   /**
    * Compute bipartite term/doc graph.
    */
-  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={
+  override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {

     val docConcentration = lda.getDocConcentration
     val topicConcentration = lda.getTopicConcentration
@@ -121,7 +121,7 @@ class EMLDAOptimizer extends LDAOptimizer {

     // Create vertices.
     // Initially, we use random soft assignments of tokens to topics (random gamma).
-    def createVertices(): RDD[(VertexId, TopicCounts)] = {
+    val docTermVertices: RDD[(VertexId, TopicCounts)] = {
       val verticesTMP: RDD[(VertexId, TopicCounts)] =
         edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
           val random = new Random(partIndex + randomSeed)
@@ -134,8 +134,6 @@ class EMLDAOptimizer extends LDAOptimizer {
       verticesTMP.reduceByKey(_ + _)
     }

-    val docTermVertices = createVertices()
-
     // Partition such that edges are grouped by document
     this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
     this.k = k
@@ -216,10 +214,10 @@
  * :: Experimental ::
  *
  * An online optimizer for LDA. The Optimizer implements the Online variational Bayes LDA
- * algorithm, which processes a subset of the corpus on each iteration, and update the term-topic
+ * algorithm, which processes a subset of the corpus on each iteration, and updates the term-topic
  * distribution adaptively.
  *
- * References:
+ * Original Online LDA paper:
  *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
  */
 @Experimental
@@ -236,31 +234,30 @@ class OnlineLDAOptimizer extends LDAOptimizer {

   // Online LDA specific parameters
   private var tau_0: Double = 1024
   private var kappa: Double = 0.51
-  private var minibatchFraction: Double = 0.01
+  private var miniBatchFraction: Double = 0.01

   // internal data structure
   private var docs: RDD[(Long, Vector)] = null
   private var lambda: BDM[Double] = null
-  private var Elogbeta: BDM[Double]= null
+  private var Elogbeta: BDM[Double] = null
   private var expElogbeta: BDM[Double] = null

   // count of invocation to next, which helps deciding the weight for each iteration
-  private var iteration = 0
+  private var iteration: Int = 0

   /**
    * A (positive) learning parameter that downweights early iterations. Larger values make early
-   * iterations count less
+   * iterations count less.
    */
   def getTau_0: Double = this.tau_0

   /**
    * A (positive) learning parameter that downweights early iterations. Larger values make early
-   * iterations count less
-   * Automatic setting of parameter:
-   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
+   * iterations count less.
+   * Default: 1024, following the original Online LDA paper.
    */
   def setTau_0(tau_0: Double): this.type = {
-    require(tau_0 > 0 || tau_0 == -1.0, s"LDA tau_0 must be positive, but was set to $tau_0")
+    require(tau_0 > 0, s"LDA tau_0 must be positive, but was set to $tau_0")
     this.tau_0 = tau_0
     this
   }
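For context on how tau_0 and kappa interact: in Hoffman et al. (2010), the mini-batch at
iteration t is folded into lambda with weight rho_t = (tau_0 + t)^(-kappa), and a kappa in
(0.5, 1.0] makes those weights decay fast enough for the paper's convergence argument.
A standalone sketch of that weight (this is the paper's formula, not code from this patch):

    // Per-iteration weight from Hoffman et al. (2010): rho_t = (tau_0 + t)^(-kappa).
    // A larger tau_0 shrinks the weight of early iterations; kappa sets the decay rate.
    def rho(tau_0: Double, kappa: Double, iteration: Int): Double =
      math.pow(tau_0 + iteration, -kappa)

    // With the defaults here, rho(1024, 0.51, 1) ≈ 0.029 and rho(1024, 0.51, 100) ≈ 0.028,
    // so no single early batch can dominate the accumulated lambda.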
@@ -273,31 +270,32 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * Learning rate: exponential decay rate---should be between
    * (0.5, 1.0] to guarantee asymptotic convergence.
-   *  - default = 0.51, which follows the recommendation from OnlineLDA paper.
+   * Default: 0.51, based on the original Online LDA paper.
    */
   def setKappa(kappa: Double): this.type = {
-    require(kappa >= 0 || kappa == -1.0,
-      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
+    require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa")
     this.kappa = kappa
     this
   }

   /**
-   * Mini-batch size, which controls how many documents are used in each iteration
+   * Mini-batch fraction, which sets the fraction of documents sampled and used in each iteration.
    */
-  def getMiniBatchFraction: Double = this.minibatchFraction
+  def getMiniBatchFraction: Double = this.miniBatchFraction

   /**
-   * Mini-batch size, which controls how many documents are used in each iteration
-   * default = 1% from total documents.
+   * Mini-batch fraction in (0, 1], which sets the fraction of documents sampled and used in
+   * each iteration.
+   * Default: 0.01, i.e., 1% of total documents.
    */
   def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
-    this.minibatchFraction = miniBatchFraction
+    require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0,
+      s"Online LDA miniBatchFraction must be in range (0,1], but was set to $miniBatchFraction")
+    this.miniBatchFraction = miniBatchFraction
     this
   }

-  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={
-
+  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
     this.k = lda.getK
     this.corpusSize = docs.count()
     this.vocabSize = docs.first()._2.size
@@ -322,22 +320,23 @@ class OnlineLDAOptimizer extends LDAOptimizer {
    */
   private[clustering] override def next(): OnlineLDAOptimizer = {
     iteration += 1
-    val batch = docs.sample(true, minibatchFraction, randomGenerator.nextLong())
-    if(batch.isEmpty()) return this
+    val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong())
+    if (batch.isEmpty()) return this

     val k = this.k
     val vocabSize = this.vocabSize
     val expElogbeta = this.expElogbeta
     val alpha = this.alpha

-    val stats = batch.mapPartitions(docs =>{
+    val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>
       val stat = BDM.zeros[Double](k, vocabSize)
-      docs.foreach(doc =>{
+      docs.foreach { doc =>
         val termCounts = doc._2
-        val (ids, cts) = termCounts match {
-          case v: DenseVector => (((0 until v.size).toList), v.values)
+        val (ids: List[Int], cts: Array[Double]) = termCounts match {
+          case v: DenseVector => ((0 until v.size).toList, v.values)
           case v: SparseVector => (v.indices.toList, v.values)
-          case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
+          case v => throw new IllegalArgumentException("Online LDA does not support vector type " +
+            v.getClass)
         }

         // Initialize the variational distribution q(theta|gamma) for the mini-batch
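For reference, the loop in the next hunk implements the per-document fixed-point updates of the
E-step in Hoffman et al. (2010), where E[log theta_dk] = digamma(gamma_dk) - digamma(sum_k gamma_dk),
matching the digamma(gammad) - digamma(sum(gammad)) line below:

    \phi_{dwk} \propto \exp\big( \mathbb{E}[\log \theta_{dk}] + \mathbb{E}[\log \beta_{kw}] \big)
    \gamma_{dk} = \alpha + \sum_{w} n_{dw} \, \phi_{dwk}

The iteration stops once the mean absolute change in gamma_d falls below 1e-5, which is the
meanchange > 1e-5 test in the loop condition.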
@@ -354,7 +353,7 @@ class OnlineLDAOptimizer extends LDAOptimizer {
         while (meanchange > 1e-5) {
           val lastgamma = gammad
           //        1*K               1 * ids               ids * k
-          gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
+          gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha
           Elogthetad = digamma(gammad) - digamma(sum(gammad))
           expElogthetad = exp(Elogthetad)
           phinorm = expElogthetad * expElogbetad + 1e-100
@@ -364,28 +363,28 @@ class OnlineLDAOptimizer extends LDAOptimizer {
         val m1 = expElogthetad.t.toDenseMatrix.t
         val m2 = (ctsVector / phinorm).t.toDenseMatrix
         val outerResult = kron(m1, m2) // K * ids
-        for (i <- 0 until ids.size) {
+        var i = 0
+        while (i < ids.size) {
           stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
+          i += 1
         }
-        stat
-      })
+      }
       Iterator(stat)
-    })
+    }

-    val batchResult = stats.reduce(_ += _)
-    update(batchResult, iteration, (minibatchFraction * corpusSize).toInt)
+    val batchResult: BDM[Double] = stats.reduce(_ += _)
+    update(batchResult, iteration, (miniBatchFraction * corpusSize).toInt)
     this
   }

-  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
+  override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
     new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
   }

   /**
    * Update lambda based on the batch submitted. batchSize can be different for each iteration.
    */
-  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
-
+  private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = {
     val tau_0 = this.getTau_0
     val kappa = this.getKappa
@@ -405,17 +404,17 @@
   /**
    * Get a random matrix to initialize lambda
    */
-  private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={
+  private def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
     val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
     val temp = gammaRandomGenerator.sample(row * col).toArray
-    (new BDM[Double](col, row, temp)).t
+    new BDM[Double](col, row, temp).t
   }

   /**
    * For theta ~ Dir(alpha), computes E[log(theta)] given alpha. Currently the implementation
    * uses digamma which is accurate but expensive.
    */
-  private def dirichletExpectation(alpha : BDM[Double]): BDM[Double] = {
+  private def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = {
     val rowSum = sum(alpha(breeze.linalg.*, ::))
     val digAlpha = digamma(alpha)
     val digRowSum = digamma(rowSum)
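For reference, a self-contained sketch of the identity that dirichletExpectation evaluates
row-wise, mirroring the visible lines above. This is not the patch's exact function body; it
only assumes Breeze on the classpath, and the sketch's name is illustrative:

    import breeze.linalg.{DenseMatrix, DenseVector, sum}
    import breeze.numerics.digamma

    // For theta ~ Dir(alpha): E[log theta_k] = digamma(alpha_k) - digamma(sum_j alpha_j).
    // Each row of the input matrix is treated as one Dirichlet parameter vector.
    def dirichletExpectationSketch(alpha: DenseMatrix[Double]): DenseMatrix[Double] = {
      val rowSum: DenseVector[Double] = sum(alpha(breeze.linalg.*, ::))  // one sum per row
      digamma(alpha)(::, breeze.linalg.*) - digamma(rowSum)              // subtract row-wise
    }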