diff --git a/gensim/models/doc2vec.py b/gensim/models/doc2vec.py
index 1b143d92ad..5f085452de 100644
--- a/gensim/models/doc2vec.py
+++ b/gensim/models/doc2vec.py
@@ -597,6 +597,20 @@ def reset_from(self, other_model):
         self.docvecs.borrow_from(other_model.docvecs)
         super(Doc2Vec, self).reset_from(other_model)
 
+    def build_vocab(self, sentences, keep_raw_vocab=False):
+        """
+        Build vocabulary from a sequence of sentences (can be a once-only generator stream).
+        Each sentence must be a list of unicode strings.
+
+        """
+        if isinstance(sentences, tuple):
+            raise TypeError("sentences should be an iterable of sentences, not a single tuple")
+        if not sentences:
+            return
+        self.scan_vocab(sentences)  # initial survey
+        self.scale_vocab(keep_raw_vocab=keep_raw_vocab)  # trim by min_count & precalculate downsampling
+        self.finalize_vocab()  # build tables & arrays
+
     def scan_vocab(self, documents, progress_per=10000):
         logger.info("collecting all words and their counts")
         document_no = -1
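The new Doc2Vec.build_vocab mirrors the three-phase pipeline Word2Vec already uses: scan_vocab (initial survey), scale_vocab (trim by min_count and precompute downsampling), finalize_vocab (build tables and arrays). A minimal usage sketch, with an illustrative toy corpus that is not part of this patch:

    from gensim.models.doc2vec import Doc2Vec, TaggedDocument

    docs = [
        TaggedDocument(words=[u'human', u'machine', u'interface'], tags=['doc_0']),
        TaggedDocument(words=[u'survey', u'of', u'user', u'opinion'], tags=['doc_1']),
    ]
    model = Doc2Vec(min_count=1)
    model.build_vocab(docs)  # scan -> scale -> finalize
    model.train(docs)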
diff --git a/gensim/models/word2vec.py b/gensim/models/word2vec.py
index 1f0a0cbc44..c2a6310301 100755
--- a/gensim/models/word2vec.py
+++ b/gensim/models/word2vec.py
@@ -472,40 +472,108 @@ def create_binary_tree(self):
 
         logger.info("built huffman tree with maximum node depth %i", max_depth)
 
-    def build_vocab(self, sentences, keep_raw_vocab=False):
+    def build_vocab(self, sentences, keep_raw_vocab=False, scan_size=10000):
         """
         Build vocabulary from a sequence of sentences (can be a once-only generator stream).
         Each sentence must be a list of unicode strings.
 
         """
-        self.scan_vocab(sentences)  # initial survey
+        if isinstance(sentences, tuple):
+            raise TypeError("sentences should be an iterable of sentences, not a single tuple")
+        if not sentences:
+            return
+        self.scan_vocab(sentences, chunksize=scan_size)  # initial survey
         self.scale_vocab(keep_raw_vocab)  # trim by min_count & precalculate downsampling
         self.finalize_vocab()  # build tables & arrays
 
-    def scan_vocab(self, sentences, progress_per=10000):
+    def scan_vocab(self, sentences, chunksize=10000, queue_factor=2, report_delay=1):
         """Do an initial scan of all words appearing in sentences."""
+        def worker_init():
+            work_vocab = defaultdict(int)
+            return work_vocab
+
+        def worker_one_job(job, inits):
+            items = job
+            if items is None:  # signal to finish
+                return False
+            # scan the chunk & report the tally
+            work_vocab = self._do_scan_vocab_job(items, inits)
+            progress_queue.put((len(items), work_vocab.copy()))  # report progress
+            return True
+
+        def worker_loop():
+            init = worker_init()
+            while True:
+                job = job_queue.get()
+                if not worker_one_job(job, init):
+                    break
+
+        start, next_report = default_timer(), 1.0
+
+        # buffer ahead only a limited number of jobs; this is the reason we can't simply use ThreadPool :(
+        if self.workers > 0:
+            job_queue = Queue(maxsize=queue_factor * self.workers)
+        else:
+            job_queue = FakeJobQueue(worker_init, worker_one_job)
+        progress_queue = Queue(maxsize=(queue_factor + 1) * self.workers)
+
+        workers = [threading.Thread(target=worker_loop) for _ in xrange(self.workers)]
+        for thread in workers:
+            thread.daemon = True  # make interrupting the process with ctrl+c easier
+            thread.start()
+
         logger.info("collecting all words and their counts")
-        sentence_no = -1
         total_words = 0
-        min_reduce = 1
+        sent_count = 0
+        push_done = False
+        done_jobs = 0
+        job_no = -1  # so that an empty input leaves 0 outstanding jobs
         vocab = defaultdict(int)
-        for sentence_no, sentence in enumerate(sentences):
-            if sentence_no % progress_per == 0:
-                logger.info("PROGRESS: at sentence #%i, processed %i words, keeping %i word types",
-                    sentence_no, sum(itervalues(vocab)) + total_words, len(vocab))
-            for word in sentence:
-                vocab[word] += 1
+        jobs_source = enumerate(utils.grouper(sentences, chunksize))
+        while True:
+            try:
+                job_no, items = next(jobs_source)
+                logger.debug("putting job #%i in the queue", job_no)
+                job_queue.put(items)
+            except StopIteration:
+                logger.info(
+                    "reached end of input; waiting to finish %i outstanding jobs",
+                    job_no - done_jobs + 1)
+                for _ in xrange(self.workers):
+                    job_queue.put(None)  # give the workers a heads up that they can finish -- no more work!
+                push_done = True
+            try:
+                while done_jobs < (job_no + 1) or not push_done:
+                    num_sents, work_vocab = progress_queue.get(push_done)  # only block after all jobs pushed
+                    sent_count += num_sents
+                    for k in work_vocab:
+                        vocab[k] += work_vocab[k]
+                    done_jobs += 1
+                    elapsed = default_timer() - start
+                    if elapsed >= next_report:
+                        logger.info("PROGRESS: at sentence #%i, processed %i words, keeping %i word types",
+                            sent_count, sum(itervalues(vocab)) + total_words, len(vocab))
+                        next_report = elapsed + report_delay  # don't flood log, wait report_delay seconds
+                else:
+                    # inner loop exhausted the job count: all tallies merged, leave the outer loop
+                    break
+            except Empty:
+                pass  # no progress reports available yet; continue to the next push
 
-            if self.max_vocab_size and len(vocab) > self.max_vocab_size:
-                total_words += utils.prune_vocab(vocab, min_reduce)
-                min_reduce += 1
+        logger.info("PROGRESS: at sentence #%i, processed %i words, keeping %i word types",
+            sent_count, sum(itervalues(vocab)) + total_words, len(vocab))
 
         total_words += sum(itervalues(vocab))
-        logger.info("collected %i word types from a corpus of %i raw words and %i sentences",
-            len(vocab), total_words, sentence_no + 1)
-        self.corpus_count = sentence_no + 1
+        self.corpus_count = sent_count
         self.raw_vocab = vocab
 
+    def _do_scan_vocab_job(self, job, inits):
+        inits.clear()  # reuse the worker's tally dict between jobs
+        for sentence in job:
+            for word in sentence:
+                inits[word] += 1
+        return inits
+
     def scale_vocab(self, min_count=None, sample=None, dry_run=False, keep_raw_vocab=False):
         """
         Apply vocabulary settings for `min_count` (discarding less-frequent words)
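The rewritten scan_vocab is a plain producer/consumer tally: the main thread slices the sentence stream into chunksize-sentence jobs, each worker thread counts one job into a private defaultdict, and the main thread merges the partial tallies (addition is associative, so merge order doesn't matter). The bounded job_queue is what the ThreadPool comment refers to: it caps how far the producer can run ahead of the workers, so a once-only generator corpus never has to be materialized in full. A self-contained sketch of the same pattern, assuming Python 3; parallel_word_count and the other names here are illustrative, not part of this patch:

    import threading
    from collections import defaultdict
    from queue import Queue  # Python 2: from Queue import Queue

    def parallel_word_count(sentences, workers=4, chunksize=1000):
        """Tally word frequencies over `sentences` using `workers` threads."""
        job_queue = Queue(maxsize=2 * workers)  # bounded: producer can't run far ahead
        result_queue = Queue()                  # unbounded: workers never block here

        def worker():
            while True:
                chunk = job_queue.get()
                if chunk is None:               # sentinel: no more work
                    break
                tally = defaultdict(int)
                for sentence in chunk:
                    for word in sentence:
                        tally[word] += 1
                result_queue.put(tally)         # ship the partial count back

        threads = [threading.Thread(target=worker) for _ in range(workers)]
        for t in threads:
            t.daemon = True
            t.start()

        # producer: slice the (possibly once-only) stream into jobs
        jobs, chunk = 0, []
        for sentence in sentences:
            chunk.append(sentence)
            if len(chunk) == chunksize:
                job_queue.put(chunk)
                jobs, chunk = jobs + 1, []
        if chunk:
            job_queue.put(chunk)
            jobs += 1
        for _ in range(workers):
            job_queue.put(None)                 # one sentinel per worker

        # merge: order doesn't matter, addition is associative
        totals = defaultdict(int)
        for _ in range(jobs):
            for word, count in result_queue.get().items():
                totals[word] += count
        return totals

One consequence of the chunked scan worth noting: unlike the old sequential loop, it no longer applies utils.prune_vocab when max_vocab_size is exceeded, so partial tallies are merged unpruned.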
diff --git a/gensim/test/test_word2vec.py b/gensim/test/test_word2vec.py
index d955def5b8..561f4bee71 100644
--- a/gensim/test/test_word2vec.py
+++ b/gensim/test/test_word2vec.py
@@ -111,20 +111,22 @@ def testVocab(self):
         corpus = LeeCorpus()
         total_words = sum(len(sentence) for sentence in corpus)
 
-        # try vocab building explicitly, using all words
-        model = word2vec.Word2Vec(min_count=1)
-        model.build_vocab(corpus)
-        self.assertTrue(len(model.vocab) == 6981)
-        # with min_count=1, we're not throwing away anything, so make sure the word counts add up to be the entire corpus
-        self.assertEqual(sum(v.count for v in model.vocab.values()), total_words)
-        # make sure the binary codes are correct
-        numpy.allclose(model.vocab['the'].code, [1, 1, 0, 0])
-
-        # test building vocab with default params
-        model = word2vec.Word2Vec()
-        model.build_vocab(corpus)
-        self.assertTrue(len(model.vocab) == 1750)
-        numpy.allclose(model.vocab['the'].code, [1, 1, 1, 0])
+        for workers in [2, 4]:
+            for batch_size in [25, 50]:
+                # try vocab building explicitly, using all words
+                model = word2vec.Word2Vec(min_count=1, workers=workers)
+                model.build_vocab(corpus, scan_size=batch_size)
+                self.assertTrue(len(model.vocab) == 6981)
+                # with min_count=1, we're not throwing away anything, so make sure the word counts add up to be the entire corpus
+                self.assertEqual(sum(v.count for v in model.vocab.values()), total_words)
+                # make sure the binary codes are correct
+                self.assertTrue(numpy.allclose(model.vocab['the'].code, [1, 1, 0, 0]))
+
+                # test building vocab with default params
+                model = word2vec.Word2Vec(workers=workers)
+                model.build_vocab(corpus, scan_size=batch_size)
+                self.assertTrue(len(model.vocab) == 1750)
+                self.assertTrue(numpy.allclose(model.vocab['the'].code, [1, 1, 1, 0]))
 
         # no input => "RuntimeError: you must first build vocabulary before training the model"
         self.assertRaises(RuntimeError, word2vec.Word2Vec, [])
@@ -155,7 +157,6 @@ def testTraining(self):
         model2 = word2vec.Word2Vec(sentences, size=2, min_count=1)
         self.models_equal(model, model2)
 
-
     def testLocking(self):
         """Test word2vec training doesn't change locked vectors."""
         corpus = LeeCorpus()
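In short, the only API change callers see is the optional scan_size argument to build_vocab (plus the existing workers setting, which the vocabulary scan now honors as well). A usage sketch with illustrative values, where corpus stands for any restartable iterable of tokenized sentences:

    model = word2vec.Word2Vec(min_count=5, workers=4)  # 4 scanning threads
    model.build_vocab(corpus, scan_size=5000)          # 5000-sentence jobs
    model.train(corpus)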