diff --git a/CHANGELOG.md b/CHANGELOG.md index 80c1c990..1c1ae8a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,7 +32,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141) - `Propulsion.Tool`: `sync ` supports `from json` source option [#250](https://github.com/jet/propulsion/pull/250) - `Propulsion.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#142](https://github.com/jet/propulsion/pull/142) :pray: [@brihadish](https://github.com/brihadish) -- `Propulsion.Tool`: `sync cosmos from ` [#252](https://github.com/jet/propulsion/pull/252) +- `Propulsion.Tool`: `sync cosmos from ` [#252](https://github.com/jet/propulsion/pull/252) [#263](https://github.com/jet/propulsion/pull/263) ### Changed diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs index 7563a756..6e70fc5a 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs @@ -72,8 +72,9 @@ module Internal = let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData) |> Async.executeAsTask ct #else - span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore does not [yet] support ingesting unfolds") - let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData, ct) + let unfolds, events = span |> Array.partition _.IsUnfold + log.Debug("Writing {s}@{i}x{n}+{u}", stream, i, events.Length, unfolds.Length) + let! res = ctx.Sync(stream, { index = i; etag = None }, events |> Array.map mapData, unfolds |> Array.map mapData, ct) #endif let res' = match res with diff --git a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs index 8a9133d3..991ce517 100644 --- a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs +++ b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs @@ -1,7 +1,7 @@ namespace Propulsion.CosmosStore open Equinox.CosmosStore.Core - +open Propulsion.Internal open Propulsion.Sinks /// Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams. @@ -10,12 +10,11 @@ open Propulsion.Sinks #if !COSMOSV3 module EquinoxSystemTextJsonParser = - type System.Text.Json.JsonDocument with - member document.Cast<'T>() = - System.Text.Json.JsonSerializer.Deserialize<'T>(document.RootElement) - type Batch with - member _.MapData x = - System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x + type System.Text.Json.JsonElement with + member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x) + member x.ToSinkEventBody() = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x |> System.ReadOnlyMemory + + type System.Text.Json.JsonDocument with member x.Cast<'T>() = x.RootElement.Cast<'T>() let timestamp (doc: System.Text.Json.JsonDocument) = let unixEpoch = System.DateTime.UnixEpoch let ts = let r = doc.RootElement in r.GetProperty("_ts") @@ -23,7 +22,7 @@ module EquinoxSystemTextJsonParser = /// Parses an Equinox.Cosmos Batch from a CosmosDB Item /// returns ValueNone if it does not bear required elements of a `Equinox.Cosmos` >= 1.0 Batch, or the streamFilter predicate rejects it - let tryParseEquinoxBatch streamFilter (d: System.Text.Json.JsonDocument) = + let tryParseEquinoxBatchOrTip streamFilter (d: System.Text.Json.JsonDocument) = let r = d.RootElement let tryProp (id: string): ValueOption = let mutable p = Unchecked.defaultof<_> @@ -33,36 +32,55 @@ module EquinoxSystemTextJsonParser = match tryProp "p" with | ValueSome je when je.ValueKind = System.Text.Json.JsonValueKind.String && hasProp "i" && hasProp "n" && hasProp "e" -> let sn = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw) - if streamFilter sn then ValueSome (struct (sn, d.Cast())) else ValueNone + if streamFilter sn then ValueSome (struct (sn, d.Cast(), tryProp "u")) else ValueNone | _ -> ValueNone - /// Enumerates the events represented within a batch - let enumEquinoxCosmosEvents (batch: Batch): Event seq = - batch.e |> Seq.mapi (fun offset x -> - let d = batch.MapData x.d - let m = batch.MapData x.m + /// Enumerates the Events and/or Unfolds represented within an Equinox.CosmosStore Batch or Tip Item + let enumEquinoxCosmosBatchOrTip (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq = + let inline gen isUnfold i (x: Equinox.CosmosStore.Core.Event) = + let d = x.d.ToSinkEventBody() + let m = x.m.ToSinkEventBody() let inline len s = if isNull s then 0 else String.length s - FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, d, m, timestamp = x.t, + FsCodec.Core.TimelineEvent.Create(i, x.c, d, m, timestamp = x.t, size = x.c.Length + d.Length + m.Length + len x.correlationId + len x.causationId + 80, - correlationId = x.correlationId, causationId = x.causationId)) - - /// Attempts to parse a Document/Item from the Store + correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold) + let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset)) + // an Unfold won't have a corr/cause id, but that's OK - can't use Tip type as don't want to expand compressed form etc + match u |> ValueOption.map (fun u -> u.Cast()) with + | ValueNone | ValueSome null | ValueSome [||] -> events + | ValueSome unfolds -> seq { + yield! events + for x in unfolds do + gen true batch.n x } + let inline tryEnumStreamEvents_ withUnfolds streamFilter jsonDocument: seq voption = + tryParseEquinoxBatchOrTip streamFilter jsonDocument + |> ValueOption.map (fun struct (s, xs, u) -> enumEquinoxCosmosBatchOrTip (if withUnfolds then u else ValueNone) xs |> Seq.map (fun x -> s, x)) + + /// Attempts to parse the Events from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument /// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects - let tryEnumStreamEvents streamFilter d: seq voption = - tryParseEquinoxBatch streamFilter d - |> ValueOption.map (fun struct (s, xs) -> enumEquinoxCosmosEvents xs |> Seq.map (fun x -> s, x)) + let tryEnumStreamEvents streamFilter jsonDocument: seq voption = + tryEnumStreamEvents_ false streamFilter jsonDocument + + /// Extracts all events that pass the streamFilter from a Feed item + let whereStream streamFilter jsonDocument: StreamEvent seq = + tryEnumStreamEvents streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty - /// Collects all events that pass the streamFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch - let whereStream streamFilter d: StreamEvent seq = - tryEnumStreamEvents streamFilter d |> ValueOption.defaultValue Seq.empty + /// Extracts all events passing the supplied categoryFilter from a Feed Item + let whereCategory categoryFilter jsonDocument: StreamEvent seq = + whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) jsonDocument - /// Collects all events passing the supplied categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch - let whereCategory categoryFilter d: StreamEvent seq = - whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) d + /// Extracts all events from the specified category list from a Feed Item + let ofCategories (categories: string[]) jsonDocument: StreamEvent seq = + whereCategory (fun c -> Array.contains c categories) jsonDocument + + /// Attempts to parse the Events and/or Unfolds from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument + /// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects + let tryEnumStreamEventsAndUnfolds streamFilter jsonDocument: seq voption = + tryEnumStreamEvents_ true streamFilter jsonDocument - /// Collects all events from the specified category list from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch - let ofCategories categories d: StreamEvent seq = - whereCategory (fun c -> Array.contains c categories) d + /// Extracts Events and Unfolds that pass the streamFilter from a Feed item + let eventsAndUnfoldsWhereStream streamFilter jsonDocument: StreamEvent seq = + tryEnumStreamEventsAndUnfolds streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty #else module EquinoxNewtonsoftParser = diff --git a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj index 40755c4f..e53ac9de 100644 --- a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj +++ b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj @@ -21,7 +21,7 @@ - + diff --git a/tools/Propulsion.Tool/Propulsion.Tool.fsproj b/tools/Propulsion.Tool/Propulsion.Tool.fsproj index 3d82fd21..050e6cbc 100644 --- a/tools/Propulsion.Tool/Propulsion.Tool.fsproj +++ b/tools/Propulsion.Tool/Propulsion.Tool.fsproj @@ -28,7 +28,7 @@ - + diff --git a/tools/Propulsion.Tool/Sync.fs b/tools/Propulsion.Tool/Sync.fs index 0cc57326..0eba6dc5 100644 --- a/tools/Propulsion.Tool/Sync.fs +++ b/tools/Propulsion.Tool/Sync.fs @@ -12,10 +12,11 @@ type [] Parameters = | [] FromTail | [] Follow | [] RequireAll + | [] EventsOnly | [] Categorize | [] MaxItems of int - | [] IncSys + | [] ExcSys | [] IncCat of regex: string | [] ExcCat of regex: string | [] IncStream of regex: string @@ -37,10 +38,11 @@ type [] Parameters = "NOTE normally a large `MaxReadAhead` and `cosmos -b` is required to avoid starving the scheduler. " + "NOTE This mode does not make sense to apply unless the ProcessorName is fresh; if the consumer group name is not fresh (and hence items are excluded from the feed), there will inevitably be missing events, and processing will stall. " + "Default: assume events arrive from the changefeed (and/or the input JSON file) without any gaps or out of order deliveries for any stream." + | EventsOnly -> "Exclude Unfolds from processing. Default: Unfolds are read, parsed and processed" | Categorize -> "Gather handler latency stats by category" - | MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Sync Default (Sync): 9999. Default: Unlimited" + | MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Default (Sync): 9999. Default: Unlimited" - | IncSys -> "Include System streams. Default: Exclude Index Streams, identified by a $ prefix." + | ExcSys -> "Exclude System streams. Default: Include Index Streams, identified by a $ prefix." | IncCat _ -> "Allow Stream Category. Multiple values are combined with OR. Default: include all, subject to Category Deny and Stream Deny rules." | ExcCat _ -> "Deny Stream Category. Specified values/regexes are applied after the Category Allow rule(s)." | IncStream _ -> "Allow Stream Name. Multiple values are combined with OR. Default: Allow all streams that pass the category Allow test, Fail the Category and Stream deny tests." @@ -56,9 +58,10 @@ and Arguments(c, p: ParseResults) = member val Filters = Propulsion.StreamFilter( allowCats = p.GetResults IncCat, denyCats = p.GetResults ExcCat, allowSns = p.GetResults IncStream, denySns = p.GetResults ExcStream, - includeSystem = p.Contains IncSys, + includeSystem = not (p.Contains ExcSys), allowEts = p.GetResults IncEvent, denyEts = p.GetResults ExcEvent) member val Categorize = p.Contains Categorize + member val IncludeUnfolds = not (p.Contains EventsOnly) member val Command = match p.GetSubCommand() with | Kafka a -> KafkaArguments(c, a) |> SubCommand.Kafka @@ -233,7 +236,10 @@ let run appName (c: Args.Configuration, p: ParseResults) = async { Some p | SubCommand.Stats _ | SubCommand.Sync _ -> None let isFileSource = match a.Command.Source with Json _ -> true | _ -> false - let parse = a.Filters.CreateStreamFilter() |> Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream + let parse = + a.Filters.CreateStreamFilter() + |> if a.IncludeUnfolds then Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream + else Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream let statsInterval, stateInterval = a.StatsInterval, a.StateInterval let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 2) let maxConcurrentProcessors = p.GetResult(MaxWriters, 8) @@ -254,9 +260,9 @@ let run appName (c: Args.Configuration, p: ParseResults) = async { requireAll = requireAll) | SubCommand.Sync sa -> let eventsContext = sa.ConnectEvents() |> Async.RunSynchronously - let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, statsInterval, stateInterval, + let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, statsInterval, stateInterval, storeLog = Metrics.log, logExternalStats = dumpStoreStats, Categorize = a.Categorize) - Propulsion.CosmosStore.CosmosStoreSink.Start(Metrics.log, maxReadAhead, eventsContext, maxConcurrentProcessors, stats, + Propulsion.CosmosStore.CosmosStoreSink.Start(Log.Logger, maxReadAhead, eventsContext, maxConcurrentProcessors, stats, maxBytes = sa.MaxBytes, requireAll = requireAll, ?purgeInterval = if requireAll then None else Some (TimeSpan.hours 1)) let source = @@ -299,7 +305,7 @@ let run appName (c: Args.Configuration, p: ParseResults) = async { if follow then source.AwaitWithStopOnCancellation() else async { - let initialWait = TimeSpan.seconds 10 + let initialWait = TimeSpan.seconds 30 do! source.Monitor.AwaitCompletion(initialWait, awaitFullyCaughtUp = true, logInterval = statsInterval / 2.) |> Async.ofTask source.Stop() do! source.Await() // Let it emit the stats