Skip to content

Commit

Permalink
remove the headers once they are propagated
Browse files Browse the repository at this point in the history
in case of asynchronous projection when first event has a header which does not exist in the next one, header will remain
  • Loading branch information
unixslayer committed May 31, 2023
1 parent 97e54e8 commit 94d09f4
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 17 deletions.
19 changes: 19 additions & 0 deletions packages/Ecotone/src/Messaging/MessageHeaders.php
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ public static function getFrameworksHeaderNames(): array
];
}

public static function unsetFrameworkKeys(array $metadata): array
{
foreach (self::getFrameworksHeaderNames() as $frameworksHeaderName) {
unset($metadata[$frameworksHeaderName]);
}

return $metadata;
}

public static function unsetTransportMessageKeys(array $metadata): array
{
unset($metadata[self::MESSAGE_ID]);
Expand Down Expand Up @@ -233,6 +242,16 @@ public static function unsetAggregateKeys(array $metadata): array
return $metadata;
}

public static function unsetNonUserKeys(array $metadata): array
{
$metadata = self::unsetEnqueueMetadata($metadata);
$metadata = self::unsetDistributionKeys($metadata);
$metadata = self::unsetAsyncKeys($metadata);
$metadata = self::unsetBusKeys($metadata);

return self::unsetAggregateKeys($metadata);
}

