-
-
Notifications
You must be signed in to change notification settings - Fork 294
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Queue Interop Support #104
Changes from 9 commits
4abd1c8
6b89628
481060a
9cc793e
0f6eb76
248aaf2
d53f763
497ab57
cff0a63
4e6fa97
79f27a0
f177722
15478f6
a774912
7103cdb
2a80a50
cb24354
aa7e7bb
e0d34fb
ebcfd96
f62b2d1
454f66d
9dbb593
1f7657f
9bd6041
8ad8868
6cb788b
0b60c94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
Queue Interop Driver (php-enqueue) | ||
================================== | ||
|
||
The driver works with many queue brokers. | ||
|
||
You can find full list of supported brokers on the [Queue Interop](https://github.com/queue-interop/queue-interop) page | ||
|
||
In order for it to work you need to install and configure one the implementations supported. | ||
|
||
Configuration example for the RabbitMQ AMQP: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure it's a good example since there's RabbitMQ driver that works w/o Queue Interop already. It all depends on the goal. @zhuravljov Do we want to remove our own implementations and use Queue Interop ones instead? That would be less maintenance and more profit for PHP overall. Or do we want to have additional drivers through Queue Interop? |
||
|
||
```php | ||
return [ | ||
'bootstrap' => [ | ||
'queue', // The component registers own console commands | ||
], | ||
'components' => [ | ||
'queue' => [ | ||
'class' => \yii\queue\queue_interop\Queue::class, | ||
'queueName' => 'queue', | ||
'factoryClass' => \Enqueue\AmqpLib\AmqpConnectionFactory::class, | ||
'factoryConfig' => [ | ||
'host' => 'localhost', | ||
'port' => 5672, | ||
'user' => 'guest', | ||
'pass' => 'guest', | ||
'vhost' => '/', | ||
], | ||
], | ||
], | ||
]; | ||
``` | ||
|
||
Console | ||
------- | ||
|
||
Console is used to listen and process queued tasks. | ||
|
||
```sh | ||
yii queue/listen | ||
``` | ||
|
||
`listen` command launches a daemon which infinitely queries the queue. If there are new tasks | ||
they're immediately obtained and executed. This method is most efficient when command is properly | ||
daemonized via [supervisor](worker.md#supervisor). | ||
|
||
`listen` command has options: | ||
|
||
- `--verbose`, `-v`: print execution status into console. | ||
- `--isolate`: verbose mode of a job execution. If enabled, execution result of each job will be printed. | ||
- `--color`: highlighting for verbose mode. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
<?php | ||
|
||
namespace yii\queue\queue_interop; | ||
|
||
use yii\queue\cli\Command as CliCommand; | ||
|
||
class Command extends CliCommand | ||
{ | ||
/** | ||
* @var Queue | ||
*/ | ||
public $queue; | ||
|
||
/** | ||
* Listens queue and runs new jobs. | ||
* It can be used as demon process. | ||
*/ | ||
public function actionListen() | ||
{ | ||
$this->queue->listen(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
<?php | ||
|
||
namespace yii\queue\queue_interop; | ||
|
||
use Enqueue\AmqpTools\DelayStrategyAware; | ||
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; | ||
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; | ||
use Interop\Amqp\AmqpContext; | ||
use Interop\Amqp\AmqpQueue; | ||
use Interop\Queue\PsrConnectionFactory; | ||
use Interop\Queue\PsrContext; | ||
use Interop\Queue\PsrProducer; | ||
use yii\base\NotSupportedException; | ||
use yii\queue\cli\Queue as CliQueue; | ||
|
||
class Queue extends CliQueue | ||
{ | ||
const RABBITMQ_DELAY_DLX = 'dlx'; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why there are rabbit-specific things in a general purpose class? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We tried to keep it as simple as possible, so everything in one class. we can move amqp interop specific stuff to separate class. Is it ok? |
||
const RABBITMQ_DELAY_DELAYED_MESSAGE_PLUGIN = 'delayed_message_plugin'; | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public $commandClass = Command::class; | ||
|
||
/** | ||
* @var string | ||
*/ | ||
public $factoryClass = null; | ||
|
||
/** | ||
* @var array | ||
*/ | ||
public $factoryConfig = []; | ||
|
||
/** | ||
* Supported strategies: "dlx", "delayed_message_plugin" | ||
* | ||
* @var string | ||
*/ | ||
public $rabbitmqDelayStrategy = self::RABBITMQ_DELAY_DLX; | ||
|
||
/** | ||
* @var string | ||
*/ | ||
public $queueName = 'queue'; | ||
|
||
/** | ||
* @var PsrContext | ||
*/ | ||
private $context; | ||
|
||
/** | ||
* Listens queue and runs new jobs. | ||
*/ | ||
public function listen() | ||
{ | ||
$consumer = $this->getContext()->createConsumer( | ||
$this->getContext()->createQueue($this->queueName) | ||
); | ||
|
||
while (true) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it would be good to add usleep. 10-100ms would be ok. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think ability to configure delay would be good with 10ms default. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was wrong. There is no need for sleep since |
||
if ($message = $consumer->receive()) { | ||
list($ttr, $body) = explode(';', $message->getBody(), 2); | ||
if ($this->handleMessage(null, $body, $ttr, 1)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we reject it on false? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Queue reserves message for handling before method calling. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ASKozienko could you please do message requeue on false and ack on true? |
||
$consumer->acknowledge($message); | ||
} else { | ||
$consumer->reject($message, true); | ||
} | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
protected function pushMessage($message, $ttr, $delay, $priority) | ||
{ | ||
$producer = $this->getContext()->createProducer(); | ||
|
||
if ($delay !== null) { | ||
$producer->setDeliveryDelay($delay * 1000); | ||
} | ||
|
||
if ($priority !== null) { | ||
$producer->setPriority($priority); | ||
} | ||
|
||
$producer->send( | ||
$this->getContext()->createQueue($this->queueName), | ||
$this->getContext()->createMessage("$ttr;$message") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @samdark what does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @makasim "time to read" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Time to reserve. See guide about this: |
||
); | ||
|
||
return null; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function status($id) | ||
{ | ||
throw new NotSupportedException('Status is not supported in the driver.'); | ||
} | ||
|
||
/** | ||
* @return PsrContext | ||
*/ | ||
private function getContext() | ||
{ | ||
if (null === $this->context) { | ||
if (empty($this->factoryClass)) { | ||
throw new \LogicException('The "factoryClass" option is required'); | ||
} | ||
|
||
if (false == class_exists($this->factoryClass)) { | ||
throw new \LogicException(sprintf('The "factoryClass" option "%s" is not a class', $this->factoryClass)); | ||
} | ||
|
||
if (false == is_a($this->factoryClass, PsrConnectionFactory::class)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
throw new \LogicException(sprintf('The "factoryClass" option must contain a class that implements "%s" but it is not', PsrConnectionFactory::class)); | ||
} | ||
|
||
/** @var PsrConnectionFactory $factory */ | ||
$factory = new $this->factoryClass(isset($this->factoryConfig['dsn']) ? $this->factoryConfig['dsn'] : $this->factoryConfig); | ||
|
||
if ($factory instanceof DelayStrategyAware) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ASKozienko I am not sure factory implements delay strategy aware interface. |
||
if (false != $this->rabbitmqDelayStrategy) { | ||
switch ($this->rabbitmqDelayStrategy) { | ||
case self::RABBITMQ_DELAY_DLX: | ||
$factory->setDelayStrategy(new RabbitMqDlxDelayStrategy()); | ||
break; | ||
case self::RABBITMQ_DELAY_DELAYED_MESSAGE_PLUGIN: | ||
$factory->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy()); | ||
break; | ||
default: | ||
throw new \LogicException(sprintf('Unknown rabbitmq delay strategy: "%s"', $this->rabbitmqDelayStrategy)); | ||
} | ||
} | ||
} | ||
|
||
$this->context = $factory->createContext(); | ||
|
||
if ($this->context instanceof AmqpContext) { | ||
$queue = $this->context->createQueue($this->queueName); | ||
$queue->addFlag(AmqpQueue::FLAG_DURABLE); | ||
|
||
$this->context->declareQueue($queue); | ||
} | ||
} | ||
|
||
return $this->context; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
<?php | ||
|
||
namespace tests\drivers\queue_interop; | ||
|
||
use tests\drivers\CliTestCase; | ||
use yii\queue\queue_interop\Queue; | ||
|
||
class QueueTest extends CliTestCase | ||
{ | ||
/** | ||
* @return Queue | ||
*/ | ||
protected function getQueue() | ||
{ | ||
return \Yii::$app->interopQueue; | ||
} | ||
|
||
public function testRun() | ||
{ | ||
// Not supported | ||
} | ||
|
||
public function testStatus() | ||
{ | ||
// Not supported | ||
} | ||
|
||
public function testLater() | ||
{ | ||
// Not supported | ||
} | ||
|
||
public function testRetry() | ||
{ | ||
// Limited support | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we use just
yii\queue\interop
as a namespace?