Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor aggregate module #424

Merged
merged 18 commits into from
Dec 28, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ classesToResolve: [Calendar::class],
->sendCommand(new ScheduleMeeting('1', '2'))
;

self::assertFalse($ecotone->getMessageChannel('calendar')->receive()->getHeaders()->containsKey(AggregateMessage::CALLED_AGGREGATE_OBJECT));
self::assertFalse($ecotone->getMessageChannel('calendar')->receive()->getHeaders()->containsKey(AggregateMessage::CALLED_AGGREGATE_INSTANCE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
use Ecotone\Messaging\Handler\Gateway\Gateway;
use Ecotone\Messaging\MessageChannel;
use Ecotone\Messaging\MessagePublisher;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\DistributedBus;
use Ecotone\Modelling\EventBus;
use Ecotone\Modelling\EventSourcingExecutor\GroupedEventSourcingExecutor;
use Ecotone\Modelling\QueryBus;

/**
Expand Down Expand Up @@ -84,12 +86,18 @@ public function getFlowTestSupport(): FlowTestSupport
$this->getCommandBus(),
$this->getEventBus(),
$this->getQueryBus(),
$this->getServiceFromContainer(AggregateDefinitionRegistry::class),
$this->getMessagingTestSupport(),
$this->getGatewayByName(MessagingEntrypoint::class),
$this->configuredMessagingSystem
);
}

/**
* @template T
* @param class-string<T> $referenceName
* @return T
*/
public function getServiceFromContainer(string $referenceName): object
{
return $this->configuredMessagingSystem->getServiceFromContainer($referenceName);
Expand Down
19 changes: 15 additions & 4 deletions packages/Ecotone/src/Lite/Test/FlowTestSupport.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageChannel;
use Ecotone\Messaging\MessageHeaders;
Expand All @@ -19,13 +20,16 @@
use Ecotone\Messaging\Scheduling\TimeSpan;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Messaging\Support\MessageBuilder;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
use Ecotone\Modelling\AggregateMessage;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\Config\MessageBusChannel;
use Ecotone\Modelling\Config\AggregrateHandlerModule;
use Ecotone\Modelling\Event;
use Ecotone\Modelling\EventBus;
use Ecotone\Modelling\EventSourcingExecutor\GroupedEventSourcingExecutor;
use Ecotone\Modelling\QueryBus;
use Ecotone\Test\StubEventSourcedAggregate;

/**
* @template T
Expand All @@ -39,6 +43,7 @@ public function __construct(
private CommandBus $commandBus,
private EventBus $eventBus,
private QueryBus $queryBus,
private AggregateDefinitionRegistry $aggregateDefinitionRegistry,
private MessagingTestSupport $testSupportGateway,
private MessagingEntrypoint $messagingEntrypoint,
private ConfiguredMessagingSystem $configuredMessagingSystem
Expand Down Expand Up @@ -179,13 +184,17 @@ public function getEventStreamEvents(string $streamName): array
*/
public function withEventsFor(string|object|array $identifiers, string $aggregateClass, array $events, int $aggregateVersion = 0): self
{
$aggregateDefinition = $this->aggregateDefinitionRegistry->getFor(TypeDescriptor::create($aggregateClass));
Assert::isTrue($aggregateDefinition->isEventSourced(), "Aggregate {$aggregateClass} is not event sourced. Can't store events for it.");

$this->messagingEntrypoint->sendWithHeaders(
$events,
[],
[
AggregateMessage::OVERRIDE_AGGREGATE_IDENTIFIER => is_object($identifiers) ? (string)$identifiers : $identifiers,
AggregateMessage::TARGET_VERSION => $aggregateVersion,
AggregateMessage::RESULT_AGGREGATE_OBJECT => $aggregateClass,
AggregateMessage::RESULT_AGGREGATE_EVENTS => $events,
AggregateMessage::CALLED_AGGREGATE_CLASS => $aggregateClass,
AggregateMessage::CALLED_AGGREGATE_INSTANCE => new $aggregateClass(),
AggregateMessage::RECORDED_AGGREGATE_EVENTS => $events,
],
AggregrateHandlerModule::getRegisterAggregateSaveRepositoryInputChannel($aggregateClass). '.test_setup_state'
);
Expand All @@ -198,7 +207,8 @@ public function withStateFor(object $aggregate): self
$this->messagingEntrypoint->sendWithHeaders(
$aggregate,
[
AggregateMessage::RESULT_AGGREGATE_OBJECT => $aggregate,
AggregateMessage::CALLED_AGGREGATE_INSTANCE => $aggregate,
AggregateMessage::CALLED_AGGREGATE_CLASS => $aggregate::class,
],
AggregrateHandlerModule::getRegisterAggregateSaveRepositoryInputChannel($aggregate::class). '.test_setup_state'
);
Expand Down Expand Up @@ -397,6 +407,7 @@ public function sendMessageDirectToChannelWithMessageReply(string $targetChannel
}

/**
* @template T
* @param class-string<T> $referenceName
* @return T
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public function getMessagePublisher(string $referenceName = MessagePublisher::cl

/**
* @throws InvalidArgumentException if trying to find not existing service reference
* @template T
* @param class-string<T> $referenceName
* @return T
*/
public function getServiceFromContainer(string $referenceName): object;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Ecotone\Messaging\Handler\Enricher;

use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Handler\ExpressionEvaluationService;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\Support\Assert;
Expand Down Expand Up @@ -35,6 +37,17 @@ public static function create(ExpressionEvaluationService $expressionEvaluationS
return self::createWithMapping($expressionEvaluationService, '');
}

public static function getDefinition(): Definition
{
return new Definition(
self::class,
[
Reference::to(ExpressionEvaluationService::REFERENCE)
],
[self::class, 'create']
);
}

/**
* @param PropertyPath $propertyNamePath
* @param mixed $dataToEnrich
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Ecotone\Messaging\Handler\Enricher;

use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\MessagingException;
use Ecotone\Messaging\Support\InvalidArgumentException;
use ReflectionClass;
Expand Down Expand Up @@ -39,6 +40,14 @@ public function hasPropertyValue(PropertyPath $propertyPath, $fromData): bool
return true;
}

public static function getDefinition(): Definition
{
return new Definition(
self::class,
[],
);
}

/**
* @param PropertyPath $propertyPath
* @param mixed $fromData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ class MethodInvokerAggregateObjectResolver implements MethodInvokerObjectResolve
{
public function resolveFor(Message $message): object
{
return $message->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_OBJECT);
return $message->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_INSTANCE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Ecotone\Messaging\Handler\Router;

use Ecotone\Messaging\Config\Container\DefinedObject;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Message;

/**
* Class HeaderValueRouter
* @package Ecotone\Messaging\Handler\Router
* @author Dariusz Gafka <support@simplycodedsoftware.com>
* @internal
*/
/**
* licence Apache-2.0
*/
final class HeaderExistsRouter implements RouteSelector, DefinedObject
{

private function __construct(private string $headerName, private string $routeToChannel, private string $fallbackRoute)
{
}

public static function create(string $headerName, string $routeToChannel, string $fallbackRoute): self
{
return new self($headerName, $routeToChannel, $fallbackRoute);
}

/**
* @param Message $message
* @return array
*/
public function route(Message $message): array
{
return $message->getHeaders()->containsKey($this->headerName)
? [$this->routeToChannel]
: [$this->fallbackRoute];
}

public function getDefinition(): Definition
{
return new Definition(self::class, [
$this->headerName,
$this->routeToChannel,
$this->fallbackRoute
], 'create');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ public static function createRecipientListRouter(array $recipientList): self
);
}

public static function createHeaderExistsRouter(string $headerName, string $routeToChannel, string $fallbackRoute): self
{
return new self(
HeaderExistsRouter::create($headerName, $routeToChannel, $fallbackRoute)->getDefinition(),
[
$routeToChannel => new Definition(SendToChannelProcessor::class, [
new ChannelReference($routeToChannel),
]),
$fallbackRoute => new Definition(SendToChannelProcessor::class, [
new ChannelReference($fallbackRoute),
]),
]
);
}

public function route(string $routeName, CompilableBuilder $processor): self
{
$this->routeMap[$routeName] = $processor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,24 +133,7 @@ public function getParameterConverters(): array

public function compile(MessagingContainerBuilder $builder): Definition
{
if ($this->expression) {
$objectToInvokeOn = new Definition(ExpressionTransformer::class, [$this->expression, new Reference(ExpressionEvaluationService::REFERENCE), new Reference(ReferenceSearchService::class)]);
$interfaceToCallReference = new InterfaceToCallReference(ExpressionTransformer::class, 'transform');
} else {
$objectToInvokeOn = $this->directObject ?: new Reference($this->objectToInvokeReferenceName);
if ($this->methodNameOrInterface instanceof InterfaceToCall) {
$interfaceToCallReference = InterfaceToCallReference::fromInstance($this->methodNameOrInterface);
} else {
$className = $this->directObject ? \get_class($objectToInvokeOn) : $this->objectToInvokeReferenceName;
$interfaceToCallReference = new InterfaceToCallReference($className, $this->getMethodName());
}
}

$interfaceToCall = $builder->getInterfaceToCall($interfaceToCallReference);

if (! $interfaceToCall->canReturnValue()) {
throw InvalidArgumentException::create("Can't create transformer for {$interfaceToCall}, because method has no return value");
}
list($objectToInvokeOn, $interfaceToCallReference, $interfaceToCall) = $this->prepare($builder);

$newImplementation = MessageProcessorActivatorBuilder::create()
->withEndpointId($this->getEndpointId())
Expand All @@ -172,6 +155,17 @@ public function compile(MessagingContainerBuilder $builder): Definition
return $newImplementation->compile($builder);
}

public function compileProcessor(MessagingContainerBuilder $builder): Definition
{
list($objectToInvokeOn, $interfaceToCallReference, $interfaceToCall) = $this->prepare($builder);

return MethodInvokerBuilder::create(
$objectToInvokeOn,
$interfaceToCallReference,
$this->methodParameterConverterBuilders
)->compile($builder);
}

private function setDirectObjectToInvoke(DefinedObject $objectToInvoke): void
{
$this->directObject = $objectToInvoke;
Expand Down Expand Up @@ -202,4 +196,28 @@ private function getMethodName(): string|InterfaceToCall
? $this->methodNameOrInterface->getMethodName()
: $this->methodNameOrInterface;
}

public function prepare(MessagingContainerBuilder $builder): array
{
if ($this->expression) {
$objectToInvokeOn = new Definition(ExpressionTransformer::class, [$this->expression, new Reference(ExpressionEvaluationService::REFERENCE), new Reference(ReferenceSearchService::class)]);
$interfaceToCallReference = new InterfaceToCallReference(ExpressionTransformer::class, 'transform');
} else {
$objectToInvokeOn = $this->directObject ?: new Reference($this->objectToInvokeReferenceName);
if ($this->methodNameOrInterface instanceof InterfaceToCall) {
$interfaceToCallReference = InterfaceToCallReference::fromInstance($this->methodNameOrInterface);
} else {
$className = $this->directObject ? \get_class($objectToInvokeOn) : $this->objectToInvokeReferenceName;
$interfaceToCallReference = new InterfaceToCallReference($className, $this->getMethodName());
}
}

$interfaceToCall = $builder->getInterfaceToCall($interfaceToCallReference);

if (!$interfaceToCall->canReturnValue()) {
throw InvalidArgumentException::create("Can't create transformer for {$interfaceToCall}, because method has no return value");
}

return array($objectToInvokeOn, $interfaceToCallReference, $interfaceToCall);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Handler\Transformer;

use Ecotone\Messaging\Config\Container\CompilableBuilder;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\Container\Reference;

/**
* licence Apache-2.0
*/
final class TransformerProcessorBuilder implements CompilableBuilder
{
private function __construct(private TransformerBuilder $transformerBuilder)
{

}

public static function create(TransformerBuilder $transformerBuilder): self
{
return new self($transformerBuilder);
}

public function compile(MessagingContainerBuilder $builder): Definition|Reference
{
return $this->transformerBuilder->compileProcessor($builder);
}
}
8 changes: 3 additions & 5 deletions packages/Ecotone/src/Messaging/MessageHeaders.php
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,10 @@ public static function unsetAggregateKeys(array $metadata): array
{
unset(
$metadata[AggregateMessage::AGGREGATE_ID],
$metadata[AggregateMessage::CALLED_AGGREGATE_OBJECT],
$metadata[AggregateMessage::CALLED_AGGREGATE_EVENTS],
$metadata[AggregateMessage::RESULT_AGGREGATE_OBJECT],
$metadata[AggregateMessage::RESULT_AGGREGATE_EVENTS],
$metadata[AggregateMessage::CALLED_AGGREGATE_INSTANCE],
$metadata[AggregateMessage::CALLED_AGGREGATE_CLASS],
$metadata[AggregateMessage::RECORDED_AGGREGATE_EVENTS],
$metadata[AggregateMessage::TARGET_VERSION],
$metadata[AggregateMessage::AGGREGATE_OBJECT_EXISTS],
$metadata[AggregateMessage::NULL_EXECUTION_RESULT],
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public function convertToMessage(Message $requestMessage, mixed $result): ?Messa
}

if ($this->isCommandHandler) {
$calledAggregate = $requestMessage->getHeaders()->containsKey(AggregateMessage::CALLED_AGGREGATE_OBJECT) ? $requestMessage->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_OBJECT) : null;
$calledAggregate = $requestMessage->getHeaders()->containsKey(AggregateMessage::CALLED_AGGREGATE_INSTANCE) ? $requestMessage->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_INSTANCE) : null;
$versionBeforeHandling = $requestMessage->getHeaders()->containsKey(AggregateMessage::TARGET_VERSION) ? $requestMessage->getHeaders()->get(AggregateMessage::TARGET_VERSION) : null;

if (is_null($versionBeforeHandling) && $this->aggregateVersionProperty) {
Expand All @@ -51,7 +51,6 @@ public function convertToMessage(Message $requestMessage, mixed $result): ?Messa

$resultMessage = $resultMessage->setHeader(AggregateMessage::TARGET_VERSION, $versionBeforeHandling);
}
$resultMessage = $resultMessage->setHeader(AggregateMessage::CALLED_AGGREGATE_OBJECT, $calledAggregate);
}

if (! is_null($result)) {
Expand All @@ -68,6 +67,7 @@ public function convertToMessage(Message $requestMessage, mixed $result): ?Messa
if ($this->isCommandHandler || ! is_null($result)) {
return $resultMessage->build();
}

return null;
}
}
Loading
Loading