diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c1ae8a3..6ca7086c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,8 +52,10 @@ The `Unreleased` section name is replaced by the expected version of next releas - `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner`: Exposed `*Stats` [#226](https://github.com/jet/propulsion/pull/226) - `Propulsion.EventStore`: Pinned to target `Equinox.EventStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.EventStoreDb`** [#139](https://github.com/jet/propulsion/pull/139) - `Propulsion.EventStoreDb.EventStoreSource`: Changed API to match`Propulsion.SqlStreamStore` API rather than`Propulsion.EventStore` [#139](https://github.com/jet/propulsion/pull/139) +- `Propulsion.Feed`: Moved implementations into main `Propulsion` library. While this adds a `FSharp.Control.TaskSeq` dependency, it makes maintenance and navigation easier - `Propulsion.Feed`,`Kafka`: Replaced `Async` with `task` for supervision [#158](https://github.com/jet/propulsion/pull/158), [#159](https://github.com/jet/propulsion/pull/159) - `Propulsion.Kafka`: Target `FsCodec.NewtonsoftJson` v `3.0.0` [#139](https://github.com/jet/propulsion/pull/139) +- `Propulsion.Prometheus`: Extracted `Propulsion.Prometheus` and `Propulsion.Feed.Prometheus` in order to remove `Prometheus` dependency from core package - `Propulsion.Tool`: `project` renamed to `sync`; sources now have a `from` prefix [#252](https://github.com/jet/propulsion/pull/252) ### Removed diff --git a/Directory.Build.props b/Directory.Build.props index 3ba16424..9ad45ce7 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -17,6 +17,8 @@ $(NoWarn);FS2003;NU5105 + + diff --git a/Propulsion.sln b/Propulsion.sln index 4f54b74c..82ff9780 100644 --- a/Propulsion.sln +++ b/Propulsion.sln @@ -36,7 +36,7 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.SqlStreamStore", EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.CosmosStore", "src\Propulsion.CosmosStore\Propulsion.CosmosStore.fsproj", "{356294D8-DF59-4903-9A9C-03F0F459B2A3}" EndProject -Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.Feed", "src\Propulsion.Feed\Propulsion.Feed.fsproj", "{B6C1C225-940C-425C-A2F7-A728AEBFCB31}" +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.Prometheus", "src\Propulsion.Prometheus\Propulsion.Prometheus.fsproj", "{B6C1C225-940C-425C-A2F7-A728AEBFCB31}" EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.CosmosStore3", "src\Propulsion.CosmosStore3\Propulsion.CosmosStore3.fsproj", "{8A2D7DDF-ED05-4871-A2B1-66D22A581C95}" EndProject diff --git a/README.md b/README.md index 06a46509..11fe9c73 100644 --- a/README.md +++ b/README.md @@ -13,20 +13,26 @@ If you're looking for a good discussion forum on these kinds of topics, look no ## Core Components -- `Propulsion` [![NuGet](https://img.shields.io/nuget/v/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/) Implements core functionality in a channel-independent fashion. [Depends](https://www.fuget.org/packages/Propulsion) on `MathNet.Numerics` +- `Propulsion` [![NuGet](https://img.shields.io/nuget/v/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/) Implements core functionality in a channel-independent fashion. [Depends](https://www.fuget.org/packages/Propulsion) on `Serilog`, `MathNet.Numerics`, `FSharp.Control.TaskSeq`: 1. `StreamsSink`: High performance pipeline that handles parallelized event processing. Ingestion of events, and checkpointing of progress are handled asynchronously. Each aspect of the pipeline is decoupled such that it can be customized as desired. 2. `Streams.Prometheus`: Helper that exposes per-scheduler metrics for Prometheus scraping. 3. `ParallelProjector`: Scaled down variant of `StreamsSink` that does not preserve stream level ordering semantics + 4. `FeedSource`: Handles continual reading and checkpointing of events from a set of feeds ('tranches') of a 'source' that collectively represent a change data capture source for a given system (roughly analogous to how a CosmosDB Container presents a changefeed). A `readTranches` function is used to identify the Tranches (sub-feeds) on startup. The Feed Source then operates a logical reader thread per Tranche. Tranches represent content as an incrementally retrievable change feed consisting of batches of `FsCodec.ITimelineEvent` records. Each batch has an optional associated checkpointing callback that's triggered only when the Sink has handled all events within it. + 5. `Monitor.AwaitCompletion`: Enables efficient waiting for completion of reaction processing within an integration test. + 6. `PeriodicSource`: Handles regular crawling of an external datasource (such as a SQL database) where there is no way to save progress and then resume from that saved token (based on either the intrinsic properties of the data, or of the store itself). The source is expected to present its content as an `IAsyncEnumerable` of `FsCodec.StreamName * FsCodec.IEventData * context`. Checkpointing occurs only when all events have been deemed handled by the Sink. + 7. `SinglePassFeedSource`: Handles single pass loading of large datasets (such as a SQL database), completing when the full data has been ingested. + 8. `JsonSource`: Simple source that feeds items from a File containing JSON (such a file can be generated via `eqx query -o JSONFILE from cosmos` etc) -- `Propulsion.Feed` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Feed.svg)](https://www.nuget.org/packages/Propulsion.Feed/) Provides helpers for checkpointed consumption of a feed of stream-based inputs. Provides for custom bindings (e.g. a third-party Feed API) or various other input configurations (e.g. periodically correlating with inputs from a non-streamed source such as a SQL Database). Provides a generic API for checkpoint storage, with diverse implementations hosted in the sibling packages associated with each concrete store (supported stores include DynamoStore, CosmosStore, SQL Server, Postgres). [Depends](https://www.fuget.org/packages/Propulsion.Feed) on `Propulsion`, a `IFeedCheckpointStore` implementation (from e.g., `Propulsion.CosmosStore|DynamoStore|MessageDb|SqlStreamStore`) + NOTE `Propulsion.Feed` is a namespace within the main `Propulsion` package that provides helpers for checkpointed consumption of a feed of stream-based inputs. + - Supported inputs include custom bindings (e.g. a third-party Feed API) or various other input configurations (e.g. periodically correlating with inputs from a non-streamed source such as a SQL Database). + - Provides a generic API for checkpoint storage, with diverse implementations hosted in the sibling packages associated with each concrete store (supported stores include DynamoStore, CosmosStore, SQL Server, Postgres). + - Using a feed normally requires a checkpoint store that inmplements `IFeedCheckpointStore` from e.g., `Propulsion.CosmosStore|DynamoStore|MessageDb|SqlStreamStore` - 1. `FeedSource`: Handles continual reading and checkpointing of events from a set of feeds ('tranches') of a 'source' that collectively represent a change data capture source for a given system (roughly analogous to how a CosmosDB Container presents a changefeed). A `readTranches` function is used to identify the Tranches (sub-feeds) on startup. The Feed Source then operates a logical reader thread per Tranche. Tranches represent content as an incrementally retrievable change feed consisting of batches of `FsCodec.ITimelineEvent` records. Each batch has an optional associated checkpointing callback that's triggered only when the Sink has handled all events within it. - 2. `Monitor.AwaitCompletion`: Enables efficient waiting for completion of reaction processing within an integration test. - 3. `PeriodicSource`: Handles regular crawling of an external datasource (such as a SQL database) where there is no way to save progress and then resume from that saved token (based on either the intrinsic properties of the data, or of the store itself). The source is expected to present its content as an `IAsyncEnumerable` of `FsCodec.StreamName * FsCodec.IEventData * context`. Checkpointing occurs only when all events have been deemed handled by the Sink. - 4. `JsonSource`: Simple source that feeds items from a File containing JSON (such a file can be generated via `eqx query -o JSONFILE from cosmos` etc) - 5. `SinglePassFeedSource`: Handles single pass loading of large datasets (such as a SQL database), completing when the full data has been ingested. - 6. `Prometheus`: Exposes reading statistics to Prometheus (including metrics from `DynamoStore.DynamoStoreSource`, `EventStoreDb.EventStoreSource`, `MessageDb.MessageDbSource` and `SqlStreamStore.SqlStreamStoreSource`). (NOTE all other statistics relating to processing throughput and latency etc are exposed from the Scheduler component on the Sink side) +- `Propulsion.Prometheus` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Prometheus.svg)](https://www.nuget.org/packages/Propulsion.Prometheus/) Provides helpers for checkpointed consumption of a feed of stream-based inputs. Provides for custom bindings (e.g. a third-party Feed API) or various other input configurations (e.g. periodically correlating with inputs from a non-streamed source such as a SQL Database). Provides a generic API for checkpoint storage, with diverse implementations hosted in the sibling packages associated with each concrete store (supported stores include DynamoStore, CosmosStore, SQL Server, Postgres). [Depends](https://www.fuget.org/packages/Propulsion.Prometheus) on `Propulsion`, a `IFeedCheckpointStore` implementation (from e.g., `Propulsion.CosmosStore|DynamoStore|MessageDb|SqlStreamStore`) + + 1. `Propulsion.Prometheus`: Exposes processing throughput statistics to Prometheus. + 2. `Propulsion.Feed.Prometheus`: Exposes reading statistics to Prometheus (including metrics from `DynamoStore.DynamoStoreSource`, `EventStoreDb.EventStoreSource`, `MessageDb.MessageDbSource` and `SqlStreamStore.SqlStreamStoreSource`). - `Propulsion.MemoryStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MemoryStore.svg)](https://www.nuget.org/packages/Propulsion.MemoryStore/). Provides bindings to `Equinox.MemoryStore`. [Depends](https://www.fuget.org/packages/Propulsion.MemoryStore) on `Equinox.MemoryStore` v `4.0.0`, `FsCodec.Box`, `Propulsion` @@ -51,9 +57,8 @@ If you're looking for a good discussion forum on these kinds of topics, look no 2. `DynamoStoreIndexer`: writes to `AppendsIndex`/`AppendsEpoch` (used by `Propulsion.DynamoStore.Indexer`, `Propulsion.Tool`) 3. `DynamoStoreSource`: reads from `AppendsIndex`/`AppendsEpoch` (see `DynamoStoreIndexer`) 4. `ReaderCheckpoint`: checkpoint storage for `Propulsion.DynamoStore`/`EventStoreDb`/`Feed`/`MessageDb`/`SqlStreamSteamStore` using `Equinox.DynamoStore` v `4.0.0`. - 5. `Monitor.AwaitCompletion`: See `Propulsion.Feed` - (Reading and position metrics are exposed via `Propulsion.Feed.Prometheus`) + (Reading and position metrics are exposed via `Propulsion.Prometheus`) - `Propulsion.DynamoStore.Indexer` [![NuGet](https://img.shields.io/nuget/v/Propulsion.DynamoStore.Indexer.svg)](https://www.nuget.org/packages/Propulsion.DynamoStore.Indexer/) AWS Lambda to index appends into an Index Table. [Depends](https://www.fuget.org/packages/Propulsion.DynamoStore.Indexer) on `Propulsion.DynamoStore`, `Amazon.Lambda.Core`, `Amazon.Lambda.DynamoDBEvents`, `Amazon.Lambda.Serialization.SystemTextJson` @@ -76,7 +81,7 @@ If you're looking for a good discussion forum on these kinds of topics, look no 2. `DynamoStoreNotifierLambda`: CDK wiring for `Propulsion.DynamoStore.Notifier` 3. `DynamoStoreReactorLambda`: CDK wiring for a Reactor that's triggered based on messages supplied by `Propulsion.DynamoStore.Notifier` -- `Propulsion.DynamoStore.Lambda` [![NuGet](https://img.shields.io/nuget/v/Propulsion.DynamoStore.Lambda.svg)](https://www.nuget.org/packages/Propulsion.DynamoStore.Lambda/) Helpers for implementing Lambda Reactors. [Depends](https://www.fuget.org/packages/Propulsion.DynamoStore.Lambda) on `Amazon.Lambda.SQSEvents`, `Propulsion.Feed` +- `Propulsion.DynamoStore.Lambda` [![NuGet](https://img.shields.io/nuget/v/Propulsion.DynamoStore.Lambda.svg)](https://www.nuget.org/packages/Propulsion.DynamoStore.Lambda/) Helpers for implementing Lambda Reactors. [Depends](https://www.fuget.org/packages/Propulsion.DynamoStore.Lambda) on `Amazon.Lambda.SQSEvents` 1. `SqsNotificationBatch.parse`: parses a batch of notification events (queued by a `Notifier`) in a `Amazon.Lambda.SQSEvents.SQSEvent` 2. `SqsNotificationBatch.batchResponseWithFailuresForPositionsNotReached`: Correlates the updated checkpoints with the input `SQSEvent`, generating a `SQSBatchResponse` that will requeue any notifications that have not yet been serviced. @@ -86,23 +91,17 @@ If you're looking for a good discussion forum on these kinds of topics, look no - `Propulsion.EventStoreDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStoreDb.svg)](https://www.nuget.org/packages/Propulsion.EventStoreDb/). Provides bindings to [EventStoreDB](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink`. [Depends](https://www.fuget.org/packages/Propulsion.EventStoreDb) on `Equinox.EventStoreDb` v `4.0.0` 1. `EventStoreSource`: reading from an EventStoreDB >= `20.10` `$all` stream using the gRPC interface into a `Propulsion.Sink`. 2. `EventStoreSink`: writing to `Equinox.EventStoreDb` v `4.0.0` - 3. `Monitor.AwaitCompletion`: See `Propulsion.Feed` - - (Reading and position metrics are exposed via `Propulsion.Feed.Prometheus`) - `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.7.0`-`1.9.99` -- `Propulsion.MessageDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MessageDb.svg)](https://www.nuget.org/packages/Propulsion.MessageDb/). Provides bindings to [MessageDb](http://docs.eventide-project.org/user-guide/message-db/), maintaining checkpoints in a postgres table [Depends](https://www.fuget.org/packages/Propulsion.MessageDb) on `Propulsion.Feed`, `Npgsql` >= `7.0.7` [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord) +- `Propulsion.MessageDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MessageDb.svg)](https://www.nuget.org/packages/Propulsion.MessageDb/). Provides bindings to [MessageDb](http://docs.eventide-project.org/user-guide/message-db/), maintaining checkpoints in a postgres table [Depends](https://www.fuget.org/packages/Propulsion.MessageDb) on `Propulsion`, `Npgsql` >= `7.0.7` [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord) 1. `MessageDbSource`: reading from one or more MessageDB categories into a `Propulsion.Sink` 2. `CheckpointStore`: checkpoint storage for `Propulsion.Feed` using `Npgsql` (can be initialized via `propulsion initpg -c connstr -s schema`) -- `Propulsion.SqlStreamStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.SqlStreamStore.svg)](https://www.nuget.org/packages/Propulsion.SqlStreamStore/). Provides bindings to [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore), maintaining checkpoints in a SQL Server table. [Depends](https://www.fuget.org/packages/Propulsion.SqlStreamStore) on `Propulsion.Feed`, `SqlStreamStore`, `Dapper` v `2.0`, `Microsoft.Data.SqlClient` v `1.1.3` +- `Propulsion.SqlStreamStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.SqlStreamStore.svg)](https://www.nuget.org/packages/Propulsion.SqlStreamStore/). Provides bindings to [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore), maintaining checkpoints in a SQL Server table. [Depends](https://www.fuget.org/packages/Propulsion.SqlStreamStore) on `Propulsion`, `SqlStreamStore`, `Dapper` v `2.0`, `Microsoft.Data.SqlClient` v `1.1.3` 1. `SqlStreamStoreSource`: reading from a SqlStreamStore `$all` stream into a `Propulsion.Sink` 2. `ReaderCheckpoint`: checkpoint storage for `Propulsion.EventStoreDb`/`Feed`/`SqlStreamSteamStore` using `Dapper`, `Microsoft.Data.SqlClient` - 3. `Monitor.AwaitCompletion`: See `Propulsion.Feed` - - (Reading and position metrics are exposed via `Propulsion.Feed.Prometheus`) The ubiquitous `Serilog` dependency is solely on the core module, not any sinks. diff --git a/build.proj b/build.proj index 2d273597..877ec5f7 100644 --- a/build.proj +++ b/build.proj @@ -20,10 +20,10 @@ - + diff --git a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj index b782e81e..5bbbe05b 100644 --- a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj +++ b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj @@ -26,8 +26,8 @@ - - + + diff --git a/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj b/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj index f033e850..468b8da0 100644 --- a/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj +++ b/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj @@ -45,9 +45,8 @@ - - - + + diff --git a/src/Propulsion.DynamoStore.Constructs/Propulsion.DynamoStore.Constructs.fsproj b/src/Propulsion.DynamoStore.Constructs/Propulsion.DynamoStore.Constructs.fsproj index cb424b16..64c2ad79 100644 --- a/src/Propulsion.DynamoStore.Constructs/Propulsion.DynamoStore.Constructs.fsproj +++ b/src/Propulsion.DynamoStore.Constructs/Propulsion.DynamoStore.Constructs.fsproj @@ -4,7 +4,7 @@ net6.0 - 3.0.0-rc.12 + diff --git a/src/Propulsion.DynamoStore.Indexer/Propulsion.DynamoStore.Indexer.fsproj b/src/Propulsion.DynamoStore.Indexer/Propulsion.DynamoStore.Indexer.fsproj index 8cfd4829..34b247b2 100644 --- a/src/Propulsion.DynamoStore.Indexer/Propulsion.DynamoStore.Indexer.fsproj +++ b/src/Propulsion.DynamoStore.Indexer/Propulsion.DynamoStore.Indexer.fsproj @@ -34,8 +34,8 @@ - - + + diff --git a/src/Propulsion.DynamoStore.Lambda/Propulsion.DynamoStore.Lambda.fsproj b/src/Propulsion.DynamoStore.Lambda/Propulsion.DynamoStore.Lambda.fsproj index fb02c1a0..14b70bb6 100644 --- a/src/Propulsion.DynamoStore.Lambda/Propulsion.DynamoStore.Lambda.fsproj +++ b/src/Propulsion.DynamoStore.Lambda/Propulsion.DynamoStore.Lambda.fsproj @@ -14,8 +14,8 @@ - - + + diff --git a/src/Propulsion.DynamoStore.Notifier/Propulsion.DynamoStore.Notifier.fsproj b/src/Propulsion.DynamoStore.Notifier/Propulsion.DynamoStore.Notifier.fsproj index fa486740..70135bce 100644 --- a/src/Propulsion.DynamoStore.Notifier/Propulsion.DynamoStore.Notifier.fsproj +++ b/src/Propulsion.DynamoStore.Notifier/Propulsion.DynamoStore.Notifier.fsproj @@ -42,8 +42,8 @@ - - + + diff --git a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj index dedf35af..49716354 100644 --- a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj +++ b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj @@ -31,9 +31,8 @@ - - - + + diff --git a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj index 6fe17629..f4ed7690 100644 --- a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj +++ b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj @@ -24,9 +24,8 @@ - - - + + diff --git a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj index 20531b40..4f9c5e4d 100644 --- a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj +++ b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj @@ -22,9 +22,8 @@ - - - + + diff --git a/src/Propulsion.Kafka/Propulsion.Kafka.fsproj b/src/Propulsion.Kafka/Propulsion.Kafka.fsproj index 263bdbe0..699a3863 100644 --- a/src/Propulsion.Kafka/Propulsion.Kafka.fsproj +++ b/src/Propulsion.Kafka/Propulsion.Kafka.fsproj @@ -22,9 +22,8 @@ - - - + + diff --git a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj index fff5dd11..28f4672f 100644 --- a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj +++ b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj @@ -26,8 +26,8 @@ - - + + diff --git a/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj b/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj index 93edc96a..721c194b 100644 --- a/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj +++ b/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj @@ -21,9 +21,8 @@ - - - + + diff --git a/src/Propulsion.Feed/FeedPrometheus.fs b/src/Propulsion.Prometheus/FeedPrometheus.fs similarity index 100% rename from src/Propulsion.Feed/FeedPrometheus.fs rename to src/Propulsion.Prometheus/FeedPrometheus.fs diff --git a/src/Propulsion.Feed/Propulsion.Feed.fsproj b/src/Propulsion.Prometheus/Propulsion.Prometheus.fsproj similarity index 52% rename from src/Propulsion.Feed/Propulsion.Feed.fsproj rename to src/Propulsion.Prometheus/Propulsion.Prometheus.fsproj index 62ca91ee..e4b15841 100644 --- a/src/Propulsion.Feed/Propulsion.Feed.fsproj +++ b/src/Propulsion.Prometheus/Propulsion.Prometheus.fsproj @@ -8,22 +8,19 @@ - - - - + - - + + - - + + diff --git a/src/Propulsion/PropulsionPrometheus.fs b/src/Propulsion.Prometheus/PropulsionPrometheus.fs similarity index 100% rename from src/Propulsion/PropulsionPrometheus.fs rename to src/Propulsion.Prometheus/PropulsionPrometheus.fs diff --git a/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj b/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj index 4f723686..76d80aec 100644 --- a/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj +++ b/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj @@ -22,9 +22,8 @@ - - - + + diff --git a/src/Propulsion.Feed/FeedReader.fs b/src/Propulsion/FeedReader.fs similarity index 100% rename from src/Propulsion.Feed/FeedReader.fs rename to src/Propulsion/FeedReader.fs diff --git a/src/Propulsion.Feed/FeedSource.fs b/src/Propulsion/FeedSource.fs similarity index 92% rename from src/Propulsion.Feed/FeedSource.fs rename to src/Propulsion/FeedSource.fs index b08a4520..41e45514 100644 --- a/src/Propulsion.Feed/FeedSource.fs +++ b/src/Propulsion/FeedSource.fs @@ -124,21 +124,6 @@ type AllFeedSource member x.Start() = base.Start(x.Pump) -/// Drives reading from the Source, stopping when the Tail of each of the Tranches has been reached -type SinglePassFeedSource - ( log: Serilog.ILogger, statsInterval: TimeSpan, - sourceId, - crawl: Func)>>, - checkpoints: IFeedCheckpointStore, sink: Propulsion.Sinks.SinkPipeline, - ?renderPos, ?logReadFailure, ?readFailureSleepInterval, ?logCommitFailure) = - inherit TailingFeedSource(log, statsInterval, sourceId, (*tailSleepInterval*)TimeSpan.Zero, checkpoints, (*establishOrigin*)None, sink, defaultArg renderPos string, - crawl, - ?logReadFailure = logReadFailure, ?readFailureSleepInterval = readFailureSleepInterval, ?logCommitFailure = logCommitFailure, - readersStopAtTail = true) - - member x.Start(readTranches) = - base.Start(fun ct -> x.Pump(readTranches, ct)) - module Categories = let private startsWith (prefix: string) (s: FsCodec.StreamName) = (FsCodec.StreamName.toString s).StartsWith(prefix) diff --git a/src/Propulsion.Feed/JsonSource.fs b/src/Propulsion/JsonSource.fs similarity index 95% rename from src/Propulsion.Feed/JsonSource.fs rename to src/Propulsion/JsonSource.fs index ea5d1b27..b595b788 100644 --- a/src/Propulsion.Feed/JsonSource.fs +++ b/src/Propulsion/JsonSource.fs @@ -39,5 +39,5 @@ type [] JsonSource private () = else System.Text.Json.JsonDocument.Parse line |> parseFeedDoc |> Seq.toArray struct (System.TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse lineNo }: Core.Batch<_>)) with e -> raise <| exn($"File Parse error on L{lineNo}: '{line.Substring(0, 200)}'", e) } - let source = Propulsion.Feed.Core.SinglePassFeedSource(log, statsInterval, sourceId, crawl, checkpoints, sink, string) + let source = SinglePassFeedSource(log, statsInterval, sourceId, crawl, checkpoints, sink, string) source.Start(fun _ct -> task { return [| TrancheId.parse "0" |] }) diff --git a/src/Propulsion.Feed/PeriodicSource.fs b/src/Propulsion/PeriodicSource.fs similarity index 100% rename from src/Propulsion.Feed/PeriodicSource.fs rename to src/Propulsion/PeriodicSource.fs diff --git a/src/Propulsion/Propulsion.fsproj b/src/Propulsion/Propulsion.fsproj index 0b9f83ba..f8828c2b 100644 --- a/src/Propulsion/Propulsion.fsproj +++ b/src/Propulsion/Propulsion.fsproj @@ -4,7 +4,9 @@ net6.0 - 3.0.0-rc.14 + + true + @@ -19,20 +21,21 @@ - + + + + + - - - + + - - diff --git a/src/Propulsion/SinglePassFeedSource.fs b/src/Propulsion/SinglePassFeedSource.fs new file mode 100644 index 00000000..c74312dc --- /dev/null +++ b/src/Propulsion/SinglePassFeedSource.fs @@ -0,0 +1,21 @@ +namespace Propulsion.Feed + +open Propulsion.Feed.Core +open System +open System.Collections.Generic +open System.Threading + +/// Drives reading from the Source, stopping when the Tail of each of the Tranches has been reached +type SinglePassFeedSource + ( log: Serilog.ILogger, statsInterval: TimeSpan, + sourceId, + crawl: Func)>>, + checkpoints: IFeedCheckpointStore, sink: Propulsion.Sinks.SinkPipeline, + ?renderPos, ?logReadFailure, ?readFailureSleepInterval, ?logCommitFailure) = + inherit TailingFeedSource(log, statsInterval, sourceId, (*tailSleepInterval*)TimeSpan.Zero, checkpoints, (*establishOrigin*)None, sink, defaultArg renderPos string, + crawl, + ?logReadFailure = logReadFailure, ?readFailureSleepInterval = readFailureSleepInterval, ?logCommitFailure = logCommitFailure, + readersStopAtTail = true) + + member x.Start(readTranches) = + base.Start(fun ct -> x.Pump(readTranches, ct)) diff --git a/tests/Propulsion.Tests/SinkHealthTests.fs b/tests/Propulsion.Tests/SinkHealthTests.fs index be73ad20..c9d19fb2 100644 --- a/tests/Propulsion.Tests/SinkHealthTests.fs +++ b/tests/Propulsion.Tests/SinkHealthTests.fs @@ -49,7 +49,7 @@ type Scenario(testOutput) = [] let run () = async { - let source = Propulsion.Feed.Core.SinglePassFeedSource(log, TimeSpan.FromSeconds 5, SourceId.parse "sid", crawl, checkpoints, sink, string) + let source = Propulsion.Feed.SinglePassFeedSource(log, TimeSpan.FromSeconds 5, SourceId.parse "sid", crawl, checkpoints, sink, string) let src = source.Start(fun _ct -> task { return [| TrancheId.parse "tid" |] }) let! monEx = src.Monitor.AwaitCompletion(propagationDelay = TimeSpan.FromSeconds 1, awaitFullyCaughtUp = true) |> Propulsion.Internal.Async.ofTask |> Async.Catch let me = extractHealthCheckExn monEx diff --git a/tests/Propulsion.Tests/SourceTests.fs b/tests/Propulsion.Tests/SourceTests.fs index 5df1923f..ce97c4ff 100644 --- a/tests/Propulsion.Tests/SourceTests.fs +++ b/tests/Propulsion.Tests/SourceTests.fs @@ -39,7 +39,7 @@ type Scenario(testOutput) = [] let SinglePassFeedSource withWait = async { let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = Array.empty; isTail = true; checkpoint = Unchecked.defaultof<_> }: Core.Batch<_>)) - let source = Propulsion.Feed.Core.SinglePassFeedSource(log, TimeSpan.FromMinutes 1, SourceId.parse "sid", crawl, checkpoints, sink, string) + let source = Propulsion.Feed.SinglePassFeedSource(log, TimeSpan.FromMinutes 1, SourceId.parse "sid", crawl, checkpoints, sink, string) use src = source.Start(fun _ct -> task { return [| TrancheId.parse "tid" |] }) // SinglePassFeedSource completion includes Waiting for Completion of all Batches on all Tranches and Flushing of Checkpoints // Hence waiting with the Monitor is not actually necessary (though it provides progress logging which otherwise would be less thorough) diff --git a/tools/Propulsion.Tool/Propulsion.Tool.fsproj b/tools/Propulsion.Tool/Propulsion.Tool.fsproj index 050e6cbc..ef04d0ca 100644 --- a/tools/Propulsion.Tool/Propulsion.Tool.fsproj +++ b/tools/Propulsion.Tool/Propulsion.Tool.fsproj @@ -7,7 +7,7 @@ true propulsion - + Major @@ -28,9 +28,11 @@ - + + +