Skip to content

Commit

Permalink
Allow expression language for delaying and time to live (#415)
Browse files Browse the repository at this point in the history
* Allow for delaying with date time

* expression based delaying message
  • Loading branch information
dgafka authored Dec 1, 2024
1 parent 5aea780 commit 918969d
Show file tree
Hide file tree
Showing 18 changed files with 421 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
use Ecotone\Messaging\Channel\DelayableQueueChannel;
use Ecotone\Messaging\Channel\MessageChannelInterceptorAdapter;
use Ecotone\Messaging\Handler\ChannelResolver;
use Ecotone\Messaging\Scheduling\TimeSpan;
use Ecotone\Messaging\Support\Assert;

/**
* licence Apache-2.0
*/
final class DelayedMessageReleaseHandler
{
public function releaseMessagesAwaitingFor(string $channelName, int $timeInMilliseconds, ChannelResolver $channelResolver): void
public function releaseMessagesAwaitingFor(string $channelName, int|TimeSpan|\DateTimeInterface $timeInMillisecondsOrDateTime, ChannelResolver $channelResolver): void
{
/** @var DelayableQueueChannel|MessageChannelInterceptorAdapter $channel */
$channel = $channelResolver->resolve($channelName);
Expand All @@ -24,6 +25,6 @@ public function releaseMessagesAwaitingFor(string $channelName, int $timeInMilli

Assert::isTrue($channel instanceof DelayableQueueChannel, sprintf('Used %s channel to release delayed message, use instead of %s.', $channel::class, DelayableQueueChannel::class));

$channel->releaseMessagesAwaitingFor($timeInMilliseconds);
$channel->releaseMessagesAwaitingFor($timeInMillisecondsOrDateTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private function registerMessageReleasingHandler(Configuration $configuration):
)
->withMethodParameterConverters([
HeaderBuilder::create('channelName', 'ecotone.test_support_gateway.channel_name'),
PayloadBuilder::create('timeInMilliseconds'),
PayloadBuilder::create('timeInMillisecondsOrDateTime'),
ReferenceBuilder::create('channelResolver', ChannelResolver::class),
])
->withInputChannelName(self::inputChannelName(self::RELEASE_DELAYED_MESSAGES)))
Expand All @@ -211,7 +211,7 @@ private function registerMessageReleasingHandler(Configuration $configuration):
self::inputChannelName(self::RELEASE_DELAYED_MESSAGES)
)->withParameterConverters([
GatewayHeaderBuilder::create('channelName', 'ecotone.test_support_gateway.channel_name'),
GatewayPayloadBuilder::create('timeInMilliseconds'),
GatewayPayloadBuilder::create('timeInMillisecondsOrDateTime'),
]));
}

Expand Down
17 changes: 12 additions & 5 deletions packages/Ecotone/src/Lite/Test/FlowTestSupport.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@ public function discardRecordedMessages(): self
}

/**
* @param int $time Time in milliseconds or DelayedTime object
* @param int $time Time in milliseconds or TimeSpan object
*
* @deprecated use run instead
*/
public function releaseAwaitingMessagesAndRunConsumer(string $channelName, int|TimeSpan $time, ?ExecutionPollingMetadata $executionPollingMetadata = null): self
public function releaseAwaitingMessagesAndRunConsumer(string $channelName, int|TimeSpan|\DateTimeInterface $time, ?ExecutionPollingMetadata $executionPollingMetadata = null): self
{
$this->testSupportGateway->releaseMessagesAwaitingFor($channelName, $time instanceof TimeSpan ? $time->toMilliseconds() : $time);
$this->run($channelName, $executionPollingMetadata);
$this->run($channelName, $executionPollingMetadata, is_int($time) ? TimeSpan::withMilliseconds($time) : $time);

return $this;
}
Expand Down Expand Up @@ -129,8 +130,14 @@ public function receiveMessageFrom(string $channelName): ?Message
return $messageChannel->receive();
}

