From c4c09dfffef175091694fcb2d8c4447cdeee0e70 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Mon, 15 Jul 2024 16:43:47 +0100 Subject: [PATCH] feat!(Streams): Support propagating Unfolds --- CHANGELOG.md | 2 +- DOCUMENTATION.md | 9 +- .../CosmosStorePruner.fs | 6 +- src/Propulsion.CosmosStore/CosmosStoreSink.fs | 76 ++++---- .../EquinoxSystemTextJsonParser.fs | 78 +++++--- .../Propulsion.CosmosStore.fsproj | 7 +- .../Propulsion.CosmosStore3.fsproj | 7 +- .../Propulsion.DynamoStore.fsproj | 7 +- src/Propulsion.EventStore/EventStoreSink.fs | 65 +++---- .../Propulsion.EventStore.fsproj | 7 +- .../Propulsion.EventStoreDb.fsproj | 7 +- src/Propulsion.Feed/PeriodicSource.fs | 2 +- src/Propulsion.Feed/Propulsion.Feed.fsproj | 7 +- src/Propulsion.Kafka/Consumers.fs | 32 ++-- src/Propulsion.Kafka/ProducerSinks.fs | 4 +- src/Propulsion.Kafka/Propulsion.Kafka.fsproj | 5 +- .../Propulsion.MemoryStore.fsproj | 7 +- .../Propulsion.MessageDb.fsproj | 7 +- src/Propulsion.MessageDb/Readme.md | 2 +- .../Propulsion.SqlStreamStore.fsproj | 7 +- src/Propulsion/Propulsion.fsproj | 2 +- src/Propulsion/Sinks.fs | 71 +++---- src/Propulsion/Streams.fs | 175 ++++++++++-------- src/Propulsion/Sync.fs | 26 ++- .../ConsumersIntegration.fs | 4 +- .../Propulsion.MessageDb.Integration/Tests.fs | 6 +- tests/Propulsion.Tests/SinkHealthTests.fs | 10 +- tests/Propulsion.Tests/SourceTests.fs | 2 +- tests/Propulsion.Tests/StreamStateTests.fs | 115 +++++++++++- tools/Propulsion.Tool/Sync.fs | 6 +- 30 files changed, 441 insertions(+), 320 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d1c55a93..80c1c990 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,7 +47,6 @@ The `Unreleased` section name is replaced by the expected version of next releas - `Scheduler`: `Struct`/`voption` conversions; buffer reuse [#157](https://github.com/jet/propulsion/pull/157) - `Scheduler`: Replaced `Thead.Sleep` with `Task.WhenAny`; Added Sleep time logging [#161](https://github.com/jet/propulsion/pull/161) - `Streams`: Changed dominant `ITimelineEvent` `EventBody` type from `byte[]` to `System.ReadOnlyMemory` (`Sinks.EventBody`) [#169](https://github.com/jet/propulsion/pull/169) [#208](https://github.com/jet/propulsion/pull/208) -- `Streams.SpanResult`: Renamed to `Sinks.StreamResult` [#208](https://github.com/jet/propulsion/pull/208) - `Propulsion.CosmosStore`: Changed to target `Equinox.CosmosStore` v `4.0.0` [#139](https://github.com/jet/propulsion/pull/139) - `Propulsion.CosmosStore.CosmosSource`: Changed parsing to use `System.Text.Json` [#139](https://github.com/jet/propulsion/pull/139) - `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner`: Exposed `*Stats` [#226](https://github.com/jet/propulsion/pull/226) @@ -60,6 +59,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Removed - `Streams.StreamSpan`: Changed from a record to individual arguments of `FsCodec.StreamName` and `Sinks.Event[]` [#169](https://github.com/jet/propulsion/pull/169) [#208](https://github.com/jet/propulsion/pull/208) +- `Streams.SpanResult`: Replaced with `int64` to reflect the updated position [#264](https://github.com/jet/propulsion/pull/264) [#208](https://github.com/jet/propulsion/pull/208) - `Streams`: `statsInterval` is obtained from the `Stats` wherever one is supplied [#208](https://github.com/jet/propulsion/pull/208) - `Propulsion.Cosmos`: Should not be in general use - users should port to `Propulsion.CosmosStore3`, then `Propulsion.CosmosStore` [#193](https://github.com/jet/propulsion/pull/193) - 
`Destructurama.FSharp` dependency [#152](https://github.com/jet/propulsion/pull/152)

diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md
index d70c2389..1e1e6e68 100755
--- a/DOCUMENTATION.md
+++ b/DOCUMENTATION.md
@@ -283,7 +283,7 @@ Typically, alerting should be set up based on the built in `busy` metrics provid
 - `failing`: streams that have had at least one failed Handler invocation (regardless of whether they are currently the subject of a retry Handler invocation or not). Typically it should be possible to define:
   - a reasonable limit before you'd want a low level alert to be raised
   - a point at which you raise an alarm on the basis that the system is in a state that will lead to a SLA breach and hence merits intervention
-- `stalled`: streams that have had only successful Handler invocations, but have not declared any progress via the Handler's `StreamResult`. In some cases, the design of a Reaction Process may be such that one might intentionally back off and retry in some scenarios (see [Consistency](#consistency)). In the general case, a stalled stream may reflect a coding error (e.g., if a handler uses read a stale value from a cache but the cache never gets invalidated, it will never make progress)
+- `stalled`: streams that have had only successful Handler invocations, but have not declared any progress via the Handler's updated Stream Position. In some cases, the design of a Reaction Process may be such that one might intentionally back off and retry in some scenarios (see [Consistency](#consistency)). In the general case, a stalled stream may reflect a coding error (e.g., if a handler reads a stale value from a cache but the cache never gets invalidated, it will never make progress)
 
 Alongside alerting based on breaches of SLO limits, the values of the `busy` metrics are a key high level indicator of the health of a given Processor (along with the Handler Latency distribution).
 
@@ -348,13 +348,12 @@ or the _Designing Data Intensive Applications_ book):
      - DynamoDb: requesting a 'consistent read'
      - CosmosDb: when using Session Consistency, require that reads are contingent on the session token being used by the feed reader. This can be achieved by using the same `CosmosClient` to ensure the session tokens are synchronized.
 2. Perform a pre-flight check when reading, based on the `Index` of the newest event passed to the handler. In such a case, it may make sense to back off for a small period, before reporting failure to handle the event (by throwing an exception). The Handler will be re-invoked for another attempt, with a better chance of the event being reflected in the read.
-   - Once such a pre-flight check has been carried out, one can safely report `StreamResult.AllProcessed` (or `PartiallyProcessed` if you wish to defer some work due to the backlog of events triggering too much work to perform in a single invocation)
 3. Perform the processing on a 'shoulder tap' basis, with the final position based on what you read.
    - First, load the stream's state, performing any required reactions.
-   - Then report the Version attained for the stream (based on the Index of the last event processed) by yielding a `StreamResult.OverrideNextIndex`.
+   - Then report the Version attained for the stream (based on the Index of the last event processed) as the Handler's updated position (see the sketch below)
    - In this case, one of following edge cases may result:
-     - _The handler saw a version prior to the prompting event_. For example, if a Create event (`Index = 0`) is relayed, but reading does not yield any events (the replica in question is behind the node from which the feed obtained its state). In this case, the Handler can simply yield `StreamResult.OverrideNextIndex`, which will cause the event to be retained in the input buffer (and most likely, a fresh invocation for that same stream will immediately be dispatched)
-     - _The Handler saw a Version fresher than the prompting event_. For example: if a Create (`Index = 0`) is immediately followed by an Update (`Index = 1`), the handler can yield `StreamResult.OverrideNextIndex 2` to reflect the fact that the next event that's of interest will be event `Index = 2`. Regardless of whether Event 1 arrived while the handler was processing Event 0, or whether it arrives some time afterwards, the event will be dropped from the events pending for that Stream's Handler.
+     - _The handler saw a version prior to the prompting event_. For example, if a Create event (`Index = 0`) is relayed, but reading does not yield any events (the replica in question is behind the node from which the feed obtained its state). In this case, the Handler can simply yield the Position that the processing actually observed, which will cause the event to be retained in the input buffer (and most likely, a fresh invocation for that same stream will immediately be dispatched)
+     - _The Handler saw a Version fresher than the prompting event_. For example: if a Create (`Index = 0`) is immediately followed by an Update (`Index = 1`), the handler can yield `2` to reflect the fact that the next event that's of interest will be event `Index = 2`. Regardless of whether Event 1 arrived while the handler was processing Event 0, or whether it arrives some time afterwards, the event will be dropped from the events pending for that Stream's Handler.
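To make the 'shoulder tap' approach concrete under the revised contract (where a Handler returns its Outcome together with the updated Stream Position as an `int64`, rather than a `StreamResult`), here is a minimal illustrative sketch; it is not part of this diff, and `loadStateAndReact` is a hypothetical application-supplied function that folds the stream's state, performs the required reactions, and returns the Version (next event Index) it attained:

```fsharp
// Minimal sketch; `loadStateAndReact` is an assumed application function, not part of this PR
let handle (loadStateAndReact: FsCodec.StreamName -> Async<int64>) stream (_events: Propulsion.Sinks.Event[]) = async {
    let! attainedVersion = loadStateAndReact stream
    // Yielding the attained Version as the updated Stream Position covers both edge cases above:
    // - if the read lagged the prompting event, the pending events stay buffered (and the stream
    //   will most likely be redispatched immediately)
    // - if the read was fresher, events below the attained Version are dropped from the buffers
    return (), attainedVersion }

// Wiring, per the Factory API as revised in this PR:
// Propulsion.Sinks.Factory.StartConcurrent(log, maxReadAhead, maxConcurrentStreams, handle loadStateAndReact, stats)
```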
### Consistency in the face of at least once delivery and re-traversal of events diff --git a/src/Propulsion.CosmosStore/CosmosStorePruner.fs b/src/Propulsion.CosmosStore/CosmosStorePruner.fs index da33960a..f0527d31 100644 --- a/src/Propulsion.CosmosStore/CosmosStorePruner.fs +++ b/src/Propulsion.CosmosStore/CosmosStorePruner.fs @@ -29,7 +29,7 @@ module Pruner = let res = if deleted = 0 && deferred = 0 then Nop span.Length else Ok (deleted, deferred) // For case where we discover events have already been deleted beyond our requested position, signal to reader to drop events let writePos = max trimmedPos (untilIndex + 1L) - return struct (writePos, res) } + return struct (res, writePos) } type CosmosStorePrunerStats(log, statsInterval, stateInterval, [] ?failThreshold) = inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval, ?failThreshold = failThreshold) @@ -75,8 +75,8 @@ type CosmosStorePruner = #endif let interpret _stream span = let metrics = StreamSpan.metrics Event.storedSize span - struct (metrics, span) - Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil, (fun _ r -> r)) + struct (span, metrics) + Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil) let dumpStreams logStreamStates _log = logStreamStates Event.storedSize let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay) diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs index 7723a7bf..4350f8e6 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs @@ -15,8 +15,7 @@ module private Impl = type EventBody = byte[] // V4 defines one directly, here we shim it module StreamSpan = - let private toNativeEventBody (xs: Propulsion.Sinks.EventBody): byte[] = xs.ToArray() - let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody + let toNativeEventBody (xs: Propulsion.Sinks.EventBody): byte[] = xs.ToArray() // Trimmed edition of what V4 exposes module internal Equinox = module CosmosStore = @@ -34,10 +33,9 @@ module private Impl = // v4 and later use JsonElement, but Propulsion is using ReadOnlyMemory rather than assuming and/or offering optimization for JSON bodies open System.Text.Json - let private toNativeEventBody (x: EventBody): JsonElement = + let toNativeEventBody (x: EventBody): JsonElement = if x.IsEmpty then JsonElement() else JsonSerializer.Deserialize(x.Span) - let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody #endif module Internal = @@ -49,38 +47,42 @@ module Internal = type [] Result = | Ok of updatedPos: int64 | Duplicate of updatedPos: int64 - | PartialDuplicate of overage: Event[] - | PrefixMissing of batch: Event[] * writePos: int64 - let logTo (log: ILogger) malformed (res: StreamName * Result) = + | PartialDuplicate of updatedPos: int64 + | PrefixMissing of gap: int * actualPos: int64 + let logTo (log: ILogger) malformed (res: StreamName * Result) = match res with - | stream, Ok (_, Result.Ok pos) -> + | stream, Ok (Result.Ok pos, _) -> log.Information("Wrote {stream} up to {pos}", stream, pos) - | stream, Ok (_, Result.Duplicate updatedPos) -> + | stream, Ok (Result.Duplicate updatedPos, _) -> log.Information("Ignored {stream} (synced up to {pos})", stream, updatedPos) - | stream, Ok (_, Result.PartialDuplicate overage) -> - 
log.Information("Requeuing {stream} {pos} ({count} events)", stream, overage[0].Index, overage.Length) - | stream, Ok (_, Result.PrefixMissing (batch, pos)) -> - log.Information("Waiting {stream} missing {gap} events ({count} events @ {pos})", stream, batch[0].Index - pos, batch.Length, batch[0].Index) - | stream, Error (_, exn) -> + | stream, Ok (Result.PartialDuplicate updatedPos, _) -> + log.Information("Requeuing {stream} {pos}", stream, updatedPos) + | stream, Ok (Result.PrefixMissing (gap, pos), _) -> + log.Information("Waiting {stream} missing {gap} events before {pos}", stream, gap, pos) + | stream, Error (exn, _) -> let level = if malformed then LogEventLevel.Warning else Events.LogEventLevel.Information log.Write(level, exn, "Writing {stream} failed, retrying", stream) let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task { - log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length) + let i = StreamSpan.index span + let n = StreamSpan.nextIndex span + span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds") + log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length) + let mapData = FsCodec.Core.EventData.Map StreamSpan.toNativeEventBody #if COSMOSV3 - let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _)) + let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData) |> Async.executeAsTask ct #else - let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _), ct) + let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData, ct) #endif let res' = match res with | AppendResult.Ok pos -> Result.Ok pos.index | AppendResult.Conflict (pos, _) | AppendResult.ConflictUnknown pos -> match pos.index with - | actual when actual < span[0].Index -> Result.PrefixMissing (span, actual) - | actual when actual >= span[0].Index + span.LongLength -> Result.Duplicate actual - | actual -> Result.PartialDuplicate (span |> Array.skip (actual - span[0].Index |> int)) + | actual when actual < i -> Result.PrefixMissing (actual - i |> int, actual) + | actual when actual >= n -> Result.Duplicate actual + | actual -> Result.PartialDuplicate actual log.Debug("Result: {res}", res') return res' } let containsMalformedMessage e = @@ -103,32 +105,34 @@ module Internal = let maxEvents, maxBytes = defaultArg maxEvents 16384, defaultArg maxBytes (256 * 1024) let writerResultLog = log.ForContext() let attemptWrite stream span ct = task { - let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span + let struct (span, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span #if COSMOSV3 - try let! res = Writer.write log eventsContext (StreamName.toString stream) span' ct + try let! res = Writer.write log eventsContext (StreamName.toString stream) span ct #else - try let! res = Writer.write log eventsContext stream span' ct + try let! 
res = Writer.write log eventsContext stream span ct #endif - return Ok struct (met, res) - with e -> return Error struct (met, e) } + return Ok struct (res, met) + with e -> return Error struct (e, met) } let interpretProgress (streams: Scheduling.StreamStates<_>) stream res = let applyResultToStreamState = function - | Ok struct (_stats, Writer.Result.Ok pos) -> struct (streams.RecordWriteProgress(stream, pos, null), false) - | Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null), false - | Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, overage[0].Index, [| overage |]), false - | Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [| overage |]), false - | Error struct (_stats, exn) -> + | Ok struct ((Writer.Result.Ok pos' | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) -> + let ss = streams.RecordWriteProgress(stream, pos', null) + struct (ss.WritePos, false) + | Ok (Writer.Result.PrefixMissing _, _stats) -> + streams.WritePos(stream), false + | Error struct (exn, _stats) -> let malformed = Writer.classify exn |> Writer.isMalformed - streams.SetMalformed(stream, malformed), malformed - let struct (ss, malformed) = applyResultToStreamState res + let ss = streams.SetMalformed(stream, malformed) + ss.WritePos, malformed + let struct (writePos, malformed) = applyResultToStreamState res Writer.logTo writerResultLog malformed (stream, res) - struct (ss.WritePos, res) + struct (res, writePos) Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress) type WriterResult = Internal.Writer.Result type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [] ?failThreshold, [] ?logExternalStats) = - inherit Scheduling.Stats( + inherit Scheduling.Stats( log, statsInterval, stateInterval, ?failThreshold = failThreshold, logExternalStats = defaultArg logExternalStats Equinox.CosmosStore.Core.Log.InternalMetrics.dump) let mutable okStreams, okEvents, okBytes = HashSet(), 0, 0L @@ -136,7 +140,7 @@ type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [ + | { stream = stream; result = Ok (res, (es, bs)) } -> okStreams.Add stream |> ignore okEvents <- okEvents + es okBytes <- okBytes + int64 bs @@ -146,7 +150,7 @@ type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [ resultPartialDup <- resultPartialDup + 1 | WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1 base.RecordOk(message) - | { stream = stream; result = Error ((es, bs), Exception.Inner exn) } -> + | { stream = stream; result = Error (Exception.Inner exn, (es, bs)) } -> exnCats.Ingest(StreamName.categorize stream) exnStreams.Add stream |> ignore exnEvents <- exnEvents + es diff --git a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs index 8a9133d3..991ce517 100644 --- a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs +++ b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs @@ -1,7 +1,7 @@ namespace Propulsion.CosmosStore open Equinox.CosmosStore.Core - +open Propulsion.Internal open Propulsion.Sinks /// Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams. 
@@ -10,12 +10,11 @@ open Propulsion.Sinks #if !COSMOSV3 module EquinoxSystemTextJsonParser = - type System.Text.Json.JsonDocument with - member document.Cast<'T>() = - System.Text.Json.JsonSerializer.Deserialize<'T>(document.RootElement) - type Batch with - member _.MapData x = - System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x + type System.Text.Json.JsonElement with + member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x) + member x.ToSinkEventBody() = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x |> System.ReadOnlyMemory + + type System.Text.Json.JsonDocument with member x.Cast<'T>() = x.RootElement.Cast<'T>() let timestamp (doc: System.Text.Json.JsonDocument) = let unixEpoch = System.DateTime.UnixEpoch let ts = let r = doc.RootElement in r.GetProperty("_ts") @@ -23,7 +22,7 @@ module EquinoxSystemTextJsonParser = /// Parses an Equinox.Cosmos Batch from a CosmosDB Item /// returns ValueNone if it does not bear required elements of a `Equinox.Cosmos` >= 1.0 Batch, or the streamFilter predicate rejects it - let tryParseEquinoxBatch streamFilter (d: System.Text.Json.JsonDocument) = + let tryParseEquinoxBatchOrTip streamFilter (d: System.Text.Json.JsonDocument) = let r = d.RootElement let tryProp (id: string): ValueOption = let mutable p = Unchecked.defaultof<_> @@ -33,36 +32,55 @@ module EquinoxSystemTextJsonParser = match tryProp "p" with | ValueSome je when je.ValueKind = System.Text.Json.JsonValueKind.String && hasProp "i" && hasProp "n" && hasProp "e" -> let sn = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw) - if streamFilter sn then ValueSome (struct (sn, d.Cast())) else ValueNone + if streamFilter sn then ValueSome (struct (sn, d.Cast(), tryProp "u")) else ValueNone | _ -> ValueNone - /// Enumerates the events represented within a batch - let enumEquinoxCosmosEvents (batch: Batch): Event seq = - batch.e |> Seq.mapi (fun offset x -> - let d = batch.MapData x.d - let m = batch.MapData x.m + /// Enumerates the Events and/or Unfolds represented within an Equinox.CosmosStore Batch or Tip Item + let enumEquinoxCosmosBatchOrTip (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq = + let inline gen isUnfold i (x: Equinox.CosmosStore.Core.Event) = + let d = x.d.ToSinkEventBody() + let m = x.m.ToSinkEventBody() let inline len s = if isNull s then 0 else String.length s - FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, d, m, timestamp = x.t, + FsCodec.Core.TimelineEvent.Create(i, x.c, d, m, timestamp = x.t, size = x.c.Length + d.Length + m.Length + len x.correlationId + len x.causationId + 80, - correlationId = x.correlationId, causationId = x.causationId)) - - /// Attempts to parse a Document/Item from the Store + correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold) + let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset)) + // an Unfold won't have a corr/cause id, but that's OK - can't use Tip type as don't want to expand compressed form etc + match u |> ValueOption.map (fun u -> u.Cast()) with + | ValueNone | ValueSome null | ValueSome [||] -> events + | ValueSome unfolds -> seq { + yield! 
events + for x in unfolds do + gen true batch.n x } + let inline tryEnumStreamEvents_ withUnfolds streamFilter jsonDocument: seq voption = + tryParseEquinoxBatchOrTip streamFilter jsonDocument + |> ValueOption.map (fun struct (s, xs, u) -> enumEquinoxCosmosBatchOrTip (if withUnfolds then u else ValueNone) xs |> Seq.map (fun x -> s, x)) + + /// Attempts to parse the Events from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument /// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects - let tryEnumStreamEvents streamFilter d: seq voption = - tryParseEquinoxBatch streamFilter d - |> ValueOption.map (fun struct (s, xs) -> enumEquinoxCosmosEvents xs |> Seq.map (fun x -> s, x)) + let tryEnumStreamEvents streamFilter jsonDocument: seq voption = + tryEnumStreamEvents_ false streamFilter jsonDocument + + /// Extracts all events that pass the streamFilter from a Feed item + let whereStream streamFilter jsonDocument: StreamEvent seq = + tryEnumStreamEvents streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty - /// Collects all events that pass the streamFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch - let whereStream streamFilter d: StreamEvent seq = - tryEnumStreamEvents streamFilter d |> ValueOption.defaultValue Seq.empty + /// Extracts all events passing the supplied categoryFilter from a Feed Item + let whereCategory categoryFilter jsonDocument: StreamEvent seq = + whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) jsonDocument - /// Collects all events passing the supplied categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch - let whereCategory categoryFilter d: StreamEvent seq = - whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) d + /// Extracts all events from the specified category list from a Feed Item + let ofCategories (categories: string[]) jsonDocument: StreamEvent seq = + whereCategory (fun c -> Array.contains c categories) jsonDocument + + /// Attempts to parse the Events and/or Unfolds from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument + /// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects + let tryEnumStreamEventsAndUnfolds streamFilter jsonDocument: seq voption = + tryEnumStreamEvents_ true streamFilter jsonDocument - /// Collects all events from the specified category list from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch - let ofCategories categories d: StreamEvent seq = - whereCategory (fun c -> Array.contains c categories) d + /// Extracts Events and Unfolds that pass the streamFilter from a Feed item + let eventsAndUnfoldsWhereStream streamFilter jsonDocument: StreamEvent seq = + tryEnumStreamEventsAndUnfolds streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty #else module EquinoxNewtonsoftParser = diff --git a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj index be39b183..5ed6d069 100644 --- a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj +++ b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj @@ -4,7 +4,7 @@ net6.0 - 3.0.0-rc.13 + @@ -26,8 +26,9 @@ - - + + + diff --git a/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj 
b/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj index b31cf269..f033e850 100644 --- a/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj +++ b/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj @@ -4,7 +4,7 @@ net6.0 - 3.0.0-rc.13 + COSMOSV3 @@ -45,8 +45,9 @@ - - + + + diff --git a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj index 03103a6e..958c0603 100644 --- a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj +++ b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj @@ -4,7 +4,7 @@ net6.0 - + DYNAMOSTORE @@ -31,8 +31,9 @@ - - + + + diff --git a/src/Propulsion.EventStore/EventStoreSink.fs b/src/Propulsion.EventStore/EventStoreSink.fs index c99f1522..57886ef4 100755 --- a/src/Propulsion.EventStore/EventStoreSink.fs +++ b/src/Propulsion.EventStore/EventStoreSink.fs @@ -21,29 +21,29 @@ module Internal = type [] Result = | Ok of updatedPos: int64 | Duplicate of updatedPos: int64 - | PartialDuplicate of overage: Event[] - | PrefixMissing of batch: Event[] * writePos: int64 + | PartialDuplicate of updatedPos: int64 + | PrefixMissing of gap: int * actualPos: int64 - let logTo (log: ILogger) (res: FsCodec.StreamName * Result) = + let logTo (log: ILogger) (res: FsCodec.StreamName * Result) = match res with - | stream, Ok (_, Result.Ok pos) -> + | stream, Ok (Result.Ok pos, _) -> log.Information("Wrote {stream} up to {pos}", stream, pos) - | stream, Ok (_, Result.Duplicate updatedPos) -> + | stream, Ok (Result.Duplicate updatedPos, _) -> log.Information("Ignored {stream} (synced up to {pos})", stream, updatedPos) - | stream, Ok (_, Result.PartialDuplicate overage) -> - log.Information("Requeuing {stream} {pos} ({count} events)", stream, overage[0].Index, overage.Length) - | stream, Ok (_, Result.PrefixMissing (batch, pos)) -> - log.Information("Waiting {stream} missing {gap} events ({count} events @ {pos})", - stream, batch[0].Index - pos, batch.Length, batch[0].Index) - | stream, Error (_, exn) -> + | stream, Ok (Result.PartialDuplicate updatedPos, _) -> + log.Information("Requeuing {stream} {pos}", stream, updatedPos) + | stream, Ok (Result.PrefixMissing (gap, pos), _) -> + log.Information("Waiting {stream} missing {gap} events before {pos}", stream, gap, pos) + | stream, Error (exn, _) -> log.Warning(exn,"Writing {stream} failed, retrying", stream) let write (log: ILogger) (context: EventStoreContext) stream (span: Event[]) ct = task { - log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length) + let i = StreamSpan.index span + log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length) #if EVENTSTORE_LEGACY - let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _)) + let! res = context.Sync(log, stream, i - 1L, span |> Array.map (fun span -> span :> _)) #else - let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _), ct) + let! 
res = context.Sync(log, stream, i - 1L, span |> Array.map (fun span -> span :> _), ct)
 #endif
         let ress =
             match res with
@@ -51,9 +51,9 @@ module Internal =
                 Result.Ok (pos'.streamVersion + 1L)
             | GatewaySyncResult.ConflictUnknown (Token.Unpack pos) ->
                 match pos.streamVersion + 1L with
-                | actual when actual < span[0].Index -> Result.PrefixMissing (span, actual)
-                | actual when actual >= span[0].Index + span.LongLength -> Result.Duplicate actual
-                | actual -> Result.PartialDuplicate (span |> Array.skip (actual - span[0].Index |> int))
+                | actual when actual < i -> Result.PrefixMissing (actual - i |> int, actual)
+                | actual when actual >= i + span.LongLength -> Result.Duplicate actual
+                | actual -> Result.PartialDuplicate actual
         log.Debug("Result: {res}", ress)
         return ress }
@@ -69,33 +69,36 @@ module Internal =
             let index = System.Threading.Interlocked.Increment(&robin) % connections.Length
             let selectedConnection = connections[index]
             let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096
-            let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
-            try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span' ct
-                return Ok struct (met, res)
-            with e -> return Error struct (met, e) }
+            let struct (span, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
+            try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span ct
+                return Ok struct (res, met)
+            with e -> return Error struct (e, met) }
         let interpretProgress (streams: Scheduling.StreamStates<_>) stream res =
             let applyResultToStreamState = function
-                | Ok struct (_stats, Writer.Result.Ok pos) -> streams.RecordWriteProgress(stream, pos, null)
-                | Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null)
-                | Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, overage[0].Index, [| overage |])
-                | Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [| overage |])
-                | Error struct (_stats, _exn) -> streams.SetMalformed(stream, false)
-            let ss = applyResultToStreamState res
+                | Ok struct ((Writer.Result.Ok pos' | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) ->
+                    let ss = streams.RecordWriteProgress(stream, pos', null)
+                    ss.WritePos
+                | Ok (Writer.Result.PrefixMissing _, _stats) ->
+                    streams.WritePos(stream)
+                | Error struct (_exn, _stats) ->
+                    let ss = streams.SetMalformed(stream, false)
+                    ss.WritePos
+            let writePos = applyResultToStreamState res
             Writer.logTo writerResultLog (stream, res)
-            struct (ss.WritePos, res)
+            struct (res, writePos)
         Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, attemptWrite, interpretProgress)
 
 type WriterResult = Internal.Writer.Result
 
 type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [] ?failThreshold, [] ?logExternalStats) =
-    inherit Scheduling.Stats(
+    inherit Scheduling.Stats(
         log, statsInterval, stateInterval, ?failThreshold = failThreshold,
         logExternalStats = defaultArg logExternalStats Log.InternalMetrics.dump)
     let mutable okStreams, okEvents, okBytes, exnCats, exnStreams, exnEvents, exnBytes = HashSet(), 0, 0L, Stats.Counters(), HashSet(), 0 , 0L
     let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0
     override _.Handle message =
         match message with
-        | { stream = stream; result = Ok ((es, bs), res) } ->
+        | { stream = stream; result = Ok (res, (es, bs)) } ->
             okStreams.Add stream 
|> ignore okEvents <- okEvents + es okBytes <- okBytes + int64 bs @@ -105,7 +108,7 @@ type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [ resultPartialDup <- resultPartialDup + 1 | WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1 base.RecordOk(message) - | { stream = stream; result = Error ((es, bs), Exception.Inner exn) } -> + | { stream = stream; result = Error (Exception.Inner exn, (es, bs)) } -> exnCats.Ingest(StreamName.categorize stream) exnStreams.Add stream |> ignore exnEvents <- exnEvents + es diff --git a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj index 039f17e4..6fe17629 100644 --- a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj +++ b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj @@ -4,7 +4,7 @@ net6.0 - + EVENTSTORE_LEGACY @@ -24,8 +24,9 @@ - - + + + diff --git a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj index 6019d145..20531b40 100644 --- a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj +++ b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj @@ -4,7 +4,7 @@ net6.0 - + @@ -22,8 +22,9 @@ - - + + + diff --git a/src/Propulsion.Feed/PeriodicSource.fs b/src/Propulsion.Feed/PeriodicSource.fs index 80236c72..b332d102 100644 --- a/src/Propulsion.Feed/PeriodicSource.fs +++ b/src/Propulsion.Feed/PeriodicSource.fs @@ -38,7 +38,7 @@ module private TimelineEvent = fun (i, x: FsCodec.IEventData<_>, context: obj) -> if i > DateTimeOffsetPosition.factor then invalidArg (nameof i) $"Index may not exceed %d{DateTimeOffsetPosition.factor}" FsCodec.Core.TimelineEvent.Create( - baseIndex + i, x.EventType, x.Data, x.Meta, x.EventId, x.CorrelationId, x.CausationId, x.Timestamp, isUnfold = true, context = context) + baseIndex + i, x.EventType, x.Data, x.Meta, x.EventId, x.CorrelationId, x.CausationId, x.Timestamp, isUnfold = false, context = context) [] type SourceItem<'F> = { streamName: FsCodec.StreamName; eventData: FsCodec.IEventData<'F>; context: obj } diff --git a/src/Propulsion.Feed/Propulsion.Feed.fsproj b/src/Propulsion.Feed/Propulsion.Feed.fsproj index 60d12042..82cc59ae 100644 --- a/src/Propulsion.Feed/Propulsion.Feed.fsproj +++ b/src/Propulsion.Feed/Propulsion.Feed.fsproj @@ -4,7 +4,7 @@ net6.0 - 3.0.0-rc.13 + @@ -22,8 +22,9 @@ - - + + + diff --git a/src/Propulsion.Kafka/Consumers.fs b/src/Propulsion.Kafka/Consumers.fs index 1b01b69c..b2ae9c4d 100644 --- a/src/Propulsion.Kafka/Consumers.fs +++ b/src/Propulsion.Kafka/Consumers.fs @@ -181,15 +181,15 @@ module Core = static member Start<'Info, 'Outcome> ( log: ILogger, config: KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents, - prepare, maxDop, handle: Func>, - stats: Scheduling.Stats, + prepare, maxDop, handle: Func>, + stats: Scheduling.Stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = let dumpStreams logStreamStates log = logExternalState |> Option.iter (fun f -> f log) logStreamStates Event.storedSize let scheduler = Scheduling.Engine( - Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, prepare, handle, StreamResult.toIndex), stats, dumpStreams, pendingBufferSize = 5, + Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, prepare, handle), stats, dumpStreams, pendingBufferSize = 5, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay) let mapConsumedMessagesToStreamsBatch onCompletion (x: Submission.Batch): struct (_ * Buffer.Batch) = let onCompletion () = 
x.onCompletion(); onCompletion() @@ -203,7 +203,7 @@ module Core = ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = let prepare _streamName span = let metrics = StreamSpan.metrics Event.storedSize span - struct (metrics, span) + struct (span, metrics) StreamsConsumer.Start<'Info, 'Outcome>( log, config, consumeResultToInfo, infoToStreamEvents, prepare, maxDop, handle, stats, ?logExternalState = logExternalState, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay) @@ -229,7 +229,7 @@ module Core = ( log: ILogger, config: KafkaConsumerConfig, // often implemented via StreamNameSequenceGenerator.KeyValueToStreamEvent keyValueToStreamEvents: KeyValuePair -> StreamEvent seq, - handle: Func>, maxDop, + handle: Func>, maxDop, stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = StreamsConsumer.Start, 'Outcome>( @@ -333,7 +333,7 @@ type Factory private () = static member StartConcurrentAsync<'Outcome> ( log: ILogger, config: KafkaConsumerConfig, consumeResultToStreamEvents: ConsumeResult<_, _> -> StreamEvent seq, - maxDop, handle: Func>, stats, + maxDop, handle: Func>, stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = Core.StreamsConsumer.Start, 'Outcome>( @@ -343,21 +343,21 @@ type Factory private () = static member StartBatchedAsync<'Info> ( log: ILogger, config: KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents, - select, handle: Func[], CancellationToken, Task)>>>, stats, + select, handle: Func[], CancellationToken, Task * TimeSpan)>>>, stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = let handle (items: Scheduling.Item[]) ct - : Task>[]> = task { + : Task>[]> = task { let start = Stopwatch.timestamp () let inline err ts e (x: Scheduling.Item<_>) = let met = StreamSpan.metrics Event.renderedSize x.span - Scheduling.InternalRes.create (x, ts, Result.Error struct (met, e)) + Scheduling.InternalRes.create (x, ts, Result.Error struct (e, met)) try let! 
results = handle.Invoke(items, ct) - return Array.ofSeq (Seq.zip items results |> Seq.map(function - | item, (ts, Ok index') -> - let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index' ) |> Array.ofSeq + return Array.ofSeq (Seq.zip items results |> Seq.map (function + | item, (Ok index', ts) -> + let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index') |> Array.ofSeq let metrics = StreamSpan.metrics Event.storedSize used - Scheduling.InternalRes.create (item, ts, Result.Ok struct (metrics, index')) - | item, (ts, Error e) -> err ts e item)) + Scheduling.InternalRes.create (item, ts, Result.Ok struct (index', metrics)) + | item, (Error e, ts) -> err ts e item)) with e -> let ts = Stopwatch.elapsed start return items |> Array.map (err ts e) } @@ -391,7 +391,7 @@ type Factory private () = // - Ok: Index at which next processing will proceed (which can trigger discarding of earlier items on that stream) // - Error: Records the processing of the stream in question as having faulted (the stream's pending events and/or // new ones that arrived while the handler was processing are then eligible for retry purposes in the next dispatch cycle) - handle: StreamState[] -> Async)>>, + handle: StreamState[] -> Async * TimeSpan)>>, // The responses from each handle invocation are passed to stats for periodic emission stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = @@ -419,7 +419,7 @@ type Factory private () = // - second component: Outcome (can be simply unit), to pass to the stats processor // - throwing marks the processing of a stream as having faulted (the stream's pending events and/or // new ones that arrived while the handler was processing are then eligible for retry purposes in the next dispatch cycle) - handle: FsCodec.StreamName -> Event[] -> Async, + handle: FsCodec.StreamName -> Event[] -> Async<'Outcome * int64>, // The 'Outcome from each handler invocation is passed to the Statistics processor by the scheduler for periodic emission stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = diff --git a/src/Propulsion.Kafka/ProducerSinks.fs b/src/Propulsion.Kafka/ProducerSinks.fs index 3f1b65ba..24b4a264 100644 --- a/src/Propulsion.Kafka/ProducerSinks.fs +++ b/src/Propulsion.Kafka/ProducerSinks.fs @@ -45,10 +45,10 @@ type StreamsProducerSink = | _ -> () do! 
producer.Produce(key, message, ct = ct) | ValueNone -> () - return struct (StreamResult.AllProcessed, outcome) + return struct (outcome, Events.nextIndex span) } Sync.Factory.StartAsync - ( log, maxReadAhead, maxConcurrentStreams, handle, StreamResult.toIndex, + ( log, maxReadAhead, maxConcurrentStreams, handle, stats, Event.renderedSize, Event.storedSize, maxBytes = maxBytes, ?idleDelay = idleDelay, ?purgeInterval = purgeInterval, ?maxEvents = maxEvents, dumpExternalStats = producer.DumpStats) diff --git a/src/Propulsion.Kafka/Propulsion.Kafka.fsproj b/src/Propulsion.Kafka/Propulsion.Kafka.fsproj index 86bcefe9..263bdbe0 100644 --- a/src/Propulsion.Kafka/Propulsion.Kafka.fsproj +++ b/src/Propulsion.Kafka/Propulsion.Kafka.fsproj @@ -4,7 +4,7 @@ net6.0 - + @@ -22,8 +22,9 @@ + - + diff --git a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj index ddffdf47..695abef8 100644 --- a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj +++ b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj @@ -4,7 +4,7 @@ net6.0 - 3.0.0-rc.13 + MEMORYSTORE @@ -26,8 +26,9 @@ - - + + + diff --git a/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj b/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj index 09a48876..93edc96a 100644 --- a/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj +++ b/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj @@ -4,7 +4,7 @@ net6.0 - + @@ -21,8 +21,9 @@ - - + + + diff --git a/src/Propulsion.MessageDb/Readme.md b/src/Propulsion.MessageDb/Readme.md index aae081c7..289bd464 100644 --- a/src/Propulsion.MessageDb/Readme.md +++ b/src/Propulsion.MessageDb/Readme.md @@ -43,7 +43,7 @@ let quickStart log stats categories handle = async { let handle stream (events: Propulsion.Sinks.Event[]) = async { // ... process the events - return Propulsion.Sinks.StreamResult.AllProcessed, () } + return (), Propulsion.Sinks.Events.nextIndex events } quickStart Log.Logger (createStats ()) [| category |] handle ``` diff --git a/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj b/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj index 3b145738..4f723686 100644 --- a/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj +++ b/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj @@ -4,7 +4,7 @@ net6.0 - + @@ -22,8 +22,9 @@ - - + + + diff --git a/src/Propulsion/Propulsion.fsproj b/src/Propulsion/Propulsion.fsproj index 9ebeb9c6..9812123c 100644 --- a/src/Propulsion/Propulsion.fsproj +++ b/src/Propulsion/Propulsion.fsproj @@ -4,7 +4,7 @@ net6.0 - 3.0.0-rc.13 + diff --git a/src/Propulsion/Sinks.fs b/src/Propulsion/Sinks.fs index f14bc201..35c4c7dc 100644 --- a/src/Propulsion/Sinks.fs +++ b/src/Propulsion/Sinks.fs @@ -17,36 +17,9 @@ type Codec<'E> = FsCodec.IEventCodec<'E, EventBody, unit> module Events = /// The Index of the next event ordinarily expected on the next handler invocation (assuming this invocation handles all successfully) - let nextIndex: Event[] -> int64 = Streams.StreamSpan.ver + let nextIndex: Event[] -> int64 = Streams.StreamSpan.nextIndex /// The Index of the first event as supplied to this handler - let index: Event[] -> int64 = Streams.StreamSpan.idx - -/// Represents progress attained during the processing of the supplied Events for a given StreamName. -/// This will be reflected in adjustments to the Write Position for the stream in question. 
-/// Incoming StreamEvents with Indexes prior to the Write Position implied by the result are proactively -/// dropped from incoming buffers, yielding increased throughput due to reduction of redundant processing. -type StreamResult = - /// Indicates no events where processed. - /// Handler should be supplied the same events (plus any that arrived in the interim) in the next scheduling cycle. - | NoneProcessed - /// Indicates all Events supplied have been processed. - /// Write Position should move beyond the last event supplied. - | AllProcessed - /// Indicates only a subset of the presented events have been processed; - /// Write Position should remove count items from the Events supplied. - | PartiallyProcessed of count: int - /// Apply an externally observed Version determined by the handler during processing. - /// If the Version of the stream is running ahead or behind the current input StreamSpan, this enables one to have - /// events that have already been handled be dropped from the scheduler's buffers and/or as they arrive. - | OverrideNextIndex of version: int64 - -module StreamResult = - - let toIndex<'F> (span: FsCodec.ITimelineEvent<'F>[]) = function - | NoneProcessed -> span[0].Index - | AllProcessed -> span[0].Index + span.LongLength // all-but equivalent to Events.nextIndex span - | PartiallyProcessed count -> span[0].Index + int64 count - | OverrideNextIndex index -> index + let index: Event[] -> int64 = Streams.StreamSpan.index /// Internal helpers used to compute buffer sizes for stats module Event = @@ -65,62 +38,60 @@ type StreamState = Propulsion.Streams.Scheduling.Item [] type Factory private () = - /// Project Events using up to maxConcurrentStreams handle functions that yield a StreamResult and an Outcome to be fed to the Stats + /// Project Events using up to maxConcurrentStreams concurrent instances of a + /// handle function that yields an Outcome to be fed to the Stats, and an updated Stream Position static member StartConcurrentAsync<'Outcome> ( log, maxReadAhead, - maxConcurrentStreams, handle: Func>, + maxConcurrentStreams, handle: Func>, stats, [] ?pendingBufferSize, [] ?purgeInterval, [] ?wakeForResults, [] ?idleDelay, [] ?requireAll, [] ?commitInterval, [] ?ingesterStateInterval) = - Streams.Concurrent.Start<'Outcome, EventBody, StreamResult>( - log, maxReadAhead, maxConcurrentStreams, handle, StreamResult.toIndex, Event.storedSize, stats, + Streams.Concurrent.Start<'Outcome, EventBody>( + log, maxReadAhead, maxConcurrentStreams, handle, Event.storedSize, stats, ?pendingBufferSize = pendingBufferSize, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay, ?requireAll = requireAll, ?ingesterStateInterval = ingesterStateInterval, ?commitInterval = commitInterval) - /// Project Events sequentially via a handle function that yields a StreamResult per selected Item + /// Project Events sequentially via a handle function that yields an updated Stream Position and latency per selected Item static member StartBatchedAsync<'Outcome> ( log, maxReadAhead, select: Func, - handle: Func)>>>, + handle: Func * TimeSpan)>>>, stats, [] ?pendingBufferSize, [] ?purgeInterval, [] ?wakeForResults, [] ?idleDelay, [] ?requireAll, [] ?commitInterval, [] ?ingesterStateInterval) = - let handle items ct = task { - let! 
res = handle.Invoke(items, ct) - return seq { for i, (ts, r) in Seq.zip items res -> struct (ts, Result.map (StreamResult.toIndex i.span) r) } } Streams.Batched.Start(log, maxReadAhead, select, handle, Event.storedSize, stats, ?pendingBufferSize = pendingBufferSize, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay, ?requireAll = requireAll, ?ingesterStateInterval = ingesterStateInterval, ?commitInterval = commitInterval) - /// Project Events using up to maxConcurrentStreams concurrent instances of a handle function - /// Each dispatched handle invocation yields a StreamResult conveying progress, together with an Outcome to be fed to the Stats + /// Project Events using up to maxConcurrentStreams concurrent instances of a + /// handle function that yields an Outcome to be fed to the Stats, and an updated Stream Position static member StartConcurrent<'Outcome> ( log, maxReadAhead, - maxConcurrentStreams, handle: FsCodec.StreamName -> Event[] -> Async, + maxConcurrentStreams, handle: FsCodec.StreamName -> Event[] -> Async<'Outcome * int64>, stats, // Configure max number of batches to buffer within the scheduler; Default: Same as maxReadAhead [] ?pendingBufferSize, [] ?purgeInterval, [] ?wakeForResults, [] ?idleDelay, [] ?requireAll, [] ?commitInterval, [] ?ingesterStateInterval) = let handle' stream events ct = task { - let! res, outcome = handle stream events |> Async.executeAsTask ct - return struct (res, outcome) } + let! outcome, pos' = handle stream events |> Async.executeAsTask ct + return struct (outcome, pos') } Factory.StartConcurrentAsync(log, maxReadAhead, maxConcurrentStreams, handle', stats, ?pendingBufferSize = pendingBufferSize, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay, ?requireAll = requireAll, ?ingesterStateInterval = ingesterStateInterval, ?commitInterval = commitInterval) - /// Project Events using up to maxConcurrentStreams concurrent instances of a handle function - /// Each dispatched handle invocation yields a StreamResult conveying progress, together with an Outcome to be fed to the Stats + /// Project Events using up to maxConcurrentStreams concurrent instances of a + /// handle function that yields an Outcome to be fed to the Stats, and an updated Stream Position /// Like StartConcurrent, but the events supplied to the Handler are constrained by maxBytes and maxEvents static member StartConcurrentChunked<'Outcome> ( log, maxReadAhead, - maxConcurrentStreams, handle: FsCodec.StreamName -> Event[] -> Async, + maxConcurrentStreams, handle: FsCodec.StreamName -> Event[] -> Async<'Outcome * int64>, stats: Sync.Stats<'Outcome>, // Default 1 ms ?idleDelay, @@ -133,16 +104,16 @@ type Factory private () = // Frequency of jettisoning Write Position state of inactive streams (held by the scheduler for deduplication purposes) to limit memory consumption // NOTE: Purging can impair performance, increase write costs or result in duplicate event emissions due to redundant inputs not being deduplicated ?purgeInterval) = - let handle' s xs ct = task { let! r, o = handle s xs |> Async.executeAsTask ct in return struct (r, o) } - Sync.Factory.StartAsync(log, maxReadAhead, maxConcurrentStreams, handle', StreamResult.toIndex, stats, Event.renderedSize, Event.storedSize, + let handle' s xs ct = task { let! 
o, pos' = handle s xs |> Async.executeAsTask ct in return struct (o, pos') }
+        Sync.Factory.StartAsync(log, maxReadAhead, maxConcurrentStreams, handle', stats, Event.renderedSize, Event.storedSize,
             ?dumpExternalStats = dumpExternalStats, ?idleDelay = idleDelay, ?maxBytes = maxBytes, ?maxEvents = maxEvents, ?purgeInterval = purgeInterval)
 
     /// Project Events by continually selecting and then dispatching a batch of streams to a handle function
-    /// Per handled stream, the result can be either a StreamResult conveying progress, or an exception
+    /// Per handled stream, the result can be either an updated Stream Position, or an exception
     static member StartBatched<'Outcome>
         (   log, maxReadAhead,
             select: StreamState seq -> StreamState[],
-            handle: StreamState[] -> Async)>>,
+            handle: StreamState[] -> Async * TimeSpan)>>,
             stats,
             // Configure max number of batches to buffer within the scheduler; Default: Same as maxReadAhead
             [] ?pendingBufferSize, [] ?purgeInterval,
diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs
index d17d7a1b..5d1c4016 100755
--- a/src/Propulsion/Streams.fs
+++ b/src/Propulsion/Streams.fs
@@ -84,57 +84,78 @@ module StreamSpan =
 
     type Metrics = (struct (int * int))
     let metrics eventSize (xs: FsCodec.ITimelineEvent<'F>[]): Metrics = struct (xs.Length, xs |> Seq.sumBy eventSize)
-    let slice<'F> eventSize (maxEvents, maxBytes) (span: FsCodec.ITimelineEvent<'F>[]): struct (Metrics * FsCodec.ITimelineEvent<'F>[]) =
+    let private trimEvents<'F> eventSize (maxEvents, maxBytes) (span: FsCodec.ITimelineEvent<'F>[]) =
         let mutable count, bytes = 0, 0
         let mutable countBudget, bytesBudget = maxEvents, maxBytes
-        let withinLimits y =
+        let withinLimits (x: FsCodec.ITimelineEvent<_>) =
             countBudget <- countBudget - 1
-            let eventBytes = eventSize y
+            let eventBytes = eventSize x
             bytesBudget <- bytesBudget - eventBytes
-            // always send at least one event in order to surface the problem and have the stream marked malformed
-            let res = count = 0 || (countBudget >= 0 && bytesBudget >= 0)
-            if res then count <- count + 1; bytes <- bytes + eventBytes
-            res
-        let trimmed = span |> Array.takeWhile withinLimits
-        metrics eventSize trimmed, trimmed
-
-    let inline idx (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index
-    let inline ver (span: FsCodec.ITimelineEvent<'F>[]) = idx span + span.LongLength
-    let dropBeforeIndex min: FsCodec.ITimelineEvent<_>[] -> FsCodec.ITimelineEvent<_>[] = function
-        | xs when xs.Length = 0 -> null
-        | xs when idx xs >= min -> xs // don't adjust if min not within
-        | v when ver v <= min -> null // throw away if before min
-        | xs -> xs |> Array.skip (min - idx xs |> int) // slice
+            let fitsAndNotAnUnfold = (countBudget >= 0 && bytesBudget >= 0) && not x.IsUnfold
+            if fitsAndNotAnUnfold then count <- count + 1; bytes <- bytes + eventBytes
+            fitsAndNotAnUnfold
+        let trimmedEvents = span |> Array.takeWhile withinLimits
+        // takeWhile terminated either because it hit the first Unfold, or the size limit
+        // In either case, if the next event is an Unfold, we know it (and any successors) must be associated with that final event
+        if span |> Array.tryItem trimmedEvents.Length |> Option.exists _.IsUnfold then span
+        else trimmedEvents
+    let slice<'F> eventSize limits (span: FsCodec.ITimelineEvent<'F>[]): struct (FsCodec.ITimelineEvent<'F>[] * Metrics) =
+        let trimmed =
+            // we must always send one event, even if it exceeds the limit (if the handler throws, the Stats can categorize the problem to surface it)
+            if span[0].IsUnfold || span.Length = 1 || 
span[1].IsUnfold then span + // If we have 2 or more (non-Unfold) events, then we limit the batch size + else trimEvents<'F> eventSize limits span + trimmed, metrics eventSize trimmed + + let inline index (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index + let inline nextIndex (span: FsCodec.ITimelineEvent<'F>[]) = + let l = span[span.Length - 1] + if l.IsUnfold then l.Index else l.Index + 1L + let inline dropBeforeIndex min = function + | [||] as xs -> xs + | xs when nextIndex xs < min -> Array.empty + | xs -> + match index xs with + | xi when xi = min -> xs + | xi -> xs |> Array.skip (min - xi |> int) let merge min (spans: FsCodec.ITimelineEvent<_>[][]) = - let candidates = [| - for span in spans do - if span <> null then - match dropBeforeIndex min span with - | null -> () - | trimmed when trimmed.Length = 0 -> invalidOp "Cant add empty" - | trimmed -> trimmed |] + let candidates = [| for span in spans do + if span <> null then + match dropBeforeIndex min span with + | [||] -> () + | xs -> xs |] if candidates.Length = 0 then null elif candidates.Length = 1 then candidates else - candidates |> Array.sortInPlaceBy idx + candidates |> Array.sortInPlaceBy index // no data buffered -> buffer first item - let mutable curr = candidates[0] + let mutable acc = candidates[0] let mutable buffer = null for i in 1 .. candidates.Length - 1 do let x = candidates[i] - let index = idx x - let currNext = ver curr - if index > currNext then // Gap - if buffer = null then buffer <- ResizeArray(candidates.Length) - buffer.Add curr - curr <- x + let xIndex = index x + let accNext = nextIndex acc + if xIndex > accNext then // Gap + match acc |> Array.filter (_.IsUnfold >> not) with + | [||] -> () + | eventsOnly -> + if buffer = null then buffer <- ResizeArray(candidates.Length) + buffer.Add eventsOnly + acc <- x // Overlapping, join - elif index + x.LongLength > currNext then - curr <- Array.append curr (dropBeforeIndex currNext x) - if buffer = null then Array.singleton curr - else buffer.Add curr; buffer.ToArray() + elif nextIndex x > accNext then + match dropBeforeIndex accNext x with + | [||] -> () + | news -> + acc <- [| for x in acc do if not x.IsUnfold then x + yield! 
news |] + match acc with + | [||] when buffer = null -> null + | [||] -> buffer.ToArray() + | last when buffer = null -> Array.singleton last + | last -> buffer.Add last; buffer.ToArray() /// A Single Event from an Ordered stream being supplied for ingestion into the internal data structures type StreamEvent<'Format> = (struct (FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>)) @@ -151,7 +172,7 @@ module Buffer = if malformed then { write = WritePosMalformed; queue = queue } else StreamState<'Format>.Create(write, queue) static member Create(write, queue) = { write = (match write with ValueSome x -> x | ValueNone -> WritePosUnknown); queue = queue } - member x.IsEmpty = obj.ReferenceEquals(null, x.queue) + member x.IsEmpty = LanguagePrimitives.PhysicalEquality null x.queue member x.EventsSumBy(f) = if x.IsEmpty then 0L else x.queue |> Seq.map (Seq.sumBy f) |> Seq.sum |> int64 member x.EventsCount = if x.IsEmpty then 0 else x.queue |> Seq.sumBy Array.length @@ -292,6 +313,7 @@ module Scheduling = | ValueSome ss when not ss.IsEmpty && not ss.IsMalformed && (not requireAll || ss.QueuedIsAtWritePos) && not (busy.Contains s) -> ValueSome ss | _ -> ValueNone + member _.WritePos(stream) = tryGetItem stream |> ValueOption.bind _.WritePos member _.WritePositionIsAlreadyBeyond(stream, required) = match tryGetItem stream with // Example scenario: if a write reported we reached version 2, and we are ingesting an event that requires 2, then we drop it @@ -738,7 +760,7 @@ module Scheduling = abstract member HasCapacity: bool with get abstract member AwaitCapacity: CancellationToken -> Task abstract member TryReplenish: pending: seq> * handleStarted: (FsCodec.StreamName * int64 -> unit) -> struct (bool * bool) - abstract member InterpretProgress: StreamStates<'F> * FsCodec.StreamName * Result<'P, 'E> -> struct (int64 voption * Result<'R, 'E>) + abstract member InterpretProgress: StreamStates<'F> * FsCodec.StreamName * Result<'P, 'E> -> struct (Result<'R, 'E> * int64 voption) and [] Item<'Format> = { stream: FsCodec.StreamName; nextIndex: int64 voption; span: FsCodec.ITimelineEvent<'Format>[] } and [] InternalRes<'R> = { stream: FsCodec.StreamName; index: int64; event: string; duration: TimeSpan; result: 'R } @@ -821,14 +843,14 @@ module Scheduling = // Ingest information to be gleaned from processing the results into `streams` (i.e. 
         let handleResult ({ stream = stream; index = i; event = et; duration = duration; result = r }: InternalRes<_>) =
             match dispatcher.InterpretProgress(streams, stream, r) with
-            | ValueSome index', Ok (r: 'R) ->
+            | Ok (r: 'R), ValueSome index' ->
                 batches.MarkStreamProgress(stream, index')
                 streams.RecordProgress(stream, index')
                 stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = index'; result = Ok r }
-            | ValueNone, Ok (r: 'R) ->
+            | Ok (r: 'R), ValueNone ->
                 streams.RecordNoProgress(stream)
                 stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = i; result = Ok r }
-            | _, Error exn ->
+            | Error exn, _ ->
                 streams.RecordNoProgress(stream)
                 stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = i; result = Error exn }
         let tryHandleResults () = tryApplyResults handleResult
@@ -987,27 +1009,26 @@ module Dispatcher =
     /// Implementation of IDispatcher that feeds items to an item dispatcher that maximizes concurrent requests (within a limit)
     type Concurrent<'P, 'R, 'E, 'F> internal
         (   inner: ItemDispatcher<Result<'P, 'E>, 'F>,
-            project: struct (int64 * Scheduling.Item<'F>) -> CancellationToken -> Task<Scheduling.InternalRes<Result<'P, 'E>>>,
-            interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P,'E> -> struct (int64 voption * Result<'R, 'E>)) =
+            project: struct (int64 * Scheduling.Item<'F>) -> CancellationToken -> Task<Scheduling.InternalRes<Result<'P, 'E>>>,
+            interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P, 'E> -> struct (Result<'R, 'E> * int64 voption)) =
         static member Create
             (   maxDop,
                 // NOTE `project` must not throw under any circumstances, or the exception will go unobserved, and DOP will leak in the dispatcher
                 project: FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> CancellationToken -> Task<Result<'P, 'E>>,
-                interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P, 'E> -> struct (int64 voption * Result<'R, 'E>)) =
+                interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P, 'E> -> struct (Result<'R, 'E> * int64 voption)) =
             let project struct (startTs, item: Scheduling.Item<'F>) (ct: CancellationToken) = task {
                 let! res = project item.stream item.span ct
                 return Scheduling.InternalRes.create (item, Stopwatch.elapsed startTs, res) }
             Concurrent<_, _, _, _>(ItemDispatcher(maxDop), project, interpretProgress)
-        static member Create(maxDop, prepare: Func<_, _, _>, handle: Func<_, _, CancellationToken, Task<_>>, toIndex: Func<_, 'R, int64>) =
+        static member Create(maxDop, prepare: Func<_, _, _>, handle: Func<_, _, CancellationToken, Task<_>>) =
             let project stream span ct = task {
-                let struct (met, span: FsCodec.ITimelineEvent<'F>[]) = prepare.Invoke(stream, span)
-                try let! struct (spanResult, outcome) = handle.Invoke(stream, span, ct)
-                    let index' = toIndex.Invoke(span, spanResult)
-                    return Ok struct (index', met, outcome)
-                with e -> return Error struct (met, e) }
+                let struct (span: FsCodec.ITimelineEvent<'F>[], met) = prepare.Invoke(stream, span)
+                try let! struct (outcome, index') = handle.Invoke(stream, span, ct)
+                    return Ok struct (outcome, index', met)
+                with e -> return Error struct (e, met) }
             let interpretProgress (_streams: Scheduling.StreamStates<'F>) _stream = function
-                | Ok struct (index', met, outcome) -> struct (ValueSome index', Ok struct (met, outcome))
-                | Error struct (met, exn) -> ValueNone, Error struct (met, exn)
+                | Ok struct (outcome, index', met) -> struct (Ok struct (outcome, met), ValueSome index')
+                | Error struct (exn, met) -> Error struct (exn, met), ValueNone
             Concurrent<_, _, _, 'F>.Create(maxDop, project, interpretProgress)
         interface Scheduling.IDispatcher<'P, 'R, 'E, 'F> with
             [<CLIEvent>] override _.Result = inner.Result
@@ -1023,9 +1044,9 @@ module Dispatcher =
         (   select: Func<Scheduling.Item<'F> seq, Scheduling.Item<'F>[]>,
             // NOTE `handle` must not throw under any circumstances, or the exception will go unobserved
             handle: Scheduling.Item<'F>[] -> CancellationToken ->
-                    Task<Scheduling.InternalRes<Result<struct (StreamSpan.Metrics * int64), struct (StreamSpan.Metrics * exn)>>[]>) =
+                    Task<Scheduling.InternalRes<Result<struct (int64 * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>>[]>) =
         let inner = DopDispatcher 1
-        let result = Event<Scheduling.InternalRes<Result<struct (StreamSpan.Metrics * int64), struct (StreamSpan.Metrics * exn)>>>()
+        let result = Event<Scheduling.InternalRes<Result<struct (int64 * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>>>()
 
         // On each iteration, we offer the ordered work queue to the selector
         // we propagate the selected streams to the handler
@@ -1041,7 +1062,7 @@ module Dispatcher =
                 hasCapacity <- false
             struct (dispatched, hasCapacity)
 
-        interface Scheduling.IDispatcher<struct (StreamSpan.Metrics * int64), struct (StreamSpan.Metrics * unit), struct (StreamSpan.Metrics * exn), 'F> with
+        interface Scheduling.IDispatcher<struct (int64 * StreamSpan.Metrics), struct (unit * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics), 'F> with
             [<CLIEvent>] override _.Result = result.Publish
             override _.Pump ct = task {
                 use _ = inner.Result.Subscribe(Array.iter result.Trigger)
@@ -1052,13 +1073,13 @@ module Dispatcher =
             override _.TryReplenish(pending, handleStarted) = trySelect pending handleStarted
             override _.InterpretProgress(_streams: Scheduling.StreamStates<_>, _stream: FsCodec.StreamName, res: Result<_, _>) =
                 match res with
-                | Ok (met, pos') -> ValueSome pos', Ok (met, ())
-                | Error (met, exn) -> ValueNone, Error (met, exn)
+                | Ok (pos', met) -> Ok ((), met), ValueSome pos'
+                | Error (exn, met) -> Error (exn, met), ValueNone
 
 [<AbstractClass>]
 type Stats<'Outcome>(log: ILogger, statsInterval, statesInterval,
                      [<O; D null>] ?failThreshold, [<O; D null>] ?abendThreshold, [<O; D null>] ?logExternalStats) =
-    inherit Scheduling.Stats<struct (StreamSpan.Metrics * 'Outcome), struct (StreamSpan.Metrics * exn)>(
+    inherit Scheduling.Stats<struct ('Outcome * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>(
         log, statsInterval, statesInterval, ?failThreshold = failThreshold, ?abendThreshold = abendThreshold, ?logExternalStats = logExternalStats)
     let mutable okStreams, okEvents, okBytes, exnStreams, exnCats, exnEvents, exnBytes = HashSet(), 0, 0L, HashSet(), Stats.Counters(), 0, 0L
     let mutable resultOk, resultExn = 0, 0
@@ -1079,14 +1100,14 @@ type Stats<'Outcome>(log: ILogger, statsInterval, statesInterval,
     override this.Handle res =
         match res with
-        | { stream = stream; result = Ok ((es, bs), outcome) } ->
+        | { stream = stream; result = Ok (outcome, (es, bs)) } ->
             okStreams.Add stream |> ignore
             okEvents <- okEvents + es
             okBytes <- okBytes + int64 bs
             resultOk <- resultOk + 1
             base.RecordOk res
             this.HandleOk outcome
-        | { duration = duration; stream = stream; index = index; event = et; result = Error ((es, bs), Exception.Inner exn) } ->
+        | { duration = duration; stream = stream; index = index; event = et; result = Error (Exception.Inner exn, (es, bs)) } ->
             exnCats.Ingest(StreamName.categorize stream)
             exnStreams.Add stream |> ignore
             exnEvents <- exnEvents + es
@@ -1126,11 +1147,10 @@ type Concurrent private () =
 
     /// Custom projection mechanism that divides work into a prepare phase that selects the prefix of the queued StreamSpan to handle,
     /// and a handle function that yields a Write Position representing the next event that's to be handled on this Stream
-    static member StartEx<'Progress, 'Outcome, 'F, 'R>
+    static member StartEx<'Outcome, 'F>
        (   log: ILogger, maxReadAhead, maxConcurrentStreams,
-            prepare: Func<FsCodec.StreamName, FsCodec.ITimelineEvent<'F>[], struct (StreamSpan.Metrics * FsCodec.ITimelineEvent<'F>[])>,
-            handle: Func<FsCodec.StreamName, FsCodec.ITimelineEvent<'F>[], CancellationToken, Task<struct ('R * 'Outcome)>>,
-            toIndex: Func<FsCodec.ITimelineEvent<'F>[], 'R, int64>,
+            prepare: Func<FsCodec.StreamName, FsCodec.ITimelineEvent<'F>[], struct (FsCodec.ITimelineEvent<'F>[] * StreamSpan.Metrics)>,
+            handle: Func<FsCodec.StreamName, FsCodec.ITimelineEvent<'F>[], CancellationToken, Task<struct ('Outcome * int64)>>,
            eventSize, stats: Scheduling.Stats<_, _>,
            // Configure max number of batches to buffer within the scheduler; Default: Same as maxReadAhead
            [<O; D null>] ?pendingBufferSize,
@@ -1143,7 +1163,7 @@ type Concurrent private () =
            [<O; D null>] ?idleDelay, [<O; D null>] ?requireAll,
            [<O; D null>] ?ingesterStateInterval, [<O; D null>] ?commitInterval)
        : Propulsion.SinkPipeline<Propulsion.Ingestion.Ingester<StreamEvent<'F> seq>> =
-        let dispatcher: Scheduling.IDispatcher<_, _, _, _> = Dispatcher.Concurrent<_, _, _, 'F>.Create(maxConcurrentStreams, prepare, handle, toIndex)
+        let dispatcher: Scheduling.IDispatcher<_, _, _, _> = Dispatcher.Concurrent<_, _, _, 'F>.Create(maxConcurrentStreams, prepare, handle)
        let dumpStreams logStreamStates _log = logStreamStates eventSize
        let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, defaultArg pendingBufferSize maxReadAhead,
                                          ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults,
@@ -1152,10 +1172,9 @@ type Concurrent private () =
                       ingesterStateInterval = defaultArg ingesterStateInterval stats.StateInterval.Period, ?commitInterval = commitInterval)

    /// Project Events using a handle function that yields a Write Position representing the next event that's to be handled on this Stream
-    static member Start<'Outcome, 'F, 'R>
+    static member Start<'Outcome, 'F>
        (   log: ILogger, maxReadAhead, maxConcurrentStreams,
-            handle: Func<FsCodec.StreamName, FsCodec.ITimelineEvent<'F>[], CancellationToken, Task<struct ('R * 'Outcome)>>,
-            toIndex: Func<FsCodec.ITimelineEvent<'F>[], 'R, int64>,
+            handle: Func<FsCodec.StreamName, FsCodec.ITimelineEvent<'F>[], CancellationToken, Task<struct ('Outcome * int64)>>,
            eventSize, stats,
            // Configure max number of batches to buffer within the scheduler; Default: Same as maxReadAhead
            [<O; D null>] ?pendingBufferSize,
@@ -1170,9 +1189,9 @@ type Concurrent private () =
        : Propulsion.SinkPipeline<Propulsion.Ingestion.Ingester<StreamEvent<'F> seq>> =
        let prepare _streamName span =
            let metrics = StreamSpan.metrics eventSize span
-            struct (metrics, span)
-        Concurrent.StartEx<'R, 'Outcome, 'F, 'R>(
-            log, maxReadAhead, maxConcurrentStreams, prepare, handle, toIndex, eventSize, stats,
+            struct (span, metrics)
+        Concurrent.StartEx<'Outcome, 'F>(
+            log, maxReadAhead, maxConcurrentStreams, prepare, handle, eventSize, stats,
            ?pendingBufferSize = pendingBufferSize, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults,
            ?idleDelay = idleDelay, ?requireAll = requireAll,
            ?ingesterStateInterval = ingesterStateInterval, ?commitInterval = commitInterval)
@@ -1182,28 +1201,28 @@ type Batched private () =

    /// Establishes a Sink pipeline that continually dispatches to a single instance of a handle function
    /// Prior to the dispatch, the potential streams to include in the batch are identified by the select function
-    static member Start<'Progress, 'Outcome, 'F>
+    static member Start<'Outcome, 'F>
        (   log: ILogger, maxReadAhead,
            select: Func<Scheduling.Item<'F> seq, Scheduling.Item<'F>[]>,
-            handle: Func<Scheduling.Item<'F>[], CancellationToken, Task<seq<struct (TimeSpan * Result<int64, exn>)>>>,
+            handle: Func<Scheduling.Item<'F>[], CancellationToken, Task<seq<struct (Result<int64, exn> * TimeSpan)>>>,
            eventSize, stats: Scheduling.Stats<_, _>,
            [<O; D null>] ?pendingBufferSize, [<O; D null>] ?purgeInterval, [<O; D null>] ?wakeForResults,
            [<O; D null>] ?idleDelay, [<O; D null>] ?requireAll,
            [<O; D null>] ?ingesterStateInterval, [<O; D null>] ?commitInterval)
        : Propulsion.SinkPipeline<Propulsion.Ingestion.Ingester<StreamEvent<'F> seq>> =
        let handle (items: Scheduling.Item<'F>[]) ct
            : Task<Scheduling.InternalRes<Result<struct (int64 * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>>[]> = task {
            let start = Stopwatch.timestamp ()
            let err ts e (x: Scheduling.Item<_>) =
                let met = StreamSpan.metrics eventSize x.span
-                Scheduling.InternalRes.create (x, ts, Error struct (met, e))
+                Scheduling.InternalRes.create (x, ts, Error struct (e, met))
            try let! results = handle.Invoke(items, ct)
                return Array.ofSeq (Seq.zip items results |> Seq.map (function
-                    | item, (ts, Ok index') ->
+                    | item, (Ok index', ts) ->
                        let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index') |> Array.ofSeq
                        let met = StreamSpan.metrics eventSize used
-                        Scheduling.InternalRes.create (item, ts, Ok struct (met, index'))
-                    | item, (ts, Error e) -> err ts e item))
+                        Scheduling.InternalRes.create (item, ts, Ok struct (index', met))
+                    | item, (Error e, ts) -> err ts e item))
            with e ->
                let ts = Stopwatch.elapsed start
                return items |> Array.map (err ts e) }
diff --git a/src/Propulsion/Sync.fs b/src/Propulsion/Sync.fs
index 47286230..43e7cf30 100644
--- a/src/Propulsion/Sync.fs
+++ b/src/Propulsion/Sync.fs
@@ -9,7 +9,7 @@ open System.Collections.Generic
 [<AbstractClass>]
 type Stats<'Outcome>(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold) =
-    inherit Scheduling.Stats<struct (StreamSpan.Metrics * TimeSpan * 'Outcome), struct (StreamSpan.Metrics * exn)>(log, statsInterval, stateInterval, ?failThreshold = failThreshold)
+    inherit Scheduling.Stats<struct ('Outcome * StreamSpan.Metrics * TimeSpan), struct (exn * StreamSpan.Metrics)>(log, statsInterval, stateInterval, ?failThreshold = failThreshold)
     let mutable okStreams, okEvents, okBytes, exnStreams, exnEvents, exnBytes = HashSet(), 0, 0L, HashSet(), 0, 0L
     let prepareStats = Stats.LatencyStats("prepare")
     override _.DumpStats() =
@@ -24,14 +24,14 @@ type Stats<'Outcome>(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold) =
     override this.Handle message =
         match message with
-        | { stream = stream; result = Ok ((es, bs), prepareElapsed, outcome) } ->
+        | { stream = stream; result = Ok (outcome, (es, bs), prepareElapsed) } ->
             okStreams.Add stream |> ignore
             okEvents <- okEvents + es
             okBytes <- okBytes + int64 bs
             prepareStats.Record prepareElapsed
             base.RecordOk message
             this.HandleOk outcome
-        | { stream = stream; result = Error ((es, bs), Exception.Inner exn) } ->
+        | { stream = stream; result = Error (Exception.Inner exn, (es, bs)) } ->
             exnStreams.Add stream |> ignore
             exnEvents <- exnEvents + es
             exnBytes <- exnBytes + int64 bs
@@ -44,8 +44,7 @@ type Factory private () =
     static member StartAsync
         (   log: ILogger, maxReadAhead, maxConcurrentStreams,
-            handle: Func<FsCodec.StreamName, FsCodec.ITimelineEvent<'F>[], CancellationToken, Task<struct ('R * 'Outcome)>>,
-            toIndex: Func<FsCodec.ITimelineEvent<'F>[], 'R, int64>,
+            handle: Func<FsCodec.StreamName, FsCodec.ITimelineEvent<'F>[], CancellationToken, Task<struct ('Outcome * int64)>>,
             stats: Stats<'Outcome>, sliceSize, eventSize,
             ?dumpExternalStats, ?idleDelay, ?maxBytes, ?maxEvents, ?purgeInterval, ?ingesterStateInterval, ?commitInterval)
         : SinkPipeline<Ingestion.Ingester<StreamEvent<'F> seq>> =
 
         let maxEvents, maxBytes = defaultArg maxEvents 16384, (defaultArg maxBytes (1024 * 1024 - (*fudge*)4096))
 
         let attemptWrite stream (events: FsCodec.ITimelineEvent<'F>[]) ct = task {
-            let struct (met, span') = StreamSpan.slice<'F> sliceSize (maxEvents, maxBytes) events
+            let struct (trimmed, met) = StreamSpan.slice<'F> sliceSize (maxEvents, maxBytes) events
             let prepareTs = Stopwatch.timestamp ()
-            try let! res, outcome = handle.Invoke(stream, span', ct)
-                let index' = toIndex.Invoke(span', res)
-                return Ok struct (index', met, Stopwatch.elapsed prepareTs, outcome)
-            with e -> return Error struct (met, e) }
+            try let!
outcome, index' = handle.Invoke(stream, trimmed, ct) + return Ok struct (outcome, index', met, Stopwatch.elapsed prepareTs) + with e -> return Error struct (e, met) } let interpretProgress _streams (stream: FsCodec.StreamName) = function - | Ok struct (i', met, prep, outcome) -> struct (ValueSome i', Ok struct (met, prep, outcome)) - | Error struct (struct (eventCount, bytesCount) as met, exn: exn) -> + | Ok struct (outcome, index', met, prep) -> struct (Ok struct (outcome, met, prep), ValueSome index') + | Error struct (exn: exn, (struct (eventCount, bytesCount) as met)) -> log.Warning(exn, "Handling {events:n0}e {bytes:n0}b for {stream} failed, retrying", eventCount, bytesCount, stream) - ValueNone, Error struct (met, exn) + Error struct (exn, met), ValueNone let dispatcher: Scheduling.IDispatcher<_, _, _, _> = Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, attemptWrite, interpretProgress) let dumpStreams logStreamStates log = logStreamStates eventSize match dumpExternalStats with Some f -> f log | None -> () let scheduler = - Scheduling.Engine + Scheduling.Engine (dispatcher, stats, dumpStreams, pendingBufferSize = maxReadAhead, ?idleDelay = idleDelay, ?purgeInterval = purgeInterval) Factory.Start(log, scheduler.Pump, maxReadAhead, scheduler, diff --git a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs index cd225e25..b9131d43 100644 --- a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs +++ b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs @@ -127,7 +127,7 @@ module Helpers = do! handler (getConsumer()) (deserialize consumerId event) (log: ILogger).Information("BATCHED CONSUMER Handled {c} events in {l} streams", c, streams.Length) let ts = Stopwatch.elapsed ts - return seq { for x in streams -> struct (ts, Ok (Propulsion.Sinks.Events.nextIndex x.span)) } } + return seq { for x in streams -> struct (Ok (Propulsion.Sinks.Events.nextIndex x.span), ts) } } let stats = Stats(log, TimeSpan.FromSeconds 5.,TimeSpan.FromSeconds 5.) let messageIndexes = StreamNameSequenceGenerator() let consumer = @@ -165,7 +165,7 @@ module Helpers = let handle _ (span: Propulsion.Sinks.Event[]) = async { for event in span do do! handler (getConsumer()) (deserialize consumerId event) - return Propulsion.Sinks.StreamResult.AllProcessed, () } + return (), Propulsion.Sinks.Events.nextIndex span } let stats = Stats(log, TimeSpan.FromSeconds 5.,TimeSpan.FromSeconds 5.) 
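        // Aside for reviewers (illustrative comment, not part of the upstream patch): with StreamResult
        // removed, a handler now reports progress by returning the next index to process directly; the
        // Propulsion.Sinks.Events helpers used throughout these tests cover the common outcomes, e.g.
        //   Propulsion.Sinks.Events.nextIndex span // everything processed (was StreamResult.AllProcessed)
        //   Propulsion.Sinks.Events.index span     // nothing processed    (was StreamResult.NoneProcessed)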
let messageIndexes = StreamNameSequenceGenerator() let consumer = diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index 78c8ab51..9a062c1b 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -84,7 +84,7 @@ let ``It processes events for a category`` () = task { test <@ Array.chooseV Simple.codec.Decode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @> if handled.Count >= 2000 then stop () - return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) } + return struct ((), Propulsion.Sinks.Events.nextIndex events) } use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats) let source = MessageDbSource( log, TimeSpan.FromMinutes 1, @@ -131,8 +131,8 @@ let ``It doesn't read the tail event again`` () = task { let stats = stats log - let handle _ _ _ = task { - return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) } + let handle _ events _ = task { + return struct ((), Propulsion.Sinks.Events.nextIndex events) } use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats) let batchSize = 10 let source = MessageDbSource( diff --git a/tests/Propulsion.Tests/SinkHealthTests.fs b/tests/Propulsion.Tests/SinkHealthTests.fs index de4455db..a8060af0 100644 --- a/tests/Propulsion.Tests/SinkHealthTests.fs +++ b/tests/Propulsion.Tests/SinkHealthTests.fs @@ -22,15 +22,15 @@ type Scenario(testOutput) = let sid n = FsCodec.StreamName.Internal.trust n let stuckSid = sid "a-stuck" let failingSid = sid "a-bad" - let handle sn _ = async { + let handle sn events = async { if sn = stuckSid then do! Async.Sleep (TimeSpan.FromMilliseconds 50) - return (Propulsion.Sinks.StreamResult.NoneProcessed, ()) + return ((), Propulsion.Sinks.Events.index events) elif sn = failingSid then return failwith "transient" else do! 
Async.Sleep (TimeSpan.FromSeconds 1) - return Propulsion.Sinks.StreamResult.AllProcessed, () } + return (), Propulsion.Sinks.Events.nextIndex events } let sink = Propulsion.Sinks.Factory.StartConcurrent(log, 2, 2, handle, stats) let dispose () = sink.Stop() @@ -67,7 +67,7 @@ type Scenario(testOutput) = pe.StuckStreams.Length = 1 && pe.FailingStreams.Length = 1 && all |> Seq.exists (fun struct (_s, age, _c) -> age > abendThreshold) @> - test <@ obj.ReferenceEquals(me, pe) @> - test <@ obj.ReferenceEquals(me, sink.Await() |> Async.Catch |> Async.RunSynchronously |> extractHealthCheckExn) @> } + test <@ LanguagePrimitives.PhysicalEquality me pe @> + test <@ LanguagePrimitives.PhysicalEquality me (sink.Await() |> Async.Catch |> Async.RunSynchronously |> extractHealthCheckExn) @> } interface IDisposable with member _.Dispose() = dispose () diff --git a/tests/Propulsion.Tests/SourceTests.fs b/tests/Propulsion.Tests/SourceTests.fs index dd49cdfa..7d5f7eac 100644 --- a/tests/Propulsion.Tests/SourceTests.fs +++ b/tests/Propulsion.Tests/SourceTests.fs @@ -14,7 +14,7 @@ type Scenario(testOutput) = let stats = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1) with member _.HandleOk x = () member _.HandleExn(log, x) = () } - let handle _ _ _ = task { return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) } + let handle _ events _ = task { return struct ((), Propulsion.Sinks.Events.nextIndex events) } let sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats) let dispose () = sink.Stop() diff --git a/tests/Propulsion.Tests/StreamStateTests.fs b/tests/Propulsion.Tests/StreamStateTests.fs index a6d3387d..c961443d 100644 --- a/tests/Propulsion.Tests/StreamStateTests.fs +++ b/tests/Propulsion.Tests/StreamStateTests.fs @@ -1,27 +1,68 @@ module Propulsion.Tests.StreamStateTests +open Propulsion.Internal open Propulsion.Streams open Swensen.Unquote open Xunit +module FsCodec301 = // Not yet merged, https://github.com/jet/FsCodec/pull/123 + open FsCodec + open System + /// An Event or Unfold that's been read from a Store and hence has a defined Index on the Event Timeline. 
+    [<NoComparison; NoEquality>]
+    type TimelineEvent2<'Format>(index, eventType, data, meta, eventId, correlationId, causationId, timestamp, isUnfold, context, size) =
+
+        static member Create(index, eventType, data, ?meta, ?eventId, ?correlationId, ?causationId, ?timestamp, ?isUnfold, ?context, ?size): ITimelineEvent<'Format> =
+            let isUnfold = defaultArg isUnfold false
+            let meta = match meta with Some x -> x | None -> Unchecked.defaultof<_>
+            let eventId = match eventId with Some x -> x | None -> Guid.Empty
+            let ts = match timestamp with Some ts -> ts | None -> DateTimeOffset.UtcNow
+            let size = defaultArg size 0
+            TimelineEvent2(index, eventType, data, meta, eventId, Option.toObj correlationId, Option.toObj causationId, ts, isUnfold, Option.toObj context, size) :> _
+
+        static member Create(index, inner: IEventData<'Format>, ?isUnfold, ?context, ?size): ITimelineEvent<'Format> =
+            let isUnfold = defaultArg isUnfold false
+            let size = defaultArg size 0
+            TimelineEvent2(index, inner.EventType, inner.Data, inner.Meta, inner.EventId, inner.CorrelationId, inner.CausationId, inner.Timestamp, isUnfold, Option.toObj context, size) :> _
+
+        override _.ToString() =
+            let t = if isUnfold then "Unfold" else "Event"
+            $"{t} {eventType} @{index} {context}"
+        interface ITimelineEvent<'Format> with
+            member _.Index = index
+            member _.IsUnfold = isUnfold
+            member _.Context = context
+            member _.Size = size
+            member _.EventType = eventType
+            member _.Data = data
+            member _.Meta = meta
+            member _.EventId = eventId
+            member _.CorrelationId = correlationId
+            member _.CausationId = causationId
+            member _.Timestamp = timestamp
+open FsCodec301
+
 let canonicalTime = System.DateTimeOffset.UtcNow
 
-let mk p c: FsCodec.ITimelineEvent<string>[] =
-    [| for x in 0..c-1 -> FsCodec.Core.TimelineEvent.Create(p + int64 x, p + int64 x |> string, null, timestamp = canonicalTime) |]
+let mk_ p c seg uc: FsCodec.ITimelineEvent<string>[] =
+    let mk id et isUnfold = TimelineEvent2.Create(id, et, null, timestamp = canonicalTime, isUnfold = isUnfold, context = seg)
+    [| for x in 0..c-1 -> mk (p + int64 x) (p + int64 x |> string) false
+       for u in 0..uc-1 -> mk (p + int64 c) $"{p+int64 c}u{u}" true |]
+let mk p c = mk_ p c 0 0
 let merge = StreamSpan.merge
+let isSame = LanguagePrimitives.PhysicalEquality
 let dropBeforeIndex = StreamSpan.dropBeforeIndex
 let is (xs: FsCodec.ITimelineEvent<string>[][]) (res: FsCodec.ITimelineEvent<string>[][]) =
-    (xs = null && res = null)
-    || (xs, res) ||> Seq.forall2 (fun x y -> (x = null && y = null)
-                                             || (x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType)))
+    (xs, res) ||> Seq.forall2 (fun x y -> (Array.isEmpty x && Array.isEmpty y)
+                                          || x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType))
 
 let [<Fact>] nothing () =
     let r = merge 0L [| mk 0L 0; mk 0L 0 |]
-    test <@ obj.ReferenceEquals(null, r) @>
+    test <@ isSame null r @>
 
 let [<Fact>] synced () =
     let r = merge 1L [| mk 0L 1; mk 0L 0 |]
-    test <@ obj.ReferenceEquals(null, r) @>
+    test <@ isSame null r @>
 
 let [<Fact>] ``no overlap`` () =
     let r = merge 0L [| mk 0L 1; mk 2L 2 |]
@@ -41,7 +82,7 @@ let [<Fact>] adjacent () =
     let r = merge 0L [| mk 0L 1; mk 1L 2 |]
 
 let [<Fact>] ``adjacent to min`` () =
     let r = Array.map (dropBeforeIndex 2L) [| mk 0L 1; mk 1L 2 |]
-    test <@ r |> is [| null; mk 2L 1 |] @>
+    test <@ r |> is [| [||]; mk 2L 1 |] @>
 
 let [<Fact>] ``adjacent to min merge`` () =
     let r = merge 2L [| mk 0L 1; mk 1L 2 |]
@@ -83,6 +124,64 @@ let [<Fact>] ``fail 2`` () =
     let r = merge 11613L [| mk 11614L 1; null |]
     test <@ r |> is [| mk 11614L 1 |] @>
 
+let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 isSame
+
+let [<FsCheck.Xunit.Property>] ``merges retain freshest unfolds, one per event type`` counts =
+    let input = [|
+        let mutable pos = 0L
+        let mutable seg = 0
+        for gapOrOverlap, FsCheck.NonNegativeInt normal, FsCheck.NonNegativeInt unfolds in (counts : _[]) do
+            let events = normal % 10
+            let unfolds = unfolds % 10
+            pos <- if gapOrOverlap < 0y then max 0L (pos+int64 gapOrOverlap) else pos + int64 gapOrOverlap
+            yield mk_ pos events seg unfolds
+            pos <- pos + int64 events
+            seg <- seg + 1 |]
+    let res = merge 0L input
+    // The only way to end up with a null output is by sending either no spans, or all empties
+    if res = null then
+        test <@ input |> Array.forall Array.isEmpty @>
+    else
+
+    // an Empty span sequence is replaced with null
+    test <@ res |> Array.any @>
+    // A Span sequence does not have any empty spans
+    test <@ res |> Array.forall Array.any @>
+    let all = res |> Array.concat
+    let unfolds, events = all |> Array.partition _.IsUnfold
+    // Events are always in order
+    test <@ (events |> Seq.sortBy _.Index) === events @>
+    // Unfolds are always in order
+    test <@ unfolds |> Seq.sortBy _.Index === unfolds @>
+    // Unfolds are always after events
+    test <@ all |> Seq.sortBy _.IsUnfold === all @>
+    // One unfold per type
+    test <@ unfolds |> Array.groupBy _.EventType |> Array.forall (fun (_et, xs) -> xs.Length = 1) @>
+    // Unfolds are always for the same Index (as preceding ones are invalidated by any newer event)
+    test <@ unfolds |> Array.forall (fun x -> x.Index = (Array.last all).Index) @>
+    // Version that Unfolds pertain to must always be > final event Index
+    test <@ match events |> Array.tryLast, unfolds |> Array.tryLast with
+            | Some le, Some lu -> lu.Index > le.Index
+            | _ -> true @>
+
+    // resulting span sequence must be monotonic, with a gap of at least 1 in the Index ranges per span
+    test <@ res |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.nextIndex x < StreamSpan.index y) @>
+
+    let others = res |> Array.take (res.Length - 1)
+    // Only the last span can have unfolds
+    test <@ others |> Array.forall (Array.forall (fun x -> not x.IsUnfold)) @>
+
+    match res |> Array.last |> Array.last with
+    | u when u.IsUnfold ->
+        // If there are unfolds, they can only be the newest ones
+        test <@ input |> Array.forall (not << Array.exists (fun x -> x.IsUnfold && x.Index > u.Index)) @>
+        // if two sets of unfolds with identical Index values were supplied, the freshest ones must win
+        let uc = unbox u.Context
+        let newerUnfolds = Seq.concat input |> Seq.filter (fun x -> x.IsUnfold && x.Index = u.Index && unbox x.Context > uc)
+        test <@ newerUnfolds === [||] || uc = -1 @>
+    | _ -> ()
+    // TODO verify that slice never orphans unfolds
+
 #if MEMORY_USAGE_ANALYSIS
 // https://bartoszsypytkowski.com/writing-high-performance-f-code
 // https://github.com/SergeyTeplyakov/ObjectLayoutInspector
diff --git a/tools/Propulsion.Tool/Sync.fs b/tools/Propulsion.Tool/Sync.fs
index e1ea0c03..09fd2bec 100644
--- a/tools/Propulsion.Tool/Sync.fs
+++ b/tools/Propulsion.Tool/Sync.fs
@@ -204,9 +204,9 @@ type Stats(log: ILogger, statsInterval, stateInterval, logExternalStats) =
         accHam.Clear(); accSpam.Clear()
         accEventTypeLats.Clear()
 
-let private handle isValidEvent stream (events: Propulsion.Sinks.Event[]): Async<_ * Outcome> = async {
+let private handle isValidEvent stream (events: Propulsion.Sinks.Event[]): Async<Outcome * int64> = async {
     let ham, spam = events |> Array.partition isValidEvent
-    return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.render_ stream ham spam 0 }
+    return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.nextIndex events }
 
 let eofSignalException = System.Threading.Tasks.TaskCanceledException "Stopping; FeedMonitor wait completed"
 let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
@@ -248,7 +248,7 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
                 let json = Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream events |> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
                 do! producer.ProduceAsync(FsCodec.StreamName.toString stream, json) |> Async.Ignore
-                return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.render_ stream ham spam 0 }
+                return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.nextIndex events }
             Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats, requireAll = requireAll)
         | SubCommand.Sync sa ->
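
For handler authors migrating to this contract, a minimal sketch (illustrative only, mirroring the test fixtures above; it assumes the Propulsion.Sinks surface exactly as changed in this patch): the struct ('Outcome * int64) return value replaces StreamResult, with the int64 declaring the next index expected on the stream.

    // A pass-through handler that consumes the whole span, as the tests above do
    let handle (_stream: FsCodec.StreamName) (events: Propulsion.Sinks.Event[]) _ct = task {
        // ... application-specific processing (including any propagated Unfolds) goes here ...
        return struct ((), Propulsion.Sinks.Events.nextIndex events) }

As in the tests above, such a handler wires in via e.g. Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats).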