Skip to content

Commit

Permalink
Wire up parse logic
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 13, 2024
1 parent 6ce4dc9 commit 6e0d179
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ module Internal =
let n = StreamSpan.nextIndex span
log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length)
#if COSMOSV3
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds"
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds")
let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _))
|> Async.executeAsTask ct
#else
Expand Down
45 changes: 28 additions & 17 deletions src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module EquinoxSystemTextJsonParser =

/// Parses an Equinox.Cosmos Batch from a CosmosDB Item
/// returns ValueNone if it does not bear required elements of a `Equinox.Cosmos` >= 1.0 Batch, or the streamFilter predicate rejects it
let tryParseEquinoxBatch streamFilter (d: System.Text.Json.JsonDocument) =
let tryParseEquinoxBatchOrTip streamFilter (d: System.Text.Json.JsonDocument) =
let r = d.RootElement
let tryProp (id: string): ValueOption<System.Text.Json.JsonElement> =
let mutable p = Unchecked.defaultof<_>
Expand All @@ -35,8 +35,8 @@ module EquinoxSystemTextJsonParser =
if streamFilter sn then ValueSome (struct (sn, d.Cast<Batch>(), tryProp "u")) else ValueNone
| _ -> ValueNone

/// Enumerates the events represented within a batch
let enumEquinoxCosmosEvents (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq =
/// Enumerates the Events and/or Unfolds represented within an Equinox.CosmosStore Batch or Tip Item
let enumEquinoxCosmosBatchOrTip (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq =
let inline gen isUnfold i (x: Equinox.CosmosStore.Core.Event) =
let d = x.d.ToSinkEventBody()
let m = x.m.ToSinkEventBody()
Expand All @@ -45,30 +45,41 @@ module EquinoxSystemTextJsonParser =
size = x.c.Length + d.Length + m.Length + len x.correlationId + len x.causationId + 80,
correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold)
let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset))
match u |> ValueOption.map (fun u -> u.Cast<Equinox.CosmosStore.Core.Event[]>()) with
match u |> ValueOption.map (fun u -> u.Cast<Equinox.CosmosStore.Core.Event[]>()) with // an Unfold won't have a corr/cause id, but that's OK
| ValueNone | ValueSome null | ValueSome [||] -> events
| ValueSome unfolds -> seq {
yield! events
for x in unfolds do
gen true batch.n x }
let inline tryEnumStreamEvents_ withUnfolds streamFilter jsonDocument: seq<StreamEvent> voption =
tryParseEquinoxBatchOrTip streamFilter jsonDocument
|> ValueOption.map (fun struct (s, xs, u) -> enumEquinoxCosmosBatchOrTip (if withUnfolds then u else ValueNone) xs |> Seq.map (fun x -> s, x))

/// Attempts to parse a Document/Item from the Store
/// Attempts to parse the Events from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
let tryEnumStreamEvents streamFilter d: seq<StreamEvent> voption =
tryParseEquinoxBatch streamFilter d
|> ValueOption.map (fun struct (s, xs, u) -> enumEquinoxCosmosEvents u xs |> Seq.map (fun x -> s, x))
let tryEnumStreamEvents streamFilter jsonDocument: seq<StreamEvent> voption =
tryEnumStreamEvents_ false streamFilter jsonDocument

/// Extracts all events that pass the streamFilter from a Feed item
let whereStream streamFilter jsonDocument: StreamEvent seq =
tryEnumStreamEvents streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty

/// Collects all events that pass the streamFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let whereStream streamFilter d: StreamEvent seq =
tryEnumStreamEvents streamFilter d |> ValueOption.defaultValue Seq.empty
/// Extracts all events passing the supplied categoryFilter from a Feed Item
let whereCategory categoryFilter jsonDocument: StreamEvent seq =
whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) jsonDocument

/// Collects all events passing the supplied categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let whereCategory categoryFilter d: StreamEvent seq =
whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) d
/// Extracts all events from the specified category list from a Feed Item
let ofCategories (categories: string[]) jsonDocument: StreamEvent seq =
whereCategory (fun c -> Array.contains c categories) jsonDocument

/// Attempts to parse the Events and/or Unfolds from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
let tryEnumStreamEventsAndUnfolds streamFilter jsonDocument: seq<StreamEvent> voption =
tryEnumStreamEvents_ true streamFilter jsonDocument

/// Collects all events from the specified category list from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let ofCategories categories d: StreamEvent seq =
whereCategory (fun c -> Array.contains c categories) d
/// Extracts Events and Unfolds that pass the streamFilter from a Feed item
let eventsAndUnfoldsWhereStream streamFilter jsonDocument: StreamEvent seq =
tryEnumStreamEventsAndUnfolds streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty
#else
module EquinoxNewtonsoftParser =

Expand Down

0 comments on commit 6e0d179

Please sign in to comment.