diff --git a/pkg/redis/PhpRedis.php b/pkg/redis/PhpRedis.php index 2d66c61d3..e373add32 100644 --- a/pkg/redis/PhpRedis.php +++ b/pkg/redis/PhpRedis.php @@ -30,8 +30,6 @@ public function __construct(array $config) 'persisted' => false, 'database' => 0, ], $config); - - var_dump($this->config); } /** diff --git a/pkg/redis/RedisContext.php b/pkg/redis/RedisContext.php index b87cc5e5f..7cad0299d 100644 --- a/pkg/redis/RedisContext.php +++ b/pkg/redis/RedisContext.php @@ -6,9 +6,10 @@ use Interop\Queue\PsrContext; use Interop\Queue\PsrDestination; use Interop\Queue\PsrQueue; +use Interop\Queue\PsrSubscriptionConsumerAwareContext; use Interop\Queue\PsrTopic; -class RedisContext implements PsrContext +class RedisContext implements PsrContext, PsrSubscriptionConsumerAwareContext { /** * @var Redis @@ -122,6 +123,16 @@ public function createConsumer(PsrDestination $destination) return new RedisConsumer($this, $destination); } + /** + * {@inheritdoc} + * + * @return RedisSubscriptionConsumer + */ + public function createSubscriptionConsumer() + { + return new RedisSubscriptionConsumer($this); + } + public function close() { $this->getRedis()->disconnect(); diff --git a/pkg/redis/RedisSubscriptionConsumer.php b/pkg/redis/RedisSubscriptionConsumer.php index 235fc8141..23b809992 100644 --- a/pkg/redis/RedisSubscriptionConsumer.php +++ b/pkg/redis/RedisSubscriptionConsumer.php @@ -25,6 +25,7 @@ class RedisSubscriptionConsumer implements PsrSubscriptionConsumer public function __construct(RedisContext $context) { $this->context = $context; + $this->subscribers = []; } /** @@ -58,7 +59,7 @@ public function consume($timeout = 0) $result = $this->context->getRedis()->brpop($currentQueueNames, $timeout || 5000); if ($result) { $message = RedisMessage::jsonUnserialize($result->getMessage()); - $callback = $this->subscribers[$result->getKey()]; + list($consumer, $callback) = $this->subscribers[$result->getKey()]; if (false === call_user_func($callback, $message, $consumer)) { return; } @@ -91,7 +92,11 @@ public function subscribe(PsrConsumer $consumer, callable $callback) $queueName = $consumer->getQueue()->getQueueName(); if (array_key_exists($queueName, $this->subscribers)) { - return; + if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) { + return; + } + + throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName)); } $this->subscribers[$queueName] = [$consumer, $callback]; @@ -110,6 +115,14 @@ public function unsubscribe(PsrConsumer $consumer) $queueName = $consumer->getQueue()->getQueueName(); + if (false == array_key_exists($queueName, $this->subscribers)) { + return; + } + + if ($this->subscribers[$queueName][0] !== $consumer) { + return; + } + unset($this->subscribers[$queueName]); } diff --git a/pkg/redis/Tests/RedisContextTest.php b/pkg/redis/Tests/RedisContextTest.php index eedfaf29a..ca97c3b3e 100644 --- a/pkg/redis/Tests/RedisContextTest.php +++ b/pkg/redis/Tests/RedisContextTest.php @@ -10,9 +10,11 @@ use Enqueue\Redis\RedisDestination; use Enqueue\Redis\RedisMessage; use Enqueue\Redis\RedisProducer; +use Enqueue\Redis\RedisSubscriptionConsumer; use Enqueue\Test\ClassExtensionTrait; use Interop\Queue\InvalidDestinationException; use Interop\Queue\PsrContext; +use Interop\Queue\PsrSubscriptionConsumerAwareContext; class RedisContextTest extends \PHPUnit\Framework\TestCase { @@ -201,6 +203,20 @@ public function testShouldAllowDeleteTopic() $context->deleteQueue($topic); } + public function testShouldImplementPsrSubscriptionConsumerAwareInterface() + { + $rc = new \ReflectionClass(RedisContext::class); + + $this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumerAwareContext::class)); + } + + public function testShouldReturnExpectedSubscriptionConsumerInstance() + { + $context = new RedisContext($this->createRedisMock()); + + $this->assertInstanceOf(RedisSubscriptionConsumer::class, $context->createSubscriptionConsumer()); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|Redis */ diff --git a/pkg/redis/Tests/RedisSubscriptionConsumerTest.php b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php new file mode 100644 index 000000000..486a2f995 --- /dev/null +++ b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php @@ -0,0 +1,174 @@ +assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class)); + } + + public function testCouldBeConstructedWithAmqpContextAsFirstArgument() + { + new RedisSubscriptionConsumer($this->createRedisContextMock()); + } + + public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + + $this->assertAttributeSame([ + 'foo_queue' => [$fooConsumer, $fooCallback], + 'bar_queue' => [$barConsumer, $barCallback], + ], 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('There is a consumer subscribed to queue: "foo_queue"'); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + } + + public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + } + + public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $fooConsumer = $this->createConsumerStub('foo_queue'); + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, function () {}); + $subscriptionConsumer->subscribe($barConsumer, function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($fooConsumer); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('bar_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('foo_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + $subscriptionConsumer->subscribe($this->createConsumerStub('bar_queue'), function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribeAll(); + + $this->assertAttributeCount(0, 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTryConsumeWithoutSubscribers() + { + $subscriptionConsumer = new RedisSubscriptionConsumer($this->createRedisContextMock()); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('No subscribers'); + $subscriptionConsumer->consume(); + } + + /** + * @return RedisContext|\PHPUnit_Framework_MockObject_MockObject + */ + private function createRedisContextMock() + { + return $this->createMock(RedisContext::class); + } + + /** + * @param null|mixed $queueName + * + * @return PsrConsumer|\PHPUnit_Framework_MockObject_MockObject + */ + private function createConsumerStub($queueName = null) + { + $queueMock = $this->createMock(PsrQueue::class); + $queueMock + ->expects($this->any()) + ->method('getQueueName') + ->willReturn($queueName); + + $consumerMock = $this->createMock(RedisConsumer::class); + $consumerMock + ->expects($this->any()) + ->method('getQueue') + ->willReturn($queueMock) + ; + + return $consumerMock; + } +} diff --git a/pkg/redis/Tests/Spec/RedisConnectionFactoryTest.php b/pkg/redis/Tests/Spec/RedisConnectionFactoryTest.php new file mode 100644 index 000000000..bda37f4e0 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisConnectionFactoryTest.php @@ -0,0 +1,20 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisMessageTest.php b/pkg/redis/Tests/Spec/RedisMessageTest.php index 073f81322..2e2b18736 100644 --- a/pkg/redis/Tests/Spec/RedisMessageTest.php +++ b/pkg/redis/Tests/Spec/RedisMessageTest.php @@ -5,6 +5,9 @@ use Enqueue\Redis\RedisMessage; use Interop\Queue\Spec\PsrMessageSpec; +/** + * @group Redis + */ class RedisMessageTest extends PsrMessageSpec { /** diff --git a/pkg/redis/Tests/Spec/RedisProducerTest.php b/pkg/redis/Tests/Spec/RedisProducerTest.php new file mode 100644 index 000000000..9ebd2992b --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisProducerTest.php @@ -0,0 +1,23 @@ +buildPhpRedisContext()->createProducer(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisQueueTest.php b/pkg/redis/Tests/Spec/RedisQueueTest.php new file mode 100644 index 000000000..1975b22f4 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisQueueTest.php @@ -0,0 +1,20 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromQueueTest.php b/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromQueueTest.php new file mode 100644 index 000000000..5535d949a --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromQueueTest.php @@ -0,0 +1,23 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromTopicTest.php b/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromTopicTest.php new file mode 100644 index 000000000..2967ba977 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSendToAndReceiveFromTopicTest.php @@ -0,0 +1,23 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromQueueTest.php b/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 000000000..e03139f7e --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,23 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromTopicTest.php b/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromTopicTest.php new file mode 100644 index 000000000..fe103234b --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSendToAndReceiveNoWaitFromTopicTest.php @@ -0,0 +1,23 @@ +buildPhpRedisContext(); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php new file mode 100644 index 000000000..abeccc273 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php @@ -0,0 +1,38 @@ +buildPhpRedisContext(); + } + + /** + * @param RedisContext $context + * + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + /** @var RedisDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->getRedis()->del($queueName); + + return $queue; + } +} diff --git a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php new file mode 100644 index 000000000..f30a2fe2d --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php @@ -0,0 +1,38 @@ +buildPhpRedisContext(); + } + + /** + * @param RedisContext $context + * + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + /** @var RedisDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->getRedis()->del($queueName); + + return $queue; + } +} diff --git a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerStopOnFalseTest.php b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerStopOnFalseTest.php new file mode 100644 index 000000000..bcc2cd678 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerStopOnFalseTest.php @@ -0,0 +1,38 @@ +buildPhpRedisContext(); + } + + /** + * @param RedisContext $context + * + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + /** @var RedisDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->getRedis()->del($queueName); + + return $queue; + } +} diff --git a/pkg/redis/Tests/Spec/RedisTopicTest.php b/pkg/redis/Tests/Spec/RedisTopicTest.php new file mode 100644 index 000000000..3117cb8e3 --- /dev/null +++ b/pkg/redis/Tests/Spec/RedisTopicTest.php @@ -0,0 +1,20 @@ +