Skip to content

Commit

Permalink
Finalize merge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 12, 2024
1 parent cef23d8 commit 6ce4dc9
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 56 deletions.
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ module Internal =
log.Write(level, exn, "Writing {stream} failed, retrying", stream)

let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task {
let i = StreamSpan.idx span
let n = StreamSpan.ver span
let i = StreamSpan.index span
let n = StreamSpan.nextIndex span
log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length)
#if COSMOSV3
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds"
Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion/Sinks.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ type Codec<'E> = FsCodec.IEventCodec<'E, EventBody, unit>
module Events =

/// The Index of the next event ordinarily expected on the next handler invocation (assuming this invocation handles all successfully)
let nextIndex: Event[] -> int64 = Streams.StreamSpan.ver
let nextIndex: Event[] -> int64 = Streams.StreamSpan.nextIndex
/// The Index of the first event as supplied to this handler
let index: Event[] -> int64 = Streams.StreamSpan.idx
let index: Event[] -> int64 = Streams.StreamSpan.index

/// Represents progress attained during the processing of the supplied Events for a given <c>StreamName</c>.
/// This will be reflected in adjustments to the Write Position for the stream in question.
Expand Down
61 changes: 34 additions & 27 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -99,48 +99,55 @@ module StreamSpan =
// TODO all or none of unfolds
metrics eventSize trimmed, trimmed

