Skip to content


Revert "Defer change to next PR"
Browse files Browse the repository at this point in the history
This reverts commit 207269f.
  • Loading branch information
bartelink committed Jul 16, 2024
1 parent 4254c9d commit d90d5fd
Showing 1 changed file with 51 additions and 30 deletions.
81 changes: 51 additions & 30 deletions src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
namespace Propulsion.CosmosStore

open Equinox.CosmosStore.Core

open Propulsion.Internal
open Propulsion.Sinks

/// <summary>Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams.</summary>
Expand All @@ -10,20 +10,19 @@ open Propulsion.Sinks
module EquinoxSystemTextJsonParser =

type System.Text.Json.JsonDocument with
member document.Cast<'T>() =
type Batch with
member _.MapData x =
System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x
type System.Text.Json.JsonElement with
member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x)
member x.ToSinkEventBody() = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x |> System.ReadOnlyMemory

type System.Text.Json.JsonDocument with member x.Cast<'T>() = x.RootElement.Cast<'T>()
let timestamp (doc: System.Text.Json.JsonDocument) =
let unixEpoch = System.DateTime.UnixEpoch
let ts = let r = doc.RootElement in r.GetProperty("_ts")

/// Parses an Equinox.Cosmos Batch from a CosmosDB Item
/// returns ValueNone if it does not bear required elements of a `Equinox.Cosmos` >= 1.0 Batch, or the streamFilter predicate rejects it
let tryParseEquinoxBatch streamFilter (d: System.Text.Json.JsonDocument) =
let tryParseEquinoxBatchOrTip streamFilter (d: System.Text.Json.JsonDocument) =
let r = d.RootElement
let tryProp (id: string): ValueOption<System.Text.Json.JsonElement> =
let mutable p = Unchecked.defaultof<_>
Expand All @@ -33,36 +32,58 @@ module EquinoxSystemTextJsonParser =
match tryProp "p" with
| ValueSome je when je.ValueKind = System.Text.Json.JsonValueKind.String && hasProp "i" && hasProp "n" && hasProp "e" ->
let sn = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw)
if streamFilter sn then ValueSome (struct (sn, d.Cast<Batch>())) else ValueNone
if streamFilter sn then ValueSome (struct (sn, d.Cast<Batch>(), tryProp "u")) else ValueNone
| _ -> ValueNone

/// Enumerates the events represented within a batch
let enumEquinoxCosmosEvents (batch: Batch): Event seq =
batch.e |> Seq.mapi (fun offset x ->
let d = batch.MapData x.d
let m = batch.MapData x.m
/// Enumerates the Events and/or Unfolds represented within an Equinox.CosmosStore Batch or Tip Item
let enumEquinoxCosmosBatchOrTip (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq =
let inline gen isUnfold i (x: Equinox.CosmosStore.Core.Event) =
let d = x.d.ToSinkEventBody()
let m = x.m.ToSinkEventBody()
let inline len s = if isNull s then 0 else String.length s
FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, d, m, timestamp = x.t,
FsCodec.Core.TimelineEvent.Create(i, x.c, d, m, timestamp = x.t,
size = x.c.Length + d.Length + m.Length + len x.correlationId + len x.causationId + 80,
correlationId = x.correlationId, causationId = x.causationId))

/// Attempts to parse a Document/Item from the Store
correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold)
let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset))
// an Unfold won't have a corr/cause id, but that's OK - can't use Tip type as don't want to expand compressed form etc
match u |> (fun u -> u.Cast<Equinox.CosmosStore.Core.Event[]>()) with
| ValueNone | ValueSome null | ValueSome [||] -> events
| ValueSome unfolds -> seq {
yield! events
for x in unfolds do
gen true batch.n x }
let inline tryEnumStreamEvents_ withUnfolds streamFilter jsonDocument: seq<StreamEvent> voption =
match tryParseEquinoxBatchOrTip streamFilter jsonDocument with
| ValueNone -> ValueNone
| ValueSome struct (s, xs, u) -> ValueSome <| seq {
for x in enumEquinoxCosmosBatchOrTip (if withUnfolds then u else ValueNone) xs do
s, x }

/// Attempts to parse the Events from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
let tryEnumStreamEvents streamFilter d: seq<StreamEvent> voption =
tryParseEquinoxBatch streamFilter d
|> (fun struct (s, xs) -> enumEquinoxCosmosEvents xs |> (fun x -> s, x))
let tryEnumStreamEvents streamFilter jsonDocument: seq<StreamEvent> voption =
tryEnumStreamEvents_ false streamFilter jsonDocument

/// Extracts all events that pass the streamFilter from a Feed item
let whereStream streamFilter jsonDocument: StreamEvent seq =
tryEnumStreamEvents streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty

/// Collects all events that pass the streamFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let whereStream streamFilter d: StreamEvent seq =
tryEnumStreamEvents streamFilter d |> ValueOption.defaultValue Seq.empty
/// Extracts all events passing the supplied categoryFilter from a Feed Item
let whereCategory categoryFilter jsonDocument: StreamEvent seq =
whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) jsonDocument

/// Collects all events passing the supplied categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let whereCategory categoryFilter d: StreamEvent seq =
whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) d
/// Extracts all events from the specified category list from a Feed Item
let ofCategories (categories: string[]) jsonDocument: StreamEvent seq =
whereCategory (fun c -> Array.contains c categories) jsonDocument

/// Attempts to parse the Events and/or Unfolds from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
let tryEnumStreamEventsAndUnfolds streamFilter jsonDocument: seq<StreamEvent> voption =
tryEnumStreamEvents_ true streamFilter jsonDocument

/// Collects all events from the specified category list from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let ofCategories categories d: StreamEvent seq =
whereCategory (fun c -> Array.contains c categories) d
/// Extracts Events and Unfolds that pass the streamFilter from a Feed item
let eventsAndUnfoldsWhereStream streamFilter jsonDocument: StreamEvent seq =
tryEnumStreamEventsAndUnfolds streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty
module EquinoxNewtonsoftParser =

Expand Down

0 comments on commit d90d5fd

Please sign in to comment.