Skip to content

Commit

Permalink
feat!(Streams): Support propagating Unfolds; remove StreamResult (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Aug 1, 2024
1 parent a54891d commit 40c9ab4
Show file tree
Hide file tree
Showing 35 changed files with 674 additions and 556 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Scheduler`: `Struct`/`voption` conversions; buffer reuse [#157](https://github.com/jet/propulsion/pull/157)
- `Scheduler`: Replaced `Thead.Sleep` with `Task.WhenAny`; Added Sleep time logging [#161](https://github.com/jet/propulsion/pull/161)
- `Streams`: Changed dominant `ITimelineEvent` `EventBody` type from `byte[]` to `System.ReadOnlyMemory<byte>` (`Sinks.EventBody`) [#169](https://github.com/jet/propulsion/pull/169) [#208](https://github.com/jet/propulsion/pull/208)
- `Streams.SpanResult`: Renamed to `Sinks.StreamResult` [#208](https://github.com/jet/propulsion/pull/208)
- `Propulsion.CosmosStore`: Changed to target `Equinox.CosmosStore` v `4.0.0` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore.CosmosSource`: Changed parsing to use `System.Text.Json` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner`: Exposed `*Stats` [#226](https://github.com/jet/propulsion/pull/226)
Expand All @@ -60,6 +59,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Removed

- `Streams.StreamSpan`: Changed from a record to individual arguments of `FsCodec.StreamName` and `Sinks.Event[]` [#169](https://github.com/jet/propulsion/pull/169) [#208](https://github.com/jet/propulsion/pull/208)
- `Streams.SpanResult`: Replaced with `int64` to reflect the updated position [#264](https://github.com/jet/propulsion/pull/264) [#208](https://github.com/jet/propulsion/pull/208)
- `Streams`: `statsInterval` is obtained from the `Stats` wherever one is supplied [#208](https://github.com/jet/propulsion/pull/208)
- `Propulsion.Cosmos`: Should not be in general use - users should port to `Propulsion.CosmosStore3`, then `Propulsion.CosmosStore` [#193](https://github.com/jet/propulsion/pull/193)
- `Destructurama.FSharp` dependency [#152](https://github.com/jet/propulsion/pull/152)
Expand Down
9 changes: 4 additions & 5 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ Typically, alerting should be set up based on the built in `busy` metrics provid
- `failing`: streams that have had at least one failed Handler invocation (regardless of whether they are currently the subject of a retry Handler invocation or not). Typically it should be possible to define:
- a reasonable limit before you'd want a low level alert to be raised
- a point at which you raise an alarm on the basis that the system is in a state that will lead to a SLA breach and hence merits intervention
- `stalled`: streams that have had only successful Handler invocations, but have not declared any progress via the Handler's `StreamResult`. In some cases, the design of a Reaction Process may be such that one might intentionally back off and retry in some scenarios (see [Consistency](#consistency)). In the general case, a stalled stream may reflect a coding error (e.g., if a handler uses read a stale value from a cache but the cache never gets invalidated, it will never make progress)
- `stalled`: streams that have had only successful Handler invocations, but have not declared any progress via the updated Stream Position yielded in the Handler's result. In some cases, the design of a Reaction Process may be such that one might intentionally back off and retry in some scenarios (see [Consistency](#consistency)). In the general case, a stalled stream may reflect a coding error (e.g., if a handler uses read a stale value from a cache but the cache never gets invalidated, it will never make progress)

Alongside alerting based on breaches of SLO limits, the values of the `busy` metrics are a key high level indicator of the health of a given Processor (along with the Handler Latency distribution).

Expand Down Expand Up @@ -348,13 +348,12 @@ or the _Designing Data Intensive Applications_ book):
- DynamoDb: requesting a 'consistent read'
- CosmosDb: when using Session Consistency, require that reads are contingent on the session token being used by the feed reader. This can be achieved by using the same `CosmosClient` to ensure the session tokens are synchronized.
2. Perform a pre-flight check when reading, based on the `Index` of the newest event passed to the handler. In such a case, it may make sense to back off for a small period, before reporting failure to handle the event (by throwing an exception). The Handler will be re-invoked for another attempt, with a better chance of the event being reflected in the read.
- Once such a pre-flight check has been carried out, one can safely report `StreamResult.AllProcessed` (or `PartiallyProcessed` if you wish to defer some work due to the backlog of events triggering too much work to perform in a single invocation)
3. Perform the processing on a 'shoulder tap' basis, with the final position based on what you read.
- First, load the stream's state, performing any required reactions.
- Then report the Version attained for the stream (based on the Index of the last event processed) by yielding a `StreamResult.OverrideNextIndex`.
- Then report the Version attained for the stream (based on the Index of the last event processed) as the Handler's updated Position for that Stream
- In this case, one of following edge cases may result:
- _The handler saw a version prior to the prompting event_. For example, if a Create event (`Index = 0`) is relayed, but reading does not yield any events (the replica in question is behind the node from which the feed obtained its state). In this case, the Handler can simply yield `StreamResult.OverrideNextIndex`, which will cause the event to be retained in the input buffer (and most likely, a fresh invocation for that same stream will immediately be dispatched)
- _The Handler saw a Version fresher than the prompting event_. For example: if a Create (`Index = 0`) is immediately followed by an Update (`Index = 1`), the handler can yield `StreamResult.OverrideNextIndex 2` to reflect the fact that the next event that's of interest will be event `Index = 2`. Regardless of whether Event 1 arrived while the handler was processing Event 0, or whether it arrives some time afterwards, the event will be dropped from the events pending for that Stream's Handler.
- _The handler saw a version prior to the prompting event_. For example, if a Create event (`Index = 0`) is relayed, but reading does not yield any events (the replica in question is behind the node from which the feed obtained its state). In this case, the Handler can simply yield the Position that the processing did see, which will cause the event to be retained in the input buffer (and most likely, a fresh invocation for that same stream will immediately be dispatched)
- _The Handler saw a Version fresher than the prompting event_. For example: if a Create (`Index = 0`) is immediately followed by an Update (`Index = 1`), the handler can yield `2` to reflect the fact that the next event that's of interest will be event `Index = 2`. Regardless of whether Event 1 arrived while the handler was processing Event 0, or whether it arrives some time afterwards, the event will be dropped from the events pending for that Stream's Handler.

### Consistency in the face of at least once delivery and re-traversal of events

Expand Down
2 changes: 2 additions & 0 deletions Propulsion.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/UserDictionary/Words/=abend/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Dispatchable/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=exns/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=ingester/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Prioritizer/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=slipstreamed/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=slipstreaming/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=stddev/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
6 changes: 3 additions & 3 deletions src/Propulsion.CosmosStore/CosmosStorePruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module Pruner =
let res = if deleted = 0 && deferred = 0 then Nop span.Length else Ok (deleted, deferred)
// For case where we discover events have already been deleted beyond our requested position, signal to reader to drop events
let writePos = max trimmedPos (untilIndex + 1L)
return struct (writePos, res) }
return struct (res, writePos) }

type CosmosStorePrunerStats(log, statsInterval, stateInterval, [<O; D null>] ?failThreshold) =
inherit Propulsion.Streams.Stats<Pruner.Outcome>(log, statsInterval, stateInterval, ?failThreshold = failThreshold)
Expand Down Expand Up @@ -75,8 +75,8 @@ type CosmosStorePruner =
#endif
let interpret _stream span =
let metrics = StreamSpan.metrics Event.storedSize span
struct (metrics, span)
Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil, (fun _ r -> r))
struct (span, metrics)
Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil)
let dumpStreams logStreamStates _log = logStreamStates Event.storedSize
let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5,
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay)
Expand Down
Loading

0 comments on commit 40c9ab4

Please sign in to comment.