Skip to content

Commit

Permalink
Merge pull request piskvorky#2931 from lunastera/w2v_fix_jobqueue-info
Browse files Browse the repository at this point in the history
Clear up job queue parameters in word2vec
  • Loading branch information
piskvorky authored Sep 8, 2020
2 parents 6e0d00b + 63f977a commit 9cd72f5
Showing 1 changed file with 15 additions and 34 deletions.
49 changes: 15 additions & 34 deletions gensim/models/word2vec.py
Original file line number Diff line number Diff line change
Expand Up @@ -1045,10 +1045,10 @@ def _worker_loop(self, job_queue, progress_queue):
Parameters
----------
job_queue : Queue of (list of objects, (str, int))
job_queue : Queue of (list of objects, float)
A queue of jobs still to be processed. The worker will take up jobs from this queue.
Each job is represented by a tuple where the first element is the corpus chunk to be processed and
the second is the dictionary of parameters.
the second is the floating-point learning rate.
progress_queue : Queue of (int, int, int)
A queue of progress reports. Each report is represented as a tuple of these 3 elements:
* Size of data chunk processed, for example number of sentences in the corpus chunk.
Expand All @@ -1064,12 +1064,12 @@ def _worker_loop(self, job_queue, progress_queue):
if job is None:
progress_queue.put(None)
break # no more jobs => quit this worker
data_iterable, job_parameters = job
data_iterable, alpha = job

for callback in callbacks:
callback.on_batch_begin(self)

tally, raw_tally = self._do_train_job(data_iterable, job_parameters, thread_private_mem)
tally, raw_tally = self._do_train_job(data_iterable, alpha, thread_private_mem)

for callback in callbacks:
callback.on_batch_end(self)
Expand All @@ -1088,10 +1088,10 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
----------
data_iterator : iterable of list of objects
The input dataset. This will be split in chunks and these chunks will be pushed to the queue.
job_queue : Queue of (list of object, dict of (str, int))
job_queue : Queue of (list of object, float)
A queue of jobs still to be processed. The worker will take up jobs from this queue.
Each job is represented by a tuple where the first element is the corpus chunk to be processed and
the second is the dictionary of parameters.
the second is the floating-point learning rate.
cur_epoch : int, optional
The current training epoch, needed to compute the training parameters for each job.
For example in many implementations the learning rate would be dropping with the number of epochs.
Expand All @@ -1105,7 +1105,7 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
"""
job_batch, batch_size = [], 0
pushed_words, pushed_examples = 0, 0
next_job_params = self._get_job_params(cur_epoch)
next_alpha = self._get_next_alpha(0.0, cur_epoch)
job_no = 0

for data_idx, data in enumerate(data_iterator):
Expand All @@ -1118,7 +1118,7 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
batch_size += data_length
else:
job_no += 1
job_queue.put((job_batch, next_job_params))
job_queue.put((job_batch, next_alpha))

# update the learning rate for the next job
if total_examples:
Expand All @@ -1129,14 +1129,14 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
# words-based decay
pushed_words += self._raw_word_count(job_batch)
epoch_progress = 1.0 * pushed_words / total_words
next_job_params = self._update_job_params(next_job_params, epoch_progress, cur_epoch)
next_alpha = self._get_next_alpha(epoch_progress, cur_epoch)

# add the sentence that didn't fit as the first item of a new job
job_batch, batch_size = [data], data_length
# add the last job too (may be significantly smaller than batch_words)
if job_batch:
job_no += 1
job_queue.put((job_batch, next_job_params))
job_queue.put((job_batch, next_alpha))

if job_no == 0 and self.train_count == 0:
logger.warning(
Expand All @@ -1160,10 +1160,10 @@ def _log_epoch_progress(self, progress_queue=None, job_queue=None, cur_epoch=0,
* size of data chunk processed, for example number of sentences in the corpus chunk.
* Effective word count used in training (after ignoring unknown words and trimming the sentence length).
* Total word count used in training.
job_queue : Queue of (list of object, dict of (str, int))
job_queue : Queue of (list of object, float)
A queue of jobs still to be processed. The worker will take up jobs from this queue.
Each job is represented by a tuple where the first element is the corpus chunk to be processed and
the second is the dictionary of parameters.
the second is the floating-point learning rate.
cur_epoch : int, optional
The current training epoch, needed to compute the training parameters for each job.
For example in many implementations the learning rate would be dropping with the number of epochs.
Expand Down Expand Up @@ -1342,30 +1342,11 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None, total_wo

return trained_word_count, raw_word_count, job_tally

def _get_job_params(self, cur_epoch):
"""Get the learning rate used in the current epoch.
Parameters
----------
cur_epoch : int
Current iteration through the corpus
Returns
-------
float
The learning rate for this epoch (it is linearly reduced with epochs from `self.alpha` to `self.min_alpha`).
"""
alpha = self.alpha - ((self.alpha - self.min_alpha) * float(cur_epoch) / self.epochs)
return alpha

def _update_job_params(self, job_params, epoch_progress, cur_epoch):
def _get_next_alpha(self, epoch_progress, cur_epoch):
"""Get the correct learning rate for the next iteration.
Parameters
----------
job_params : dict of (str, obj)
UNUSED.
epoch_progress : float
Ratio of finished work in the current epoch.
cur_epoch : int
Expand Down Expand Up @@ -1476,9 +1457,9 @@ def _log_progress(self, job_queue, progress_queue, cur_epoch, example_count, tot
Parameters
----------
job_queue : Queue of (list of object, dict of (str, float))
job_queue : Queue of (list of object, float)
The queue of jobs still to be performed by workers. Each job is represented as a tuple containing
the batch of data to be processed and the parameters to be used for the processing as a dict.
the batch of data to be processed and the floating-point learning rate.
progress_queue : Queue of (int, int, int)
A queue of progress reports. Each report is represented as a tuple of these 3 elements:
* size of data chunk processed, for example number of sentences in the corpus chunk.
Expand Down

0 comments on commit 9cd72f5

Please sign in to comment.