Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SNSQS] added possibility to send FIFO-related parameters using snsqs transport #1278

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions pkg/snsqs/SnsQsMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ class SnsQsMessage implements Message
*/
private $messageAttributes;

/**
* @var string|null
*/
private $messageGroupId;

/**
* @var string|null
*/
private $messageDeduplicationId;

/**
* See AWS documentation for message attribute structure.
*
Expand Down Expand Up @@ -59,4 +69,40 @@ public function setMessageAttributes(?array $messageAttributes): void
{
$this->messageAttributes = $messageAttributes;
}

/**
* Only FIFO.
*
* The token used for deduplication of sent messages. If a message with a particular MessageDeduplicationId is sent successfully,
* any messages sent with the same MessageDeduplicationId are accepted successfully but aren't delivered during the 5-minute
* deduplication interval. For more information, see http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html#FIFO-queues-exactly-once-processing.
*/
public function setMessageDeduplicationId(string $id = null): void
{
$this->messageDeduplicationId = $id;
}

public function getMessageDeduplicationId(): ?string
{
return $this->messageDeduplicationId;
}

/**
* Only FIFO.
*
* The tag that specifies that a message belongs to a specific message group. Messages that belong to the same message group
* are processed in a FIFO manner (however, messages in different message groups might be processed out of order).
* To interleave multiple ordered streams within a single queue, use MessageGroupId values (for example, session data
* for multiple users). In this scenario, multiple readers can process the queue, but the session data
* of each user is processed in a FIFO fashion.
*/
public function setMessageGroupId(string $id = null): void
{
$this->messageGroupId = $id;
}

public function getMessageGroupId(): ?string
{
return $this->messageGroupId;
}
}
5 changes: 5 additions & 0 deletions pkg/snsqs/SnsQsProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public function send(Destination $destination, Message $message): void
$message->getHeaders()
);
$snsMessage->setMessageAttributes($message->getMessageAttributes());
$snsMessage->setMessageGroupId($message->getMessageGroupId());
$snsMessage->setMessageDeduplicationId($message->getMessageDeduplicationId());

$this->getSnsProducer()->send($destination, $snsMessage);
} else {
Expand All @@ -70,6 +72,9 @@ public function send(Destination $destination, Message $message): void
$message->getHeaders()
);

$sqsMessage->setMessageGroupId($message->getMessageGroupId());
$sqsMessage->setMessageDeduplicationId($message->getMessageDeduplicationId());

$this->getSqsProducer()->send($destination, $sqsMessage);
}
}
Expand Down
58 changes: 55 additions & 3 deletions pkg/snsqs/Tests/SnsQsProducerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Enqueue\SnsQs\SnsQsQueue;
use Enqueue\SnsQs\SnsQsTopic;
use Enqueue\Sqs\SqsContext;
use Enqueue\Sqs\SqsMessage;
use Enqueue\Sqs\SqsProducer;
use Enqueue\Test\ClassExtensionTrait;
use Interop\Queue\Destination;
Expand Down Expand Up @@ -124,20 +125,71 @@ public function testShouldSendSnsTopicMessageWithAttributesToSnsProducer()
$producer->send($destination, new SnsQsMessage('', [], [], ['foo' => 'bar']));
}

public function testShouldSendToSnsTopicMessageWithGroupIdAndDeduplicationId()
{
$snsMock = $this->createSnsContextMock();
$snsMock->method('createMessage')->willReturn(new SnsMessage());
$destination = new SnsQsTopic('');

$snsProducerStub = $this->prophesize(SnsProducer::class);
$snsProducerStub->send(
$destination,
Argument::that(function (SnsMessage $snsMessage) {
return 'group-id' === $snsMessage->getMessageGroupId()
&& 'deduplication-id' === $snsMessage->getMessageDeduplicationId();
})
)->shouldBeCalledOnce();

$snsMock->method('createProducer')->willReturn($snsProducerStub->reveal());

$snsMessage = new SnsQsMessage();
$snsMessage->setMessageGroupId('group-id');
$snsMessage->setMessageDeduplicationId('deduplication-id');

$producer = new SnsQsProducer($snsMock, $this->createSqsContextMock());
$producer->send($destination, $snsMessage);
}

public function testShouldSendSqsMessageToSqsProducer()
{
$sqsMock = $this->createSqsContextMock();
$sqsMock->method('createMessage')->willReturn(new SqsMessage());
$destination = new SnsQsQueue('');

$snsProducerStub = $this->prophesize(SqsProducer::class);
$snsProducerStub->send($destination, Argument::any())->shouldBeCalledOnce();
$sqsProducerStub = $this->prophesize(SqsProducer::class);
$sqsProducerStub->send($destination, Argument::any())->shouldBeCalledOnce();

$sqsMock->method('createProducer')->willReturn($snsProducerStub->reveal());
$sqsMock->method('createProducer')->willReturn($sqsProducerStub->reveal());

$producer = new SnsQsProducer($this->createSnsContextMock(), $sqsMock);
$producer->send($destination, new SnsQsMessage());
}

public function testShouldSendToSqsProducerMessageWithGroupIdAndDeduplicationId()
{
$sqsMock = $this->createSqsContextMock();
$sqsMock->method('createMessage')->willReturn(new SqsMessage());
$destination = new SnsQsQueue('');

$sqsProducerStub = $this->prophesize(SqsProducer::class);
$sqsProducerStub->send(
$destination,
Argument::that(function (SqsMessage $sqsMessage) {
return 'group-id' === $sqsMessage->getMessageGroupId()
&& 'deduplication-id' === $sqsMessage->getMessageDeduplicationId();
})
)->shouldBeCalledOnce();

$sqsMock->method('createProducer')->willReturn($sqsProducerStub->reveal());

$sqsMessage = new SnsQsMessage();
$sqsMessage->setMessageGroupId('group-id');
$sqsMessage->setMessageDeduplicationId('deduplication-id');

$producer = new SnsQsProducer($this->createSnsContextMock(), $sqsMock);
$producer->send($destination, $sqsMessage);
}

/**
* @return \PHPUnit\Framework\MockObject\MockObject|SnsContext
*/
Expand Down