From e46f84a86f49c6ab0da569673f6a9cbe9c434c1e Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 13 Aug 2018 22:29:00 +0300 Subject: [PATCH 1/3] prepare redis interface and impl to consume from multiple queues. --- pkg/redis/PRedis.php | 30 ++++++---- pkg/redis/PhpRedis.php | 82 ++++++++++++++------------- pkg/redis/Redis.php | 22 +++---- pkg/redis/RedisConnectionFactory.php | 6 +- pkg/redis/RedisConsumer.php | 18 +++--- pkg/redis/RedisResult.php | 32 +++++++++++ pkg/redis/Tests/RedisConsumerTest.php | 50 ++++++++++++++-- pkg/test/RedisExtension.php | 2 +- 8 files changed, 169 insertions(+), 73 deletions(-) create mode 100644 pkg/redis/RedisResult.php diff --git a/pkg/redis/PRedis.php b/pkg/redis/PRedis.php index b90da5be1..0a5922731 100644 --- a/pkg/redis/PRedis.php +++ b/pkg/redis/PRedis.php @@ -39,10 +39,10 @@ public function __construct(array $config) /** * {@inheritdoc} */ - public function lpush($key, $value) + public function lpush(string $key, string $value): int { try { - $this->redis->lpush($key, [$value]); + return $this->redis->lpush($key, [$value]); } catch (PRedisServerException $e) { throw new ServerException('lpush command has failed', null, $e); } @@ -51,12 +51,14 @@ public function lpush($key, $value) /** * {@inheritdoc} */ - public function brpop($key, $timeout) + public function brpop(array $keys, int $timeout): ?RedisResult { try { - if ($result = $this->redis->brpop([$key], $timeout)) { - return $result[1]; + if ($result = $this->redis->brpop($keys, $timeout)) { + return new RedisResult($result[0], $result[1]); } + + return null; } catch (PRedisServerException $e) { throw new ServerException('brpop command has failed', null, $e); } @@ -65,10 +67,14 @@ public function brpop($key, $timeout) /** * {@inheritdoc} */ - public function rpop($key) + public function rpop(string $key): ?RedisResult { try { - return $this->redis->rpop($key); + if ($message = $this->redis->rpop($key)) { + return new RedisResult($key, $message); + } + + return null; } catch (PRedisServerException $e) { throw new ServerException('rpop command has failed', null, $e); } @@ -77,8 +83,12 @@ public function rpop($key) /** * {@inheritdoc} */ - public function connect() + public function connect(): void { + if ($this->redis) { + return; + } + $this->redis = new Client($this->config, ['exceptions' => true]); if ($this->config['pass']) { @@ -91,7 +101,7 @@ public function connect() /** * {@inheritdoc} */ - public function disconnect() + public function disconnect(): void { $this->redis->disconnect(); } @@ -99,7 +109,7 @@ public function disconnect() /** * {@inheritdoc} */ - public function del($key) + public function del(string $key): void { $this->redis->del([$key]); } diff --git a/pkg/redis/PhpRedis.php b/pkg/redis/PhpRedis.php index 829d05ed4..2d66c61d3 100644 --- a/pkg/redis/PhpRedis.php +++ b/pkg/redis/PhpRedis.php @@ -24,80 +24,86 @@ public function __construct(array $config) 'port' => null, 'pass' => null, 'user' => null, - 'timeout' => null, + 'timeout' => .0, 'reserved' => null, 'retry_interval' => null, 'persisted' => false, 'database' => 0, ], $config); + + var_dump($this->config); } /** * {@inheritdoc} */ - public function lpush($key, $value) + public function lpush(string $key, string $value): int { - if (false == $this->redis->lPush($key, $value)) { - throw new ServerException($this->redis->getLastError()); - } + return $this->redis->lPush($key, $value); } /** * {@inheritdoc} */ - public function brpop($key, $timeout) + public function brpop(array $keys, int $timeout): ?RedisResult { - if ($result = $this->redis->brPop([$key], $timeout)) { - return $result[1]; + if ($result = $this->redis->brPop($keys, $timeout)) { + return new RedisResult($result[0], $result[1]); } + + return null; } /** * {@inheritdoc} */ - public function rpop($key) + public function rpop(string $key): ?RedisResult { - return $this->redis->rPop($key); + if ($message = $this->redis->rPop($key)) { + return new RedisResult($key, $message); + } + + return null; } /** * {@inheritdoc} */ - public function connect() + public function connect(): void { - if (false == $this->redis) { - $this->redis = new \Redis(); - - if ($this->config['persisted']) { - $this->redis->pconnect( - $this->config['host'], - $this->config['port'], - $this->config['timeout'] - ); - } else { - $this->redis->connect( - $this->config['host'], - $this->config['port'], - $this->config['timeout'], - $this->config['reserved'], - $this->config['retry_interval'] - ); - } - - if ($this->config['pass']) { - $this->redis->auth($this->config['pass']); - } - - $this->redis->select($this->config['database']); + if ($this->redis) { + return; + } + + $this->redis = new \Redis(); + + if ($this->config['persisted']) { + $this->redis->pconnect( + $this->config['host'], + $this->config['port'], + $this->config['timeout'] + ); + } else { + $this->redis->connect( + $this->config['host'], + $this->config['port'], + $this->config['timeout'], + $this->config['reserved'], + $this->config['retry_interval'] + ); + } + + if ($this->config['pass']) { + $this->redis->auth($this->config['pass']); } - return $this->redis; + $this->redis->select($this->config['database']); } /** * {@inheritdoc} */ - public function disconnect() + public function disconnect(): void { if ($this->redis) { $this->redis->close(); @@ -107,7 +113,7 @@ public function disconnect() /** * {@inheritdoc} */ - public function del($key) + public function del(string $key): void { $this->redis->del($key); } diff --git a/pkg/redis/Redis.php b/pkg/redis/Redis.php index 796081775..7b4cc2691 100644 --- a/pkg/redis/Redis.php +++ b/pkg/redis/Redis.php @@ -1,5 +1,7 @@ 'localhost', 'port' => 6379, - 'timeout' => null, + 'timeout' => .0, 'reserved' => null, 'retry_interval' => null, 'vendor' => 'phpredis', diff --git a/pkg/redis/RedisConsumer.php b/pkg/redis/RedisConsumer.php index 6c57f798c..e5fd785a6 100644 --- a/pkg/redis/RedisConsumer.php +++ b/pkg/redis/RedisConsumer.php @@ -47,15 +47,15 @@ public function receive($timeout = 0) { $timeout = (int) ($timeout / 1000); if (empty($timeout)) { - // Caused by - // Predis\Response\ServerException: ERR timeout is not an integer or out of range - // /mqdev/vendor/predis/predis/src/Client.php:370 - - return $this->receiveNoWait(); + while (true) { + if ($message = $this->receive(5000)) { + return $message; + } + } } - if ($message = $this->getRedis()->brpop($this->queue->getName(), $timeout)) { - return RedisMessage::jsonUnserialize($message); + if ($result = $this->getRedis()->brpop([$this->queue->getName()], $timeout)) { + return RedisMessage::jsonUnserialize($result->getMessage()); } } @@ -66,8 +66,8 @@ public function receive($timeout = 0) */ public function receiveNoWait() { - if ($message = $this->getRedis()->rpop($this->queue->getName())) { - return RedisMessage::jsonUnserialize($message); + if ($result = $this->getRedis()->rpop($this->queue->getName())) { + return RedisMessage::jsonUnserialize($result->getMessage()); } } diff --git a/pkg/redis/RedisResult.php b/pkg/redis/RedisResult.php new file mode 100644 index 000000000..e04698bd9 --- /dev/null +++ b/pkg/redis/RedisResult.php @@ -0,0 +1,32 @@ +key = $key; + $this->message = $message; + } + + public function getKey(): string + { + return $this->key; + } + + public function getMessage(): string + { + return $this->message; + } +} diff --git a/pkg/redis/Tests/RedisConsumerTest.php b/pkg/redis/Tests/RedisConsumerTest.php index 55ab35346..8def4588f 100644 --- a/pkg/redis/Tests/RedisConsumerTest.php +++ b/pkg/redis/Tests/RedisConsumerTest.php @@ -8,6 +8,7 @@ use Enqueue\Redis\RedisDestination; use Enqueue\Redis\RedisMessage; use Enqueue\Redis\RedisProducer; +use Enqueue\Redis\RedisResult; use Enqueue\Test\ClassExtensionTrait; use Interop\Queue\PsrConsumer; @@ -81,7 +82,7 @@ public function testShouldCallRedisBRPopAndReturnNullIfNothingInQueueOnReceive() $redisMock ->expects($this->once()) ->method('brpop') - ->with('aQueue', 2) + ->with(['aQueue'], 2) ->willReturn(null) ; @@ -105,8 +106,8 @@ public function testShouldCallRedisBRPopAndReturnMessageIfOneInQueueOnReceive() $redisMock ->expects($this->once()) ->method('brpop') - ->with('aQueue', 2) - ->willReturn(json_encode(new RedisMessage('aBody'))) + ->with(['aQueue'], 2) + ->willReturn(new RedisResult('aQueue', json_encode(new RedisMessage('aBody')))) ; $contextMock = $this->createContextMock(); @@ -124,6 +125,47 @@ public function testShouldCallRedisBRPopAndReturnMessageIfOneInQueueOnReceive() $this->assertSame('aBody', $message->getBody()); } + public function testShouldCallRedisBRPopSeveralTimesWithFiveSecondTimeoutIfZeroTimeoutIsPassed() + { + $destination = new RedisDestination('aQueue'); + + $expectedTimeout = 5; + + $redisMock = $this->createRedisMock(); + $redisMock + ->expects($this->at(0)) + ->method('brpop') + ->with(['aQueue'], $expectedTimeout) + ->willReturn(null) + ; + $redisMock + ->expects($this->at(1)) + ->method('brpop') + ->with(['aQueue'], $expectedTimeout) + ->willReturn(null) + ; + $redisMock + ->expects($this->at(2)) + ->method('brpop') + ->with(['aQueue'], $expectedTimeout) + ->willReturn(new RedisResult('aQueue', json_encode(new RedisMessage('aBody')))) + ; + + $contextMock = $this->createContextMock(); + $contextMock + ->expects($this->atLeastOnce()) + ->method('getRedis') + ->willReturn($redisMock) + ; + + $consumer = new RedisConsumer($contextMock, $destination); + + $message = $consumer->receive(0); + + $this->assertInstanceOf(RedisMessage::class, $message); + $this->assertSame('aBody', $message->getBody()); + } + public function testShouldCallRedisRPopAndReturnNullIfNothingInQueueOnReceiveNoWait() { $destination = new RedisDestination('aQueue'); @@ -157,7 +199,7 @@ public function testShouldCallRedisRPopAndReturnMessageIfOneInQueueOnReceiveNoWa ->expects($this->once()) ->method('rpop') ->with('aQueue') - ->willReturn(json_encode(new RedisMessage('aBody'))) + ->willReturn(new RedisResult('aQueue', json_encode(new RedisMessage('aBody')))) ; $contextMock = $this->createContextMock(); diff --git a/pkg/test/RedisExtension.php b/pkg/test/RedisExtension.php index 0740c8f21..5652a1907 100644 --- a/pkg/test/RedisExtension.php +++ b/pkg/test/RedisExtension.php @@ -18,7 +18,7 @@ private function buildPhpRedisContext() $config = [ 'host' => getenv('REDIS_HOST'), - 'port' => getenv('REDIS_PORT'), + 'port' => (int) getenv('REDIS_PORT'), 'vendor' => 'phpredis', 'lazy' => false, ]; From ccf0bf1eea50579367879cc44fd80664e1539511 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 13 Aug 2018 22:29:30 +0300 Subject: [PATCH 2/3] [redis] introduce Subscription Consumer. --- pkg/redis/RedisSubscriptionConsumer.php | 123 ++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 pkg/redis/RedisSubscriptionConsumer.php diff --git a/pkg/redis/RedisSubscriptionConsumer.php b/pkg/redis/RedisSubscriptionConsumer.php new file mode 100644 index 000000000..235fc8141 --- /dev/null +++ b/pkg/redis/RedisSubscriptionConsumer.php @@ -0,0 +1,123 @@ +context = $context; + } + + /** + * {@inheritdoc} + */ + public function consume($timeout = 0) + { + if (empty($this->subscribers)) { + throw new \LogicException('No subscribers'); + } + + $timeout /= 1000; + $endAt = microtime(true) + $timeout; + + $queueNames = []; + foreach (array_keys($this->subscribers) as $queueName) { + $queueNames[$queueName] = $queueName; + } + + $currentQueueNames = []; + while (true) { + if (empty($currentQueueNames)) { + $currentQueueNames = $queueNames; + } + + /** + * @var string + * @var PsrConsumer $consumer + * @var callable $processor + */ + $result = $this->context->getRedis()->brpop($currentQueueNames, $timeout || 5000); + if ($result) { + $message = RedisMessage::jsonUnserialize($result->getMessage()); + $callback = $this->subscribers[$result->getKey()]; + if (false === call_user_func($callback, $message, $consumer)) { + return; + } + + unset($currentQueueNames[$result->getKey()]); + } else { + $currentQueueNames = []; + + if ($timeout && microtime(true) >= $endAt) { + return; + } + } + + if ($timeout && microtime(true) >= $endAt) { + return; + } + } + } + + /** + * {@inheritdoc} + * + * @param RedisConsumer $consumer + */ + public function subscribe(PsrConsumer $consumer, callable $callback) + { + if (false == $consumer instanceof RedisConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', RedisConsumer::class, get_class($consumer))); + } + + $queueName = $consumer->getQueue()->getQueueName(); + if (array_key_exists($queueName, $this->subscribers)) { + return; + } + + $this->subscribers[$queueName] = [$consumer, $callback]; + } + + /** + * {@inheritdoc} + * + * @param RedisConsumer $consumer + */ + public function unsubscribe(PsrConsumer $consumer) + { + if (false == $consumer instanceof RedisConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', RedisConsumer::class, get_class($consumer))); + } + + $queueName = $consumer->getQueue()->getQueueName(); + + unset($this->subscribers[$queueName]); + } + + /** + * {@inheritdoc} + */ + public function unsubscribeAll() + { + $this->subscribers = []; + } +} From 18581ed16d444e2394e3f8eda1d511d9028472f6 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 13 Aug 2018 23:07:50 +0300 Subject: [PATCH 3/3] [redis] Add subscription consumer --- pkg/redis/PhpRedis.php | 2 - pkg/redis/RedisContext.php | 13 +- pkg/redis/RedisSubscriptionConsumer.php | 17 +- pkg/redis/Tests/RedisContextTest.php | 16 ++ .../Tests/RedisSubscriptionConsumerTest.php | 174 ++++++++++++++++++ .../Tests/Spec/RedisConnectionFactoryTest.php | 20 ++ pkg/redis/Tests/Spec/RedisContextTest.php | 23 +++ pkg/redis/Tests/Spec/RedisMessageTest.php | 3 + pkg/redis/Tests/Spec/RedisProducerTest.php | 23 +++ pkg/redis/Tests/Spec/RedisQueueTest.php | 20 ++ .../Tests/Spec/RedisRequeueMessageTest.php | 23 +++ .../RedisSendToAndReceiveFromQueueTest.php | 23 +++ .../RedisSendToAndReceiveFromTopicTest.php | 23 +++ ...disSendToAndReceiveNoWaitFromQueueTest.php | 23 +++ ...disSendToAndReceiveNoWaitFromTopicTest.php | 23 +++ ...umerConsumeFromAllSubscribedQueuesTest.php | 38 ++++ ...onConsumerConsumeUntilUnsubscribedTest.php | 38 ++++ ...disSubscriptionConsumerStopOnFalseTest.php | 38 ++++ pkg/redis/Tests/Spec/RedisTopicTest.php | 20 ++ pkg/redis/composer.json | 2 +- 20 files changed, 556 insertions(+), 6 deletions(-) create mode 100644 pkg/redis/Tests/RedisSubscriptionConsumerTest.php create mode 100644 pkg/redis/Tests/Spec/RedisConnectionFactoryTest.php create mode 100644 pkg/redis/Tests/Spec/RedisContextTest.php create mode 100644 pkg/redis/Tests/Spec/RedisProducerTest.php create mode 100644 pkg/redis/Tests/Spec/RedisQueueTest.php create mode 100644 pkg/redis/Tests/Spec/RedisRequeueMessageTest.php create mode 100644 pkg/redis/Tests/Spec/RedisSendToAndReceiveFromQueueTest.php create mode 100644 pkg/redis/Tests/Spec/RedisSendToAndReceiveFromTopicTest.php create mode 100644 pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromQueueTest.php create mode 100644 pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromTopicTest.php create mode 100644 pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php create mode 100644 pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php create mode 100644 pkg/redis/Tests/Spec/RedisSubscriptionConsumerStopOnFalseTest.php create mode 100644 pkg/redis/Tests/Spec/RedisTopicTest.php diff --git a/pkg/redis/PhpRedis.php b/pkg/redis/PhpRedis.php index 2d66c61d3..e373add32 100644 --- a/pkg/redis/PhpRedis.php +++ b/pkg/redis/PhpRedis.php @@ -30,8 +30,6 @@ public function __construct(array $config) 'persisted' => false, 'database' => 0, ], $config); - - var_dump($this->config); } /** diff --git a/pkg/redis/RedisContext.php b/pkg/redis/RedisContext.php index b87cc5e5f..7cad0299d 100644 --- a/pkg/redis/RedisContext.php +++ b/pkg/redis/RedisContext.php @@ -6,9 +6,10 @@ use Interop\Queue\PsrContext; use Interop\Queue\PsrDestination; use Interop\Queue\PsrQueue; +use Interop\Queue\PsrSubscriptionConsumerAwareContext; use Interop\Queue\PsrTopic; -class RedisContext implements PsrContext +class RedisContext implements PsrContext, PsrSubscriptionConsumerAwareContext { /** * @var Redis @@ -122,6 +123,16 @@ public function createConsumer(PsrDestination $destination) return new RedisConsumer($this, $destination); } + /** + * {@inheritdoc} + * + * @return RedisSubscriptionConsumer + */ + public function createSubscriptionConsumer() + { + return new RedisSubscriptionConsumer($this); + } + public function close() { $this->getRedis()->disconnect(); diff --git a/pkg/redis/RedisSubscriptionConsumer.php b/pkg/redis/RedisSubscriptionConsumer.php index 235fc8141..23b809992 100644 --- a/pkg/redis/RedisSubscriptionConsumer.php +++ b/pkg/redis/RedisSubscriptionConsumer.php @@ -25,6 +25,7 @@ class RedisSubscriptionConsumer implements PsrSubscriptionConsumer public function __construct(RedisContext $context) { $this->context = $context; + $this->subscribers = []; } /** @@ -58,7 +59,7 @@ public function consume($timeout = 0) $result = $this->context->getRedis()->brpop($currentQueueNames, $timeout || 5000); if ($result) { $message = RedisMessage::jsonUnserialize($result->getMessage()); - $callback = $this->subscribers[$result->getKey()]; + list($consumer, $callback) = $this->subscribers[$result->getKey()]; if (false === call_user_func($callback, $message, $consumer)) { return; } @@ -91,7 +92,11 @@ public function subscribe(PsrConsumer $consumer, callable $callback) $queueName = $consumer->getQueue()->getQueueName(); if (array_key_exists($queueName, $this->subscribers)) { - return; + if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) { + return; + } + + throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName)); } $this->subscribers[$queueName] = [$consumer, $callback]; @@ -110,6 +115,14 @@ public function unsubscribe(PsrConsumer $consumer) $queueName = $consumer->getQueue()->getQueueName(); + if (false == array_key_exists($queueName, $this->subscribers)) { + return; + } + + if ($this->subscribers[$queueName][0] !== $consumer) { + return; + } + unset($this->subscribers[$queueName]); } diff --git a/pkg/redis/Tests/RedisContextTest.php b/pkg/redis/Tests/RedisContextTest.php index eedfaf29a..ca97c3b3e 100644 --- a/pkg/redis/Tests/RedisContextTest.php +++ b/pkg/redis/Tests/RedisContextTest.php @@ -10,9 +10,11 @@ use Enqueue\Redis\RedisDestination; use Enqueue\Redis\RedisMessage; use Enqueue\Redis\RedisProducer; +use Enqueue\Redis\RedisSubscriptionConsumer; use Enqueue\Test\ClassExtensionTrait; use Interop\Queue\InvalidDestinationException; use Interop\Queue\PsrContext; +use Interop\Queue\PsrSubscriptionConsumerAwareContext; class RedisContextTest extends \PHPUnit\Framework\TestCase { @@ -201,6 +203,20 @@ public function testShouldAllowDeleteTopic() $context->deleteQueue($topic); } + public function testShouldImplementPsrSubscriptionConsumerAwareInterface() + { + $rc = new \ReflectionClass(RedisContext::class); + + $this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumerAwareContext::class)); + } + + public function testShouldReturnExpectedSubscriptionConsumerInstance() + { + $context = new RedisContext($this->createRedisMock()); + + $this->assertInstanceOf(RedisSubscriptionConsumer::class, $context->createSubscriptionConsumer()); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|Redis */ diff --git a/pkg/redis/Tests/RedisSubscriptionConsumerTest.php b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php new file mode 100644 index 000000000..486a2f995 --- /dev/null +++ b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php @@ -0,0 +1,174 @@ +assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class)); + } + + public function testCouldBeConstructedWithAmqpContextAsFirstArgument() + { + new RedisSubscriptionConsumer($this->createRedisContextMock()); + } + + public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + + $this->assertAttributeSame([ + 'foo_queue' => [$fooConsumer, $fooCallback], + 'bar_queue' => [$barConsumer, $barCallback], + ], 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('There is a consumer subscribed to queue: "foo_queue"'); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + } + + public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + } + + public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $fooConsumer = $this->createConsumerStub('foo_queue'); + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, function () {}); + $subscriptionConsumer->subscribe($barConsumer, function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($fooConsumer); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('bar_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('foo_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + $subscriptionConsumer->subscribe($this->createConsumerStub('bar_queue'), function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribeAll(); + + $this->assertAttributeCount(0, 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTryConsumeWithoutSubscribers() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('No subscribers'); + $subscriptionConsumer->consume(); + } + + /** + * @return RedisContext|\PHPUnit_Framework_MockObject_MockObject + */ + private function createRedisContextMock() + { + return $this->createMock(RedisContext::class); + } + + /** + * @param null|mixed $queueName + * + * @return PsrConsumer|\PHPUnit_Framework_MockObject_MockObject + */ + private function createConsumerStub($queueName = null) + { + $queueMock = $this->createMock(PsrQueue::class); + $queueMock + ->expects($this->any()) + ->method('getQueueName') + ->willReturn($queueName); + + $consumerMock = $this->createMock(RedisConsumer::class); + $consumerMock + ->expects($this->any()) + ->method('getQueue') + ->willReturn($queueMock) + ; + + return $consumerMock; + } +} diff --git a/pkg/redis/Tests/Spec/RedisConnectionFactoryTest.php b/pkg/redis/Tests/Spec/RedisConnectionFactoryTest.php new file mode 100644 index 000000000..bda37f4e0 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisConnectionFactoryTest.php @@ -0,0 +1,20 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisMessageTest.php b/pkg/redis/Tests/Spec/RedisMessageTest.php index 073f81322..2e2b18736 100644 --- a/pkg/redis/Tests/Spec/RedisMessageTest.php +++ b/pkg/redis/Tests/Spec/RedisMessageTest.php @@ -5,6 +5,9 @@ use Enqueue\Redis\RedisMessage; use Interop\Queue\Spec\PsrMessageSpec; +/** + * @group Redis + */ class RedisMessageTest extends PsrMessageSpec { /** diff --git a/pkg/redis/Tests/Spec/RedisProducerTest.php b/pkg/redis/Tests/Spec/RedisProducerTest.php new file mode 100644 index 000000000..9ebd2992b --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisProducerTest.php @@ -0,0 +1,23 @@ +buildPhpRedisContext()->createProducer(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisQueueTest.php b/pkg/redis/Tests/Spec/RedisQueueTest.php new file mode 100644 index 000000000..1975b22f4 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisQueueTest.php @@ -0,0 +1,20 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromQueueTest.php b/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromQueueTest.php new file mode 100644 index 000000000..5535d949a --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromQueueTest.php @@ -0,0 +1,23 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromTopicTest.php b/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromTopicTest.php new file mode 100644 index 000000000..2967ba977 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromTopicTest.php @@ -0,0 +1,23 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromQueueTest.php b/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 000000000..e03139f7e --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,23 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromTopicTest.php b/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromTopicTest.php new file mode 100644 index 000000000..fe103234b --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromTopicTest.php @@ -0,0 +1,23 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php new file mode 100644 index 000000000..abeccc273 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php @@ -0,0 +1,38 @@ +buildPhpRedisContext(); + } + + /** + * @param RedisContext $context + * + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + /** @var RedisDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->getRedis()->del($queueName); + + return $queue; + } +} diff --git a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php new file mode 100644 index 000000000..f30a2fe2d --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php @@ -0,0 +1,38 @@ +buildPhpRedisContext(); + } + + /** + * @param RedisContext $context + * + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + /** @var RedisDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->getRedis()->del($queueName); + + return $queue; + } +} diff --git a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerStopOnFalseTest.php b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerStopOnFalseTest.php new file mode 100644 index 000000000..bcc2cd678 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerStopOnFalseTest.php @@ -0,0 +1,38 @@ +buildPhpRedisContext(); + } + + /** + * @param RedisContext $context + * + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + /** @var RedisDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->getRedis()->del($queueName); + + return $queue; + } +} diff --git a/pkg/redis/Tests/Spec/RedisTopicTest.php b/pkg/redis/Tests/Spec/RedisTopicTest.php new file mode 100644 index 000000000..3117cb8e3 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisTopicTest.php @@ -0,0 +1,20 @@ +