Skip to content

Commit

Permalink
Prevent orphaning of unfolds
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 13, 2024
1 parent 6e0d179 commit 8770230
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
3 changes: 2 additions & 1 deletion src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ module EquinoxSystemTextJsonParser =
size = x.c.Length + d.Length + m.Length + len x.correlationId + len x.causationId + 80,
correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold)
let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset))
match u |> ValueOption.map (fun u -> u.Cast<Equinox.CosmosStore.Core.Event[]>()) with // an Unfold won't have a corr/cause id, but that's OK
// an Unfold won't have a corr/cause id, but that's OK - can't use Tip type as don't want to expand compressed form etc
match u |> ValueOption.map (fun u -> u.Cast<Equinox.CosmosStore.Core.Event[]>()) with
| ValueNone | ValueSome null | ValueSome [||] -> events
| ValueSome unfolds -> seq {
yield! events
Expand Down
4 changes: 4 additions & 0 deletions src/Propulsion/Internal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ module ValueOption =
let inline toOption x = match x with ValueSome x -> Some x | ValueNone -> None
let inline map f x = match x with ValueSome x -> ValueSome (f x) | ValueNone -> ValueNone

module Obj =

let isSame = LanguagePrimitives.PhysicalEquality

module Seq =

let partition predicate xs =
Expand Down
12 changes: 8 additions & 4 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,20 @@ module StreamSpan =
let slice<'F> eventSize (maxEvents, maxBytes) (span: FsCodec.ITimelineEvent<'F>[]): struct (Metrics * FsCodec.ITimelineEvent<'F>[]) =
let mutable count, bytes = 0, 0
let mutable countBudget, bytesBudget = maxEvents, maxBytes
let withinLimits y =
let withinLimits x =
countBudget <- countBudget - 1
let eventBytes = eventSize y
let eventBytes = eventSize x
bytesBudget <- bytesBudget - eventBytes
// always send at least one event in order to surface the problem and have the stream marked malformed
let res = count = 0 || (countBudget >= 0 && bytesBudget >= 0)
if res then count <- count + 1; bytes <- bytes + eventBytes
res
let trimmed = span |> Array.takeWhile withinLimits
// TODO all or none of unfolds
let trimmed =
let inline isEvent (x: FsCodec.ITimelineEvent<'F>) = not x.IsUnfold
if Obj.isSame (Array.last trimmed) (Array.last span) then trimmed
elif trimmed |> Array.exists isEvent then trimmed |> Array.filter isEvent // Remove the unfolds if there's > 0 events
else span // We don't have any events, but we never orphan unfolds, even if the limit would imply they should get split
metrics eventSize trimmed, trimmed

let inline index (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index
Expand Down Expand Up @@ -164,7 +168,7 @@ module Buffer =
if malformed then { write = WritePosMalformed; queue = queue }
else StreamState<'Format>.Create(write, queue)
static member Create(write, queue) = { write = (match write with ValueSome x -> x | ValueNone -> WritePosUnknown); queue = queue }
member x.IsEmpty = obj.ReferenceEquals(null, x.queue)
member x.IsEmpty = Obj.isSame null x.queue
member x.EventsSumBy(f) = if x.IsEmpty then 0L else x.queue |> Seq.map (Seq.sumBy f) |> Seq.sum |> int64
member x.EventsCount = if x.IsEmpty then 0 else x.queue |> Seq.sumBy Array.length

Expand Down
5 changes: 3 additions & 2 deletions tests/Propulsion.Tests/SinkHealthTests.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Propulsion.Tests.SinkHealthTests

open FSharp.Control
open Propulsion.Internal
open Propulsion.Feed
open Swensen.Unquote
open System
Expand Down Expand Up @@ -67,7 +68,7 @@ type Scenario(testOutput) =
pe.StuckStreams.Length = 1
&& pe.FailingStreams.Length = 1
&& all |> Seq.exists (fun struct (_s, age, _c) -> age > abendThreshold) @>
test <@ obj.ReferenceEquals(me, pe) @>
test <@ obj.ReferenceEquals(me, sink.Await() |> Async.Catch |> Async.RunSynchronously |> extractHealthCheckExn) @> }
test <@ Obj.isSame me pe @>
test <@ Obj.isSame me (sink.Await() |> Async.Catch |> Async.RunSynchronously |> extractHealthCheckExn) @> }

interface IDisposable with member _.Dispose() = dispose ()
11 changes: 6 additions & 5 deletions tests/Propulsion.Tests/StreamStateTests.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Propulsion.Tests.StreamStateTests

open Propulsion.Internal
open Propulsion.Streams
open Swensen.Unquote
open Xunit
Expand Down Expand Up @@ -56,11 +57,11 @@ let is (xs: FsCodec.ITimelineEvent<string>[][]) (res: FsCodec.ITimelineEvent<str

let [<Fact>] nothing () =
let r = merge 0L [| mk 0L 0; mk 0L 0 |]
test <@ obj.ReferenceEquals(null, r) @>
test <@ Obj.isSame null r @>

let [<Fact>] synced () =
let r = merge 1L [| mk 0L 1; mk 0L 0 |]
test <@ obj.ReferenceEquals(null, r) @>
test <@ Obj.isSame null r @>

let [<Fact>] ``no overlap`` () =
let r = merge 0L [| mk 0L 1; mk 2L 2 |]
Expand Down Expand Up @@ -122,7 +123,7 @@ let [<Fact>] ``fail 2`` () =
let r = merge 11613L [| mk 11614L 1; null |]
test <@ r |> is [| mk 11614L 1 |] @>

let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 (fun x y -> obj.ReferenceEquals(x, y))
let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 Obj.isSame

let [<FsCheck.Xunit.Property(MaxTest = 1000)>] ``merges retain freshest unfolds, one per event type`` counts =
let input = [|
Expand All @@ -142,9 +143,9 @@ let [<FsCheck.Xunit.Property(MaxTest = 1000)>] ``merges retain freshest unfolds,
else

// an Empty span sequence is replaced with null
test <@ res |> Array.isEmpty |> not @>
test <@ res |> Array.any @>
// A Span sequence does not have any empty spans
test <@ res |> Array.forall (not << Array.isEmpty) @>
test <@ res |> Array.forall Array.any @>
let all = res |> Array.concat
let unfolds, events = all |> Array.partition _.IsUnfold
// Events are always in order
Expand Down

0 comments on commit 8770230

Please sign in to comment.