diff --git a/docs/transport/mongodb.md b/docs/transport/mongodb.md index 0904c11ed..610580478 100644 --- a/docs/transport/mongodb.md +++ b/docs/transport/mongodb.md @@ -10,6 +10,7 @@ Allows to use [MongoDB](https://www.mongodb.com/) as a message queue broker. * [Send expiration message](#send-expiration-message) * [Send delayed message](#send-delayed-message) * [Consume message](#consume-message) +* [Subscription consumer](#subscription-consumer) ## Installation @@ -139,4 +140,37 @@ $consumer->acknowledge($message); // $consumer->reject($message); ``` +## Subscription consumer + +```php +createConsumer($fooQueue); +$barConsumer = $psrContext->createConsumer($barQueue); + +$subscriptionConsumer = $psrContext->createSubscriptionConsumer(); +$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) { + // process message + + $consumer->acknowledge($message); + + return true; +}); +$subscriptionConsumer->subscribe($barConsumer, function(PsrMessage $message, PsrConsumer $consumer) { + // process message + + $consumer->acknowledge($message); + + return true; +}); + +$subscriptionConsumer->consume(2000); // 2 sec +``` + [back to index](../index.md) diff --git a/pkg/mongodb/MongodbConsumer.php b/pkg/mongodb/MongodbConsumer.php index 062b4d275..494210573 100644 --- a/pkg/mongodb/MongodbConsumer.php +++ b/pkg/mongodb/MongodbConsumer.php @@ -115,7 +115,7 @@ public function reject(Message $message, bool $requeue = false): void } } - protected function receiveMessage(): ?MongodbMessage + private function receiveMessage(): ?MongodbMessage { $now = time(); $collection = $this->context->getCollection(); @@ -137,23 +137,9 @@ protected function receiveMessage(): ?MongodbMessage return null; } if (empty($message['time_to_live']) || $message['time_to_live'] > time()) { - return $this->convertMessage($message); + return $this->context->convertMessage($message); } return null; } - - protected function convertMessage(array $mongodbMessage): MongodbMessage - { - $properties = JSON::decode($mongodbMessage['properties']); - $headers = JSON::decode($mongodbMessage['headers']); - - $message = $this->context->createMessage($mongodbMessage['body'], $properties, $headers); - $message->setId((string) $mongodbMessage['_id']); - $message->setPriority((int) $mongodbMessage['priority']); - $message->setRedelivered((bool) $mongodbMessage['redelivered']); - $message->setPublishedAt((int) $mongodbMessage['published_at']); - - return $message; - } } diff --git a/pkg/mongodb/MongodbContext.php b/pkg/mongodb/MongodbContext.php index f5f9049e7..968c98b1e 100644 --- a/pkg/mongodb/MongodbContext.php +++ b/pkg/mongodb/MongodbContext.php @@ -8,7 +8,6 @@ use Interop\Queue\Context; use Interop\Queue\Destination; use Interop\Queue\Exception\InvalidDestinationException; -use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException; use Interop\Queue\Exception\TemporaryQueueNotSupportedException; use Interop\Queue\Message; use Interop\Queue\Producer; @@ -107,7 +106,26 @@ public function close(): void public function createSubscriptionConsumer(): SubscriptionConsumer { - throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt(); + return new MongodbSubscriptionConsumer($this); + } + + /** + * @internal It must be used here and in the consumer only + */ + public function convertMessage(array $mongodbMessage): MongodbMessage + { + $mongodbMessageObj = $this->createMessage( + $mongodbMessage['body'], + JSON::decode($mongodbMessage['properties']), + JSON::decode($mongodbMessage['headers']) + ); + + $mongodbMessageObj->setId((string) $mongodbMessage['_id']); + $mongodbMessageObj->setPriority((int) $mongodbMessage['priority']); + $mongodbMessageObj->setRedelivered((bool) $mongodbMessage['redelivered']); + $mongodbMessageObj->setPublishedAt((int) $mongodbMessage['published_at']); + + return $mongodbMessageObj; } /** diff --git a/pkg/mongodb/MongodbSubscriptionConsumer.php b/pkg/mongodb/MongodbSubscriptionConsumer.php new file mode 100644 index 000000000..59063dd82 --- /dev/null +++ b/pkg/mongodb/MongodbSubscriptionConsumer.php @@ -0,0 +1,136 @@ +context = $context; + $this->subscribers = []; + } + + public function consume(int $timeout = 0): void + { + if (empty($this->subscribers)) { + throw new \LogicException('No subscribers'); + } + + $timeout = (int) ceil($timeout / 1000); + $endAt = time() + $timeout; + + $queueNames = []; + foreach (array_keys($this->subscribers) as $queueName) { + $queueNames[$queueName] = $queueName; + } + + $currentQueueNames = []; + while (true) { + if (empty($currentQueueNames)) { + $currentQueueNames = $queueNames; + } + + $result = $this->context->getCollection()->findOneAndDelete( + [ + 'queue' => ['$in' => array_keys($currentQueueNames)], + '$or' => [ + ['delayed_until' => ['$exists' => false]], + ['delayed_until' => ['$lte' => time()]], + ], + ], + [ + 'sort' => ['priority' => -1, 'published_at' => 1], + 'typeMap' => ['root' => 'array', 'document' => 'array'], + ] + ); + + if ($result) { + list($consumer, $callback) = $this->subscribers[$result['queue']]; + + $message = $this->context->convertMessage($result); + + if (false === call_user_func($callback, $message, $consumer)) { + return; + } + + unset($currentQueueNames[$result['queue']]); + } else { + $currentQueueNames = []; + + usleep(200000); // 200ms + } + + if ($timeout && microtime(true) >= $endAt) { + return; + } + } + } + + /** + * @param MongodbConsumer $consumer + */ + public function subscribe(Consumer $consumer, callable $callback): void + { + if (false == $consumer instanceof MongodbConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, get_class($consumer))); + } + + $queueName = $consumer->getQueue()->getQueueName(); + if (array_key_exists($queueName, $this->subscribers)) { + if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) { + return; + } + + throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName)); + } + + $this->subscribers[$queueName] = [$consumer, $callback]; + } + + /** + * @param MongodbConsumer $consumer + */ + public function unsubscribe(Consumer $consumer): void + { + if (false == $consumer instanceof MongodbConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, get_class($consumer))); + } + + $queueName = $consumer->getQueue()->getQueueName(); + + if (false == array_key_exists($queueName, $this->subscribers)) { + return; + } + + if ($this->subscribers[$queueName][0] !== $consumer) { + return; + } + + unset($this->subscribers[$queueName]); + } + + public function unsubscribeAll(): void + { + $this->subscribers = []; + } +} diff --git a/pkg/mongodb/Tests/MongodbContextTest.php b/pkg/mongodb/Tests/MongodbContextTest.php index e23f1b73a..7a795c402 100644 --- a/pkg/mongodb/Tests/MongodbContextTest.php +++ b/pkg/mongodb/Tests/MongodbContextTest.php @@ -70,6 +70,32 @@ public function testShouldCreateMessage() $this->assertFalse($message->isRedelivered()); } + public function testShouldConvertFromArrayToMongodbMessage() + { + $arrayData = [ + '_id' => 'stringId', + 'body' => 'theBody', + 'properties' => json_encode(['barProp' => 'barPropVal']), + 'headers' => json_encode(['fooHeader' => 'fooHeaderVal']), + 'priority' => '12', + 'published_at' => 1525935820, + 'redelivered' => false, + ]; + + $context = new MongodbContext($this->createClientMock()); + $message = $context->convertMessage($arrayData); + + $this->assertInstanceOf(MongodbMessage::class, $message); + + $this->assertEquals('stringId', $message->getId()); + $this->assertEquals('theBody', $message->getBody()); + $this->assertEquals(['barProp' => 'barPropVal'], $message->getProperties()); + $this->assertEquals(['fooHeader' => 'fooHeaderVal'], $message->getHeaders()); + $this->assertEquals(12, $message->getPriority()); + $this->assertEquals(1525935820, $message->getPublishedAt()); + $this->assertFalse($message->isRedelivered()); + } + public function testShouldCreateTopic() { $context = new MongodbContext($this->createClientMock()); diff --git a/pkg/mongodb/Tests/MongodbSubscriptionConsumerTest.php b/pkg/mongodb/Tests/MongodbSubscriptionConsumerTest.php new file mode 100644 index 000000000..88899c7bb --- /dev/null +++ b/pkg/mongodb/Tests/MongodbSubscriptionConsumerTest.php @@ -0,0 +1,177 @@ +assertTrue($rc->implementsInterface(SubscriptionConsumer::class)); + } + + public function testCouldBeConstructedWithMongodbContextAsFirstArgument() + { + new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + } + + public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + + $this->assertAttributeSame([ + 'foo_queue' => [$fooConsumer, $fooCallback], + 'bar_queue' => [$barConsumer, $barCallback], + ], 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('There is a consumer subscribed to queue: "foo_queue"'); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + } + + public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + } + + public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $fooConsumer = $this->createConsumerStub('foo_queue'); + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, function () {}); + $subscriptionConsumer->subscribe($barConsumer, function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($fooConsumer); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('bar_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('foo_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + $subscriptionConsumer->subscribe($this->createConsumerStub('bar_queue'), function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribeAll(); + + $this->assertAttributeCount(0, 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTryConsumeWithoutSubscribers() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('No subscribers'); + + $subscriptionConsumer->consume(); + } + + /** + * @return MongodbContext|\PHPUnit_Framework_MockObject_MockObject + */ + private function createMongodbContextMock() + { + return $this->createMock(MongodbContext::class); + } + + /** + * @param null|mixed $queueName + * + * @return Consumer|\PHPUnit_Framework_MockObject_MockObject + */ + private function createConsumerStub($queueName = null) + { + $queueMock = $this->createMock(Queue::class); + $queueMock + ->expects($this->any()) + ->method('getQueueName') + ->willReturn($queueName); + + $consumerMock = $this->createMock(MongodbConsumer::class); + $consumerMock + ->expects($this->any()) + ->method('getQueue') + ->willReturn($queueMock) + ; + + return $consumerMock; + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php new file mode 100644 index 000000000..664990a68 --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php @@ -0,0 +1,44 @@ +buildMongodbContext(); + } + + /** + * @param MongodbContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + /** @var MongodbDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeUntilUnsubscribedTest.php b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeUntilUnsubscribedTest.php new file mode 100644 index 000000000..1071c1267 --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeUntilUnsubscribedTest.php @@ -0,0 +1,44 @@ +buildMongodbContext(); + } + + /** + * @param MongodbContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + /** @var MongodbDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerStopOnFalseTest.php b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerStopOnFalseTest.php new file mode 100644 index 000000000..321e16bba --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerStopOnFalseTest.php @@ -0,0 +1,44 @@ +buildMongodbContext(); + } + + /** + * @param MongodbContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + /** @var MongodbDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->getClient()->dropDatabase($queueName); + + return $queue; + } +} diff --git a/pkg/redis/RedisSubscriptionConsumer.php b/pkg/redis/RedisSubscriptionConsumer.php index 6e1a24dc0..1b6cd1149 100644 --- a/pkg/redis/RedisSubscriptionConsumer.php +++ b/pkg/redis/RedisSubscriptionConsumer.php @@ -50,11 +50,6 @@ public function consume(int $timeout = 0): void $currentQueueNames = $queueNames; } - /** - * @var string - * @var Consumer $consumer - * @var callable $processor - */ $result = $this->context->getRedis()->brpop($currentQueueNames, $timeout ?: 5); if ($result) { $message = RedisMessage::jsonUnserialize($result->getMessage()); diff --git a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php index e2f007737..953006af9 100644 --- a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php +++ b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php @@ -22,7 +22,7 @@ public function testThrowNeitherArrayStringNorNullGivenAsConfig() new RedisConnectionFactory(new \stdClass()); } - public function testThrowIfSchemeIsNotAmqp() + public function testThrowIfSchemeIsNotRedis() { $this->expectException(\LogicException::class); $this->expectExceptionMessage('The given scheme protocol "http" is not supported. It must be one of "redis", "rediss", "tcp", "tls", "unix"'); diff --git a/pkg/redis/Tests/RedisSubscriptionConsumerTest.php b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php index 98c6142f9..12c377500 100644 --- a/pkg/redis/Tests/RedisSubscriptionConsumerTest.php +++ b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php @@ -19,7 +19,7 @@ public function testShouldImplementSubscriptionConsumerInterface() $this->assertTrue($rc->implementsInterface(SubscriptionConsumer::class)); } - public function testCouldBeConstructedWithAmqpContextAsFirstArgument() + public function testCouldBeConstructedWithRedisContextAsFirstArgument() { new RedisSubscriptionConsumer($this->createRedisContextMock()); } diff --git a/pkg/test/MongodbExtensionTrait.php b/pkg/test/MongodbExtensionTrait.php index 29c146c1e..3ba9e93e0 100644 --- a/pkg/test/MongodbExtensionTrait.php +++ b/pkg/test/MongodbExtensionTrait.php @@ -1,5 +1,7 @@