diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs index d99c1a48..60ea8f60 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs @@ -69,7 +69,7 @@ module Internal = let n = StreamSpan.nextIndex span log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length) #if COSMOSV3 - span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds" + span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds") let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _)) |> Async.executeAsTask ct #else diff --git a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs index e214cf8e..8e03a294 100644 --- a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs +++ b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs @@ -22,7 +22,7 @@ module EquinoxSystemTextJsonParser = /// Parses an Equinox.Cosmos Batch from a CosmosDB Item /// returns ValueNone if it does not bear required elements of a `Equinox.Cosmos` >= 1.0 Batch, or the streamFilter predicate rejects it - let tryParseEquinoxBatch streamFilter (d: System.Text.Json.JsonDocument) = + let tryParseEquinoxBatchOrTip streamFilter (d: System.Text.Json.JsonDocument) = let r = d.RootElement let tryProp (id: string): ValueOption = let mutable p = Unchecked.defaultof<_> @@ -35,8 +35,8 @@ module EquinoxSystemTextJsonParser = if streamFilter sn then ValueSome (struct (sn, d.Cast(), tryProp "u")) else ValueNone | _ -> ValueNone - /// Enumerates the events represented within a batch - let enumEquinoxCosmosEvents (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq = + /// Enumerates the Events and/or Unfolds represented within an Equinox.CosmosStore Batch or Tip Item + let enumEquinoxCosmosBatchOrTip (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq = let inline gen isUnfold i (x: Equinox.CosmosStore.Core.Event) = let d = x.d.ToSinkEventBody() let m = x.m.ToSinkEventBody() @@ -45,30 +45,41 @@ module EquinoxSystemTextJsonParser = size = x.c.Length + d.Length + m.Length + len x.correlationId + len x.causationId + 80, correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold) let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset)) - match u |> ValueOption.map (fun u -> u.Cast()) with + match u |> ValueOption.map (fun u -> u.Cast()) with // an Unfold won't have a corr/cause id, but that's OK | ValueNone | ValueSome null | ValueSome [||] -> events | ValueSome unfolds -> seq { yield! events for x in unfolds do gen true batch.n x } + let inline tryEnumStreamEvents_ withUnfolds streamFilter jsonDocument: seq voption = + tryParseEquinoxBatchOrTip streamFilter jsonDocument + |> ValueOption.map (fun struct (s, xs, u) -> enumEquinoxCosmosBatchOrTip (if withUnfolds then u else ValueNone) xs |> Seq.map (fun x -> s, x)) - /// Attempts to parse a Document/Item from the Store + /// Attempts to parse the Events from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument /// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects - let tryEnumStreamEvents streamFilter d: seq voption = - tryParseEquinoxBatch streamFilter d - |> ValueOption.map (fun struct (s, xs, u) -> enumEquinoxCosmosEvents u xs |> Seq.map (fun x -> s, x)) + let tryEnumStreamEvents streamFilter jsonDocument: seq voption = + tryEnumStreamEvents_ false streamFilter jsonDocument + + /// Extracts all events that pass the streamFilter from a Feed item + let whereStream streamFilter jsonDocument: StreamEvent seq = + tryEnumStreamEvents streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty - /// Collects all events that pass the streamFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch - let whereStream streamFilter d: StreamEvent seq = - tryEnumStreamEvents streamFilter d |> ValueOption.defaultValue Seq.empty + /// Extracts all events passing the supplied categoryFilter from a Feed Item + let whereCategory categoryFilter jsonDocument: StreamEvent seq = + whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) jsonDocument - /// Collects all events passing the supplied categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch - let whereCategory categoryFilter d: StreamEvent seq = - whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) d + /// Extracts all events from the specified category list from a Feed Item + let ofCategories (categories: string[]) jsonDocument: StreamEvent seq = + whereCategory (fun c -> Array.contains c categories) jsonDocument + + /// Attempts to parse the Events and/or Unfolds from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument + /// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects + let tryEnumStreamEventsAndUnfolds streamFilter jsonDocument: seq voption = + tryEnumStreamEvents_ true streamFilter jsonDocument - /// Collects all events from the specified category list from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch - let ofCategories categories d: StreamEvent seq = - whereCategory (fun c -> Array.contains c categories) d + /// Extracts Events and Unfolds that pass the streamFilter from a Feed item + let eventsAndUnfoldsWhereStream streamFilter jsonDocument: StreamEvent seq = + tryEnumStreamEventsAndUnfolds streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty #else module EquinoxNewtonsoftParser =