Skip to content

Commit

Permalink
Merge pull request #165 from php-enqueue/queue-consumer-timeout
Browse files Browse the repository at this point in the history
[consumption] adjust receive and idle timeouts
  • Loading branch information
makasim authored Aug 9, 2017
2 parents 248cdaf + 0fda676 commit 32f2a27
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 8 deletions.
22 changes: 15 additions & 7 deletions pkg/enqueue/Consumption/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,31 @@ class QueueConsumer
private $boundProcessors;

/**
* @var int
* @var int|float in milliseconds
*/
private $idleMicroseconds;
private $idleTimeout;

/**
* @var int|float in milliseconds
*/
private $receiveTimeout;

/**
* @param PsrContext $psrContext
* @param ExtensionInterface|ChainExtension|null $extension
* @param int $idleMicroseconds 100ms by default
* @param int|float $idleTimeout the time in milliseconds queue consumer waits if no message received
* @param int|float $receiveTimeout the time in milliseconds queue consumer waits for a message (10 ms by default)
*/
public function __construct(
PsrContext $psrContext,
ExtensionInterface $extension = null,
$idleMicroseconds = 100000
$idleTimeout = 0,
$receiveTimeout = 10000
) {
$this->psrContext = $psrContext;
$this->extension = $extension;
$this->idleMicroseconds = $idleMicroseconds;
$this->idleTimeout = $idleTimeout;
$this->receiveTimeout = $receiveTimeout;

$this->boundProcessors = [];
}
Expand Down Expand Up @@ -181,7 +189,7 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
throw new ConsumptionInterruptedException();
}

if ($message = $consumer->receive($timeout = 5000)) {
if ($message = $consumer->receive($this->receiveTimeout)) {
$logger->info('Message received from the queue: '.$context->getPsrQueue()->getQueueName());
$logger->debug('Headers: {headers}', ['headers' => new VarExport($message->getHeaders())]);
$logger->debug('Properties: {properties}', ['properties' => new VarExport($message->getProperties())]);
Expand Down Expand Up @@ -215,7 +223,7 @@ protected function doConsume(ExtensionInterface $extension, Context $context)

$extension->onPostReceived($context);
} else {
usleep($this->idleMicroseconds);
usleep($this->idleTimeout * 1000);
$extension->onIdle($context);
}

Expand Down
31 changes: 31 additions & 0 deletions pkg/enqueue/Tests/Consumption/QueueConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,37 @@ public function testShouldReturnSelfOnBind()
$this->assertSame($consumer, $consumer->bind(new NullQueue('aQueueName'), $processorMock));
}

public function testShouldSubscribeToGivenQueueWithExpectedTimeout()
{
$expectedQueue = new NullQueue('theQueueName');

$messageConsumerMock = $this->createMock(PsrConsumer::class);
$messageConsumerMock
->expects($this->once())
->method('receive')
->with(12345)
->willReturn(null)
;

$contextMock = $this->createMock(PsrContext::class);
$contextMock
->expects($this->once())
->method('createConsumer')
->with($this->identicalTo($expectedQueue))
->willReturn($messageConsumerMock)
;

$processorMock = $this->createProcessorMock();
$processorMock
->expects($this->never())
->method('process')
;

$queueConsumer = new QueueConsumer($contextMock, new BreakCycleExtension(1), 0, 12345);
$queueConsumer->bind($expectedQueue, $processorMock);
$queueConsumer->consume();
}

public function testShouldSubscribeToGivenQueueAndQuitAfterFifthIdleCycle()
{
$expectedQueue = new NullQueue('theQueueName');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

/**
* @group functional
* @retry 5
* @retry 10
*/
class RdKafkaSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec
{
Expand Down

0 comments on commit 32f2a27

Please sign in to comment.