Skip to content

Commit

Permalink
feat(Cosmos Reader+Writer, Scheduler): propagate unfolds
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 12, 2024
1 parent a54891d commit 9bc6130
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 30 deletions.
27 changes: 18 additions & 9 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ module private Impl =

// v4 and later use JsonElement, but Propulsion is using ReadOnlyMemory<byte> rather than assuming and/or offering optimization for JSON bodies
open System.Text.Json
let private toNativeEventBody (x: EventBody): JsonElement =
let toNativeEventBody (x: EventBody): JsonElement =
if x.IsEmpty then JsonElement()
else JsonSerializer.Deserialize(x.Span)
let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody
else JsonSerializer.Deserialize<JsonElement>(x.Span)
#endif

module Internal =
Expand Down Expand Up @@ -66,21 +65,31 @@ module Internal =
log.Write(level, exn, "Writing {stream} failed, retrying", stream)

let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task {
log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length)
let i = StreamSpan.idx span
let n = StreamSpan.ver span
log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length)
#if COSMOSV3
let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _))
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds"
let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _))
|> Async.executeAsTask ct
#else
let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _), ct)
let unfolds, span = span |> Array.partition _.IsUnfold
let mkUnfold baseIndex (compressor, x: IEventData<'t>): Unfold =
{ i = baseIndex; t = x.Timestamp
c = x.EventType; d = compressor x.Data; m = compressor x.Meta }
let mapData = FsCodec.Core.EventData.Map StreamSpan.toNativeEventBody
let unfolds = unfolds |> Array.map (fun x -> (*Equinox.CosmosStore.Core.Store.Sync.*)mkUnfold i (StreamSpan.toNativeEventBody, x))
let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData, unfolds, ct)
#endif
let res' =
match res with
| AppendResult.Ok pos -> Result.Ok pos.index
| AppendResult.Conflict (pos, _) | AppendResult.ConflictUnknown pos ->
// TODO can't drop unfolds
match pos.index with
| actual when actual < span[0].Index -> Result.PrefixMissing (span, actual)
| actual when actual >= span[0].Index + span.LongLength -> Result.Duplicate actual
| actual -> Result.PartialDuplicate (span |> Array.skip (actual - span[0].Index |> int))
| actual when actual < i -> Result.PrefixMissing (span, actual) // TODO
| actual when actual >= n -> Result.Duplicate actual
| actual -> Result.PartialDuplicate (span |> Array.skip (actual - i |> int)) // TODO
log.Debug("Result: {res}", res')
return res' }
let containsMalformedMessage e =
Expand Down
36 changes: 21 additions & 15 deletions src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
namespace Propulsion.CosmosStore

open Equinox.CosmosStore.Core

open Propulsion.Internal
open Propulsion.Sinks

/// <summary>Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams.</summary>
Expand All @@ -10,12 +10,11 @@ open Propulsion.Sinks
#if !COSMOSV3
module EquinoxSystemTextJsonParser =

type System.Text.Json.JsonDocument with
member document.Cast<'T>() =
System.Text.Json.JsonSerializer.Deserialize<'T>(document.RootElement)
type Batch with
member _.MapData x =
System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x
type System.Text.Json.JsonElement with
member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x)
member x.ToSinkEventBody() = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x |> System.ReadOnlyMemory

type System.Text.Json.JsonDocument with member x.Cast<'T>() = x.RootElement.Cast<'T>()
let timestamp (doc: System.Text.Json.JsonDocument) =
let unixEpoch = System.DateTime.UnixEpoch
let ts = let r = doc.RootElement in r.GetProperty("_ts")
Expand All @@ -33,24 +32,31 @@ module EquinoxSystemTextJsonParser =
match tryProp "p" with
| ValueSome je when je.ValueKind = System.Text.Json.JsonValueKind.String && hasProp "i" && hasProp "n" && hasProp "e" ->
let sn = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw)
if streamFilter sn then ValueSome (struct (sn, d.Cast<Batch>())) else ValueNone
if streamFilter sn then ValueSome (struct (sn, d.Cast<Batch>(), tryProp "u")) else ValueNone
| _ -> ValueNone

/// Enumerates the events represented within a batch
let enumEquinoxCosmosEvents (batch: Batch): Event seq =
batch.e |> Seq.mapi (fun offset x ->
let d = batch.MapData x.d
let m = batch.MapData x.m
let enumEquinoxCosmosEvents (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq =
let inline gen isUnfold i (x: Equinox.CosmosStore.Core.Event) =
let d = x.d.ToSinkEventBody()
let m = x.m.ToSinkEventBody()
let inline len s = if isNull s then 0 else String.length s
FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, d, m, timestamp = x.t,
FsCodec.Core.TimelineEvent.Create(i, x.c, d, m, timestamp = x.t,
size = x.c.Length + d.Length + m.Length + len x.correlationId + len x.causationId + 80,
correlationId = x.correlationId, causationId = x.causationId))
correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold)
let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset))
match u |> ValueOption.map (fun u -> u.Cast<Equinox.CosmosStore.Core.Event[]>()) with
| ValueNone | ValueSome null | ValueSome [||] -> events
| ValueSome unfolds -> seq {
yield! events
for x in unfolds do
gen true batch.n x }

