Skip to content

Commit

Permalink
Merge pull request #60 from roadrunner-php/feature/sqs-options
Browse files Browse the repository at this point in the history
Add additional options when creating a pipeline
  • Loading branch information
wolfy-j authored Nov 8, 2023
2 parents d031e96 + e645d02 commit b0903fe
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 61 deletions.
29 changes: 28 additions & 1 deletion src/Queue/AMQPCreateInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ final class AMQPCreateInfo extends CreateInfo
{
public const PREFETCH_DEFAULT_VALUE = 100;
public const QUEUE_DEFAULT_VALUE = 'default';
public const QUEUE_AUTO_DELETE_DEFAULT_VALUE = false;
public const EXCHANGE_DEFAULT_VALUE = 'amqp.default';
public const EXCHANGE_DURABLE_DEFAULT_VALUE = false;
public const ROUTING_KEY_DEFAULT_VALUE = '';
Expand All @@ -19,6 +20,10 @@ final class AMQPCreateInfo extends CreateInfo
public const DURABLE_DEFAULT_VALUE = false;
public const CONSUME_ALL_DEFAULT_VALUE = false;
public const QUEUE_HEADERS_DEFAULT_VALUE = [];
public const DELETE_QUEUE_ON_STOP_DEFAULT_VALUE = false;
public const REDIAL_TIMEOUT_DEFAULT_VALUE = 60;
public const EXCHANGE_AUTO_DELETE_DEFAULT_VALUE = false;
public const CONSUMER_ID_DEFAULT_VALUE = null;

/**
* @param non-empty-string $name
Expand All @@ -28,6 +33,8 @@ final class AMQPCreateInfo extends CreateInfo
* @param non-empty-string $exchange
* @param string $routingKey Routing key. Required for publisher.
* @param array<string, string> $queueHeaders
* @param positive-int $redialTimeout
* @param non-empty-string|null $consumerId
*/
public function __construct(
string $name,
Expand All @@ -44,28 +51,48 @@ public function __construct(
public readonly bool $exchangeDurable = self::EXCHANGE_DURABLE_DEFAULT_VALUE,
public readonly bool $consumeAll = self::CONSUME_ALL_DEFAULT_VALUE,
public readonly array $queueHeaders = self::QUEUE_HEADERS_DEFAULT_VALUE,
public readonly bool $deleteQueueOnStop = self::DELETE_QUEUE_ON_STOP_DEFAULT_VALUE,
public readonly int $redialTimeout = self::REDIAL_TIMEOUT_DEFAULT_VALUE,
public readonly bool $exchangeAutoDelete = self::EXCHANGE_AUTO_DELETE_DEFAULT_VALUE,
public readonly bool $queueAutoDelete = self::QUEUE_AUTO_DELETE_DEFAULT_VALUE,
public readonly ?string $consumerId = self::CONSUMER_ID_DEFAULT_VALUE,
) {
parent::__construct(Driver::AMQP, $name, $priority);

\assert($this->prefetch >= 1, 'Precondition [prefetch >= 1] failed');
\assert($this->redialTimeout >= 1, 'Precondition [redialTimeout >= 1] failed');
\assert($this->exchange !== '', 'Precondition [exchange !== ""] failed');

if ($this->consumerId !== null) {
\assert($this->consumerId !== '', 'Precondition [consumerId !== ""] failed');
}
}

public function toArray(): array
{
return \array_merge(parent::toArray(), [
$result = \array_merge(parent::toArray(), [
'prefetch' => $this->prefetch,
'queue' => $this->queue,
'queue_auto_delete' => $this->queueAutoDelete,
'exchange' => $this->exchange,
'exchange_durable' => $this->exchangeDurable,
'exchange_type' => $this->exchangeType->value,
'exchange_auto_delete' => $this->exchangeAutoDelete,
'routing_key' => $this->routingKey,
'exclusive' => $this->exclusive,
'multiple_ack' => $this->multipleAck,
'requeue_on_fail' => $this->requeueOnFail,
'durable' => $this->durable,
'consume_all' => $this->consumeAll,
'queue_headers' => $this->queueHeaders,
'delete_queue_on_stop' => $this->deleteQueueOnStop,
'redial_timeout' => $this->redialTimeout,
]);

if (!empty($this->consumerId)) {
$result['consumer_id'] = $this->consumerId;
}

return $result;
}
}
3 changes: 3 additions & 0 deletions src/Queue/BeanstalkCreateInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ final class BeanstalkCreateInfo extends CreateInfo
public const TUBE_PRIORITY_MAX_VALUE = 2 ** 32;
public const TUBE_DEFAULT_VALUE = 'default';
public const RESERVE_TIMEOUT_DEFAULT_VALUE = 5;
public const CONSUME_ALL_DEFAULT_VALUE = false;

/**
* @param non-empty-string $name
Expand All @@ -27,6 +28,7 @@ public function __construct(
public readonly int $tubePriority = self::TUBE_PRIORITY_DEFAULT_VALUE,
public readonly string $tube = self::TUBE_DEFAULT_VALUE,
public readonly int $reserveTimeout = self::RESERVE_TIMEOUT_DEFAULT_VALUE,
public readonly bool $consumeAll = self::CONSUME_ALL_DEFAULT_VALUE,
) {
parent::__construct(Driver::Beanstalk, $name, $priority);

Expand All @@ -41,6 +43,7 @@ public function toArray(): array
'tube_priority' => $this->tubePriority,
'tube' => $this->tube,
'reserve_timeout' => $this->reserveTimeout,
'consume_all' => $this->consumeAll,
]);
}
}
3 changes: 3 additions & 0 deletions src/Queue/BoltdbCreateInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ final class BoltdbCreateInfo extends CreateInfo
{
public const PREFETCH_DEFAULT_VALUE = 10000;
public const FILE_DEFAULT_VALUE = 'rr.db';
public const PERMISSIONS_DEFAULT_VALUE = 0755;

/**
* @param non-empty-string $name
Expand All @@ -23,6 +24,7 @@ public function __construct(
public readonly string $file = self::FILE_DEFAULT_VALUE,
int $priority = self::PRIORITY_DEFAULT_VALUE,
public readonly int $prefetch = self::PREFETCH_DEFAULT_VALUE,
public readonly int $permissions = self::PERMISSIONS_DEFAULT_VALUE,
) {
parent::__construct(Driver::BoltDB, $name, $priority);

Expand All @@ -35,6 +37,7 @@ public function toArray(): array
return \array_merge(parent::toArray(), [
'prefetch' => $this->prefetch,
'file' => $this->file,
'permissions' => $this->permissions,
]);
}
}
14 changes: 13 additions & 1 deletion src/Queue/SQSCreateInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ final class SQSCreateInfo extends CreateInfo
public const ATTRIBUTES_DEFAULT_VALUE = [];
public const TAGS_DEFAULT_VALUE = [];
public const QUEUE_DEFAULT_VALUE = 'default';
public const MESSAGE_GROUP_ID_DEFAULT_VALUE = null;
public const SKIP_QUEUE_DECLARATION_DEFAULT_VALUE = false;

/**
* @param non-empty-string $name
Expand All @@ -43,6 +45,7 @@ final class SQSCreateInfo extends CreateInfo
* @param non-empty-string $queue
* @param array|SQSAttributesMap $attributes
* @param array<non-empty-string, non-empty-string> $tags
* @param non-empty-string|null $messageGroupId
*/
public function __construct(
string $name,
Expand All @@ -53,29 +56,38 @@ public function __construct(
public readonly string $queue = self::QUEUE_DEFAULT_VALUE,
public readonly array $attributes = self::ATTRIBUTES_DEFAULT_VALUE,
public readonly array $tags = self::TAGS_DEFAULT_VALUE,
public readonly ?string $messageGroupId = self::MESSAGE_GROUP_ID_DEFAULT_VALUE,
public readonly bool $skipQueueDeclaration = self::SKIP_QUEUE_DECLARATION_DEFAULT_VALUE,
) {
parent::__construct(Driver::SQS, $name, $priority);

\assert($this->prefetch >= 1, 'Precondition [prefetch >= 1] failed');
\assert($this->visibilityTimeout >= 0, 'Precondition [visibilityTimeout >= 0] failed');
\assert($this->waitTimeSeconds >= 0, 'Precondition [waitTimeSeconds >= 0] failed');
\assert($this->queue !== '', 'Precondition [queue !== ""] failed');
if ($this->messageGroupId !== null) {
\assert($this->messageGroupId !== '', 'Precondition [messageGroupId !== ""] failed');
}
}

public function toArray(): array
{
$result = \array_merge(parent::toArray(), [
'prefetch' => $this->prefetch,
'visibility_timeout' => $this->visibilityTimeout,
'wait_time_seconds' => $this->waitTimeSeconds,
'wait_time' => $this->waitTimeSeconds,
'queue' => $this->queue,
'skip_queue_declaration' => $this->skipQueueDeclaration,
]);
if ($this->attributes !== []) {
$result['attributes'] = $this->attributes;
}
if ($this->tags !== []) {
$result['tags'] = $this->tags;
}
if (!empty($this->messageGroupId)) {
$result['message_group_id'] = $this->messageGroupId;
}

return $result;
}
Expand Down
88 changes: 59 additions & 29 deletions tests/Unit/Queue/AMQPCreateInfoTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public function testDefaultValues(): void
{
$amqpCreateInfo = new AMQPCreateInfo('test');

$this->assertSame('test', $amqpCreateInfo->name);
$this->assertSame(AMQPCreateInfo::PRIORITY_DEFAULT_VALUE, $amqpCreateInfo->priority);
$this->assertSame(AMQPCreateInfo::PREFETCH_DEFAULT_VALUE, $amqpCreateInfo->prefetch);
$this->assertSame(AMQPCreateInfo::QUEUE_DEFAULT_VALUE, $amqpCreateInfo->queue);
$this->assertSame(AMQPCreateInfo::EXCHANGE_DEFAULT_VALUE, $amqpCreateInfo->exchange);
Expand All @@ -23,27 +25,40 @@ public function testDefaultValues(): void
$this->assertFalse($amqpCreateInfo->multipleAck);
$this->assertFalse($amqpCreateInfo->requeueOnFail);
$this->assertFalse($amqpCreateInfo->durable);
$this->assertSame(AMQPCreateInfo::EXCHANGE_DURABLE_DEFAULT_VALUE, $amqpCreateInfo->exchangeDurable);
$this->assertSame(AMQPCreateInfo::CONSUME_ALL_DEFAULT_VALUE, $amqpCreateInfo->consumeAll);
$this->assertSame(AMQPCreateInfo::QUEUE_HEADERS_DEFAULT_VALUE, $amqpCreateInfo->queueHeaders);
$this->assertSame(AMQPCreateInfo::DELETE_QUEUE_ON_STOP_DEFAULT_VALUE, $amqpCreateInfo->deleteQueueOnStop);
$this->assertSame(AMQPCreateInfo::REDIAL_TIMEOUT_DEFAULT_VALUE, $amqpCreateInfo->redialTimeout);
$this->assertSame(AMQPCreateInfo::EXCHANGE_AUTO_DELETE_DEFAULT_VALUE, $amqpCreateInfo->exchangeAutoDelete);
$this->assertSame(AMQPCreateInfo::QUEUE_AUTO_DELETE_DEFAULT_VALUE, $amqpCreateInfo->queueAutoDelete);
$this->assertSame(AMQPCreateInfo::CONSUMER_ID_DEFAULT_VALUE, $amqpCreateInfo->consumerId);
}

public function testCustomValues(): void
{
$amqpCreateInfo = new AMQPCreateInfo(
'test',
5,
200,
'custom_queue',
'custom_exchange',
ExchangeType::Topics,
'custom_routing_key',
true,
true,
true,
true,
true,
true,
[
name: 'test',
priority: 5,
prefetch: 200,
queue: 'custom_queue',
exchange: 'custom_exchange',
exchangeType: ExchangeType::Topics,
routingKey: 'custom_routing_key',
exclusive: true,
multipleAck: true,
requeueOnFail: true,
durable: true,
exchangeDurable: true,
consumeAll: true,
queueHeaders: [
'x-queue-type' => 'quorum',
],
deleteQueueOnStop: true,
redialTimeout: 10,
exchangeAutoDelete: true,
queueAutoDelete: true,
consumerId: 'custom_consumer_id'
);

$this->assertSame(200, $amqpCreateInfo->prefetch);
Expand All @@ -58,27 +73,37 @@ public function testCustomValues(): void
$this->assertTrue($amqpCreateInfo->exchangeDurable);
$this->assertTrue($amqpCreateInfo->consumeAll);
$this->assertSame(['x-queue-type' => 'quorum'], $amqpCreateInfo->queueHeaders);
$this->assertTrue($amqpCreateInfo->deleteQueueOnStop);
$this->assertSame(10, $amqpCreateInfo->redialTimeout);
$this->assertTrue($amqpCreateInfo->exchangeAutoDelete);
$this->assertTrue($amqpCreateInfo->queueAutoDelete);
$this->assertSame('custom_consumer_id', $amqpCreateInfo->consumerId);
}

public function testToArray()
public function testToArray(): void
{
$amqpCreateInfo = new AMQPCreateInfo(
'test',
5,
200,
'custom_queue',
'custom_exchange',
ExchangeType::Fanout,
'custom_routing_key',
true,
true,
true,
true,
true,
true,
[
name: 'test',
priority: 5,
prefetch: 200,
queue: 'custom_queue',
exchange: 'custom_exchange',
exchangeType: ExchangeType::Fanout,
routingKey: 'custom_routing_key',
exclusive: true,
multipleAck: true,
requeueOnFail: true,
durable: true,
exchangeDurable: true,
consumeAll: true,
queueHeaders: [
'x-queue-type' => 'quorum',
],
deleteQueueOnStop: true,
redialTimeout: 10,
exchangeAutoDelete: true,
queueAutoDelete: true,
consumerId: 'custom_consumer_id'
);

$expectedArray = [
Expand All @@ -99,6 +124,11 @@ public function testToArray()
'queue_headers' => [
'x-queue-type' => 'quorum',
],
'exchange_auto_delete' => true,
'delete_queue_on_stop' => true,
'redial_timeout' => 10,
'queue_auto_delete' => true,
'consumer_id' => 'custom_consumer_id'
];

$this->assertEquals($expectedArray, $amqpCreateInfo->toArray());
Expand Down
23 changes: 21 additions & 2 deletions tests/Unit/Queue/BeanstalkCreateInfoTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public function testConstructor(): void
$this->assertEquals(BeanstalkCreateInfo::TUBE_PRIORITY_DEFAULT_VALUE, $beanstalkCreateInfo->tubePriority);
$this->assertEquals(BeanstalkCreateInfo::TUBE_DEFAULT_VALUE, $beanstalkCreateInfo->tube);
$this->assertEquals(BeanstalkCreateInfo::RESERVE_TIMEOUT_DEFAULT_VALUE, $beanstalkCreateInfo->reserveTimeout);
$this->assertEquals(BeanstalkCreateInfo::CONSUME_ALL_DEFAULT_VALUE, $beanstalkCreateInfo->consumeAll);
}

public function testBeanstalkCreateInfoCustomValues(): void
Expand All @@ -30,15 +31,24 @@ public function testBeanstalkCreateInfoCustomValues(): void
$tubePriority = 100;
$tube = 'my-tube';
$reserveTimeout = 30;
$consumeAll = true;

$beanstalkCreateInfo = new BeanstalkCreateInfo($name, $priority, $tubePriority, $tube, $reserveTimeout);
$beanstalkCreateInfo = new BeanstalkCreateInfo(
name: $name,
priority: $priority,
tubePriority: $tubePriority,
tube: $tube,
reserveTimeout: $reserveTimeout,
consumeAll: $consumeAll
);

$this->assertEquals(Driver::Beanstalk, $beanstalkCreateInfo->driver);
$this->assertEquals($name, $beanstalkCreateInfo->name);
$this->assertEquals($priority, $beanstalkCreateInfo->priority);
$this->assertEquals($tubePriority, $beanstalkCreateInfo->tubePriority);
$this->assertEquals($tube, $beanstalkCreateInfo->tube);
$this->assertEquals($reserveTimeout, $beanstalkCreateInfo->reserveTimeout);
$this->assertEquals($consumeAll, $beanstalkCreateInfo->consumeAll);
}

public function testToArray(): void
Expand All @@ -48,8 +58,16 @@ public function testToArray(): void
$tubePriority = 100;
$tube = 'my-tube';
$reserveTimeout = 30;
$consumeAll = true;

$beanstalkCreateInfo = new BeanstalkCreateInfo($name, $priority, $tubePriority, $tube, $reserveTimeout);
$beanstalkCreateInfo = new BeanstalkCreateInfo(
name: $name,
priority: $priority,
tubePriority: $tubePriority,
tube: $tube,
reserveTimeout: $reserveTimeout,
consumeAll: $consumeAll
);

$expectedArray = [
'driver' => Driver::Beanstalk->value,
Expand All @@ -58,6 +76,7 @@ public function testToArray(): void
'tube_priority' => $tubePriority,
'tube' => $tube,
'reserve_timeout' => $reserveTimeout,
'consume_all' => $consumeAll,
];

$this->assertEquals($expectedArray, $beanstalkCreateInfo->toArray());
Expand Down
Loading

0 comments on commit b0903fe

Please sign in to comment.