Skip to content

Commit

Permalink
feat(Cosmos)!: whereStream (#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 11, 2023
1 parent a583626 commit cff3f59
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,47 @@ module EquinoxSystemTextJsonParser =
type Batch with
member _.MapData x =
System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x
let timestamp (doc : System.Text.Json.JsonDocument) =
let timestamp (doc: System.Text.Json.JsonDocument) =
let unixEpoch = System.DateTime.UnixEpoch
let ts = let r = doc.RootElement in r.GetProperty("_ts")
unixEpoch.AddSeconds(ts.GetDouble())

/// Sanity check to determine whether the Document represents an `Equinox.Cosmos` >= 1.0 based batch
let tryParseEquinoxBatch categoryFilter (d : System.Text.Json.JsonDocument) =
/// Parses an Equinox.Cosmos Batch from a CosmosDB Item
/// returns ValueNone if it does not bear required elements of a `Equinox.Cosmos` >= 1.0 Batch, or the streamFilter predicate rejects it
let tryParseEquinoxBatch streamFilter (d: System.Text.Json.JsonDocument) =
let r = d.RootElement
let tryProp (id : string) : ValueOption<System.Text.Json.JsonElement> =
let tryProp (id: string): ValueOption<System.Text.Json.JsonElement> =
let mutable p = Unchecked.defaultof<_>
if r.TryGetProperty(id, &p) then ValueSome p else ValueNone
let hasProp (id : string) : bool = tryProp id |> ValueOption.isSome
let hasProp = tryProp >> ValueOption.isSome

match tryProp "p" with
| ValueSome je when je.ValueKind = System.Text.Json.JsonValueKind.String && hasProp "i" && hasProp "n" && hasProp "e" ->
let FsCodec.StreamName.Category cat as sn = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw)
if categoryFilter cat then ValueSome (struct (sn, d.Cast<Batch>())) else ValueNone
let sn = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw)
if streamFilter sn then ValueSome (struct (sn, d.Cast<Batch>())) else ValueNone
| _ -> ValueNone

/// Enumerates the events represented within a batch
let enumEquinoxCosmosEvents (batch : Batch) : Event seq =
let enumEquinoxCosmosEvents (batch: Batch): Event seq =
batch.e |> Seq.mapi (fun offset x -> FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, batch.MapData x.d, batch.MapData x.m, timestamp = x.t))

/// Attempts to parse a Document/Item from the Store
/// returns ValueNone if it does not bear the hallmarks of a valid Batch
let tryEnumStreamEvents categoryFilter d : seq<StreamEvent> voption =
tryParseEquinoxBatch categoryFilter d |> ValueOption.map (fun struct (s, xs) -> enumEquinoxCosmosEvents xs |> Seq.map (fun x -> s, x))
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
let tryEnumStreamEvents streamFilter d: seq<StreamEvent> voption =
tryParseEquinoxBatch streamFilter d
|> ValueOption.map (fun struct (s, xs) -> enumEquinoxCosmosEvents xs |> Seq.map (fun x -> s, x))

/// Collects all events that pass the streamFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let whereStream streamFilter d: StreamEvent seq =
tryEnumStreamEvents streamFilter d |> ValueOption.defaultValue Seq.empty

/// Collects all events that pass the categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let enumStreamEvents categoryFilter d : StreamEvent seq =
tryEnumStreamEvents categoryFilter d |> ValueOption.defaultValue Seq.empty
/// Collects all events passing the supplied categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let whereCategory categoryFilter d: StreamEvent seq =
whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) d

/// Collects all events from the specified category list from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let enumCategoryEvents categories d : StreamEvent seq =
enumStreamEvents (fun c -> Array.contains c categories) d
let ofCategories categories d: StreamEvent seq =
whereCategory (fun c -> Array.contains c categories) d
#else
module EquinoxNewtonsoftParser =

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="CosmosStoreParser.fs" />
<Compile Include="EquinoxSystemTextJsonParser.fs" />
<Compile Include="ChangeFeedProcessor.fs" />
<Compile Include="CosmosStoreSource.fs" />
<Compile Include="CosmosStoreSink.fs" />
Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Propulsion.CosmosStore\CosmosStoreParser.fs">
<Link>EquinoxCosmosParser.fs</Link>
<Compile Include="..\Propulsion.CosmosStore\EquinoxSystemTextJsonParser.fs">
<Link>EquinoxNewtonsoftParser.fs</Link>
</Compile>
<Compile Include="..\Propulsion.CosmosStore\ChangeFeedProcessor.fs">
<Link>ChangeFeedProcessor.fs</Link>
Expand Down
3 changes: 1 addition & 2 deletions tools/Propulsion.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,10 @@ module Project =
return Propulsion.Sinks.AllProcessed, () }
Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentStreams, handle, stats, idleDelay = a.IdleDelay)
let source =
let nullFilter _ = true
match storeArgs with
| Choice1Of3 sa ->
let monitored, leases = sa.ConnectFeed() |> Async.RunSynchronously
let parseFeedDoc = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents nullFilter
let parseFeedDoc = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream (fun _sn -> true)
let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Seq.collect parseFeedDoc)
Propulsion.CosmosStore.CosmosStoreSource.Start
( Log.Logger, monitored, leases, group, observer,
Expand Down

0 comments on commit cff3f59

Please sign in to comment.