diff --git a/packages/Ecotone/src/Messaging/MessageHeaders.php b/packages/Ecotone/src/Messaging/MessageHeaders.php index cacc6ba97..350898ac9 100644 --- a/packages/Ecotone/src/Messaging/MessageHeaders.php +++ b/packages/Ecotone/src/Messaging/MessageHeaders.php @@ -163,6 +163,15 @@ public static function getFrameworksHeaderNames(): array ]; } + public static function unsetFrameworkKeys(array $metadata): array + { + foreach (self::getFrameworksHeaderNames() as $frameworksHeaderName) { + unset($metadata[$frameworksHeaderName]); + } + + return $metadata; + } + public static function unsetTransportMessageKeys(array $metadata): array { unset($metadata[self::MESSAGE_ID]); @@ -233,6 +242,16 @@ public static function unsetAggregateKeys(array $metadata): array return $metadata; } + public static function unsetNonUserKeys(array $metadata): array + { + $metadata = self::unsetEnqueueMetadata($metadata); + $metadata = self::unsetDistributionKeys($metadata); + $metadata = self::unsetAsyncKeys($metadata); + $metadata = self::unsetBusKeys($metadata); + + return self::unsetAggregateKeys($metadata); + } + /** * @param string $headerRegex e.g. ecotone-domain-* * diff --git a/packages/Ecotone/src/Modelling/MessageHandling/MetadataPropagator/MessageHeadersPropagatorInterceptor.php b/packages/Ecotone/src/Modelling/MessageHandling/MetadataPropagator/MessageHeadersPropagatorInterceptor.php index ceb5af25d..ee0381125 100644 --- a/packages/Ecotone/src/Modelling/MessageHandling/MetadataPropagator/MessageHeadersPropagatorInterceptor.php +++ b/packages/Ecotone/src/Modelling/MessageHandling/MetadataPropagator/MessageHeadersPropagatorInterceptor.php @@ -14,23 +14,15 @@ class MessageHeadersPropagatorInterceptor public function storeHeaders(MethodInvocation $methodInvocation, Message $message) { $headers = $message->getHeaders()->headers(); - foreach (MessageHeaders::getFrameworksHeaderNames() as $frameworksHeaderName) { - unset($headers[$frameworksHeaderName]); - } - if (isset($headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION])) { - unset($headers[$headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION]]); - } - unset($headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION]); + $headers = MessageHeaders::unsetFrameworkKeys($headers); + $headers = MessageHeaders::unsetNonUserKeys($headers); $this->currentlyPropagatedHeaders[] = $headers; try { $reply = $methodInvocation->proceed(); + } finally { array_shift($this->currentlyPropagatedHeaders); - } catch (Throwable $exception) { - array_shift($this->currentlyPropagatedHeaders); - - throw $exception; } return $reply; @@ -38,7 +30,11 @@ public function storeHeaders(MethodInvocation $methodInvocation, Message $messag public function propagateHeaders(array $headers): array { - return array_merge($this->getLastHeaders(), $headers); + try{ + return array_merge($this->getLastHeaders(), $headers); + } finally { + array_shift($this->currentlyPropagatedHeaders); + } } public function getLastHeaders(): array diff --git a/packages/Ecotone/src/Modelling/SaveAggregateService.php b/packages/Ecotone/src/Modelling/SaveAggregateService.php index 913ba60a5..34f43c469 100644 --- a/packages/Ecotone/src/Modelling/SaveAggregateService.php +++ b/packages/Ecotone/src/Modelling/SaveAggregateService.php @@ -75,11 +75,7 @@ public function __construct( public function save(Message $message, array $metadata): Message { - $metadata = MessageHeaders::unsetEnqueueMetadata($metadata); - $metadata = MessageHeaders::unsetDistributionKeys($metadata); - $metadata = MessageHeaders::unsetAsyncKeys($metadata); - $metadata = MessageHeaders::unsetBusKeys($metadata); - $metadata = MessageHeaders::unsetAggregateKeys($metadata); + $metadata = MessageHeaders::unsetNonUserKeys($metadata); $aggregate = $message->getHeaders()->get(AggregateMessage::AGGREGATE_OBJECT); $events = []; diff --git a/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/Order.php b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/Order.php new file mode 100644 index 000000000..b6f1f0e20 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/Order.php @@ -0,0 +1,32 @@ +id = $event->id; + } +} diff --git a/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderCreated.php b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderCreated.php new file mode 100644 index 000000000..e08152427 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderCreated.php @@ -0,0 +1,15 @@ + $event->id]; + } + + #[Converter] + public function convertToOrderCreated(array $payload): OrderCreated + { + return new OrderCreated($payload['id']); + } + + #[Converter] + public function convertFromProductAddedToOrder(ProductAddedToOrder $event): array + { + return ['id' => $event->id]; + } + + #[Converter] + public function convertToProductAddedToOrder(array $payload): ProductAddedToOrder + { + return new ProductAddedToOrder($payload['id']); + } +} diff --git a/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderProjection.php b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderProjection.php new file mode 100644 index 000000000..37319eb69 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderProjection.php @@ -0,0 +1,56 @@ +connection->executeStatement(sprintf('CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY, foo INT DEFAULT 0)', self::TABLE)); + } + + #[EventHandler(listenTo: 'order.created', endpointId: 'foo_orders.order_created')] + public function whenOrderCreated(OrderCreated $event, array $metadata): void + { + $data = ['id' => $event->id]; + if (array_key_exists('foo', $metadata)) { + $data['foo'] = 1; + } + + $this->connection->insert(self::TABLE, $data); + } + + #[EventHandler(listenTo: 'order.product_added', endpointId: 'foo_orders.product_added')] + public function whenProductAddedToOrder(ProductAddedToOrder $event, array $metadata): void + { + if (array_key_exists('foo', $metadata)) { + $this->connection->executeStatement(sprintf('UPDATE %s SET foo = foo + 1 WHERE id = ?', self::TABLE), [$event->id]); + } + } + + #[QueryHandler(routingKey: 'foo_orders.count')] + public function fooOrdersCount(): int + { + return (int) $this->connection->fetchOne(sprintf('SELECT SUM(foo) FROM %s', self::TABLE)); + } +} diff --git a/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderProjectionConfig.php b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderProjectionConfig.php new file mode 100644 index 000000000..eff047de1 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderProjectionConfig.php @@ -0,0 +1,29 @@ +withOption(PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT, 100) + ->withOption(ReadModelProjector::OPTION_PERSIST_BLOCK_SIZE, 100) + ; + } + + #[ServiceContext] + public function simpleAuditPlanActionLogProjectionChannel(): SimpleMessageChannelBuilder + { + return SimpleMessageChannelBuilder::createQueueChannel(OrderProjection::CHANNEL); + } +} diff --git a/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/ProductAddedToOrder.php b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/ProductAddedToOrder.php new file mode 100644 index 000000000..e4e178d74 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/ProductAddedToOrder.php @@ -0,0 +1,15 @@ +getConnectionFactory(); + $connection = $connectionFactory->createContext()->getDbalConnection(); + $schemaManager = $connection->createSchemaManager(); + if ($schemaManager->tablesExist(names: OrderProjection::TABLE)) { + $schemaManager->dropTable(name: OrderProjection::TABLE); + } + + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + containerOrAvailableServices: [new OrderProjection($connection), new OrderEventsConverter(), DbalConnectionFactory::class => $connectionFactory], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withNamespaces(['Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection']), + pathToRootCatalog: __DIR__ . "/../../", + addEventSourcedRepository: false + ); + + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 1, metadata: ['foo' => 'bar']); + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 2); + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 3, metadata: ['foo' => 'baz']); + + $ecotoneLite->run(name: OrderProjection::CHANNEL, executionPollingMetadata: ExecutionPollingMetadata::createWithTestingSetup()->withHandledMessageLimit(2)); + + self::assertEquals(expected: 4, actual: $ecotoneLite->sendQueryWithRouting('foo_orders.count')); + } +}