Skip to content

Commit

Permalink
[redis] Add subscription consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Aug 13, 2018
1 parent bc28f1e commit 2cd2524
Show file tree
Hide file tree
Showing 20 changed files with 556 additions and 6 deletions.
2 changes: 0 additions & 2 deletions pkg/redis/PhpRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public function __construct(array $config)
'persisted' => false,
'database' => 0,
], $config);

var_dump($this->config);
}

/**
Expand Down
13 changes: 12 additions & 1 deletion pkg/redis/RedisContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
use Interop\Queue\PsrContext;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrQueue;
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
use Interop\Queue\PsrTopic;

class RedisContext implements PsrContext
class RedisContext implements PsrContext, PsrSubscriptionConsumerAwareContext
{
/**
* @var Redis
Expand Down Expand Up @@ -122,6 +123,16 @@ public function createConsumer(PsrDestination $destination)
return new RedisConsumer($this, $destination);
}

/**
* {@inheritdoc}
*
* @return RedisSubscriptionConsumer
*/
public function createSubscriptionConsumer()
{
return new RedisSubscriptionConsumer($this);
}

public function close()
{
$this->getRedis()->disconnect();
Expand Down
17 changes: 15 additions & 2 deletions pkg/redis/RedisSubscriptionConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class RedisSubscriptionConsumer implements PsrSubscriptionConsumer
public function __construct(RedisContext $context)
{
$this->context = $context;
$this->subscribers = [];
}

/**
Expand Down Expand Up @@ -58,7 +59,7 @@ public function consume($timeout = 0)
$result = $this->context->getRedis()->brpop($currentQueueNames, $timeout || 5000);
if ($result) {
$message = RedisMessage::jsonUnserialize($result->getMessage());
$callback = $this->subscribers[$result->getKey()];
list($consumer, $callback) = $this->subscribers[$result->getKey()];
if (false === call_user_func($callback, $message, $consumer)) {
return;
}
Expand Down Expand Up @@ -91,7 +92,11 @@ public function subscribe(PsrConsumer $consumer, callable $callback)

$queueName = $consumer->getQueue()->getQueueName();
if (array_key_exists($queueName, $this->subscribers)) {
return;
if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) {
return;
}

throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName));
}

$this->subscribers[$queueName] = [$consumer, $callback];
Expand All @@ -110,6 +115,14 @@ public function unsubscribe(PsrConsumer $consumer)

$queueName = $consumer->getQueue()->getQueueName();

if (false == array_key_exists($queueName, $this->subscribers)) {
return;
}

if ($this->subscribers[$queueName][0] !== $consumer) {
return;
}

unset($this->subscribers[$queueName]);
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/redis/Tests/RedisContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
use Enqueue\Redis\RedisDestination;
use Enqueue\Redis\RedisMessage;
use Enqueue\Redis\RedisProducer;
use Enqueue\Redis\RedisSubscriptionConsumer;
use Enqueue\Test\ClassExtensionTrait;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrSubscriptionConsumerAwareContext;

class RedisContextTest extends \PHPUnit\Framework\TestCase
{
Expand Down Expand Up @@ -201,6 +203,20 @@ public function testShouldAllowDeleteTopic()
$context->deleteQueue($topic);
}

public function testShouldImplementPsrSubscriptionConsumerAwareInterface()
{
$rc = new \ReflectionClass(RedisContext::class);

$this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumerAwareContext::class));
}

public function testShouldReturnExpectedSubscriptionConsumerInstance()
{
$context = new RedisContext($this->createRedisMock());

$this->assertInstanceOf(RedisSubscriptionConsumer::class, $context->createSubscriptionConsumer());
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|Redis
*/
Expand Down
174 changes: 174 additions & 0 deletions pkg/redis/Tests/RedisSubscriptionConsumerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
<?php

namespace Enqueue\Redis\Tests;

use Enqueue\Redis\RedisConsumer;
use Enqueue\Redis\RedisContext;
use Enqueue\Redis\RedisSubscriptionConsumer;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrQueue;
use Interop\Queue\PsrSubscriptionConsumer;
use PHPUnit\Framework\TestCase;

class RedisSubscriptionConsumerTest extends TestCase
{
public function testShouldImplementPsrSubscriptionConsumerInterface()
{
$rc = new \ReflectionClass(RedisSubscriptionConsumer::class);

$this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class));
}

public function testCouldBeConstructedWithAmqpContextAsFirstArgument()
{
new RedisSubscriptionConsumer($this->createRedisContextMock());
}

public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe()
{
$subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock());

$fooCallback = function () {};
$fooConsumer = $this->createConsumerStub('foo_queue');

$barCallback = function () {};
$barConsumer = $this->createConsumerStub('bar_queue');

$subscriptionConsumer->subscribe($fooConsumer, $fooCallback);
$subscriptionConsumer->subscribe($barConsumer, $barCallback);

$this->assertAttributeSame([
'foo_queue' => [$fooConsumer, $fooCallback],
'bar_queue' => [$barConsumer, $barCallback],
], 'subscribers', $subscriptionConsumer);
}

