From 6ce4dc9b0c7630fc67153f7dd29a6c7b0b5aa46c Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 12 Jul 2024 16:56:44 +0100 Subject: [PATCH] Finalize merge cases --- src/Propulsion.CosmosStore/CosmosStoreSink.fs | 4 +- src/Propulsion/Sinks.fs | 4 +- src/Propulsion/Streams.fs | 61 +++++++++++-------- tests/Propulsion.Tests/StreamStateTests.fs | 60 ++++++++++-------- 4 files changed, 73 insertions(+), 56 deletions(-) diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs index d2602ae0..d99c1a48 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs @@ -65,8 +65,8 @@ module Internal = log.Write(level, exn, "Writing {stream} failed, retrying", stream) let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task { - let i = StreamSpan.idx span - let n = StreamSpan.ver span + let i = StreamSpan.index span + let n = StreamSpan.nextIndex span log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length) #if COSMOSV3 span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds" diff --git a/src/Propulsion/Sinks.fs b/src/Propulsion/Sinks.fs index f14bc201..ab3046ae 100644 --- a/src/Propulsion/Sinks.fs +++ b/src/Propulsion/Sinks.fs @@ -17,9 +17,9 @@ type Codec<'E> = FsCodec.IEventCodec<'E, EventBody, unit> module Events = /// The Index of the next event ordinarily expected on the next handler invocation (assuming this invocation handles all successfully) - let nextIndex: Event[] -> int64 = Streams.StreamSpan.ver + let nextIndex: Event[] -> int64 = Streams.StreamSpan.nextIndex /// The Index of the first event as supplied to this handler - let index: Event[] -> int64 = Streams.StreamSpan.idx + let index: Event[] -> int64 = Streams.StreamSpan.index /// Represents progress attained during the processing of the supplied Events for a given StreamName. /// This will be reflected in adjustments to the Write Position for the stream in question. diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index 90891db8..7af0b47f 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -99,48 +99,55 @@ module StreamSpan = // TODO all or none of unfolds metrics eventSize trimmed, trimmed - let inline idx (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index - let inline ver (span: FsCodec.ITimelineEvent<'F>[]) = span[span.Length - 1].Index + 1L - let dropBeforeIndex min: FsCodec.ITimelineEvent<_>[] -> FsCodec.ITimelineEvent<_>[] = function - | xs when xs.Length = 0 -> null - | xs when idx xs >= min -> xs // don't adjust if min not within - | v when ver v <= min -> null // throw away if before min - | xs -> xs |> Array.skip (min - idx xs |> int) // slice + let inline index (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index + let inline nextIndex (span: FsCodec.ITimelineEvent<'F>[]) = + let l = span[span.Length - 1] + if l.IsUnfold then l.Index else l.Index + 1L + let inline dropBeforeIndex min = function + | [||] as xs -> xs + | xs when nextIndex xs < min -> Array.empty + | xs -> + match index xs with + | xi when xi = min -> xs + | xi -> xs |> Array.skip (min - xi |> int) let merge min (spans: FsCodec.ITimelineEvent<_>[][]) = - let candidates = [| - for span in spans do - if span <> null then - match dropBeforeIndex min span with - | null -> () - | trimmed when trimmed.Length = 0 -> invalidOp "Cant add empty" - | trimmed -> trimmed |] + let candidates = [| for span in spans do + if span <> null then + match dropBeforeIndex min span with + | [||] -> () + | xs -> xs |] if candidates.Length = 0 then null elif candidates.Length = 1 then candidates else - candidates |> Array.sortInPlaceBy idx + candidates |> Array.sortInPlaceBy index // no data buffered -> buffer first item - let mutable curr = candidates[0] + let mutable acc = candidates[0] let mutable buffer = null for i in 1 .. candidates.Length - 1 do let x = candidates[i] - let index = idx x - let currNext = ver curr - if index > currNext then // Gap - match curr |> Array.filter (_.IsUnfold >> not) with + let xIndex = index x + let accNext = nextIndex acc + if xIndex > accNext then // Gap + match acc |> Array.filter (_.IsUnfold >> not) with | [||] -> () | eventsOnly -> if buffer = null then buffer <- ResizeArray(candidates.Length) buffer.Add eventsOnly - curr <- x + acc <- x // Overlapping, join - elif index + x.LongLength > currNext then - curr <- Array.append curr (dropBeforeIndex currNext x) - let v = ver curr - 1L - let last = curr |> Array.filter (fun x -> not x.IsUnfold || x.Index = v) - if buffer = null then Array.singleton last - else buffer.Add last; buffer.ToArray() + elif nextIndex x > accNext then + match dropBeforeIndex accNext x with + | [||] -> () + | news -> + acc <- [| for x in acc do if not x.IsUnfold then x + yield! news |] + match acc with + | [||] when buffer = null -> null + | [||] -> buffer.ToArray() + | last when buffer = null -> Array.singleton last + | last -> buffer.Add last; buffer.ToArray() /// A Single Event from an Ordered stream being supplied for ingestion into the internal data structures type StreamEvent<'Format> = (struct (FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>)) diff --git a/tests/Propulsion.Tests/StreamStateTests.fs b/tests/Propulsion.Tests/StreamStateTests.fs index 4b267a58..ff952d86 100644 --- a/tests/Propulsion.Tests/StreamStateTests.fs +++ b/tests/Propulsion.Tests/StreamStateTests.fs @@ -26,7 +26,7 @@ module FsCodecEx = override _.ToString() = let t = if isUnfold then "Unfold" else "Event" - $"{t} {eventType} @{index}" + $"{t} {eventType} @{index} {context}" interface ITimelineEvent<'Format> with member _.Index = index member _.IsUnfold = isUnfold @@ -51,9 +51,8 @@ let mk p c = mk_ p c 0 0 let merge = StreamSpan.merge let dropBeforeIndex = StreamSpan.dropBeforeIndex let is (xs: FsCodec.ITimelineEvent[][]) (res: FsCodec.ITimelineEvent[][]) = - (xs = null && res = null) - || (xs, res) ||> Seq.forall2 (fun x y -> (x = null && y = null) - || (x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType))) + (xs, res) ||> Seq.forall2 (fun x y -> (Array.isEmpty x && Array.isEmpty y) + || x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType)) let [] nothing () = let r = merge 0L [| mk 0L 0; mk 0L 0 |] @@ -81,7 +80,7 @@ let [] adjacent () = let [] ``adjacent to min`` () = let r = Array.map (dropBeforeIndex 2L) [| mk 0L 1; mk 1L 2 |] - test <@ r |> is [| null; mk 2L 1 |] @> + test <@ r |> is [| [||]; mk 2L 1 |] @> let [] ``adjacent to min merge`` () = let r = merge 2L [| mk 0L 1; mk 1L 2 |] @@ -125,25 +124,28 @@ let [] ``fail 2`` () = let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 (fun x y -> obj.ReferenceEquals(x, y)) -let [] ``merges retain freshest unfolds, one per event type`` (counts: _[]) = +let [] ``merges retain freshest unfolds, one per event type`` counts = let input = [| let mutable pos = 0L let mutable seg = 0 - for gapOrOverlap, FsCheck.NonNegativeInt normal, FsCheck.NonNegativeInt unfolds in counts do - let normal = normal % 10 - let unfolds = unfolds % 120 // |> ignore; 0 + for gapOrOverlap, FsCheck.NonNegativeInt normal, FsCheck.NonNegativeInt unfolds in (counts : _[]) do + let events = normal % 10 + let unfolds = unfolds % 10 pos <- if gapOrOverlap < 0uy then max 0L (pos+int64 gapOrOverlap) else pos + int64 gapOrOverlap - yield mk_ pos normal seg unfolds - pos <- pos + int64 normal - seg <- seg + 1 - |] - let spans = merge 0L input - // Empty spans are dropped - if spans = null then + yield mk_ pos events seg unfolds + pos <- pos + int64 events + seg <- seg + 1 |] + let res = merge 0L input + // The only way to end up with a null output is by sending either no spans, or all empties + if res = null then test <@ input |> Array.forall Array.isEmpty @> else - let all = spans |> Array.concat + // an Empty span sequence is replaced with null + test <@ res |> Array.isEmpty |> not @> + // A Span sequence does not have any empty spans + test <@ res |> Array.forall (not << Array.isEmpty) @> + let all = res |> Array.concat let unfolds, events = all |> Array.partition _.IsUnfold // Events are always in order test <@ (events |> Seq.sortBy _.Index) === events @> @@ -159,15 +161,23 @@ let [] ``merges retain freshest unfolds, one per event t test <@ match events |> Array.tryLast, unfolds |> Array.tryLast with | Some le, Some lu -> lu.Index > le.Index | _ -> true @> - // resulting span sequence must be monotonic, with a gap of at least 1 in the Index ranges per span - test <@ spans |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.ver x < StreamSpan.idx y) @> - match spans with - | [||] -> () - | xs -> - let others = Array.take (xs.Length - 1) xs - // Only the last span can have unfolds - test <@ others |> Array.forall (Array.forall (fun x -> not x.IsUnfold)) @> + // resulting span sequence must be monotonic, with a gap of at least 1 in the Index ranges per span + test <@ res |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.nextIndex x < StreamSpan.index y) @> + + let others = res |> Array.take (res.Length - 1) + // Only the last span can have unfolds + test <@ others |> Array.forall (Array.forall (fun x -> not x.IsUnfold)) @> + + match res |> Array.last |> Array.last with + | u when u.IsUnfold -> + // If there are unfolds, they can only be the newest ones + test <@ input |> Array.forall (not << Array.exists (fun x -> x.IsUnfold && x.Index > u.Index)) @> + // if two sets of unfolds with identical Index values were supplied, the freshest ones must win + let uc = unbox u.Context + let newerUnfolds = Seq.concat input |> Seq.filter (fun x -> x.IsUnfold && x.Index = u.Index && unbox x.Context > uc) + test <@ newerUnfolds === [||] || uc = -1 @> + | _ -> () // TODO verify that slice never orphans unfolds #if MEMORY_USAGE_ANALYSIS