/// Attempts to parse a Document/Item from the Store
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
let tryEnumStreamEvents streamFilter d: seq<StreamEvent> voption =
tryParseEquinoxBatch streamFilter d
|> ValueOption.map (fun struct (s, xs) -> enumEquinoxCosmosEvents xs |> Seq.map (fun x -> s, x))
|> ValueOption.map (fun struct (s, xs, u) -> enumEquinoxCosmosEvents u xs |> Seq.map (fun x -> s, x))

/// Collects all events that pass the streamFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let whereStream streamFilter d: StreamEvent seq =
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.0.0" />
<PackageReference Include="Equinox.CosmosStore" Version="4.1.0-alpha.12" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.Feed/PeriodicSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ module private TimelineEvent =
fun (i, x: FsCodec.IEventData<_>, context: obj) ->
if i > DateTimeOffsetPosition.factor then invalidArg (nameof i) $"Index may not exceed %d{DateTimeOffsetPosition.factor}"
FsCodec.Core.TimelineEvent.Create(
baseIndex + i, x.EventType, x.Data, x.Meta, x.EventId, x.CorrelationId, x.CausationId, x.Timestamp, isUnfold = true, context = context)
baseIndex + i, x.EventType, x.Data, x.Meta, x.EventId, x.CorrelationId, x.CausationId, x.Timestamp, isUnfold = false, context = context)

[<Struct; NoComparison; NoEquality>]
type SourceItem<'F> = { streamName: FsCodec.StreamName; eventData: FsCodec.IEventData<'F>; context: obj }
Expand Down
3 changes: 2 additions & 1 deletion src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ module StreamSpan =
if res then count <- count + 1; bytes <- bytes + eventBytes
res
let trimmed = span |> Array.takeWhile withinLimits
// TODO all or none of unfolds
metrics eventSize trimmed, trimmed

