From 20b242d9c4e769ea19926741c669ba6ba2d9678e Mon Sep 17 00:00:00 2001 From: Aleksander Smiatacz <96231524+alek-talabat@users.noreply.github.com> Date: Mon, 3 Apr 2023 15:10:46 +0200 Subject: [PATCH] Postgres support (#108) * Copy pasted code from SQL server to new 'KafkaFlow.Retry.Postgres' project * Adjusted data types to Npgsql * Migrated '01 - Create_Tables.sql' & '02 - Populate_Tables.sql' to postgres * Prepared infrastructure for integration tests * Fixes after running integration tests * Postgres improvements - no dbo schema, postgres naming convention for tables, unified columns casing * Added unit tests (based on sql server) * Added readme for Postgres * Fixes for Codacy Static Code Analysis * PR review - usings inside namespace * PR review - added .ConfigureAwait(false) * Fixes after merge from master --- .github/workflows/build.yml | 5 + .github/workflows/publish.yml | 3 + README.md | 10 +- .../Core/Bootstrappers/BootstrapperKafka.cs | 155 +++++- .../BootstrapperPostgresSchema.cs | 71 +++ .../Fixtures/BootstrapperFixtureTemplate.cs | 17 +- .../Fixtures/BootstrapperHostFixture.cs | 8 + ...anteeOrderedConsumptionPostgresProducer.cs | 6 + ...urableLatestConsumptionPostgresProducer.cs | 6 + .../Settings/PostgresRepositorySettings.cs | 9 + .../Repositories/PostgresRepository.cs | 287 ++++++++++ .../Storages/Repositories/RepositoryType.cs | 1 + .../KafkaFlow.Retry.IntegrationTests.csproj | 1 + .../CheckQueuePendingItemsTests.cs | 5 + .../CheckQueueTests.cs | 3 + .../CreateSchemaCreatorTests.cs | 16 + .../DeleteQueuesTests.cs | 2 + .../GetQueuesTests.cs | 9 + .../SaveToQueueTests.cs | 3 + .../UpdateItemExecutionInfoTests.cs | 9 +- .../UpdateItemStatusTests.cs | 2 + .../UpdateItemsTests.cs | 5 + .../UpdateQueuesTests.cs | 4 + .../RetryDurableTests.cs | 14 + .../conf/appsettings.json | 4 + .../ConnectionProvider.cs | 21 + .../DbConnectionContext.cs | 89 ++++ .../Deploy/01 - Create_Tables.sql | 109 ++++ .../Deploy/02 - Populate_Tables.sql | 28 + .../IConnectionProvider.cs | 9 + src/KafkaFlow.Retry.Postgres/IDbConnection.cs | 10 + .../IDbConnectionWithinTransaction.cs | 9 + .../IRetrySchemaCreator.cs | 9 + .../KafkaFlow.Retry.Postgres.csproj | 43 ++ .../Model/Factories/IRetryQueueDboFactory.cs | 9 + .../Factories/IRetryQueueItemDboFactory.cs | 10 + .../IRetryQueueItemMessageDboFactory.cs | 9 + .../IRetryQueueItemMessageHeaderDboFactory.cs | 10 + .../Model/Factories/RetryQueueDboFactory.cs | 24 + .../Factories/RetryQueueItemDboFactory.cs | 30 ++ .../RetryQueueItemMessageDboFactory.cs | 25 + .../RetryQueueItemMessageHeaderDboFactory.cs | 30 ++ .../Model/RetryQueueDbo.cs | 24 + .../Model/RetryQueueItemDbo.cs | 35 ++ .../Model/RetryQueueItemMessageDbo.cs | 23 + .../Model/RetryQueueItemMessageHeaderDbo.cs | 16 + .../Model/RetryQueuesDboWrapper.cs | 22 + .../Model/Schema/Script.cs | 16 + .../PostgresDbDataProviderFactory.cs | 73 +++ .../PostgresDbSettings.cs | 22 + .../Properties/AssemblyInfo.cs | 5 + .../Readers/Adapters/IRetryQueueAdapter.cs | 9 + .../Adapters/IRetryQueueItemAdapter.cs | 9 + .../Adapters/IRetryQueueItemMessageAdapter.cs | 9 + .../IRetryQueueItemMessageHeaderAdapter.cs | 9 + .../Readers/Adapters/RetryQueueAdapter.cs | 21 + .../Readers/Adapters/RetryQueueItemAdapter.cs | 25 + .../Adapters/RetryQueueItemMessageAdapter.cs | 22 + .../RetryQueueItemMessageHeaderAdapter.cs | 16 + .../Readers/DboCollectionNavigator.cs | 50 ++ .../Readers/IDboDomainAdapter.cs | 7 + .../Readers/IRetryQueueReader.cs | 11 + .../Readers/RetryQueueReader.cs | 83 +++ .../IRetryQueueItemMessageHeaderRepository.cs | 13 + .../IRetryQueueItemMessageRepository.cs | 13 + .../Repositories/IRetryQueueItemRepository.cs | 39 ++ .../Repositories/IRetryQueueRepository.cs | 28 + .../RetryQueueItemMessageHeaderRepository.cs | 93 ++++ .../RetryQueueItemMessageRepository.cs | 84 +++ .../Repositories/RetryQueueItemRepository.cs | 327 ++++++++++++ .../Repositories/RetryQueueRepository.cs | 226 ++++++++ .../RetryDurableDefinitionBuilderExtension.cs | 22 + .../RetryQueueDataProvider.cs | 492 ++++++++++++++++++ .../RetrySchemaCreator.cs | 43 ++ .../KafkaFlow.Retry.UnitTests.csproj | 1 + .../Postgres/ConnectionProviderTests.cs | 54 ++ .../Factories/RetryQueueDboFactoryTests.cs | 54 ++ .../RetryQueueItemDboFactoryTests.cs | 73 +++ .../RetryQueueItemMessageDboFactoryTests.cs | 49 ++ ...ryQueueItemMessageHeaderDboFactoryTests.cs | 56 ++ .../Postgres/Model/Schema/ScriptTests.cs | 37 ++ .../Adapters/RetryQueueAdapterTests.cs | 50 ++ .../Adapters/RetryQueueItemAdapterTests.cs | 55 ++ .../RetryQueueItemMessageAdapterTests.cs | 50 ++ ...RetryQueueItemMessageHeaderAdapterTests.cs | 47 ++ .../Readers/DboCollectionNavigatorTests.cs | 117 +++++ .../Postgres/Readers/RetryQueueReaderTests.cs | 200 +++++++ ...yDurableDefinitionBuilderExtensionTests.cs | 22 + src/KafkaFlow.Retry.sln | 7 + website/docs/getting-started/packages.md | 13 +- website/docs/guides/durable-retries.md | 29 ++ 91 files changed, 3916 insertions(+), 14 deletions(-) create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperPostgresSchema.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Producers/RetryDurableGuaranteeOrderedConsumptionPostgresProducer.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Producers/RetryDurableLatestConsumptionPostgresProducer.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Settings/PostgresRepositorySettings.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/PostgresRepository.cs create mode 100644 src/KafkaFlow.Retry.Postgres/ConnectionProvider.cs create mode 100644 src/KafkaFlow.Retry.Postgres/DbConnectionContext.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Deploy/01 - Create_Tables.sql create mode 100644 src/KafkaFlow.Retry.Postgres/Deploy/02 - Populate_Tables.sql create mode 100644 src/KafkaFlow.Retry.Postgres/IConnectionProvider.cs create mode 100644 src/KafkaFlow.Retry.Postgres/IDbConnection.cs create mode 100644 src/KafkaFlow.Retry.Postgres/IDbConnectionWithinTransaction.cs create mode 100644 src/KafkaFlow.Retry.Postgres/IRetrySchemaCreator.cs create mode 100644 src/KafkaFlow.Retry.Postgres/KafkaFlow.Retry.Postgres.csproj create mode 100644 src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueDboFactory.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemDboFactory.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemMessageDboFactory.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemMessageHeaderDboFactory.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueDboFactory.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemDboFactory.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageDboFactory.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageHeaderDboFactory.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/RetryQueueDbo.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemDbo.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemMessageDbo.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemMessageHeaderDbo.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/RetryQueuesDboWrapper.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Model/Schema/Script.cs create mode 100644 src/KafkaFlow.Retry.Postgres/PostgresDbDataProviderFactory.cs create mode 100644 src/KafkaFlow.Retry.Postgres/PostgresDbSettings.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Properties/AssemblyInfo.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/Adapters/IRetryQueueAdapter.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/Adapters/IRetryQueueItemAdapter.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/Adapters/IRetryQueueItemMessageAdapter.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/Adapters/IRetryQueueItemMessageHeaderAdapter.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/Adapters/RetryQueueAdapter.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/Adapters/RetryQueueItemAdapter.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/Adapters/RetryQueueItemMessageAdapter.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/Adapters/RetryQueueItemMessageHeaderAdapter.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/DboCollectionNavigator.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/IDboDomainAdapter.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/IRetryQueueReader.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Readers/RetryQueueReader.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Repositories/IRetryQueueItemMessageHeaderRepository.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Repositories/IRetryQueueItemMessageRepository.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Repositories/IRetryQueueItemRepository.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Repositories/IRetryQueueRepository.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Repositories/RetryQueueItemMessageHeaderRepository.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Repositories/RetryQueueItemMessageRepository.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Repositories/RetryQueueItemRepository.cs create mode 100644 src/KafkaFlow.Retry.Postgres/Repositories/RetryQueueRepository.cs create mode 100644 src/KafkaFlow.Retry.Postgres/RetryDurableDefinitionBuilderExtension.cs create mode 100644 src/KafkaFlow.Retry.Postgres/RetryQueueDataProvider.cs create mode 100644 src/KafkaFlow.Retry.Postgres/RetrySchemaCreator.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/ConnectionProviderTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/Model/Factories/RetryQueueDboFactoryTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/Model/Factories/RetryQueueItemDboFactoryTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/Model/Factories/RetryQueueItemMessageDboFactoryTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/Model/Factories/RetryQueueItemMessageHeaderDboFactoryTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/Model/Schema/ScriptTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/Readers/Adapters/RetryQueueAdapterTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/Readers/Adapters/RetryQueueItemAdapterTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/Readers/Adapters/RetryQueueItemMessageAdapterTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/Readers/Adapters/RetryQueueItemMessageHeaderAdapterTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/Readers/DboCollectionNavigatorTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/Readers/RetryQueueReaderTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Repositories/Postgres/RetryDurableDefinitionBuilderExtensionTests.cs diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index cabd06d8..5111c23d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -17,6 +17,8 @@ jobs: ACCEPT_EULA: Y SQLSERVER_SA_PASSWORD: SqlSever123123 SQLSERVER_INTEGRATED_SECURITY: 'False' + POSTGRES_SA_USER: postgres + POSTGRES_SA_PASSWORD: Postgres123123 steps: @@ -56,6 +58,9 @@ jobs: - name: Start SqlServer run: docker run -d -p 1433:1433 -e ACCEPT_EULA=${{ env.ACCEPT_EULA }} -e SA_PASSWORD=${{ env.SQLSERVER_SA_PASSWORD }} -e MSSQL_PID=Developer mcr.microsoft.com/mssql/server:2017-latest + + - name: Start Postgres + run: docker run -d -p 5432:5432 -e POSTGRES_USER=${{ env.POSTGRES_SA_USER }} -e POSTGRES_PASSWORD=${{ env.POSTGRES_SA_PASSWORD }} postgres:latest - name: Start MongoDB uses: supercharge/mongodb-github-action@1.6.0 diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 12a3fb3e..55b3e873 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -35,6 +35,9 @@ jobs: - name: Pack KafkaFlow.Retry.API run: dotnet pack src/KafkaFlow.Retry.API/KafkaFlow.Retry.API.csproj -c Release --include-symbols /p:Version=${{ env.BUILD_VERSION }} + - name: Pack KafkaFlow.Retry.Postgres + run: dotnet pack src/KafkaFlow.Retry.Postgres/KafkaFlow.Retry.Postgres.csproj -c Release --include-symbols /p:Version=${{ env.BUILD_VERSION }} + - name: Pack KafkaFlow.Retry.SqlServer run: dotnet pack src/KafkaFlow.Retry.SqlServer/KafkaFlow.Retry.SqlServer.csproj -c Release --include-symbols /p:Version=${{ env.BUILD_VERSION }} diff --git a/README.md b/README.md index 28426a70..ba67283c 100644 --- a/README.md +++ b/README.md @@ -8,11 +8,11 @@ Want to give it a try? Check out our [Quickstart](https://farfetch.github.io/kaf ### Resilience policies -| Policy | Description | Aka | Required Packages | -| ------------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------: | ----------------- | -| **Simple Retry**
(policy family)
([quickstart](#simple) ; deep) | Many faults are transient and may self-correct after a short delay. | "Maybe it's just a blip" | KafkaFlow.Retry | -| **Forever Retry**
(policy family)
([quickstart](#forever) ; deep) | Many faults are semi-transient and may self-correct after multiple retries. | "Never give up" | KafkaFlow.Retry | -| **Durable Retry**
([quickstart](#durable) ; deep) | Beyond a certain amount of retries and waiting, you want to keep processing next-in-line messages but you can't lose the current offset message. As persistence databases, MongoDb or SqlServer is available. And you can manage in-retry messages through HTTP API. | "I can't stop processing messages but I can't lose messages" | KafkaFlow.Retry
KafkaFlow.Retry.API

KafkaFlow.Retry.SqlServer
or
KafkaFlow.Retry.MongoDb | +| Policy | Description | Aka | Required Packages | +| ------------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------: |-------------------------------------------------------------------------------------------------------------------------------------------------| +| **Simple Retry**
(policy family)
([quickstart](#simple) ; deep) | Many faults are transient and may self-correct after a short delay. | "Maybe it's just a blip" | KafkaFlow.Retry | +| **Forever Retry**
(policy family)
([quickstart](#forever) ; deep) | Many faults are semi-transient and may self-correct after multiple retries. | "Never give up" | KafkaFlow.Retry | +| **Durable Retry**
([quickstart](#durable) ; deep) | Beyond a certain amount of retries and waiting, you want to keep processing next-in-line messages but you can't lose the current offset message. As persistence databases, MongoDb, Postgres or SqlServer is available. And you can manage in-retry messages through HTTP API. | "I can't stop processing messages but I can't lose messages" | KafkaFlow.Retry
KafkaFlow.Retry.API

KafkaFlow.Retry.SqlServer
or
KafkaFlow.Retry.Postgres or
KafkaFlow.Retry.MongoDb | ## Installation diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs index 27b89ee5..ddc1e636 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs @@ -8,6 +8,7 @@ using KafkaFlow.Retry.IntegrationTests.Core.Messages; using KafkaFlow.Retry.IntegrationTests.Core.Producers; using KafkaFlow.Retry.MongoDb; + using KafkaFlow.Retry.Postgres; using KafkaFlow.Retry.SqlServer; using KafkaFlow.Serializer; using KafkaFlow.TypedHandler; @@ -26,10 +27,14 @@ internal static class BootstrapperKafka "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db-retry", "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-sql-server", "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-sql-server-retry", + "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres", + "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres-retry", "test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db", "test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db-retry", "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server", - "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server-retry" + "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server-retry", + "test-kafka-flow-retry-retry-durable-latest-consumption-postgres", + "test-kafka-flow-retry-retry-durable-latest-consumption-postgres-retry" }; internal static IClusterConfigurationBuilder CreatAllTestTopicsIfNotExist(this IClusterConfigurationBuilder cluster) @@ -194,6 +199,80 @@ internal static IClusterConfigurationBuilder SetupRetryDurableGuaranteeOrderedCo return cluster; } + internal static IClusterConfigurationBuilder SetupRetryDurableGuaranteeOrderedConsumptionPostgresCluster( + this IClusterConfigurationBuilder cluster, + string postgresConnectionString, + string postgresDatabaseName) + { + cluster + .AddProducer( + producer => producer + .DefaultTopic("test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres") + .WithCompression(Confluent.Kafka.CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer())) + .AddConsumer( + consumer => consumer + .Topic("test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres") + .WithGroupId("test-consumer-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres") + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(KafkaFlow.AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer(typeof(RetryDurableTestMessage)) + .RetryDurable( + (configure) => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithMessageSerializeSettings( + new JsonSerializerSettings + { + DateTimeZoneHandling = DateTimeZoneHandling.Utc, + TypeNameHandling = TypeNameHandling.Auto + }) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .Enabled(true) + .WithRetryTopicName("test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres-retry") + .WithRetryConsumerBufferSize(100) + .WithRetryConsumerWorkersCount(10) + .WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler())) + .WithPollingJobsConfiguration( + configure => configure + .WithSchedulerId("custom_search_key_durable_guarantee_ordered_consumption_postgres") + .WithRetryDurablePollingConfiguration( + configure => configure + .Enabled(true) + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + ) + .WithPostgresDataProvider( + postgresConnectionString, + postgresDatabaseName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false))) + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler()))); + return cluster; + } + internal static IClusterConfigurationBuilder SetupRetryDurableLatestConsumptionMongoDbCluster( this IClusterConfigurationBuilder cluster, string mongoDbConnectionString, @@ -347,6 +426,80 @@ internal static IClusterConfigurationBuilder SetupRetryDurableLatestConsumptionS return cluster; } + internal static IClusterConfigurationBuilder SetupRetryDurableLatestConsumptionPostgresCluster( + this IClusterConfigurationBuilder cluster, + string postgresConnectionString, + string postgresDatabaseName) + { + cluster + .AddProducer( + producer => producer + .DefaultTopic("test-kafka-flow-retry-retry-durable-latest-consumption-postgres") + .WithCompression(Confluent.Kafka.CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer())) + .AddConsumer( + consumer => consumer + .Topic("test-kafka-flow-retry-retry-durable-latest-consumption-postgres") + .WithGroupId("test-consumer-kafka-flow-retry-retry-durable-latest-consumption-postgres") + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset((KafkaFlow.AutoOffsetReset)AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer(typeof(RetryDurableTestMessage)) + .RetryDurable( + (configure) => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithMessageSerializeSettings( + new JsonSerializerSettings + { + DateTimeZoneHandling = DateTimeZoneHandling.Utc, + TypeNameHandling = TypeNameHandling.Auto + }) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .Enabled(true) + .WithRetryTopicName("test-kafka-flow-retry-retry-durable-latest-consumption-postgres-retry") + .WithRetryConsumerBufferSize(100) + .WithRetryConsumerWorkersCount(10) + .WithRetryConsumerStrategy(RetryConsumerStrategy.LatestConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler())) + .WithPollingJobsConfiguration( + configure => configure + .WithSchedulerId("custom_search_key_durable_latest_consumption_postgres") + .WithRetryDurablePollingConfiguration( + configure => configure + .Enabled(true) + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + ) + .WithPostgresDataProvider( + postgresConnectionString, + postgresDatabaseName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false))) + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler()))); + return cluster; + } + internal static IClusterConfigurationBuilder SetupRetryForeverCluster(this IClusterConfigurationBuilder cluster) { cluster diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperPostgresSchema.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperPostgresSchema.cs new file mode 100644 index 00000000..73cf4947 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperPostgresSchema.cs @@ -0,0 +1,71 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers +{ + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Reflection; + using System.Threading; + using System.Threading.Tasks; + using Npgsql; + + internal static class BootstrapperPostgresSchema + { + private static readonly SemaphoreSlim semaphoreOneThreadAtTime = new SemaphoreSlim(1, 1); + private static bool schemaInitialized; + + internal static async Task RecreatePostgresSchemaAsync(string databaseName, string connectionString) + { + await semaphoreOneThreadAtTime.WaitAsync().ConfigureAwait(false); + try + { + if (schemaInitialized) + { + return; + } + + await using (var openCon = new NpgsqlConnection(connectionString)) + { + openCon.Open(); + openCon.ChangeDatabase(databaseName); + + var scripts = GetScriptsForSchemaCreation(); + + foreach (var script in scripts) + { + await using (var queryCommand = new NpgsqlCommand(script)) + { + queryCommand.Connection = openCon; + + await queryCommand.ExecuteNonQueryAsync().ConfigureAwait(false); + } + } + } + + schemaInitialized = true; + } + finally + { + semaphoreOneThreadAtTime.Release(); + } + } + + private static IEnumerable GetScriptsForSchemaCreation() + { + Assembly postgresAssembly = Assembly.LoadFrom("KafkaFlow.Retry.Postgres.dll"); + return postgresAssembly + .GetManifestResourceNames() + .OrderBy(x => x) + .Select(script => + { + using (Stream s = postgresAssembly.GetManifestResourceStream(script)) + { + using (StreamReader sr = new StreamReader(s)) + { + return sr.ReadToEnd(); + } + } + }) + .ToList(); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperFixtureTemplate.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperFixtureTemplate.cs index dc1b3e47..3e95f9ae 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperFixtureTemplate.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperFixtureTemplate.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using Dawn; using global::Microsoft.Extensions.Configuration; + using Npgsql; using KafkaFlow.Retry.IntegrationTests.Core.Settings; using KafkaFlow.Retry.IntegrationTests.Core.Storages.Repositories; @@ -24,6 +25,8 @@ public abstract class BootstrapperFixtureTemplate : IDisposable internal IRepositoryProvider RepositoryProvider => this.repositoryProvider ?? this.CreateRepositoryProvider(); internal SqlServerRepositorySettings SqlServerSettings { get; private set; } + + internal PostgresRepositorySettings PostgresSettings { get; private set; } public abstract void Dispose(); @@ -31,6 +34,7 @@ protected async Task InitializeDatabasesAsync(IConfiguration configuration) { this.InitializeMongoDb(configuration); await this.InitializeSqlServerAsync(configuration).ConfigureAwait(false); + await this.InitializePostgresAsync(configuration).ConfigureAwait(false); this.databasesInitialized = true; } @@ -47,7 +51,8 @@ private IRepositoryProvider CreateRepositoryProvider() var repositories = new List { new MongoDbRepository( this.MongoDbSettings.ConnectionString, this.MongoDbSettings.DatabaseName, this.MongoDbSettings.RetryQueueCollectionName, this.MongoDbSettings.RetryQueueItemCollectionName), - new SqlServerRepository(this.SqlServerSettings.ConnectionString, this.SqlServerSettings.DatabaseName) + new SqlServerRepository(this.SqlServerSettings.ConnectionString, this.SqlServerSettings.DatabaseName), + new PostgresRepository(this.PostgresSettings.ConnectionString, this.PostgresSettings.DatabaseName) }; this.repositoryProvider = new RepositoryProvider(repositories); @@ -73,5 +78,15 @@ private async Task InitializeSqlServerAsync(IConfiguration configuration) await BootstrapperSqlServerSchema.RecreateSqlSchemaAsync(this.SqlServerSettings.DatabaseName, this.SqlServerSettings.ConnectionString).ConfigureAwait(false); } + + private async Task InitializePostgresAsync(IConfiguration configuration) + { + this.PostgresSettings = configuration.GetSection("PostgresRepository").Get(); + + var postgresConnectionStringBuilder = new NpgsqlConnectionStringBuilder(this.PostgresSettings.ConnectionString); + this.PostgresSettings.ConnectionString = postgresConnectionStringBuilder.ToString(); + + await BootstrapperPostgresSchema.RecreatePostgresSchemaAsync(this.PostgresSettings.DatabaseName, this.PostgresSettings.ConnectionString).ConfigureAwait(false); + } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs index 74bb2c46..d2f0e68f 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs @@ -93,6 +93,9 @@ private void SetupServices(HostBuilderContext context, IServiceCollection servic .SetupRetryDurableGuaranteeOrderedConsumptionSqlServerCluster( this.SqlServerSettings.ConnectionString, this.SqlServerSettings.DatabaseName) + .SetupRetryDurableGuaranteeOrderedConsumptionPostgresCluster( + this.PostgresSettings.ConnectionString, + this.PostgresSettings.DatabaseName) .SetupRetryDurableLatestConsumptionMongoDbCluster( this.MongoDbSettings.ConnectionString, this.MongoDbSettings.DatabaseName, @@ -101,14 +104,19 @@ private void SetupServices(HostBuilderContext context, IServiceCollection servic .SetupRetryDurableLatestConsumptionSqlServerCluster( this.SqlServerSettings.ConnectionString, this.SqlServerSettings.DatabaseName) + .SetupRetryDurableLatestConsumptionPostgresCluster( + this.PostgresSettings.ConnectionString, + this.PostgresSettings.DatabaseName) )); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(sp => this.RepositoryProvider); services.AddSingleton(); services.AddSingleton(); diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/RetryDurableGuaranteeOrderedConsumptionPostgresProducer.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/RetryDurableGuaranteeOrderedConsumptionPostgresProducer.cs new file mode 100644 index 00000000..f2da3a84 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/RetryDurableGuaranteeOrderedConsumptionPostgresProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Producers +{ + internal class RetryDurableGuaranteeOrderedConsumptionPostgresProducer + { + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/RetryDurableLatestConsumptionPostgresProducer.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/RetryDurableLatestConsumptionPostgresProducer.cs new file mode 100644 index 00000000..f8090e98 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/RetryDurableLatestConsumptionPostgresProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Producers +{ + internal class RetryDurableLatestConsumptionPostgresProducer + { + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Settings/PostgresRepositorySettings.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Settings/PostgresRepositorySettings.cs new file mode 100644 index 00000000..7a35cff4 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Settings/PostgresRepositorySettings.cs @@ -0,0 +1,9 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Settings +{ + internal class PostgresRepositorySettings + { + public string ConnectionString { get; set; } + + public string DatabaseName { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/PostgresRepository.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/PostgresRepository.cs new file mode 100644 index 00000000..17442046 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/PostgresRepository.cs @@ -0,0 +1,287 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Storages.Repositories +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Threading.Tasks; + using KafkaFlow.Retry.Durable.Common; + using KafkaFlow.Retry.Durable.Repository; + using KafkaFlow.Retry.Durable.Repository.Model; + using KafkaFlow.Retry.Postgres; + using KafkaFlow.Retry.Postgres.Model; + using KafkaFlow.Retry.Postgres.Readers; + using KafkaFlow.Retry.Postgres.Readers.Adapters; + using KafkaFlow.Retry.Postgres.Repositories; + using Npgsql; + + internal class PostgresRepository : IRepository + { + private const int TimeoutSec = 60; + private readonly ConnectionProvider connectionProvider; + + private readonly IRetryQueueItemMessageHeaderRepository retryQueueItemMessageHeaderRepository; + private readonly IRetryQueueItemMessageRepository retryQueueItemMessageRepository; + private readonly IRetryQueueItemRepository retryQueueItemRepository; + private readonly RetryQueueReader retryQueueReader; + private readonly IRetryQueueRepository retryQueueRepository; + private readonly PostgresDbSettings postgresDbSettings; + + public PostgresRepository( + string connectionString, + string dbName) + { + this.postgresDbSettings = new PostgresDbSettings(connectionString, dbName); + + this.RetryQueueDataProvider = new PostgresDbDataProviderFactory().Create(this.postgresDbSettings); + + this.retryQueueItemMessageHeaderRepository = new RetryQueueItemMessageHeaderRepository(); + this.retryQueueItemMessageRepository = new RetryQueueItemMessageRepository(); + this.retryQueueItemRepository = new RetryQueueItemRepository(); + this.retryQueueRepository = new RetryQueueRepository(); + + this.retryQueueReader = new RetryQueueReader( + new RetryQueueAdapter(), + new RetryQueueItemAdapter(), + new RetryQueueItemMessageAdapter(), + new RetryQueueItemMessageHeaderAdapter() + ); + + this.connectionProvider = new ConnectionProvider(); + } + + public RepositoryType RepositoryType => RepositoryType.Postgres; + + public IRetryDurableQueueRepositoryProvider RetryQueueDataProvider { get; } + + public async Task CleanDatabaseAsync() + { + using var dbConnection = this.connectionProvider.Create(this.postgresDbSettings); + using var command = dbConnection.CreateCommand(); + command.CommandType = System.Data.CommandType.Text; + command.CommandText = @" + delete from retry_item_message_headers; + delete from item_messages; + delete from retry_queues; + delete from retry_queue_items; + "; + await command.ExecuteNonQueryAsync(); + } + + public async Task CreateQueueAsync(RetryQueue queue) + { + var queueDbo = new RetryQueueDbo + { + IdDomain = queue.Id, + CreationDate = queue.CreationDate, + LastExecution = queue.LastExecution, + QueueGroupKey = queue.QueueGroupKey, + SearchGroupKey = queue.SearchGroupKey, + Status = queue.Status, + }; + + using var dbConnection = this.connectionProvider.CreateWithinTransaction(this.postgresDbSettings); + + var queueId = await this.retryQueueRepository.AddAsync(dbConnection, queueDbo); + + foreach (var item in queue.Items) + { + // queue item + var itemDbo = new RetryQueueItemDbo + { + IdDomain = item.Id, + CreationDate = item.CreationDate, + LastExecution = item.LastExecution, + ModifiedStatusDate = item.ModifiedStatusDate, + AttemptsCount = item.AttemptsCount, + RetryQueueId = queueId, + DomainRetryQueueId = queue.Id, + Status = item.Status, + SeverityLevel = item.SeverityLevel, + Description = item.Description + }; + + var itemId = await this.retryQueueItemRepository.AddAsync(dbConnection, itemDbo); + + // item message + var messageDbo = new RetryQueueItemMessageDbo + { + IdRetryQueueItem = itemId, + Key = item.Message.Key, + Offset = item.Message.Offset, + Partition = item.Message.Partition, + TopicName = item.Message.TopicName, + UtcTimeStamp = item.Message.UtcTimeStamp, + Value = item.Message.Value + }; + + await this.retryQueueItemMessageRepository.AddAsync(dbConnection, messageDbo); + + // message headers + var messageHeadersDbos = item.Message.Headers + .Select(h => new RetryQueueItemMessageHeaderDbo + { + RetryQueueItemMessageId = itemId, + Key = h.Key, + Value = h.Value + }); + + await this.retryQueueItemMessageHeaderRepository.AddAsync(dbConnection, messageHeadersDbos); + } + + dbConnection.Commit(); + } + + public async Task GetAllRetryQueueDataAsync(string queueGroupKey) + { + using (var dbConnection = this.connectionProvider.Create(this.postgresDbSettings)) + { + var retryQueueDbo = await this.retryQueueRepository.GetQueueAsync(dbConnection, queueGroupKey); + + if (retryQueueDbo is null) + { + return null; + } + + var retryQueueItemsDbo = await this.retryQueueItemRepository.GetItemsByQueueOrderedAsync(dbConnection, retryQueueDbo.IdDomain); + var itemMessagesDbo = await this.retryQueueItemMessageRepository.GetMessagesOrderedAsync(dbConnection, retryQueueItemsDbo); + var messageHeadersDbo = await this.retryQueueItemMessageHeaderRepository.GetOrderedAsync(dbConnection, itemMessagesDbo); + + var dboWrapper = new RetryQueuesDboWrapper + { + QueuesDbos = new[] { retryQueueDbo }, + ItemsDbos = retryQueueItemsDbo, + MessagesDbos = itemMessagesDbo, + HeadersDbos = messageHeadersDbo + }; + + return this.retryQueueReader.Read(dboWrapper).FirstOrDefault(); + } + } + + public async Task GetRetryQueueAsync(string queueGroupKey) + { + var start = DateTime.Now; + Guid retryQueueId = Guid.Empty; + RetryQueue retryQueue; + do + { + if (DateTime.Now.Subtract(start).TotalSeconds > TimeoutSec && !Debugger.IsAttached) + { + return null; + } + + await Task.Delay(100).ConfigureAwait(false); + + using (var dbConnection = this.connectionProvider.Create(this.postgresDbSettings)) + using (var command = dbConnection.CreateCommand()) + { + command.CommandType = System.Data.CommandType.Text; + command.CommandText = @"SELECT Id, IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution + FROM retry_queues + WHERE QueueGroupKey LIKE '%'||@QueueGroupKey + ORDER BY Id"; + + command.Parameters.AddWithValue("QueueGroupKey", queueGroupKey); + retryQueue = await this.ExecuteSingleLineReaderAsync(command).ConfigureAwait(false); + } + + if (retryQueue != null) + { + retryQueueId = retryQueue.Id; + } + } while (retryQueueId == Guid.Empty); + + return retryQueue; + } + + public async Task> GetRetryQueueItemsAsync(Guid retryQueueId, Func, bool> stopCondition) + { + var start = DateTime.Now; + IList retryQueueItems = null; + do + { + if (DateTime.Now.Subtract(start).TotalSeconds > TimeoutSec && !Debugger.IsAttached) + { + return null; + } + + await Task.Delay(100).ConfigureAwait(false); + + using (var dbConnection = this.connectionProvider.Create(this.postgresDbSettings)) + using (var command = dbConnection.CreateCommand()) + { + command.CommandType = System.Data.CommandType.Text; + command.CommandText = @"SELECT * + FROM retry_queue_items + WHERE IdDomainRetryQueue = @IdDomainRetryQueue + ORDER BY Sort ASC"; + + command.Parameters.AddWithValue("IdDomainRetryQueue", retryQueueId); + retryQueueItems = await this.ExecuteReaderAsync(command).ConfigureAwait(false); + } + } while (stopCondition(retryQueueItems)); + + return retryQueueItems ?? new List(); + } + + private async Task> ExecuteReaderAsync(NpgsqlCommand command) + { + var items = new List(); + + using (var reader = await command.ExecuteReaderAsync()) + { + while (await reader.ReadAsync()) + { + items.Add(this.FillRetryQueueItem(reader)); + } + } + + return items; + } + + private async Task ExecuteSingleLineReaderAsync(NpgsqlCommand command) + { + using (var reader = await command.ExecuteReaderAsync()) + { + if (await reader.ReadAsync()) + { + return this.FillRetryQueue(reader); + } + } + + return null; + } + + private RetryQueue FillRetryQueue(NpgsqlDataReader reader) + { + return new RetryQueue( + reader.GetGuid(reader.GetOrdinal("IdDomain")), + reader.GetString(reader.GetOrdinal("SearchGroupKey")), + reader.GetString(reader.GetOrdinal("QueueGroupKey")), + reader.GetDateTime(reader.GetOrdinal("CreationDate")), + reader.GetDateTime(reader.GetOrdinal("LastExecution")), + (RetryQueueStatus)reader.GetByte(reader.GetOrdinal("IdStatus")) + ); + } + + private RetryQueueItem FillRetryQueueItem(NpgsqlDataReader reader) + { + var lastExecutionOrdinal = reader.GetOrdinal("LastExecution"); + var modifiedStatusDateOrdinal = reader.GetOrdinal("ModifiedStatusDate"); + var descriptionOrdinal = reader.GetOrdinal("Description"); + + return new RetryQueueItem( + reader.GetGuid(reader.GetOrdinal("IdDomain")), + reader.GetInt32(reader.GetOrdinal("AttemptsCount")), + reader.GetDateTime(reader.GetOrdinal("CreationDate")), + reader.GetInt32(reader.GetOrdinal("Sort")), + reader.IsDBNull(lastExecutionOrdinal) ? null : (DateTime?)reader.GetDateTime(lastExecutionOrdinal), + reader.IsDBNull(modifiedStatusDateOrdinal) ? null : (DateTime?)reader.GetDateTime(modifiedStatusDateOrdinal), + (RetryQueueItemStatus)reader.GetByte(reader.GetOrdinal("IdItemStatus")), + (SeverityLevel)reader.GetByte(reader.GetOrdinal("IdSeverityLevel")), + reader.IsDBNull(descriptionOrdinal) ? null : reader.GetString(descriptionOrdinal) + ); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/RepositoryType.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/RepositoryType.cs index c39a4655..36f94479 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/RepositoryType.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/RepositoryType.cs @@ -6,5 +6,6 @@ public enum RepositoryType SqlServer = 1, MongoDb = 2, + Postgres = 3 } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/KafkaFlow.Retry.IntegrationTests.csproj b/src/KafkaFlow.Retry.IntegrationTests/KafkaFlow.Retry.IntegrationTests.csproj index 44efa140..921e611d 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/KafkaFlow.Retry.IntegrationTests.csproj +++ b/src/KafkaFlow.Retry.IntegrationTests/KafkaFlow.Retry.IntegrationTests.csproj @@ -36,6 +36,7 @@ + diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CheckQueuePendingItemsTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CheckQueuePendingItemsTests.cs index 93584cf5..32499157 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CheckQueuePendingItemsTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CheckQueuePendingItemsTests.cs @@ -19,6 +19,7 @@ public CheckQueuePendingItemsTests(BootstrapperRepositoryFixture bootstrapperRep [Theory] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] [InlineData(RepositoryType.MongoDb)] public async Task CheckQueuePendingItemsAsync_QueueWithOneItem_ReturnsNoPendingItems(RepositoryType repositoryType) { @@ -49,12 +50,16 @@ public async Task CheckQueuePendingItemsAsync_QueueWithOneItem_ReturnsNoPendingI [Theory] [InlineData(RepositoryType.MongoDb, QueuePendingItemsResultStatus.NoPendingItems, RetryQueueItemStatus.Done)] [InlineData(RepositoryType.SqlServer, QueuePendingItemsResultStatus.NoPendingItems, RetryQueueItemStatus.Done)] + [InlineData(RepositoryType.Postgres, QueuePendingItemsResultStatus.NoPendingItems, RetryQueueItemStatus.Done)] [InlineData(RepositoryType.MongoDb, QueuePendingItemsResultStatus.NoPendingItems, RetryQueueItemStatus.Cancelled)] [InlineData(RepositoryType.SqlServer, QueuePendingItemsResultStatus.NoPendingItems, RetryQueueItemStatus.Cancelled)] + [InlineData(RepositoryType.Postgres, QueuePendingItemsResultStatus.NoPendingItems, RetryQueueItemStatus.Cancelled)] [InlineData(RepositoryType.MongoDb, QueuePendingItemsResultStatus.HasPendingItems, RetryQueueItemStatus.InRetry)] [InlineData(RepositoryType.SqlServer, QueuePendingItemsResultStatus.HasPendingItems, RetryQueueItemStatus.InRetry)] + [InlineData(RepositoryType.Postgres, QueuePendingItemsResultStatus.HasPendingItems, RetryQueueItemStatus.InRetry)] [InlineData(RepositoryType.MongoDb, QueuePendingItemsResultStatus.HasPendingItems, RetryQueueItemStatus.Waiting)] [InlineData(RepositoryType.SqlServer, QueuePendingItemsResultStatus.HasPendingItems, RetryQueueItemStatus.Waiting)] + [InlineData(RepositoryType.Postgres, QueuePendingItemsResultStatus.HasPendingItems, RetryQueueItemStatus.Waiting)] public async Task CheckQueuePendingItemsAsync_QueueWithTwoItems_ReturnsExpectedPendingItemsStatus( RepositoryType repositoryType, QueuePendingItemsResultStatus expectedResultStatus, diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CheckQueueTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CheckQueueTests.cs index 8f5d1a9c..cee3b84e 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CheckQueueTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CheckQueueTests.cs @@ -19,8 +19,10 @@ public CheckQueueTests(BootstrapperRepositoryFixture bootstrapperRepositoryFixtu [Theory] [InlineData(RepositoryType.MongoDb, RetryQueueStatus.Active, CheckQueueResultStatus.Exists)] [InlineData(RepositoryType.SqlServer, RetryQueueStatus.Active, CheckQueueResultStatus.Exists)] + [InlineData(RepositoryType.Postgres, RetryQueueStatus.Active, CheckQueueResultStatus.Exists)] [InlineData(RepositoryType.MongoDb, RetryQueueStatus.Done, CheckQueueResultStatus.DoesNotExist)] [InlineData(RepositoryType.SqlServer, RetryQueueStatus.Done, CheckQueueResultStatus.DoesNotExist)] + [InlineData(RepositoryType.Postgres, RetryQueueStatus.Done, CheckQueueResultStatus.DoesNotExist)] public async Task CheckQueueAsync_ConsideringQueueStatus_ReturnsExpectedCheckQueueResultStatus( RepositoryType repositoryType, RetryQueueStatus queueStatus, @@ -54,6 +56,7 @@ public async Task CheckQueueAsync_ConsideringQueueStatus_ReturnsExpectedCheckQue [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task CheckQueueAsync_NonExistingQueue_ReturnsDoesNotExistStatus(RepositoryType repositoryType) { // Arrange diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CreateSchemaCreatorTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CreateSchemaCreatorTests.cs index 83cba843..c9b7b210 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CreateSchemaCreatorTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CreateSchemaCreatorTests.cs @@ -2,6 +2,7 @@ { using System.Threading.Tasks; using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers.Fixtures; + using KafkaFlow.Retry.Postgres; using KafkaFlow.Retry.SqlServer; using Xunit; @@ -26,5 +27,20 @@ public async Task SqlServerDbDataProviderFactory_CreateSchemaCreator_ExecuteSucc await retrySchemaCreator.CreateOrUpdateSchemaAsync(databaseName); } + + [Fact] + public async Task PostgresDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully() + { + var postgresDataProviderFactory = new PostgresDbDataProviderFactory(); + + var connectionString = this.bootstrapperRepositoryFixture.PostgresSettings.ConnectionString; + var databaseName = this.bootstrapperRepositoryFixture.PostgresSettings.DatabaseName; + + var postgresSettings = new PostgresDbSettings(connectionString, databaseName); + + var retrySchemaCreator = postgresDataProviderFactory.CreateSchemaCreator(postgresSettings); + + await retrySchemaCreator.CreateOrUpdateSchemaAsync(databaseName); + } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/DeleteQueuesTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/DeleteQueuesTests.cs index 9663b443..4243cf76 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/DeleteQueuesTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/DeleteQueuesTests.cs @@ -25,6 +25,7 @@ public DeleteQueuesTests(BootstrapperRepositoryFixture bootstrapperRepositoryFix [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task DeleteQueuesAsync_TestingMaxRowsToDelete_DeleteAllEligibleQueuesAfterTwoDeletions(RepositoryType repositoryType) { // Arrange @@ -76,6 +77,7 @@ public async Task DeleteQueuesAsync_TestingMaxRowsToDelete_DeleteAllEligibleQueu [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task DeleteQueuesAsync_WithSeveralScenarios_DeleteAllEligibleQueues(RepositoryType repositoryType) { // Arrange diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/GetQueuesTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/GetQueuesTests.cs index f7a7c1d7..491fdd69 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/GetQueuesTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/GetQueuesTests.cs @@ -23,6 +23,7 @@ public GetQueuesTests(BootstrapperRepositoryFixture bootstrapperRepositoryFixtur [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task GetQueuesAsync_DifferentSearchGroupKeyDifferentQueueStatusDifferentItemStatus_ReturnOnlyRequestedQueuesAndItems(RepositoryType repositoryType) { // Arrange @@ -86,6 +87,7 @@ public async Task GetQueuesAsync_DifferentSearchGroupKeyDifferentQueueStatusDiff [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task GetQueuesAsync_ExistingQueue_ReturnsQueue(RepositoryType repositoryType) { // Arrange @@ -114,6 +116,7 @@ public async Task GetQueuesAsync_ExistingQueue_ReturnsQueue(RepositoryType repos [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task GetQueuesAsync_ExistingQueuesActiveButDifferentSearchGroupKey_DontReturnQueues(RepositoryType repositoryType) { // Arrange @@ -150,6 +153,7 @@ public async Task GetQueuesAsync_ExistingQueuesActiveButDifferentSearchGroupKey_ [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task GetQueuesAsync_ExistingQueueWithDifferentItemStatus_ReturnQueueWithoutItems(RepositoryType repositoryType) { // Arrange @@ -179,6 +183,7 @@ public async Task GetQueuesAsync_ExistingQueueWithDifferentItemStatus_ReturnQueu [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task GetQueuesAsync_ExistingQueueWithDifferentQueueStatus_DontReturnQueues(RepositoryType repositoryType) { // Arrange @@ -204,6 +209,7 @@ public async Task GetQueuesAsync_ExistingQueueWithDifferentQueueStatus_DontRetur [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task GetQueuesAsync_ExistingQueueWithDistinctItemStatus_ReturnsQueueWithFilteredItems(RepositoryType repositoryType) { // Arrange @@ -238,6 +244,7 @@ public async Task GetQueuesAsync_ExistingQueueWithDistinctItemStatus_ReturnsQueu [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task GetQueuesAsync_ItemsWithDifferentModifiedDates_ReturnQueueWithItemsandQueueWithNoItem(RepositoryType repositoryType) { // Arrange @@ -298,6 +305,7 @@ public async Task GetQueuesAsync_ItemsWithDifferentModifiedDates_ReturnQueueWith [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task GetQueuesAsync_WithSeverityLevel_ReturnsOnlyItemsWithCorrespondingLevel(RepositoryType repositoryType) { // Arrange @@ -341,6 +349,7 @@ public async Task GetQueuesAsync_WithSeverityLevel_ReturnsOnlyItemsWithCorrespon [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task GetQueuesAsync_WithStuckStatusFilter_ReturnsItemsByStatusAndStuckItems(RepositoryType repositoryType) { // Arrange diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/SaveToQueueTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/SaveToQueueTests.cs index 500e80a1..43fb01d8 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/SaveToQueueTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/SaveToQueueTests.cs @@ -21,6 +21,7 @@ public SaveToQueueTests(BootstrapperRepositoryFixture bootstrapperRepositoryFixt [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task SaveToQueueAsync_ExistingQueueWithOneItem_ReturnsAddedStatus(RepositoryType repositoryType) { // Arrange @@ -63,6 +64,7 @@ public async Task SaveToQueueAsync_ExistingQueueWithOneItem_ReturnsAddedStatus(R [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task SaveToQueueAsync_ExistingQueueWithStatusDone_ReturnsAddedStatus(RepositoryType repositoryType) { // Arrange @@ -102,6 +104,7 @@ public async Task SaveToQueueAsync_ExistingQueueWithStatusDone_ReturnsAddedStatu [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task SaveToQueueAsync_NonExistingQueue_ReturnsCreatedStatus(RepositoryType repositoryType) { // Arrange diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemExecutionInfoTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemExecutionInfoTests.cs index 268a9802..e9110912 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemExecutionInfoTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemExecutionInfoTests.cs @@ -21,8 +21,10 @@ public UpdateItemExecutionInfoTests(BootstrapperRepositoryFixture bootstrapperRe [Theory] [InlineData(RepositoryType.MongoDb, RetryQueueItemStatus.InRetry)] [InlineData(RepositoryType.SqlServer, RetryQueueItemStatus.InRetry)] + [InlineData(RepositoryType.Postgres, RetryQueueItemStatus.InRetry)] [InlineData(RepositoryType.MongoDb, RetryQueueItemStatus.Waiting)] [InlineData(RepositoryType.SqlServer, RetryQueueItemStatus.Waiting)] + [InlineData(RepositoryType.Postgres, RetryQueueItemStatus.Waiting)] public async Task UpdateItemExecutionInfoAsync_UpdateStatus_ReturnsUpdatedStatus(RepositoryType repositoryType, RetryQueueItemStatus expectedItemStatus) { // Arrange @@ -67,6 +69,7 @@ public async Task UpdateItemExecutionInfoAsync_UpdateStatus_ReturnsUpdatedStatus [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateItemExecutionInfoAsync_UpdateToDone_QueueWithoutAllItemsDone_ReturnsUpdatedStatus(RepositoryType repositoryType) { // Arrange @@ -115,6 +118,7 @@ public async Task UpdateItemExecutionInfoAsync_UpdateToDone_QueueWithoutAllItems [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateItemExecutionInfoAsync_UpdateToDone_ReturnsUpdatedStatusQueueWithAllItemsDone(RepositoryType repositoryType) { // Arrange @@ -156,6 +160,7 @@ public async Task UpdateItemExecutionInfoAsync_UpdateToDone_ReturnsUpdatedStatus [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateItemExecutionInfoAsync_WrongItem_ReturnsItemNotFoundStatus(RepositoryType repositoryType) { // Arrange @@ -200,6 +205,8 @@ public async Task UpdateItemExecutionInfoAsync_WrongItem_ReturnsItemNotFoundStat [InlineData(RepositoryType.MongoDb, RetryQueueItemStatus.Waiting)] [InlineData(RepositoryType.SqlServer, RetryQueueItemStatus.Done)] [InlineData(RepositoryType.SqlServer, RetryQueueItemStatus.Waiting)] + [InlineData(RepositoryType.Postgres, RetryQueueItemStatus.Done)] + [InlineData(RepositoryType.Postgres, RetryQueueItemStatus.Waiting)] public async Task UpdateItemExecutionInfoAsync_WrongQueue_ReturnsQueueNotFoundStatus(RepositoryType repositoryType, RetryQueueItemStatus notExpectedItemStatus) { // Arrange @@ -245,7 +252,7 @@ public async Task UpdateItemExecutionInfoAsync_WrongQueue_ReturnsQueueNotFoundSt actualItem.LastExecution.Should().Be(notExpectedLastExecution); } - if (repositoryType == RepositoryType.SqlServer) + if (repositoryType is RepositoryType.SqlServer or RepositoryType.Postgres) { actualItem.Status.Should().NotBe(notExpectedItemStatus).And.Be(item.Status); actualItem.AttemptsCount.Should().NotBe(notExpectedAttemptsCount); diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemStatusTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemStatusTests.cs index ecb47c06..7416b1d2 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemStatusTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemStatusTests.cs @@ -20,6 +20,7 @@ public UpdateItemStatusTests(BootstrapperRepositoryFixture bootstrapperRepositor [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateItemStatusAsync_ExistingItem_ReturnsUpdatedStatus(RepositoryType repositoryType) { // Arrange @@ -51,6 +52,7 @@ public async Task UpdateItemStatusAsync_ExistingItem_ReturnsUpdatedStatus(Reposi [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateItemStatusAsync_NonExistingItem_ReturnsItemNotFoundStatus(RepositoryType repositoryType) { // Arrange diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemsTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemsTests.cs index f42da1c2..904a99bf 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemsTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateItemsTests.cs @@ -21,6 +21,7 @@ public UpdateItemsTests(BootstrapperRepositoryFixture bootstrapperRepositoryFixt [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateItemsTestsAsync_ExistingItemsInWaitingState_ReturnsItemIsNotTheFirstWaitingInQueue(RepositoryType repositoryType) { // Arrange @@ -54,6 +55,7 @@ public async Task UpdateItemsTestsAsync_ExistingItemsInWaitingState_ReturnsItemI [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateItemsTestsAsync_ExistingItemsInWaitingState_ReturnsUpdatedStatus(RepositoryType repositoryType) { // Arrange @@ -87,6 +89,7 @@ public async Task UpdateItemsTestsAsync_ExistingItemsInWaitingState_ReturnsUpdat [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateItemsTestsAsync_ExistingItemWithNotInWaitingState_ReturnsItemIsNotInWaitingState(RepositoryType repositoryType) { // Arrange @@ -117,6 +120,7 @@ public async Task UpdateItemsTestsAsync_ExistingItemWithNotInWaitingState_Return [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateItemsTestsAsync_ExistingItemWithStatusNotCancelled_ReturnsUpdatedStatusNotAllowed(RepositoryType repositoryType) { // Arrange @@ -147,6 +151,7 @@ public async Task UpdateItemsTestsAsync_ExistingItemWithStatusNotCancelled_Retur [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateItemsTestsAsync_NonExistingItem_ReturnsItemNotFoundStatus(RepositoryType repositoryType) { // Arrange diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateQueuesTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateQueuesTests.cs index 3ab47080..5e07ec44 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateQueuesTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/UpdateQueuesTests.cs @@ -21,6 +21,7 @@ public UpdateQueuesTests(BootstrapperRepositoryFixture bootstrapperRepositoryFix [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateQueuesAsync_WithInactiveQueue_ReturnsQueueIsNotActive(RepositoryType repositoryType) { // Arrange @@ -52,6 +53,7 @@ public async Task UpdateQueuesAsync_WithInactiveQueue_ReturnsQueueIsNotActive(Re [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateQueuesAsync_WithItems_ReturnsUpdatedQueue(RepositoryType repositoryType) { // Arrange @@ -83,6 +85,7 @@ public async Task UpdateQueuesAsync_WithItems_ReturnsUpdatedQueue(RepositoryType [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateQueuesAsync_WithNoItems_ReturnsQueueHasNoActiveItems(RepositoryType repositoryType) { // Arrange @@ -112,6 +115,7 @@ public async Task UpdateQueuesAsync_WithNoItems_ReturnsQueueHasNoActiveItems(Rep [Theory] [InlineData(RepositoryType.MongoDb)] [InlineData(RepositoryType.SqlServer)] + [InlineData(RepositoryType.Postgres)] public async Task UpdateQueuesAsync_WithStatusNotCancelled_ReturnsUpdatedStatusNotAllowed(RepositoryType repositoryType) { // Arrange diff --git a/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs index f141a8eb..f36b88ea 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs @@ -46,6 +46,13 @@ public static IEnumerable Scenarios() 10 }; yield return new object[] + { + RepositoryType.Postgres, + typeof(IMessageProducer), + typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert), + 10 + }; + yield return new object[] { RepositoryType.MongoDb, typeof(IMessageProducer), @@ -59,6 +66,13 @@ public static IEnumerable Scenarios() typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), 1 }; + yield return new object[] + { + RepositoryType.Postgres, + typeof(IMessageProducer), + typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), + 1 + }; } [Theory] diff --git a/src/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json b/src/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json index af724e7d..506232aa 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json +++ b/src/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json @@ -12,5 +12,9 @@ "SqlServerRepository": { "ConnectionString": "Server=localhost; User ID=SA; Password=SqlSever123123; Pooling=true; Trusted_Connection=true; Integrated Security=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Tests;", "DatabaseName": "kafka_flow_retry_durable_test" + }, + "PostgresRepository": { + "ConnectionString": "Server=localhost;Database=postgres;User Id=postgres;Password=Postgres123123;Port=5432;Application Name=KafkaFlow Retry Tests;", + "DatabaseName": "postgres" } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/ConnectionProvider.cs b/src/KafkaFlow.Retry.Postgres/ConnectionProvider.cs new file mode 100644 index 00000000..9288e930 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/ConnectionProvider.cs @@ -0,0 +1,21 @@ +namespace KafkaFlow.Retry.Postgres +{ + using Dawn; + + internal sealed class ConnectionProvider : IConnectionProvider + { + public IDbConnection Create(PostgresDbSettings postgresDbSettings) + { + Guard.Argument(postgresDbSettings).NotNull(); + + return new DbConnectionContext(postgresDbSettings, false); + } + + public IDbConnectionWithinTransaction CreateWithinTransaction(PostgresDbSettings postgresDbSettings) + { + Guard.Argument(postgresDbSettings).NotNull(); + + return new DbConnectionContext(postgresDbSettings, true); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/DbConnectionContext.cs b/src/KafkaFlow.Retry.Postgres/DbConnectionContext.cs new file mode 100644 index 00000000..2275e193 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/DbConnectionContext.cs @@ -0,0 +1,89 @@ +namespace KafkaFlow.Retry.Postgres +{ + using System.Diagnostics.CodeAnalysis; + using Dawn; + using Npgsql; + + [ExcludeFromCodeCoverage] + internal sealed class DbConnectionContext : IDbConnectionWithinTransaction + { + private readonly PostgresDbSettings postgresDbSettings; + private readonly bool withinTransaction; + private bool committed; + private NpgsqlConnection sqlConnection; + private NpgsqlTransaction sqlTransaction; + + public DbConnectionContext(PostgresDbSettings postgresDbSettings, bool withinTransaction) + { + Guard.Argument(postgresDbSettings).NotNull(); + this.postgresDbSettings = postgresDbSettings; + this.withinTransaction = withinTransaction; + } + + public void Commit() + { + if (this.sqlTransaction is object) + { + this.sqlTransaction.Commit(); + this.committed = true; + } + } + + public NpgsqlCommand CreateCommand() + { + var dbCommand = this.GetDbConnection().CreateCommand(); + + if (this.withinTransaction) + { + dbCommand.Transaction = this.GetDbTransaction(); + } + + return dbCommand; + } + + public void Dispose() + { + if (this.sqlTransaction is object) + { + if (!this.committed) + { + this.Rollback(); + } + this.sqlTransaction.Dispose(); + } + + if (this.sqlConnection is object) + { + this.sqlConnection.Dispose(); + } + } + + public void Rollback() + { + if (this.sqlTransaction is object) + { + this.sqlTransaction.Rollback(); + } + } + + private NpgsqlConnection GetDbConnection() + { + if (this.sqlConnection is null) + { + this.sqlConnection = new NpgsqlConnection(this.postgresDbSettings.ConnectionString); + this.sqlConnection.Open(); + this.sqlConnection.ChangeDatabase(this.postgresDbSettings.DatabaseName); + } + return this.sqlConnection; + } + + private NpgsqlTransaction GetDbTransaction() + { + if (this.sqlTransaction is null) + { + this.sqlTransaction = this.GetDbConnection().BeginTransaction(); + } + return this.sqlTransaction; + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/Deploy/01 - Create_Tables.sql b/src/KafkaFlow.Retry.Postgres/Deploy/01 - Create_Tables.sql new file mode 100644 index 00000000..44e6839f --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Deploy/01 - Create_Tables.sql @@ -0,0 +1,109 @@ +-- Create Tables and Indexes +-- CREATE TABLES + +CREATE TABLE IF NOT EXISTS queue_status ( + Code smallint PRIMARY KEY NOT NULL, + Name varchar(255) NOT NULL, + Description varchar(255) NOT NULL); + +CREATE TABLE IF NOT EXISTS queue_item_status ( + Code smallint PRIMARY KEY NOT NULL, + Name varchar(255) NOT NULL, + Description varchar(255) NOT NULL); + +CREATE TABLE IF NOT EXISTS queue_item_severity ( + Code smallint PRIMARY KEY NOT NULL, + Name varchar(255) NOT NULL, + Description varchar(255) NOT NULL); + +CREATE TABLE IF NOT EXISTS retry_queues ( + Id bigint PRIMARY KEY NOT NULL GENERATED ALWAYS AS IDENTITY, + IdDomain uuid UNIQUE, + IdStatus smallint NOT NULL, + SearchGroupKey varchar(255) NOT NULL, + QueueGroupKey varchar(255) NOT NULL, + CreationDate TIMESTAMP NOT NULL, + LastExecution TIMESTAMP NOT NULL, + CONSTRAINT FK_RetryQueueStatus_RetryQueues FOREIGN KEY (IdStatus) REFERENCES queue_status(Code) + ); + +CREATE TABLE IF NOT EXISTS retry_queue_items ( + Id bigint PRIMARY KEY NOT NULL GENERATED ALWAYS AS IDENTITY, + IdDomain uuid NOT NULL, + IdRetryQueue bigint NOT NULL, + IdDomainRetryQueue uuid NOT NULL, + IdItemStatus smallint NOT NULL, + IdSeverityLevel smallint NOT NULL, + AttemptsCount int NOT NULL, + Sort int NOT NULL, + CreationDate TIMESTAMP NOT NULL, + LastExecution TIMESTAMP, + ModifiedStatusDate TIMESTAMP, + Description varchar NULL, + CONSTRAINT FK_RetryQueues_RetryQueueItems_IdRetryQueue FOREIGN KEY (IdRetryQueue) REFERENCES retry_queues(Id) ON DELETE CASCADE, + CONSTRAINT FK_RetryQueues_RetryQueueItems_IdDomainRetryQueue FOREIGN KEY (IdDomainRetryQueue) REFERENCES retry_queues(IdDomain), + CONSTRAINT FK_QueueItemStatus_RetryQueueItems FOREIGN KEY (IdItemStatus) REFERENCES queue_item_status(Code), + CONSTRAINT FK_QueueItemSeverity_RetryQueueItems FOREIGN KEY (IdSeverityLevel) REFERENCES queue_item_severity(Code) + ); + +CREATE TABLE IF NOT EXISTS item_messages ( + IdRetryQueueItem bigint PRIMARY KEY, + Key bytea NOT NULL, + Value bytea NOT NULL, + TopicName varchar(300) NOT NULL, + Partition int NOT NULL, + "offset" bigint NOT NULL, + UtcTimeStamp TIMESTAMP NOT NULL, + CONSTRAINT FK_RetryQueueItems_ItemMessages FOREIGN KEY (IdRetryQueueItem) REFERENCES retry_queue_items(Id) ON DELETE CASCADE + ); + +CREATE TABLE IF NOT EXISTS retry_item_message_headers ( + Id bigint PRIMARY KEY NOT NULL GENERATED ALWAYS AS IDENTITY NOT NULL, + IdItemMessage bigint NOT NULL, + Key varchar(255) NOT NULL, + Value bytea NOT NULL, + CONSTRAINT FK_ItemMessages_RetryItemMessageHeaders FOREIGN KEY (IdItemMessage) REFERENCES item_messages(IdRetryQueueItem) ON DELETE CASCADE + ); + +CREATE OR REPLACE FUNCTION f_load_item_messages(retryQueueItemsIds bigint[]) + RETURNS TABLE ( + IdRetryQueueItem bigint, + Key bytea, + Value bytea, + TopicName varchar(300), + Partition int, + "offset" bigint, + UtcTimeStamp TIMESTAMP + ) + LANGUAGE plpgsql + AS $$ + BEGIN + RETURN QUERY + SELECT IM.IdRetryQueueItem, IM.Key, IM.Value, IM.TopicName, IM.Partition, IM."offset", IM.UtcTimeStamp + FROM item_messages IM + INNER JOIN retry_queue_items RQI ON RQI.Id = IM.IdRetryQueueItem + INNER JOIN UNNEST(retryQueueItemsIds) AS RI(Id) ON IM.IdRetryQueueItem=RI.Id + ORDER BY RQI.IdRetryQueue, IM.IdRetryQueueItem; + END $$; + +-- CREATE INDEXES + +-- Table retry_queues +CREATE INDEX IF NOT EXISTS ix_retry_queues_SearchGroupKey ON retry_queues (SearchGroupKey); + +CREATE UNIQUE INDEX IF NOT EXISTS ix_retry_queues_QueueGroupKey ON retry_queues (QueueGroupKey); + +CREATE INDEX IF NOT EXISTS ix_retry_queues_IdStatus ON retry_queues (IdStatus); + +CREATE INDEX IF NOT EXISTS ix_retry_queues_CreationDate ON retry_queues (CreationDate); + +CREATE INDEX IF NOT EXISTS ix_retry_queues_LastExecution ON retry_queues (LastExecution); + +-- Table retry_queue_items +CREATE UNIQUE INDEX IF NOT EXISTS ix_retry_queue_items_IdDomain ON retry_queue_items (IdDomain); + +CREATE INDEX IF NOT EXISTS ix_retry_queue_items_Sort ON retry_queue_items (Sort); + +CREATE INDEX IF NOT EXISTS ix_retry_queue_items_IdItemStatus ON retry_queue_items (IdItemStatus); + +CREATE INDEX IF NOT EXISTS ix_retry_queue_items_IdSeverityLevel ON retry_queue_items (IdSeverityLevel); \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/Deploy/02 - Populate_Tables.sql b/src/KafkaFlow.Retry.Postgres/Deploy/02 - Populate_Tables.sql new file mode 100644 index 00000000..5505bc95 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Deploy/02 - Populate_Tables.sql @@ -0,0 +1,28 @@ +-- Populate tables + +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM queue_status WHERE Code IN (1, 2)) THEN + INSERT INTO queue_status (Code, Name, Description) + VALUES + (1, 'Active', 'The queue has unprocessed messages'), + (2, 'Done', 'The queue does not have unprocessed messages'); + END IF; + + IF NOT EXISTS (SELECT 1 FROM queue_item_status WHERE Code IN (1, 2, 3)) THEN + INSERT INTO queue_item_status (Code, Name, Description) + VALUES + (1, 'Waiting', 'Waiting for retry'), + (2, 'InRetry', 'Retrying'), + (3, 'Done', 'Done'), + (4, 'Cancelled', 'Cancelled'); + END IF; + + IF NOT EXISTS (SELECT 1 FROM queue_item_severity WHERE Code IN (1, 2, 3)) THEN + INSERT INTO queue_item_severity (Code, Name, Description) + VALUES + (0, 'Unknown', 'A severity level was not defined.'), + (1, 'Low', 'No loss of service. The software should recover by itself.'), + (2, 'Medium', 'Minor loss of service. The result is an inconvenience, it''s unclear if the software can recover by itself.'), + (3, 'High', 'Partial loss of service with severe impact on the business. Usually needs human intervention to be solved.'); + END IF; +END $$; \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/IConnectionProvider.cs b/src/KafkaFlow.Retry.Postgres/IConnectionProvider.cs new file mode 100644 index 00000000..8aa4686d --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/IConnectionProvider.cs @@ -0,0 +1,9 @@ +namespace KafkaFlow.Retry.Postgres +{ + internal interface IConnectionProvider + { + IDbConnection Create(PostgresDbSettings postgresDbSettings); + + IDbConnectionWithinTransaction CreateWithinTransaction(PostgresDbSettings postgresDbSettings); + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/IDbConnection.cs b/src/KafkaFlow.Retry.Postgres/IDbConnection.cs new file mode 100644 index 00000000..3c63b0b7 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/IDbConnection.cs @@ -0,0 +1,10 @@ +namespace KafkaFlow.Retry.Postgres +{ + using System; + using Npgsql; + + internal interface IDbConnection : IDisposable + { + NpgsqlCommand CreateCommand(); + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/IDbConnectionWithinTransaction.cs b/src/KafkaFlow.Retry.Postgres/IDbConnectionWithinTransaction.cs new file mode 100644 index 00000000..0c09ac70 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/IDbConnectionWithinTransaction.cs @@ -0,0 +1,9 @@ +namespace KafkaFlow.Retry.Postgres +{ + internal interface IDbConnectionWithinTransaction : IDbConnection + { + void Commit(); + + void Rollback(); + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/IRetrySchemaCreator.cs b/src/KafkaFlow.Retry.Postgres/IRetrySchemaCreator.cs new file mode 100644 index 00000000..4484106a --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/IRetrySchemaCreator.cs @@ -0,0 +1,9 @@ +namespace KafkaFlow.Retry.Postgres +{ + using System.Threading.Tasks; + + public interface IRetrySchemaCreator + { + Task CreateOrUpdateSchemaAsync(string databaseName); + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/KafkaFlow.Retry.Postgres.csproj b/src/KafkaFlow.Retry.Postgres/KafkaFlow.Retry.Postgres.csproj new file mode 100644 index 00000000..4b7a5d50 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/KafkaFlow.Retry.Postgres.csproj @@ -0,0 +1,43 @@ + + + + netstandard2.0 + true + FARFETCH + + + + + + LICENSE.md + + Git + kafka flow kafkaflow extension extensions retry postgres + A durable persistence adapter for PostgreSQL for KafkaFlow.Retry extension. + Copyright (c) FARFETCH 2021 + + + + + + + + + + + + + + + PreserveNewest + + + + + + + + + + + \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueDboFactory.cs new file mode 100644 index 00000000..77f7b32c --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueDboFactory.cs @@ -0,0 +1,9 @@ +namespace KafkaFlow.Retry.Postgres.Model.Factories +{ + using KafkaFlow.Retry.Durable.Repository.Actions.Create; + + internal interface IRetryQueueDboFactory + { + RetryQueueDbo Create(SaveToQueueInput input); + } +} diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemDboFactory.cs new file mode 100644 index 00000000..f720c81c --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemDboFactory.cs @@ -0,0 +1,10 @@ +namespace KafkaFlow.Retry.Postgres.Model.Factories +{ + using System; + using KafkaFlow.Retry.Durable.Repository.Actions.Create; + + internal interface IRetryQueueItemDboFactory + { + RetryQueueItemDbo Create(SaveToQueueInput input, long retryQueueId, Guid retryQueueDomainId); + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemMessageDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemMessageDboFactory.cs new file mode 100644 index 00000000..81531a7e --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemMessageDboFactory.cs @@ -0,0 +1,9 @@ +namespace KafkaFlow.Retry.Postgres.Model.Factories +{ + using KafkaFlow.Retry.Durable.Repository.Model; + + internal interface IRetryQueueItemMessageDboFactory + { + RetryQueueItemMessageDbo Create(RetryQueueItemMessage retryQueueItemMessage, long retryQueueItemId); + } +} diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemMessageHeaderDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemMessageHeaderDboFactory.cs new file mode 100644 index 00000000..2577744c --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/IRetryQueueItemMessageHeaderDboFactory.cs @@ -0,0 +1,10 @@ +namespace KafkaFlow.Retry.Postgres.Model.Factories +{ + using System.Collections.Generic; + using KafkaFlow.Retry.Durable.Repository.Model; + + internal interface IRetryQueueItemMessageHeaderDboFactory + { + IEnumerable Create(IEnumerable headers, long retryQueueItemId); + } +} diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueDboFactory.cs new file mode 100644 index 00000000..55de9ba3 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueDboFactory.cs @@ -0,0 +1,24 @@ +namespace KafkaFlow.Retry.Postgres.Model.Factories +{ + using System; + using Dawn; + using KafkaFlow.Retry.Durable.Repository.Actions.Create; + + internal sealed class RetryQueueDboFactory : IRetryQueueDboFactory + { + public RetryQueueDbo Create(SaveToQueueInput input) + { + Guard.Argument(input).NotNull(); + + return new RetryQueueDbo + { + IdDomain = Guid.NewGuid(), + SearchGroupKey = input.SearchGroupKey, + QueueGroupKey = input.QueueGroupKey, + CreationDate = input.CreationDate, + LastExecution = input.LastExecution.Value, + Status = input.QueueStatus + }; + } + } +} diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemDboFactory.cs new file mode 100644 index 00000000..5ad48d07 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemDboFactory.cs @@ -0,0 +1,30 @@ +namespace KafkaFlow.Retry.Postgres.Model.Factories +{ + using System; + using Dawn; + using KafkaFlow.Retry.Durable.Repository.Actions.Create; + + internal sealed class RetryQueueItemDboFactory : IRetryQueueItemDboFactory + { + public RetryQueueItemDbo Create(SaveToQueueInput input, long retryQueueId, Guid retryQueueDomainId) + { + Guard.Argument(input, nameof(input)).NotNull(); + Guard.Argument(retryQueueId, nameof(retryQueueId)).Positive(); + Guard.Argument(retryQueueDomainId, nameof(retryQueueDomainId)).NotDefault(); + + return new RetryQueueItemDbo + { + IdDomain = Guid.NewGuid(), + CreationDate = input.CreationDate, + LastExecution = input.LastExecution, + ModifiedStatusDate = input.ModifiedStatusDate, + AttemptsCount = input.AttemptsCount, + RetryQueueId = retryQueueId, + DomainRetryQueueId = retryQueueDomainId, + Status = input.ItemStatus, + SeverityLevel = input.SeverityLevel, + Description = input.Description + }; + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageDboFactory.cs new file mode 100644 index 00000000..cd61bbc9 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageDboFactory.cs @@ -0,0 +1,25 @@ +namespace KafkaFlow.Retry.Postgres.Model.Factories +{ + using Dawn; + using KafkaFlow.Retry.Durable.Repository.Model; + + internal sealed class RetryQueueItemMessageDboFactory : IRetryQueueItemMessageDboFactory + { + public RetryQueueItemMessageDbo Create(RetryQueueItemMessage retryQueueItemMessage, long retryQueueItemId) + { + Guard.Argument(retryQueueItemMessage, nameof(retryQueueItemMessage)).NotNull(); + Guard.Argument(retryQueueItemId, nameof(retryQueueItemId)).Positive(); + + return new RetryQueueItemMessageDbo + { + IdRetryQueueItem = retryQueueItemId, + Key = retryQueueItemMessage.Key, + Value = retryQueueItemMessage.Value, + Offset = retryQueueItemMessage.Offset, + Partition = retryQueueItemMessage.Partition, + TopicName = retryQueueItemMessage.TopicName, + UtcTimeStamp = retryQueueItemMessage.UtcTimeStamp, + }; + } + } +} diff --git a/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageHeaderDboFactory.cs b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageHeaderDboFactory.cs new file mode 100644 index 00000000..9dffb522 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/Factories/RetryQueueItemMessageHeaderDboFactory.cs @@ -0,0 +1,30 @@ +namespace KafkaFlow.Retry.Postgres.Model.Factories +{ + using System.Collections.Generic; + using System.Linq; + using Dawn; + using KafkaFlow.Retry.Durable.Repository.Model; + + internal sealed class RetryQueueItemMessageHeaderDboFactory : IRetryQueueItemMessageHeaderDboFactory + { + public IEnumerable Create(IEnumerable headers, long retryQueueItemId) + { + Guard.Argument(headers).NotNull(); + Guard.Argument(retryQueueItemId, nameof(retryQueueItemId)).Positive(); + + return headers.Select(h => this.Adapt(h, retryQueueItemId)); + } + + private RetryQueueItemMessageHeaderDbo Adapt(MessageHeader header, long retryQueueItemId) + { + Guard.Argument(header).NotNull(); + + return new RetryQueueItemMessageHeaderDbo + { + Key = header.Key, + Value = header.Value, + RetryQueueItemMessageId = retryQueueItemId + }; + } + } +} diff --git a/src/KafkaFlow.Retry.Postgres/Model/RetryQueueDbo.cs b/src/KafkaFlow.Retry.Postgres/Model/RetryQueueDbo.cs new file mode 100644 index 00000000..d01f82fe --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/RetryQueueDbo.cs @@ -0,0 +1,24 @@ +namespace KafkaFlow.Retry.Postgres.Model +{ + using System; + using System.Diagnostics.CodeAnalysis; + using KafkaFlow.Retry.Durable.Repository.Model; + + [ExcludeFromCodeCoverage] + internal class RetryQueueDbo + { + public DateTime CreationDate { get; set; } + + public long Id { get; set; } + + public Guid IdDomain { get; set; } + + public DateTime LastExecution { get; set; } + + public string QueueGroupKey { get; set; } + + public string SearchGroupKey { get; set; } + + public RetryQueueStatus Status { get; set; } + } +} diff --git a/src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemDbo.cs b/src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemDbo.cs new file mode 100644 index 00000000..521da8d4 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemDbo.cs @@ -0,0 +1,35 @@ +namespace KafkaFlow.Retry.Postgres.Model +{ + using System; + using System.Diagnostics.CodeAnalysis; + using KafkaFlow.Retry.Durable.Common; + using KafkaFlow.Retry.Durable.Repository.Model; + + [ExcludeFromCodeCoverage] + internal class RetryQueueItemDbo + { + public int AttemptsCount { get; set; } + + public DateTime CreationDate { get; set; } + + public string Description { get; set; } + + public Guid DomainRetryQueueId { get; set; } + + public long Id { get; set; } + + public Guid IdDomain { get; set; } + + public DateTime? LastExecution { get; set; } + + public DateTime? ModifiedStatusDate { get; set; } + + public long RetryQueueId { get; set; } + + public SeverityLevel SeverityLevel { get; set; } + + public int Sort { get; set; } + + public RetryQueueItemStatus Status { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemMessageDbo.cs b/src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemMessageDbo.cs new file mode 100644 index 00000000..fb83fd5f --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemMessageDbo.cs @@ -0,0 +1,23 @@ +namespace KafkaFlow.Retry.Postgres.Model +{ + using System; + using System.Diagnostics.CodeAnalysis; + + [ExcludeFromCodeCoverage] + internal class RetryQueueItemMessageDbo + { + public long IdRetryQueueItem { get; set; } + + public byte[] Key { get; set; } + + public long Offset { get; set; } + + public int Partition { get; set; } + + public string TopicName { get; set; } + + public DateTime UtcTimeStamp { get; set; } + + public byte[] Value { get; set; } + } +} diff --git a/src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemMessageHeaderDbo.cs b/src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemMessageHeaderDbo.cs new file mode 100644 index 00000000..dfb0e45f --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/RetryQueueItemMessageHeaderDbo.cs @@ -0,0 +1,16 @@ +namespace KafkaFlow.Retry.Postgres.Model +{ + using System.Diagnostics.CodeAnalysis; + + [ExcludeFromCodeCoverage] + internal class RetryQueueItemMessageHeaderDbo + { + public long Id { get; set; } + + public string Key { get; set; } + + public long RetryQueueItemMessageId { get; set; } + + public byte[] Value { get; set; } + } +} diff --git a/src/KafkaFlow.Retry.Postgres/Model/RetryQueuesDboWrapper.cs b/src/KafkaFlow.Retry.Postgres/Model/RetryQueuesDboWrapper.cs new file mode 100644 index 00000000..255537d5 --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/RetryQueuesDboWrapper.cs @@ -0,0 +1,22 @@ +namespace KafkaFlow.Retry.Postgres.Model +{ + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + + [ExcludeFromCodeCoverage] + internal class RetryQueuesDboWrapper + { + public RetryQueuesDboWrapper() + { + QueuesDbos = new RetryQueueDbo[0]; + ItemsDbos = new RetryQueueItemDbo[0]; + MessagesDbos = new RetryQueueItemMessageDbo[0]; + HeadersDbos = new RetryQueueItemMessageHeaderDbo[0]; + } + + public IList HeadersDbos { get; set; } + public IList ItemsDbos { get; set; } + public IList MessagesDbos { get; set; } + public IList QueuesDbos { get; set; } + } +} diff --git a/src/KafkaFlow.Retry.Postgres/Model/Schema/Script.cs b/src/KafkaFlow.Retry.Postgres/Model/Schema/Script.cs new file mode 100644 index 00000000..575d190c --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/Model/Schema/Script.cs @@ -0,0 +1,16 @@ +namespace KafkaFlow.Retry.Postgres.Model.Schema +{ + using Dawn; + + public class Script + { + public Script(string value) + { + Guard.Argument(value, nameof(value)).NotNull(); + + this.Value = value; + } + + public string Value { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.Postgres/PostgresDbDataProviderFactory.cs b/src/KafkaFlow.Retry.Postgres/PostgresDbDataProviderFactory.cs new file mode 100644 index 00000000..6f2db15d --- /dev/null +++ b/src/KafkaFlow.Retry.Postgres/PostgresDbDataProviderFactory.cs @@ -0,0 +1,73 @@ +namespace KafkaFlow.Retry.Postgres +{ + using System.Collections.Generic; + using System.IO; + using System.Reflection; + using Dawn; + using KafkaFlow.Retry.Durable.Repository; + using KafkaFlow.Retry.Postgres.Model.Factories; + using KafkaFlow.Retry.Postgres.Model.Schema; + using KafkaFlow.Retry.Postgres.Readers; + using KafkaFlow.Retry.Postgres.Readers.Adapters; + using KafkaFlow.Retry.Postgres.Repositories; + + public sealed class PostgresDbDataProviderFactory + { + public IRetryDurableQueueRepositoryProvider Create(PostgresDbSettings postgresDbSettings) + { + Guard.Argument(postgresDbSettings) + .NotNull("It is mandatory to config the factory before creating new instances of IRetryQueueDataProvider. Make sure the Config method is executed before the Create method."); + + var retryQueueItemMessageAdapter = + new RetryQueueItemMessageDboFactory(); + + var retryQueueReader = new RetryQueueReader( + new RetryQueueAdapter(), + new RetryQueueItemAdapter(), + new RetryQueueItemMessageAdapter(), + new RetryQueueItemMessageHeaderAdapter() + ); + + return new RetryQueueDataProvider( + postgresDbSettings, + new ConnectionProvider(), + new RetryQueueItemMessageHeaderRepository(), + new RetryQueueItemMessageRepository(), + new RetryQueueItemRepository(), + new RetryQueueRepository(), + new RetryQueueDboFactory(), + new RetryQueueItemDboFactory(), + retryQueueReader, + retryQueueItemMessageAdapter, + new RetryQueueItemMessageHeaderDboFactory()); + } + + public IRetrySchemaCreator CreateSchemaCreator(PostgresDbSettings postgresDbSettings) => new RetrySchemaCreator(postgresDbSettings, this.GetScriptsForSchemaCreation()); + + private IEnumerable