Skip to content

Commit

Permalink
setup EventStore in context of given stream (#329) (#330)
Browse files Browse the repository at this point in the history
  • Loading branch information
unixslayer authored Jul 6, 2024
1 parent 6d895ad commit f2c89e9
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 27 deletions.
6 changes: 6 additions & 0 deletions packages/PdoEventSourcing/src/EventSourcingConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Enqueue\Dbal\DbalConnectionFactory;
use Prooph\EventStore\InMemoryEventStore;
use Prooph\EventStore\Pdo\PersistenceStrategy;
use Prooph\EventStore\StreamName;

class EventSourcingConfiguration extends BaseEventSourcingConfiguration
{
Expand Down Expand Up @@ -184,6 +185,11 @@ public function getPersistenceStrategy(): string
return $this->persistenceStrategy;
}

public function getPersistenceStrategyFor(?StreamName $streamName = null): string
{
return $this->persistenceStrategies[$streamName?->toString()] ?? $this->persistenceStrategy;
}

public function getCustomPersistenceStrategy(): PersistenceStrategy
{
return $this->customPersistenceStrategyInstance;
Expand Down
43 changes: 24 additions & 19 deletions packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,22 @@ public function __construct(

public function fetchStreamMetadata(StreamName $streamName): array
{
return $this->getEventStore()->fetchStreamMetadata($streamName);
return $this->getEventStore($streamName)->fetchStreamMetadata($streamName);
}

public function hasStream(StreamName $streamName): bool
{
return $this->getEventStore()->hasStream($streamName);
return $this->getEventStore($streamName)->hasStream($streamName);
}

public function load(StreamName $streamName, int $fromNumber = 1, int $count = null, MetadataMatcher $metadataMatcher = null): Iterator
{
return $this->getEventStore()->load($streamName, $fromNumber, $count, $metadataMatcher);
return $this->getEventStore($streamName)->load($streamName, $fromNumber, $count, $metadataMatcher);
}

public function loadReverse(StreamName $streamName, int $fromNumber = null, int $count = null, MetadataMatcher $metadataMatcher = null): Iterator
{
return $this->getEventStore()->loadReverse($streamName, $fromNumber, $count, $metadataMatcher);
return $this->getEventStore($streamName)->loadReverse($streamName, $fromNumber, $count, $metadataMatcher);
}

public function fetchStreamNames(?string $filter, ?MetadataMatcher $metadataMatcher, int $limit = 20, int $offset = 0): array
Expand All @@ -114,12 +114,12 @@ public function fetchCategoryNamesRegex(string $filter, int $limit = 20, int $of

public function updateStreamMetadata(StreamName $streamName, array $newMetadata): void
{
$this->getEventStore()->updateStreamMetadata($streamName, $newMetadata);
$this->getEventStore($streamName)->updateStreamMetadata($streamName, $newMetadata);
}

public function create(Stream $stream): void
{
$this->getEventStore()->create($stream);
$this->getEventStore($stream->streamName())->create($stream);
$this->ensuredExistingStreams[$this->getContextName()][$stream->streamName()->toString()] = true;
}

Expand All @@ -129,7 +129,7 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void
$this->create(new Stream($streamName, $streamEvents, []));
} else {
try {
$this->getEventStore()->appendTo($streamName, $streamEvents);
$this->getEventStore($streamName)->appendTo($streamName, $streamEvents);
} catch (StreamNotFound) {
$this->create(new Stream($streamName, $streamEvents, []));
}
Expand Down Expand Up @@ -168,9 +168,9 @@ public function prepareEventStore(): void
$this->initializated[$connectionName] = true;
}

public function getEventStore(): EventStore
public function getEventStore(?StreamName $streamName = null): EventStore
{
$contextName = $this->getContextName();
$contextName = $this->getContextName($streamName);
if (isset($this->initializedEventStore[$contextName])) {
return $this->initializedEventStore[$contextName];
}
Expand All @@ -185,9 +185,9 @@ public function getEventStore(): EventStore
$eventStoreType = $this->getEventStoreType();

$persistenceStrategy = match ($eventStoreType) {
self::EVENT_STORE_TYPE_MYSQL => $this->getMysqlPersistenceStrategy(),
self::EVENT_STORE_TYPE_MARIADB => $this->getMariaDbPersistenceStrategy(),
self::EVENT_STORE_TYPE_POSTGRES => $this->getPostgresPersistenceStrategy(),
self::EVENT_STORE_TYPE_MYSQL => $this->getMysqlPersistenceStrategyFor($streamName),
self::EVENT_STORE_TYPE_MARIADB => $this->getMariaDbPersistenceStrategyFor($streamName),
self::EVENT_STORE_TYPE_POSTGRES => $this->getPostgresPersistenceStrategyFor($streamName),
default => throw InvalidArgumentException::create('Unexpected match value ' . $eventStoreType)
};

Expand Down Expand Up @@ -222,29 +222,29 @@ public function getEventStore(): EventStore
return $eventStore;
}

private function getMysqlPersistenceStrategy(): PersistenceStrategy
private function getMysqlPersistenceStrategyFor(?StreamName $streamName = null): PersistenceStrategy
{
return match ($this->eventSourcingConfiguration->getPersistenceStrategy()) {
return match ($this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName)) {
self::AGGREGATE_STREAM_PERSISTENCE => new PersistenceStrategy\MySqlAggregateStreamStrategy($this->messageConverter),
self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\MySqlSingleStreamStrategy($this->messageConverter),
self::SIMPLE_STREAM_PERSISTENCE => new InterlopMysqlSimpleStreamStrategy($this->messageConverter),
self::CUSTOM_STREAM_PERSISTENCE => $this->eventSourcingConfiguration->getCustomPersistenceStrategy(),
};
}

private function getMariaDbPersistenceStrategy(): PersistenceStrategy
private function getMariaDbPersistenceStrategyFor(?StreamName $streamName = null): PersistenceStrategy
{
return match ($this->eventSourcingConfiguration->getPersistenceStrategy()) {
return match ($this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName)) {
self::AGGREGATE_STREAM_PERSISTENCE => new PersistenceStrategy\MariaDbAggregateStreamStrategy($this->messageConverter),
self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\MariaDbSingleStreamStrategy($this->messageConverter),
self::SIMPLE_STREAM_PERSISTENCE => new InterlopMariaDbSimpleStreamStrategy($this->messageConverter),
self::CUSTOM_STREAM_PERSISTENCE => $this->eventSourcingConfiguration->getCustomPersistenceStrategy(),
};
}

private function getPostgresPersistenceStrategy(): PersistenceStrategy
private function getPostgresPersistenceStrategyFor(?StreamName $streamName = null): PersistenceStrategy
{
return match ($this->eventSourcingConfiguration->getPersistenceStrategy()) {
return match ($this->eventSourcingConfiguration->getPersistenceStrategyFor($streamName)) {
self::AGGREGATE_STREAM_PERSISTENCE => new PersistenceStrategy\PostgresAggregateStreamStrategy($this->messageConverter),
self::SINGLE_STREAM_PERSISTENCE => new PersistenceStrategy\PostgresSingleStreamStrategy($this->messageConverter),
self::SIMPLE_STREAM_PERSISTENCE => new PersistenceStrategy\PostgresSimpleStreamStrategy($this->messageConverter),
Expand Down Expand Up @@ -416,12 +416,17 @@ private function getConnectionInLegacyOrLaravelWay()
return $connection->getWrappedConnection();
}

public function getContextName(): string
public function getContextName(?StreamName $streamName = null): string
{
$connectionName = 'default';
if ($this->connectionFactory instanceof MultiTenantConnectionFactory) {
$connectionName = $this->connectionFactory->currentActiveTenant();
}

if ($streamName !== null) {
$connectionName .= '-' . $streamName->toString();
}

return $connectionName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected function connectionForTenantB(): ConnectionFactory
}

$connectionFactory = DbalConnection::fromDsn(
getenv('SECONDARY_DATABASE_DSN') ? getenv('SECONDARY_DATABASE_DSN') : 'mysql://ecotone:secret@localhost:3306/ecotone'
getenv('SECONDARY_DATABASE_DSN') ? getenv('SECONDARY_DATABASE_DSN') : 'mysql://ecotone:secret@127.0.0.1:3306/ecotone'
);

$this->tenantBConnection = $connectionFactory;
Expand All @@ -43,7 +43,7 @@ protected function connectionForTenantB(): ConnectionFactory

protected function connectionForTenantA(): ConnectionFactory
{
$connectionFactory = $this->getConnectionFactory();
$connectionFactory = self::getConnectionFactory();
if (isset($this->tenantAConnection)) {
return $this->tenantAConnection;
}
Expand All @@ -54,7 +54,7 @@ protected function connectionForTenantA(): ConnectionFactory

public static function getConnectionFactory(bool $isRegistry = false): ConnectionFactory
{
$dsn = getenv('DATABASE_DSN') ? getenv('DATABASE_DSN') : 'pgsql://ecotone:secret@localhost:5432/ecotone';
$dsn = getenv('DATABASE_DSN') ? getenv('DATABASE_DSN') : 'pgsql://ecotone:secret@127.0.0.1:5432/ecotone';
if (! $dsn) {
throw new InvalidArgumentException('Missing env `DATABASE_DSN` pointing to test database');
}
Expand All @@ -66,14 +66,14 @@ public static function getConnectionFactory(bool $isRegistry = false): Connectio

public function getConnection(): Connection
{
return $this->getConnectionFactory()->createContext()->getDbalConnection();
return self::getConnectionFactory()->createContext()->getDbalConnection();
}

protected function getReferenceSearchServiceWithConnection(array $objects = [], bool $connectionAsRegistry = false)
{
return InMemoryReferenceSearchService::createWith(
array_merge(
[DbalConnectionFactory::class => $this->getConnectionFactory($connectionAsRegistry)],
[DbalConnectionFactory::class => self::getConnectionFactory($connectionAsRegistry)],
$objects
)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\EventSourcing\Fixture\MultiplePersistenceStrategies;

use Ecotone\EventSourcing\EventStreamEmitter;
use Ecotone\Modelling\Attribute\EventHandler;

final class Logger
{
public const STREAM = 'log';

#[EventHandler(listenTo: BasketCreated::NAME)]
public function whenBasketCreated(BasketCreated $event, EventStreamEmitter $emitter): void
{
$emitter->linkTo(self::STREAM, [$event]);
}

#[EventHandler(listenTo: OrderCreated::NAME)]
public function whenOrderCreated(OrderCreated $event, EventStreamEmitter $emitter): void
{
$emitter->linkTo(self::STREAM, [$event]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Test\Ecotone\EventSourcing\Fixture\MultiplePersistenceStrategies\BasketCreated;
use Test\Ecotone\EventSourcing\Fixture\MultiplePersistenceStrategies\BasketProjection;
use Test\Ecotone\EventSourcing\Fixture\MultiplePersistenceStrategies\EventsConverter;
use Test\Ecotone\EventSourcing\Fixture\MultiplePersistenceStrategies\Logger;
use Test\Ecotone\EventSourcing\Fixture\MultiplePersistenceStrategies\Order;
use Test\Ecotone\EventSourcing\Fixture\MultiplePersistenceStrategies\OrderCreated;
use Test\Ecotone\EventSourcing\Fixture\MultiplePersistenceStrategies\OrderProjection;
Expand All @@ -27,19 +28,20 @@ final class MultiplePersistenceStrategiesTest extends EventSourcingMessagingTest
public function test_allow_multiple_persistent_strategies_per_aggregate(): void
{
$ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore(
classesToResolve: [Order::class, Basket::class, BasketProjection::class, OrderProjection::class],
classesToResolve: [Order::class, Basket::class, BasketProjection::class, OrderProjection::class, Logger::class],
containerOrAvailableServices: [
new EventsConverter(),
new BasketProjection($this->getConnection()),
new OrderProjection($this->getConnection()),
$this->getConnectionFactory(),
new Logger(),
self::getConnectionFactory(),
],
configuration: ServiceConfiguration::createWithDefaults()
->withEnvironment('prod')
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE]))
->withExtensionObjects([
EventSourcingConfiguration::createWithDefaults()
->withSimpleStreamPersistenceStrategy()
->withPersistenceStrategyFor(Logger::STREAM, LazyProophEventStore::SIMPLE_STREAM_PERSISTENCE)
->withPersistenceStrategyFor(Order::STREAM, LazyProophEventStore::AGGREGATE_STREAM_PERSISTENCE),
])
->withNamespaces([
Expand Down Expand Up @@ -89,10 +91,12 @@ classesToResolve: [Order::class, Basket::class, BasketProjection::class, OrderPr
self::assertTrue($eventStore->hasStream(Order::STREAM.'-order-1'));
self::assertTrue($eventStore->hasStream(Order::STREAM.'-order-2'));
self::assertTrue($eventStore->hasStream(Basket::STREAM));
self::assertTrue($eventStore->hasStream(Logger::STREAM));

self::assertCount(1, $eventStore->load(Order::STREAM.'-order-1'));
self::assertCount(1, $eventStore->load(Order::STREAM.'-order-2'));
self::assertCount(2, $eventStore->load(Basket::STREAM));
self::assertCount(4, $eventStore->load(Logger::STREAM));

self::assertEquals(['order-1', 'order-2'], $ecotone->sendQueryWithRouting('orders'));
self::assertEquals(['basket-1', 'basket-2'], $ecotone->sendQueryWithRouting('baskets'));
Expand Down

0 comments on commit f2c89e9

Please sign in to comment.