Skip to content

Commit

Permalink
Maintain task order for fast-finishing/blocking tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jun 20, 2017
1 parent 998a255 commit 29a1d1b
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions lib/Worker/AbstractWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@ public function __construct(Strand $strand) {

$deferred = $this->jobQueue[$id];
unset($this->jobQueue[$id]);
$empty = empty($this->jobQueue);

if (!empty($this->jobQueue)) {
$deferred->resolve($data->promise());

if (!$empty) {
$this->context->receive()->onResolve($this->onResolve);
}

$deferred->resolve($data->promise());
};
}

Expand Down Expand Up @@ -108,13 +109,16 @@ public function enqueue(Task $task): Promise {
* @throws \Amp\Parallel\Worker\WorkerException
*/
private function doEnqueue(Task $task): \Generator {
if (empty($this->jobQueue)) {
$this->context->receive()->onResolve($this->onResolve);
}
$empty = empty($this->jobQueue);

try {
$job = new Internal\Job($task);
$this->jobQueue[$job->getId()] = $deferred = new Deferred;

if ($empty) {
$this->context->receive()->onResolve($this->onResolve);
}

yield $this->context->send($job);
} catch (\Throwable $exception) {
$exception = new WorkerException("Sending the task to the worker failed", $exception);
Expand Down

0 comments on commit 29a1d1b

Please sign in to comment.