feat(Cosmos, propulsion sync): propagate unfolds (#263)
bartelink committed Aug 1, 2024
1 parent 6f6c2d0 commit 58a37fd
Showing 6 changed files with 68 additions and 43 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -32,7 +32,7 @@ The `Unreleased` section name is replaced by the expected version of next release
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
- `Propulsion.Tool`: `sync <kafka|stats>` supports `from json` source option [#250](https://github.com/jet/propulsion/pull/250)
- `Propulsion.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#142](https://github.com/jet/propulsion/pull/142) :pray: [@brihadish](https://github.com/brihadish)
- `Propulsion.Tool`: `sync cosmos from <cosmos|json>` [#252](https://github.com/jet/propulsion/pull/252)
- `Propulsion.Tool`: `sync cosmos from <cosmos|json>` [#252](https://github.com/jet/propulsion/pull/252) [#263](https://github.com/jet/propulsion/pull/263)

### Changed

5 changes: 3 additions & 2 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
@@ -72,8 +72,9 @@ module Internal =
let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData)
|> Async.executeAsTask ct
#else
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore does not [yet] support ingesting unfolds")
let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData, ct)
let unfolds, events = span |> Array.partition _.IsUnfold
log.Debug("Writing {s}@{i}x{n}+{u}", stream, i, events.Length, unfolds.Length)
let! res = ctx.Sync(stream, { index = i; etag = None }, events |> Array.map mapData, unfolds |> Array.map mapData, ct)
#endif
let res' =
match res with
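The change above replaces the previous guard (which failed the whole span with `invalidOp` whenever an unfold was present) with a split: unfolds are partitioned out and handed to `ctx.Sync` alongside the events. A minimal sketch of that partitioning step, using a simplified stand-in record rather than the actual FsCodec event type:

```fsharp
// Minimal sketch of the partitioning step, assuming a simplified stand-in record;
// the real sink operates on FsCodec timeline events that expose an IsUnfold flag.
type Item = { Index: int64; EventType: string; IsUnfold: bool }

let span =
    [| { Index = 3L; EventType = "Added";    IsUnfold = false }
       { Index = 4L; EventType = "Removed";  IsUnfold = false }
       { Index = 5L; EventType = "Snapshot"; IsUnfold = true } |]

// Array.partition returns (matches, non-matches), hence unfolds come first, as in the sink code
let unfolds, events = span |> Array.partition _.IsUnfold
printfn "Writing %d events + %d unfolds" events.Length unfolds.Length  // "Writing 2 events + 1 unfolds"
```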
78 changes: 48 additions & 30 deletions src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
@@ -1,7 +1,7 @@
namespace Propulsion.CosmosStore

open Equinox.CosmosStore.Core

open Propulsion.Internal
open Propulsion.Sinks

/// <summary>Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams.</summary>
@@ -10,20 +10,19 @@ open Propulsion.Sinks
#if !COSMOSV3
module EquinoxSystemTextJsonParser =

type System.Text.Json.JsonDocument with
member document.Cast<'T>() =
System.Text.Json.JsonSerializer.Deserialize<'T>(document.RootElement)
type Batch with
member _.MapData x =
System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x
type System.Text.Json.JsonElement with
member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x)
member x.ToSinkEventBody() = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x |> System.ReadOnlyMemory

type System.Text.Json.JsonDocument with member x.Cast<'T>() = x.RootElement.Cast<'T>()
let timestamp (doc: System.Text.Json.JsonDocument) =
let unixEpoch = System.DateTime.UnixEpoch
let ts = let r = doc.RootElement in r.GetProperty("_ts")
unixEpoch.AddSeconds(ts.GetDouble())

/// Parses an Equinox.Cosmos Batch from a CosmosDB Item
/// returns ValueNone if it does not bear required elements of a `Equinox.Cosmos` >= 1.0 Batch, or the streamFilter predicate rejects it
let tryParseEquinoxBatch streamFilter (d: System.Text.Json.JsonDocument) =
let tryParseEquinoxBatchOrTip streamFilter (d: System.Text.Json.JsonDocument) =
let r = d.RootElement
let tryProp (id: string): ValueOption<System.Text.Json.JsonElement> =
let mutable p = Unchecked.defaultof<_>
@@ -33,36 +32,55 @@ module EquinoxSystemTextJsonParser =
match tryProp "p" with
| ValueSome je when je.ValueKind = System.Text.Json.JsonValueKind.String && hasProp "i" && hasProp "n" && hasProp "e" ->
let sn = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw)
if streamFilter sn then ValueSome (struct (sn, d.Cast<Batch>())) else ValueNone
if streamFilter sn then ValueSome (struct (sn, d.Cast<Batch>(), tryProp "u")) else ValueNone
| _ -> ValueNone

/// Enumerates the events represented within a batch
let enumEquinoxCosmosEvents (batch: Batch): Event seq =
batch.e |> Seq.mapi (fun offset x ->
let d = batch.MapData x.d
let m = batch.MapData x.m
/// Enumerates the Events and/or Unfolds represented within an Equinox.CosmosStore Batch or Tip Item
let enumEquinoxCosmosBatchOrTip (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq =
let inline gen isUnfold i (x: Equinox.CosmosStore.Core.Event) =
let d = x.d.ToSinkEventBody()
let m = x.m.ToSinkEventBody()
let inline len s = if isNull s then 0 else String.length s
FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, d, m, timestamp = x.t,
FsCodec.Core.TimelineEvent.Create(i, x.c, d, m, timestamp = x.t,
size = x.c.Length + d.Length + m.Length + len x.correlationId + len x.causationId + 80,
correlationId = x.correlationId, causationId = x.causationId))

/// Attempts to parse a Document/Item from the Store
correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold)
let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset))
// an Unfold won't have a corr/cause id, but that's OK - can't use Tip type as don't want to expand compressed form etc
match u |> ValueOption.map (fun u -> u.Cast<Equinox.CosmosStore.Core.Event[]>()) with
| ValueNone | ValueSome null | ValueSome [||] -> events
| ValueSome unfolds -> seq {
yield! events
for x in unfolds do
gen true batch.n x }
let inline tryEnumStreamEvents_ withUnfolds streamFilter jsonDocument: seq<StreamEvent> voption =
tryParseEquinoxBatchOrTip streamFilter jsonDocument
|> ValueOption.map (fun struct (s, xs, u) -> enumEquinoxCosmosBatchOrTip (if withUnfolds then u else ValueNone) xs |> Seq.map (fun x -> s, x))

/// Attempts to parse the Events from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
let tryEnumStreamEvents streamFilter d: seq<StreamEvent> voption =
tryParseEquinoxBatch streamFilter d
|> ValueOption.map (fun struct (s, xs) -> enumEquinoxCosmosEvents xs |> Seq.map (fun x -> s, x))
let tryEnumStreamEvents streamFilter jsonDocument: seq<StreamEvent> voption =
tryEnumStreamEvents_ false streamFilter jsonDocument

/// Extracts all events that pass the streamFilter from a Feed item
let whereStream streamFilter jsonDocument: StreamEvent seq =
tryEnumStreamEvents streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty

/// Collects all events that pass the streamFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let whereStream streamFilter d: StreamEvent seq =
tryEnumStreamEvents streamFilter d |> ValueOption.defaultValue Seq.empty
/// Extracts all events passing the supplied categoryFilter from a Feed Item
let whereCategory categoryFilter jsonDocument: StreamEvent seq =
whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) jsonDocument

/// Collects all events passing the supplied categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let whereCategory categoryFilter d: StreamEvent seq =
whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) d
/// Extracts all events from the specified category list from a Feed Item
let ofCategories (categories: string[]) jsonDocument: StreamEvent seq =
whereCategory (fun c -> Array.contains c categories) jsonDocument

/// Attempts to parse the Events and/or Unfolds from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
let tryEnumStreamEventsAndUnfolds streamFilter jsonDocument: seq<StreamEvent> voption =
tryEnumStreamEvents_ true streamFilter jsonDocument

/// Collects all events from the specified category list from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let ofCategories categories d: StreamEvent seq =
whereCategory (fun c -> Array.contains c categories) d
/// Extracts Events and Unfolds that pass the streamFilter from a Feed item
let eventsAndUnfoldsWhereStream streamFilter jsonDocument: StreamEvent seq =
tryEnumStreamEventsAndUnfolds streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty
#else
module EquinoxNewtonsoftParser =

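For consumers of the parser, the pre-existing events-only entry points (`tryEnumStreamEvents`, `whereStream`, `whereCategory`, `ofCategories`) behave as before, while the new `tryEnumStreamEventsAndUnfolds` / `eventsAndUnfoldsWhereStream` additionally surface the unfolds carried in a Tip's `u` array. A rough sketch of choosing between them in a ChangeFeed handler; the category name and filter below are illustrative assumptions, not part of this commit:

```fsharp
open Propulsion.CosmosStore
open Propulsion.Sinks

// Hypothetical filter: only streams in the "Cart" category are of interest
let streamFilter (sn: FsCodec.StreamName) =
    FsCodec.StreamName.Category.ofStreamName sn = "Cart"

// Pre-existing behaviour: events only; unfolds held in the Tip are not surfaced
let parseEvents (doc: System.Text.Json.JsonDocument): StreamEvent seq =
    EquinoxSystemTextJsonParser.whereStream streamFilter doc

// New in this commit: events plus any unfolds carried in the Tip's "u" field
let parseEventsAndUnfolds (doc: System.Text.Json.JsonDocument): StreamEvent seq =
    EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream streamFilter doc
```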
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
@@ -21,7 +21,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.0.0" />
<PackageReference Include="Equinox.CosmosStore" Version="4.1.0-alpha.15" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0" />
</ItemGroup>

2 changes: 1 addition & 1 deletion tools/Propulsion.Tool/Propulsion.Tool.fsproj
@@ -28,7 +28,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.38.1" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.42.0" />
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />

<PackageReference Include="Argu" Version="6.2.2" />
22 changes: 14 additions & 8 deletions tools/Propulsion.Tool/Sync.fs
@@ -12,10 +12,11 @@ type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
| [<AltCommandLine "-Z"; Unique>] FromTail
| [<AltCommandLine "-F"; Unique>] Follow
| [<AltCommandLine "-A"; Unique>] RequireAll
| [<AltCommandLine "-E"; Unique>] EventsOnly
| [<AltCommandLine "-C"; Unique>] Categorize
| [<AltCommandLine "-b"; Unique>] MaxItems of int

| [<AltCommandLine "-I"; AltCommandLine "--include-system"; Unique>] IncSys
| [<AltCommandLine "-N"; AltCommandLine "--exclude-system"; Unique>] ExcSys
| [<AltCommandLine "-cat"; AltCommandLine "--include-category">] IncCat of regex: string
| [<AltCommandLine "-ncat"; AltCommandLine "--exclude-category">] ExcCat of regex: string
| [<AltCommandLine "-sn"; AltCommandLine "--include-streamname">] IncStream of regex: string
@@ -37,10 +38,11 @@ type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
"NOTE normally a large `MaxReadAhead` and `cosmos -b` is required to avoid starving the scheduler. " +
"NOTE This mode does not make sense to apply unless the ProcessorName is fresh; if the consumer group name is not fresh (and hence items are excluded from the feed), there will inevitably be missing events, and processing will stall. " +
"Default: assume events arrive from the changefeed (and/or the input JSON file) without any gaps or out of order deliveries for any stream."
| EventsOnly -> "Exclude Unfolds from processing. Default: Unfolds are read, parsed and processed"
| Categorize -> "Gather handler latency stats by category"
| MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Sync Default (Sync): 9999. Default: Unlimited"
| MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Default (Sync): 9999. Default: Unlimited"

| IncSys -> "Include System streams. Default: Exclude Index Streams, identified by a $ prefix."
| ExcSys -> "Exclude System streams. Default: Include Index Streams, identified by a $ prefix."
| IncCat _ -> "Allow Stream Category. Multiple values are combined with OR. Default: include all, subject to Category Deny and Stream Deny rules."
| ExcCat _ -> "Deny Stream Category. Specified values/regexes are applied after the Category Allow rule(s)."
| IncStream _ -> "Allow Stream Name. Multiple values are combined with OR. Default: Allow all streams that pass the category Allow test, Fail the Category and Stream deny tests."
@@ -56,9 +58,10 @@ and Arguments(c, p: ParseResults<Parameters>) =
member val Filters = Propulsion.StreamFilter(
allowCats = p.GetResults IncCat, denyCats = p.GetResults ExcCat,
allowSns = p.GetResults IncStream, denySns = p.GetResults ExcStream,
includeSystem = p.Contains IncSys,
includeSystem = not (p.Contains ExcSys),
allowEts = p.GetResults IncEvent, denyEts = p.GetResults ExcEvent)
member val Categorize = p.Contains Categorize
member val IncludeUnfolds = not (p.Contains EventsOnly)
member val Command =
match p.GetSubCommand() with
| Kafka a -> KafkaArguments(c, a) |> SubCommand.Kafka
@@ -233,7 +236,10 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
Some p
| SubCommand.Stats _ | SubCommand.Sync _ -> None
let isFileSource = match a.Command.Source with Json _ -> true | _ -> false
let parse = a.Filters.CreateStreamFilter() |> Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream
let parse =
a.Filters.CreateStreamFilter()
|> if a.IncludeUnfolds then Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream
else Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream
let statsInterval, stateInterval = a.StatsInterval, a.StateInterval
let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 2)
let maxConcurrentProcessors = p.GetResult(MaxWriters, 8)
@@ -254,9 +260,9 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
requireAll = requireAll)
| SubCommand.Sync sa ->
let eventsContext = sa.ConnectEvents() |> Async.RunSynchronously
let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, statsInterval, stateInterval,
let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, statsInterval, stateInterval, storeLog = Metrics.log,
logExternalStats = dumpStoreStats, Categorize = a.Categorize)
Propulsion.CosmosStore.CosmosStoreSink.Start(Metrics.log, maxReadAhead, eventsContext, maxConcurrentProcessors, stats,
Propulsion.CosmosStore.CosmosStoreSink.Start(Log.Logger, maxReadAhead, eventsContext, maxConcurrentProcessors, stats,
maxBytes = sa.MaxBytes, requireAll = requireAll,
?purgeInterval = if requireAll then None else Some (TimeSpan.hours 1))
let source =
@@ -299,7 +305,7 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
if follow then
source.AwaitWithStopOnCancellation()
else async {
let initialWait = TimeSpan.seconds 10
let initialWait = TimeSpan.seconds 30
do! source.Monitor.AwaitCompletion(initialWait, awaitFullyCaughtUp = true, logInterval = statsInterval / 2.) |> Async.ofTask
source.Stop()
do! source.Await() // Let it emit the stats
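Outside the tool, the sink side of such a pipeline can be wired up in much the same way as the `SubCommand.Sync` branch above. A rough sketch under stated assumptions: the Equinox events context, loggers, intervals and limits are presumed to come from the host's own configuration, none of which is part of this commit:

```fsharp
// Rough sketch mirroring the SubCommand.Sync wiring above; eventsContext is assumed to be
// an Equinox.CosmosStore events context for the target container (connection setup not shown)
let startSink (log: Serilog.ILogger) storeLog eventsContext
              (statsInterval, stateInterval) (maxReadAhead, maxWriters, maxBytes) requireAll =
    let stats =
        Propulsion.CosmosStore.CosmosStoreSinkStats(
            log, statsInterval, stateInterval, storeLog = storeLog)
    Propulsion.CosmosStore.CosmosStoreSink.Start(
        log, maxReadAhead, eventsContext, maxWriters, stats,
        maxBytes = maxBytes, requireAll = requireAll,
        // as in the tool: purging is only enabled when requireAll ordering is not being enforced
        ?purgeInterval = (if requireAll then None else Some (System.TimeSpan.FromHours 1.)))
```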
