Skip to content

Commit

Permalink
Fix edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 17, 2024
1 parent 573e685 commit 0d7f33c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 36 deletions.
40 changes: 19 additions & 21 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,22 @@ module StreamSpan =
let slice<'F> eventSize limits (span: FsCodec.ITimelineEvent<'F>[]): struct (FsCodec.ITimelineEvent<'F>[] * Metrics) =
let trimmed =
// we must always send one event, even if it exceeds the limit (if the handler throws, the the Stats can categorize the problem to surface it)
if span[0].IsUnfold || span.Length = 1 || span[1].IsUnfold then span
if span.Length = 1 || span[0].IsUnfold || span[1].IsUnfold then span
// If we have 2 or more (non-Unfold) events, then we limit the batch size
else trimEvents<'F> eventSize limits span
trimmed, metrics eventSize trimmed

let inline index (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index
let inline nextIndex (span: FsCodec.ITimelineEvent<'F>[]) =
let l = span[span.Length - 1]
if l.IsUnfold then l.Index else l.Index + 1L
let inline dropBeforeIndex min = function
if l.IsUnfold && span[0].IsUnfold then l.Index else l.Index + 1L
let inline dropBeforeIndex i = function
| [||] as xs -> xs
| xs when nextIndex xs < min -> Array.empty
| xs when nextIndex xs < i -> Array.empty
| xs ->
match index xs with
| xi when xi = min -> xs
| xi -> xs |> Array.skip (min - xi |> int)
| xi when xi = i -> xs
| xi -> xs |> Array.skip (i - xi |> int)

let merge min (spans: FsCodec.ITimelineEvent<_>[][]) =
let candidates =
Expand All @@ -128,27 +128,22 @@ module StreamSpan =
else
candidates |> Array.sortInPlaceBy index

// no data buffered -> buffer first item
let mutable acc = candidates[0]
let mutable acc = candidates[0] // buffer first item
let mutable buffer = null
for i in 1 .. candidates.Length - 1 do
let x = candidates[i]
let xIndex = index x
let accNext = nextIndex acc
if xIndex > accNext then // Gap
match acc |> Array.filter (_.IsUnfold >> not) with
| [||] -> ()
| eventsOnly ->
if index x > accNext then // Gap
if acc |> Seq.exists (_.IsUnfold >> not) then
if buffer = null then buffer <- ResizeArray(candidates.Length)
buffer.Add eventsOnly
buffer.Add(acc |> Array.filter (_.IsUnfold >> not))
acc <- x
// Overlapping, join
elif nextIndex x > accNext then
elif nextIndex x >= accNext then // Overlapping; join
match dropBeforeIndex accNext x with
| [||] -> ()
| news ->
acc <- [| for x in acc do if not x.IsUnfold then x
yield! news |]

acc <- [| yield! acc |> Seq.filter (_.IsUnfold >> not); yield! news |]
match acc with
| [||] when buffer = null -> null
| [||] -> buffer.ToArray()
Expand Down Expand Up @@ -231,7 +226,7 @@ module Buffer =
let streams, reqs = Streams<'Format>(), Dictionary<FsCodec.StreamName, int64>()
for struct (stream, event) in streamEvents do
streams.Merge(stream, event)
match reqs.TryGetValue(stream), event.Index + 1L with
match reqs.TryGetValue(stream), if event.IsUnfold then event.Index else event.Index + 1L with
| (false, _), required -> reqs[stream] <- required
| (true, actual), required when actual < required -> reqs[stream] <- required
| (true, _), _ -> () // replayed same or earlier item
Expand Down Expand Up @@ -314,8 +309,11 @@ module Scheduling =
member _.WritePos(stream) = tryGetItem stream |> ValueOption.bind _.WritePos
member _.WritePositionIsAlreadyBeyond(stream, required) =
match tryGetItem stream with
// Example scenario: if a write reported we reached version 2, and we are ingesting an event that requires 2, then we drop it
| ValueSome ss -> match ss.WritePos with ValueSome cw -> cw >= required | ValueNone -> false
// Example scenario: if a write reported nextIndex = 1 (after handling an event with Index 0 but no unfolds yet) then:
// we can drop a resend of the event with Index=0 (which will have a required of 1)
// we can drop an Unfold with Index=0 (which would not happen, but would have a required of 0)
// we can NOT drop an Unfold with Index=1 (which will have a required of 1)
| ValueSome ss -> match ss.WritePos with ValueSome cw -> cw > required | ValueNone -> false
| ValueNone -> false // If the entry has been purged, or we've yet to visit a stream, we can't drop them
member _.Merge(streams: Streams<'Format>) =
for kv in streams.States do
Expand Down
31 changes: 16 additions & 15 deletions tests/Propulsion.Tests/StreamStateTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ let mk_ p c seg uc: FsCodec.ITimelineEvent<string>[] =
[| for x in 0..c-1 -> mk (p + int64 x) (p + int64 x |> string) false
for u in 0..uc-1 -> mk (p + int64 c) $"{p+int64 c}u{u}" true |]
let mk p c = mk_ p c 0 0
let mkU p uc = mk_ p 0 0 uc
let merge = StreamSpan.merge
let isSame = LanguagePrimitives.PhysicalEquality
let dropBeforeIndex = StreamSpan.dropBeforeIndex
Expand Down Expand Up @@ -81,39 +82,39 @@ let [<Fact>] adjacent () =
test <@ r |> is [| mk 0L 3 |] @>

let [<Fact>] ``adjacent to min`` () =
let r = Array.map (dropBeforeIndex 2L) [| mk 0L 1; mk 1L 2 |]
test <@ r |> is [| [||]; mk 2L 1 |] @>
let r = Array.map (dropBeforeIndex 2L) [| mk 0L 1; mk 1L 2; mkU 1L 1; mkU 2L 2 |]
test <@ r |> is [| [||]; mk 2L 1; [||]; mkU 2L 2 |] @>

let [<Fact>] ``adjacent to min merge`` () =
let r = merge 2L [| mk 0L 1; mk 1L 2 |]
test <@ r |> is [| mk 2L 1 |] @>
let r = merge 2L [| mk 0L 1; mk 1L 2; mkU 2L 2 |]
test <@ r |> is [| [| yield! mk 2L 1; yield! mkU 2L 2 |] |] @>

let [<Fact>] ``adjacent to min no overlap`` () =
let r = merge 2L [| mk 0L 1; mk 2L 1 |]
let r = merge 2L [| mk_ 0L 2 0 1; mk 2L 1 |]
test <@ r |> is [| mk 2L 1|] @>

let [<Fact>] ``adjacent trim`` () =
let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 2L 2 |]
test <@ r |> is [| mk 1L 1; mk 2L 2 |] @>
let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 2L 2; mkU 2L 2 |]
test <@ r |> is [| mk 1L 1; mk 2L 2; mkU 2L 2 |] @>

let [<Fact>] ``adjacent trim merge`` () =
let r = merge 1L [| mk 0L 2; mk 2L 2 |]
test <@ r |> is [| mk 1L 3 |] @>

let [<Fact>] ``adjacent trim append`` () =
let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 2L 2; mk 5L 1 |]
test <@ r |> is [| mk 1L 1; mk 2L 2; mk 5L 1 |] @>
let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mkU 1L 1; mk 2L 2; mk 5L 1 |]
test <@ r |> is [| mk 1L 1; mkU 1L 1; mk 2L 2; mk 5L 1 |] @>

