Skip to content

Commit

Permalink
feat!(Streams): Support propagating Unfolds
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 15, 2024
1 parent a54891d commit c4c09df
Show file tree
Hide file tree
Showing 30 changed files with 441 additions and 320 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Scheduler`: `Struct`/`voption` conversions; buffer reuse [#157](https://github.com/jet/propulsion/pull/157)
- `Scheduler`: Replaced `Thead.Sleep` with `Task.WhenAny`; Added Sleep time logging [#161](https://github.com/jet/propulsion/pull/161)
- `Streams`: Changed dominant `ITimelineEvent` `EventBody` type from `byte[]` to `System.ReadOnlyMemory<byte>` (`Sinks.EventBody`) [#169](https://github.com/jet/propulsion/pull/169) [#208](https://github.com/jet/propulsion/pull/208)
- `Streams.SpanResult`: Renamed to `Sinks.StreamResult` [#208](https://github.com/jet/propulsion/pull/208)
- `Propulsion.CosmosStore`: Changed to target `Equinox.CosmosStore` v `4.0.0` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore.CosmosSource`: Changed parsing to use `System.Text.Json` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner`: Exposed `*Stats` [#226](https://github.com/jet/propulsion/pull/226)
Expand All @@ -60,6 +59,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Removed

- `Streams.StreamSpan`: Changed from a record to individual arguments of `FsCodec.StreamName` and `Sinks.Event[]` [#169](https://github.com/jet/propulsion/pull/169) [#208](https://github.com/jet/propulsion/pull/208)
- `Streams.SpanResult`: Replaced with `int64` to reflect the updated position [#264](https://github.com/jet/propulsion/pull/264) [#208](https://github.com/jet/propulsion/pull/208)
- `Streams`: `statsInterval` is obtained from the `Stats` wherever one is supplied [#208](https://github.com/jet/propulsion/pull/208)
- `Propulsion.Cosmos`: Should not be in general use - users should port to `Propulsion.CosmosStore3`, then `Propulsion.CosmosStore` [#193](https://github.com/jet/propulsion/pull/193)
- `Destructurama.FSharp` dependency [#152](https://github.com/jet/propulsion/pull/152)
Expand Down
9 changes: 4 additions & 5 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ Typically, alerting should be set up based on the built in `busy` metrics provid
- `failing`: streams that have had at least one failed Handler invocation (regardless of whether they are currently the subject of a retry Handler invocation or not). Typically it should be possible to define:
- a reasonable limit before you'd want a low level alert to be raised
- a point at which you raise an alarm on the basis that the system is in a state that will lead to a SLA breach and hence merits intervention
- `stalled`: streams that have had only successful Handler invocations, but have not declared any progress via the Handler's `StreamResult`. In some cases, the design of a Reaction Process may be such that one might intentionally back off and retry in some scenarios (see [Consistency](#consistency)). In the general case, a stalled stream may reflect a coding error (e.g., if a handler uses read a stale value from a cache but the cache never gets invalidated, it will never make progress)
- `stalled`: streams that have had only successful Handler invocations, but have not declared any progress via the Handler's updated Stream Position. In some cases, the design of a Reaction Process may be such that one might intentionally back off and retry in some scenarios (see [Consistency](#consistency)). In the general case, a stalled stream may reflect a coding error (e.g., if a handler uses read a stale value from a cache but the cache never gets invalidated, it will never make progress)

Alongside alerting based on breaches of SLO limits, the values of the `busy` metrics are a key high level indicator of the health of a given Processor (along with the Handler Latency distribution).

Expand Down Expand Up @@ -348,13 +348,12 @@ or the _Designing Data Intensive Applications_ book):
- DynamoDb: requesting a 'consistent read'
- CosmosDb: when using Session Consistency, require that reads are contingent on the session token being used by the feed reader. This can be achieved by using the same `CosmosClient` to ensure the session tokens are synchronized.
2. Perform a pre-flight check when reading, based on the `Index` of the newest event passed to the handler. In such a case, it may make sense to back off for a small period, before reporting failure to handle the event (by throwing an exception). The Handler will be re-invoked for another attempt, with a better chance of the event being reflected in the read.
- Once such a pre-flight check has been carried out, one can safely report `StreamResult.AllProcessed` (or `PartiallyProcessed` if you wish to defer some work due to the backlog of events triggering too much work to perform in a single invocation)
3. Perform the processing on a 'shoulder tap' basis, with the final position based on what you read.
- First, load the stream's state, performing any required reactions.
- Then report the Version attained for the stream (based on the Index of the last event processed) by yielding a `StreamResult.OverrideNextIndex`.
- Then report the Version attained for the stream (based on the Index of the last event processed) as the Handler's updated position
- In this case, one of following edge cases may result:
- _The handler saw a version prior to the prompting event_. For example, if a Create event (`Index = 0`) is relayed, but reading does not yield any events (the replica in question is behind the node from which the feed obtained its state). In this case, the Handler can simply yield `StreamResult.OverrideNextIndex`, which will cause the event to be retained in the input buffer (and most likely, a fresh invocation for that same stream will immediately be dispatched)
- _The Handler saw a Version fresher than the prompting event_. For example: if a Create (`Index = 0`) is immediately followed by an Update (`Index = 1`), the handler can yield `StreamResult.OverrideNextIndex 2` to reflect the fact that the next event that's of interest will be event `Index = 2`. Regardless of whether Event 1 arrived while the handler was processing Event 0, or whether it arrives some time afterwards, the event will be dropped from the events pending for that Stream's Handler.
- _The handler saw a version prior to the prompting event_. For example, if a Create event (`Index = 0`) is relayed, but reading does not yield any events (the replica in question is behind the node from which the feed obtained its state). In this case, the Handler can simply yield the Position that the processing did see, which will cause the event to be retained in the input buffer (and most likely, a fresh invocation for that same stream will immediately be dispatched)
- _The Handler saw a Version fresher than the prompting event_. For example: if a Create (`Index = 0`) is immediately followed by an Update (`Index = 1`), the handler can yield `2` to reflect the fact that the next event that's of interest will be event `Index = 2`. Regardless of whether Event 1 arrived while the handler was processing Event 0, or whether it arrives some time afterwards, the event will be dropped from the events pending for that Stream's Handler.

### Consistency in the face of at least once delivery and re-traversal of events

Expand Down
6 changes: 3 additions & 3 deletions src/Propulsion.CosmosStore/CosmosStorePruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module Pruner =
let res = if deleted = 0 && deferred = 0 then Nop span.Length else Ok (deleted, deferred)
// For case where we discover events have already been deleted beyond our requested position, signal to reader to drop events
let writePos = max trimmedPos (untilIndex + 1L)
return struct (writePos, res) }
return struct (res, writePos) }

type CosmosStorePrunerStats(log, statsInterval, stateInterval, [<O; D null>] ?failThreshold) =
inherit Propulsion.Streams.Stats<Pruner.Outcome>(log, statsInterval, stateInterval, ?failThreshold = failThreshold)
Expand Down Expand Up @@ -75,8 +75,8 @@ type CosmosStorePruner =
#endif
let interpret _stream span =
let metrics = StreamSpan.metrics Event.storedSize span
struct (metrics, span)
Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil, (fun _ r -> r))
struct (span, metrics)
Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil)
let dumpStreams logStreamStates _log = logStreamStates Event.storedSize
let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5,
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay)
Expand Down
76 changes: 40 additions & 36 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ module private Impl =
type EventBody = byte[] // V4 defines one directly, here we shim it
module StreamSpan =

let private toNativeEventBody (xs: Propulsion.Sinks.EventBody): byte[] = xs.ToArray()
let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody
let toNativeEventBody (xs: Propulsion.Sinks.EventBody): byte[] = xs.ToArray()
// Trimmed edition of what V4 exposes
module internal Equinox =
module CosmosStore =
Expand All @@ -34,10 +33,9 @@ module private Impl =

// v4 and later use JsonElement, but Propulsion is using ReadOnlyMemory<byte> rather than assuming and/or offering optimization for JSON bodies
open System.Text.Json
let private toNativeEventBody (x: EventBody): JsonElement =
let toNativeEventBody (x: EventBody): JsonElement =
if x.IsEmpty then JsonElement()
else JsonSerializer.Deserialize(x.Span)
let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody
#endif

module Internal =
Expand All @@ -49,38 +47,42 @@ module Internal =
type [<NoComparison; NoEquality; RequireQualifiedAccess>] Result =
| Ok of updatedPos: int64
| Duplicate of updatedPos: int64
| PartialDuplicate of overage: Event[]
| PrefixMissing of batch: Event[] * writePos: int64
let logTo (log: ILogger) malformed (res: StreamName * Result<struct (StreamSpan.Metrics * Result), struct (StreamSpan.Metrics * exn)>) =
| PartialDuplicate of updatedPos: int64
| PrefixMissing of gap: int * actualPos: int64
let logTo (log: ILogger) malformed (res: StreamName * Result<struct (Result * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>) =
match res with
| stream, Ok (_, Result.Ok pos) ->
| stream, Ok (Result.Ok pos, _) ->
log.Information("Wrote {stream} up to {pos}", stream, pos)
| stream, Ok (_, Result.Duplicate updatedPos) ->
| stream, Ok (Result.Duplicate updatedPos, _) ->
log.Information("Ignored {stream} (synced up to {pos})", stream, updatedPos)
| stream, Ok (_, Result.PartialDuplicate overage) ->
log.Information("Requeuing {stream} {pos} ({count} events)", stream, overage[0].Index, overage.Length)
| stream, Ok (_, Result.PrefixMissing (batch, pos)) ->
log.Information("Waiting {stream} missing {gap} events ({count} events @ {pos})", stream, batch[0].Index - pos, batch.Length, batch[0].Index)
| stream, Error (_, exn) ->
| stream, Ok (Result.PartialDuplicate updatedPos, _) ->
log.Information("Requeuing {stream} {pos}", stream, updatedPos)
| stream, Ok (Result.PrefixMissing (gap, pos), _) ->
log.Information("Waiting {stream} missing {gap} events before {pos}", stream, gap, pos)
| stream, Error (exn, _) ->
let level = if malformed then LogEventLevel.Warning else Events.LogEventLevel.Information
log.Write(level, exn, "Writing {stream} failed, retrying", stream)

let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task {
log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length)
let i = StreamSpan.index span
let n = StreamSpan.nextIndex span
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds")
log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length)
let mapData = FsCodec.Core.EventData.Map StreamSpan.toNativeEventBody
#if COSMOSV3
let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _))
let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData)
|> Async.executeAsTask ct
#else
let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _), ct)
let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData, ct)
#endif
let res' =
match res with
| AppendResult.Ok pos -> Result.Ok pos.index
| AppendResult.Conflict (pos, _) | AppendResult.ConflictUnknown pos ->
match pos.index with
| actual when actual < span[0].Index -> Result.PrefixMissing (span, actual)
| actual when actual >= span[0].Index + span.LongLength -> Result.Duplicate actual
| actual -> Result.PartialDuplicate (span |> Array.skip (actual - span[0].Index |> int))
| actual when actual < i -> Result.PrefixMissing (actual - i |> int, actual)
| actual when actual >= n -> Result.Duplicate actual
| actual -> Result.PartialDuplicate actual
log.Debug("Result: {res}", res')
return res' }
let containsMalformedMessage e =
Expand All @@ -103,40 +105,42 @@ module Internal =
let maxEvents, maxBytes = defaultArg maxEvents 16384, defaultArg maxBytes (256 * 1024)
let writerResultLog = log.ForContext<Writer.Result>()
let attemptWrite stream span ct = task {
let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
let struct (span, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
#if COSMOSV3
try let! res = Writer.write log eventsContext (StreamName.toString stream) span' ct
try let! res = Writer.write log eventsContext (StreamName.toString stream) span ct
#else
try let! res = Writer.write log eventsContext stream span' ct
try let! res = Writer.write log eventsContext stream span ct
#endif
return Ok struct (met, res)
with e -> return Error struct (met, e) }
return Ok struct (res, met)
with e -> return Error struct (e, met) }
let interpretProgress (streams: Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct (_stats, Writer.Result.Ok pos) -> struct (streams.RecordWriteProgress(stream, pos, null), false)
| Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null), false
| Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, overage[0].Index, [| overage |]), false
| Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [| overage |]), false
| Error struct (_stats, exn) ->
| Ok struct ((Writer.Result.Ok pos' | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) ->
let ss = streams.RecordWriteProgress(stream, pos', null)
struct (ss.WritePos, false)
| Ok (Writer.Result.PrefixMissing _, _stats) ->
streams.WritePos(stream), false
| Error struct (exn, _stats) ->
let malformed = Writer.classify exn |> Writer.isMalformed
streams.SetMalformed(stream, malformed), malformed
let struct (ss, malformed) = applyResultToStreamState res
let ss = streams.SetMalformed(stream, malformed)
ss.WritePos, malformed
let struct (writePos, malformed) = applyResultToStreamState res
Writer.logTo writerResultLog malformed (stream, res)
struct (ss.WritePos, res)
struct (res, writePos)
Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress)

type WriterResult = Internal.Writer.Result

type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold, [<O; D null>] ?logExternalStats) =
inherit Scheduling.Stats<struct (StreamSpan.Metrics * WriterResult), struct (StreamSpan.Metrics * exn)>(
inherit Scheduling.Stats<struct (WriterResult * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>(
log, statsInterval, stateInterval, ?failThreshold = failThreshold,
logExternalStats = defaultArg logExternalStats Equinox.CosmosStore.Core.Log.InternalMetrics.dump)
let mutable okStreams, okEvents, okBytes = HashSet(), 0, 0L
let mutable exnCats, exnStreams, exnEvents, exnBytes = Stats.Counters(), HashSet(), 0, 0L
let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0
override _.Handle message =
match message with
| { stream = stream; result = Ok ((es, bs), res) } ->
| { stream = stream; result = Ok (res, (es, bs)) } ->
okStreams.Add stream |> ignore
okEvents <- okEvents + es
okBytes <- okBytes + int64 bs
Expand All @@ -146,7 +150,7 @@ type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D nul
| WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1
| WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1
base.RecordOk(message)
| { stream = stream; result = Error ((es, bs), Exception.Inner exn) } ->
| { stream = stream; result = Error (Exception.Inner exn, (es, bs)) } ->
exnCats.Ingest(StreamName.categorize stream)
exnStreams.Add stream |> ignore
exnEvents <- exnEvents + es
Expand Down
Loading

0 comments on commit c4c09df

Please sign in to comment.