diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java index 36207ae38d9a9..fd53c81cc4974 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java @@ -58,7 +58,7 @@ public Tuple2 call(Tuple2 doc_id) { corpus.cache(); // Cluster the documents into three topics using LDA - DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus); + DistributedLDAModel ldaModel = (DistributedLDAModel)new LDA().setK(3).run(corpus); // Output topics. Each is a distribution over words (matching word count vectors) System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize() diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala index 08a93595a2e17..a1850390c0a86 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala @@ -26,7 +26,7 @@ import scopt.OptionParser import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkContext, SparkConf} -import org.apache.spark.mllib.clustering.LDA +import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA} import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.rdd.RDD @@ -137,7 +137,7 @@ object LDAExample { sc.setCheckpointDir(params.checkpointDir.get) } val startTime = System.nanoTime() - val ldaModel = lda.run(corpus) + val ldaModel = lda.run(corpus).asInstanceOf[DistributedLDAModel] val elapsed = (System.nanoTime() - startTime) / 1e9 println(s"Finished training LDA model. Summary:") diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index dd0a03b58dc41..064fa420affff 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -17,17 +17,11 @@ package org.apache.spark.mllib.clustering -import java.util.Random - -import breeze.linalg.{DenseVector => BDV, normalize} - +import breeze.linalg.{DenseVector => BDV} import org.apache.spark.Logging import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaPairRDD import org.apache.spark.graphx._ -import org.apache.spark.graphx.impl.GraphImpl -import org.apache.spark.mllib.clustering.LDAOptimizer -import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -43,17 +37,6 @@ import org.apache.spark.util.Utils * - "token": instance of a term appearing in a document * - "topic": multinomial distribution over words representing some concept * - * Currently, the underlying implementation uses Expectation-Maximization (EM), implemented - * according to the Asuncion et al. (2009) paper referenced below. - * - * References: - * - Original LDA paper (journal version): - * Blei, Ng, and Jordan. "Latent Dirichlet Allocation." JMLR, 2003. - * - This class implements their "smoothed" LDA model. - * - Paper which clearly explains several algorithms, including EM: - * Asuncion, Welling, Smyth, and Teh. - * "On Smoothing and Inference for Topic Models." UAI, 2009. 
- * * @see [[http://en.wikipedia.org/wiki/Latent_Dirichlet_allocation Latent Dirichlet allocation * (Wikipedia)]] */ @@ -69,25 +52,7 @@ class LDA private ( def this() = this(k = 10, maxIterations = 20, docConcentration = -1, topicConcentration = -1, seed = Utils.random.nextLong(), checkpointInterval = 10) - var ldaOptimizer = setOptimizer("EM") - - def getOptimizer(): LDAOptimizer = { - ldaOptimizer - } - - def setOptimizer(optimizer: LDAOptimizer): this.type = { - this.ldaOptimizer = optimizer - this - } - - def setOptimizer(optimizer: String): this.type = { - optimizer match{ - case "EM" => this.setOptimizer(new EMOptimizer(default parameter)) - case "Gibbs"=> this.setOptimizer(new GibbsOptimizer(default parameter)) - case "Online"=> this.setOptimizer(new OnlineLDAOptimizer(default parameter)) - } - } - + private var ldaOptimizer: LDAOptimizer = getDefaultOptimizer("EM") /** * Number of topics to infer. I.e., the number of soft cluster centers. @@ -241,6 +206,38 @@ class LDA private ( this } + + /** LDAOptimizer used to perform the actual calculation */ + def getOptimizer(): LDAOptimizer = ldaOptimizer + + /** + * LDAOptimizer used to perform the actual calculation (default = EMLDAOptimizer) + */ + def setOptimizer(optimizer: LDAOptimizer): this.type = { + this.ldaOptimizer = optimizer + this + } + + /** + * Set the LDAOptimizer used to perform the actual calculation by algorithm name. + * Currently "EM" is supported. + */ + def setOptimizer(optimizerName: String): this.type = { + this.ldaOptimizer = getDefaultOptimizer(optimizerName) + this + } + + /** + * Get the default optimizer for the given algorithm name. + */ + private def getDefaultOptimizer(optimizerName: String): LDAOptimizer = { + optimizerName match { + case "EM" => new EMLDAOptimizer() + case other => + throw new UnsupportedOperationException(s"Only EM is supported, but got $other.") + } + } + /** * Learn an LDA model using the given dataset. * @@ -250,42 +247,23 @@ class LDA private ( * Document IDs must be unique and >= 0.
* @return Inferred LDA model */ - def run(documents: RDD[(Long, Vector)]): DistributedLDAModel = { - if(ldaOptimizer.isInstanceOf[EMOptimizer]){ - val state = LDA.initialState(documents, k, getDocConcentration, getTopicConcentration, seed, - checkpointInterval) - var iter = 0 - val iterationTimes = Array.fill[Double](maxIterations)(0) - while (iter < maxIterations) { - val start = System.nanoTime() - state.next() - val elapsedSeconds = (System.nanoTime() - start) / 1e9 - iterationTimes(iter) = elapsedSeconds - iter += 1 - } - state.graphCheckpointer.deleteAllCheckpoints() - new DistributedLDAModel(state, iterationTimes) - } - else if(ldaOptimizer.isInstanceOf[OnlineLDAOptimizer]){ - val vocabSize = documents.first._2.size - val D = documents.count().toInt // total documents count - val onlineLDA = new OnlineLDAOptimizer(k, D, vocabSize, 1.0/k, 1.0/k, tau_0, kappa) - - val arr = Array.fill(math.ceil(1.0 / miniBatchFraction).toInt)(miniBatchFraction) - val splits = documents.randomSplit(arr) - for(i <- 0 until numIterations){ - val index = i % splits.size - onlineLDA.submitMiniBatch(splits(index)) - } - onlineLDA.getTopicDistribution() + def run(documents: RDD[(Long, Vector)]): LDAModel = { + val state = ldaOptimizer.initialState(documents, k, getDocConcentration, getTopicConcentration, + seed, checkpointInterval) + var iter = 0 + val iterationTimes = Array.fill[Double](maxIterations)(0) + while (iter < maxIterations) { + val start = System.nanoTime() + state.next() + val elapsedSeconds = (System.nanoTime() - start) / 1e9 + iterationTimes(iter) = elapsedSeconds + iter += 1 } - - - + state.getLDAModel(iterationTimes) } /** Java-friendly version of [[run()]] */ - def run(documents: JavaPairRDD[java.lang.Long, Vector]): DistributedLDAModel = { + def run(documents: JavaPairRDD[java.lang.Long, Vector]): LDAModel = { run(documents.rdd.asInstanceOf[RDD[(Long, Vector)]]) } } @@ -346,51 +324,46 @@ private[clustering] object LDA { * Vector over topics (length k) of token counts. * The meaning of these counts can vary, and it may or may not be normalized to be a distribution. */ + private[clustering] type TopicCounts = BDV[Double] + private[clustering] type TokenCount = Double - /** - * Compute bipartite term/doc graph. - */ - private def initialState( - docs: RDD[(Long, Vector)], - k: Int, - docConcentration: Double, - topicConcentration: Double, - randomSeed: Long, - checkpointInterval: Int): EMOptimizer = { - // For each document, create an edge (Document -> Term) for each unique term in the document. - val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) => - // Add edges for terms with non-zero counts. - termCounts.toBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) => - Edge(docID, term2index(term), cnt) - } - } + /** Term vertex IDs are {-1, -2, ..., -vocabSize} */ + private[clustering] def term2index(term: Int): Long = -(1 + term.toLong) - val vocabSize = docs.take(1).head._2.size - - // Create vertices. - // Initially, we use random soft assignments of tokens to topics (random gamma). 
- def createVertices(): RDD[(VertexId, TopicCounts)] = { - val verticesTMP: RDD[(VertexId, TopicCounts)] = - edges.mapPartitionsWithIndex { case (partIndex, partEdges) => - val random = new Random(partIndex + randomSeed) - partEdges.flatMap { edge => - val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0) - val sum = gamma * edge.attr - Seq((edge.srcId, sum), (edge.dstId, sum)) - } - } - verticesTMP.reduceByKey(_ + _) - } + private[clustering] def index2term(termIndex: Long): Int = -(1 + termIndex).toInt - val docTermVertices = createVertices() + private[clustering] def isDocumentVertex(v: (VertexId, _)): Boolean = v._1 >= 0 - // Partition such that edges are grouped by document - val graph = Graph(docTermVertices, edges) - .partitionBy(PartitionStrategy.EdgePartition1D) + private[clustering] def isTermVertex(v: (VertexId, _)): Boolean = v._1 < 0 - new EMOptimizer(graph, k, vocabSize, docConcentration, topicConcentration, checkpointInterval) + /** + * Compute gamma_{wjk}, a distribution over topics k. + */ + private[clustering] def computePTopic( + docTopicCounts: TopicCounts, + termTopicCounts: TopicCounts, + totalTopicCounts: TopicCounts, + vocabSize: Int, + eta: Double, + alpha: Double): TopicCounts = { + val K = docTopicCounts.length + val N_j = docTopicCounts.data + val N_w = termTopicCounts.data + val N = totalTopicCounts.data + val eta1 = eta - 1.0 + val alpha1 = alpha - 1.0 + val Weta1 = vocabSize * eta1 + var sum = 0.0 + val gamma_wj = new Array[Double](K) + var k = 0 + while (k < K) { + val gamma_wjk = (N_w(k) + eta1) * (N_j(k) + alpha1) / (N(k) + Weta1) + gamma_wj(k) = gamma_wjk + sum += gamma_wjk + k += 1 + } + // normalize + BDV(gamma_wj) /= sum } - } - diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 0a3f21ecee0dc..6cf26445f20a0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -203,7 +203,7 @@ class DistributedLDAModel private ( import LDA._ - private[clustering] def this(state: LDA.EMOptimizer, iterationTimes: Array[Double]) = { + private[clustering] def this(state: EMLDAOptimizer, iterationTimes: Array[Double]) = { this(state.graph, state.globalTopicTotals, state.k, state.vocabSize, state.docConcentration, state.topicConcentration, iterationTimes) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index e0365c9733366..a47ad28486491 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -1,52 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.clustering import java.util.Random -import breeze.linalg.{DenseVector => BDV, DenseMatrix, normalize} -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, kron, sum} -import breeze.numerics._ -import breeze.stats.distributions.Gamma - -import org.apache.spark.Logging +import breeze.linalg.{DenseVector => BDV, normalize} import org.apache.spark.annotation.Experimental -import org.apache.spark.api.java.JavaPairRDD import org.apache.spark.graphx._ import org.apache.spark.graphx.impl.GraphImpl import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer -import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Matrices, Vector} +import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD -import org.apache.spark.util.Utils + /** - * Created by yuhao on 4/22/15. + * :: Experimental :: + * + * An LDAOptimizer contains an inference algorithm for LDA and performs the actual computation. + * It stores the algorithm's internal data structures (e.g., a Graph or a Matrix) and any other + * algorithm-specific parameters. The interface is kept separate from LDA so that new algorithms + * can be added without changing the LDA API. */ -trait LDAOptimizer +@Experimental +trait LDAOptimizer { + + /** + * Initializer for the optimizer. LDA passes the common parameters to the optimizer so that + * its internal data structures can be initialized properly. + */ + private[clustering] def initialState( + docs: RDD[(Long, Vector)], + k: Int, + docConcentration: Double, + topicConcentration: Double, + randomSeed: Long, + checkpointInterval: Int): LDAOptimizer + + private[clustering] def next(): LDAOptimizer + + private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel +} /** + * :: Experimental :: + * * Optimizer for EM algorithm which stores data + parameter graph, plus algorithm parameters. * - * @param graph EM graph, storing current parameter estimates in vertex descriptors and - * data (token counts) in edge descriptors. - * @param k Number of topics - * @param vocabSize Number of unique terms - * @param docConcentration "alpha" - * @param topicConcentration "beta" or "eta" + * Currently, the underlying implementation uses Expectation-Maximization (EM), implemented + * according to the Asuncion et al. (2009) paper referenced below. + * + * References: + * - Original LDA paper (journal version): + * Blei, Ng, and Jordan. "Latent Dirichlet Allocation." JMLR, 2003. + * - This class implements their "smoothed" LDA model. + * - Paper which clearly explains several algorithms, including EM: + * Asuncion, Welling, Smyth, and Teh. + * "On Smoothing and Inference for Topic Models." UAI, 2009. 
+ * */ -class EMOptimizer( - var graph: Graph[BDV[Double], Double], - val k: Int, - val vocabSize: Int, - val docConcentration: Double, - val topicConcentration: Double, - checkpointInterval: Int) extends LDAOptimizer{ +@Experimental +class EMLDAOptimizer extends LDAOptimizer { + + import LDA._ + /** + * The following fields will only be initialized through the initialState() method. + */ + private[clustering] var graph: Graph[TopicCounts, TokenCount] = null + private[clustering] var k: Int = 0 + private[clustering] var vocabSize: Int = 0 + private[clustering] var docConcentration: Double = 0 + private[clustering] var topicConcentration: Double = 0 + private[clustering] var checkpointInterval: Int = 10 + private var graphCheckpointer: PeriodicGraphCheckpointer[TopicCounts, TokenCount] = null - private[clustering] type TopicCounts = BDV[Double] + /** + * Compute bipartite term/doc graph. + */ + private[clustering] override def initialState( + docs: RDD[(Long, Vector)], + k: Int, + docConcentration: Double, + topicConcentration: Double, + randomSeed: Long, + checkpointInterval: Int): LDAOptimizer = { + // For each document, create an edge (Document -> Term) for each unique term in the document. + val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) => + // Add edges for terms with non-zero counts. + termCounts.toBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) => + Edge(docID, term2index(term), cnt) + } + } - private[clustering] type TokenCount = Double + val vocabSize = docs.take(1).head._2.size + + // Create vertices. + // Initially, we use random soft assignments of tokens to topics (random gamma). + def createVertices(): RDD[(VertexId, TopicCounts)] = { + val verticesTMP: RDD[(VertexId, TopicCounts)] = + edges.mapPartitionsWithIndex { case (partIndex, partEdges) => + val random = new Random(partIndex + randomSeed) + partEdges.flatMap { edge => + val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0) + val sum = gamma * edge.attr + Seq((edge.srcId, sum), (edge.dstId, sum)) + } + } + verticesTMP.reduceByKey(_ + _) + } + + val docTermVertices = createVertices() + + // Partition such that edges are grouped by document + this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D) + this.k = k + this.vocabSize = vocabSize + this.docConcentration = docConcentration + this.topicConcentration = topicConcentration + this.checkpointInterval = checkpointInterval + this.graphCheckpointer = + new PeriodicGraphCheckpointer[TopicCounts, TokenCount](graph, checkpointInterval) + this.globalTopicTotals = computeGlobalTopicTotals() + this + } - private[LDA] val graphCheckpointer = new PeriodicGraphCheckpointer[TopicCounts, TokenCount]( - graph, checkpointInterval) + private[clustering] override def next(): EMLDAOptimizer = { + require(graph != null, "graph is null, EMLDAOptimizer not initialized.") - def next(): EMOptimizer = { val eta = topicConcentration val W = vocabSize val alpha = docConcentration @@ -94,189 +186,16 @@ class EMOptimizer( * * Note: This executes an action on the graph RDDs. 
*/ - var globalTopicTotals: TopicCounts = computeGlobalTopicTotals() + private[clustering] var globalTopicTotals: TopicCounts = null private def computeGlobalTopicTotals(): TopicCounts = { val numTopics = k graph.vertices.filter(isTermVertex).values.fold(BDV.zeros[Double](numTopics))(_ += _) } - /** Term vertex IDs are {-1, -2, ..., -vocabSize} */ - private[clustering] def term2index(term: Int): Long = -(1 + term.toLong) - - private[clustering] def index2term(termIndex: Long): Int = -(1 + termIndex).toInt - - private[clustering] def isDocumentVertex(v: (VertexId, _)): Boolean = v._1 >= 0 - - private[clustering] def isTermVertex(v: (VertexId, _)): Boolean = v._1 < 0 - - /** - * Compute gamma_{wjk}, a distribution over topics k. - */ - private def computePTopic( - docTopicCounts: TopicCounts, - termTopicCounts: TopicCounts, - totalTopicCounts: TopicCounts, - vocabSize: Int, - eta: Double, - alpha: Double): TopicCounts = { - val K = docTopicCounts.length - val N_j = docTopicCounts.data - val N_w = termTopicCounts.data - val N = totalTopicCounts.data - val eta1 = eta - 1.0 - val alpha1 = alpha - 1.0 - val Weta1 = vocabSize * eta1 - var sum = 0.0 - val gamma_wj = new Array[Double](K) - var k = 0 - while (k < K) { - val gamma_wjk = (N_w(k) + eta1) * (N_j(k) + alpha1) / (N(k) + Weta1) - gamma_wj(k) = gamma_wjk - sum += gamma_wjk - k += 1 - } - // normalize - BDV(gamma_wj) /= sum + private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = { + require(graph != null, "graph is null, EMLDAOptimizer not initialized.") + this.graphCheckpointer.deleteAllCheckpoints() + new DistributedLDAModel(this, iterationTimes) } - } - - - -/** - * :: Experimental :: - * Latent Dirichlet Allocation (LDA), a topic model designed for text documents. - * - * An online training optimizer for LDA. The Optimizer processes a subset (like 1%) of the corpus - * by each call to submitMiniBatch, and update the term-topic distribution adaptively. User can - * get the result from getTopicDistribution. - * - * References: - * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. - */ -@Experimental -class OnlineLDAOptimizer ( - private var k: Int, - private var D: Int, - private val vocabSize: Int, - private val alpha: Double, - private val eta: Double, - private val tau_0: Double, - private val kappa: Double) extends Serializable with LDAOptimizer { - - // Initialize the variational distribution q(beta|lambda) - var lambda = new BDM[Double](k, vocabSize, Array.fill(k * vocabSize)(0.5)) - private var Elogbeta = dirichlet_expectation(lambda) // K * V - private var expElogbeta = exp(Elogbeta) // K * V - private var i = 0 - - def update(): Unit ={ - Elogbeta = dirichlet_expectation(lambda) - expElogbeta = exp(Elogbeta) - } - - /** - * Submit a a subset (like 1%) of the corpus to the Online LDA model, and it will update - * the topic distribution adaptively for the terms appearing in the subset (minibatch). - * The documents RDD can be discarded after submitMiniBatch finished. - * - * @param documents RDD of documents, which are term (word) count vectors paired with IDs. - * The term count vectors are "bags of words" with a fixed-size vocabulary - * (where the vocabulary size is the length of the vector). - * Document IDs must be unique and >= 0. 
- * @return Inferred LDA model - */ - def submitMiniBatch(documents: RDD[(Long, Vector)]): Unit = { - if(documents.isEmpty()){ - return - } - - var stat = BDM.zeros[Double](k, vocabSize) - stat = documents.treeAggregate(stat)(gradient, _ += _) - update(stat, i, documents.count().toInt) - i += 1 - } - - /** - * get the topic-term distribution - */ - def getTopicDistribution(): LDAModel ={ - new LocalLDAModel(Matrices.fromBreeze(lambda).transpose) - } - - private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={ - // weight of the mini-batch. - val weight = math.pow(tau_0 + iter, -kappa) - - // This step finishes computing the sufficient statistics for the M step - val stat = raw :* expElogbeta - - // Update lambda based on documents. - lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + eta) * weight - Elogbeta = dirichlet_expectation(lambda) - expElogbeta = exp(Elogbeta) - } - - // for each document d update that document's gamma and phi - private def gradient(stat: BDM[Double], doc: (Long, Vector)): BDM[Double] = { - val termCounts = doc._2 - val (ids, cts) = termCounts match { - case v: DenseVector => (((0 until v.size).toList), v.values) - case v: SparseVector => (v.indices.toList, v.values) - case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) - } - - // Initialize the variational distribution q(theta|gamma) for the mini-batch - var gammad = new BDV[Double](Array.fill(k)(0.5)).t - var Elogthetad = vector_dirichlet_expectation(gammad.t).t // 1 * K - var expElogthetad = exp(Elogthetad.t).t // 1 * K - val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids - - var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 * ids - var meanchange = 1D - val ctsVector = new BDV[Double](cts).t // 1 * ids - - // Iterate between gamma and phi until convergence - while (meanchange > 1e-5) { - val lastgamma = gammad - // 1*K 1 * ids ids * k - gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha - Elogthetad = vector_dirichlet_expectation(gammad.t).t - expElogthetad = exp(Elogthetad.t).t - phinorm = expElogthetad * expElogbetad + 1e-100 - meanchange = sum(abs((gammad - lastgamma).t)) / gammad.t.size.toDouble - } - - val m1 = expElogthetad.t.toDenseMatrix.t - val m2 = (ctsVector / phinorm).t.toDenseMatrix - val outerResult = kron(m1, m2) // K * ids - for (i <- 0 until ids.size) { - stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i)) - } - stat - } - - private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={ - val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0) - val temp = gammaRandomGenerator.sample(row * col).toArray - (new BDM[Double](col, row, temp)).t - } - - private def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = { - val rowSum = sum(alpha(breeze.linalg.*, ::)) - val digAlpha = digamma(alpha) - val digRowSum = digamma(rowSum) - val result = digAlpha(::, breeze.linalg.*) - digRowSum - result - } - - private def vector_dirichlet_expectation(v : BDV[Double]): (BDV[Double]) ={ - digamma(v) - digamma(sum(v)) - } -} - - -//specific questions: -1. use "numIterations and miniBatchFraction" or randomsplit -2. How would the stream interface fit in? 
\ No newline at end of file diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index dc10aa67c7c1f..fbe171b4b1ab1 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -88,7 +88,7 @@ public void distributedLDAModel() { .setMaxIterations(5) .setSeed(12345); - DistributedLDAModel model = lda.run(corpus); + DistributedLDAModel model = (DistributedLDAModel)lda.run(corpus); // Check: basic parameters LocalLDAModel localModel = model.toLocal(); diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 15de10fd13a19..e3cf8efc311ab 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -68,7 +68,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { .setSeed(12345) val corpus = sc.parallelize(tinyCorpus, 2) - val model: DistributedLDAModel = lda.run(corpus) + val model: DistributedLDAModel = lda.run(corpus).asInstanceOf[DistributedLDAModel] // Check: basic parameters val localModel = model.toLocal
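A minimal usage sketch of the refactored API, not taken from the patch: it assumes an existing SparkContext named sc, and the toy corpus and parameter values are illustrative only. It shows why the updated examples and tests cast the result of run() to DistributedLDAModel, and how the optimizer can be selected either by name or by instance.

import org.apache.spark.mllib.clustering.{DistributedLDAModel, EMLDAOptimizer, LDA}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Toy corpus: (document ID, term-count vector) pairs over a 3-term vocabulary.
val corpus: RDD[(Long, Vector)] = sc.parallelize(Seq(
  (0L, Vectors.dense(1.0, 2.0, 0.0)),
  (1L, Vectors.dense(0.0, 1.0, 3.0))))

// The optimizer can be chosen by name ("EM" is the only name this patch accepts)
// or by passing an LDAOptimizer instance directly.
val lda = new LDA()
  .setK(3)
  .setOptimizer(new EMLDAOptimizer())  // equivalent to .setOptimizer("EM")
  .setMaxIterations(5)
  .setSeed(12345)

// run() now returns the generic LDAModel; the EM optimizer produces a DistributedLDAModel,
// so callers downcast to reach the distributed-model API (toLocal, vocabSize, ...).
val model = lda.run(corpus).asInstanceOf[DistributedLDAModel]
println(s"Learned topics over a vocabulary of ${model.vocabSize} terms")

The Java-friendly run(JavaPairRDD) overload changes in the same way, which is why JavaLDAExample and JavaLDASuite now cast the returned model to DistributedLDAModel.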