let [<Fact>] ``adjacent trim append merge`` () =
let r = merge 1L [| mk 0L 2; mk 2L 2; mk 5L 1|]
test <@ r |> is [| mk 1L 3; mk 5L 1 |] @>

let [<Fact>] ``mixed adjacent trim append`` () =
let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 5L 1; mk 2L 2 |]
test <@ r |> is [| mk 1L 1; mk 5L 1; mk 2L 2 |] @>
let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 5L 1; mk 2L 2; mk_ 0L 2 0 2; mk_ 2L 2 0 2 |]
test <@ r |> is [| mk 1L 1; mk 5L 1; mk 2L 2; mk_ 1L 1 0 2; mk_ 2L 2 0 2 |] @>

let [<Fact>] ``mixed adjacent trim append merge`` () =
let r = merge 1L [| mk 0L 2; mk 5L 1; mk 2L 2|]
let r = merge 1L [| mk 0L 2; mk 5L 1; mk 2L 2; mkU 4L 2 |]
test <@ r |> is [| mk 1L 3; mk 5L 1 |] @>

let [<Fact>] fail () =
Expand All @@ -133,7 +134,7 @@ let [<FsCheck.Xunit.Property(MaxTest = 1000)>] ``merges retain freshest unfolds,
for gapOrOverlap, FsCheck.NonNegativeInt normal, FsCheck.NonNegativeInt unfolds in (counts : _[]) do
let events = normal % 10
let unfolds = unfolds % 10
pos <- if gapOrOverlap < 0uy then max 0L (pos+int64 gapOrOverlap) else pos + int64 gapOrOverlap
pos <- max 0L (pos+int64 gapOrOverlap)
yield mk_ pos events seg unfolds
pos <- pos + int64 events
seg <- seg + 1 |]
Expand All @@ -159,9 +160,9 @@ let [<FsCheck.Xunit.Property(MaxTest = 1000)>] ``merges retain freshest unfolds,
test <@ unfolds |> Array.groupBy _.EventType |> Array.forall (fun (_et, xs) -> xs.Length = 1) @>
// Unfolds are always for the same Index (as preceding ones are invalidated by any newer event)
test <@ unfolds |> Array.forall (fun x -> x.Index = (Array.last all).Index) @>
// Version that Unfolds pertain to must always be > final event Index
// Version that Unfolds pertain to must always be >= final event Index
test <@ match events |> Array.tryLast, unfolds |> Array.tryLast with
| Some le, Some lu -> lu.Index > le.Index
| Some le, Some lu -> lu.Index >= le.Index
| _ -> true @>

// resulting span sequence must be monotonic, with a gap of at least 1 in the Index ranges per span
Expand Down

0 comments on commit 0d7f33c

Please sign in to comment.