Skip to content

Commit

Permalink
Merge pull request #565 from rosamarsky/mongodb-subscription-consumer
Browse files Browse the repository at this point in the history
MongoDB Subscription Consumer feature
  • Loading branch information
makasim authored Oct 18, 2018
2 parents 1d395e3 + 8d682cb commit ab30327
Show file tree
Hide file tree
Showing 13 changed files with 531 additions and 25 deletions.
34 changes: 34 additions & 0 deletions docs/transport/mongodb.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Allows to use [MongoDB](https://www.mongodb.com/) as a message queue broker.
* [Send expiration message](#send-expiration-message)
* [Send delayed message](#send-delayed-message)
* [Consume message](#consume-message)
* [Subscription consumer](#subscription-consumer)

## Installation

Expand Down Expand Up @@ -139,4 +140,37 @@ $consumer->acknowledge($message);
// $consumer->reject($message);
```

## Subscription consumer

```php
<?php
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrConsumer;

/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */
/** @var \Enqueue\Mongodb\MongodbDestination $barQueue */

$fooConsumer = $psrContext->createConsumer($fooQueue);
$barConsumer = $psrContext->createConsumer($barQueue);

$subscriptionConsumer = $psrContext->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
// process message

$consumer->acknowledge($message);

return true;
});
$subscriptionConsumer->subscribe($barConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
// process message

$consumer->acknowledge($message);

return true;
});

$subscriptionConsumer->consume(2000); // 2 sec
```

[back to index](../index.md)
18 changes: 2 additions & 16 deletions pkg/mongodb/MongodbConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public function reject(Message $message, bool $requeue = false): void
}
}

protected function receiveMessage(): ?MongodbMessage
private function receiveMessage(): ?MongodbMessage
{
$now = time();
$collection = $this->context->getCollection();
Expand All @@ -137,23 +137,9 @@ protected function receiveMessage(): ?MongodbMessage
return null;
}
if (empty($message['time_to_live']) || $message['time_to_live'] > time()) {
return $this->convertMessage($message);
return $this->context->convertMessage($message);
}

return null;
}

protected function convertMessage(array $mongodbMessage): MongodbMessage
{
$properties = JSON::decode($mongodbMessage['properties']);
$headers = JSON::decode($mongodbMessage['headers']);

$message = $this->context->createMessage($mongodbMessage['body'], $properties, $headers);
$message->setId((string) $mongodbMessage['_id']);
$message->setPriority((int) $mongodbMessage['priority']);
$message->setRedelivered((bool) $mongodbMessage['redelivered']);
$message->setPublishedAt((int) $mongodbMessage['published_at']);

return $message;
}
}
22 changes: 20 additions & 2 deletions pkg/mongodb/MongodbContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Interop\Queue\Context;
use Interop\Queue\Destination;
use Interop\Queue\Exception\InvalidDestinationException;
use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
use Interop\Queue\Message;
use Interop\Queue\Producer;
Expand Down Expand Up @@ -107,7 +106,26 @@ public function close(): void

public function createSubscriptionConsumer(): SubscriptionConsumer
{
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
return new MongodbSubscriptionConsumer($this);
}

/**
* @internal It must be used here and in the consumer only
*/
public function convertMessage(array $mongodbMessage): MongodbMessage
{
$mongodbMessageObj = $this->createMessage(
$mongodbMessage['body'],
JSON::decode($mongodbMessage['properties']),
JSON::decode($mongodbMessage['headers'])
);

$mongodbMessageObj->setId((string) $mongodbMessage['_id']);
$mongodbMessageObj->setPriority((int) $mongodbMessage['priority']);
$mongodbMessageObj->setRedelivered((bool) $mongodbMessage['redelivered']);
$mongodbMessageObj->setPublishedAt((int) $mongodbMessage['published_at']);

return $mongodbMessageObj;
}

/**
Expand Down
136 changes: 136 additions & 0 deletions pkg/mongodb/MongodbSubscriptionConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
<?php

declare(strict_types=1);

namespace Enqueue\Mongodb;

use Interop\Queue\Consumer;
use Interop\Queue\SubscriptionConsumer;

class MongodbSubscriptionConsumer implements SubscriptionConsumer
{
/**
* @var MongodbContext
*/
private $context;

/**
* an item contains an array: [MongodbConsumer $consumer, callable $callback];.
*
* @var array
*/
private $subscribers;

/**
* @param MongodbContext $context
*/
public function __construct(MongodbContext $context)
{
$this->context = $context;
$this->subscribers = [];
}

public function consume(int $timeout = 0): void
{
if (empty($this->subscribers)) {
throw new \LogicException('No subscribers');
}

$timeout = (int) ceil($timeout / 1000);
$endAt = time() + $timeout;

$queueNames = [];
foreach (array_keys($this->subscribers) as $queueName) {
$queueNames[$queueName] = $queueName;
}

$currentQueueNames = [];
while (true) {
if (empty($currentQueueNames)) {
$currentQueueNames = $queueNames;
}

$result = $this->context->getCollection()->findOneAndDelete(
[
'queue' => ['$in' => array_keys($currentQueueNames)],
'$or' => [
['delayed_until' => ['$exists' => false]],
['delayed_until' => ['$lte' => time()]],
],
],
[
'sort' => ['priority' => -1, 'published_at' => 1],
'typeMap' => ['root' => 'array', 'document' => 'array'],
]
);

if ($result) {
list($consumer, $callback) = $this->subscribers[$result['queue']];

$message = $this->context->convertMessage($result);

if (false === call_user_func($callback, $message, $consumer)) {
return;
}

unset($currentQueueNames[$result['queue']]);
} else {
$currentQueueNames = [];

usleep(200000); // 200ms
}

if ($timeout && microtime(true) >= $endAt) {
return;
}
}
}

/**
* @param MongodbConsumer $consumer
*/
public function subscribe(Consumer $consumer, callable $callback): void
{
if (false == $consumer instanceof MongodbConsumer) {
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, get_class($consumer)));
}

$queueName = $consumer->getQueue()->getQueueName();
if (array_key_exists($queueName, $this->subscribers)) {
if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) {
return;
}

throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName));
}

$this->subscribers[$queueName] = [$consumer, $callback];
}

/**
* @param MongodbConsumer $consumer
*/
public function unsubscribe(Consumer $consumer): void
{
if (false == $consumer instanceof MongodbConsumer) {
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, get_class($consumer)));
}

$queueName = $consumer->getQueue()->getQueueName();

if (false == array_key_exists($queueName, $this->subscribers)) {
return;
}

if ($this->subscribers[$queueName][0] !== $consumer) {
return;
}

unset($this->subscribers[$queueName]);
}

public function unsubscribeAll(): void
{
$this->subscribers = [];
}
}
26 changes: 26 additions & 0 deletions pkg/mongodb/Tests/MongodbContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,32 @@ public function testShouldCreateMessage()
$this->assertFalse($message->isRedelivered());
}

public function testShouldConvertFromArrayToMongodbMessage()
{
$arrayData = [
'_id' => 'stringId',
'body' => 'theBody',
'properties' => json_encode(['barProp' => 'barPropVal']),
'headers' => json_encode(['fooHeader' => 'fooHeaderVal']),
'priority' => '12',
'published_at' => 1525935820,
'redelivered' => false,
];

$context = new MongodbContext($this->createClientMock());
$message = $context->convertMessage($arrayData);

$this->assertInstanceOf(MongodbMessage::class, $message);

$this->assertEquals('stringId', $message->getId());
$this->assertEquals('theBody', $message->getBody());
$this->assertEquals(['barProp' => 'barPropVal'], $message->getProperties());
$this->assertEquals(['fooHeader' => 'fooHeaderVal'], $message->getHeaders());
$this->assertEquals(12, $message->getPriority());
$this->assertEquals(1525935820, $message->getPublishedAt());
$this->assertFalse($message->isRedelivered());
}

public function testShouldCreateTopic()
{
$context = new MongodbContext($this->createClientMock());
Expand Down
Loading

0 comments on commit ab30327

Please sign in to comment.