Skip to content

Commit

Permalink
Simplify split algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 15, 2024
1 parent bb557a5 commit c7d5041
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 30 deletions.
6 changes: 3 additions & 3 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ module Internal =
let maxEvents, maxBytes = defaultArg maxEvents 16384, defaultArg maxBytes (256 * 1024)
let writerResultLog = log.ForContext<Writer.Result>()
let attemptWrite stream span ct = task {
let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
let struct (span, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
#if COSMOSV3
try let! res = Writer.write log eventsContext (StreamName.toString stream) span' ct
try let! res = Writer.write log eventsContext (StreamName.toString stream) span ct
#else
try let! res = Writer.write log eventsContext stream span' ct
try let! res = Writer.write log eventsContext stream span ct
#endif
return Ok struct (res, met)
with e -> return Error struct (e, met) }
Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.EventStore/EventStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ module Internal =
let index = System.Threading.Interlocked.Increment(&robin) % connections.Length
let selectedConnection = connections[index]
let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096
let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span' ct
let struct (span, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span ct
return Ok struct (res, met)
with e -> return Error struct (e, met) }
let interpretProgress (streams: Scheduling.StreamStates<_>) stream res =
Expand Down
4 changes: 0 additions & 4 deletions src/Propulsion/Internal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,6 @@ module ValueOption =
let inline toOption x = match x with ValueSome x -> Some x | ValueNone -> None
let inline map f x = match x with ValueSome x -> ValueSome (f x) | ValueNone -> ValueNone

module Obj =

let isSame = LanguagePrimitives.PhysicalEquality

module Seq =

let partition predicate xs =
Expand Down
30 changes: 17 additions & 13 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,28 @@ module StreamSpan =
type Metrics = (struct (int * int))
let metrics eventSize (xs: FsCodec.ITimelineEvent<'F>[]): Metrics =
struct (xs.Length, xs |> Seq.sumBy eventSize)
let slice<'F> eventSize (maxEvents, maxBytes) (span: FsCodec.ITimelineEvent<'F>[]): struct (Metrics * FsCodec.ITimelineEvent<'F>[]) =
let private trimEvents<'F> eventSize (maxEvents, maxBytes) (span: FsCodec.ITimelineEvent<'F>[]) =
let mutable count, bytes = 0, 0
let mutable countBudget, bytesBudget = maxEvents, maxBytes
let withinLimits x =
let withinLimits (x: FsCodec.ITimelineEvent<_>) =
countBudget <- countBudget - 1
let eventBytes = eventSize x
bytesBudget <- bytesBudget - eventBytes
// always send at least one event in order to surface the problem and have the stream marked malformed
let res = count = 0 || (countBudget >= 0 && bytesBudget >= 0)
if res then count <- count + 1; bytes <- bytes + eventBytes
res
let trimmed = span |> Array.takeWhile withinLimits
let fitsAndNotAnUnfold = (countBudget >= 0 && bytesBudget >= 0) && not x.IsUnfold
if fitsAndNotAnUnfold then count <- count + 1; bytes <- bytes + eventBytes
fitsAndNotAnUnfold
let trimmedEvents = span |> Array.takeWhile withinLimits
// takeWhile terminated either because it hit the first Unfold, or the size limit
// In either case, if the next event is an Unfold, we know it (and any successors) must be associated with that final event
if span |> Array.tryItem trimmedEvents.Length |> Option.exists _.IsUnfold then span
else trimmedEvents
let slice<'F> eventSize limits (span: FsCodec.ITimelineEvent<'F>[]): struct (FsCodec.ITimelineEvent<'F>[] * Metrics) =
let trimmed =
let inline isEvent (x: FsCodec.ITimelineEvent<'F>) = not x.IsUnfold
if Obj.isSame (Array.last trimmed) (Array.last span) then trimmed
elif trimmed |> Array.exists isEvent then trimmed |> Array.filter isEvent // Remove the unfolds if there's > 0 events
else span // We don't have any events, but we never orphan unfolds, even if the limit would imply they should get split
metrics eventSize trimmed, trimmed
// we must always send one event, even if it exceeds the limit (if the handler throws, the Stats can categorize the problem to surface it)
if span[0].IsUnfold || span.Length = 1 || span[1].IsUnfold then span
// If we have 2 or more (non-Unfold) events, then we limit the batch size
else trimEvents<'F> eventSize limits span
trimmed, metrics eventSize trimmed

let inline index (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index
let inline nextIndex (span: FsCodec.ITimelineEvent<'F>[]) =
Expand Down Expand Up @@ -168,7 +172,7 @@ module Buffer =
if malformed then { write = WritePosMalformed; queue = queue }
else StreamState<'Format>.Create(write, queue)
static member Create(write, queue) = { write = (match write with ValueSome x -> x | ValueNone -> WritePosUnknown); queue = queue }
member x.IsEmpty = Obj.isSame null x.queue
member x.IsEmpty = LanguagePrimitives.PhysicalEquality null x.queue
member x.EventsSumBy(f) = if x.IsEmpty then 0L else x.queue |> Seq.map (Seq.sumBy f) |> Seq.sum |> int64
member x.EventsCount = if x.IsEmpty then 0 else x.queue |> Seq.sumBy Array.length

Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion/Sync.fs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ type Factory private () =
let maxEvents, maxBytes = defaultArg maxEvents 16384, (defaultArg maxBytes (1024 * 1024 - (*fudge*)4096))

let attemptWrite stream (events: FsCodec.ITimelineEvent<'F>[]) ct = task {
let struct (met, span') = StreamSpan.slice<'F> sliceSize (maxEvents, maxBytes) events
let struct (trimmed, met) = StreamSpan.slice<'F> sliceSize (maxEvents, maxBytes) events
let prepareTs = Stopwatch.timestamp ()
try let! outcome, index' = handle.Invoke(stream, span', ct)
try let! outcome, index' = handle.Invoke(stream, trimmed, ct)
return Ok struct (outcome, index', met, Stopwatch.elapsed prepareTs)
with e -> return Error struct (e, met) }

Expand Down
5 changes: 2 additions & 3 deletions tests/Propulsion.Tests/SinkHealthTests.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module Propulsion.Tests.SinkHealthTests

open FSharp.Control
open Propulsion.Internal
open Propulsion.Feed
open Swensen.Unquote
open System
Expand Down Expand Up @@ -68,7 +67,7 @@ type Scenario(testOutput) =
pe.StuckStreams.Length = 1
&& pe.FailingStreams.Length = 1
&& all |> Seq.exists (fun struct (_s, age, _c) -> age > abendThreshold) @>
test <@ Obj.isSame me pe @>
test <@ Obj.isSame me (sink.Await() |> Async.Catch |> Async.RunSynchronously |> extractHealthCheckExn) @> }
test <@ LanguagePrimitives.PhysicalEquality me pe @>
test <@ LanguagePrimitives.PhysicalEquality me (sink.Await() |> Async.Catch |> Async.RunSynchronously |> extractHealthCheckExn) @> }

interface IDisposable with member _.Dispose() = dispose ()
7 changes: 4 additions & 3 deletions tests/Propulsion.Tests/StreamStateTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,19 @@ let mk_ p c seg uc: FsCodec.ITimelineEvent<string>[] =
for u in 0..uc-1 -> mk (p + int64 c) $"{p+int64 c}u{u}" true |]
let mk p c = mk_ p c 0 0
let merge = StreamSpan.merge
let isSame = LanguagePrimitives.PhysicalEquality
let dropBeforeIndex = StreamSpan.dropBeforeIndex
let is (xs: FsCodec.ITimelineEvent<string>[][]) (res: FsCodec.ITimelineEvent<string>[][]) =
(xs, res) ||> Seq.forall2 (fun x y -> (Array.isEmpty x && Array.isEmpty y)
|| x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType))

let [<Fact>] nothing () =
let r = merge 0L [| mk 0L 0; mk 0L 0 |]
test <@ Obj.isSame null r @>
test <@ isSame null r @>

let [<Fact>] synced () =
let r = merge 1L [| mk 0L 1; mk 0L 0 |]
test <@ Obj.isSame null r @>
test <@ isSame null r @>

let [<Fact>] ``no overlap`` () =
let r = merge 0L [| mk 0L 1; mk 2L 2 |]
Expand Down Expand Up @@ -123,7 +124,7 @@ let [<Fact>] ``fail 2`` () =
let r = merge 11613L [| mk 11614L 1; null |]
test <@ r |> is [| mk 11614L 1 |] @>

let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 Obj.isSame
let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 isSame

let [<FsCheck.Xunit.Property(MaxTest = 1000)>] ``merges retain freshest unfolds, one per event type`` counts =
let input = [|
Expand Down

0 comments on commit c7d5041

Please sign in to comment.