/**
* @param string $headerRegex e.g. ecotone-domain-*
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,27 @@ class MessageHeadersPropagatorInterceptor
public function storeHeaders(MethodInvocation $methodInvocation, Message $message)
{
$headers = $message->getHeaders()->headers();
foreach (MessageHeaders::getFrameworksHeaderNames() as $frameworksHeaderName) {
unset($headers[$frameworksHeaderName]);
}
if (isset($headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION])) {
unset($headers[$headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION]]);
}
unset($headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION]);
$headers = MessageHeaders::unsetFrameworkKeys($headers);
$headers = MessageHeaders::unsetNonUserKeys($headers);

$this->currentlyPropagatedHeaders[] = $headers;

try {
$reply = $methodInvocation->proceed();
} finally {
array_shift($this->currentlyPropagatedHeaders);
} catch (Throwable $exception) {
array_shift($this->currentlyPropagatedHeaders);

throw $exception;
}

return $reply;
}

public function propagateHeaders(array $headers): array
{
return array_merge($this->getLastHeaders(), $headers);
try{
return array_merge($this->getLastHeaders(), $headers);
} finally {
array_shift($this->currentlyPropagatedHeaders);
}
}

public function getLastHeaders(): array
Expand Down
6 changes: 1 addition & 5 deletions packages/Ecotone/src/Modelling/SaveAggregateService.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,7 @@ public function __construct(

public function save(Message $message, array $metadata): Message
{
$metadata = MessageHeaders::unsetEnqueueMetadata($metadata);
$metadata = MessageHeaders::unsetDistributionKeys($metadata);
$metadata = MessageHeaders::unsetAsyncKeys($metadata);
$metadata = MessageHeaders::unsetBusKeys($metadata);
$metadata = MessageHeaders::unsetAggregateKeys($metadata);
$metadata = MessageHeaders::unsetNonUserKeys($metadata);

$aggregate = $message->getHeaders()->get(AggregateMessage::AGGREGATE_OBJECT);
$events = [];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection;

use Ecotone\Modelling\Attribute\AggregateIdentifier;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Modelling\Attribute\EventSourcingHandler;
use Ecotone\Modelling\WithAggregateVersioning;

#[EventSourcingAggregate]
final class Order
{
use WithAggregateVersioning;

#[AggregateIdentifier]
private int $id;

#[CommandHandler(routingKey: 'order.create')]
public static function create(int $id): array
{
return [new OrderCreated($id), new ProductAddedToOrder($id)];
}

#[EventSourcingHandler]
public function applyOrderCreated(OrderCreated $event): void
{
$this->id = $event->id;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection;

use Ecotone\Modelling\Attribute\NamedEvent;

#[NamedEvent('order.created')]
final class OrderCreated
{
public function __construct(public int $id)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection;

use Ecotone\Messaging\Attribute\Converter;

final class OrderEventsConverter
{
#[Converter]
public function convertFromOrderCreated(OrderCreated $event): array
{
return ['id' => $event->id];
}

#[Converter]
public function convertToOrderCreated(array $payload): OrderCreated
{
return new OrderCreated($payload['id']);
}

#[Converter]
public function convertFromProductAddedToOrder(ProductAddedToOrder $event): array
{
return ['id' => $event->id];
}

#[Converter]
public function convertToProductAddedToOrder(array $payload): ProductAddedToOrder
{
return new ProductAddedToOrder($payload['id']);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection;

use Doctrine\DBAL\Connection;
use Ecotone\EventSourcing\Attribute\Projection;
use Ecotone\EventSourcing\Attribute\ProjectionInitialization;
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;

#[Asynchronous(channelName: self::CHANNEL)]
#[Projection(name: self::NAME, fromStreams: [Order::class])]
final class OrderProjection
{
public const CHANNEL = 'projection_channel';
public const NAME = 'order_projection';
public const TABLE = 'foo_orders';

public function __construct(private Connection $connection)
{
}

#[ProjectionInitialization]
public function initialization(): void
{
$this->connection->executeStatement(sprintf('CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY, foo INT DEFAULT 0)', self::TABLE));
}

#[EventHandler(listenTo: 'order.created', endpointId: 'foo_orders.order_created')]
public function whenOrderCreated(OrderCreated $event, array $metadata): void
{
$data = ['id' => $event->id];
if (array_key_exists('foo', $metadata)) {
$data['foo'] = 1;
}

$this->connection->insert(self::TABLE, $data);
}

#[EventHandler(listenTo: 'order.product_added', endpointId: 'foo_orders.product_added')]
public function whenProductAddedToOrder(ProductAddedToOrder $event, array $metadata): void
{
if (array_key_exists('foo', $metadata)) {
$this->connection->executeStatement(sprintf('UPDATE %s SET foo = foo + 1 WHERE id = ?', self::TABLE), [$event->id]);
}
}

#[QueryHandler(routingKey: 'foo_orders.count')]
public function fooOrdersCount(): int
{
return (int) $this->connection->fetchOne(sprintf('SELECT SUM(foo) FROM %s', self::TABLE));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection;

use Ecotone\EventSourcing\ProjectionRunningConfiguration;
use Ecotone\Messaging\Attribute\ServiceContext;
use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder;
use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector;
use Prooph\EventStore\Projection\ReadModelProjector;

class OrderProjectionConfig
{
#[ServiceContext]
public function eventDrivenAuditPlanActionLogProjection(): ProjectionRunningConfiguration
{
return ProjectionRunningConfiguration::createEventDriven(OrderProjection::NAME)
->withOption(PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT, 100)
->withOption(ReadModelProjector::OPTION_PERSIST_BLOCK_SIZE, 100)
;
}

#[ServiceContext]
public function simpleAuditPlanActionLogProjectionChannel(): SimpleMessageChannelBuilder
{
return SimpleMessageChannelBuilder::createQueueChannel(OrderProjection::CHANNEL);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection;

use Ecotone\Modelling\Attribute\NamedEvent;

#[NamedEvent('order.product_added')]
final class ProductAddedToOrder
{
public function __construct(public int $id)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\EventSourcing\Integration;

use Ecotone\Lite\EcotoneLite;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
use Enqueue\Dbal\DbalConnectionFactory;
use Ramsey\Uuid\Uuid;
use Test\Ecotone\EventSourcing\EventSourcingMessagingTest;
use Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection\OrderEventsConverter;
use Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection\OrderProjection;

final class AsyncProjectionMetadataPropagationTest extends EventSourcingMessagingTest
{
public function test_metadata_propagation_with_async_projection(): void
{
/** @var DbalConnectionFactory $connectionFactory */
$connectionFactory = $this->getConnectionFactory();
$connection = $connectionFactory->createContext()->getDbalConnection();
$schemaManager = $connection->createSchemaManager();
if ($schemaManager->tablesExist(names: OrderProjection::TABLE)) {
$schemaManager->dropTable(name: OrderProjection::TABLE);
}

$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
containerOrAvailableServices: [new OrderProjection($connection), new OrderEventsConverter(), DbalConnectionFactory::class => $connectionFactory],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withNamespaces(['Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection']),
pathToRootCatalog: __DIR__ . "/../../",
addEventSourcedRepository: false
);

$ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 1, metadata: ['foo' => 'bar']);
$ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 2);
$ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 3, metadata: ['foo' => 'baz']);

$ecotoneLite->run(name: OrderProjection::CHANNEL, executionPollingMetadata: ExecutionPollingMetadata::createWithTestingSetup()->withHandledMessageLimit(2));

self::assertEquals(expected: 4, actual: $ecotoneLite->sendQueryWithRouting('foo_orders.count'));
}
}

0 comments on commit 94d09f4

Please sign in to comment.