Implements a prototype for running gensim
Doc2Vec
on Spark. Only PV-DBOW
with negative sampling is implemented.
This work is inspired by:
https://github.com/dirkneumann/deepdist
https://github.com/klb3713/sentence2ve
Most ML models in Spark
's MLLib
only palatalizes training process, while keeping model parameters in driver program and broadcast to workers. This works for models which themselves are small enough to hold in memory on a single node, while training set can be large and has to be parallelized as RDD
s. However, Doc2Vec
models are not of this category - number of model parameters is linear to number of points in dataset.
The goal of Doc2Vec
is to learn vector representation of each document in training set. For example, a dataset of 10 million documents and vector size 300, requires 300,000,000 floating number parameters (or a 300x1000,000 array). Fortunately, each data point (a.ka. sentence or document) only updates its corresponding row in the weights matrix during training process, therefore, it's possible to parallelize the model by zipping its parameters with training dataset: each partition only holds the parameters relevant to its own share of data.
gensim
is used as a basis for this setup, training for sentence vectors are adapted to work on RDD
s.
When training Doc2Vec
in PV-DBOW
model and using negative sampling, three numpy
arrays are of interests in gensim
, the fully captures the model state:
model.syn0
model.syn1neg
model.docvecs.doctag_syn0
In our implementation, we keep syn0
and syn1neg
centralized, as they are of limited size (size of total vocabulary). doctag_syn0
is held as RDD (each partition holds a single numpy
array for it). Word2Vec
model is broadcasted to all partitions.
In each training iteration, the following happens:
- On each partition,
Cython
andBLAS
powered proceduretrain_document_dbow
fromgensim.models.doc2vec_inner
is called, and trains word vectors, document vector and hidden layer weights jointly - We record triplet (
syn0
deltas,syn1neg
deltas,doctag_syn0
) and produce a new RDD with each partition holding a single triplet; and this new RDD is cached (as it will be used twice) - We aggregate all deltas through Spark's
RDD.aggregate
api, to sum all deltas, then apply deltas tomodel
object in driver program - Previous generation of model broadcased is unpersisted, new model is broadcasted to all executors (runs actual training as
aggregate
is a Spark action) - Create new inputs from new RDD in step 4, training will not be re-run as we have cached results, then we invalid stale triplet RDD from previous iteration
By tweaking num_partitions
and num_iterations
, we can balance the trade-off between accuracy, speed and network overhead.
Cornell Movie Reviews Dataset is used to test the approach out. Model is trained on 5 partitions and 20 iterations, and we were able to classify movie reviews labels with about 11% error rate only from docvectors, with balanced false negative and postie rate:
*** Error Rate: 0.107995 ***
*** False Positive Rate: 0.107799 ***
*** False Negative Rate: 0.108191 ***
The following shell script downloads movie review data and uploads it to HDFS
wget http://www.cs.cornell.edu/people/pabo/movie-review-data/review_polarity.tar.gz
tar xzvf review_polarity.tar.gz
cd txt_sentoken/
cat pos/*.txt > positive.txt
cat neg/*.txt > negative.txt
hadoop fs -mkdir -p /movie_review/positive
hadoop fs -mkdir -p /movie_review/negative
hadoop fs -put positive.txt /movie_review/positive/
hadoop fs -put negative.txt /movie_review/negative/
The following command submits testing script moview_review.py
$SPARK_HOME/bin/spark-submit --verbose \
--master yarn
--deploy-mode client
--py-files ddoc2vecf.py
movie_review.py