From 913bcab85fd78ba63014756a0cf2c5643da5fbe8 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 21 Aug 2017 14:03:36 +0300 Subject: [PATCH 1/2] fix pheanstalk redelivered, receive --- pkg/pheanstalk/PheanstalkConsumer.php | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/pheanstalk/PheanstalkConsumer.php b/pkg/pheanstalk/PheanstalkConsumer.php index 4bcc7f4d2..025b9f5d8 100644 --- a/pkg/pheanstalk/PheanstalkConsumer.php +++ b/pkg/pheanstalk/PheanstalkConsumer.php @@ -47,8 +47,16 @@ public function getQueue() */ public function receive($timeout = 0) { - if ($job = $this->pheanstalk->reserveFromTube($this->destination->getName(), $timeout / 1000)) { - return $this->convertJobToMessage($job); + if ($timeout === 0) { + while (true) { + if ($job = $this->pheanstalk->reserveFromTube($this->destination->getName(), 1000)) { + return $this->convertJobToMessage($job); + } + } + } else { + if ($job = $this->pheanstalk->reserveFromTube($this->destination->getName(), $timeout / 1000)) { + return $this->convertJobToMessage($job); + } } } @@ -101,7 +109,10 @@ public function reject(PsrMessage $message, $requeue = false) */ private function convertJobToMessage(Job $job) { + $stats = $this->pheanstalk->statsJob($job); + $message = PheanstalkMessage::jsonUnserialize($job->getData()); + $message->setRedelivered($stats['reserves'] > 1); $message->setJob($job); return $message; From fce71d1b1491ce9fef9aac957e6f8b27a99c2fcf Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 21 Aug 2017 14:27:51 +0300 Subject: [PATCH 2/2] fix pheanstalk redelivered, receive --- pkg/pheanstalk/PheanstalkConsumer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pheanstalk/PheanstalkConsumer.php b/pkg/pheanstalk/PheanstalkConsumer.php index 025b9f5d8..93f24728a 100644 --- a/pkg/pheanstalk/PheanstalkConsumer.php +++ b/pkg/pheanstalk/PheanstalkConsumer.php @@ -49,7 +49,7 @@ public function receive($timeout = 0) { if ($timeout === 0) { while (true) { - if ($job = $this->pheanstalk->reserveFromTube($this->destination->getName(), 1000)) { + if ($job = $this->pheanstalk->reserveFromTube($this->destination->getName(), 5)) { return $this->convertJobToMessage($job); } }