Skip to content

Commit

Permalink
[11.x] Improve performance of Redis queue block_for when a worker has…
Browse files Browse the repository at this point in the history
… multiple queues to service (#52826)

* Improve performance of Redis queue block_for when a worker has multiple queues to service

* Default block on popJobCallback

* Fix style

* Fix style again

* fix: Encapsulate blocking determination in RedisQueue

* fix: Style
  • Loading branch information
michael-scinocca authored Oct 8, 2024
1 parent 2599bc4 commit a3d4286
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
23 changes: 21 additions & 2 deletions src/Illuminate/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue
*/
protected $migrationBatchSize = -1;

/**
* Indicates if a secondary queue had a job available between checks of the primary queue.
*
* Only applicable when monitoring multiple named queues with a single instance.
*
* @var bool
*/
protected $secondaryQueueHadJob = false;

/**
* Create a new Redis queue instance.
*
Expand Down Expand Up @@ -221,13 +230,23 @@ protected function createPayloadArray($job, $queue, $data = '')
* @param string|null $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
public function pop($queue = null, $index = 0)
{
$this->migrate($prefixed = $this->getQueue($queue));

[$job, $reserved] = $this->retrieveNextJob($prefixed);
$block = ! $this->secondaryQueueHadJob && $index == 0;

[$job, $reserved] = $this->retrieveNextJob($prefixed, $block);

if ($index == 0) {
$this->secondaryQueueHadJob = false;
}

if ($reserved) {
if ($index > 0) {
$this->secondaryQueueHadJob = true;
}

return new RedisJob(
$this->container, $this, $job,
$reserved, $this->connectionName, $queue ?: $this->default
Expand Down
8 changes: 4 additions & 4 deletions src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ public function runNextJob($connectionName, $queue, WorkerOptions $options)
*/
protected function getNextJob($connection, $queue)
{
$popJobCallback = function ($queue) use ($connection) {
return $connection->pop($queue);
$popJobCallback = function ($queue, $index = 0) use ($connection) {
return $connection->pop($queue, $index);
};

$this->raiseBeforeJobPopEvent($connection->getConnectionName());
Expand All @@ -360,8 +360,8 @@ protected function getNextJob($connection, $queue)
);
}

foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $popJobCallback($queue))) {
foreach (explode(',', $queue) as $index => $queue) {
if (! is_null($job = $popJobCallback($queue, $index))) {
$this->raiseAfterJobPopEvent($connection->getConnectionName(), $job);

return $job;
Expand Down

0 comments on commit a3d4286

Please sign in to comment.