diff --git a/src/Propulsion.CosmosStore/CosmosStoreParser.fs b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs similarity index 63% rename from src/Propulsion.CosmosStore/CosmosStoreParser.fs rename to src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs index 31696cad..11831c7d 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreParser.fs +++ b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs @@ -16,41 +16,47 @@ module EquinoxSystemTextJsonParser = type Batch with member _.MapData x = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x - let timestamp (doc : System.Text.Json.JsonDocument) = + let timestamp (doc: System.Text.Json.JsonDocument) = let unixEpoch = System.DateTime.UnixEpoch let ts = let r = doc.RootElement in r.GetProperty("_ts") unixEpoch.AddSeconds(ts.GetDouble()) - /// Sanity check to determine whether the Document represents an `Equinox.Cosmos` >= 1.0 based batch - let tryParseEquinoxBatch categoryFilter (d : System.Text.Json.JsonDocument) = + /// Parses an Equinox.Cosmos Batch from a CosmosDB Item + /// returns ValueNone if it does not bear required elements of a `Equinox.Cosmos` >= 1.0 Batch, or the streamFilter predicate rejects it + let tryParseEquinoxBatch streamFilter (d: System.Text.Json.JsonDocument) = let r = d.RootElement - let tryProp (id : string) : ValueOption = + let tryProp (id: string): ValueOption = let mutable p = Unchecked.defaultof<_> if r.TryGetProperty(id, &p) then ValueSome p else ValueNone - let hasProp (id : string) : bool = tryProp id |> ValueOption.isSome + let hasProp = tryProp >> ValueOption.isSome match tryProp "p" with | ValueSome je when je.ValueKind = System.Text.Json.JsonValueKind.String && hasProp "i" && hasProp "n" && hasProp "e" -> - let FsCodec.StreamName.Category cat as sn = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw) - if categoryFilter cat then ValueSome (struct (sn, d.Cast())) else ValueNone + let sn = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw) + if streamFilter sn then ValueSome (struct (sn, d.Cast())) else ValueNone | _ -> ValueNone /// Enumerates the events represented within a batch - let enumEquinoxCosmosEvents (batch : Batch) : Event seq = + let enumEquinoxCosmosEvents (batch: Batch): Event seq = batch.e |> Seq.mapi (fun offset x -> FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, batch.MapData x.d, batch.MapData x.m, timestamp = x.t)) /// Attempts to parse a Document/Item from the Store - /// returns ValueNone if it does not bear the hallmarks of a valid Batch - let tryEnumStreamEvents categoryFilter d : seq voption = - tryParseEquinoxBatch categoryFilter d |> ValueOption.map (fun struct (s, xs) -> enumEquinoxCosmosEvents xs |> Seq.map (fun x -> s, x)) + /// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects + let tryEnumStreamEvents streamFilter d: seq voption = + tryParseEquinoxBatch streamFilter d + |> ValueOption.map (fun struct (s, xs) -> enumEquinoxCosmosEvents xs |> Seq.map (fun x -> s, x)) + + /// Collects all events that pass the streamFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch + let whereStream streamFilter d: StreamEvent seq = + tryEnumStreamEvents streamFilter d |> ValueOption.defaultValue Seq.empty - /// Collects all events that pass the categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch - let enumStreamEvents categoryFilter d : StreamEvent seq = - tryEnumStreamEvents categoryFilter d |> ValueOption.defaultValue Seq.empty + /// Collects all events passing the supplied categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch + let whereCategory categoryFilter d: StreamEvent seq = + whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) d /// Collects all events from the specified category list from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch - let enumCategoryEvents categories d : StreamEvent seq = - enumStreamEvents (fun c -> Array.contains c categories) d + let ofCategories categories d: StreamEvent seq = + whereCategory (fun c -> Array.contains c categories) d #else module EquinoxNewtonsoftParser = diff --git a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj index 1748ddda..34a4922b 100644 --- a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj +++ b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj @@ -5,7 +5,7 @@ - + diff --git a/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj b/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj index 28cb1bf7..02d6837e 100644 --- a/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj +++ b/src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj @@ -6,8 +6,8 @@ - - EquinoxCosmosParser.fs + + EquinoxNewtonsoftParser.fs ChangeFeedProcessor.fs diff --git a/tools/Propulsion.Tool/Program.fs b/tools/Propulsion.Tool/Program.fs index eca4f925..398589ba 100644 --- a/tools/Propulsion.Tool/Program.fs +++ b/tools/Propulsion.Tool/Program.fs @@ -346,11 +346,10 @@ module Project = return Propulsion.Sinks.AllProcessed, () } Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentStreams, handle, stats, idleDelay = a.IdleDelay) let source = - let nullFilter _ = true match storeArgs with | Choice1Of3 sa -> let monitored, leases = sa.ConnectFeed() |> Async.RunSynchronously - let parseFeedDoc = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents nullFilter + let parseFeedDoc = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream (fun _sn -> true) let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Seq.collect parseFeedDoc) Propulsion.CosmosStore.CosmosStoreSource.Start ( Log.Logger, monitored, leases, group, observer,