Skip to content

Commit

Permalink
Merge pull request #492 from php-enqueue/amqp-subscription-consumers
Browse files Browse the repository at this point in the history
Move subscription related logic to SubscriptionConsumer class.
  • Loading branch information
makasim authored Aug 9, 2018
2 parents 8ca4124 + 1dc3876 commit 3a3f8a6
Show file tree
Hide file tree
Showing 12 changed files with 601 additions and 240 deletions.
82 changes: 9 additions & 73 deletions pkg/amqp-bunny/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,9 @@
namespace Enqueue\AmqpBunny;

use Bunny\Channel;
use Bunny\Client;
use Bunny\Exception\ClientException;
use Bunny\Message;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\SignalSocketHelper;
use Enqueue\AmqpTools\SubscriptionConsumer;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
Expand Down Expand Up @@ -51,11 +47,9 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscrip
private $buffer;

/**
* an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];.
*
* @var array
* @var AmqpSubscriptionConsumer
*/
private $subscribers;
private $bcSubscriptionConsumer;

/**
* Callable must return instance of \Bunny\Channel once called.
Expand All @@ -81,7 +75,7 @@ public function __construct($bunnyChannel, $config = [])
}

$this->buffer = new Buffer();
$this->subscribers = [];
$this->bcSubscriptionConsumer = $this->createSubscriptionConsumer();
}

/**
Expand Down Expand Up @@ -140,10 +134,12 @@ public function createConsumer(PsrDestination $destination)

/**
* {@inheritdoc}
*
* @return AmqpSubscriptionConsumer
*/
public function createSubscriptionConsumer()
{
return new SubscriptionConsumer($this);
return new AmqpSubscriptionConsumer($this);
}

/**
Expand Down Expand Up @@ -339,42 +335,7 @@ public function setQos($prefetchSize, $prefetchCount, $global)
*/
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
{
if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) {
return;
}

$bunnyCallback = function (Message $message, Channel $channel, Client $bunny) {
$receivedMessage = $this->convertMessage($message);
$receivedMessage->setConsumerTag($message->consumerTag);

/**
* @var AmqpConsumer
* @var callable $callback
*/
list($consumer, $callback) = $this->subscribers[$message->consumerTag];

if (false === call_user_func($callback, $receivedMessage, $consumer)) {
$bunny->stop();
}
};

$frame = $this->getBunnyChannel()->consume(
$bunnyCallback,
$consumer->getQueue()->getQueueName(),
$consumer->getConsumerTag(),
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK),
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE),
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT)
);

if (empty($frame->consumerTag)) {
throw new Exception('Got empty consumer tag');
}

$consumer->setConsumerTag($frame->consumerTag);

$this->subscribers[$frame->consumerTag] = [$consumer, $callback];
$this->bcSubscriptionConsumer->subscribe($consumer, $callback);
}

/**
Expand All @@ -384,15 +345,7 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
*/
public function unsubscribe(InteropAmqpConsumer $consumer)
{
if (false == $consumer->getConsumerTag()) {
return;
}

$consumerTag = $consumer->getConsumerTag();

$this->getBunnyChannel()->cancel($consumerTag);
$consumer->setConsumerTag(null);
unset($this->subscribers[$consumerTag]);
$this->bcSubscriptionConsumer->unsubscribe($consumer);
}

/**
Expand All @@ -402,24 +355,7 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
*/
public function consume($timeout = 0)
{
if (empty($this->subscribers)) {
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
}

$signalHandler = new SignalSocketHelper();
$signalHandler->beforeSocket();

try {
$this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
} catch (ClientException $e) {
if ('stream_select() failed.' == $e->getMessage() && $signalHandler->wasThereSignal()) {
return;
}

throw $e;
} finally {
$signalHandler->afterSocket();
}
$this->bcSubscriptionConsumer->consume($timeout);
}

/**
Expand Down
141 changes: 141 additions & 0 deletions pkg/amqp-bunny/AmqpSubscriptionConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
<?php

namespace Enqueue\AmqpBunny;

use Bunny\Channel;
use Bunny\Client;
use Bunny\Exception\ClientException;
use Bunny\Message;
use Enqueue\AmqpTools\SignalSocketHelper;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Queue\Exception;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrSubscriptionConsumer;

class AmqpSubscriptionConsumer implements PsrSubscriptionConsumer
{
/**
* @var AmqpContext
*/
private $context;

/**
* an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];.
*
* @var array
*/
private $subscribers;

public function __construct(AmqpContext $context)
{
$this->context = $context;

$this->subscribers = [];
}

