Skip to content

Commit

Permalink
[1 of 3] Make ldamulticore more similar to distributed lda
Browse files Browse the repository at this point in the history
Communication between processes is not free due to pickling/unpickling.
So to reduce overhead we are trying to send a less data.
  • Loading branch information
horpto committed Jun 28, 2020
1 parent a74f8e3 commit 7464442
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions gensim/models/ldamulticore.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def process_result_queue(force=False):
self.log_perplexity(chunk, total_docs=lencorpus)

logger.info("training LDA model using %i processes", self.workers)
pool = Pool(self.workers, worker_e_step, (job_queue, result_queue,))
pool = Pool(self.workers, worker_e_step, (job_queue, result_queue, self))
for pass_ in range(self.passes):
queue_size, reallen = [0], 0
other = LdaState(self.eta, self.state.sstats.shape)
Expand Down Expand Up @@ -316,7 +316,7 @@ def process_result_queue(force=False):
pool.terminate()


def worker_e_step(input_queue, result_queue):
def worker_e_step(input_queue, result_queue, worker_lda):
"""Perform E-step for each job.
Parameters
Expand All @@ -326,17 +326,20 @@ def worker_e_step(input_queue, result_queue):
responsible for processing it.
result_queue : queue of :class:`~gensim.models.ldamodel.LdaState`
After the worker finished the job, the state of the resulting (trained) worker model is appended to this queue.
worker_lda : :class:`~gensim.models.ldamulticore.LdaMulticore`
LDA instance which performed e step
"""
logger.debug("worker process entering E-step loop")
while True:
logger.debug("getting a new job")
chunk_no, chunk, worker_lda = input_queue.get()
chunk_no, chunk, w_state = input_queue.get()
logger.debug("processing chunk #%i of %i documents", chunk_no, len(chunk))
worker_lda.state = w_state
worker_lda.sync_state()
worker_lda.state.reset()
worker_lda.do_estep(chunk) # TODO: auto-tune alpha?
del chunk
logger.debug("processed chunk, queuing the result")
result_queue.put(worker_lda.state)
del worker_lda # free up some memory
worker_lda.state = None
logger.debug("result put")

0 comments on commit 7464442

Please sign in to comment.