From 40c9ab4699e6d6138855f486d4fa1efdf46eae9e Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 1 Aug 2024 17:01:54 +0100 Subject: [PATCH] feat!(Streams): Support propagating Unfolds; remove StreamResult (#264) --- CHANGELOG.md | 2 +- DOCUMENTATION.md | 9 +- Propulsion.sln.DotSettings | 2 + .../CosmosStorePruner.fs | 6 +- src/Propulsion.CosmosStore/CosmosStoreSink.fs | 115 ++-- src/Propulsion.CosmosStore/FeedObserver.fs | 4 +- .../Propulsion.CosmosStore.fsproj | 7 +- .../Propulsion.CosmosStore3.fsproj | 7 +- .../Propulsion.DynamoStore.fsproj | 7 +- src/Propulsion.EventStore/EventStoreSink.fs | 85 ++- .../Propulsion.EventStore.fsproj | 7 +- .../Propulsion.EventStoreDb.fsproj | 7 +- src/Propulsion.Feed/FeedReader.fs | 4 +- src/Propulsion.Feed/PeriodicSource.fs | 2 +- src/Propulsion.Feed/Propulsion.Feed.fsproj | 7 +- src/Propulsion.Kafka/Consumers.fs | 34 +- src/Propulsion.Kafka/ProducerSinks.fs | 4 +- src/Propulsion.Kafka/Propulsion.Kafka.fsproj | 5 +- .../Propulsion.MemoryStore.fsproj | 7 +- .../Propulsion.MessageDb.fsproj | 7 +- src/Propulsion.MessageDb/Readme.md | 2 +- .../Propulsion.SqlStreamStore.fsproj | 7 +- src/Propulsion/FeedMonitor.fs | 4 +- src/Propulsion/Internal.fs | 5 +- src/Propulsion/Propulsion.fsproj | 2 +- src/Propulsion/Sinks.fs | 71 +-- src/Propulsion/Streams.fs | 575 ++++++++++-------- src/Propulsion/Sync.fs | 46 +- .../ConsumersIntegration.fs | 4 +- .../Propulsion.MessageDb.Integration/Tests.fs | 6 +- tests/Propulsion.Tests/ProgressTests.fs | 6 +- tests/Propulsion.Tests/SinkHealthTests.fs | 10 +- tests/Propulsion.Tests/SourceTests.fs | 2 +- tests/Propulsion.Tests/StreamStateTests.fs | 151 ++++- tools/Propulsion.Tool/Sync.fs | 11 +- 35 files changed, 674 insertions(+), 556 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d1c55a93..80c1c990 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,7 +47,6 @@ The `Unreleased` section name is replaced by the expected version of next releas - `Scheduler`: `Struct`/`voption` conversions; buffer reuse [#157](https://github.com/jet/propulsion/pull/157) - `Scheduler`: Replaced `Thead.Sleep` with `Task.WhenAny`; Added Sleep time logging [#161](https://github.com/jet/propulsion/pull/161) - `Streams`: Changed dominant `ITimelineEvent` `EventBody` type from `byte[]` to `System.ReadOnlyMemory` (`Sinks.EventBody`) [#169](https://github.com/jet/propulsion/pull/169) [#208](https://github.com/jet/propulsion/pull/208) -- `Streams.SpanResult`: Renamed to `Sinks.StreamResult` [#208](https://github.com/jet/propulsion/pull/208) - `Propulsion.CosmosStore`: Changed to target `Equinox.CosmosStore` v `4.0.0` [#139](https://github.com/jet/propulsion/pull/139) - `Propulsion.CosmosStore.CosmosSource`: Changed parsing to use `System.Text.Json` [#139](https://github.com/jet/propulsion/pull/139) - `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner`: Exposed `*Stats` [#226](https://github.com/jet/propulsion/pull/226) @@ -60,6 +59,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Removed - `Streams.StreamSpan`: Changed from a record to individual arguments of `FsCodec.StreamName` and `Sinks.Event[]` [#169](https://github.com/jet/propulsion/pull/169) [#208](https://github.com/jet/propulsion/pull/208) +- `Streams.SpanResult`: Replaced with `int64` to reflect the updated position [#264](https://github.com/jet/propulsion/pull/264) [#208](https://github.com/jet/propulsion/pull/208) - `Streams`: `statsInterval` is obtained from the `Stats` wherever one is supplied [#208](https://github.com/jet/propulsion/pull/208) - `Propulsion.Cosmos`: Should not be in general use - users should port to `Propulsion.CosmosStore3`, then `Propulsion.CosmosStore` [#193](https://github.com/jet/propulsion/pull/193) - `Destructurama.FSharp` dependency [#152](https://github.com/jet/propulsion/pull/152) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index d70c2389..b7e207b4 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -283,7 +283,7 @@ Typically, alerting should be set up based on the built in `busy` metrics provid - `failing`: streams that have had at least one failed Handler invocation (regardless of whether they are currently the subject of a retry Handler invocation or not). Typically it should be possible to define: - a reasonable limit before you'd want a low level alert to be raised - a point at which you raise an alarm on the basis that the system is in a state that will lead to a SLA breach and hence merits intervention -- `stalled`: streams that have had only successful Handler invocations, but have not declared any progress via the Handler's `StreamResult`. In some cases, the design of a Reaction Process may be such that one might intentionally back off and retry in some scenarios (see [Consistency](#consistency)). In the general case, a stalled stream may reflect a coding error (e.g., if a handler uses read a stale value from a cache but the cache never gets invalidated, it will never make progress) +- `stalled`: streams that have had only successful Handler invocations, but have not declared any progress via the updated Stream Position yielded in the Handler's result. In some cases, the design of a Reaction Process may be such that one might intentionally back off and retry in some scenarios (see [Consistency](#consistency)). In the general case, a stalled stream may reflect a coding error (e.g., if a handler uses read a stale value from a cache but the cache never gets invalidated, it will never make progress) Alongside alerting based on breaches of SLO limits, the values of the `busy` metrics are a key high level indicator of the health of a given Processor (along with the Handler Latency distribution). @@ -348,13 +348,12 @@ or the _Designing Data Intensive Applications_ book): - DynamoDb: requesting a 'consistent read' - CosmosDb: when using Session Consistency, require that reads are contingent on the session token being used by the feed reader. This can be achieved by using the same `CosmosClient` to ensure the session tokens are synchronized. 2. Perform a pre-flight check when reading, based on the `Index` of the newest event passed to the handler. In such a case, it may make sense to back off for a small period, before reporting failure to handle the event (by throwing an exception). The Handler will be re-invoked for another attempt, with a better chance of the event being reflected in the read. - - Once such a pre-flight check has been carried out, one can safely report `StreamResult.AllProcessed` (or `PartiallyProcessed` if you wish to defer some work due to the backlog of events triggering too much work to perform in a single invocation) 3. Perform the processing on a 'shoulder tap' basis, with the final position based on what you read. - First, load the stream's state, performing any required reactions. - - Then report the Version attained for the stream (based on the Index of the last event processed) by yielding a `StreamResult.OverrideNextIndex`. + - Then report the Version attained for the stream (based on the Index of the last event processed) as the Handler's updated Position for that Stream - In this case, one of following edge cases may result: - - _The handler saw a version prior to the prompting event_. For example, if a Create event (`Index = 0`) is relayed, but reading does not yield any events (the replica in question is behind the node from which the feed obtained its state). In this case, the Handler can simply yield `StreamResult.OverrideNextIndex`, which will cause the event to be retained in the input buffer (and most likely, a fresh invocation for that same stream will immediately be dispatched) - - _The Handler saw a Version fresher than the prompting event_. For example: if a Create (`Index = 0`) is immediately followed by an Update (`Index = 1`), the handler can yield `StreamResult.OverrideNextIndex 2` to reflect the fact that the next event that's of interest will be event `Index = 2`. Regardless of whether Event 1 arrived while the handler was processing Event 0, or whether it arrives some time afterwards, the event will be dropped from the events pending for that Stream's Handler. + - _The handler saw a version prior to the prompting event_. For example, if a Create event (`Index = 0`) is relayed, but reading does not yield any events (the replica in question is behind the node from which the feed obtained its state). In this case, the Handler can simply yield the Position that the processing did see, which will cause the event to be retained in the input buffer (and most likely, a fresh invocation for that same stream will immediately be dispatched) + - _The Handler saw a Version fresher than the prompting event_. For example: if a Create (`Index = 0`) is immediately followed by an Update (`Index = 1`), the handler can yield `2` to reflect the fact that the next event that's of interest will be event `Index = 2`. Regardless of whether Event 1 arrived while the handler was processing Event 0, or whether it arrives some time afterwards, the event will be dropped from the events pending for that Stream's Handler. ### Consistency in the face of at least once delivery and re-traversal of events diff --git a/Propulsion.sln.DotSettings b/Propulsion.sln.DotSettings index 491108f7..ad9a014d 100644 --- a/Propulsion.sln.DotSettings +++ b/Propulsion.sln.DotSettings @@ -1,7 +1,9 @@  True + True True True + True True True True diff --git a/src/Propulsion.CosmosStore/CosmosStorePruner.fs b/src/Propulsion.CosmosStore/CosmosStorePruner.fs index da33960a..f0527d31 100644 --- a/src/Propulsion.CosmosStore/CosmosStorePruner.fs +++ b/src/Propulsion.CosmosStore/CosmosStorePruner.fs @@ -29,7 +29,7 @@ module Pruner = let res = if deleted = 0 && deferred = 0 then Nop span.Length else Ok (deleted, deferred) // For case where we discover events have already been deleted beyond our requested position, signal to reader to drop events let writePos = max trimmedPos (untilIndex + 1L) - return struct (writePos, res) } + return struct (res, writePos) } type CosmosStorePrunerStats(log, statsInterval, stateInterval, [] ?failThreshold) = inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval, ?failThreshold = failThreshold) @@ -75,8 +75,8 @@ type CosmosStorePruner = #endif let interpret _stream span = let metrics = StreamSpan.metrics Event.storedSize span - struct (metrics, span) - Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil, (fun _ r -> r)) + struct (span, metrics) + Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil) let dumpStreams logStreamStates _log = logStreamStates Event.storedSize let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay) diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs index 7723a7bf..7563a756 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs @@ -15,8 +15,7 @@ module private Impl = type EventBody = byte[] // V4 defines one directly, here we shim it module StreamSpan = - let private toNativeEventBody (xs: Propulsion.Sinks.EventBody): byte[] = xs.ToArray() - let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody + let toNativeEventBody (xs: Propulsion.Sinks.EventBody): byte[] = xs.ToArray() // Trimmed edition of what V4 exposes module internal Equinox = module CosmosStore = @@ -34,10 +33,9 @@ module private Impl = // v4 and later use JsonElement, but Propulsion is using ReadOnlyMemory rather than assuming and/or offering optimization for JSON bodies open System.Text.Json - let private toNativeEventBody (x: EventBody): JsonElement = + let toNativeEventBody (x: EventBody): JsonElement = if x.IsEmpty then JsonElement() - else JsonSerializer.Deserialize(x.Span) - let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody + else JsonSerializer.Deserialize(x.Span) #endif module Internal = @@ -49,38 +47,42 @@ module Internal = type [] Result = | Ok of updatedPos: int64 | Duplicate of updatedPos: int64 - | PartialDuplicate of overage: Event[] - | PrefixMissing of batch: Event[] * writePos: int64 - let logTo (log: ILogger) malformed (res: StreamName * Result) = - match res with - | stream, Ok (_, Result.Ok pos) -> + | PartialDuplicate of updatedPos: int64 + | PrefixMissing of gap: int * actualPos: int64 + let logTo (log: ILogger) (stream: FsCodec.StreamName): Result, Dispatcher.ExnAndMetrics> -> unit = function + | Ok (Result.Ok pos, _, _) -> log.Information("Wrote {stream} up to {pos}", stream, pos) - | stream, Ok (_, Result.Duplicate updatedPos) -> + | Ok (Result.Duplicate updatedPos, _, _) -> log.Information("Ignored {stream} (synced up to {pos})", stream, updatedPos) - | stream, Ok (_, Result.PartialDuplicate overage) -> - log.Information("Requeuing {stream} {pos} ({count} events)", stream, overage[0].Index, overage.Length) - | stream, Ok (_, Result.PrefixMissing (batch, pos)) -> - log.Information("Waiting {stream} missing {gap} events ({count} events @ {pos})", stream, batch[0].Index - pos, batch.Length, batch[0].Index) - | stream, Error (_, exn) -> + | Ok (Result.PartialDuplicate updatedPos, _, _) -> + log.Information("Requeuing {stream} {pos}", stream, updatedPos) + | Ok (Result.PrefixMissing (gap, pos), _, _) -> + log.Information("Waiting {stream} missing {gap} events before {pos}", stream, gap, pos) + | Error (exn, malformed, _) -> let level = if malformed then LogEventLevel.Warning else Events.LogEventLevel.Information log.Write(level, exn, "Writing {stream} failed, retrying", stream) let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task { - log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length) + let i = StreamSpan.index span + let n = StreamSpan.next span + let mapData = FsCodec.Core.EventData.Map StreamSpan.toNativeEventBody #if COSMOSV3 - let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _)) + span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds") + log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length) + let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData) |> Async.executeAsTask ct #else - let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _), ct) + span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore does not [yet] support ingesting unfolds") + let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData, ct) #endif let res' = match res with | AppendResult.Ok pos -> Result.Ok pos.index | AppendResult.Conflict (pos, _) | AppendResult.ConflictUnknown pos -> match pos.index with - | actual when actual < span[0].Index -> Result.PrefixMissing (span, actual) - | actual when actual >= span[0].Index + span.LongLength -> Result.Duplicate actual - | actual -> Result.PartialDuplicate (span |> Array.skip (actual - span[0].Index |> int)) + | actual when actual < i -> Result.PrefixMissing (i - actual |> int, actual) + | actual when actual >= n -> Result.Duplicate actual + | actual -> Result.PartialDuplicate actual log.Debug("Result: {res}", res') return res' } let containsMalformedMessage e = @@ -99,57 +101,55 @@ module Internal = type Dispatcher = - static member Create(log: ILogger, eventsContext, itemDispatcher, ?maxEvents, ?maxBytes) = + static member Create(storeLog: ILogger, eventsContext, itemDispatcher, ?maxEvents, ?maxBytes) = let maxEvents, maxBytes = defaultArg maxEvents 16384, defaultArg maxBytes (256 * 1024) - let writerResultLog = log.ForContext() - let attemptWrite stream span ct = task { - let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span + let attemptWrite stream span revision ct = task { + let struct (trimmed, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span #if COSMOSV3 - try let! res = Writer.write log eventsContext (StreamName.toString stream) span' ct + try let! wr = Writer.write storeLog eventsContext (StreamName.toString stream) trimmed ct #else - try let! res = Writer.write log eventsContext stream span' ct + try let! wr = Writer.write storeLog eventsContext stream trimmed ct #endif - return Ok struct (met, res) - with e -> return Error struct (met, e) } - let interpretProgress (streams: Scheduling.StreamStates<_>) stream res = - let applyResultToStreamState = function - | Ok struct (_stats, Writer.Result.Ok pos) -> struct (streams.RecordWriteProgress(stream, pos, null), false) - | Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null), false - | Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, overage[0].Index, [| overage |]), false - | Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [| overage |]), false - | Error struct (_stats, exn) -> - let malformed = Writer.classify exn |> Writer.isMalformed - streams.SetMalformed(stream, malformed), malformed - let struct (ss, malformed) = applyResultToStreamState res - Writer.logTo writerResultLog malformed (stream, res) - struct (ss.WritePos, res) - Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress) + let hp = wr |> function + | Writer.Result.Ok pos' -> Buffer.HandlerProgress.ofMetricsAndPos revision met pos' |> ValueSome + | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos' -> Buffer.HandlerProgress.ofPos pos' |> ValueSome + | Writer.Result.PrefixMissing _ -> ValueNone + return Ok struct (wr, hp, met) + with e -> return Error struct (e, Writer.classify e |> Writer.isMalformed, met) } + let interpretProgress = function + | Ok struct (_wr, hp, _met) as res -> struct (res, hp, false) + | Error struct (_exn, malformed, _met) as res -> res, ValueNone, malformed + Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress = interpretProgress) type WriterResult = Internal.Writer.Result -type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [] ?failThreshold, [] ?logExternalStats) = - inherit Scheduling.Stats( +type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, []?storeLog, [] ?failThreshold, [] ?logExternalStats) = + inherit Scheduling.Stats, Dispatcher.ExnAndMetrics>( log, statsInterval, stateInterval, ?failThreshold = failThreshold, logExternalStats = defaultArg logExternalStats Equinox.CosmosStore.Core.Log.InternalMetrics.dump) - let mutable okStreams, okEvents, okBytes = HashSet(), 0, 0L - let mutable exnCats, exnStreams, exnEvents, exnBytes = Stats.Counters(), HashSet(), 0, 0L + let writerResultLog = (defaultArg storeLog log).ForContext() + let mutable okStreams, okEvents, okUnfolds, okBytes = HashSet(), 0, 0, 0L + let mutable exnCats, exnStreams, exnEvents, exnUnfolds, exnBytes = Stats.Counters(), HashSet(), 0, 0, 0L let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0 override _.Handle message = match message with - | { stream = stream; result = Ok ((es, bs), res) } -> + | { stream = stream; result = Ok (res, _hp, (es, us, bs)) as r } -> okStreams.Add stream |> ignore okEvents <- okEvents + es + okUnfolds <- okUnfolds + us okBytes <- okBytes + int64 bs match res with | WriterResult.Ok _ -> resultOk <- resultOk + 1 | WriterResult.Duplicate _ -> resultDup <- resultDup + 1 | WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1 | WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1 - base.RecordOk(message) - | { stream = stream; result = Error ((es, bs), Exception.Inner exn) } -> + Internal.Writer.logTo writerResultLog stream r + base.RecordOk(message, us <> 0) + | { stream = stream; result = Error (Exception.Inner exn, _malformed, (es, us, bs)) as r } -> exnCats.Ingest(StreamName.categorize stream) exnStreams.Add stream |> ignore exnEvents <- exnEvents + es + exnUnfolds <- exnUnfolds + us exnBytes <- exnBytes + int64 bs resultExn <- resultExn + 1 let kind = @@ -159,16 +159,17 @@ type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [ OutcomeKind.Tagged "tooLarge" | Internal.Writer.ResultKind.Malformed -> OutcomeKind.Tagged "malformed" | Internal.Writer.ResultKind.Other -> OutcomeKind.Exn - base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es), exn) + Internal.Writer.logTo writerResultLog stream r + base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es).ForContext("unfolds", us), exn) override _.DumpStats() = let results = resultOk + resultDup + resultPartialDup + resultPrefix - log.Information("Completed {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e ({ok:n0} ok {dup:n0} redundant {partial:n0} partial {prefix:n0} waiting)", - Log.miB okBytes, results, okStreams.Count, okEvents, resultOk, resultDup, resultPartialDup, resultPrefix) - okStreams.Clear(); resultOk <- 0; resultDup <- 0; resultPartialDup <- 0; resultPrefix <- 0; okEvents <- 0; okBytes <- 0L + log.Information("Completed {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e {unfolds:n0}u ({ok:n0} ok {dup:n0} redundant {partial:n0} partial {prefix:n0} waiting)", + Log.miB okBytes, results, okStreams.Count, okEvents, okUnfolds, resultOk, resultDup, resultPartialDup, resultPrefix) + okStreams.Clear(); resultOk <- 0; resultDup <- 0; resultPartialDup <- 0; resultPrefix <- 0; okEvents <- 0; okUnfolds <-0; okBytes <- 0L if exnCats.Any then - log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e", - Log.miB exnBytes, resultExn, exnStreams.Count, exnEvents) - resultExn <- 0; exnBytes <- 0L; exnEvents <- 0 + log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e {unfolds:n0}u", + Log.miB exnBytes, resultExn, exnStreams.Count, exnEvents, exnUnfolds) + resultExn <- 0; exnBytes <- 0L; exnEvents <- 0; exnUnfolds <- 0 log.Warning(" Affected cats {@exnCats} Streams {@exnStreams}", exnCats.StatsDescending |> Seq.truncate 50, exnStreams |> Seq.truncate 100) exnCats.Clear(); exnStreams.Clear() diff --git a/src/Propulsion.CosmosStore/FeedObserver.fs b/src/Propulsion.CosmosStore/FeedObserver.fs index 7bba42f7..5f40afe0 100644 --- a/src/Propulsion.CosmosStore/FeedObserver.fs +++ b/src/Propulsion.CosmosStore/FeedObserver.fs @@ -20,9 +20,7 @@ module Log = /// Attach a property to the captured event record to hold the metric information let internal withMetric (value: Metric) = Log.withScalarProperty PropertyTag value let [] (|MetricEvent|_|) (logEvent: Serilog.Events.LogEvent): Metric voption = - let mutable p = Unchecked.defaultof<_> - logEvent.Properties.TryGetValue(PropertyTag, &p) |> ignore - match p with Log.ScalarValue (:? Metric as e) -> ValueSome e | _ -> ValueNone + match logEvent.Properties.TryGetValue PropertyTag with true, Log.ScalarValue (:? Metric as e) -> ValueSome e | _ -> ValueNone [] type ChangeFeedContext = { group: string; rangeId: int; epoch: int64; timestamp: DateTime; requestCharge: float } diff --git a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj index be39b183..5ed6d069 100644 --- a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj +++ b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj @@ -4,7 +4,7 @@ net6.0 - 3.0.0-rc.13 + @@ -26,8 +26,9 @@ - - + + + diff --git a/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj b/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj index b31cf269..f033e850 100644 --- a/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj +++ b/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj @@ -4,7 +4,7 @@ net6.0 - 3.0.0-rc.13 + COSMOSV3 @@ -45,8 +45,9 @@ - - + + + diff --git a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj index 03103a6e..958c0603 100644 --- a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj +++ b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj @@ -4,7 +4,7 @@ net6.0 - + DYNAMOSTORE @@ -31,8 +31,9 @@ - - + + + diff --git a/src/Propulsion.EventStore/EventStoreSink.fs b/src/Propulsion.EventStore/EventStoreSink.fs index c99f1522..ed8d98f8 100755 --- a/src/Propulsion.EventStore/EventStoreSink.fs +++ b/src/Propulsion.EventStore/EventStoreSink.fs @@ -21,81 +21,76 @@ module Internal = type [] Result = | Ok of updatedPos: int64 | Duplicate of updatedPos: int64 - | PartialDuplicate of overage: Event[] - | PrefixMissing of batch: Event[] * writePos: int64 + | PartialDuplicate of updatedPos: int64 + | PrefixMissing of gap: int * actualPos: int64 - let logTo (log: ILogger) (res: FsCodec.StreamName * Result) = - match res with - | stream, Ok (_, Result.Ok pos) -> + let logTo (log: ILogger) (stream: FsCodec.StreamName): Result, Dispatcher.ExnAndMetrics> -> unit = function + | Ok (Result.Ok pos, _, _) -> log.Information("Wrote {stream} up to {pos}", stream, pos) - | stream, Ok (_, Result.Duplicate updatedPos) -> + | Ok (Result.Duplicate updatedPos, _, _) -> log.Information("Ignored {stream} (synced up to {pos})", stream, updatedPos) - | stream, Ok (_, Result.PartialDuplicate overage) -> - log.Information("Requeuing {stream} {pos} ({count} events)", stream, overage[0].Index, overage.Length) - | stream, Ok (_, Result.PrefixMissing (batch, pos)) -> - log.Information("Waiting {stream} missing {gap} events ({count} events @ {pos})", - stream, batch[0].Index - pos, batch.Length, batch[0].Index) - | stream, Error (_, exn) -> + | Ok (Result.PartialDuplicate updatedPos, _, _) -> + log.Information("Requeuing {stream} {pos}", stream, updatedPos) + | Ok (Result.PrefixMissing (gap, pos), _, _) -> + log.Information("Waiting {stream} missing {gap} events before {pos}", stream, gap, pos) + | Error (exn, _, _) -> log.Warning(exn,"Writing {stream} failed, retrying", stream) let write (log: ILogger) (context: EventStoreContext) stream (span: Event[]) ct = task { - log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length) + let i = StreamSpan.index span + log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length) #if EVENTSTORE_LEGACY - let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _)) + let! res = context.Sync(log, stream, i - 1L, span |> Array.map (fun span -> span :> _)) #else - let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _), ct) + let! res = context.Sync(log, stream, i - 1L, span |> Array.map (fun span -> span :> _), ct) #endif - let ress = + let res' = match res with | GatewaySyncResult.Written (Token.Unpack pos') -> Result.Ok (pos'.streamVersion + 1L) | GatewaySyncResult.ConflictUnknown (Token.Unpack pos) -> match pos.streamVersion + 1L with - | actual when actual < span[0].Index -> Result.PrefixMissing (span, actual) - | actual when actual >= span[0].Index + span.LongLength -> Result.Duplicate actual - | actual -> Result.PartialDuplicate (span |> Array.skip (actual - span[0].Index |> int)) - log.Debug("Result: {res}", ress) - return ress } + | actual when actual < i -> Result.PrefixMissing (i - actual |> int, actual) + | actual when actual >= i + span.LongLength -> Result.Duplicate actual + | actual -> Result.PartialDuplicate actual + log.Debug("Result: {res}", res') + return res' } type [] ResultKind = TimedOut | Other type Dispatcher = - static member Create(log: ILogger, storeLog, connections: _[], maxDop) = - let writerResultLog = log.ForContext() + static member Create(storeLog, connections: _[], maxDop) = let mutable robin = 0 - let attemptWrite stream span ct = task { + let attemptWrite stream span _revision ct = task { let index = System.Threading.Interlocked.Increment(&robin) % connections.Length let selectedConnection = connections[index] let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096 - let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span - try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span' ct - return Ok struct (met, res) - with e -> return Error struct (met, e) } - let interpretProgress (streams: Scheduling.StreamStates<_>) stream res = - let applyResultToStreamState = function - | Ok struct (_stats, Writer.Result.Ok pos) -> streams.RecordWriteProgress(stream, pos, null) - | Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null) - | Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, overage[0].Index, [| overage |]) - | Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [| overage |]) - | Error struct (_stats, _exn) -> streams.SetMalformed(stream, false) - let ss = applyResultToStreamState res - Writer.logTo writerResultLog (stream, res) - struct (ss.WritePos, res) - Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, attemptWrite, interpretProgress) + let struct (span, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span + try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span ct + let hp = res |> function + | Writer.Result.Ok pos' | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos' -> Buffer.HandlerProgress.ofPos pos' |> ValueSome + | Writer.Result.PrefixMissing _ -> ValueNone + return Ok struct (res, hp, met) + with e -> return Error struct (e, false, met) } + let interpretProgress = function + | Ok struct (_res, hp, _met) as res -> struct (res, hp, false) + | Error struct (_exn, malformed, _met) as res -> res, ValueNone, malformed + Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, project = attemptWrite, interpretProgress = interpretProgress) type WriterResult = Internal.Writer.Result type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [] ?failThreshold, [] ?logExternalStats) = - inherit Scheduling.Stats( + inherit Scheduling.Stats, Dispatcher.ExnAndMetrics>( log, statsInterval, stateInterval, ?failThreshold = failThreshold, logExternalStats = defaultArg logExternalStats Log.InternalMetrics.dump) + let writerResultLog = log.ForContext() let mutable okStreams, okEvents, okBytes, exnCats, exnStreams, exnEvents, exnBytes = HashSet(), 0, 0L, Stats.Counters(), HashSet(), 0 , 0L let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0 override _.Handle message = match message with - | { stream = stream; result = Ok ((es, bs), res) } -> + | { stream = stream; result = Ok (res, _hp, (es, us, bs)) as r } -> okStreams.Add stream |> ignore okEvents <- okEvents + es okBytes <- okBytes + int64 bs @@ -104,13 +99,15 @@ type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [ resultDup <- resultDup + 1 | WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1 | WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1 - base.RecordOk(message) - | { stream = stream; result = Error ((es, bs), Exception.Inner exn) } -> + Internal.Writer.logTo writerResultLog stream r + base.RecordOk(message, us <> 0) + | { stream = stream; result = Error (Exception.Inner exn, _malformed, (es, _us, bs)) as r } -> exnCats.Ingest(StreamName.categorize stream) exnStreams.Add stream |> ignore exnEvents <- exnEvents + es exnBytes <- exnBytes + int64 bs resultExn <- resultExn + 1 + Internal.Writer.logTo writerResultLog stream r base.RecordExn(message, OutcomeKind.classify exn, log.ForContext("stream", stream).ForContext("events", es), exn) override _.DumpStats() = let results = resultOk + resultDup + resultPartialDup + resultPrefix @@ -138,7 +135,7 @@ type EventStoreSink = // Tune the sleep time when there are no items to schedule or responses to process. Default 1ms. ?idleDelay, ?ingesterStateInterval, ?commitInterval): SinkPipeline = - let dispatcher = Internal.Dispatcher.Create(log, storeLog, connections, maxConcurrentStreams) + let dispatcher = Internal.Dispatcher.Create(storeLog, connections, maxConcurrentStreams) let scheduler = let dumpStreams logStreamStates _log = logStreamStates Event.storedSize Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, ?purgeInterval = purgeInterval, ?idleDelay = idleDelay) diff --git a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj index 039f17e4..6fe17629 100644 --- a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj +++ b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj @@ -4,7 +4,7 @@ net6.0 - + EVENTSTORE_LEGACY @@ -24,8 +24,9 @@ - - + + + diff --git a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj index 6019d145..20531b40 100644 --- a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj +++ b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj @@ -4,7 +4,7 @@ net6.0 - + @@ -22,8 +22,9 @@ - - + + + diff --git a/src/Propulsion.Feed/FeedReader.fs b/src/Propulsion.Feed/FeedReader.fs index 5c69945d..70062151 100644 --- a/src/Propulsion.Feed/FeedReader.fs +++ b/src/Propulsion.Feed/FeedReader.fs @@ -34,9 +34,7 @@ module Log = /// Attach a property to the captured event record to hold the metric information let internal withMetric (value: Metric) = Log.withScalarProperty PropertyTag value let [] (|MetricEvent|_|) (logEvent: Serilog.Events.LogEvent): Metric voption = - let mutable p = Unchecked.defaultof<_> - logEvent.Properties.TryGetValue(PropertyTag, &p) |> ignore - match p with Log.ScalarValue (:? Metric as e) -> ValueSome e | _ -> ValueNone + match logEvent.Properties.TryGetValue PropertyTag with true, Log.ScalarValue (:? Metric as e) -> ValueSome e | _ -> ValueNone type internal Stats(partition: int, source: SourceId, tranche: TrancheId) = diff --git a/src/Propulsion.Feed/PeriodicSource.fs b/src/Propulsion.Feed/PeriodicSource.fs index 80236c72..b332d102 100644 --- a/src/Propulsion.Feed/PeriodicSource.fs +++ b/src/Propulsion.Feed/PeriodicSource.fs @@ -38,7 +38,7 @@ module private TimelineEvent = fun (i, x: FsCodec.IEventData<_>, context: obj) -> if i > DateTimeOffsetPosition.factor then invalidArg (nameof i) $"Index may not exceed %d{DateTimeOffsetPosition.factor}" FsCodec.Core.TimelineEvent.Create( - baseIndex + i, x.EventType, x.Data, x.Meta, x.EventId, x.CorrelationId, x.CausationId, x.Timestamp, isUnfold = true, context = context) + baseIndex + i, x.EventType, x.Data, x.Meta, x.EventId, x.CorrelationId, x.CausationId, x.Timestamp, isUnfold = false, context = context) [] type SourceItem<'F> = { streamName: FsCodec.StreamName; eventData: FsCodec.IEventData<'F>; context: obj } diff --git a/src/Propulsion.Feed/Propulsion.Feed.fsproj b/src/Propulsion.Feed/Propulsion.Feed.fsproj index 60d12042..82cc59ae 100644 --- a/src/Propulsion.Feed/Propulsion.Feed.fsproj +++ b/src/Propulsion.Feed/Propulsion.Feed.fsproj @@ -4,7 +4,7 @@ net6.0 - 3.0.0-rc.13 + @@ -22,8 +22,9 @@ - - + + + diff --git a/src/Propulsion.Kafka/Consumers.fs b/src/Propulsion.Kafka/Consumers.fs index 1b01b69c..b96e854e 100644 --- a/src/Propulsion.Kafka/Consumers.fs +++ b/src/Propulsion.Kafka/Consumers.fs @@ -181,15 +181,15 @@ module Core = static member Start<'Info, 'Outcome> ( log: ILogger, config: KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents, - prepare, maxDop, handle: Func>, - stats: Scheduling.Stats, + prepare, maxDop, handle: Func>, + stats: Scheduling.Stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = let dumpStreams logStreamStates log = logExternalState |> Option.iter (fun f -> f log) logStreamStates Event.storedSize let scheduler = Scheduling.Engine( - Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, prepare, handle, StreamResult.toIndex), stats, dumpStreams, pendingBufferSize = 5, + Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, prepare, handle), stats, dumpStreams, pendingBufferSize = 5, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay) let mapConsumedMessagesToStreamsBatch onCompletion (x: Submission.Batch): struct (_ * Buffer.Batch) = let onCompletion () = x.onCompletion(); onCompletion() @@ -203,7 +203,7 @@ module Core = ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = let prepare _streamName span = let metrics = StreamSpan.metrics Event.storedSize span - struct (metrics, span) + struct (span, metrics) StreamsConsumer.Start<'Info, 'Outcome>( log, config, consumeResultToInfo, infoToStreamEvents, prepare, maxDop, handle, stats, ?logExternalState = logExternalState, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay) @@ -229,7 +229,7 @@ module Core = ( log: ILogger, config: KafkaConsumerConfig, // often implemented via StreamNameSequenceGenerator.KeyValueToStreamEvent keyValueToStreamEvents: KeyValuePair -> StreamEvent seq, - handle: Func>, maxDop, + handle: Func>, maxDop, stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = StreamsConsumer.Start, 'Outcome>( @@ -333,7 +333,7 @@ type Factory private () = static member StartConcurrentAsync<'Outcome> ( log: ILogger, config: KafkaConsumerConfig, consumeResultToStreamEvents: ConsumeResult<_, _> -> StreamEvent seq, - maxDop, handle: Func>, stats, + maxDop, handle: Func>, stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = Core.StreamsConsumer.Start, 'Outcome>( @@ -343,21 +343,21 @@ type Factory private () = static member StartBatchedAsync<'Info> ( log: ILogger, config: KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents, - select, handle: Func[], CancellationToken, Task)>>>, stats, + select, handle: Func[], CancellationToken, Task * TimeSpan)>>>, stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = let handle (items: Scheduling.Item[]) ct - : Task>[]> = task { + : Task>[]> = task { let start = Stopwatch.timestamp () let inline err ts e (x: Scheduling.Item<_>) = let met = StreamSpan.metrics Event.renderedSize x.span - Scheduling.InternalRes.create (x, ts, Result.Error struct (met, e)) + Scheduling.InternalRes.create (x, ts, Result.Error struct (e, false, met)) try let! results = handle.Invoke(items, ct) - return Array.ofSeq (Seq.zip items results |> Seq.map(function - | item, (ts, Ok index') -> - let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index' ) |> Array.ofSeq + return Array.ofSeq (Seq.zip items results |> Seq.map (function + | item, (Ok index', ts) -> + let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index') |> Array.ofSeq let metrics = StreamSpan.metrics Event.storedSize used - Scheduling.InternalRes.create (item, ts, Result.Ok struct (metrics, index')) - | item, (ts, Error e) -> err ts e item)) + Scheduling.InternalRes.create (item, ts, Result.Ok struct (index', metrics)) + | item, (Error e, ts) -> err ts e item)) with e -> let ts = Stopwatch.elapsed start return items |> Array.map (err ts e) } @@ -391,9 +391,9 @@ type Factory private () = // - Ok: Index at which next processing will proceed (which can trigger discarding of earlier items on that stream) // - Error: Records the processing of the stream in question as having faulted (the stream's pending events and/or // new ones that arrived while the handler was processing are then eligible for retry purposes in the next dispatch cycle) - handle: StreamState[] -> Async)>>, + handle: StreamState[] -> Async * TimeSpan)>>, // The responses from each handle invocation are passed to stats for periodic emission - stats, + stats: Propulsion.Streams.Stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = let handle' xs ct = handle xs |> Async.executeAsTask ct Factory.StartBatchedAsync<'Info>(log, config, consumeResultToInfo, infoToStreamEvents, select, handle', stats, @@ -419,7 +419,7 @@ type Factory private () = // - second component: Outcome (can be simply unit), to pass to the stats processor // - throwing marks the processing of a stream as having faulted (the stream's pending events and/or // new ones that arrived while the handler was processing are then eligible for retry purposes in the next dispatch cycle) - handle: FsCodec.StreamName -> Event[] -> Async, + handle: FsCodec.StreamName -> Event[] -> Async<'Outcome * int64>, // The 'Outcome from each handler invocation is passed to the Statistics processor by the scheduler for periodic emission stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = diff --git a/src/Propulsion.Kafka/ProducerSinks.fs b/src/Propulsion.Kafka/ProducerSinks.fs index 3f1b65ba..b3e6affa 100644 --- a/src/Propulsion.Kafka/ProducerSinks.fs +++ b/src/Propulsion.Kafka/ProducerSinks.fs @@ -45,10 +45,10 @@ type StreamsProducerSink = | _ -> () do! producer.Produce(key, message, ct = ct) | ValueNone -> () - return struct (StreamResult.AllProcessed, outcome) + return struct (outcome, Events.next span) } Sync.Factory.StartAsync - ( log, maxReadAhead, maxConcurrentStreams, handle, StreamResult.toIndex, + ( log, maxReadAhead, maxConcurrentStreams, handle, stats, Event.renderedSize, Event.storedSize, maxBytes = maxBytes, ?idleDelay = idleDelay, ?purgeInterval = purgeInterval, ?maxEvents = maxEvents, dumpExternalStats = producer.DumpStats) diff --git a/src/Propulsion.Kafka/Propulsion.Kafka.fsproj b/src/Propulsion.Kafka/Propulsion.Kafka.fsproj index 86bcefe9..263bdbe0 100644 --- a/src/Propulsion.Kafka/Propulsion.Kafka.fsproj +++ b/src/Propulsion.Kafka/Propulsion.Kafka.fsproj @@ -4,7 +4,7 @@ net6.0 - + @@ -22,8 +22,9 @@ + - + diff --git a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj index ddffdf47..695abef8 100644 --- a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj +++ b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj @@ -4,7 +4,7 @@ net6.0 - 3.0.0-rc.13 + MEMORYSTORE @@ -26,8 +26,9 @@ - - + + + diff --git a/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj b/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj index 09a48876..93edc96a 100644 --- a/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj +++ b/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj @@ -4,7 +4,7 @@ net6.0 - + @@ -21,8 +21,9 @@ - - + + + diff --git a/src/Propulsion.MessageDb/Readme.md b/src/Propulsion.MessageDb/Readme.md index aae081c7..289bd464 100644 --- a/src/Propulsion.MessageDb/Readme.md +++ b/src/Propulsion.MessageDb/Readme.md @@ -43,7 +43,7 @@ let quickStart log stats categories handle = async { let handle stream (events: Propulsion.Sinks.Event[]) = async { // ... process the events - return Propulsion.Sinks.StreamResult.AllProcessed, () } + return (), Propulsion.Sinks.Events.nextIndex events } quickStart Log.Logger (createStats ()) [| category |] handle ``` diff --git a/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj b/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj index 3b145738..4f723686 100644 --- a/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj +++ b/src/Propulsion.SqlStreamStore/Propulsion.SqlStreamStore.fsproj @@ -4,7 +4,7 @@ net6.0 - + @@ -22,8 +22,9 @@ - - + + + diff --git a/src/Propulsion/FeedMonitor.fs b/src/Propulsion/FeedMonitor.fs index 2e22690b..196f6ef7 100644 --- a/src/Propulsion/FeedMonitor.fs +++ b/src/Propulsion/FeedMonitor.fs @@ -83,10 +83,10 @@ and FeedMonitor(log: Serilog.ILogger, fetchPositions: unit -> struct (TrancheId match waitMode with | OriginalWorkOnly -> log.Information("FeedMonitor {totalTime} Awaiting {starting} Checkpoint {completed}", TimeSpan.humanize elapsed, startReadPositions, completed) - | IncludeSubsequent -> log.Information("FeedMonitor {totalTime:n1}s Awaiting Running. Tails {current} Start {starting} Completed {completed}", + | IncludeSubsequent -> log.Information("FeedMonitor {totalTime} Awaiting Running. Tails {current} Start {starting} Completed {completed}", TimeSpan.humanize elapsed, currentRead, startReadPositions, completed) | AwaitFullyCaughtUp -> let draining = current |> choose (fun v -> if TranchePosition.isDrained v then ValueNone else ValueSome ()) |> Array.map ValueTuple.fst - log.Information("FeedMonitor {totalTime:n1}s Awaiting Tails {tranches}. Tails {current} Start {starting} Completed {completed}", + log.Information("FeedMonitor {totalTime} Awaiting Tails {tranches}. Tails {current} Start {starting} Completed {completed}", TimeSpan.humanize elapsed, draining, currentRead, startReadPositions, completed) let busy () = let current = fetchPositions () diff --git a/src/Propulsion/Internal.fs b/src/Propulsion/Internal.fs index 0619edc7..5c6acc83 100644 --- a/src/Propulsion/Internal.fs +++ b/src/Propulsion/Internal.fs @@ -208,6 +208,9 @@ module ValueTuple = let inline snd struct (_f, s) = s let inline ofKvp (x: System.Collections.Generic.KeyValuePair<_, _>) = struct (x.Key, x.Value) let inline toKvp struct (k, v) = System.Collections.Generic.KeyValuePair(k, v) + let inline groupWith ([] f) xs = + Seq.groupBy fst xs + |> Seq.map (fun (k, xs) -> struct (k, xs |> Seq.map snd |> f)) module ValueOption = @@ -227,7 +230,7 @@ module Seq = let tryPickV f (xs: _ seq) = use e = xs.GetEnumerator() let mutable res = ValueNone - while (ValueOption.isNone res && e.MoveNext()) do + while ValueOption.isNone res && e.MoveNext() do res <- f e.Current res let inline chooseV f xs = seq { for x in xs do match f x with ValueSome v -> yield v | ValueNone -> () } diff --git a/src/Propulsion/Propulsion.fsproj b/src/Propulsion/Propulsion.fsproj index 9ebeb9c6..9812123c 100644 --- a/src/Propulsion/Propulsion.fsproj +++ b/src/Propulsion/Propulsion.fsproj @@ -4,7 +4,7 @@ net6.0 - 3.0.0-rc.13 + diff --git a/src/Propulsion/Sinks.fs b/src/Propulsion/Sinks.fs index f14bc201..a4160cf2 100644 --- a/src/Propulsion/Sinks.fs +++ b/src/Propulsion/Sinks.fs @@ -17,36 +17,9 @@ type Codec<'E> = FsCodec.IEventCodec<'E, EventBody, unit> module Events = /// The Index of the next event ordinarily expected on the next handler invocation (assuming this invocation handles all successfully) - let nextIndex: Event[] -> int64 = Streams.StreamSpan.ver + let next: Event[] -> int64 = Streams.StreamSpan.next /// The Index of the first event as supplied to this handler - let index: Event[] -> int64 = Streams.StreamSpan.idx - -/// Represents progress attained during the processing of the supplied Events for a given StreamName. -/// This will be reflected in adjustments to the Write Position for the stream in question. -/// Incoming StreamEvents with Indexes prior to the Write Position implied by the result are proactively -/// dropped from incoming buffers, yielding increased throughput due to reduction of redundant processing. -type StreamResult = - /// Indicates no events where processed. - /// Handler should be supplied the same events (plus any that arrived in the interim) in the next scheduling cycle. - | NoneProcessed - /// Indicates all Events supplied have been processed. - /// Write Position should move beyond the last event supplied. - | AllProcessed - /// Indicates only a subset of the presented events have been processed; - /// Write Position should remove count items from the Events supplied. - | PartiallyProcessed of count: int - /// Apply an externally observed Version determined by the handler during processing. - /// If the Version of the stream is running ahead or behind the current input StreamSpan, this enables one to have - /// events that have already been handled be dropped from the scheduler's buffers and/or as they arrive. - | OverrideNextIndex of version: int64 - -module StreamResult = - - let toIndex<'F> (span: FsCodec.ITimelineEvent<'F>[]) = function - | NoneProcessed -> span[0].Index - | AllProcessed -> span[0].Index + span.LongLength // all-but equivalent to Events.nextIndex span - | PartiallyProcessed count -> span[0].Index + int64 count - | OverrideNextIndex index -> index + let index: Event[] -> int64 = Streams.StreamSpan.index /// Internal helpers used to compute buffer sizes for stats module Event = @@ -65,62 +38,60 @@ type StreamState = Propulsion.Streams.Scheduling.Item [] type Factory private () = - /// Project Events using up to maxConcurrentStreams handle functions that yield a StreamResult and an Outcome to be fed to the Stats + /// Project Events using up to maxConcurrentStreams concurrent instances of a + /// handle function that yields an Outcome to be fed to the Stats, and an updated Stream Position static member StartConcurrentAsync<'Outcome> ( log, maxReadAhead, - maxConcurrentStreams, handle: Func>, + maxConcurrentStreams, handle: Func>, stats, [] ?pendingBufferSize, [] ?purgeInterval, [] ?wakeForResults, [] ?idleDelay, [] ?requireAll, [] ?commitInterval, [] ?ingesterStateInterval) = - Streams.Concurrent.Start<'Outcome, EventBody, StreamResult>( - log, maxReadAhead, maxConcurrentStreams, handle, StreamResult.toIndex, Event.storedSize, stats, + Streams.Concurrent.Start<'Outcome, EventBody>( + log, maxReadAhead, maxConcurrentStreams, handle, Event.storedSize, stats, ?pendingBufferSize = pendingBufferSize, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay, ?requireAll = requireAll, ?ingesterStateInterval = ingesterStateInterval, ?commitInterval = commitInterval) - /// Project Events sequentially via a handle function that yields a StreamResult per selected Item + /// Project Events sequentially via a handle function that yields an updated Stream Position and latency per selected Item static member StartBatchedAsync<'Outcome> ( log, maxReadAhead, select: Func, - handle: Func)>>>, + handle: Func * TimeSpan)>>>, stats, [] ?pendingBufferSize, [] ?purgeInterval, [] ?wakeForResults, [] ?idleDelay, [] ?requireAll, [] ?commitInterval, [] ?ingesterStateInterval) = - let handle items ct = task { - let! res = handle.Invoke(items, ct) - return seq { for i, (ts, r) in Seq.zip items res -> struct (ts, Result.map (StreamResult.toIndex i.span) r) } } Streams.Batched.Start(log, maxReadAhead, select, handle, Event.storedSize, stats, ?pendingBufferSize = pendingBufferSize, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay, ?requireAll = requireAll, ?ingesterStateInterval = ingesterStateInterval, ?commitInterval = commitInterval) - /// Project Events using up to maxConcurrentStreams concurrent instances of a handle function - /// Each dispatched handle invocation yields a StreamResult conveying progress, together with an Outcome to be fed to the Stats + /// Project Events using up to maxConcurrentStreams concurrent instances of a + /// handle function that yields an Outcome to be fed to the Stats, and an updated Stream Position static member StartConcurrent<'Outcome> ( log, maxReadAhead, - maxConcurrentStreams, handle: FsCodec.StreamName -> Event[] -> Async, + maxConcurrentStreams, handle: FsCodec.StreamName -> Event[] -> Async<'Outcome * int64>, stats, // Configure max number of batches to buffer within the scheduler; Default: Same as maxReadAhead [] ?pendingBufferSize, [] ?purgeInterval, [] ?wakeForResults, [] ?idleDelay, [] ?requireAll, [] ?commitInterval, [] ?ingesterStateInterval) = let handle' stream events ct = task { - let! res, outcome = handle stream events |> Async.executeAsTask ct - return struct (res, outcome) } + let! outcome, pos' = handle stream events |> Async.executeAsTask ct + return struct (outcome, pos') } Factory.StartConcurrentAsync(log, maxReadAhead, maxConcurrentStreams, handle', stats, ?pendingBufferSize = pendingBufferSize, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay, ?requireAll = requireAll, ?ingesterStateInterval = ingesterStateInterval, ?commitInterval = commitInterval) - /// Project Events using up to maxConcurrentStreams concurrent instances of a handle function - /// Each dispatched handle invocation yields a StreamResult conveying progress, together with an Outcome to be fed to the Stats + /// Project Events using up to maxConcurrentStreams concurrent instances of a + /// handle function that yields an Outcome to be fed to the Stats, and an updated Stream Position /// Like StartConcurrent, but the events supplied to the Handler are constrained by maxBytes and maxEvents static member StartConcurrentChunked<'Outcome> ( log, maxReadAhead, - maxConcurrentStreams, handle: FsCodec.StreamName -> Event[] -> Async, + maxConcurrentStreams, handle: FsCodec.StreamName -> Event[] -> Async<'Outcome * int64>, stats: Sync.Stats<'Outcome>, // Default 1 ms ?idleDelay, @@ -133,16 +104,16 @@ type Factory private () = // Frequency of jettisoning Write Position state of inactive streams (held by the scheduler for deduplication purposes) to limit memory consumption // NOTE: Purging can impair performance, increase write costs or result in duplicate event emissions due to redundant inputs not being deduplicated ?purgeInterval) = - let handle' s xs ct = task { let! r, o = handle s xs |> Async.executeAsTask ct in return struct (r, o) } - Sync.Factory.StartAsync(log, maxReadAhead, maxConcurrentStreams, handle', StreamResult.toIndex, stats, Event.renderedSize, Event.storedSize, + let handle' s xs ct = task { let! o, pos' = handle s xs |> Async.executeAsTask ct in return struct (o, pos') } + Sync.Factory.StartAsync(log, maxReadAhead, maxConcurrentStreams, handle', stats, Event.renderedSize, Event.storedSize, ?dumpExternalStats = dumpExternalStats, ?idleDelay = idleDelay, ?maxBytes = maxBytes, ?maxEvents = maxEvents, ?purgeInterval = purgeInterval) /// Project Events by continually selecting and then dispatching a batch of streams to a handle function - /// Per handled stream, the result can be either a StreamResult conveying progress, or an exception + /// Per handled stream, the result can be either an updated Stream Position, or an exception static member StartBatched<'Outcome> ( log, maxReadAhead, select: StreamState seq -> StreamState[], - handle: StreamState[] -> Async)>>, + handle: StreamState[] -> Async * TimeSpan)>>, stats, // Configure max number of batches to buffer within the scheduler; Default: Same as maxReadAhead [] ?pendingBufferSize, [] ?purgeInterval, diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index d17d7a1b..1ba5cf82 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -50,9 +50,7 @@ module Log = /// Attach a property to the captured event record to hold the metric information let internal withMetric (value: Metric) = Log.withScalarProperty PropertyTag value let tryGetScalar<'t> key (logEvent: Serilog.Events.LogEvent): 't voption = - let mutable p = Unchecked.defaultof<_> - logEvent.Properties.TryGetValue(key, &p) |> ignore - match p with Log.ScalarValue (:? 't as e) -> ValueSome e | _ -> ValueNone + match logEvent.Properties.TryGetValue key with true, Log.ScalarValue (:? 't as e) -> ValueSome e | _ -> ValueNone let [] GroupTag = "group" let [] (|MetricEvent|_|) logEvent = match tryGetScalar PropertyTag logEvent with @@ -81,112 +79,166 @@ module StreamName = /// Manipulates contiguous set of Events from a Ordered stream, as held internally within this module module StreamSpan = - type Metrics = (struct (int * int)) + type Metrics = (struct (int * int * int)) let metrics eventSize (xs: FsCodec.ITimelineEvent<'F>[]): Metrics = - struct (xs.Length, xs |> Seq.sumBy eventSize) - let slice<'F> eventSize (maxEvents, maxBytes) (span: FsCodec.ITimelineEvent<'F>[]): struct (Metrics * FsCodec.ITimelineEvent<'F>[]) = - let mutable count, bytes = 0, 0 + (struct (0, 0, 0), xs) ||> Seq.fold (fun struct (es, us, bs) x -> + let s = eventSize x + if x.IsUnfold then es, us + 1, bs + s + else es + 1, us, bs + s) + let private trimEvents<'F> eventSize (maxEvents, maxBytes) (span: FsCodec.ITimelineEvent<'F>[]) = let mutable countBudget, bytesBudget = maxEvents, maxBytes - let withinLimits y = + let fitsInBudget (x: FsCodec.ITimelineEvent<_>) = countBudget <- countBudget - 1 - let eventBytes = eventSize y - bytesBudget <- bytesBudget - eventBytes - // always send at least one event in order to surface the problem and have the stream marked malformed - let res = count = 0 || (countBudget >= 0 && bytesBudget >= 0) - if res then count <- count + 1; bytes <- bytes + eventBytes - res - let trimmed = span |> Array.takeWhile withinLimits - metrics eventSize trimmed, trimmed - - let inline idx (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index - let inline ver (span: FsCodec.ITimelineEvent<'F>[]) = idx span + span.LongLength - let dropBeforeIndex min: FsCodec.ITimelineEvent<_>[] -> FsCodec.ITimelineEvent<_>[] = function - | xs when xs.Length = 0 -> null - | xs when idx xs >= min -> xs // don't adjust if min not within - | v when ver v <= min -> null // throw away if before min - | xs -> xs |> Array.skip (min - idx xs |> int) // slice - - let merge min (spans: FsCodec.ITimelineEvent<_>[][]) = - let candidates = [| - for span in spans do - if span <> null then - match dropBeforeIndex min span with - | null -> () - | trimmed when trimmed.Length = 0 -> invalidOp "Cant add empty" - | trimmed -> trimmed |] - if candidates.Length = 0 then null - elif candidates.Length = 1 then candidates - else - candidates |> Array.sortInPlaceBy idx - - // no data buffered -> buffer first item - let mutable curr = candidates[0] - let mutable buffer = null - for i in 1 .. candidates.Length - 1 do - let x = candidates[i] - let index = idx x - let currNext = ver curr - if index > currNext then // Gap - if buffer = null then buffer <- ResizeArray(candidates.Length) - buffer.Add curr - curr <- x - // Overlapping, join - elif index + x.LongLength > currNext then - curr <- Array.append curr (dropBeforeIndex currNext x) - if buffer = null then Array.singleton curr - else buffer.Add curr; buffer.ToArray() + bytesBudget <- bytesBudget - eventSize x + (countBudget >= 0 && bytesBudget >= 0 && not x.IsUnfold) // Stop at unfolds; if they belong, we need to supply all + || (countBudget = maxEvents - 1) // We need to guarantee to yield at least one Event, even if it's outside of the size limit + let trimmedEvents = span |> Array.takeWhile fitsInBudget + // takeWhile terminated either because it hit the first Unfold, or the size limit + match span |> Array.tryItem trimmedEvents.Length with + | Some successor when successor.IsUnfold -> span // If takeWhile stopped on an Unfold we all remaining belong with the preceding event + | _ -> trimmedEvents + let slice<'F> eventSize limits (span: FsCodec.ITimelineEvent<'F>[]): struct (FsCodec.ITimelineEvent<'F>[] * Metrics) = + let trimmed = + // we must always send one event, even if it exceeds the limit (if the handler throws, the the Stats can categorize the problem to surface it) + if span.Length = 1 || span[0].IsUnfold || span[1].IsUnfold then span + // If we have 2 or more (non-Unfold) events, then we limit the batch size + else trimEvents<'F> eventSize limits span + trimmed, metrics eventSize trimmed + + let inline index (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index + let inline next (span: FsCodec.ITimelineEvent<'F>[]) = let l = span[span.Length - 1] in if l.IsUnfold then l.Index else l.Index + 1L + let inline dropBefore i = function + | [||] as xs -> xs + | xs when next xs < i -> Array.empty + | xs -> + match index xs with + | xi when xi = i -> xs + | xi -> xs |> Array.skip (i - xi |> int) + let private coalesce xs = + xs |> Array.sortInPlaceBy index + let mutable outputs, acc = null, xs[0] + for x in xs |> Seq.skip 1 do + match next acc with + | accNext when index x > accNext -> // Gap + if acc |> Seq.exists (_.IsUnfold >> not) then + if outputs = null then outputs <- ResizeArray(xs.Length) + outputs.Add(acc |> Array.filter (_.IsUnfold >> not)) + acc <- x + | accNext when next x >= accNext -> // Overlapping; join + match dropBefore accNext x with + | [||] -> () + | news -> acc <- [| yield! acc |> Seq.filter (_.IsUnfold >> not); yield! news |] + | _ -> () + match acc with + | [||] when outputs = null -> null + | [||] -> outputs.ToArray() + | unified when outputs = null -> Array.singleton unified + | tail -> outputs.Add tail; outputs.ToArray() + let private normalize (inputs: FsCodec.ITimelineEvent<_>[][]) = + match inputs |> Array.filter (function null | [||] -> false | _ -> true) with + | [||] -> null + | [| _ |] as alreadyUnified -> alreadyUnified + | multiple -> coalesce multiple + let merge min (inputs: FsCodec.ITimelineEvent<_>[][]) = + inputs |> Array.map (dropBefore min) |> normalize + let stripUnfolds (xq: FsCodec.ITimelineEvent<_>[][]) = + if xq = null then xq + else xq |> Array.map (Array.filter (fun x -> not x.IsUnfold)) |> normalize /// A Single Event from an Ordered stream being supplied for ingestion into the internal data structures type StreamEvent<'Format> = (struct (FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>)) module Buffer = + type Revision = int + and [] revision + module Revision = + open FSharp.UMX + let initial: Revision = % -1 + let increment (x: Revision): Revision = % (% x + 1) + type HandlerProgress = (struct (int64 * Revision)) + module HandlerProgress = + let ofPos pos: HandlerProgress = (pos, Revision.initial) + let ofMetricsAndPos revision ((_es, us, _bs): StreamSpan.Metrics) pos: HandlerProgress = if us <> 0 then (pos, revision) else ofPos pos + let [] WritePosUnknown = -2L // sentinel value for write position signifying `None` (no write position yet established) let [] WritePosMalformed = -3L // sentinel value for malformed data /// Buffers events for a stream, tolerating gaps and out of order arrival (see requireAll for scenarios dictating this need) /// Optimized Representation as this is the dominant one in terms of memory usage - takes it from 24b to a cache-friendlier 16b [] - type StreamState<'Format> = private { write: int64; queue: FsCodec.ITimelineEvent<'Format>[][] } with - static member Create(write, queue, malformed) = - if malformed then { write = WritePosMalformed; queue = queue } - else StreamState<'Format>.Create(write, queue) - static member Create(write, queue) = { write = (match write with ValueSome x -> x | ValueNone -> WritePosUnknown); queue = queue } - member x.IsEmpty = obj.ReferenceEquals(null, x.queue) - member x.EventsSumBy(f) = if x.IsEmpty then 0L else x.queue |> Seq.map (Seq.sumBy f) |> Seq.sum |> int64 - member x.EventsCount = if x.IsEmpty then 0 else x.queue |> Seq.sumBy Array.length + type StreamState<'Format> = private { write: int64; revision: Revision; queue: FsCodec.ITimelineEvent<'Format>[][] } with + static member Create(write, queue, revision) = { write = defaultValueArg write WritePosUnknown; revision = revision; queue = queue } + static member Create(write, queue, revision, malformed) = StreamState<'Format>.Create((if malformed then ValueSome WritePosMalformed else write), queue, revision) + member x.IsEmpty = LanguagePrimitives.PhysicalEquality null x.queue + member x.EventsSumBy(f) = x.queue |> Seq.map (Seq.sumBy f) |> Seq.sum |> int64 + member x.EventsCount = x.queue |> Seq.sumBy Array.length member x.HeadSpan = x.queue[0] - member x.QueuePos = x.HeadSpan[0].Index member x.IsMalformed = not x.IsEmpty && WritePosMalformed = x.write - member x.QueuedIsAtWritePos = match x.write with WritePosUnknown -> x.QueuePos = 0L | w -> w = x.QueuePos + member x.QueuedIsAtWritePos = match x.write with WritePosUnknown -> x.HeadSpan[0].Index = 0L | w -> w = x.HeadSpan[0].Index member x.WritePos = match x.write with WritePosUnknown | WritePosMalformed -> ValueNone | w -> ValueSome w - member x.CanPurge = x.IsEmpty + // Count of number of times the the Unfolds held in the queue have changed (typically due to events such as the CosmosDB ChangeFeed delivering a new edition of an Item) + member x.QueueRevision = x.revision + member x.TailHasUnfoldAtIndex(index) = + let tailSpan = x.queue |> Array.last + let tailEvent = tailSpan |> Array.last + tailEvent.IsUnfold && tailEvent.Index = index + + type ProgressRequirement = (struct (int64 * Revision voption)) + // example: when we reach position 1 on the stream (having handled event 0), and the required position was 1, we remove the requirement + // NOTE Any unfolds that accompany event 0 will also bear Index 0 + // NOTE 2: subsequent updates to Unfolds will bear the same Index of 0 until there is an Event with Index 1 + module ProgressRequirement = + let ofPos pos: ProgressRequirement = pos, ValueNone + let ofPosUnfoldRevision pos rev: ProgressRequirement = pos, ValueSome rev + let isSatisfiedBy ((updatedPos, dispatchedRevision): HandlerProgress): ProgressRequirement -> bool = function + | xPos, _ when updatedPos > xPos -> true + | xPos, ValueNone -> updatedPos = xPos + | xPos, ValueSome xRev when updatedPos = xPos -> dispatchedRevision >= xRev + | _ -> false + let inline compute index hadUnfold (x: StreamState<'Format>): ProgressRequirement voption = + // If the queue is empty, or the write position is already beyond the requirement, it has already been handled + if x.IsEmpty || x.WritePos |> ValueOption.exists (fun wp -> wp > index) then ValueNone + // If there's an unfold at the tail, we can't checkpoint until it's been handled, a fresher unfold has been handled, or a successor event has moved the position past it + elif hadUnfold && x.TailHasUnfoldAtIndex index then ofPosUnfoldRevision index x.QueueRevision |> ValueSome + else ofPos index |> ValueSome module StreamState = let combine (s1: StreamState<_>) (s2: StreamState<_>): StreamState<'Format> = - let writePos = max s1.WritePos s2.WritePos let malformed = s1.IsMalformed || s2.IsMalformed - let any1 = not (isNull s1.queue) - let any2 = not (isNull s2.queue) - if any1 || any2 then - let items = if any1 && any2 then Array.append s1.queue s2.queue elif any1 then s1.queue else s2.queue - StreamState<'Format>.Create(writePos, StreamSpan.merge (defaultValueArg writePos 0L) items, malformed) - else StreamState<'Format>.Create(writePos, null, malformed) + let writePos = max s1.WritePos s2.WritePos + let queue = + let any1 = not (isNull s1.queue) + let any2 = not (isNull s2.queue) + if any1 || any2 then + let items = if any1 && any2 then Array.append s1.queue s2.queue elif any1 then s1.queue else s2.queue + StreamSpan.merge (defaultValueArg writePos 0L) items + else null + let maybeLastUnfold = function null -> ValueNone | q -> let (li: FsCodec.ITimelineEvent<_>) = Array.last q |> Array.last in if li.IsUnfold then ValueSome li else ValueNone + let changed = + match maybeLastUnfold queue, maybeLastUnfold s1.queue with + | ValueNone, ValueNone -> false + | ValueNone, ValueSome _ + | ValueSome _, ValueNone -> true + | ValueSome l1, ValueSome l2 -> LanguagePrimitives.PhysicalEquality l1 l2 |> not + let revision = if changed then Revision.increment s1.revision else s1.revision + StreamState<'Format>.Create(writePos, queue, revision, malformed) + let tryTrimUnfoldsIffPosAndRevisionStill ((pos, revision): HandlerProgress) ({ write = xw; revision = xr; queue = xq } as x) = + if xw <> pos || xr <> revision then ValueNone + else ValueSome { x with revision = Revision.increment xr; queue = StreamSpan.stripUnfolds xq } type Streams<'Format>() = let states = Dictionary>() let merge stream (state: StreamState<_>) = - let mutable current = Unchecked.defaultof<_> - if states.TryGetValue(stream, ¤t) then states[stream] <- StreamState.combine current state - else states.Add(stream, state) + match states.TryGetValue stream with + | true, current -> states[stream] <- StreamState.combine current state + | false, _ -> states.Add(stream, state) - member _.Merge(stream, event: FsCodec.ITimelineEvent<'Format>) = - merge stream (StreamState<'Format>.Create(ValueNone, [| [| event |] |])) - - member _.States = states :> seq>> + member internal _.States = states :> seq>> member _.Merge(other: Streams<'Format>) = for x in other.States do merge x.Key x.Value + member _.Merge(stream, events: FsCodec.ITimelineEvent<'Format>[]) = merge stream (StreamState<'Format>.Create(ValueNone, [| events |], Revision.initial)) member _.Dump(log: ILogger, estimateSize, categorize) = let mutable waiting, waitingE, waitingB = 0, 0, 0L @@ -197,7 +249,7 @@ module Buffer = waitingCats.Ingest(categorize stream) if sz <> 0L then let sn, wp = FsCodec.StreamName.toString stream, defaultValueArg state.WritePos 0L - waitingStreams.Ingest(sprintf "%s@%dx%d" sn wp state.queue[0].Length, (sz + 512L) / 1024L) + waitingStreams.Ingest(sprintf "%s@%dx%d" sn wp state.HeadSpan.Length, (sz + 512L) / 1024L) waiting <- waiting + 1 waitingE <- waitingE + state.EventsCount waitingB <- waitingB + sz @@ -207,20 +259,37 @@ module Buffer = if waitingCats.Any then log.Information(" Waiting Streams, KB {@readyStreams}", Seq.truncate 5 waitingStreams.StatsDescending) [] - type Batch private (onCompletion, reqs: Dictionary) = + type Batch private (onCompletion, reqs: Dictionary, unfoldReqs: ISet, eventsCount, unfoldsCount) = static member Create(onCompletion, streamEvents: StreamEvent<'Format> seq) = - let streams, reqs = Streams<'Format>(), Dictionary() - for struct (stream, event) in streamEvents do - streams.Merge(stream, event) - match reqs.TryGetValue(stream), event.Index + 1L with - | (false, _), required -> reqs[stream] <- required - | (true, actual), required when actual < required -> reqs[stream] <- required - | (true, _), _ -> () // replayed same or earlier item - struct (streams, Batch(onCompletion, reqs)) - + let streams, reqs, unfoldReqs = Streams<'Format>(), Dictionary(), HashSet() + let mutable eventsCount, unfoldsCount = 0, 0 + for struct (stream, eventsAndUnfolds) in streamEvents |> ValueTuple.groupWith Seq.toArray do + let unfolds, events = eventsAndUnfolds |> Array.partition _.IsUnfold + let mutable hwm = -1L + // for events, we tolerate mis-ordered items within a batch (but they should not be there and this only makes sense for requireAll mode) + for event in events do + let asBatch = [| event |] + streams.Merge(stream, asBatch) + hwm <- asBatch |> StreamSpan.next |> max hwm + eventsCount <- eventsCount + events.Length + match unfolds with + | [||] -> () + | unfolds -> + unfoldsCount <- unfoldsCount + unfolds.Length + unfoldReqs.Add stream |> ignore + let next = unfolds |> StreamSpan.next + // Drop all but the last set + let unfolds = unfolds |> Array.filter (fun x -> x.Index = next) + hwm <- max hwm next + streams.Merge(stream, unfolds) + reqs.Add(stream, hwm) + struct (streams, Batch(onCompletion, reqs, unfoldReqs, eventsCount, unfoldsCount)) member val OnCompletion = onCompletion member val StreamsCount = reqs.Count + member val EventsCount = eventsCount + member val UnfoldsCount = unfoldsCount member val Reqs = reqs :> seq> + member val UnfoldReqs = unfoldReqs type [] OutcomeKind = Ok | Tagged of string | Exn module OutcomeKind = @@ -254,32 +323,32 @@ type HealthCheckException(oldestStuck, oldestFailing, stuckStreams, failingStrea module Scheduling = open Buffer - type StreamStates<'Format>() = let states = Dictionary>() - - let tryGetItem stream = - let mutable x = Unchecked.defaultof<_> - if states.TryGetValue(stream, &x) then ValueSome x else ValueNone let merge stream (state: StreamState<_>) = - match tryGetItem stream with - | ValueSome current -> + match states.TryGetValue stream with + | true, current -> let updated = StreamState.combine current state states[stream] <- updated updated - | ValueNone -> + | false, _ -> states.Add(stream, state) state - let markCompleted stream index = - merge stream (StreamState<'Format>.Create(ValueSome index, queue = null, malformed = false)) |> ignore - let updateWritePos stream isMalformed pos span = - merge stream (StreamState<'Format>.Create(pos, span, isMalformed)) + let updateStreamState stream = function + | Error malformed -> + // Flag that the data at the head of the stream is triggering a non-transient error condition from the handler, preventing any further handler dispatches for `stream` + merge stream (StreamState<'Format>.Create(ValueNone, null, Revision.initial, malformed = malformed)) |> ignore + | Ok (updatedPos, _dispatchedRevision as up: HandlerProgress) -> + // Ensure we have a position (in case it got purged); Drop any events or unfolds implied by updatedPos + merge stream (StreamState<'Format>.Create(ValueSome updatedPos, null, Revision.initial)) + // Strip unfolds out of the queue if the handler reported the position as unchanged, but the unfolds were included in the invocation + |> StreamState.tryTrimUnfoldsIffPosAndRevisionStill up |> ValueOption.iter (fun trimmed -> states[ stream ] <- trimmed) let purge () = let mutable purged = 0 for x in states do let streamState = x.Value - if streamState.CanPurge then - states.Remove x.Key |> ignore // Safe to do while iterating on netcore >=3.0 + if streamState.IsEmpty then + states.Remove x.Key |> ignore // Safe to do while iterating on netcore >= 3.0 purged <- purged + 1 states.Count, purged @@ -287,41 +356,29 @@ module Scheduling = let markBusy stream = busy.Add stream |> ignore let markNotBusy stream = busy.Remove stream |> ignore - member _.ChooseDispatchable(s: FsCodec.StreamName, requireAll): StreamState<'Format> voption = - match tryGetItem s with - | ValueSome ss when not ss.IsEmpty && not ss.IsMalformed && (not requireAll || ss.QueuedIsAtWritePos) && not (busy.Contains s) -> ValueSome ss - | _ -> ValueNone - - member _.WritePositionIsAlreadyBeyond(stream, required) = - match tryGetItem stream with - // Example scenario: if a write reported we reached version 2, and we are ingesting an event that requires 2, then we drop it - | ValueSome ss -> match ss.WritePos with ValueSome cw -> cw >= required | ValueNone -> false - | ValueNone -> false // If the entry has been purged, or we've yet to visit a stream, we can't drop them - member _.Merge(streams: Streams<'Format>) = - for kv in streams.States do - merge kv.Key kv.Value |> ignore - member _.RecordWriteProgress(stream, pos, queue) = - merge stream (StreamState<'Format>.Create(ValueSome pos, queue)) - member _.SetMalformed(stream, isMalformed) = - updateWritePos stream isMalformed ValueNone null - member _.Purge() = - purge () + member _.ToProgressRequirement(stream, index, hadUnfold): ProgressRequirement voption = + match states.TryGetValue stream with + | false, _ -> ValueNone // if there's no state for the stream, then it's all already written (and purged) + | true, ss -> ProgressRequirement.compute index hadUnfold ss member _.HeadSpanSizeBy(f: _ -> int) stream = - match tryGetItem stream with - | ValueSome state when not state.IsEmpty -> state.HeadSpan |> Array.sumBy f |> int64 + match states.TryGetValue stream with + | true, state when not state.IsEmpty -> state.HeadSpan |> Array.sumBy f |> int64 | _ -> 0L - member _.MarkBusy stream = - markBusy stream + member _.ChooseDispatchable(s: FsCodec.StreamName, requireAll): StreamState<'Format> voption = + match states.TryGetValue s with + | true, ss when not ss.IsEmpty && not ss.IsMalformed && (not requireAll || ss.QueuedIsAtWritePos) && not (busy.Contains s) -> ValueSome ss + | _ -> ValueNone - member _.RecordProgress(stream, index) = - markNotBusy stream - markCompleted stream index + member _.Merge(buffered: Streams<'Format>) = for kv in buffered.States do merge kv.Key kv.Value |> ignore + member _.Purge() = purge () - member _.RecordNoProgress stream = + member _.LockForWrite stream = + markBusy stream + member _.DropHandledEventsAndUnlock(stream, outcome) = + updateStreamState stream outcome markNotBusy stream - member _.Dump(log: ILogger, totalPurged: int, eventSize) = let mutable (busyCount, busyE, busyB), (ready, readyE, readyB), synced = (0, 0, 0L), (0, 0, 0L), 0 let mutable (waiting, waitingE, waitingB), (malformed, malformedE, malformedB) = (0, 0, 0L), (0, 0, 0L) @@ -338,7 +395,8 @@ module Scheduling = busyB <- busyB + sz busyE <- busyE + state.EventsCount else - let cat, label = StreamName.categorize stream, sprintf "%s@%dx%d" (FsCodec.StreamName.toString stream) state.QueuePos state.HeadSpan.Length + let cat = StreamName.categorize stream + let label = let hs = state.HeadSpan in sprintf "%s@%dx%d" (FsCodec.StreamName.toString stream) (StreamSpan.index hs) hs.Length if state.IsMalformed then malformedCats.Ingest(cat) malformedStreams.Ingest(label, Log.miB sz |> int64) @@ -420,9 +478,9 @@ module Scheduling = let state = Dictionary() member _.HandleResult(sn, isStuck, startTs) = if not isStuck then state.Remove sn |> ignore - else let mutable v = Unchecked.defaultof<_> - if state.TryGetValue(sn, &v) then v.count <- v.count + 1 - else state.Add(sn, { ts = startTs; count = 1 }) + else match state.TryGetValue sn with + | true, v -> v.count <- v.count + 1 + | false, _ -> state.Add(sn, { ts = startTs; count = 1 }) member _.State = walkAges state |> renderState member _.Stats = renderStats state member _.Contains sn = state.ContainsKey sn @@ -477,8 +535,6 @@ module Scheduling = let sw = Stopwatch.start() member _.RecordResults ts = results <- results + Stopwatch.elapsedTicks ts member _.RecordDispatch ts = dispatch <- dispatch + Stopwatch.elapsedTicks ts - // If we did not dispatch, we attempt ingestion of streams as a standalone task, but need to add to dispatch time to compensate for calcs below - member x.RecordDispatchNone ts = x.RecordDispatch ts member _.RecordMerge ts = merge <- merge + Stopwatch.elapsedTicks ts member _.RecordIngest ts = ingest <- ingest + Stopwatch.elapsedTicks ts member _.RecordStats ts = stats <- stats + Stopwatch.elapsedTicks ts @@ -556,7 +612,7 @@ module Scheduling = let monitor, monitorInterval = Stats.Busy.Monitor(), IntervalTimer(TimeSpan.FromSeconds 1.) let stateStats = Stats.StateStats() let lats = LatencyStats() - let mutable cycles, batchesCompleted, batchesStarted, streamsStarted, eventsStarted, streamsWrittenAhead, eventsWrittenAhead = 0, 0, 0, 0, 0, 0, 0 + let mutable cycles, batchesCompleted, batchesStarted, streams, skipped, unfolded, events, unfolds = 0, 0, 0, 0, 0, 0, 0, 0 member val Log = log member val Latency = lats @@ -567,9 +623,9 @@ module Scheduling = member x.DumpStats(struct (dispatchActive, dispatchMax), struct (batchesWaiting, batchesRunning), abend) = let batchesCompleted = System.Threading.Interlocked.Exchange(&batchesCompleted, 0) - log.Information("Batches waiting {waiting} started {started} {streams:n0}s {events:n0}e skipped {streamsSkipped:n0}s {eventsSkipped:n0}e completed {completed} Running {active}", - batchesWaiting, batchesStarted, streamsStarted, eventsStarted, streamsWrittenAhead, eventsWrittenAhead, batchesCompleted, batchesRunning) - batchesStarted <- 0; streamsStarted <- 0; eventsStarted <- 0; streamsWrittenAhead <- 0; eventsWrittenAhead <- 0; (*batchesCompleted <- 0*) + log.Information("Batches Waiting {waiting} Started {started} {streams:n0}s ({skipped:n0} skipped {streamsUnfolded:n0} unfolded) {events:n0}e {unfolds:n0}u | Completed {completed} Running {active}", + batchesWaiting, batchesStarted, streams, skipped, unfolded, events, unfolds, batchesCompleted, batchesRunning) + batchesStarted <- 0; streams <- 0; skipped <- 0; unfolded <- 0; events <- 0; unfolds <- 0; (*batchesCompleted <- 0*) x.Timers.Dump log log.Information("Scheduler {cycles} cycles {@states} Running {busy}/{processors}", cycles, stateStats.StatsDescending, dispatchActive, dispatchMax) @@ -584,12 +640,13 @@ module Scheduling = member _.HasLongRunning = monitor.OldestFailing.TotalSeconds > longRunningThresholdS member _.Classify sn = monitor.Classify(longRunningThresholdS, sn) - member _.RecordIngested(streams, events, skippedStreams, skippedEvents) = + member _.RecordIngested(streamsCount, streamsSkipped, streamsUnfolded, eventCount, unfoldCount) = batchesStarted <- batchesStarted + 1 - streamsStarted <- streamsStarted + streams - eventsStarted <- eventsStarted + events - streamsWrittenAhead <- streamsWrittenAhead + skippedStreams - eventsWrittenAhead <- eventsWrittenAhead + skippedEvents + streams <- streams + streamsCount + skipped <- skipped + streamsSkipped + unfolded <- unfolded + streamsUnfolded + events <- events + eventCount + unfolds <- unfolds + unfoldCount member _.RecordBatchCompletion() = System.Threading.Interlocked.Increment(&batchesCompleted) |> ignore @@ -620,8 +677,7 @@ module Scheduling = abstract member Handle: Res> -> unit - member private x.RecordOutcomeKind(r, k) = - let progressed = r.index' > r.index + member private x.RecordOutcomeKind(r, k, progressed) = monitor.HandleResult(r.stream, succeeded = OutcomeKind.isOk k, progressed = progressed) let kindTag = lats.RecordOutcome(r.stream, k, r.duration) if metricsLog.IsEnabled LogEventLevel.Information then @@ -629,9 +685,9 @@ module Scheduling = (metricsLog |> Log.withMetric m).Information("Outcome {kind} in {ms:n0}ms, progressed: {progressed}", kindTag, r.duration.TotalMilliseconds, progressed) if monitorInterval.IfDueRestart() then monitor.EmitMetrics metricsLog - member x.RecordOk(r) = x.RecordOutcomeKind(r, OutcomeKind.Ok) + member x.RecordOk(r, force) = x.RecordOutcomeKind(r, OutcomeKind.Ok, r.index' > r.index || force) member x.RecordExn(r, k, log, exn) = - x.RecordOutcomeKind(r, k) + x.RecordOutcomeKind(r, k, progressed = false) if OutcomeKind.isException k then x.HandleExn(log, exn) @@ -651,38 +707,38 @@ module Scheduling = module Progress = - type [] BatchState = { markCompleted: unit -> unit; streamToRequiredIndex: Dictionary } + type [] BatchState = private { markCompleted: unit -> unit; reqs: Dictionary } type ProgressState<'Pos>() = let pending = Queue() let trim () = - while pending.Count <> 0 && pending.Peek().streamToRequiredIndex.Count = 0 do - let batch = pending.Dequeue() - batch.markCompleted () + let mutable batch = Unchecked.defaultof<_> + while pending.TryPeek &batch && batch.reqs.Count = 0 do + pending.Dequeue().markCompleted () member _.RunningCount = pending.Count member _.EnumPending(): seq = trim () pending - member _.AppendBatch(markCompleted, reqs: Dictionary) = - let fresh = { markCompleted = markCompleted; streamToRequiredIndex = reqs } + member _.IngestBatch(markCompleted, reqs) = + let fresh = { markCompleted = markCompleted; reqs = reqs } pending.Enqueue fresh trim () if pending.Count = 0 then ValueNone // If already complete, avoid triggering stream ingestion or a dispatch cycle else ValueSome fresh - member _.MarkStreamProgress(stream, index) = + member _.RemoveAttainedRequirements(stream, updatedPosAndDispatchedRevision) = for x in pending do - // example: when we reach position 1 on the stream (having handled event 0), and the required position was 1, we remove the requirement - let mutable requiredIndex = Unchecked.defaultof<_> - if x.streamToRequiredIndex.TryGetValue(stream, &requiredIndex) && requiredIndex <= index then - x.streamToRequiredIndex.Remove stream |> ignore + match x.reqs.TryGetValue stream with + | true, req when ProgressRequirement.isSatisfiedBy updatedPosAndDispatchedRevision req -> + x.reqs.Remove stream |> ignore + | _ -> () member _.Dump(log: ILogger, lel, classify: FsCodec.StreamName -> Stats.Busy.State) = if log.IsEnabled lel && pending.Count <> 0 then let stuck, failing, slow, running, waiting = ResizeArray(), ResizeArray(), ResizeArray(), ResizeArray(), ResizeArray() let h = pending.Peek() - for x in h.streamToRequiredIndex do + for x in h.reqs do match classify x.Key with | Stats.Busy.Stuck count -> stuck.Add struct(x.Key, x.Value, count) | Stats.Busy.Failing count -> failing.Add struct(x.Key, x.Value, count) @@ -700,7 +756,7 @@ module Scheduling = let collectUniqueStreams (xs: IEnumerator) = seq { while xs.MoveNext() do let x = xs.Current - for s in x.streamToRequiredIndex.Keys do + for s in x.reqs.Keys do if streamsSuggested.Add s then yield s } @@ -710,7 +766,7 @@ module Scheduling = // sortBuffer is reused per invocation, but the result is lazy so we can only clear on entry sortBuffer.Clear() let weight s = -getStreamWeight s |> int - for s in head.streamToRequiredIndex.Keys do + for s in head.reqs.Keys do if streamsSuggested.Add s then let w = weight s sortBuffer.Add(struct (s, w)) @@ -738,10 +794,11 @@ module Scheduling = abstract member HasCapacity: bool with get abstract member AwaitCapacity: CancellationToken -> Task abstract member TryReplenish: pending: seq> * handleStarted: (FsCodec.StreamName * int64 -> unit) -> struct (bool * bool) - abstract member InterpretProgress: StreamStates<'F> * FsCodec.StreamName * Result<'P, 'E> -> struct (int64 voption * Result<'R, 'E>) + abstract member InterpretProgress: Result<'P, 'E> -> ResProgressAndMalformed> and [] - Item<'Format> = { stream: FsCodec.StreamName; nextIndex: int64 voption; span: FsCodec.ITimelineEvent<'Format>[] } + Item<'Format> = { stream: FsCodec.StreamName; writePos: int64 voption; span: FsCodec.ITimelineEvent<'Format>[]; revision: Revision } and [] InternalRes<'R> = { stream: FsCodec.StreamName; index: int64; event: string; duration: TimeSpan; result: 'R } + and ResProgressAndMalformed<'O> = (struct ('O * Buffer.HandlerProgress voption * bool)) module InternalRes = let inline create (i: Item<_>, d, r) = let h = i.span[0] @@ -796,59 +853,47 @@ module Scheduling = let batches = Progress.ProgressState() let batchesWaitingAndRunning () = struct (waitingCount (), batches.RunningCount) // Enumerates the active batches; when the caller pulls beyond that, more get ingested on the fly - let enumBatches ingestStreams ingestBatches = seq { + let enumBatches tryIngestBatch = seq { yield! batches.EnumPending() // We'll get here as soon as the dispatching process has exhausted the currently queued items - match ingestBatches () with - | [||] -> () // Nothing more available - | freshlyAddedBatches -> - // we've just enqueued fresh batches - // hence we need to ingest events potentially added since first call to guarantee we have all the events on which the batches depend - ingestStreams () - yield! freshlyAddedBatches } + let mutable cont = true in while cont do match tryIngestBatch () with ValueSome x -> yield x | ValueNone -> cont <- false } let priority = Progress.StreamsPrioritizer(prioritizeStreamsBy |> Option.map streams.HeadSpanSizeBy) let chooseDispatchable = let requireAll = defaultArg requireAll false if requireAll && Option.isSome purgeInterval then invalidArg (nameof requireAll) "Cannot be combined with a purgeInterval" fun stream -> streams.ChooseDispatchable(stream, requireAll) - |> ValueOption.map (fun ss -> { stream = stream; nextIndex = ss.WritePos; span = ss.HeadSpan }) - let tryDispatch ingestStreams ingestBatches = - let candidateItems: seq> = enumBatches ingestStreams ingestBatches |> priority.CollectStreams |> Seq.chooseV chooseDispatchable - let handleStarted (stream, ts) = stats.HandleStarted(stream, ts); streams.MarkBusy(stream) + |> ValueOption.map (fun ss -> { stream = stream; writePos = ss.WritePos; span = ss.HeadSpan; revision = ss.QueueRevision }) + let tryDispatch enumBatches = + let candidateItems: seq> = enumBatches |> priority.CollectStreams |> Seq.chooseV chooseDispatchable + let handleStarted (stream, ts) = stats.HandleStarted(stream, ts); streams.LockForWrite(stream) dispatcher.TryReplenish(candidateItems, handleStarted) // Ingest information to be gleaned from processing the results into `streams` (i.e. remove stream requirements as they are completed) let handleResult ({ stream = stream; index = i; event = et; duration = duration; result = r }: InternalRes<_>) = - match dispatcher.InterpretProgress(streams, stream, r) with - | ValueSome index', Ok (r: 'R) -> - batches.MarkStreamProgress(stream, index') - streams.RecordProgress(stream, index') - stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = index'; result = Ok r } - | ValueNone, Ok (r: 'R) -> - streams.RecordNoProgress(stream) - stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = i; result = Ok r } - | _, Error exn -> - streams.RecordNoProgress(stream) - stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = i; result = Error exn } + match dispatcher.InterpretProgress r with + | Ok _ as r, ValueSome (index', _ as updatedPosAndDispatchedRevision), _malformed -> + batches.RemoveAttainedRequirements(stream, updatedPosAndDispatchedRevision) + streams.DropHandledEventsAndUnlock(stream, Ok updatedPosAndDispatchedRevision) + stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = index'; result = r } + | Ok _ as r, ValueNone, malformed + | (Error _ as r), _, malformed -> + streams.DropHandledEventsAndUnlock(stream, Error malformed) + stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = i; result = r } let tryHandleResults () = tryApplyResults handleResult - // Take an incoming batch of events, correlating it against our known stream state to yield a set of remaining work + // Take an incoming batch of events, correlating it against our known stream state to yield a set of required work before we can complete/checkpoint it let ingest (batch: Batch) = let reqs = Dictionary() - let mutable events, eventsSkipped = 0, 0 for item in batch.Reqs do - if streams.WritePositionIsAlreadyBeyond(item.Key, item.Value) then - eventsSkipped <- eventsSkipped + 1 - else - events <- events + 1 - reqs[item.Key] <- item.Value - stats.RecordIngested(reqs.Count, events, batch.StreamsCount - reqs.Count, eventsSkipped) + streams.ToProgressRequirement(item.Key, item.Value, batch.UnfoldReqs.Contains item.Key) + |> ValueOption.iter (fun req -> reqs[item.Key] <- req) + stats.RecordIngested(reqs.Count, batch.StreamsCount - reqs.Count, batch.UnfoldReqs.Count, batch.EventsCount, batch.UnfoldsCount) let onCompletion () = batch.OnCompletion () stats.RecordBatchCompletion() - batches.AppendBatch(onCompletion, reqs) - let ingestBatch () = [| match tryPending () |> ValueOption.bind ingest with ValueSome b -> b | ValueNone -> () |] + batches.IngestBatch(onCompletion, reqs) + let tryIngestBatch ingestStreams () = tryPending () |> ValueOption.bind (fun b -> ingestStreams (); ingest b) let recordAndPeriodicallyLogStats exiting abend = if stats.RecordStats() || exiting then @@ -888,30 +933,30 @@ module Scheduling = with e -> abend e }) let inline ts () = Stopwatch.timestamp () let t = stats.Timers - let processResults () = let ts = ts () in let r = tryHandleResults () in t.RecordResults ts; r - let ingestStreams () = let ts, r = ts (), applyStreams streams.Merge in t.RecordMerge ts; r - let ingestBatches () = let ts, b = ts (), ingestBatch () in t.RecordIngest ts; b - let ingestStreamsOnly () = let ts = ts () in let r = ingestStreams () in t.RecordDispatchNone ts; r + let tryHandleResults () = let ts = ts () in let r = tryHandleResults () in t.RecordResults ts; r + // NOTE Timers subtracts merge time from dispatch time, so callers (inc ingestStreamsInLieuOfDispatch) *must* RecordDispatch the gross time + let mergeStreams_ () = let ts, r = ts (), applyStreams streams.Merge in t.RecordMerge ts; r + let tryIngestStreamsInLieuOfDispatch () = let ts = ts () in let r = mergeStreams_ () in t.RecordDispatch ts; r + let tryIngestBatch () = let ts, b = ts (), tryIngestBatch (mergeStreams_ >> ignore) () in t.RecordIngest ts; b + let tryDispatch () = let ts = ts () in let r = tryDispatch (enumBatches tryIngestBatch) in t.RecordDispatch ts; r let mutable exiting = false while not exiting do exiting <- ct.IsCancellationRequested // 1. propagate write write outcomes to buffer (can mark batches completed etc) - let processedResults = processResults () + let processedResults = tryHandleResults () // 2. top up provisioning of writers queue // On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first // Where there is insufficient work in the queue, we trigger ingestion of batches as needed - let struct (dispatched, hasCapacity) = - if not dispatcher.HasCapacity then struct ((*dispatched*)false, (*hasCapacity*)false) - else let ts = ts () in let r = tryDispatch (ingestStreams >> ignore) ingestBatches in t.RecordDispatch ts; r + let struct (dispatched, hasCapacity) = if not dispatcher.HasCapacity then false, false else tryDispatch () // 3. Report the stats per stats interval let statsTs = ts () if exiting then - processResults () |> ignore + tryHandleResults () |> ignore batches.EnumPending() |> ignore recordAndPeriodicallyLogStats exiting abend; t.RecordStats statsTs // 4. Do a minimal sleep so we don't run completely hot when empty (unless we did something non-trivial) - let idle = not processedResults && not dispatched && not (ingestStreamsOnly ()) + let idle = not processedResults && not dispatched && not (tryIngestStreamsInLieuOfDispatch ()) if idle && not exiting then let sleepTs = ts () do! Task.runWithCancellation ct (fun ct -> @@ -987,28 +1032,27 @@ module Dispatcher = /// Implementation of IDispatcher that feeds items to an item dispatcher that maximizes concurrent requests (within a limit) type Concurrent<'P, 'R, 'E, 'F> internal ( inner: ItemDispatcher, 'F>, - project: struct (int64 * Scheduling.Item<'F>) -> CancellationToken -> Task>>, - interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P,'E> -> struct (int64 voption * Result<'R, 'E>)) = + project: struct (int64 * Scheduling.Item<'F>) -> CancellationToken -> Task>>, + interpretProgress: Result<'P, 'E> -> Scheduling.ResProgressAndMalformed>) = static member Create ( maxDop, // NOTE `project` must not throw under any circumstances, or the exception will go unobserved, and DOP will leak in the dispatcher - project: FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> CancellationToken -> Task>, - interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P, 'E> -> struct (int64 voption * Result<'R, 'E>)) = + project: FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> Buffer.Revision -> CancellationToken -> Task>, + interpretProgress: Result<'P, 'E> -> Scheduling.ResProgressAndMalformed>) = let project struct (startTs, item: Scheduling.Item<'F>) (ct: CancellationToken) = task { - let! res = project item.stream item.span ct + let! res = project item.stream item.span item.revision ct return Scheduling.InternalRes.create (item, Stopwatch.elapsed startTs, res) } Concurrent<_, _, _, _>(ItemDispatcher(maxDop), project, interpretProgress) - static member Create(maxDop, prepare: Func<_, _, _>, handle: Func<_, _, CancellationToken, Task<_>>, toIndex: Func<_, 'R, int64>) = - let project stream span ct = task { - let struct (met, span: FsCodec.ITimelineEvent<'F>[]) = prepare.Invoke(stream, span) - try let! struct (spanResult, outcome) = handle.Invoke(stream, span, ct) - let index' = toIndex.Invoke(span, spanResult) - return Ok struct (index', met, outcome) - with e -> return Error struct (met, e) } - let interpretProgress (_streams: Scheduling.StreamStates<'F>) _stream = function - | Ok struct (index', met, outcome) -> struct (ValueSome index', Ok struct (met, outcome)) - | Error struct (met, exn) -> ValueNone, Error struct (met, exn) - Concurrent<_, _, _, 'F>.Create(maxDop, project, interpretProgress) + static member Create(maxDop, prepare: Func[], _>, handle: Func[], CancellationToken, Task>) = + let project stream span revision ct = task { + let struct (span: FsCodec.ITimelineEvent<'F>[], met) = prepare.Invoke(stream, span) + try let! struct (outcome, index') = handle.Invoke(stream, span, ct) + return Ok struct (outcome, Buffer.HandlerProgress.ofMetricsAndPos revision met index', met) + with e -> return Error struct (e, false, met) } + let interpretProgress = function + | Ok struct (outcome, hp, met) -> struct (Ok struct (outcome, met), ValueSome hp, false) + | Error struct (exn, malformed, met) -> Error struct (exn, malformed, met), ValueNone, malformed + Concurrent<_, _, _, 'F>.Create(maxDop, project, interpretProgress = interpretProgress) interface Scheduling.IDispatcher<'P, 'R, 'E, 'F> with [] override _.Result = inner.Result override _.Pump ct = inner.Pump ct @@ -1016,16 +1060,19 @@ module Dispatcher = override _.HasCapacity = inner.HasCapacity override _.AwaitCapacity(ct) = inner.AwaitCapacity(ct) override _.TryReplenish(pending, handleStarted) = inner.TryReplenish(pending, handleStarted, project) - override _.InterpretProgress(streams, stream, res) = interpretProgress streams stream res + override _.InterpretProgress res = interpretProgress res + + type ResProgressAndMetrics<'O> = (struct ('O * Buffer.HandlerProgress voption * StreamSpan.Metrics)) + type ExnAndMetrics = (struct(exn * bool * StreamSpan.Metrics)) + type NextIndexAndMetrics = (struct(int64 * StreamSpan.Metrics)) /// Implementation of IDispatcher that allows a supplied handler select work and declare completion based on arbitrarily defined criteria type Batched<'F> ( select: Func seq, Scheduling.Item<'F>[]>, // NOTE `handle` must not throw under any circumstances, or the exception will go unobserved - handle: Scheduling.Item<'F>[] -> CancellationToken -> - Task>[]>) = + handle: Scheduling.Item<'F>[] -> CancellationToken -> Task>[]>) = let inner = DopDispatcher 1 - let result = Event>>() + let result = Event>>() // On each iteration, we offer the ordered work queue to the selector // we propagate the selected streams to the handler @@ -1041,7 +1088,7 @@ module Dispatcher = hasCapacity <- false struct (dispatched, hasCapacity) - interface Scheduling.IDispatcher with + interface Scheduling.IDispatcher with [] override _.Result = result.Publish override _.Pump ct = task { use _ = inner.Result.Subscribe(Array.iter result.Trigger) @@ -1050,27 +1097,27 @@ module Dispatcher = override _.HasCapacity = inner.HasCapacity override _.AwaitCapacity(ct) = inner.AwaitButRelease(ct) override _.TryReplenish(pending, handleStarted) = trySelect pending handleStarted - override _.InterpretProgress(_streams: Scheduling.StreamStates<_>, _stream: FsCodec.StreamName, res: Result<_, _>) = + override _.InterpretProgress(res: Result<_, _>) = match res with - | Ok (met, pos') -> ValueSome pos', Ok (met, ()) - | Error (met, exn) -> ValueNone, Error (met, exn) + | Ok (pos', met) -> Ok ((), met), ValueSome (Buffer.HandlerProgress.ofPos pos'), false + | Error (exn, malformed, met) -> Error (exn, malformed, met), ValueNone, malformed [] type Stats<'Outcome>(log: ILogger, statsInterval, statesInterval, [] ?failThreshold, [] ?abendThreshold, [] ?logExternalStats) = - inherit Scheduling.Stats( + inherit Scheduling.Stats( log, statsInterval, statesInterval, ?failThreshold = failThreshold, ?abendThreshold = abendThreshold, ?logExternalStats = logExternalStats) - let mutable okStreams, okEvents, okBytes, exnStreams, exnCats, exnEvents, exnBytes = HashSet(), 0, 0L, HashSet(), Stats.Counters(), 0, 0L + let mutable okStreams, okEvents, okUnfolds, okBytes, exnStreams, exnCats, exnEvents, exnUnfolds, exnBytes = HashSet(), 0, 0, 0L, HashSet(), Stats.Counters(), 0, 0, 0L let mutable resultOk, resultExn = 0, 0 override _.DumpStats() = if resultOk <> 0 then - log.Information("Projected {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e ({ok:n0} ok)", + log.Information("Projected {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e {unfolds:n0}u ({ok:n0} ok)", Log.miB okBytes, resultOk, okStreams.Count, okEvents, resultOk) - okStreams.Clear(); resultOk <- 0; okEvents <- 0; okBytes <- 0L + okStreams.Clear(); resultOk <- 0; okEvents <- 0; okUnfolds <- 0; okBytes <- 0L if resultExn <> 0 then - log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e", - Log.miB exnBytes, resultExn, exnStreams.Count, exnEvents) - resultExn <- 0; exnStreams.Clear(); exnBytes <- 0L; exnEvents <- 0 + log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e {unfolds:n0}u", + Log.miB exnBytes, resultExn, exnStreams.Count, exnEvents, exnUnfolds) + resultExn <- 0; exnStreams.Clear(); exnBytes <- 0L; exnEvents <- 0; exnUnfolds <- 0 log.Warning(" Affected cats {@badCats}", exnCats.StatsDescending) exnCats.Clear() @@ -1079,20 +1126,22 @@ type Stats<'Outcome>(log: ILogger, statsInterval, statesInterval, override this.Handle res = match res with - | { stream = stream; result = Ok ((es, bs), outcome) } -> + | { stream = stream; result = Ok (outcome, (es, us, bs)) } -> okStreams.Add stream |> ignore okEvents <- okEvents + es + okUnfolds <- okUnfolds + us okBytes <- okBytes + int64 bs resultOk <- resultOk + 1 - base.RecordOk res + base.RecordOk(res, us <> 0) this.HandleOk outcome - | { duration = duration; stream = stream; index = index; event = et; result = Error ((es, bs), Exception.Inner exn) } -> + | { duration = duration; stream = stream; index = index; event = et; result = Error (Exception.Inner exn, _malformed, (es, us, bs)) } -> exnCats.Ingest(StreamName.categorize stream) exnStreams.Add stream |> ignore exnEvents <- exnEvents + es + exnUnfolds <- exnUnfolds + us exnBytes <- exnBytes + int64 bs resultExn <- resultExn + 1 - base.RecordExn(res, this.Classify exn, log.ForContext("stream", stream).ForContext("index", index).ForContext("eventType", et).ForContext("count", es).ForContext("duration", duration), exn) + base.RecordExn(res, this.Classify exn, log.ForContext("stream", stream).ForContext("index", index).ForContext("eventType", et).ForContext("events", es).ForContext("unfolds", us).ForContext("duration", duration), exn) abstract member HandleOk: outcome: 'Outcome -> unit @@ -1126,11 +1175,10 @@ type Concurrent private () = /// Custom projection mechanism that divides work into a prepare phase that selects the prefix of the queued StreamSpan to handle, /// and a handle function that yields a Write Position representing the next event that's to be handled on this Stream - static member StartEx<'Progress, 'Outcome, 'F, 'R> + static member StartEx<'Outcome, 'F> ( log: ILogger, maxReadAhead, maxConcurrentStreams, - prepare: Func[], struct(StreamSpan.Metrics * FsCodec.ITimelineEvent<'F>[])>, - handle: Func[], CancellationToken, Task>, - toIndex: Func[], 'R, int64>, + prepare: Func[], struct (FsCodec.ITimelineEvent<'F>[] * StreamSpan.Metrics)>, + handle: Func[], CancellationToken, Task>, eventSize, stats: Scheduling.Stats<_, _>, // Configure max number of batches to buffer within the scheduler; Default: Same as maxReadAhead [] ?pendingBufferSize, @@ -1143,7 +1191,7 @@ type Concurrent private () = [] ?idleDelay, [] ?requireAll, [] ?ingesterStateInterval, [] ?commitInterval) : Propulsion.SinkPipeline seq>> = - let dispatcher: Scheduling.IDispatcher<_, _, _, _> = Dispatcher.Concurrent<_, _, _, 'F>.Create(maxConcurrentStreams, prepare, handle, toIndex) + let dispatcher: Scheduling.IDispatcher<_, _, _, _> = Dispatcher.Concurrent<_, _, _, 'F>.Create(maxConcurrentStreams, prepare = prepare, handle = handle) let dumpStreams logStreamStates _log = logStreamStates eventSize let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, defaultArg pendingBufferSize maxReadAhead, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, @@ -1152,10 +1200,9 @@ type Concurrent private () = ingesterStateInterval = defaultArg ingesterStateInterval stats.StateInterval.Period, ?commitInterval = commitInterval) /// Project Events using a handle function that yields a Write Position representing the next event that's to be handled on this Stream - static member Start<'Outcome, 'F, 'R> + static member Start<'Outcome, 'F> ( log: ILogger, maxReadAhead, maxConcurrentStreams, - handle: Func[], CancellationToken, Task>, - toIndex: Func[], 'R, int64>, + handle: Func[], CancellationToken, Task>, eventSize, stats, // Configure max number of batches to buffer within the scheduler; Default: Same as maxReadAhead [] ?pendingBufferSize, @@ -1170,9 +1217,9 @@ type Concurrent private () = : Propulsion.SinkPipeline seq>> = let prepare _streamName span = let metrics = StreamSpan.metrics eventSize span - struct (metrics, span) - Concurrent.StartEx<'R, 'Outcome, 'F, 'R>( - log, maxReadAhead, maxConcurrentStreams, prepare, handle, toIndex, eventSize, stats, + struct (span, metrics) + Concurrent.StartEx<'Outcome, 'F>( + log, maxReadAhead, maxConcurrentStreams, prepare, handle, eventSize, stats, ?pendingBufferSize = pendingBufferSize, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay, ?requireAll = requireAll, ?ingesterStateInterval = ingesterStateInterval, ?commitInterval = commitInterval) @@ -1182,28 +1229,28 @@ type Batched private () = /// Establishes a Sink pipeline that continually dispatches to a single instance of a handle function /// Prior to the dispatch, the potential streams to include in the batch are identified by the select function - static member Start<'Progress, 'Outcome, 'F> + static member Start<'Outcome, 'F> ( log: ILogger, maxReadAhead, select: Func seq, Scheduling.Item<'F>[]>, - handle: Func[], CancellationToken, Task)>>>, - eventSize, stats: Scheduling.Stats<_, _>, + handle: Func[], CancellationToken, Task * TimeSpan)>>>, + eventSize, stats: Scheduling.Stats, [] ?pendingBufferSize, [] ?purgeInterval, [] ?wakeForResults, [] ?idleDelay, [] ?requireAll, [] ?ingesterStateInterval, [] ?commitInterval) : Propulsion.SinkPipeline seq>> = let handle (items: Scheduling.Item<'F>[]) ct - : Task>[]> = task { + : Task>[]> = task { let start = Stopwatch.timestamp () let err ts e (x: Scheduling.Item<_>) = let met = StreamSpan.metrics eventSize x.span - Scheduling.InternalRes.create (x, ts, Error struct (met, e)) + Scheduling.InternalRes.create (x, ts, Error struct (e, false, met)) try let! results = handle.Invoke(items, ct) return Array.ofSeq (Seq.zip items results |> Seq.map (function - | item, (ts, Ok index') -> + | item, (Ok index', ts) -> let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index') |> Array.ofSeq let met = StreamSpan.metrics eventSize used - Scheduling.InternalRes.create (item, ts, Ok struct (met, index')) - | item, (ts, Error e) -> err ts e item)) + Scheduling.InternalRes.create (item, ts, Ok struct (index', met)) + | item, (Error e, ts) -> err ts e item)) with e -> let ts = Stopwatch.elapsed start return items |> Array.map (err ts e) } diff --git a/src/Propulsion/Sync.fs b/src/Propulsion/Sync.fs index 47286230..87768d6d 100644 --- a/src/Propulsion/Sync.fs +++ b/src/Propulsion/Sync.fs @@ -9,14 +9,14 @@ open System.Collections.Generic [] type Stats<'Outcome>(log: ILogger, statsInterval, stateInterval, [] ?failThreshold) = - inherit Scheduling.Stats(log, statsInterval, stateInterval, ?failThreshold = failThreshold) - let mutable okStreams, okEvents, okBytes, exnStreams, exnEvents, exnBytes = HashSet(), 0, 0L, HashSet(), 0, 0L + inherit Scheduling.Stats(log, statsInterval, stateInterval, ?failThreshold = failThreshold) + let mutable okStreams, okEvents, okUnfolds, okBytes, exnStreams, exnEvents, exnUnfolds, exnBytes = HashSet(), 0, 0, 0L, HashSet(), 0, 0, 0L let prepareStats = Stats.LatencyStats("prepare") override _.DumpStats() = if okStreams.Count <> 0 && exnStreams.Count <> 0 then - log.Information("Completed {okMb:n0}MB {okStreams:n0}s {okEvents:n0}e Exceptions {exnMb:n0}MB {exnStreams:n0}s {exnEvents:n0}e", - Log.miB okBytes, okStreams.Count, okEvents, Log.miB exnBytes, exnStreams.Count, exnEvents) - okStreams.Clear(); okEvents <- 0; okBytes <- 0L; exnStreams.Clear(); exnBytes <- 0; exnEvents <- 0 + log.Information("Completed {okMb:n0}MB {okStreams:n0}s {okEvents:n0}e {okUnfolds:n0}u Exceptions {exnMb:n0}MB {exnStreams:n0}s {exnEvents:n0}e {exnUnfolds:n0}u", + Log.miB okBytes, okStreams.Count, okEvents, okUnfolds, Log.miB exnBytes, exnStreams.Count, exnEvents, exnUnfolds) + okStreams.Clear(); okEvents <- 0; okUnfolds <- 0; okBytes <- 0L; exnStreams.Clear(); exnBytes <- 0; exnEvents <- 0; exnUnfolds <- 0 prepareStats.Dump log abstract member Classify: exn -> OutcomeKind @@ -24,18 +24,20 @@ type Stats<'Outcome>(log: ILogger, statsInterval, stateInterval, [] ? override this.Handle message = match message with - | { stream = stream; result = Ok ((es, bs), prepareElapsed, outcome) } -> + | { stream = stream; result = Ok (outcome, (es, us, bs), prepareElapsed) } -> okStreams.Add stream |> ignore okEvents <- okEvents + es + okUnfolds <- okUnfolds + us okBytes <- okBytes + int64 bs prepareStats.Record prepareElapsed - base.RecordOk message + base.RecordOk(message, us <> 0) this.HandleOk outcome - | { stream = stream; result = Error ((es, bs), Exception.Inner exn) } -> + | { stream = stream; result = Error (Exception.Inner exn, _malformed, (es, us, bs)) } -> exnStreams.Add stream |> ignore exnEvents <- exnEvents + es + exnUnfolds <- exnUnfolds + us exnBytes <- exnBytes + int64 bs - base.RecordExn(message, this.Classify exn, log.ForContext("stream", stream).ForContext("events", es), exn) + base.RecordExn(message, this.Classify exn, log.ForContext("stream", stream).ForContext("events", es).ForContext("unfolds", us), exn) abstract member HandleOk: outcome: 'Outcome -> unit @@ -44,34 +46,30 @@ type Factory private () = static member StartAsync ( log: ILogger, maxReadAhead, maxConcurrentStreams, - handle: Func[], CancellationToken, Task>, - toIndex: Func[], 'R, int64>, + handle: Func[], CancellationToken, Task>, stats: Stats<'Outcome>, sliceSize, eventSize, ?dumpExternalStats, ?idleDelay, ?maxBytes, ?maxEvents, ?purgeInterval, ?ingesterStateInterval, ?commitInterval) : SinkPipeline seq>> = let maxEvents, maxBytes = defaultArg maxEvents 16384, (defaultArg maxBytes (1024 * 1024 - (*fudge*)4096)) - let attemptWrite stream (events: FsCodec.ITimelineEvent<'F>[]) ct = task { - let struct (met, span') = StreamSpan.slice<'F> sliceSize (maxEvents, maxBytes) events + let attemptWrite stream (events: FsCodec.ITimelineEvent<'F>[]) revision ct = task { + let struct (trimmed, met) = StreamSpan.slice<'F> sliceSize (maxEvents, maxBytes) events let prepareTs = Stopwatch.timestamp () - try let! res, outcome = handle.Invoke(stream, span', ct) - let index' = toIndex.Invoke(span', res) - return Ok struct (index', met, Stopwatch.elapsed prepareTs, outcome) - with e -> return Error struct (met, e) } + try let! outcome, index' = handle.Invoke(stream, trimmed, ct) + return Ok struct (outcome, Buffer.HandlerProgress.ofMetricsAndPos revision met index', met, Stopwatch.elapsed prepareTs) + with e -> return Error struct (e, false, met) } - let interpretProgress _streams (stream: FsCodec.StreamName) = function - | Ok struct (i', met, prep, outcome) -> struct (ValueSome i', Ok struct (met, prep, outcome)) - | Error struct (struct (eventCount, bytesCount) as met, exn: exn) -> - log.Warning(exn, "Handling {events:n0}e {bytes:n0}b for {stream} failed, retrying", eventCount, bytesCount, stream) - ValueNone, Error struct (met, exn) + let interpretProgress = function + | Ok struct (outcome, hp, met: StreamSpan.Metrics, prep) -> struct (Ok struct (outcome, met, prep), ValueSome hp, false) + | Error struct (exn: exn, malformed, met) -> Error struct (exn, malformed, met), ValueNone, malformed - let dispatcher: Scheduling.IDispatcher<_, _, _, _> = Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, attemptWrite, interpretProgress) + let dispatcher: Scheduling.IDispatcher<_, _, _, _> = Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, attemptWrite, interpretProgress = interpretProgress) let dumpStreams logStreamStates log = logStreamStates eventSize match dumpExternalStats with Some f -> f log | None -> () let scheduler = - Scheduling.Engine + Scheduling.Engine (dispatcher, stats, dumpStreams, pendingBufferSize = maxReadAhead, ?idleDelay = idleDelay, ?purgeInterval = purgeInterval) Factory.Start(log, scheduler.Pump, maxReadAhead, scheduler, diff --git a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs index cd225e25..c98b2375 100644 --- a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs +++ b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs @@ -127,7 +127,7 @@ module Helpers = do! handler (getConsumer()) (deserialize consumerId event) (log: ILogger).Information("BATCHED CONSUMER Handled {c} events in {l} streams", c, streams.Length) let ts = Stopwatch.elapsed ts - return seq { for x in streams -> struct (ts, Ok (Propulsion.Sinks.Events.nextIndex x.span)) } } + return seq { for x in streams -> struct (Ok (Propulsion.Sinks.Events.next x.span), ts) } } let stats = Stats(log, TimeSpan.FromSeconds 5.,TimeSpan.FromSeconds 5.) let messageIndexes = StreamNameSequenceGenerator() let consumer = @@ -165,7 +165,7 @@ module Helpers = let handle _ (span: Propulsion.Sinks.Event[]) = async { for event in span do do! handler (getConsumer()) (deserialize consumerId event) - return Propulsion.Sinks.StreamResult.AllProcessed, () } + return (), Propulsion.Sinks.Events.next span } let stats = Stats(log, TimeSpan.FromSeconds 5.,TimeSpan.FromSeconds 5.) let messageIndexes = StreamNameSequenceGenerator() let consumer = diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index 78c8ab51..2e65573c 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -84,7 +84,7 @@ let ``It processes events for a category`` () = task { test <@ Array.chooseV Simple.codec.Decode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @> if handled.Count >= 2000 then stop () - return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) } + return struct ((), Propulsion.Sinks.Events.next events) } use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats) let source = MessageDbSource( log, TimeSpan.FromMinutes 1, @@ -131,8 +131,8 @@ let ``It doesn't read the tail event again`` () = task { let stats = stats log - let handle _ _ _ = task { - return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) } + let handle _ events _ = task { + return struct ((), Propulsion.Sinks.Events.next events) } use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats) let batchSize = 10 let source = MessageDbSource( diff --git a/tests/Propulsion.Tests/ProgressTests.fs b/tests/Propulsion.Tests/ProgressTests.fs index a610e35b..31ec180f 100644 --- a/tests/Propulsion.Tests/ProgressTests.fs +++ b/tests/Propulsion.Tests/ProgressTests.fs @@ -7,7 +7,11 @@ open System.Collections.Generic open Xunit let sn x = StreamName.compose "test" [| x |] -let mkDictionary xs = Dictionary(dict xs) +let mkDictionary xs = Dictionary(xs |> Seq.map (fun (k, v) -> k, Propulsion.Streams.Buffer.ProgressRequirement.ofPos v) |> dict) + +type ProgressState<'T> with + member x.MarkStreamProgress(s, i) = x.RemoveAttainedRequirements(s, (i, Propulsion.Streams.Buffer.Revision.initial)) + member x.AppendBatch(s, i) = x.IngestBatch(s, i) let [] ``Empty has zero streams pending or progress to write`` () = let sut = StreamsPrioritizer(None) diff --git a/tests/Propulsion.Tests/SinkHealthTests.fs b/tests/Propulsion.Tests/SinkHealthTests.fs index de4455db..be73ad20 100644 --- a/tests/Propulsion.Tests/SinkHealthTests.fs +++ b/tests/Propulsion.Tests/SinkHealthTests.fs @@ -22,15 +22,15 @@ type Scenario(testOutput) = let sid n = FsCodec.StreamName.Internal.trust n let stuckSid = sid "a-stuck" let failingSid = sid "a-bad" - let handle sn _ = async { + let handle sn events = async { if sn = stuckSid then do! Async.Sleep (TimeSpan.FromMilliseconds 50) - return (Propulsion.Sinks.StreamResult.NoneProcessed, ()) + return ((), Propulsion.Sinks.Events.index events) elif sn = failingSid then return failwith "transient" else do! Async.Sleep (TimeSpan.FromSeconds 1) - return Propulsion.Sinks.StreamResult.AllProcessed, () } + return (), Propulsion.Sinks.Events.next events } let sink = Propulsion.Sinks.Factory.StartConcurrent(log, 2, 2, handle, stats) let dispose () = sink.Stop() @@ -67,7 +67,7 @@ type Scenario(testOutput) = pe.StuckStreams.Length = 1 && pe.FailingStreams.Length = 1 && all |> Seq.exists (fun struct (_s, age, _c) -> age > abendThreshold) @> - test <@ obj.ReferenceEquals(me, pe) @> - test <@ obj.ReferenceEquals(me, sink.Await() |> Async.Catch |> Async.RunSynchronously |> extractHealthCheckExn) @> } + test <@ LanguagePrimitives.PhysicalEquality me pe @> + test <@ LanguagePrimitives.PhysicalEquality me (sink.Await() |> Async.Catch |> Async.RunSynchronously |> extractHealthCheckExn) @> } interface IDisposable with member _.Dispose() = dispose () diff --git a/tests/Propulsion.Tests/SourceTests.fs b/tests/Propulsion.Tests/SourceTests.fs index dd49cdfa..5df1923f 100644 --- a/tests/Propulsion.Tests/SourceTests.fs +++ b/tests/Propulsion.Tests/SourceTests.fs @@ -14,7 +14,7 @@ type Scenario(testOutput) = let stats = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1) with member _.HandleOk x = () member _.HandleExn(log, x) = () } - let handle _ _ _ = task { return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) } + let handle _ events _ = task { return struct ((), Propulsion.Sinks.Events.next events) } let sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats) let dispose () = sink.Stop() diff --git a/tests/Propulsion.Tests/StreamStateTests.fs b/tests/Propulsion.Tests/StreamStateTests.fs index a6d3387d..8d2e64ba 100644 --- a/tests/Propulsion.Tests/StreamStateTests.fs +++ b/tests/Propulsion.Tests/StreamStateTests.fs @@ -1,88 +1,177 @@ module Propulsion.Tests.StreamStateTests +open Propulsion.Internal open Propulsion.Streams open Swensen.Unquote open Xunit let canonicalTime = System.DateTimeOffset.UtcNow -let mk p c: FsCodec.ITimelineEvent[] = - [| for x in 0..c-1 -> FsCodec.Core.TimelineEvent.Create(p + int64 x, p + int64 x |> string, null, timestamp = canonicalTime) |] -let merge = StreamSpan.merge -let dropBeforeIndex = StreamSpan.dropBeforeIndex +let mk_ p c seg uc: FsCodec.ITimelineEvent[] = + let mk id et isUnfold = FsCodec.Core.TimelineEvent.Create(id, et, null, timestamp = canonicalTime, isUnfold = isUnfold, context = seg) + [| for x in 0..c-1 -> mk (p + int64 x) (p + int64 x |> string) false + for u in 0..uc-1 -> mk (p + int64 c) $"{p+int64 c}u{u}" true |] +let mk p c = mk_ p c 0 0 +let mkU p uc = mk_ p 0 0 uc +let isSame = LanguagePrimitives.PhysicalEquality let is (xs: FsCodec.ITimelineEvent[][]) (res: FsCodec.ITimelineEvent[][]) = - (xs = null && res = null) - || (xs, res) ||> Seq.forall2 (fun x y -> (x = null && y = null) - || (x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType))) + (xs, res) ||> Seq.forall2 (fun x y -> (Array.isEmpty x && Array.isEmpty y) + || x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType)) let [] nothing () = - let r = merge 0L [| mk 0L 0; mk 0L 0 |] - test <@ obj.ReferenceEquals(null, r) @> + let r = StreamSpan.merge 0L [| mk 0L 0; mk 0L 0 |] + test <@ isSame null r @> let [] synced () = - let r = merge 1L [| mk 0L 1; mk 0L 0 |] - test <@ obj.ReferenceEquals(null, r) @> + let r = StreamSpan.merge 1L [| mk 0L 1; mk 0L 0 |] + test <@ isSame null r @> let [] ``no overlap`` () = - let r = merge 0L [| mk 0L 1; mk 2L 2 |] + let r = StreamSpan.merge 0L [| mk 0L 1; mk 2L 2 |] test <@ r |> is [| mk 0L 1; mk 2L 2 |] @> let [] overlap () = - let r = merge 0L [| mk 0L 1; mk 0L 2 |] + let r = StreamSpan.merge 0L [| mk 0L 1; mk 0L 2 |] test <@ r |> is [| mk 0L 2 |] @> let [] ``remove nulls`` () = - let r = merge 1L [| mk 0L 1; mk 0L 2 |] + let r = StreamSpan.merge 1L [| mk 0L 1; mk 0L 2 |] test <@ r |> is [| mk 1L 1 |] @> let [] adjacent () = - let r = merge 0L [| mk 0L 1; mk 1L 2 |] + let r = StreamSpan.merge 0L [| mk 0L 1; mk 1L 2 |] test <@ r |> is [| mk 0L 3 |] @> let [] ``adjacent to min`` () = - let r = Array.map (dropBeforeIndex 2L) [| mk 0L 1; mk 1L 2 |] - test <@ r |> is [| null; mk 2L 1 |] @> + let r = Array.map (StreamSpan.dropBefore 2L) [| mk 0L 1; mk 1L 2; mkU 1L 1; mkU 2L 2 |] + test <@ r |> is [| [||]; mk 2L 1; [||]; mkU 2L 2 |] @> let [] ``adjacent to min merge`` () = - let r = merge 2L [| mk 0L 1; mk 1L 2 |] - test <@ r |> is [| mk 2L 1 |] @> + let r = StreamSpan.merge 2L [| mk 0L 1; mk 1L 2; mkU 2L 2 |] + test <@ r |> is [| [| yield! mk 2L 1; yield! mkU 2L 2 |] |] @> let [] ``adjacent to min no overlap`` () = - let r = merge 2L [| mk 0L 1; mk 2L 1 |] + let r = StreamSpan.merge 2L [| mk_ 0L 2 0 1; mk 2L 1 |] test <@ r |> is [| mk 2L 1|] @> let [] ``adjacent trim`` () = - let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 2L 2 |] - test <@ r |> is [| mk 1L 1; mk 2L 2 |] @> + let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mk 2L 2; mkU 2L 2 |] + test <@ r |> is [| mk 1L 1; mk 2L 2; mkU 2L 2 |] @> let [] ``adjacent trim merge`` () = - let r = merge 1L [| mk 0L 2; mk 2L 2 |] + let r = StreamSpan.merge 1L [| mk 0L 2; mk 2L 2 |] test <@ r |> is [| mk 1L 3 |] @> let [] ``adjacent trim append`` () = - let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 2L 2; mk 5L 1 |] - test <@ r |> is [| mk 1L 1; mk 2L 2; mk 5L 1 |] @> + let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mkU 1L 1; mk 2L 2; mk 5L 1 |] + test <@ r |> is [| mk 1L 1; mkU 1L 1; mk 2L 2; mk 5L 1 |] @> let [] ``adjacent trim append merge`` () = - let r = merge 1L [| mk 0L 2; mk 2L 2; mk 5L 1|] + let r = StreamSpan.merge 1L [| mk 0L 2; mk 2L 2; mk 5L 1|] test <@ r |> is [| mk 1L 3; mk 5L 1 |] @> let [] ``mixed adjacent trim append`` () = - let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 5L 1; mk 2L 2 |] - test <@ r |> is [| mk 1L 1; mk 5L 1; mk 2L 2 |] @> + let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mk 5L 1; mk 2L 2; mk_ 0L 2 0 2; mk_ 2L 2 0 2 |] + test <@ r |> is [| mk 1L 1; mk 5L 1; mk 2L 2; mk_ 1L 1 0 2; mk_ 2L 2 0 2 |] @> let [] ``mixed adjacent trim append merge`` () = - let r = merge 1L [| mk 0L 2; mk 5L 1; mk 2L 2|] + let r = StreamSpan.merge 1L [| mk 0L 2; mk 5L 1; mk 2L 2; mkU 4L 2 |] test <@ r |> is [| mk 1L 3; mk 5L 1 |] @> let [] fail () = - let r = merge 11614L [| null; mk 11614L 1 |] + let r = StreamSpan.merge 11614L [| [||]; mk 11614L 1 |] test <@ r |> is [| mk 11614L 1 |] @> let [] ``fail 2`` () = - let r = merge 11613L [| mk 11614L 1; null |] + let r = StreamSpan.merge 11613L [| mk 11614L 1; [||] |] test <@ r |> is [| mk 11614L 1 |] @> +let [] ``merge to strip Events should not strip unfold`` () = + let r = StreamSpan.merge 0L [| mk_ 0L 0 0 1 |] + test <@ r |> is [| mkU 0L 1 |] @> + +let [] ``merge to strip Events should not strip unfolds`` () = + let r = StreamSpan.merge 0L [| mk_ 0L 0 0 2 |] + test <@ r |> is [| mkU 0L 2 |] @> + +let [] ``merge to strip Events should retain at non-0`` () = + let r = StreamSpan.merge 1L [| mk_ 0L 1 0 1 |] + test <@ r |> is [| mkU 1L 1 |] @> + +let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 isSame + +let [] ``strip merge should retain last unfolds`` () = + let r = StreamSpan.merge 0L [| mk_ 0L 0 2 1; mkU 0L 2 |] + test <@ r |> is [| mk_ 0L 0 2 2 |] @> + +let [] ``nextIndex u`` () = + 1L =! StreamSpan.next (mk_ 0L 1 0 1) +let [] ``nextIndex E`` () = + 1L =! StreamSpan.next (mk 0L 1) + +let [] ``merges retain freshest unfolds, one per event type`` counts = + let input = [| + let mutable pos = 0L + let mutable seg = 0 + for gapOrOverlap: sbyte, FsCheck.NonNegativeInt normal, FsCheck.NonNegativeInt unfolds in (counts : _[]) do + let events = normal % 10 + let unfolds = unfolds % 10 + pos <- max 0L (pos+int64 gapOrOverlap) + yield mk_ pos events seg unfolds + pos <- pos + int64 events + seg <- seg + 1 |] + let res = StreamSpan.merge 0L input + // The only way to end up with a null output is by sending either no spans, or all empties + if res = null then + test <@ input |> Array.forall Array.isEmpty @> + else + + // an Empty span sequence is replaced with null + test <@ res |> Array.any @> + // A Span sequence does not have any empty spans + test <@ res |> Array.forall Array.any @> + let all = res |> Array.concat + let unfolds, events = all |> Array.partition _.IsUnfold + // Events are always in order + test <@ (events |> Seq.sortBy _.Index) === events @> + // Unfolds are always in order + test <@ unfolds |> Seq.sortBy _.Index === unfolds @> + // Unfolds are always after events + test <@ all |> Seq.sortBy _.IsUnfold === all @> + // One unfold per type + test <@ unfolds |> Array.groupBy _.EventType |> Array.forall (fun (_et, xs) -> xs.Length = 1) @> + // Unfolds are always for the same Index (as preceding ones are invalidated by any newer event) + test <@ unfolds |> Array.forall (fun x -> x.Index = (Array.last all).Index) @> + // Version that Unfolds pertain to must always be >= final event Index + test <@ match events |> Array.tryLast, unfolds |> Array.tryLast with + | Some le, Some lu -> lu.Index >= le.Index + | _ -> true @> + + // resulting span sequence must be monotonic, with a gap of at least 1 in the Index ranges per span + test <@ res |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.next x < StreamSpan.index y) @> + + let others = res |> Array.take (res.Length - 1) + // Only the last span can have unfolds + test <@ others |> Array.forall (Array.forall (fun x -> not x.IsUnfold)) @> + + match res |> Array.last |> Array.last with + | u when u.IsUnfold -> + // If there are unfolds, they can only be the newest ones + test <@ input |> Array.forall (not << Array.exists (fun x -> x.IsUnfold && x.Index > u.Index)) @> + // if two sets of unfolds with identical Index values were supplied, the freshest ones must win + let uc = unbox u.Context + let newerUnfolds = Seq.concat input |> Seq.filter (fun x -> x.IsUnfold && x.Index = u.Index && unbox x.Context > uc) + test <@ newerUnfolds === [||] || uc = -1 @> + // all unfolds that got merged as part of the same Span should be retained, and not have been reordered + let outUnf = res |> Array.last |> Array.filter _.IsUnfold + let unfSeg = trap <@ outUnf |> Seq.map _.Context |> Seq.cast |> Seq.distinct |> Seq.exactlyOne @> + let inUnf = input |> Seq.concat |> Seq.filter (fun x -> x.IsUnfold && unbox x.Context = unfSeg) |> Seq.toArray + if inUnf.Length <> outUnf.Length then + printf "here" + test <@ Array.forall2 isSame inUnf outUnf @> + | _ -> () + // TODO verify that slice never orphans unfolds + #if MEMORY_USAGE_ANALYSIS // https://bartoszsypytkowski.com/writing-high-performance-f-code // https://github.com/SergeyTeplyakov/ObjectLayoutInspector diff --git a/tools/Propulsion.Tool/Sync.fs b/tools/Propulsion.Tool/Sync.fs index e1ea0c03..0cc57326 100644 --- a/tools/Propulsion.Tool/Sync.fs +++ b/tools/Propulsion.Tool/Sync.fs @@ -38,7 +38,7 @@ type [] Parameters = "NOTE This mode does not make sense to apply unless the ProcessorName is fresh; if the consumer group name is not fresh (and hence items are excluded from the feed), there will inevitably be missing events, and processing will stall. " + "Default: assume events arrive from the changefeed (and/or the input JSON file) without any gaps or out of order deliveries for any stream." | Categorize -> "Gather handler latency stats by category" - | MaxItems _ -> "Controls checkpointing granularity by adjusting the batch size being loaded from the feed. Default: Unlimited" + | MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Sync Default (Sync): 9999. Default: Unlimited" | IncSys -> "Include System streams. Default: Exclude Index Streams, identified by a $ prefix." | IncCat _ -> "Allow Stream Category. Multiple values are combined with OR. Default: include all, subject to Category Deny and Stream Deny rules." @@ -204,9 +204,9 @@ type Stats(log: ILogger, statsInterval, stateInterval, logExternalStats) = accHam.Clear(); accSpam.Clear() accEventTypeLats.Clear() -let private handle isValidEvent stream (events: Propulsion.Sinks.Event[]): Async<_ * Outcome> = async { +let private handle isValidEvent stream (events: Propulsion.Sinks.Event[]): Async = async { let ham, spam = events |> Array.partition isValidEvent - return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.render_ stream ham spam 0 } + return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.next events } let eofSignalException = System.Threading.Tasks.TaskCanceledException "Stopping; FeedMonitor wait completed" let run appName (c: Args.Configuration, p: ParseResults) = async { @@ -222,7 +222,8 @@ let run appName (c: Args.Configuration, p: ParseResults) = async { | Some x, _ -> x | None, Json _ -> System.Guid.NewGuid() |> _.ToString("N") | None, _ -> p.Raise "ConsumerGroupName is mandatory, unless consuming from a JSON file" - let startFromTail, follow, requireAll, maxItems = p.Contains FromTail, p.Contains Follow, p.Contains RequireAll, p.TryGetResult MaxItems + let startFromTail, follow, requireAll = p.Contains FromTail, p.Contains Follow, p.Contains RequireAll + let maxItems = match a.Command with SubCommand.Sync _ -> p.GetResult(MaxItems, 9999) |> Some | _ -> p.TryGetResult MaxItems let producer = match a.Command with | SubCommand.Kafka a -> @@ -248,7 +249,7 @@ let run appName (c: Args.Configuration, p: ParseResults) = async { let json = Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream events |> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize do! producer.ProduceAsync(FsCodec.StreamName.toString stream, json) |> Async.Ignore - return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.render_ stream ham spam 0 } + return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.next events } Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats, requireAll = requireAll) | SubCommand.Sync sa ->