let inline idx (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index
let inline ver (span: FsCodec.ITimelineEvent<'F>[]) = idx span + span.LongLength
let inline ver (span: FsCodec.ITimelineEvent<'F>[]) = span[span.Length - 1].Index + 1L
let dropBeforeIndex min: FsCodec.ITimelineEvent<_>[] -> FsCodec.ITimelineEvent<_>[] = function
| xs when xs.Length = 0 -> null
| xs when idx xs >= min -> xs // don't adjust if min not within
Expand Down
2 changes: 1 addition & 1 deletion tests/Propulsion.Tests/Propulsion.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
<Compile Include="ProgressTests.fs" />
<Compile Include="StreamStateTests.fs" />
<Compile Include="FsKafkaCodec.fs" />
<Compile Include="SpanQueueTests.fs" />
<Compile Include="ParallelThrottledValidation.fs" />
<Compile Include="TestOutputLogger.fs" />
<Compile Include="SourceTests.fs" />
<Compile Include="SinkHealthTests.fs" />
<Compile Include="SpanQueueTests.fs" />
<Compile Include="AppendsIndexTests.fs" />
<Compile Include="AppendsEpochTests.fs" />
</ItemGroup>
Expand Down
91 changes: 89 additions & 2 deletions tests/Propulsion.Tests/StreamStateTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,50 @@ open Propulsion.Streams
open Swensen.Unquote
open Xunit

module FsCodecEx =
open FsCodec
open System
/// <summary>An Event or Unfold that's been read from a Store and hence has a defined <c>Index</c> on the Event Timeline.</summary>
[<NoComparison; NoEquality>]
type TimelineEvent2<'Format>(index, eventType, data, meta, eventId, correlationId, causationId, timestamp, isUnfold, context, size) =

static member Create(index, eventType, data, ?meta, ?eventId, ?correlationId, ?causationId, ?timestamp, ?isUnfold, ?context, ?size): ITimelineEvent<'Format> =
let isUnfold = defaultArg isUnfold false
let meta = match meta with Some x -> x | None -> Unchecked.defaultof<_>
let eventId = match eventId with Some x -> x | None -> Guid.Empty
let ts = match timestamp with Some ts -> ts | None -> DateTimeOffset.UtcNow
let size = defaultArg size 0
TimelineEvent2(index, eventType, data, meta, eventId, Option.toObj correlationId, Option.toObj causationId, ts, isUnfold, Option.toObj context, size) :> _

static member Create(index, inner: IEventData<'Format>, ?isUnfold, ?context, ?size): ITimelineEvent<'Format> =
let isUnfold = defaultArg isUnfold false
let size = defaultArg size 0
TimelineEvent2(index, inner.EventType, inner.Data, inner.Meta, inner.EventId, inner.CorrelationId, inner.CausationId, inner.Timestamp, isUnfold, Option.toObj context, size) :> _

override _.ToString() =
let t = if isUnfold then "Unfold" else "Event"
$"{t} {eventType} @{index}"
interface ITimelineEvent<'Format> with
member _.Index = index
member _.IsUnfold = isUnfold
member _.Context = context
member _.Size = size
member _.EventType = eventType
member _.Data = data
member _.Meta = meta
member _.EventId = eventId
member _.CorrelationId = correlationId
member _.CausationId = causationId
member _.Timestamp = timestamp
open FsCodecEx

let canonicalTime = System.DateTimeOffset.UtcNow

let mk p c: FsCodec.ITimelineEvent<string>[] =
[| for x in 0..c-1 -> FsCodec.Core.TimelineEvent.Create(p + int64 x, p + int64 x |> string, null, timestamp = canonicalTime) |]
let mk_ p c seg uc: FsCodec.ITimelineEvent<string>[] =
let mk id et isUnfold = TimelineEvent2.Create(id, et, null, timestamp = canonicalTime, isUnfold = isUnfold, context = seg)
[| for x in 0..c-1 -> mk (p + int64 x) (p + int64 x |> string) false
for u in 0..uc-1 -> mk (p + int64 c) $"{p+int64 c}u{u}" true |]
let mk p c = mk_ p c 0 0
let merge = StreamSpan.merge
let dropBeforeIndex = StreamSpan.dropBeforeIndex
let is (xs: FsCodec.ITimelineEvent<string>[][]) (res: FsCodec.ITimelineEvent<string>[][]) =
Expand Down Expand Up @@ -83,6 +123,53 @@ let [<Fact>] ``fail 2`` () =
let r = merge 11613L [| mk 11614L 1; null |]
test <@ r |> is [| mk 11614L 1 |] @>

let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 (fun x y -> obj.ReferenceEquals(x, y))

let [<FsCheck.Xunit.Property>] ``merges retain freshest unfolds, one per event type`` (counts: _[]) =
let input = [|
let mutable pos = 0L
let mutable seg = 0
for gapOrOverlap, FsCheck.NonNegativeInt normal, FsCheck.NonNegativeInt unfolds in counts do
let normal = normal % 10
let unfolds = unfolds % 120 // |> ignore; 0
pos <- if gapOrOverlap < 0uy then max 0L (pos+int64 gapOrOverlap) else pos + int64 gapOrOverlap
yield mk_ pos normal seg unfolds
pos <- pos + int64 normal
seg <- seg + 1
|]
let spans = merge 0L input
// Empty spans are dropped
if spans = null then
test <@ input |> Array.forall Array.isEmpty @>
else

let all = spans |> Array.concat
let unfolds, events = all |> Array.partition _.IsUnfold
// Events are always in order
test <@ (events |> Seq.sortBy _.Index) === events @>
// Unfolds are always in order
test <@ unfolds |> Seq.sortBy _.Index === unfolds @>
// Unfolds are always after events
test <@ all |> Seq.sortBy _.IsUnfold === all @>
// One unfold per type
test <@ unfolds |> Array.groupBy _.EventType |> Array.forall (fun (_et, xs) -> xs.Length = 1) @>
// Unfolds are always for the same Index (as preceding ones are invalidated by any newer event)
test <@ unfolds |> Array.forall (fun x -> x.Index = (Array.last all).Index) @>
// Version that Unfolds pertain to must always be > final event Index
test <@ match events |> Array.tryLast, unfolds |> Array.tryLast with
| Some le, Some lu -> lu.Index > le.Index
| _ -> true @>
// resulting span sequence must be monotonic, with a gap of at least 1 in the Index ranges per span
test <@ spans |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.ver x < StreamSpan.idx y) @>
match spans with
| [||] -> ()
| xs ->
let others = Array.take (xs.Length - 1) xs
// Only the last span can have unfolds
test <@ others |> Array.forall (Array.forall (fun x -> not x.IsUnfold)) @>

// TODO verify that slice never orphans unfolds

#if MEMORY_USAGE_ANALYSIS
// https://bartoszsypytkowski.com/writing-high-performance-f-code
// https://github.com/SergeyTeplyakov/ObjectLayoutInspector
Expand Down

0 comments on commit 9bc6130

Please sign in to comment.