diff --git a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
index 8a9133d3..bf2ee065 100644
--- a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
+++ b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
@@ -1,7 +1,7 @@
namespace Propulsion.CosmosStore
open Equinox.CosmosStore.Core
-
+open Propulsion.Internal
open Propulsion.Sinks
/// Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams.
@@ -10,12 +10,11 @@ open Propulsion.Sinks
#if !COSMOSV3
module EquinoxSystemTextJsonParser =
- type System.Text.Json.JsonDocument with
- member document.Cast<'T>() =
- System.Text.Json.JsonSerializer.Deserialize<'T>(document.RootElement)
- type Batch with
- member _.MapData x =
- System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x
+ type System.Text.Json.JsonElement with
+ member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x)
+ member x.ToSinkEventBody() = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x |> System.ReadOnlyMemory
+
+ type System.Text.Json.JsonDocument with member x.Cast<'T>() = x.RootElement.Cast<'T>()
let timestamp (doc: System.Text.Json.JsonDocument) =
let unixEpoch = System.DateTime.UnixEpoch
let ts = let r = doc.RootElement in r.GetProperty("_ts")
@@ -23,7 +22,7 @@ module EquinoxSystemTextJsonParser =
/// Parses an Equinox.Cosmos Batch from a CosmosDB Item
/// returns ValueNone if it does not bear required elements of a `Equinox.Cosmos` >= 1.0 Batch, or the streamFilter predicate rejects it
- let tryParseEquinoxBatch streamFilter (d: System.Text.Json.JsonDocument) =
+ let tryParseEquinoxBatchOrTip streamFilter (d: System.Text.Json.JsonDocument) =
let r = d.RootElement
let tryProp (id: string): ValueOption =
let mutable p = Unchecked.defaultof<_>
@@ -33,36 +32,58 @@ module EquinoxSystemTextJsonParser =
match tryProp "p" with
| ValueSome je when je.ValueKind = System.Text.Json.JsonValueKind.String && hasProp "i" && hasProp "n" && hasProp "e" ->
let sn = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw)
- if streamFilter sn then ValueSome (struct (sn, d.Cast())) else ValueNone
+ if streamFilter sn then ValueSome (struct (sn, d.Cast(), tryProp "u")) else ValueNone
| _ -> ValueNone
- /// Enumerates the events represented within a batch
- let enumEquinoxCosmosEvents (batch: Batch): Event seq =
- batch.e |> Seq.mapi (fun offset x ->
- let d = batch.MapData x.d
- let m = batch.MapData x.m
+ /// Enumerates the Events and/or Unfolds represented within an Equinox.CosmosStore Batch or Tip Item
+ let enumEquinoxCosmosBatchOrTip (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq =
+ let inline gen isUnfold i (x: Equinox.CosmosStore.Core.Event) =
+ let d = x.d.ToSinkEventBody()
+ let m = x.m.ToSinkEventBody()
let inline len s = if isNull s then 0 else String.length s
- FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, d, m, timestamp = x.t,
+ FsCodec.Core.TimelineEvent.Create(i, x.c, d, m, timestamp = x.t,
size = x.c.Length + d.Length + m.Length + len x.correlationId + len x.causationId + 80,
- correlationId = x.correlationId, causationId = x.causationId))
-
- /// Attempts to parse a Document/Item from the Store
+ correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold)
+ let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset))
+ // an Unfold won't have a corr/cause id, but that's OK - can't use Tip type as don't want to expand compressed form etc
+ match u |> ValueOption.map (fun u -> u.Cast()) with
+ | ValueNone | ValueSome null | ValueSome [||] -> events
+ | ValueSome unfolds -> seq {
+ yield! events
+ for x in unfolds do
+ gen true batch.n x }
+ let inline tryEnumStreamEvents_ withUnfolds streamFilter jsonDocument: seq voption =
+ match tryParseEquinoxBatchOrTip streamFilter jsonDocument with
+ | ValueNone -> ValueNone
+ | ValueSome struct (s, xs, u) -> ValueSome <| seq {
+ for x in enumEquinoxCosmosBatchOrTip (if withUnfolds then u else ValueNone) xs do
+ s, x }
+
+ /// Attempts to parse the Events from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
- let tryEnumStreamEvents streamFilter d: seq voption =
- tryParseEquinoxBatch streamFilter d
- |> ValueOption.map (fun struct (s, xs) -> enumEquinoxCosmosEvents xs |> Seq.map (fun x -> s, x))
+ let tryEnumStreamEvents streamFilter jsonDocument: seq voption =
+ tryEnumStreamEvents_ false streamFilter jsonDocument
+
+ /// Extracts all events that pass the streamFilter from a Feed item
+ let whereStream streamFilter jsonDocument: StreamEvent seq =
+ tryEnumStreamEvents streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty
- /// Collects all events that pass the streamFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
- let whereStream streamFilter d: StreamEvent seq =
- tryEnumStreamEvents streamFilter d |> ValueOption.defaultValue Seq.empty
+ /// Extracts all events passing the supplied categoryFilter from a Feed Item
+ let whereCategory categoryFilter jsonDocument: StreamEvent seq =
+ whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) jsonDocument
- /// Collects all events passing the supplied categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
- let whereCategory categoryFilter d: StreamEvent seq =
- whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) d
+ /// Extracts all events from the specified category list from a Feed Item
+ let ofCategories (categories: string[]) jsonDocument: StreamEvent seq =
+ whereCategory (fun c -> Array.contains c categories) jsonDocument
+
+ /// Attempts to parse the Events and/or Unfolds from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument
+ /// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
+ let tryEnumStreamEventsAndUnfolds streamFilter jsonDocument: seq voption =
+ tryEnumStreamEvents_ true streamFilter jsonDocument
- /// Collects all events from the specified category list from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
- let ofCategories categories d: StreamEvent seq =
- whereCategory (fun c -> Array.contains c categories) d
+ /// Extracts Events and Unfolds that pass the streamFilter from a Feed item
+ let eventsAndUnfoldsWhereStream streamFilter jsonDocument: StreamEvent seq =
+ tryEnumStreamEventsAndUnfolds streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty
#else
module EquinoxNewtonsoftParser =