Refactor ldamulticore to serialize less data (#2300)
* fix

* Update CHANGELOG.md

Co-authored-by: Michael Penkov <m@penkov.dev>
horpto and mpenkov authored Sep 26, 2020
1 parent c6c24ea commit e210f73
Showing 2 changed files with 10 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ This release contains a major refactoring.
 
 ### :+1: Improvements
 
+* Refactor ldamulticore to serialize less data (PR [#2300](https://github.com/RaRe-Technologies/gensim/pull/2300), __[@horpto](https://github.com/horpto)__)
 * KeyedVectors & X2Vec API streamlining, consistency (PR [#2698](https://github.com/RaRe-Technologies/gensim/pull/2698), __[@gojomo](https://github.com/gojomo)__)
 * No more wheels for x32 platforms (if you need x32 binaries, please build them yourself).
   (__[menshikh-iv](https://github.com/menshikh-iv)__, [#6](https://github.com/RaRe-Technologies/gensim-wheels/pull/6))
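
The changelog entry boils down to how much data is pickled into the multiprocessing job queue: before this commit, every dispatched chunk dragged a full copy of the LdaMulticore object with it; after it, only the model's LdaState travels per chunk. The snippet below is an illustrative sketch, not part of the commit: it compares the two payload sizes on a toy corpus. The corpus, parameters and absolute numbers are assumptions; only the relative difference matters.

import pickle

from gensim.corpora import Dictionary
from gensim.models import LdaMulticore

if __name__ == "__main__":
    texts = [["human", "computer", "interaction"], ["graph", "trees", "survey"]] * 50
    dictionary = Dictionary(texts)
    corpus = [dictionary.doc2bow(text) for text in texts]

    lda = LdaMulticore(corpus, id2word=dictionary, num_topics=10, workers=2, passes=1)

    # Before this commit: each job_queue entry carried the whole model.
    print("pickled LdaMulticore:", len(pickle.dumps(lda)), "bytes")
    # After this commit: each job_queue entry carries only the sufficient statistics.
    print("pickled LdaState:    ", len(pickle.dumps(lda.state)), "bytes")

The diff below shows how the change is wired up.
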
15 changes: 9 additions & 6 deletions gensim/models/ldamulticore.py
@@ -277,7 +277,7 @@ def process_result_queue(force=False):
                     self.log_perplexity(chunk, total_docs=lencorpus)
 
         logger.info("training LDA model using %i processes", self.workers)
-        pool = Pool(self.workers, worker_e_step, (job_queue, result_queue,))
+        pool = Pool(self.workers, worker_e_step, (job_queue, result_queue, self))
         for pass_ in range(self.passes):
             queue_size, reallen = [0], 0
             other = LdaState(self.eta, self.state.sstats.shape)
@@ -289,7 +289,7 @@ def process_result_queue(force=False):
                 # put the chunk into the workers' input job queue
                 while True:
                     try:
-                        job_queue.put((chunk_no, chunk, self), block=False)
+                        job_queue.put((chunk_no, chunk, self.state), block=False)
                         queue_size[0] += 1
                         logger.info(
                             "PROGRESS: pass %i, dispatched chunk #%i = documents up to #%i/%i, "
@@ -316,7 +316,7 @@ def process_result_queue(force=False):
         pool.terminate()
 
 
-def worker_e_step(input_queue, result_queue):
+def worker_e_step(input_queue, result_queue, worker_lda):
     """Perform E-step for each job.
 
     Parameters
@@ -326,17 +326,20 @@ def worker_e_step(input_queue, result_queue):
         responsible for processing it.
     result_queue : queue of :class:`~gensim.models.ldamodel.LdaState`
         After the worker finished the job, the state of the resulting (trained) worker model is appended to this queue.
+    worker_lda : :class:`~gensim.models.ldamulticore.LdaMulticore`
+        LDA instance which performed e step
     """
     logger.debug("worker process entering E-step loop")
     while True:
         logger.debug("getting a new job")
-        chunk_no, chunk, worker_lda = input_queue.get()
+        chunk_no, chunk, w_state = input_queue.get()
         logger.debug("processing chunk #%i of %i documents", chunk_no, len(chunk))
+        worker_lda.state = w_state
+        worker_lda.sync_state()
         worker_lda.state.reset()
         worker_lda.do_estep(chunk)  # TODO: auto-tune alpha?
         del chunk
         logger.debug("processed chunk, queuing the result")
         result_queue.put(worker_lda.state)
-        del worker_lda  # free up some memory
+        worker_lda.state = None
         logger.debug("result put")
