Skip to content

Commit

Permalink
Merge pull request #4 from sagacorp/add-retry-request
Browse files Browse the repository at this point in the history
Add retry request
  • Loading branch information
AdrienHt authored Nov 25, 2022
2 parents c672a40 + 92f330c commit a3aede4
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 79 deletions.
34 changes: 23 additions & 11 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,23 @@
*/
class Queue extends \yii\queue\cli\Queue
{
//region Public Properties
// region Public Properties
/**
* use this property to filter job execution on a specific id
* You can use this property when you need to run multiple environments with the same queue at the same time, multiple locals envionnements for example.
*
* @see BrokerProperties::$to
*
* @var string|null
*/
public ?string $id = null;
/**
* @var ServiceBus
*/
public $serviceBus = 'serviceBus';
//endregion Public Properties
// endregion Public Properties

//region Initialization
// region Initialization
/**
* @throws \yii\base\InvalidConfigException
*/
Expand All @@ -32,10 +41,9 @@ public function init(): void

$this->serviceBus = Instance::ensure($this->serviceBus, ServiceBus::class);
}
//endregion Initialization

//region Public Methods
// endregion Initialization

// region Public Methods
/**
* Listens queue and runs each job.
*
Expand All @@ -54,6 +62,9 @@ function (callable $canContinue) use ($repeat, $timeout) {
$message = $this->serviceBus->receiveMessage(ServiceBus::PEEK_LOCK, $timeout);

if ($message !== null && $message->brokerProperties !== null) {
if ($message->brokerProperties->to && !$message->brokerProperties->isTo($this->id)) {
continue;
}
if ($this->handleMessage($message->brokerProperties->messageId, $message->body, $message->brokerProperties->timeToLive, $message->brokerProperties->deliveryCount)) {
$this->serviceBus->deleteMessage($message);
}
Expand All @@ -75,7 +86,7 @@ public function status($id): void
{
throw new NotSupportedException('Status is not supported in the driver.');
}
//endregion Public Methods
// endregion Public Methods

//region Protected Methods
/**
Expand All @@ -92,12 +103,13 @@ protected function pushMessage($message, $ttr, $delay, $priority): string
{
$azureMessage = new Message(
[
'body' => $message,
'contentType' => 'application/vnd.microsoft.servicebus.yml',
'body' => $message,
'contentType' => 'application/vnd.microsoft.servicebus.yml',
'brokerProperties' => new BrokerProperties(
[
'timeToLive' => $ttr,
'delay' => $delay,
'delay' => $delay,
'to' => $this->id,
]
),
]
Expand All @@ -107,5 +119,5 @@ protected function pushMessage($message, $ttr, $delay, $priority): string

return $azureMessage->brokerProperties->messageId;
}
//endregion Protected Methods
// endregion Protected Methods
}
60 changes: 33 additions & 27 deletions src/service/BrokerProperties.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace sagacorp\queue\azure\service;

use Carbon\Carbon;
use Carbon\CarbonTimeZone;
use yii\base\Model;

/**
Expand All @@ -13,7 +12,7 @@
*/
class BrokerProperties extends Model
{
//region Public Properties
// region Public Properties
/**
* The correlation ID.
*/
Expand Down Expand Up @@ -66,24 +65,24 @@ class BrokerProperties extends Model
* The to.
*/
public ?string $to = null;
//endregion Public Properties
// endregion Public Properties

//region Private Properties
// region Private Properties
/**
* The enqueued time.
*/
private ?Carbon $enqueuedTimeUtc;
private ?Carbon $enqueuedTimeUtc = null;
/**
* The locked until time.
*/
private ?Carbon $lockedUntilUtc;
private ?Carbon $lockedUntilUtc = null;
/**
* The scheduled enqueue time.
*/
private ?Carbon $scheduledEnqueueTimeUtc;
//endregion Private Properties
private ?Carbon $scheduledEnqueueTimeUtc = null;
// endregion Private Properties

//region Initialization
// region Initialization
public function init(): void
{
parent::init();
Expand All @@ -102,16 +101,16 @@ public function __toString()
$values = [];

$settableProperties = [
'CorrelationId' => 'correlationId',
'SessionId' => 'sessionId',
'MessageId' => 'messageId',
'Label' => 'label',
'ReplyTo' => 'replyTo',
'TimeToLive' => 'timeToLive',
'To' => 'to',
'CorrelationId' => 'correlationId',
'SessionId' => 'sessionId',
'MessageId' => 'messageId',
'Label' => 'label',
'ReplyTo' => 'replyTo',
'TimeToLive' => 'timeToLive',
'To' => 'to',
'ScheduledEnqueueTimeUtc' => 'scheduledEnqueueTimeUtc',
'ReplyToSessionId' => 'replyToSessionId',
'PartitionKey' => 'partitionKey',
'ReplyToSessionId' => 'replyToSessionId',
'PartitionKey' => 'partitionKey',
];

foreach ($settableProperties as $key => $value) {
Expand All @@ -122,9 +121,9 @@ public function __toString()

return (string) \json_encode($values, JSON_THROW_ON_ERROR);
}
//endregion Initialization
// endregion Initialization

//region Getters/Setters
// region Getters/Setters
public function getEnqueuedTimeUtc(): ?Carbon
{
return $this->enqueuedTimeUtc;
Expand All @@ -135,14 +134,14 @@ public function getLockedUntilUtc(): ?Carbon
return $this->lockedUntilUtc;
}

public function setDelay(int $value): void
public function getScheduledEnqueueTimeUtc(): ?Carbon
{
$this->setScheduledEnqueueTimeUtc(Carbon::now()->addSeconds($value)->setTimezone('UTC'));
return $this->scheduledEnqueueTimeUtc;
}

public function getScheduledEnqueueTimeUtc(): ?Carbon
public function setDelay(int $value): void
{
return $this->scheduledEnqueueTimeUtc;
$this->setScheduledEnqueueTimeUtc(Carbon::now()->addSeconds($value)->setTimezone('UTC'));
}

public function setEnqueuedTimeUtc(Carbon|string $enqueuedTimeUtc): void
Expand Down Expand Up @@ -171,9 +170,16 @@ public function setScheduledEnqueueTimeUtc(Carbon|string $scheduledEnqueueTimeUt

$this->scheduledEnqueueTimeUtc = $scheduledEnqueueTimeUtc;
}
//endregion Getters/Setters
// endregion Getters/Setters

// region Public Methods
public function isTo(string $id): bool
{
return $this->to === $id;
}
// endregion Public Methods

//region Protected Methods
// region Protected Methods
protected function azureDateToCarbon(string $date): ?Carbon
{
return Carbon::parse($date, 'UTC') ?: null;
Expand All @@ -183,5 +189,5 @@ protected function carbonToAzureDate(Carbon $carbon): string
{
return $carbon->format(\DateTimeInterface::RFC7231);
}
//endregion Protected Methods
// endregion Protected Methods
}
62 changes: 62 additions & 0 deletions src/service/Request.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

namespace sagacorp\queue\azure\service;

use yii\httpclient\Exception as HttpClientException;

class Request extends \yii\httpclient\Request
{
// region Public Properties
public int $maxRetries;
// endregion Public Properties

// region Protected Properties
protected int $attempts = 0;
// endregion Protected Properties

// region Public Methods
/**
* @throws HttpClientException
*/
public function sendAndRetryOnFailure(array $successStatusCodes): \yii\httpclient\Response
{
try {
$response = $this->send();

if (!in_array($response->statusCode, $successStatusCodes, true)) {
throw new HttpClientException($response->toString());
}
} catch (HttpClientException $e) {
\Yii::error($e);

if (!$this->canContinue($this->attempts)) {
throw $e;
}

$delay = $this->getRetryDelay($this->attempts);

\Yii::error('Retry request in ' . $delay . ' seconds');
sleep($delay);

++$this->attempts;

$response = $this->sendAndRetryOnFailure($successStatusCodes);
}

return $response;
}

// endregion Public Methods

// region Protected Methods
protected function canContinue(int $attempts): bool
{
return $attempts < $this->maxRetries;
}

protected function getRetryDelay(int $attempts): int
{
return 4 ** $attempts;
}
// endregion Protected Methods
}
Loading

0 comments on commit a3aede4

Please sign in to comment.