Skip to content

Commit

Permalink
Merge pull request #430 from turboboy88/mongodb_transport
Browse files Browse the repository at this point in the history
Mongodb transport
  • Loading branch information
makasim authored May 3, 2018
2 parents bf9f57a + dab69f9 commit 56e5c97
Show file tree
Hide file tree
Showing 57 changed files with 3,301 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ cache:
directories:
- $HOME/.composer/cache

before_install:
- echo "extension = mongodb.so" >> $HOME/.phpenv/versions/$(phpenv version-name)/etc/php.ini
- php -m
- php -i | grep -C 15 mongo

install:
- rm $HOME/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini;
- echo "memory_limit=2048M" >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ Features:
[![Build Status](https://travis-ci.org/php-enqueue/fs.png?branch=master)](https://travis-ci.org/php-enqueue/fs)
[![Total Downloads](https://poser.pugx.org/enqueue/fs/d/total.png)](https://packagist.org/packages/enqueue/fs)
[![Latest Stable Version](https://poser.pugx.org/enqueue/fs/version.png)](https://packagist.org/packages/enqueue/fs)
* [Mongodb](docs/transport/mongodb.md)
[![Build Status](https://travis-ci.org/php-enqueue/mongodb.png?branch=master)](https://travis-ci.org/php-enqueue/mongodb)
[![Total Downloads](https://poser.pugx.org/enqueue/mongodb/d/total.png)](https://packagist.org/packages/enqueue/mongodb)
[![Latest Stable Version](https://poser.pugx.org/enqueue/mongodb/version.png)](https://packagist.org/packages/enqueue/mongodb)
* [Null](docs/transport/null.md).
[![Build Status](https://travis-ci.org/php-enqueue/null.png?branch=master)](https://travis-ci.org/php-enqueue/null)
[![Total Downloads](https://poser.pugx.org/enqueue/null/d/total.png)](https://packagist.org/packages/enqueue/null)
Expand Down
2 changes: 1 addition & 1 deletion bin/run-fun-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
set -x
set -e

COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$@"
docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$@"
1 change: 1 addition & 0 deletions bin/test
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ waitForService redis 6379 50
waitForService beanstalkd 11300 50
waitForService gearmand 4730 50
waitForService kafka 9092 50
waitForService mongo 27017 50

php pkg/job-queue/Tests/Functional/app/console doctrine:database:create --if-not-exists
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
Expand Down
8 changes: 7 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"enqueue/fs": "*@dev",
"enqueue/null": "*@dev",
"enqueue/dbal": "*@dev",
"enqueue/mongodb": "*@dev",
"enqueue/sqs": "*@dev",
"enqueue/pheanstalk": "*@dev",
"enqueue/gearman": "*@dev",
Expand Down Expand Up @@ -60,7 +61,8 @@
"platform": {
"ext-amqp": "1.9.3",
"ext-gearman": "1.1",
"ext-rdkafka": "3.3"
"ext-rdkafka": "3.3",
"ext-mongodb": "1.3"
}
},
"repositories": [
Expand Down Expand Up @@ -143,6 +145,10 @@
{
"type": "path",
"url": "pkg/async-event-dispatcher"
},
{
"type": "path",
"url": "pkg/mongodb"
}
]
}
9 changes: 8 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- zookeeper
- google-pubsub
- rabbitmqssl
- mongo
volumes:
- './:/mqdev'
environment:
Expand All @@ -24,7 +25,7 @@ services:
- RABBITMQ_PASSWORD=guest
- RABBITMQ_VHOST=mqdev
- RABBITMQ_AMQP__PORT=5672
- RABBITMQ_STOMP_PORT=61613
- RABBITMQ_STOMP_PORT=61613
- DOCTRINE_DRIVER=pdo_mysql
- DOCTRINE_HOST=mysql
- DOCTRINE_PORT=3306
Expand All @@ -44,6 +45,7 @@ services:
- RDKAFKA_PORT=9092
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085
- GCLOUD_PROJECT=mqdev
- MONGO_DSN=mongodb://mongo

rabbitmq:
image: 'enqueue/rabbitmq:latest'
Expand Down Expand Up @@ -102,6 +104,11 @@ services:
image: 'google/cloud-sdk:latest'
entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085'

mongo:
image: mongo:3.7
ports:
- "27017:27017"

volumes:
mysql-data:
driver: local
10 changes: 3 additions & 7 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ RUN set -x && \
git clone https://github.com/pdezwart/php-amqp.git . && git checkout v1.9.3 && \
phpize --clean && phpize && ./configure && make install

## confis

# RUN rm -f /etc/php/7.0/cli/conf.d/*xdebug.ini

## librdkafka
RUN set -x && \
apt-get update && \
Expand All @@ -27,10 +23,10 @@ RUN set -x && \
git checkout v0.11.1 && \
./configure && make && make install && \
pecl install rdkafka && \
echo "extension=rdkafka.so" > /etc/php/7.1/cli/conf.d/10-rdkafka.ini && \
echo "extension=rdkafka.so" > /etc/php/7.1/fpm/conf.d/10-rdkafka.ini
echo "extension=rdkafka.so" > /etc/php/7.2/cli/conf.d/10-rdkafka.ini && \
echo "extension=rdkafka.so" > /etc/php/7.2/fpm/conf.d/10-rdkafka.ini

COPY ./php/cli.ini /etc/php/7.1/cli/conf.d/1-dev_cli.ini
COPY ./php/cli.ini /etc/php/7.2/cli/conf.d/1-dev_cli.ini
COPY ./bin/dev_entrypoiny.sh /usr/local/bin/entrypoint.sh
RUN chmod u+x /usr/local/bin/entrypoint.sh

Expand Down
142 changes: 142 additions & 0 deletions docs/transport/mongodb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Mongodb transport

Allows to use [MongoDB](https://www.mongodb.com/) as a message queue broker.

* [Installation](#installation)
* [Create context](#create-context)
* [Send message to topic](#send-message-to-topic)
* [Send message to queue](#send-message-to-queue)
* [Send priority message](#send-priority-message)
* [Send expiration message](#send-expiration-message)
* [Send delayed message](#send-delayed-message)
* [Consume message](#consume-message)

## Installation

```bash
$ composer require enqueue/mongodb
```

## Create context

```php
<?php
use Enqueue\Mongodb\MongodbConnectionFactory;

// connects to localhost
$connectionFactory = new MongodbConnectionFactory();

// same as above
$factory = new MongodbConnectionFactory('mongodb:');

// same as above
$factory = new MongodbConnectionFactory([]);

$factory = new MongodbConnectionFactory([
'uri' => 'mongodb://localhost:27017/db_name',
'dbname' => 'enqueue',
'collection_name' => 'enqueue',
'polling_interval' => '1000',
]);

$psrContext = $factory->createContext();

// if you have enqueue/enqueue library installed you can use a function from there to create the context
$psrContext = \Enqueue\dsn_to_context('mongodb:');
```

## Send message to topic

```php
<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooTopic */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooTopic, $message);
```

## Send message to queue

```php
<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooQueue, $message);
```

## Send priority message

```php
<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */

$fooQueue = $psrContext->createQueue('foo');

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
->setPriority(5) // the higher priority the sooner a message gets to a consumer
//
->send($fooQueue, $message)
;
```

## Send expiration message

```php
<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
->setTimeToLive(60000) // 60 sec
//
->send($fooQueue, $message)
;
```

## Send delayed message

```php
<?php
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;

/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

// make sure you run "composer require enqueue/amqp-tools".

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
->setDeliveryDelay(5000) // 5 sec

->send($fooQueue, $message)
;
````

## Consume message:

```php
<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$consumer = $psrContext->createConsumer($fooQueue);

$message = $consumer->receive();

// process a message

$consumer->acknowledge($message);
// $consumer->reject($message);
```

[back to index](../index.md)
4 changes: 4 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@
<directory>pkg/gps/Tests</directory>
</testsuite>

<testsuite name="Mongodb transport">
<directory>pkg/mongodb/Tests</directory>
</testsuite>

<testsuite name="enqueue-bundle">
<directory>pkg/enqueue-bundle/Tests</directory>
</testsuite>
Expand Down
7 changes: 7 additions & 0 deletions pkg/enqueue-bundle/EnqueueBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Gps\Symfony\GpsTransportFactory;
use Enqueue\Mongodb\Symfony\MongodbTransportFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
use Enqueue\Redis\RedisConnectionFactory;
Expand Down Expand Up @@ -112,6 +113,12 @@ class_exists(AmqpLibConnectionFactory::class)
$extension->setTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka']));
}

if (class_exists(MongodbTransportFactory::class)) {
$extension->setTransportFactory(new MongodbTransportFactory('mongodb'));
} else {
$extension->setTransportFactory(new MissingTransportFactory('mongodb', ['enqueue/mongodb']));
}

$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ public function provideEnqueueConfigs()
]];
}

yield 'mongodb_dsn' => [[
'transport' => [
'default' => 'mongodb',
'mongodb' => getenv('MONGO_DSN'),
],
]];

// yield 'gps' => [[
// 'transport' => [
// 'default' => 'gps',
Expand Down
6 changes: 6 additions & 0 deletions pkg/enqueue/Symfony/DefaultTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Gps\Symfony\GpsTransportFactory;
use Enqueue\Mongodb\MongodbConnectionFactory;
use Enqueue\Mongodb\Symfony\MongodbTransportFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Null\Symfony\NullTransportFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
Expand Down Expand Up @@ -215,6 +217,10 @@ private function findFactory($dsn)
return new RdKafkaTransportFactory('default_kafka');
}

if ($factory instanceof MongodbConnectionFactory) {
return new MongodbTransportFactory('default_mongodb');
}

throw new \LogicException(sprintf(
'There is no supported transport factory for the connection factory "%s" created from DSN "%s"',
get_class($factory),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Enqueue\Fs\FsConnectionFactory;
use Enqueue\Gearman\GearmanConnectionFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Mongodb\MongodbConnectionFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Pheanstalk\PheanstalkConnectionFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
Expand Down Expand Up @@ -97,5 +98,7 @@ public static function provideDSNs()
yield ['sqs:', SqsConnectionFactory::class];

yield ['gps:', GpsConnectionFactory::class];

yield ['mongodb:', MongodbConnectionFactory::class];
}
}
2 changes: 2 additions & 0 deletions pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -291,5 +291,7 @@ public static function provideDSNs()
yield ['stomp:', 'default_stomp'];

yield ['kafka:', 'default_kafka'];

yield ['mongodb:', 'default_mongodb'];
}
}
5 changes: 5 additions & 0 deletions pkg/enqueue/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Enqueue\Fs\FsConnectionFactory;
use Enqueue\Gearman\GearmanConnectionFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Mongodb\MongodbConnectionFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Pheanstalk\PheanstalkConnectionFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
Expand Down Expand Up @@ -108,6 +109,10 @@ function dsn_to_connection_factory($dsn)
$map['gps'] = GpsConnectionFactory::class;
}

if (class_exists(MongodbConnectionFactory::class)) {
$map['mongodb'] = MongodbConnectionFactory::class;
}

list($scheme) = explode(':', $dsn, 2);
if (false == $scheme || false === strpos($dsn, ':')) {
throw new \LogicException(sprintf('The scheme could not be parsed from DSN "%s"', $dsn));
Expand Down
7 changes: 7 additions & 0 deletions pkg/mongodb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
*~
/composer.lock
/composer.phar
/phpunit.xml
/vendor/
/.idea/
/examples/
Loading

0 comments on commit 56e5c97

Please sign in to comment.