public function run(string $name, ?ExecutionPollingMetadata $executionPollingMetadata = null): self
/**
* @param int|TimeSpan|\DateTimeInterface $releaseAwaitingFor will release messages which are delayed for given time
*/
public function run(string $name, ?ExecutionPollingMetadata $executionPollingMetadata = null, TimeSpan|\DateTimeInterface|null $releaseAwaitingFor = null): self
{
if ($releaseAwaitingFor) {
$this->testSupportGateway->releaseMessagesAwaitingFor($name, $releaseAwaitingFor);
}
$this->configuredMessagingSystem->run($name, $executionPollingMetadata);

return $this;
Expand Down
3 changes: 2 additions & 1 deletion packages/Ecotone/src/Lite/Test/MessagingTestSupport.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Ecotone\Lite\Test;

use Ecotone\Messaging\Message;
use Ecotone\Messaging\Scheduling\TimeSpan;

/**
* licence Apache-2.0
Expand Down Expand Up @@ -59,5 +60,5 @@ public function getRecordedEcotoneMessagesFrom(string $channelName): array;

public function discardRecordedMessages(): void;

public function releaseMessagesAwaitingFor(string $channelName, int $timeInMilliseconds): void;
public function releaseMessagesAwaitingFor(string $channelName, int|TimeSpan|\DateTimeInterface $timeInMillisecondsOrDateTime): void;
}
17 changes: 16 additions & 1 deletion packages/Ecotone/src/Messaging/Attribute/Endpoint/AddHeader.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ class AddHeader
private string $headerName;
private mixed $headerValue;

public function __construct(string $name, mixed $value)
public function __construct(string $name, mixed $value = null, private string|null $expression = null)
{
Assert::notNullAndEmpty($name, 'Name of the header can not be empty');
Assert::isTrue(
($value === null && $expression !== null)
|| ($value !== null && $expression === null),
'Either value or expression should be provided for attribute ' . static ::class
);

$this->headerName = $name;
$this->headerValue = $value;
Expand All @@ -31,4 +36,14 @@ public function getHeaderValue(): mixed
{
return $this->headerValue;
}

public function isExpression(): bool
{
return false;
}

public function getExpression(): ?string
{
return $this->expression;
}
}
7 changes: 4 additions & 3 deletions packages/Ecotone/src/Messaging/Attribute/Endpoint/Delayed.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Attribute;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\Scheduling\TimeSpan;
use Ecotone\Messaging\Support\Assert;

#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
/**
Expand All @@ -15,10 +16,10 @@
class Delayed extends AddHeader
{
/**
* @param int|TimeSpan $time if integer is provided it is treated as milliseconds
* @param int|TimeSpan|\DateTimeInterface $time if integer is provided it is treated as milliseconds
*/
public function __construct(int|TimeSpan $time)
public function __construct(int|TimeSpan|\DateTimeInterface|null $time = null, ?string $expression = null)
{
parent::__construct(MessageHeaders::DELIVERY_DELAY, $time instanceof TimeSpan ? $time->toMilliseconds() : $time);
parent::__construct(MessageHeaders::DELIVERY_DELAY, $time, $expression);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class TimeToLive extends AddHeader
/**
* @param int|TimeSpan $time if integer is provided it is treated as milliseconds
*/
public function __construct(int|TimeSpan $time)
public function __construct(int|TimeSpan|null $time = null, ?string $expression = null)
{
parent::__construct(MessageHeaders::TIME_TO_LIVE, $time instanceof TimeSpan ? $time->toMilliseconds() : $time);
parent::__construct(MessageHeaders::TIME_TO_LIVE, $time instanceof TimeSpan ? $time->toMilliseconds() : $time, $expression);
}
}
26 changes: 20 additions & 6 deletions packages/Ecotone/src/Messaging/Channel/DelayableQueueChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@

use Ecotone\Messaging\Config\Container\DefinedObject;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\PollableChannel;
use Ecotone\Messaging\Scheduling\Clock;
use Ecotone\Messaging\Scheduling\EpochBasedClock;
use Ecotone\Messaging\Scheduling\StubUTCClock;
use Ecotone\Messaging\Scheduling\TimeSpan;
use Ecotone\Messaging\Support\MessageBuilder;

/**
Expand All @@ -19,13 +24,13 @@ final class DelayableQueueChannel implements PollableChannel, DefinedObject
/**
* @param Message[] $queue
*/
private function __construct(private string $name, private array $queue, private int $releaseMessagesAwaitingFor = 0)
public function __construct(private string $name, private array $queue = [], private int|\DateTimeInterface $releaseMessagesAwaitingFor = 0)
{
}

public static function create(string $name = 'unknown'): self
public static function create(string $name): self
{
return new self($name, []);
return new self($name);
}

/**
Expand All @@ -44,7 +49,7 @@ public function receive(): ?Message
$message = array_shift($this->queue);

if ($message !== null && $message->getHeaders()->containsKey(MessageHeaders::DELIVERY_DELAY)) {
if ($message->getHeaders()->get(MessageHeaders::DELIVERY_DELAY) > $this->releaseMessagesAwaitingFor) {
if ($message->getHeaders()->get(MessageHeaders::DELIVERY_DELAY) > $this->getCurrentDeliveryTimeShift($message)) {
$nextAvailableMessage = $this->receive();
array_unshift($this->queue, $message);

Expand All @@ -71,9 +76,9 @@ public function receiveWithTimeout(int $timeoutInMilliseconds): ?Message
return $this->receive();
}

public function releaseMessagesAwaitingFor(int $milliseconds): void
public function releaseMessagesAwaitingFor(int|TimeSpan|\DateTimeInterface $time): void
{
$this->releaseMessagesAwaitingFor = $milliseconds;
$this->releaseMessagesAwaitingFor = $time instanceof TimeSpan ? $time->toMilliseconds() : $time;
}

public function __toString()
Expand All @@ -85,4 +90,13 @@ public function getDefinition(): Definition
{
return new Definition(self::class, [$this->name], 'create');
}

public function getCurrentDeliveryTimeShift(Message $message): int
{
if ($this->releaseMessagesAwaitingFor instanceof \DateTimeInterface) {
return EpochBasedClock::getTimestampWithMillisecondsFor($this->releaseMessagesAwaitingFor) - ($message->getHeaders()->getTimestamp() * 1000);
}

return $this->releaseMessagesAwaitingFor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageConverter\HeaderMapper;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\Scheduling\Clock;
use Ecotone\Messaging\Scheduling\EpochBasedClock;

/**
* licence Apache-2.0
Expand Down Expand Up @@ -91,11 +93,21 @@ public function prepare(Message $messageToConvert, ConversionService $conversion
}
$applicationHeaders[MessageHeaders::CONTENT_TYPE] = $sourceMediaType?->toString();

$deliveryDelay = $messageToConvert->getHeaders()->containsKey(MessageHeaders::DELIVERY_DELAY) ? $messageToConvert->getHeaders()->get(MessageHeaders::DELIVERY_DELAY) : $this->defaultDeliveryDelay;

if ($deliveryDelay instanceof \DateTimeInterface) {
$deliveryDelay = EpochBasedClock::getTimestampWithMillisecondsFor($deliveryDelay) - ($messageToConvert->getHeaders()->getTimestamp() * 1000);

if ($deliveryDelay < 0) {
$deliveryDelay = null;
}
}

return new OutboundMessage(
$messagePayload,
array_merge($applicationHeaders, $this->staticHeadersToAdd),
$applicationHeaders[MessageHeaders::CONTENT_TYPE],
$messageToConvert->getHeaders()->containsKey(MessageHeaders::DELIVERY_DELAY) ? $messageToConvert->getHeaders()->get(MessageHeaders::DELIVERY_DELAY) : $this->defaultDeliveryDelay,
$deliveryDelay,
$messageToConvert->getHeaders()->containsKey(MessageHeaders::TIME_TO_LIVE) ? $messageToConvert->getHeaders()->get(MessageHeaders::TIME_TO_LIVE) : $this->defaultTimeToLive,
$messageToConvert->getHeaders()->containsKey(MessageHeaders::PRIORITY) ? $messageToConvert->getHeaders()->get(MessageHeaders::PRIORITY) : $this->defaultPriority,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,18 @@
use Ecotone\Messaging\Attribute\Endpoint\Priority;
use Ecotone\Messaging\Attribute\Endpoint\RemoveHeader;
use Ecotone\Messaging\Attribute\Endpoint\TimeToLive;
use Ecotone\Messaging\Config\ConfigurationException;
use Ecotone\Messaging\Config\Container\DefinedObject;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Handler\ExpressionEvaluationService;
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\Handler\UnionTypeDescriptor;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessagingException;
use Ecotone\Messaging\Scheduling\TimeSpan;
use Nette\Utils\Type;

/**
* Class EndpointHeadersInterceptor
Expand All @@ -23,24 +32,74 @@
*/
class EndpointHeadersInterceptor implements DefinedObject
{
public function addMetadata(?AddHeader $addHeader, ?Delayed $delayed, ?Priority $priority, ?TimeToLive $timeToLive, ?RemoveHeader $removeHeader): array
public function __construct(private ExpressionEvaluationService $expressionEvaluationService)
{

}

public function addMetadata(Message $message, ?AddHeader $addHeader, ?Delayed $delayed, ?Priority $priority, ?TimeToLive $timeToLive, ?RemoveHeader $removeHeader): array
{
$metadata = [];

if ($addHeader) {
$metadata[$addHeader->getHeaderName()] = $addHeader->getHeaderValue();

if ($addHeader->getExpression()) {
$metadata[$addHeader->getHeaderName()] = $this->expressionEvaluationService->evaluate($addHeader->getExpression(), [
'payload' => $message->getPayload(),
'headers' => $message->getHeaders()->headers()
]);
}
}

if ($delayed) {
$metadata[MessageHeaders::DELIVERY_DELAY] = $delayed->getHeaderValue();

if ($delayed->getExpression()) {
$metadata[MessageHeaders::DELIVERY_DELAY] = $this->expressionEvaluationService->evaluate($delayed->getExpression(), [
'payload' => $message->getPayload(),
'headers' => $message->getHeaders()->headers()
]);
}

$type = TypeDescriptor::createFromVariable($metadata[MessageHeaders::DELIVERY_DELAY]);
if (!$type->isCompatibleWith(UnionTypeDescriptor::createWith([
TypeDescriptor::createIntegerType(),
TypeDescriptor::create(TimeSpan::class),
TypeDescriptor::create(\DateTimeInterface::class)
]))) {
throw ConfigurationException::create("Delivery delay should be either integer, TimeSpan or DateTimeInterface, but got {$type->toString()}");
}
}

if ($priority) {
$metadata[MessageHeaders::PRIORITY] = $priority->getHeaderValue();

if ($priority->getExpression()) {
$metadata[MessageHeaders::PRIORITY] = $this->expressionEvaluationService->evaluate($priority->getExpression(), [
'payload' => $message->getPayload(),
'headers' => $message->getHeaders()->headers()
]);
}
}

if ($timeToLive) {
$metadata[MessageHeaders::TIME_TO_LIVE] = $timeToLive->getHeaderValue();

if ($timeToLive->getExpression()) {
$metadata[MessageHeaders::TIME_TO_LIVE] = $this->expressionEvaluationService->evaluate($timeToLive->getExpression(), [
'payload' => $message->getPayload(),
'headers' => $message->getHeaders()->headers()
]);
}

$type = TypeDescriptor::createFromVariable($metadata[MessageHeaders::TIME_TO_LIVE]);
if (!$type->isCompatibleWith(UnionTypeDescriptor::createWith([
TypeDescriptor::createIntegerType(),
TypeDescriptor::create(TimeSpan::class)
]))) {
throw ConfigurationException::create("Delivery delay should be either integer or TimeSpan, but got {$type->toString()}");
}
}

if ($removeHeader) {
Expand All @@ -52,6 +111,8 @@ public function addMetadata(?AddHeader $addHeader, ?Delayed $delayed, ?Priority

public function getDefinition(): Definition
{
return new Definition(self::class);
return new Definition(self::class, [
Reference::to(ExpressionEvaluationService::REFERENCE)
]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\NoExternalConfigurationModule;
use Ecotone\Messaging\Config\Configuration;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Handler\ExpressionEvaluationService;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInterceptorBuilder;
use Ecotone\Messaging\Precedence;
Expand All @@ -38,7 +40,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
$interfaceToCall = $interfaceToCallRegistry->getFor(EndpointHeadersInterceptor::class, 'addMetadata');
$messagingConfiguration->registerBeforeSendInterceptor(
MethodInterceptorBuilder::create(
new Definition(EndpointHeadersInterceptor::class),
new Definition(EndpointHeadersInterceptor::class, [Reference::to(ExpressionEvaluationService::REFERENCE)]),
$interfaceToCall,
Precedence::ENDPOINT_HEADERS_PRECEDENCE,
AddHeader::class . '||' . RemoveHeader::class,
Expand Down
Loading

0 comments on commit 918969d

Please sign in to comment.