Parallel scan_vocab #406

Closed
wants to merge 9 commits into from
14 changes: 14 additions & 0 deletions gensim/models/doc2vec.py
@@ -597,6 +597,20 @@ def reset_from(self, other_model):
self.docvecs.borrow_from(other_model.docvecs)
super(Doc2Vec, self).reset_from(other_model)

def build_vocab(self, sentences, keep_raw_vocab=False):
"""
Build vocabulary from a sequence of sentences (can be a once-only generator stream).
Each sentence must be a list of unicode strings.

"""
if isinstance(sentences, tuple):
raise TypeError
if not sentences:
return
self.scan_vocab(sentences) # initial survey
self.scale_vocab(keep_raw_vocab) # trim by min_count & precalculate downsampling
self.finalize_vocab() # build tables & arrays

def scan_vocab(self, documents, progress_per=10000):
logger.info("collecting all words and their counts")
document_no = -1
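For context, a hedged usage sketch of the new Doc2Vec.build_vocab entry point added above (the documents and tags are illustrative, and TaggedDocument is assumed to be the document wrapper available in this version of gensim):

from gensim.models.doc2vec import Doc2Vec, TaggedDocument

documents = [
    TaggedDocument(words=["human", "machine", "interface"], tags=["doc_0"]),
    TaggedDocument(words=["survey", "of", "user", "opinion"], tags=["doc_1"]),
]

# Same three-step pipeline as Word2Vec: scan_vocab -> scale_vocab -> finalize_vocab.
model = Doc2Vec(min_count=1)
model.build_vocab(documents)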
108 changes: 88 additions & 20 deletions gensim/models/word2vec.py
@@ -67,14 +67,15 @@
.. [3] Optimizing word2vec in gensim, http://radimrehurek.com/2013/09/word2vec-in-python-part-two-optimizing/
"""
from __future__ import division # py3 "true division"
import inspect

import logging
import sys
import os
import heapq
from timeit import default_timer
from copy import deepcopy
from collections import defaultdict
from collections import defaultdict, Counter
import threading
import time
try:
@@ -472,39 +473,106 @@ def create_binary_tree(self):

logger.info("built huffman tree with maximum node depth %i", max_depth)

def build_vocab(self, sentences, keep_raw_vocab=False):
def build_vocab(self, sentences, keep_raw_vocab=False, scan_size=10000):
"""
Build vocabulary from a sequence of sentences (can be a once-only generator stream).
Each sentence must be a list of unicode strings.

"""
self.scan_vocab(sentences) # initial survey
if isinstance(sentences, tuple):
Owner:
What is this for? Is it necessary?

Author:
With the new logic in place, if sentences is a generator (which it shouldn't be), it won't get detected and the code will spin indefinitely. This check is the best way I found to prevent that while still passing the unit test for the generator case.

If someone has a better way to deal with that, I'll be more than happy to apply it!

P.S. I also tried inspect.isgenerator, but that will trigger an exception if sentences is non-indexable, just like in the unit test case. I guess I could empty-catch it, but I thought this was cleaner...
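For illustration only, a minimal sketch of the kind of explicit check discussed here (not part of this PR; the helper name is hypothetical and it assumes only the stdlib inspect module):

import inspect

def _reject_generators(sentences):
    # A generator can only be consumed once, so the multi-pass vocab scan
    # discussed above would never see it restart and could hang or miss data.
    if inspect.isgenerator(sentences):
        raise TypeError("sentences must be a restartable iterable, not a generator")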

raise TypeError
if not sentences:
return
self.scan_vocab(sentences, chunksize=scan_size) # initial survey
self.scale_vocab(keep_raw_vocab) # trim by min_count & precalculate downsampling
self.finalize_vocab() # build tables & arrays

def scan_vocab(self, sentences, progress_per=10000):
def scan_vocab(self, sentences, chunksize=10000, queue_factor=2, report_delay=1):
"""Do an initial scan of all words appearing in sentences."""
def worker_init():
work_vocab = Counter()
return work_vocab

def worker_one_job(job, inits):
items = job
if items is None: # signal to finish
return False
# scan the chunk & return the tally
work_vocab = self._do_scan_vocab_job(items, inits)
progress_queue.put((len(items), work_vocab.copy())) # report progress
return True

def worker_loop():
init = worker_init()
while True:
job = job_queue.get()
if not worker_one_job(job, init):
break

start, next_report = default_timer(), 1.0

# buffer ahead only a limited number of jobs.. this is the reason we can't simply use ThreadPool :(
if self.workers > 0:
job_queue = Queue(maxsize=queue_factor * self.workers)
else:
job_queue = FakeJobQueue(worker_init, worker_one_job)
progress_queue = Queue(maxsize=(queue_factor + 1) * self.workers)

workers = [threading.Thread(target=worker_loop) for _ in xrange(self.workers)]
for thread in workers:
thread.daemon = True # make interrupting the process with ctrl+c easier
thread.start()

logger.info("collecting all words and their counts")
sentence_no = -1
total_words = 0
min_reduce = 1
vocab = defaultdict(int)
for sentence_no, sentence in enumerate(sentences):
if sentence_no % progress_per == 0:
logger.info("PROGRESS: at sentence #%i, processed %i words, keeping %i word types",
sentence_no, sum(itervalues(vocab)) + total_words, len(vocab))
for word in sentence:
vocab[word] += 1
sent_count = 0
push_done = False
done_jobs = 0
job_no = 0
vocab = Counter()
jobs_source = enumerate(utils.grouper(sentences, chunksize))
while True:
try:
job_no, items = next(jobs_source)
logger.debug("putting job #%i in the queue", job_no)
job_queue.put(items)
except StopIteration:
logger.info(
"reached end of input; waiting to finish %i outstanding jobs",
job_no - done_jobs + 1)
for _ in xrange(self.workers):
job_queue.put(None) # give the workers heads up that they can finish -- no more work!
push_done = True
try:
while done_jobs < (job_no+1) or not push_done:
sentences, work_vocab = progress_queue.get(push_done) # only block after all jobs pushed
sent_count += sentences
vocab += work_vocab
done_jobs += 1
elapsed = default_timer() - start
if elapsed >= next_report:
logger.info("PROGRESS: at sentence #%i, processed %i words, keeping %i word types",
sent_count, sum(itervalues(vocab)) + total_words, len(vocab))
next_report = elapsed + report_delay # don't flood log, wait report_delay seconds
else:
# loop ended by job count; really done
break
except Empty:
pass # already out of loop; continue to next push

if self.max_vocab_size and len(vocab) > self.max_vocab_size:
total_words += utils.prune_vocab(vocab, min_reduce)
min_reduce += 1
logger.info("PROGRESS: at sentence #%i, processed %i words, keeping %i word types",
sent_count, sum(itervalues(vocab)) + total_words, len(vocab))

total_words += sum(itervalues(vocab))
logger.info("collected %i word types from a corpus of %i raw words and %i sentences",
len(vocab), total_words, sentence_no + 1)
self.corpus_count = sentence_no + 1
self.raw_vocab = vocab
self.corpus_count = sent_count
self.raw_vocab = dict(vocab)

def _do_scan_vocab_job(self, job, inits):
inits.clear()
for sentence in job:
for word in sentence:
inits[word] += 1
return inits

def scale_vocab(self, min_count=None, sample=None, dry_run=False, keep_raw_vocab=False):
"""
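For context, a hedged usage sketch of the changed Word2Vec.build_vocab entry point (the corpus and parameter values are illustrative; workers and scan_size are the knobs this PR routes into the parallel scan):

from gensim.models import word2vec

sentences = [
    ["human", "machine", "interface"],
    ["survey", "of", "user", "opinion", "response", "time"],
]

# Construct without a corpus so nothing trains yet, then drive the vocab build explicitly.
model = word2vec.Word2Vec(min_count=1, workers=2)
model.build_vocab(sentences, scan_size=25)  # scan_vocab -> scale_vocab -> finalize_vocab
# model.vocab and model.corpus_count are now populated; model.train(sentences) can follow.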
31 changes: 16 additions & 15 deletions gensim/test/test_word2vec.py
@@ -111,20 +111,22 @@ def testVocab(self):
corpus = LeeCorpus()
total_words = sum(len(sentence) for sentence in corpus)

# try vocab building explicitly, using all words
model = word2vec.Word2Vec(min_count=1)
model.build_vocab(corpus)
self.assertTrue(len(model.vocab) == 6981)
# with min_count=1, we're not throwing away anything, so make sure the word counts add up to be the entire corpus
self.assertEqual(sum(v.count for v in model.vocab.values()), total_words)
# make sure the binary codes are correct
numpy.allclose(model.vocab['the'].code, [1, 1, 0, 0])

# test building vocab with default params
model = word2vec.Word2Vec()
model.build_vocab(corpus)
self.assertTrue(len(model.vocab) == 1750)
numpy.allclose(model.vocab['the'].code, [1, 1, 1, 0])
for workers in [2, 4]:
for batch_size in [25, 50]:
# try vocab building explicitly, using all words
model = word2vec.Word2Vec(min_count=1, workers=workers)
model.build_vocab(corpus, scan_size=batch_size)
self.assertTrue(len(model.vocab) == 6981)
# with min_count=1, we're not throwing away anything, so make sure the word counts add up to be the entire corpus
self.assertEqual(sum(v.count for v in model.vocab.values()), total_words)
# make sure the binary codes are correct
numpy.allclose(model.vocab['the'].code, [1, 1, 0, 0])

# test building vocab with default params
model = word2vec.Word2Vec(workers=workers)
model.build_vocab(corpus, scan_size=batch_size)
self.assertTrue(len(model.vocab) == 1750)
numpy.allclose(model.vocab['the'].code, [1, 1, 1, 0])

# no input => "RuntimeError: you must first build vocabulary before training the model"
self.assertRaises(RuntimeError, word2vec.Word2Vec, [])
@@ -155,7 +157,6 @@ def testTraining(self):
model2 = word2vec.Word2Vec(sentences, size=2, min_count=1)
self.models_equal(model, model2)


def testLocking(self):
"""Test word2vec training doesn't change locked vectors."""
corpus = LeeCorpus()