Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue Interop Support #104

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
language: php

php:
- 5.5
- 5.6
- 7.0
- 7.1

matrix:
include:
- php: 5.5
env: EXCLUDE_ENQUEUE=true
fast_finish: true

services:
Expand All @@ -28,6 +30,7 @@ before_install:
install:
- travis_retry composer self-update && composer --version
- export PATH="$HOME/.composer/vendor/bin:$PATH"
- if [ "$EXCLUDE_ENQUEUE" = true ]; then travis_retry composer remove "enqueue/amqp-lib" "enqueue/amqp-tools" --dev --no-interaction --no-update; fi
- travis_retry composer install --prefer-dist --no-interaction

before_script:
Expand Down
31 changes: 24 additions & 7 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"name": "yiisoft/yii2-queue",
"description": "Yii2 Queue Extension which supported DB, Redis, RabbitMQ, Beanstalk and Gearman",
"description": "Yii2 Queue Extension which supported DB, Redis, RabbitMQ, Beanstalk, Gearman, Kafka, AWS SQS, STOMP, Filesystem",
"type": "yii2-extension",
"keywords": ["yii", "queue", "async", "gii", "db", "redis", "rabbitmq", "beanstalk", "gearman"],
"keywords": ["yii", "queue", "async", "gii", "db", "redis", "rabbitmq", "beanstalk", "gearman", "amqp-interop", "queue-interop", "php-enqueue"],
"license": "BSD-3-Clause",
"authors": [
{
Expand All @@ -18,23 +18,39 @@
"require": {
"php": ">=5.5.0",
"yiisoft/yii2": "~2.0.10",
"symfony/process": "*"
"symfony/process": "*",
"queue-interop/queue-interop": "^0.6.1",
"queue-interop/amqp-interop": "^0.6.1"
},
"require-dev": {
"yiisoft/yii2-redis": "*",
"php-amqplib/php-amqplib": "*",
"php-amqplib/php-amqplib": "2.7.x-dev",
"pda/pheanstalk": "*",
"jeremeamia/superclosure": "*",
"yiisoft/yii2-debug": "*",
"yiisoft/yii2-gii": "*",
"phpunit/phpunit": "~4.4"
"phpunit/phpunit": "~4.4",
"enqueue/amqp-lib": "0.7.x-dev",
"enqueue/amqp-tools": "0.7.x-dev"
},
"suggest": {
"ext-pcntl": "Need for process signals.",
"yiisoft/yii2-redis": "Need for Redis queue.",
"pda/pheanstalk": "Need for Beanstalk queue.",
"php-amqplib/php-amqplib": "Need for AMQP queue.",
"ext-gearman": "Need for Gearman queue."
"ext-gearman": "Need for Gearman queue.",
"enqueue/amqp-ext": "Required for support of AMQP with queue interop",
"enqueue/amqp-lib": "Required for support of AMQP with queue interop",
"enqueue/amqp-bunny": "Required for support of AMQP with queue interop",
"enqueue/amqp-tools": "Required for support of message delay with AMQP",
"enqueue/rdkafka": "Required for support of Kafka with queue interop",
"enqueue/sqs": "Required for support of AWS SQS with queue interop",
"enqueue/dbal": "Required for support of Doctrine Dbal queue with queue interop",
"enqueue/redis": "Required for support of Redis queue with queue interop",
"enqueue/fs": "Required for support of filesystem queue with queue interop",
"enqueue/gearman": "Required for support of Gearman queue with queue interop",
"enqueue/pheanstalk": "Required for support of Beanstalk queue with queue interop",
"enqueue/stomp": "Required for support of STOMP with queue interop"
},
"autoload": {
"psr-4": {
Expand All @@ -45,7 +61,8 @@
"yii\\queue\\file\\": "src/drivers/file",
"yii\\queue\\gearman\\": "src/drivers/gearman",
"yii\\queue\\redis\\": "src/drivers/redis",
"yii\\queue\\sync\\": "src/drivers/sync"
"yii\\queue\\sync\\": "src/drivers/sync",
"yii\\queue\\queue_interop\\": "src/drivers/queue_interop"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we use just yii\queue\interop as a namespace?

}
},
"extra": {
Expand Down
1 change: 1 addition & 0 deletions docs/guide/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ Queue Drivers
* [RabbitMQ](driver-amqp.md)
* [Beanstalk](driver-beanstalk.md)
* [Gearman](driver-gearman.md)
* [Queue Interop (php-enqueue)](driver-queue-interop.md)
51 changes: 51 additions & 0 deletions docs/guide/driver-queue-interop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
Queue Interop Driver (php-enqueue)
==================================

The driver works with many queue brokers.

You can find full list of supported brokers on the [Queue Interop](https://github.com/queue-interop/queue-interop) page

In order for it to work you need to install and configure one the implementations supported.

Configuration example for the RabbitMQ AMQP:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's a good example since there's RabbitMQ driver that works w/o Queue Interop already. It all depends on the goal.

@zhuravljov Do we want to remove our own implementations and use Queue Interop ones instead? That would be less maintenance and more profit for PHP overall. Or do we want to have additional drivers through Queue Interop?


```php
return [
'bootstrap' => [
'queue', // The component registers own console commands
],
'components' => [
'queue' => [
'class' => \yii\queue\queue_interop\Queue::class,
'queueName' => 'queue',
'factoryClass' => \Enqueue\AmqpLib\AmqpConnectionFactory::class,
'factoryConfig' => [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'pass' => 'guest',
'vhost' => '/',
],
],
],
];
```

Console
-------

Console is used to listen and process queued tasks.

```sh
yii queue/listen
```

`listen` command launches a daemon which infinitely queries the queue. If there are new tasks
they're immediately obtained and executed. This method is most efficient when command is properly
daemonized via [supervisor](worker.md#supervisor).

`listen` command has options:

- `--verbose`, `-v`: print execution status into console.
- `--isolate`: verbose mode of a job execution. If enabled, execution result of each job will be printed.
- `--color`: highlighting for verbose mode.
22 changes: 22 additions & 0 deletions src/drivers/queue_interop/Command.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace yii\queue\queue_interop;

use yii\queue\cli\Command as CliCommand;

class Command extends CliCommand
{
/**
* @var Queue
*/
public $queue;

/**
* Listens queue and runs new jobs.
* It can be used as demon process.
*/
public function actionListen()
{
$this->queue->listen();
}
}
153 changes: 153 additions & 0 deletions src/drivers/queue_interop/Queue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
<?php

namespace yii\queue\queue_interop;

use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Interop\Amqp\AmqpContext;
use Interop\Amqp\AmqpQueue;
use Interop\Queue\PsrConnectionFactory;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrProducer;
use yii\base\NotSupportedException;
use yii\queue\cli\Queue as CliQueue;

class Queue extends CliQueue
{
const RABBITMQ_DELAY_DLX = 'dlx';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why there are rabbit-specific things in a general purpose class?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We tried to keep it as simple as possible, so everything in one class.

we can move amqp interop specific stuff to separate class. Is it ok?

const RABBITMQ_DELAY_DELAYED_MESSAGE_PLUGIN = 'delayed_message_plugin';

/**
* {@inheritdoc}
*/
public $commandClass = Command::class;

/**
* @var string
*/
public $factoryClass = null;

/**
* @var array
*/
public $factoryConfig = [];

/**
* Supported strategies: "dlx", "delayed_message_plugin"
*
* @var string
*/
public $rabbitmqDelayStrategy = self::RABBITMQ_DELAY_DLX;

/**
* @var string
*/
public $queueName = 'queue';

/**
* @var PsrContext
*/
private $context;

/**
* Listens queue and runs new jobs.
*/
public function listen()
{
$consumer = $this->getContext()->createConsumer(
$this->getContext()->createQueue($this->queueName)
);

while (true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be good to add usleep. 10-100ms would be ok.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ability to configure delay would be good with 10ms default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wrong. There is no need for sleep since receive is blocking (it waits on a socket, or does sleep internally). No need to do it here.

if ($message = $consumer->receive()) {
list($ttr, $body) = explode(';', $message->getBody(), 2);
if ($this->handleMessage(null, $body, $ttr, 1)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we reject it on false?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Queue reserves message for handling before method calling.
If method returns false, queue should keep message in reserve. If false, message must be deleted.

Copy link
Contributor

@makasim makasim Aug 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ASKozienko could you please do message requeue on false and ack on true?

$consumer->acknowledge($message);
} else {
$consumer->reject($message, true);
}
}
}
}

/**
* {@inheritdoc}
*/
protected function pushMessage($message, $ttr, $delay, $priority)
{
$producer = $this->getContext()->createProducer();

if ($delay !== null) {
$producer->setDeliveryDelay($delay * 1000);
}

if ($priority !== null) {
$producer->setPriority($priority);
}

$producer->send(
$this->getContext()->createQueue($this->queueName),
$this->getContext()->createMessage("$ttr;$message")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samdark what does ttr mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@makasim "time to read"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

);

return null;
}

/**
* {@inheritdoc}
*/
public function status($id)
{
throw new NotSupportedException('Status is not supported in the driver.');
}

/**
* @return PsrContext
*/
private function getContext()
{
if (null === $this->context) {
if (empty($this->factoryClass)) {
throw new \LogicException('The "factoryClass" option is required');
}

if (false == class_exists($this->factoryClass)) {
throw new \LogicException(sprintf('The "factoryClass" option "%s" is not a class', $this->factoryClass));
}

if (false == is_a($this->factoryClass, PsrConnectionFactory::class, true)) {
throw new \LogicException(sprintf('The "factoryClass" option must contain a class that implements "%s" but it is not', PsrConnectionFactory::class));
}

/** @var PsrConnectionFactory $factory */
$factory = new $this->factoryClass(isset($this->factoryConfig['dsn']) ? $this->factoryConfig['dsn'] : $this->factoryConfig);

if ($factory instanceof DelayStrategyAware) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ASKozienko I am not sure factory implements delay strategy aware interface.

if (false != $this->rabbitmqDelayStrategy) {
switch ($this->rabbitmqDelayStrategy) {
case self::RABBITMQ_DELAY_DLX:
$factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
break;
case self::RABBITMQ_DELAY_DELAYED_MESSAGE_PLUGIN:
$factory->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy());
break;
default:
throw new \LogicException(sprintf('Unknown rabbitmq delay strategy: "%s"', $this->rabbitmqDelayStrategy));
}
}
}

$this->context = $factory->createContext();

if ($this->context instanceof AmqpContext) {
$queue = $this->context->createQueue($this->queueName);
$queue->addFlag(AmqpQueue::FLAG_DURABLE);

$this->context->declareQueue($queue);
}
}

return $this->context;
}
}
13 changes: 13 additions & 0 deletions tests/app/config/main.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
'redisQueue',
'amqpQueue',
'beanstalkQueue',
'interopQueue',
],
'components' => [
'syncQueue' => [
Expand Down Expand Up @@ -75,6 +76,18 @@
'beanstalkQueue' => [
'class' => \yii\queue\beanstalk\Queue::class,
],
'interopQueue' => [
'class' => \yii\queue\queue_interop\Queue::class,
'queueName' => 'interop_queue',
'factoryClass' => \Enqueue\AmqpLib\AmqpConnectionFactory::class,
'factoryConfig' => [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'pass' => 'guest',
'vhost' => '/'
]
],
],
];

Expand Down
41 changes: 41 additions & 0 deletions tests/drivers/queue_interop/QueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace tests\drivers\queue_interop;

use tests\drivers\CliTestCase;
use yii\queue\queue_interop\Queue;

class QueueTest extends CliTestCase
{
protected function setUp()
{
if ('true' == getenv('EXCLUDE_ENQUEUE')) {
$this->markTestSkipped('Queue interop tests are disabled for php 5.5');
}

parent::setUp();
}

/**
* @return Queue
*/
protected function getQueue()
{
return \Yii::$app->interopQueue;
}

public function testRun()
{
// Not supported
}

public function testStatus()
{
// Not supported
}

public function testRetry()
{
// Limited support
}
}