let inline idx (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index
let inline ver (span: FsCodec.ITimelineEvent<'F>[]) = span[span.Length - 1].Index + 1L
let dropBeforeIndex min: FsCodec.ITimelineEvent<_>[] -> FsCodec.ITimelineEvent<_>[] = function
| xs when xs.Length = 0 -> null
| xs when idx xs >= min -> xs // don't adjust if min not within
| v when ver v <= min -> null // throw away if before min
| xs -> xs |> Array.skip (min - idx xs |> int) // slice
let inline index (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index
let inline nextIndex (span: FsCodec.ITimelineEvent<'F>[]) =
let l = span[span.Length - 1]
if l.IsUnfold then l.Index else l.Index + 1L
let inline dropBeforeIndex min = function
| [||] as xs -> xs
| xs when nextIndex xs < min -> Array.empty
| xs ->
match index xs with
| xi when xi = min -> xs
| xi -> xs |> Array.skip (min - xi |> int)

let merge min (spans: FsCodec.ITimelineEvent<_>[][]) =
let candidates = [|
for span in spans do
if span <> null then
match dropBeforeIndex min span with
| null -> ()
| trimmed when trimmed.Length = 0 -> invalidOp "Cant add empty"
| trimmed -> trimmed |]
let candidates = [| for span in spans do
if span <> null then
match dropBeforeIndex min span with
| [||] -> ()
| xs -> xs |]
if candidates.Length = 0 then null
elif candidates.Length = 1 then candidates
else
candidates |> Array.sortInPlaceBy idx
candidates |> Array.sortInPlaceBy index

// no data buffered -> buffer first item
let mutable curr = candidates[0]
let mutable acc = candidates[0]
let mutable buffer = null
for i in 1 .. candidates.Length - 1 do
let x = candidates[i]
let index = idx x
let currNext = ver curr
if index > currNext then // Gap
match curr |> Array.filter (_.IsUnfold >> not) with
let xIndex = index x
let accNext = nextIndex acc
if xIndex > accNext then // Gap
match acc |> Array.filter (_.IsUnfold >> not) with
| [||] -> ()
| eventsOnly ->
if buffer = null then buffer <- ResizeArray(candidates.Length)
buffer.Add eventsOnly
curr <- x
acc <- x
// Overlapping, join
elif index + x.LongLength > currNext then
curr <- Array.append curr (dropBeforeIndex currNext x)
let v = ver curr - 1L
let last = curr |> Array.filter (fun x -> not x.IsUnfold || x.Index = v)
if buffer = null then Array.singleton last
else buffer.Add last; buffer.ToArray()
elif nextIndex x > accNext then
match dropBeforeIndex accNext x with
| [||] -> ()
| news ->
acc <- [| for x in acc do if not x.IsUnfold then x
yield! news |]
match acc with
| [||] when buffer = null -> null
| [||] -> buffer.ToArray()
| last when buffer = null -> Array.singleton last
| last -> buffer.Add last; buffer.ToArray()

/// A Single Event from an Ordered stream being supplied for ingestion into the internal data structures
type StreamEvent<'Format> = (struct (FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>))
Expand Down
60 changes: 35 additions & 25 deletions tests/Propulsion.Tests/StreamStateTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module FsCodecEx =

override _.ToString() =
let t = if isUnfold then "Unfold" else "Event"
$"{t} {eventType} @{index}"
$"{t} {eventType} @{index} {context}"
interface ITimelineEvent<'Format> with
member _.Index = index
member _.IsUnfold = isUnfold
Expand All @@ -51,9 +51,8 @@ let mk p c = mk_ p c 0 0
let merge = StreamSpan.merge
let dropBeforeIndex = StreamSpan.dropBeforeIndex
let is (xs: FsCodec.ITimelineEvent<string>[][]) (res: FsCodec.ITimelineEvent<string>[][]) =
(xs = null && res = null)
|| (xs, res) ||> Seq.forall2 (fun x y -> (x = null && y = null)
|| (x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType)))
(xs, res) ||> Seq.forall2 (fun x y -> (Array.isEmpty x && Array.isEmpty y)
|| x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType))

let [<Fact>] nothing () =
let r = merge 0L [| mk 0L 0; mk 0L 0 |]
Expand Down Expand Up @@ -81,7 +80,7 @@ let [<Fact>] adjacent () =

let [<Fact>] ``adjacent to min`` () =
let r = Array.map (dropBeforeIndex 2L) [| mk 0L 1; mk 1L 2 |]
test <@ r |> is [| null; mk 2L 1 |] @>
test <@ r |> is [| [||]; mk 2L 1 |] @>

let [<Fact>] ``adjacent to min merge`` () =
let r = merge 2L [| mk 0L 1; mk 1L 2 |]
Expand Down Expand Up @@ -125,25 +124,28 @@ let [<Fact>] ``fail 2`` () =

let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 (fun x y -> obj.ReferenceEquals(x, y))

let [<FsCheck.Xunit.Property>] ``merges retain freshest unfolds, one per event type`` (counts: _[]) =
let [<FsCheck.Xunit.Property(MaxTest = 1000)>] ``merges retain freshest unfolds, one per event type`` counts =
let input = [|
let mutable pos = 0L
let mutable seg = 0
for gapOrOverlap, FsCheck.NonNegativeInt normal, FsCheck.NonNegativeInt unfolds in counts do
let normal = normal % 10
let unfolds = unfolds % 120 // |> ignore; 0
for gapOrOverlap, FsCheck.NonNegativeInt normal, FsCheck.NonNegativeInt unfolds in (counts : _[]) do
let events = normal % 10
let unfolds = unfolds % 10
pos <- if gapOrOverlap < 0uy then max 0L (pos+int64 gapOrOverlap) else pos + int64 gapOrOverlap
yield mk_ pos normal seg unfolds
pos <- pos + int64 normal
seg <- seg + 1
|]
let spans = merge 0L input
// Empty spans are dropped
if spans = null then
yield mk_ pos events seg unfolds
pos <- pos + int64 events
seg <- seg + 1 |]
let res = merge 0L input
// The only way to end up with a null output is by sending either no spans, or all empties
if res = null then
test <@ input |> Array.forall Array.isEmpty @>
else

let all = spans |> Array.concat
// an Empty span sequence is replaced with null
test <@ res |> Array.isEmpty |> not @>
// A Span sequence does not have any empty spans
test <@ res |> Array.forall (not << Array.isEmpty) @>
let all = res |> Array.concat
let unfolds, events = all |> Array.partition _.IsUnfold
// Events are always in order
test <@ (events |> Seq.sortBy _.Index) === events @>
Expand All @@ -159,15 +161,23 @@ let [<FsCheck.Xunit.Property>] ``merges retain freshest unfolds, one per event t
test <@ match events |> Array.tryLast, unfolds |> Array.tryLast with
| Some le, Some lu -> lu.Index > le.Index
| _ -> true @>
// resulting span sequence must be monotonic, with a gap of at least 1 in the Index ranges per span
test <@ spans |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.ver x < StreamSpan.idx y) @>
match spans with
| [||] -> ()
| xs ->
let others = Array.take (xs.Length - 1) xs
// Only the last span can have unfolds
test <@ others |> Array.forall (Array.forall (fun x -> not x.IsUnfold)) @>

// resulting span sequence must be monotonic, with a gap of at least 1 in the Index ranges per span
test <@ res |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.nextIndex x < StreamSpan.index y) @>

let others = res |> Array.take (res.Length - 1)
// Only the last span can have unfolds
test <@ others |> Array.forall (Array.forall (fun x -> not x.IsUnfold)) @>

match res |> Array.last |> Array.last with
| u when u.IsUnfold ->
// If there are unfolds, they can only be the newest ones
test <@ input |> Array.forall (not << Array.exists (fun x -> x.IsUnfold && x.Index > u.Index)) @>
// if two sets of unfolds with identical Index values were supplied, the freshest ones must win
let uc = unbox<int> u.Context
let newerUnfolds = Seq.concat input |> Seq.filter (fun x -> x.IsUnfold && x.Index = u.Index && unbox<int> x.Context > uc)
test <@ newerUnfolds === [||] || uc = -1 @>
| _ -> ()
// TODO verify that slice never orphans unfolds

#if MEMORY_USAGE_ANALYSIS
Expand Down

0 comments on commit 6ce4dc9

Please sign in to comment.