Skip to content

Commit

Permalink
delay strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
ASKozienko committed Aug 3, 2017
1 parent 918bff6 commit f174e80
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
43 changes: 43 additions & 0 deletions pkg/enqueue/Client/Amqp/DelayPluginDelayStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

namespace Enqueue\Client\Amqp;

use Interop\Amqp\AmqpContext;
use Interop\Amqp\AmqpDestination;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;
use Interop\Amqp\Impl\AmqpBind;

class DelayPluginDelayStrategy implements DelayStrategy
{
public function delayMessage(AmqpContext $context, AmqpDestination $dest, AmqpMessage $message)
{
$delaySec = 1235; // $message->getDelay();

if ($dest instanceof AmqpTopic) {
$delayTopic = $context->createTopic($dest->getTopicName().'.x.delayed');
$delayTopic->setType('x-delayed-message');
$delayTopic->addFlag(AmqpTopic::FLAG_DURABLE);
$delayTopic->setArgument('x-delayed-type', AmqpTopic::TYPE_DIRECT);

$context->declareTopic($delayTopic);
$context->bind(new AmqpBind($dest, $delayTopic));
} elseif ($dest instanceof AmqpQueue) {
$delayTopic = $context->createTopic($dest->getQueueName().'.delayed');
$delayTopic->setType('x-delayed-message');
$delayTopic->addFlag(AmqpTopic::FLAG_DURABLE);
$delayTopic->setArgument('x-delayed-type', AmqpTopic::TYPE_DIRECT);

$context->declareTopic($delayTopic);
$context->bind(new AmqpBind($delayTopic, $dest, $dest->getQueueName()));
} else {
throw new \LogicException();
}

$delayMessage = $context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders());
$delayMessage->setProperty('x-delay', (string) ($delaySec * 1000));

$context->createProducer()->send($delayTopic, $delayMessage);
}
}
17 changes: 17 additions & 0 deletions pkg/enqueue/Client/Amqp/DelayStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Enqueue\Client\Amqp;

use Interop\Amqp\AmqpContext;
use Interop\Amqp\AmqpDestination;
use Interop\Amqp\AmqpMessage;

interface DelayStrategy
{
/**
* @param AmqpContext $context
* @param AmqpDestination $dest
* @param AmqpMessage $message
*/
public function delayMessage(AmqpContext $context, AmqpDestination $dest, AmqpMessage $message);
}
47 changes: 47 additions & 0 deletions pkg/enqueue/Client/Amqp/DlxDelayStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

namespace Enqueue\Client\Amqp;

use Interop\Amqp\AmqpContext;
use Interop\Amqp\AmqpDestination;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;

class DlxDelayStrategy implements DelayStrategy
{
/**
* {@inheritdoc}
*/
public function delayMessage(AmqpContext $context, AmqpDestination $dest, AmqpMessage $message)
{
$delaySec = 1235; // $message->getDelay();

if ($dest instanceof AmqpTopic) {
$delayQueue = $context->createQueue($dest->getTopicName().'.'.$delaySec.'.x.delayed');
$delayQueue->addFlag(AmqpTopic::FLAG_DURABLE);
$delayQueue->setArgument('x-dead-letter-exchange', $dest->getTopicName());
} elseif ($dest instanceof AmqpQueue) {
$delayQueue = $context->createQueue($dest->getQueueName().'.'.$delaySec.'.delayed');
$delayQueue->addFlag(AmqpTopic::FLAG_DURABLE);
$delayQueue->setArgument('x-dead-letter-exchange', '');
$delayQueue->setArgument('x-dead-letter-routing-key', $dest->getQueueName());
} else {
throw new \LogicException();
}

$context->declareQueue($delayQueue);

$properties = $message->getProperties();

// The x-death header must be removed because of the bug in RabbitMQ.
// It was reported that the bug is fixed since 3.5.4 but I tried with 3.6.1 and the bug still there.
// https://github.com/rabbitmq/rabbitmq-server/issues/216
unset($properties['x-death']);

$delayMessage = $context->createMessage($message->getBody(), $properties, $message->getHeaders());
$delayMessage->setExpiration((string) ($delaySec * 1000));

$context->createProducer()->send($delayQueue, $delayMessage);
}
}

0 comments on commit f174e80

Please sign in to comment.