/**
* {@inheritdoc}
*/
public function consume($timeout = 0)
{
if (empty($this->subscribers)) {
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
}

$signalHandler = new SignalSocketHelper();
$signalHandler->beforeSocket();

try {
$this->context->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
} catch (ClientException $e) {
if ('stream_select() failed.' == $e->getMessage() && $signalHandler->wasThereSignal()) {
return;
}

throw $e;
} finally {
$signalHandler->afterSocket();
}
}

/**
* @param AmqpConsumer $consumer
*
* {@inheritdoc}
*/
public function subscribe(PsrConsumer $consumer, callable $callback)
{
if (false == $consumer instanceof AmqpConsumer) {
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', AmqpConsumer::class, get_class($consumer)));
}

if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) {
return;
}

$bunnyCallback = function (Message $message, Channel $channel, Client $bunny) {
$receivedMessage = $this->context->convertMessage($message);
$receivedMessage->setConsumerTag($message->consumerTag);

/**
* @var AmqpConsumer
* @var callable $callback
*/
list($consumer, $callback) = $this->subscribers[$message->consumerTag];

if (false === call_user_func($callback, $receivedMessage, $consumer)) {
$bunny->stop();
}
};

$frame = $this->context->getBunnyChannel()->consume(
$bunnyCallback,
$consumer->getQueue()->getQueueName(),
$consumer->getConsumerTag(),
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK),
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE),
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT)
);

if (empty($frame->consumerTag)) {
throw new Exception('Got empty consumer tag');
}

$consumer->setConsumerTag($frame->consumerTag);

$this->subscribers[$frame->consumerTag] = [$consumer, $callback];
}

/**
* @param AmqpConsumer $consumer
*
* {@inheritdoc}
*/
public function unsubscribe(PsrConsumer $consumer)
{
if (false == $consumer instanceof AmqpConsumer) {
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', AmqpConsumer::class, get_class($consumer)));
}

if (false == $consumer->getConsumerTag()) {
return;
}

$consumerTag = $consumer->getConsumerTag();

$this->context->getBunnyChannel()->cancel($consumerTag);
$consumer->setConsumerTag(null);
unset($this->subscribers[$consumerTag]);
}

/**
* {@inheritdoc}
*/
public function unsubscribeAll()
{
foreach ($this->subscribers as list($consumer)) {
$this->unsubscribe($consumer);
}
}
}
16 changes: 16 additions & 0 deletions pkg/amqp-bunny/Tests/AmqpContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
use Bunny\Channel;
use Bunny\Protocol\MethodQueueDeclareOkFrame;
use Enqueue\AmqpBunny\AmqpContext;
use Enqueue\AmqpBunny\AmqpSubscriptionConsumer;
use Interop\Amqp\Impl\AmqpBind;
use Interop\Amqp\Impl\AmqpQueue;
use Interop\Amqp\Impl\AmqpTopic;
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
use PHPUnit\Framework\TestCase;

class AmqpContextTest extends TestCase
Expand Down Expand Up @@ -235,6 +237,20 @@ public function testShouldSetQos()
$context->setQos(123, 456, true);
}

public function testShouldImplementPsrSubscriptionConsumerAwareInterface()
{
$rc = new \ReflectionClass(AmqpContext::class);

$this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumerAwareContext::class));
}

public function testShouldReturnExpectedSubscriptionConsumerInstance()
{
$context = new AmqpContext($this->createChannelMock());

$this->assertInstanceOf(AmqpSubscriptionConsumer::class, $context->createSubscriptionConsumer());
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|Channel
*/
Expand Down
31 changes: 31 additions & 0 deletions pkg/amqp-bunny/Tests/AmqpSubscriptionConsumerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

namespace Enqueue\AmqpBunny\Tests;

use Enqueue\AmqpBunny\AmqpContext;
use Enqueue\AmqpBunny\AmqpSubscriptionConsumer;
use Interop\Queue\PsrSubscriptionConsumer;
use PHPUnit\Framework\TestCase;

class AmqpSubscriptionConsumerTest extends TestCase
{
public function testShouldImplementPsrSubscriptionConsumerInterface()
{
$rc = new \ReflectionClass(AmqpSubscriptionConsumer::class);

$this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class));
}

public function testCouldBeConstructedWithAmqpContextAsFirstArgument()
{
new AmqpSubscriptionConsumer($this->createAmqpContextMock());
}

/**
* @return AmqpContext|\PHPUnit_Framework_MockObject_MockObject
*/
private function createAmqpContextMock()
{
return $this->createMock(AmqpContext::class);
}
}
Loading

0 comments on commit 3a3f8a6

Please sign in to comment.