Parallel scan_vocab #406

Closed
wants to merge 9 commits

Conversation

fortiema

Related to Issue #400

In Word2Vec.build_vocab, scan_vocab now supports multi-threaded workers, in the same fashion as train.

The compromise is that I had to remove prune_vocab, as its logic is much harder to parallelize. I think the parallel scan is the more important feature, but I guess this is highly debatable.

Each worker's temporary vocab is a Counter instead of a dict, which makes merging the word counts easier.
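
(For context, the appeal of Counter here is presumably that merging per-worker tallies is a one-liner; worker_results below is a hypothetical list of per-worker Counters, not a name from the PR:)

from collections import Counter

total = Counter()
for partial in worker_results:   # hypothetical: one Counter per scan worker
    total.update(partial)        # Counter.update adds counts rather than replacing values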

Also modified the tests to cover multiple workers and different batch sizes.

Currently passing all unit tests, but so far only tested on Linux, as other OSes are not easily accessible to me. Maybe someone can test those more quickly than I can? That would be appreciated.

@fortiema
Author

Apparently doc2vec had its own scan_vocab method but no build_vocab method, which made it use word2vec's scan_vocab. I guess this wasn't the intended behavior, so I copied the build_vocab method over to doc2vec, making it use its own scan_vocab.

@fortiema
Author

Apparently the test passes most of the time, but occasionally it fails. I'm currently investigating a possible race condition that prevents the vocabulary from being fully built.

Also, it seems Counter support in Python 2.6 is somehow different? That's not what the docs seemed to suggest, but I'll investigate that as well.

@fortiema
Author

The intermittently failing test has been fixed; everything now passes 100% of the time on 2.7, 3.3 and 3.4!

The only remaining problem is Counter on Python 2.6, where I just realized it is not available. I can either revert to slightly more complex logic with a standard dict, backport Counter into gensim itself, or consider dropping support for 2.6.

I don't know how much this compatibility is needed...

"""
Build vocabulary from a sequence of sentences (can be a once-only generator stream).
Each sentence must be a list of unicode strings.

"""
self.scan_vocab(sentences) # initial survey
if isinstance(sentences, tuple):
Owner

What is this for? Is it necessary?

Author

With the new logic in place, if sentences is a generator (which it shouldn't be), it won't get detected and the code will spin indefinitely. This is the best way I found to prevent that while still passing the unit test for the generator case.

If someone has a better way to deal with that, I'll be more than happy to apply it!

P.S. I also tried inspect.isgeneratorobject, but that triggers an exception if sentences is non-indexable, just like in the unit test case. I guess I could catch and ignore it, but I thought this was cleaner...
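
For what it's worth, a minimal sketch of an alternative guard (not what this PR does; the helper name is made up) would be to reject one-shot generators explicitly instead of special-casing tuples:

from types import GeneratorType

def check_restartable(sentences):
    # a one-shot generator can't be iterated again after the vocab scan,
    # so fail fast instead of letting the scan spin or silently miss data
    if isinstance(sentences, GeneratorType):
        raise TypeError("sentences must be a restartable iterable (e.g. a list, "
                        "or a class with __iter__), not a generator")
    return sentences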

@piskvorky
Owner

Thanks @fortiema !

The changes around doc2vec will need @gojomo's sharp-eyed review.

Regarding Counter: I'm +1 on using the plain defaultdict. If I see correctly, the only operation you need is merging counts, which is pretty trivial. No need to bring in an extra backport dependency for that. Dropping 2.6 is not an option.
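
(A rough sketch of what that defaultdict-based merge could look like; the names are made up and this is not code from the PR:)

from collections import defaultdict

def merge_counts(total, partial):
    # fold one worker's partial tally into the running total;
    # plain dict arithmetic, so it also works on Python 2.6
    for word, count in partial.items():
        total[word] += count
    return total

grand_total = defaultdict(int)
for worker_tally in worker_tallies:   # hypothetical: one dict per scan worker
    merge_counts(grand_total, worker_tally)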

@fortiema
Author

Sure thing for Counter; I'll revert to a plain dict as soon as I have some time tomorrow morning.

@gojomo
Collaborator

gojomo commented Jul 21, 2015

What sort of a speedup are you seeing, after these changes, on a large-corpus scan?

@fortiema
Author

@gojomo That's what I'm wondering too; I didn't have a big enough corpus to benchmark it properly today.

I plan to run it on a 1B+ word corpus tomorrow and get some real numbers; I'll be sure to share them here.

@fortiema
Author

The 2.6 build is also passing now. I'll post training results later today.

@fortiema
Author

@gojomo Right now there doesn't seem to be any significant speedup. I also tried using multiprocessing instead, thinking the GIL was the problem, but performance is actually worse (although CPU load is much more uniform across cores). I tried various chunk sizes, with no significant difference.

I need to think more about this and see why it doesn't scale. Any idea why?

@gojomo
Collaborator

gojomo commented Jul 22, 2015

In the threading scenario, the GIL is probably the main issue. I don't see any big blocks of work where the GIL would be released, and thus multi-threading possible. (Threads might offer some benefit if the 'upstream' source of the token-lists were the result of some costly parsing process that for some stages could release the GIL, or maybe if the text was coming from multiple laggy IO sources, so one thread could proceed while another was awaiting input. But there the multithreading would have to be elsewhere, supporting the iterator.)

In the multiprocessing case, there's bigger communication overhead moving the data read (in the 'master' process) to workers, then back again. That serialize/deserialize 'out' (then serialize/deserialize of the totals back 'in') may be outweighing any benefit from having multiple cores tallying into separate dicts at once.

If the data source were easily/equally split among N files (or file-ranges), then spawning N independent processes to scan each part, then report back the totals, might get a noticeable speedup. It'd eliminate the larger 'out' side of the serialize/deserialize mentioned above, and also presumably move any other decompress/tokenization from the one (bottlenecked) master process to N processes (independent until final tally reporting). (Or, it might just reveal the source disk to be the next/real bottleneck.)

Still, that parallelization would require bigger assumptions about the real format of data sources, compared to the current API of "just feed us an iterator [of token lists]". So my hunch is it'd be most appropriate for such format-specific parallelism to live outside the Word2Vec class, as one or more utility classes that are specialized for the data's raw format(s) – compressed/uncompressed, one-file-or-many, etc. There could be support in Word2Vec for receiving (and merging) the summary-tallies of these external utilities.
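
(Not part of this PR, but to make that idea concrete, a per-file scan with a final merge might look roughly like the sketch below; the file paths, whitespace tokenization and function names are all assumptions:)

from collections import Counter
from multiprocessing import Pool

def count_part(path):
    # tally one pre-split part file; plain whitespace tokenization stands in
    # for whatever decompression/tokenization the real corpus needs
    counts = Counter()
    with open(path) as fin:
        for line in fin:
            counts.update(line.split())
    return counts

def scan_parts(paths, processes=8):
    # each part is scanned in its own process; only the per-part tallies
    # travel back to the parent, where they are merged into one vocabulary
    pool = Pool(processes)
    partials = pool.map(count_part, paths)
    pool.close()
    pool.join()
    totals = Counter()
    for partial in partials:
        totals.update(partial)
    return totals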

What's your usual raw corpus format and source disk(s)?

@fortiema
Author

Thanks for the clarification about threads/processes and the GIL; parallel Python is new to me, so I'm still trying to get a good understanding of how everything works!

My raw corpora are single files, tens of GB in size, sitting on normal HDDs. I agree with you on dealing with that problem outside word2vec (or even gensim), as it does not make much sense to implement that kind of input-splitting logic here.

I am also working with Hadoop, and am starting to consider building the vocabulary there instead, then feeding it back into Word2Vec via a utility class.

@piskvorky
Owner

Ha, finally a practical use case for Hadoop's WordCount example? :)
But if you mean vanilla Hadoop, the overhead there is very large, so I doubt you'd see much speedup anyway.

And yes, threading won't help because of GIL (as discussed in the original issue #400).

A good first step could be profiling the current (serial) code, to see where the bottlenecks are. There's a line profiler for Python, kernprof, that is very convenient for this.
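
(For reference, the same per-line measurements can also be collected programmatically; this is a rough sketch assuming the line_profiler package is installed, with sentences standing for whatever corpus iterator you normally pass in:)

from line_profiler import LineProfiler
from gensim.models.word2vec import Word2Vec

model = Word2Vec()                              # empty model, vocabulary not built yet
profiler = LineProfiler(Word2Vec.scan_vocab)    # register the function to profile

profiler.enable_by_count()
model.build_vocab(sentences)                    # sentences: your usual corpus iterator
profiler.disable_by_count()
profiler.print_stats()                          # per-line hits and timings, like the dumps below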

It's quite possible that for such trivial tasks, (de)serialization is too slow. Then the solution would be to go compiled (Cython), which sounds pretty straightforward too. That may be even easier (and faster) than parallelization.

@fortiema
Author

@piskvorky Thanks for the pointers, I did manage to profile scan_vocab.

Here's the result for LeeCorpus:

Total time: 0.15204 s
File: gensim/models/word2vec.py
Function: scan_vocab at line 487

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   487                                               @profile
   488                                               def scan_vocab(self, sentences, progress_per=10000):
   489                                                   """Do an initial scan of all words appearing in sentences."""
   490         1           11     11.0      0.0          logger.info("collecting all words and their counts")
   491         1            1      1.0      0.0          sentence_no = -1
   492         1            1      1.0      0.0          total_words = 0
   493         1            1      1.0      0.0          min_reduce = 1
   494         1            2      2.0      0.0          vocab = defaultdict(int)
   495       301       100749    334.7     66.3          for sentence_no, sentence in enumerate(sentences):
   496       300          175      0.6      0.1              if sentence_no % progress_per == 0:
   497         1            1      1.0      0.0                  logger.info("PROGRESS: at sentence #%i, processed %i words, keeping %i word types",
   498         1            8      8.0      0.0                              sentence_no, sum(itervalues(vocab)) + total_words, len(vocab))
   499     58452        20683      0.4     13.6              for word in sentence:
   500     58152        30155      0.5     19.8                  vocab[word] += 1
   501                                           
   502       300          116      0.4      0.1              if self.max_vocab_size and len(vocab) > self.max_vocab_size:
   503                                                           total_words += utils.prune_vocab(vocab, min_reduce)
   504                                                           min_reduce += 1
   505                                           
   506         1          127    127.0      0.1          total_words += sum(itervalues(vocab))
   507         1            1      1.0      0.0          logger.info("collected %i word types from a corpus of %i raw words and %i sentences",
   508         1            7      7.0      0.0                      len(vocab), total_words, sentence_no + 1)
   509         1            1      1.0      0.0          self.corpus_count = sentence_no + 1
   510         1            1      1.0      0.0          self.raw_vocab = vocab

I also ran it on one of my small 5MB gzipped corpora for comparison:

Total time: 7.35724 s
File: gensim/models/word2vec.py
Function: scan_vocab at line 487

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   487                                               @profile
   488                                               def scan_vocab(self, sentences, progress_per=10000):
   489                                                   """Do an initial scan of all words appearing in sentences."""
   490         1            8      8.0      0.0          logger.info("collecting all words and their counts")
   491         1            1      1.0      0.0          sentence_no = -1
   492         1            1      1.0      0.0          total_words = 0
   493         1            1      1.0      0.0          min_reduce = 1
   494         1            1      1.0      0.0          vocab = defaultdict(int)
   495     25962      5027773    193.7     68.3          for sentence_no, sentence in enumerate(sentences):
   496     25961        16278      0.6      0.2              if sentence_no % progress_per == 0:
   497         3            3      1.0      0.0                  logger.info("PROGRESS: at sentence #%i, processed %i words, keeping %i word types",
   498         3          744    248.0      0.0                              sentence_no, sum(itervalues(vocab)) + total_words, len(vocab))
   499   2598443       943886      0.4     12.8              for word in sentence:
   500   2572482      1356498      0.5     18.4                  vocab[word] += 1
   501                                           
   502     25961        11465      0.4      0.2              if self.max_vocab_size and len(vocab) > self.max_vocab_size:
   503                                                           total_words += utils.prune_vocab(vocab, min_reduce)
   504                                                           min_reduce += 1
   505                                           
   506         1          567    567.0      0.0          total_words += sum(itervalues(vocab))
   507         1            2      2.0      0.0          logger.info("collected %i word types from a corpus of %i raw words and %i sentences",
   508         1            9      9.0      0.0                      len(vocab), total_words, sentence_no + 1)
   509         1            1      1.0      0.0          self.corpus_count = sentence_no + 1
   510         1            1      1.0      0.0          self.raw_vocab = vocab

Is my understanding correct that the bottleneck is the I/O (the enumerate line), and that multiprocessing should therefore sidestep it and allow greater performance, provided it is done in a way that minimizes communication overhead?

The other thing is of course the wordcount for-loop itself; is that something that is easy to cythonize?

@fortiema
Author

@gojomo What about the Doc2Vec class not overriding build_vocab and thus never using its own implementation of scan_vocab? Should I submit a separate PR for that, or did I miss something and this is the intended behavior?

@gojomo
Collaborator

gojomo commented Jul 23, 2015

Yes, my interpretation of that profiling data is that most of the time is spent in the enumerate(sentences) work – which would include raw IO, decompression, and (I suspect) character-coding and splitting-on-whitespace. Doing that on (say) 8 equally-sized part files (or slightly harder, non-overlapping ranges of one file) on an 8-core machine might offer a significant speedup. Though, if the disk rather than the computation is the real blocker and all ranges are on the same disk, maybe not any speedup at all, until the ranges are moved to independent/faster disks. (Is one CPU core utilization totally pegged with the current implementation? If not, disk may already be the bottleneck.)

Even if the for-loop – looking up words and incrementing their counts – took zero time, the speedup would be only about 30% (in the gzip profile above, those two lines account for roughly 12.8% + 18.4% ≈ 31% of the total run time). There's probably some potential win there from the compactness and lesser indirection of C/cython, but I can't guess how much. (If going down that road, you'd likely want to do the whole process, starting with the IO/encoding/splitting, in C/cython, so there's never python string/list/object overhead in the mix. The needed raw result from the scan is so simple – a list of words and their frequencies – that it could be a separate tool.)

If just generally wanting a 10s-of-GB scan to go faster, my main thoughts would be: (1) SSD; (2) split to N (= number of cores) parts, ideally spread over different disks, run all parts in parallel, merge final results.

Doc2Vec shouldn't have to override build_vocab; on a Doc2Vec instance, the (inherited) build_vocab's call to scan_vocab should use the instance's own implementation. And that's the behavior I'm seeing locally – progress updates have the 'tags' count characteristic of the Doc2Vec.scan_vocab implementation. Are you sure you've seen otherwise? (And if so, might it have been the result of some transient local weirdness, a typo or class-auto-reload thwarting usual behavior?)

@fortiema
Author

@gojomo You are right about Doc2Vec; that change was only necessary after I modified Word2Vec.scan_vocab's signature so that it no longer matched the one in Doc2Vec. My bad!

I will investigate further and try to come up with a simple and clean way to speed up vocab building. As for the wordcount loop optimization, even a 20-30% speedup on a multi-day training run really doesn't sound too bad.

I'll be doing some testing on my server's SSDs later to see how much performance gain better IO can provide.

Not so straightforward after all :-p

@piskvorky
Owner

Yes, the loop is fairly easy to cythonize (unless you're on Windows, where compilation is generally weird). It's essentially the exact same python code, just compiled to C auto-magically using Cython.

I mean, if you've never used Cython before, there's some learning curve in "what file goes where, how do I write it, how do I load it", but it's really simple in this case.

And even if the Cython code is without all the robustness bells and whistles (=not ready to merge), it'd be an interesting comparison point against the pure Python version. Since the bottleneck seems to be file iteration itself, it may also be worth cythonizing your iterator (not just the word count for-loop discussed here). The lower bound here is obviously your disk speed: if your disk can read 50MB/s, and you have 5GB of disk data, then 100 seconds is the best you can do, no matter the language :)
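
(For illustration only, not code from this PR: an iterator like the one below is already valid Cython, so "cythonizing" it can be as simple as moving it into a .pyx module and compiling; the function name and one-sentence-per-line format are assumptions.)

def iter_line_sentences(path):
    # one whitespace-tokenized sentence per line; the same source runs as
    # pure Python or compiled via Cython, only the interpreter overhead changes
    with open(path, 'rb') as fin:
        for line in fin:
            yield line.split()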

@fortiema
Author

Okay, I just took quick measurements with basic cython code (no crazy optimization):

Setup                          1M sent.   2M sent.   10M sent.
Full Python                    12.7s      24.0s      2m05.9s
Cython scan_vocab              11.8s      23.2s      1m57.4s
Cython scan_vocab & iterator   11.0s      21.8s      1m51.0s

Not the most rigorous test there is, but we can see ~10% speedup with both scan_vocab and the iterator cythonized... Nothing to get too excited about for now.

@piskvorky
Owner

Thanks a lot for investigating!

Yeah, 10% doesn't sound worth the extra complexity. Can you share your code so we can check as well?

@fortiema
Author

I guess I could open up a new PR since I started from scratch on 'develop', but let me just paste the code here verbatim:

Added to word2vec_inner.pyx:

def scan_vocab(model, sentences):
    logger.info("using cythonized wordcount")
    logger.info("collecting all words and their counts")
    cdef int sentence_no = -1
    cdef int total_words = 0
    cdef int min_reduce = 1
    vocab = dict()
    for sentence_no, sentence in enumerate(sentences):
        if sentence_no % 10000 == 0:
            logger.info("PROGRESS: at sentence #%i, encountered %i unique words", sentence_no, len(vocab))

        for word in sentence:
            if word not in vocab:
                vocab[word] = ONE
            else:
                vocab[word] += ONE

        if model.max_vocab_size and len(vocab) > model.max_vocab_size:
            total_words += utils.prune_vocab(vocab, min_reduce)
            min_reduce += 1

    logger.info("PROGRESS: at sentence #%i, encountered %i unique words", sentence_no, len(vocab))
    model.corpus_count = sentence_no + 1
    model.raw_vocab = vocab

Beginning of word2vec.py now looks like this:

try:
    from gensim.models.word2vec_inner import scan_vocab, train_sentence_sg, train_sentence_cbow, FAST_VERSION
except ImportError:
    # failed... fall back to plain numpy (20-80x slower training than the above)
    FAST_VERSION = -1

    def scan_vocab(model, sentences, progress_per=10000):
        """Do an initial scan of all words appearing in sentences."""

        if FAST_VERSION < 0:
            import warnings
            warnings.warn("C extension not loaded for Word2Vec, wordcount will be slow. "
                          "Install a C compiler and reinstall gensim for fast worccount.")

        print "PYTHON VERSION"

        logger.info("collecting all words and their counts")
        sentence_no = -1
        total_words = 0
        min_reduce = 1
        vocab = defaultdict(int)
        for sentence_no, sentence in enumerate(sentences):
            if sentence_no % progress_per == 0:
                logger.info("PROGRESS: at sentence #%i, processed %i words, keeping %i word types",
                            sentence_no, sum(itervalues(vocab)) + total_words, len(vocab))
            for word in sentence:
                vocab[word] += 1

            if model.max_vocab_size and len(vocab) > model.max_vocab_size:
                total_words += utils.prune_vocab(vocab, min_reduce)
                min_reduce += 1

        total_words += sum(itervalues(vocab))
        logger.info("collected %i word types from a corpus of %i raw words and %i sentences",
                    len(vocab), total_words, sentence_no + 1)
        model.corpus_count = sentence_no + 1
        model.raw_vocab = vocab
...

@piskvorky
Owner

Thanks again @fortiema .

How large (in MB) are these three testing corpora?

I'll try to profile too when I get some time, because Python's IO is actually pretty fast, so with cythonization added, I'd expect the performance to be closer to HW limits.

Assuming your "1M sentences" is 100 MB of data, and it takes you 11s to process, that's 9MB/s. For SSD, I think that's too slow. Even for spinning disk this is slow, considering we're reading the data off disk sequentially = the most predictable pattern imaginable.

What iterator do you use to iterate the sentences? Is it TaggedLineDocument?

Can you confirm that just iterating over the sentences (no scanning, no training) takes this long?
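
(For concreteness, a check along these lines would answer that; it is not part of the PR, and sentences stands for the same iterator used for the measurements above:)

import time

# time a bare pass over the corpus: no tallying, no training, just iteration
start = time.time()
n_sentences = 0
n_words = 0
for sentence in sentences:
    n_sentences += 1
    n_words += len(sentence)
print("%i sentences / %i words iterated in %.1fs" % (n_sentences, n_words, time.time() - start))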

@fortiema
Author

I have actually started playing with the dask library for another project, and am starting to think it would be a good fit for this particular problem. I'm going to put up a quick prototype as soon as I have some spare time this week.

Any objections in advance to integrating this into gensim if it works out?
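
(Purely as a sketch of what the dask route might look like with the current dask.bag API, assuming a line-per-sentence text corpus split across files; the function name and file pattern are made up and this is untested:)

import dask.bag as db

def dask_word_counts(pattern):
    # count word frequencies over many text files in parallel, then hand the
    # merged tally back to Word2Vec later via some utility class
    bag = db.read_text(pattern)                      # e.g. 'corpus-part-*.txt'
    counts = (bag.map(lambda line: line.split())     # tokenize each line
                 .flatten()                          # stream of individual words
                 .frequencies())                     # lazy (word, count) pairs
    return dict(counts.compute())                    # trigger the parallel computation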

@piskvorky
Owner

Not at all, I've had my eyes on dask / blaze for a while.

The "streaming" abstraction in gensim is very powerful, but the guys at Continuum are doing lots of great work, so why not tap into it. Similarly with a Spark bridge.

@honnibal

honnibal commented Dec 5, 2015

@fortiema, see my code snippet in #400: I think the big problem is actually that the counts are stored in a Python dictionary, which gets slow when it grows very large.

I'm working on proper benchmarks, over large jobs. So far my process is at 3.5bn words counted after 30 minutes. I've only just written this, so there could be bugs. But this matches my experience with doing large word counts in the past.

I'd also suggest that instead of line profiling, you could just comment out the access to the vocab, and run the test on a large job. Since we're doing something small billions of times here, and the hash table is growing very large, I tend not to trust the profiler so much. At least, not when there's an easy way to collect another data point.

@menshikh-iv
Contributor

menshikh-iv commented Jun 13, 2017

Ping @fortiema, what's the status of this PR? Will you finish it soon?

@menshikh-iv
Contributor

Connected with #1446

Labels: difficulty hard (Hard issue: requires deep gensim understanding & high python/cython skills)