From d8dc8dc4bde56f63d8b1eacec3f3d4d68cc51894 Mon Sep 17 00:00:00 2001 From: Taylor Otwell Date: Thu, 29 Sep 2016 17:12:50 -0500 Subject: [PATCH] revert queue things :( --- .../Queue/Console/DaemonCommand.php | 192 ------------------ src/Illuminate/Queue/Console/WorkCommand.php | 157 ++++++++++++-- src/Illuminate/Queue/QueueServiceProvider.php | 10 +- src/Illuminate/Queue/Worker.php | 24 --- 4 files changed, 144 insertions(+), 239 deletions(-) delete mode 100755 src/Illuminate/Queue/Console/DaemonCommand.php diff --git a/src/Illuminate/Queue/Console/DaemonCommand.php b/src/Illuminate/Queue/Console/DaemonCommand.php deleted file mode 100755 index 3b4afc5acba7..000000000000 --- a/src/Illuminate/Queue/Console/DaemonCommand.php +++ /dev/null @@ -1,192 +0,0 @@ -worker = $worker; - } - - /** - * Execute the console command. - * - * @return void - */ - public function fire() - { - if ($this->downForMaintenance() && $this->option('once')) { - return $this->worker->sleep($this->option('sleep')); - } - - // We'll listen to the processed and failed events so we can write information - // to the console as jobs are processed, which will let the developer watch - // which jobs are coming through a queue and be informed on its progress. - $this->listenForEvents(); - - $connection = $this->argument('connection') - ?: $this->laravel['config']['queue.default']; - - // We need to get the right queue for the connection which is set in the queue - // configuration file for the application. We will pull it based on the set - // connection being run for the queue operation currently being executed. - $queue = $this->getQueue($connection); - - $response = $this->runWorker( - $connection, $queue - ); - } - - /** - * Run the worker instance. - * - * @param string $connection - * @param string $queue - * @return array - */ - protected function runWorker($connection, $queue) - { - $this->worker->setCache($this->laravel['cache']->driver()); - - return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}( - $connection, $queue, $this->gatherWorkerOptions() - ); - } - - /** - * Gather all of the queue worker options as a single object. - * - * @return \Illuminate\Queue\WorkerOptions - */ - protected function gatherWorkerOptions() - { - return new WorkerOptions( - $this->option('delay'), $this->option('memory'), - $this->option('timeout'), $this->option('sleep'), - $this->option('tries') - ); - } - - /** - * Listen for the queue events in order to update the console output. - * - * @return void - */ - protected function listenForEvents() - { - $this->laravel['events']->listen('illuminate.queue.looping', function () { - $this->output->writeln('.'); - }); - - $this->laravel['events']->listen(JobProcessed::class, function ($event) { - $this->writeOutput($event->job, false); - }); - - $this->laravel['events']->listen(JobFailed::class, function ($event) { - $this->writeOutput($event->job, true); - - $this->logFailedJob($event); - }); - } - - /** - * Write the status output for the queue worker. - * - * @param \Illuminate\Contracts\Queue\Job $job - * @param bool $failed - * @return void - */ - protected function writeOutput(Job $job, $failed) - { - if ($failed) { - $this->output->writeln('['.Carbon::now()->format('Y-m-d H:i:s').'] Failed: '.$job->resolveName()); - } else { - $this->output->writeln('['.Carbon::now()->format('Y-m-d H:i:s').'] Processed: '.$job->resolveName()); - } - } - - /** - * Store a failed job event. - * - * @param JobFailed $event - * @return void - */ - protected function logFailedJob(JobFailed $event) - { - $this->laravel['queue.failer']->log( - $event->connectionName, $event->job->getQueue(), - $event->job->getRawBody(), $event->exception - ); - } - - /** - * Get the queue name for the worker. - * - * @param string $connection - * @return string - */ - protected function getQueue($connection) - { - return $this->option('queue') ?: $this->laravel['config']->get( - "queue.connections.{$connection}.queue", 'default' - ); - } - - /** - * Determine if the worker should run in maintenance mode. - * - * @return bool - */ - protected function downForMaintenance() - { - return $this->option('force') ? false : $this->laravel->isDownForMaintenance(); - } -} diff --git a/src/Illuminate/Queue/Console/WorkCommand.php b/src/Illuminate/Queue/Console/WorkCommand.php index 1e3c12dbc076..2d4a5b8cecc1 100644 --- a/src/Illuminate/Queue/Console/WorkCommand.php +++ b/src/Illuminate/Queue/Console/WorkCommand.php @@ -2,8 +2,13 @@ namespace Illuminate\Queue\Console; +use Carbon\Carbon; +use Illuminate\Queue\Worker; use Illuminate\Console\Command; -use Symfony\Component\Process\Process; +use Illuminate\Queue\WorkerOptions; +use Illuminate\Contracts\Queue\Job; +use Illuminate\Queue\Events\JobFailed; +use Illuminate\Queue\Events\JobProcessed; class WorkCommand extends Command { @@ -12,7 +17,7 @@ class WorkCommand extends Command * * @var string */ - protected $signature = 'queue:work + protected $signature = 'queue:work {connection? : The name of connection} {--queue= : The queue to listen on} {--daemon : Run the worker in daemon mode (Deprecated)} @@ -29,7 +34,27 @@ class WorkCommand extends Command * * @var string */ - protected $description = 'Start processing jobs from the queue'; + protected $description = 'Start processing jobs on the queue as a daemon'; + + /** + * The queue worker instance. + * + * @var \Illuminate\Queue\Worker + */ + protected $worker; + + /** + * Create a new queue listen command. + * + * @param \Illuminate\Queue\Worker $worker + * @return void + */ + public function __construct(Worker $worker) + { + parent::__construct(); + + $this->worker = $worker; + } /** * Execute the console command. @@ -38,26 +63,126 @@ class WorkCommand extends Command */ public function fire() { - $process = $this->newProxyProcess(); + if ($this->downForMaintenance() && $this->option('once')) { + return $this->worker->sleep($this->option('sleep')); + } + + // We'll listen to the processed and failed events so we can write information + // to the console as jobs are processed, which will let the developer watch + // which jobs are coming through a queue and be informed on its progress. + $this->listenForEvents(); + + $connection = $this->argument('connection') + ?: $this->laravel['config']['queue.default']; + + // We need to get the right queue for the connection which is set in the queue + // configuration file for the application. We will pull it based on the set + // connection being run for the queue operation currently being executed. + $queue = $this->getQueue($connection); + + $response = $this->runWorker( + $connection, $queue + ); + } + + /** + * Run the worker instance. + * + * @param string $connection + * @param string $queue + * @return array + */ + protected function runWorker($connection, $queue) + { + $this->worker->setCache($this->laravel['cache']->driver()); + + return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}( + $connection, $queue, $this->gatherWorkerOptions() + ); + } + + /** + * Gather all of the queue worker options as a single object. + * + * @return \Illuminate\Queue\WorkerOptions + */ + protected function gatherWorkerOptions() + { + return new WorkerOptions( + $this->option('delay'), $this->option('memory'), + $this->option('timeout'), $this->option('sleep'), + $this->option('tries') + ); + } + + /** + * Listen for the queue events in order to update the console output. + * + * @return void + */ + protected function listenForEvents() + { + $this->laravel['events']->listen(JobProcessed::class, function ($event) { + $this->writeOutput($event->job, false); + }); + + $this->laravel['events']->listen(JobFailed::class, function ($event) { + $this->writeOutput($event->job, true); + + $this->logFailedJob($event); + }); + } + + /** + * Write the status output for the queue worker. + * + * @param \Illuminate\Contracts\Queue\Job $job + * @param bool $failed + * @return void + */ + protected function writeOutput(Job $job, $failed) + { + if ($failed) { + $this->output->writeln('['.Carbon::now()->format('Y-m-d H:i:s').'] Failed: '.$job->resolveName()); + } else { + $this->output->writeln('['.Carbon::now()->format('Y-m-d H:i:s').'] Processed: '.$job->resolveName()); + } + } - exit($process->run(function ($type, $line) { - if (trim($line) !== '.') { - $this->output->write($line); - } - })); + /** + * Store a failed job event. + * + * @param JobFailed $event + * @return void + */ + protected function logFailedJob(JobFailed $event) + { + $this->laravel['queue.failer']->log( + $event->connectionName, $event->job->getQueue(), + $event->job->getRawBody(), $event->exception + ); } /** - * Get a new proxy process to the daemon command. + * Get the queue name for the worker. * - * @return Process + * @param string $connection + * @return string */ - protected function newProxyProcess() + protected function getQueue($connection) { - $_SERVER['argv'][1] = 'queue:daemon'; + return $this->option('queue') ?: $this->laravel['config']->get( + "queue.connections.{$connection}.queue", 'default' + ); + } - return (new Process(PHP_BINARY.' '.implode(' ', $_SERVER['argv']), getcwd())) - ->setTimeout(null) - ->setIdleTimeout($this->option('timeout') + $this->option('sleep')); + /** + * Determine if the worker should run in maintenance mode. + * + * @return bool + */ + protected function downForMaintenance() + { + return $this->option('force') ? false : $this->laravel->isDownForMaintenance(); } } diff --git a/src/Illuminate/Queue/QueueServiceProvider.php b/src/Illuminate/Queue/QueueServiceProvider.php index 20b446879b24..232abc240b9f 100755 --- a/src/Illuminate/Queue/QueueServiceProvider.php +++ b/src/Illuminate/Queue/QueueServiceProvider.php @@ -90,15 +90,11 @@ protected function registerWorker() */ protected function registerWorkCommand() { - $this->app->singleton('command.queue.work', function () { - return new WorkCommand; + $this->app->singleton('command.queue.work', function ($app) { + return new WorkCommand($app['queue.worker']); }); - $this->app->singleton('command.queue.daemon', function ($app) { - return new DaemonCommand($app['queue.worker']); - }); - - $this->commands('command.queue.work', 'command.queue.daemon'); + $this->commands('command.queue.work'); } /** diff --git a/src/Illuminate/Queue/Worker.php b/src/Illuminate/Queue/Worker.php index ad1f3c275d94..18f5de854c41 100644 --- a/src/Illuminate/Queue/Worker.php +++ b/src/Illuminate/Queue/Worker.php @@ -103,30 +103,6 @@ protected function daemonShouldRun() return true; } - /** - * Wait for the given child process to finish. - * - * @param int $processId - * @param int $timeout - * @return void - */ - protected function waitForChildProcess($processId, $timeout) - { - declare(ticks=1) { - pcntl_signal(SIGALRM, function () use ($processId, $timeout) { - posix_kill($processId, SIGKILL); - - $this->exceptions->report(new TimeoutException("Queue child process timed out after {$timeout} seconds.")); - }, true); - - pcntl_alarm($timeout); - - pcntl_waitpid($processId, $status); - - pcntl_alarm(0); - } - } - /** * Process the next job on the queue. *