public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue()
{
$subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock());

$fooCallback = function () {};
$fooConsumer = $this->createConsumerStub('foo_queue');

$barCallback = function () {};
$barConsumer = $this->createConsumerStub('foo_queue');

$subscriptionConsumer->subscribe($fooConsumer, $fooCallback);

$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('There is a consumer subscribed to queue: "foo_queue"');
$subscriptionConsumer->subscribe($barConsumer, $barCallback);
}

public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime()
{
$subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock());

$fooCallback = function () {};
$fooConsumer = $this->createConsumerStub('foo_queue');

$subscriptionConsumer->subscribe($fooConsumer, $fooCallback);
$subscriptionConsumer->subscribe($fooConsumer, $fooCallback);
}

public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall()
{
$subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock());

$fooConsumer = $this->createConsumerStub('foo_queue');
$barConsumer = $this->createConsumerStub('bar_queue');

$subscriptionConsumer->subscribe($fooConsumer, function () {});
$subscriptionConsumer->subscribe($barConsumer, function () {});

// guard
$this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer);

$subscriptionConsumer->unsubscribe($fooConsumer);

$this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
}

public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName()
{
$subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock());

$subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {});

// guard
$this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);

$subscriptionConsumer->unsubscribe($this->createConsumerStub('bar_queue'));

$this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
}

public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer()
{
$subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock());

$subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {});

// guard
$this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);

$subscriptionConsumer->unsubscribe($this->createConsumerStub('foo_queue'));

$this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
}

public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall()
{
$subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock());

$subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {});
$subscriptionConsumer->subscribe($this->createConsumerStub('bar_queue'), function () {});

// guard
$this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer);

$subscriptionConsumer->unsubscribeAll();

$this->assertAttributeCount(0, 'subscribers', $subscriptionConsumer);
}

public function testThrowsIfTryConsumeWithoutSubscribers()
{
$subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock());

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('No subscribers');
$subscriptionConsumer->consume();
}

/**
* @return RedisContext|\PHPUnit_Framework_MockObject_MockObject
*/
private function createRedisContextMock()
{
return $this->createMock(RedisContext::class);
}

/**
* @param null|mixed $queueName
*
* @return PsrConsumer|\PHPUnit_Framework_MockObject_MockObject
*/
private function createConsumerStub($queueName = null)
{
$queueMock = $this->createMock(PsrQueue::class);
$queueMock
->expects($this->any())
->method('getQueueName')
->willReturn($queueName);

$consumerMock = $this->createMock(RedisConsumer::class);
$consumerMock
->expects($this->any())
->method('getQueue')
->willReturn($queueMock)
;

return $consumerMock;
}
}
20 changes: 20 additions & 0 deletions pkg/redis/Tests/Spec/RedisConnectionFactoryTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Enqueue\Redis\Tests\Spec;

use Enqueue\Redis\RedisConnectionFactory;
use Interop\Queue\Spec\PsrConnectionFactorySpec;

/**
* @group Redis
*/
class RedisConnectionFactoryTest extends PsrConnectionFactorySpec
{
/**
* {@inheritdoc}
*/
protected function createConnectionFactory()
{
return new RedisConnectionFactory();
}
}
23 changes: 23 additions & 0 deletions pkg/redis/Tests/Spec/RedisContextTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace Enqueue\Redis\Tests\Spec;

use Enqueue\Test\RedisExtension;
use Interop\Queue\Spec\PsrContextSpec;

/**
* @group functional
* @group Redis
*/
class RedisContextTest extends PsrContextSpec
{
use RedisExtension;

/**
* {@inheritdoc}
*/
protected function createContext()
{
return $this->buildPhpRedisContext();
}
}
3 changes: 3 additions & 0 deletions pkg/redis/Tests/Spec/RedisMessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
use Enqueue\Redis\RedisMessage;
use Interop\Queue\Spec\PsrMessageSpec;

/**
* @group Redis
*/
class RedisMessageTest extends PsrMessageSpec
{
/**
Expand Down
23 changes: 23 additions & 0 deletions pkg/redis/Tests/Spec/RedisProducerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace Enqueue\Redis\Tests\Spec;

use Enqueue\Test\RedisExtension;
use Interop\Queue\Spec\PsrProducerSpec;

/**
* @group functional
* @group Redis
*/
class RedisProducerTest extends PsrProducerSpec
{
use RedisExtension;

/**
* {@inheritdoc}
*/
protected function createProducer()
{
return $this->buildPhpRedisContext()->createProducer();
}
}
20 changes: 20 additions & 0 deletions pkg/redis/Tests/Spec/RedisQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Enqueue\Redis\Tests\Spec;

use Enqueue\Redis\RedisDestination;
use Interop\Queue\Spec\PsrQueueSpec;

/**
* @group Redis
*/
class RedisQueueTest extends PsrQueueSpec
{
/**
* {@inheritdoc}
*/
protected function createQueue()
{
return new RedisDestination(self::EXPECTED_QUEUE_NAME);
}
}
Loading

0 comments on commit 2cd2524

Please sign in to comment.