Skip to content

Commit

Permalink
Whitespace
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 27, 2023
1 parent b684e72 commit bbfad2f
Show file tree
Hide file tree
Showing 69 changed files with 798 additions and 799 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Feed`: `Checkpoint` enables committing progress (and obtaining the achieved positions) without stopping the Sink [#162](https://github.com/jet/propulsion/pull/162)
- `Feed.SinglePassFeedSource`: Coordinates reads of a set of tranches until each reaches its Tail [#179](https://github.com/jet/propulsion/pull/179)
- `Scheduler`: Split out stats re `rateLimited` and `timedOut` vs `exceptions` [#194](https://github.com/jet/propulsion/pull/194)
- `Scheduler`: Added `index`, `eventType` to error logging [#234](https://github.com/jet/propulsion/pull/234)
- `Scheduler`: Added `index`, `eventType` to error logging [#237](https://github.com/jet/propulsion/pull/237)
- `Scheduler`: `purgeInterval` to control memory usage [#97](https://github.com/jet/propulsion/pull/97)
- `Scheduler`: `wakeForResults` option to maximize throughput (without having to drop sleep interval to zero) [#161](https://github.com/jet/propulsion/pull/161)
- `Sinks`, `Sinks.Config`: top level APIs for wiring common sink structures [#208](https://github.com/jet/propulsion/pull/208)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ The ubiquitous `Serilog` dependency is solely on the core module, not any sinks.
- `Propulsion.Tool` [![Tool NuGet](https://img.shields.io/nuget/v/Propulsion.Tool.svg)](https://www.nuget.org/packages/Propulsion.Tool/): Tool used to initialize a Change Feed Processor `aux` container for `Propulsion.CosmosStore` and demonstrate basic projection, including to Kafka. See [quickstart](#quickstart).

- `init`: CosmosDB: Initialize an `-aux` Container for use by the CosmosDb client library ChangeFeedProcessor
- `initpg` : MessageDb: Initialize a checkpoints table in a Postgres Database
- `initpg`: MessageDb: Initialize a checkpoints table in a Postgres Database
- `index`: DynamoStore: validate and/or reindex DynamoStore Index
- `checkpoint`: CosmosStore/DynamoStore/EventStoreDb/Feed/MessageDb/SqlStreamStore: adjust checkpoints in DynamoStore/CosmosStore/SQL Server/Postgres
- `project`: CosmosDB/DynamoStore/EventStoreDb/MessageDb: walk change feeds/indexes and/or project to Kafka
Expand Down
50 changes: 25 additions & 25 deletions src/Propulsion.CosmosStore/ChangeFeedProcessor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ open System.Threading
open System.Threading.Tasks

[<NoComparison>]
type ChangeFeedObserverContext = { source : Container; group : string; epoch : int64; timestamp : DateTime; rangeId : int; requestCharge : float }
type ChangeFeedObserverContext = { source: Container; group: string; epoch: int64; timestamp: DateTime; rangeId: int; requestCharge: float }

type IChangeFeedObserver =
inherit IDisposable
Expand All @@ -21,14 +21,14 @@ type IChangeFeedObserver =
/// - triggering marking of progress via an invocation of `ctx.Checkpoint()` (can be immediate, but can also be deferred and performed asynchronously)
/// NB emitting an exception will not trigger a retry, and no progress writing will take place without explicit calls to `ctx.Checkpoint`
#if COSMOSV3
abstract member Ingest: context : ChangeFeedObserverContext * tryCheckpointAsync : (CancellationToken -> Task<unit>) * docs : IReadOnlyCollection<Newtonsoft.Json.Linq.JObject> * CancellationToken -> Task<unit>
abstract member Ingest: context: ChangeFeedObserverContext * tryCheckpointAsync: (CancellationToken -> Task<unit>) * docs: IReadOnlyCollection<Newtonsoft.Json.Linq.JObject> * CancellationToken -> Task<unit>
#else
abstract member Ingest: context : ChangeFeedObserverContext * tryCheckpointAsync : (CancellationToken -> Task<unit>) * docs : IReadOnlyCollection<System.Text.Json.JsonDocument> * CancellationToken -> Task<unit>
abstract member Ingest: context: ChangeFeedObserverContext * tryCheckpointAsync: (CancellationToken -> Task<unit>) * docs: IReadOnlyCollection<System.Text.Json.JsonDocument> * CancellationToken -> Task<unit>
#endif

type internal SourcePipeline =

static member Start(log : ILogger, start, maybeStartChild, stop, observer : IDisposable) =
static member Start(log: ILogger, start, maybeStartChild, stop, observer: IDisposable) =
let cts = new CancellationTokenSource()
let triggerStop _disposing =
let level = if cts.IsCancellationRequested then LogEventLevel.Debug else LogEventLevel.Information
Expand Down Expand Up @@ -60,37 +60,37 @@ type internal SourcePipeline =
type ChangeFeedProcessor =

static member Start
( log : ILogger, monitored : Container,
( log: ILogger, monitored: Container,
// The non-partitioned (i.e., PartitionKey is "id") Container holding the partition leases.
// Should always read from the write region to keep the number of write conflicts to a minimum when the sdk
// updates the leases. Since the non-write region(s) might lag behind due to us using non-strong consistency, during
// failover we are likely to reprocess some messages, but that's okay since processing has to be idempotent in any case
leases : Container,
leases: Container,
// Identifier to disambiguate multiple independent feed processor positions (akin to a 'consumer group')
processorName : string,
processorName: string,
// Observers to forward documents to (Disposal is tied to stopping of the Source)
observer : IChangeFeedObserver,
?leaseOwnerId : string,
observer: IChangeFeedObserver,
?leaseOwnerId: string,
// (NB Only applies if this is the first time this leasePrefix is presented)
// Specify `true` to request starting of projection from the present write position.
// Default: false (projecting all events from start beforehand)
?startFromTail : bool,
?startFromTail: bool,
// Frequency to check for partitions without a processor. Default 1s
?leaseAcquireInterval : TimeSpan,
?leaseAcquireInterval: TimeSpan,
// Frequency to renew leases held by processors under our control. Default 3s
?leaseRenewInterval : TimeSpan,
?leaseRenewInterval: TimeSpan,
// Duration to take lease when acquired/renewed. Default 10s
?leaseTtl : TimeSpan,
?leaseTtl: TimeSpan,
// Delay before re-polling a partition after backlog has been drained
?feedPollDelay : TimeSpan,
?feedPollDelay: TimeSpan,
// Limit on items to take in a batch when querying for changes (in addition to 4MB response size limit). Default Unlimited.
// Max Items is not emphasized as a control mechanism as it can only be used meaningfully when events are highly regular in size.
?maxItems : int,
?maxItems: int,
// Continuously fed per-partition lag information until parent Async completes
// callback should Async.Sleep until next update is desired
?reportLagAndAwaitNextEstimation,
// Enables reporting or other processing of Exception conditions as per <c>WithErrorNotification</c>
?notifyError : int -> exn -> unit,
?notifyError: int -> exn -> unit,
// Admits customizations in the ChangeFeedProcessorBuilder chain
?customize) =
let leaseOwnerId = defaultArg leaseOwnerId (ChangeFeedProcessor.mkLeaseOwnerIdForProcess())
Expand All @@ -102,16 +102,16 @@ type ChangeFeedProcessor =
log.Information("ChangeFeed {processorName} Lease acquire {leaseAcquireIntervalS:n0}s ttl {ttlS:n0}s renew {renewS:n0}s feedPollDelay {feedPollDelayS:n0}s Items limit {maxItems} fromTail {fromTail}",
processorName, leaseAcquireInterval.TotalSeconds, leaseTtl.TotalSeconds, leaseRenewInterval.TotalSeconds, feedPollDelay.TotalSeconds, Option.toNullable maxItems, defaultArg startFromTail false)
let processorName_ = processorName + ":"
let leaseTokenToPartitionId (leaseToken : string) = int (leaseToken.Trim[|'"'|])
let leaseTokenToPartitionId (leaseToken: string) = int (leaseToken.Trim[|'"'|])
let processor =
let handler =
let aux (context : ChangeFeedProcessorContext)
let aux (context: ChangeFeedProcessorContext)
#if COSMOSV3
(changes : IReadOnlyCollection<Newtonsoft.Json.Linq.JObject>)
(changes: IReadOnlyCollection<Newtonsoft.Json.Linq.JObject>)
#else
(changes : IReadOnlyCollection<System.Text.Json.JsonDocument>)
(changes: IReadOnlyCollection<System.Text.Json.JsonDocument>)
#endif
(checkpointAsync : CancellationToken -> Task<unit>) ct = task {
(checkpointAsync: CancellationToken -> Task<unit>) ct = task {
let log: exn -> unit = function
| :? OperationCanceledException -> () // Shutdown via .Stop triggers this
| e -> log.Error(e, "Reader {processorName}/{partition} Handler Threw", processorName, context.LeaseToken)
Expand All @@ -126,7 +126,7 @@ type ChangeFeedProcessor =
requestCharge = context.Headers.RequestCharge }
return! observer.Ingest(ctx, checkpointAsync, changes, ct)
with Exception.Log log () -> () }
fun ctx chg (chk : Func<Task>) ct ->
fun ctx chg (chk: Func<Task>) ct ->
let chk' _ct = task { do! chk.Invoke() }
aux ctx chg chk' ct :> Task
let acquireAsync leaseToken = log.Information("Reader {partition} Assigned", leaseTokenToPartitionId leaseToken); Task.CompletedTask
Expand All @@ -152,9 +152,9 @@ type ChangeFeedProcessor =
reportLagAndAwaitNextEstimation
|> Option.map (fun lagMonitorCallback ->
let estimator = monitored.GetChangeFeedEstimator(processorName_, leases)
let emitLagMetrics (ct : CancellationToken) = task {
let emitLagMetrics (ct: CancellationToken) = task {
while not ct.IsCancellationRequested do
let feedIteratorMap (map : ChangeFeedProcessorState -> 'u) : Task<'u seq> = task {
let feedIteratorMap (map: ChangeFeedProcessorState -> 'u): Task<'u seq> = task {
// earlier versions, such as 3.9.0, do not implement IDisposable; see linked issue for detail on when SDK team added it
use query = estimator.GetCurrentStateIterator() // see https://github.com/jet/equinox/issues/225 - in the Cosmos V4 SDK, all this is managed IAsyncEnumerable
let result = ResizeArray()
Expand All @@ -165,7 +165,7 @@ type ChangeFeedProcessor =
let! leasesState = feedIteratorMap (fun s -> leaseTokenToPartitionId s.LeaseToken, s.EstimatedLag)
do! lagMonitorCallback (Seq.sortBy fst leasesState |> List.ofSeq) }
emitLagMetrics)
let wrap (f : unit -> Task) () = task { return! f () }
let wrap (f: unit -> Task) () = task { return! f () }
SourcePipeline.Start(log, wrap processor.StartAsync, maybePumpMetrics, wrap processor.StopAsync, observer)
static member private mkLeaseOwnerIdForProcess() =
// If k>1 processes share an owner id, then they will compete for same partitions.
Expand Down
5 changes: 2 additions & 3 deletions src/Propulsion.CosmosStore/CosmosStorePruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ open Propulsion.Internal
open Propulsion.Sinks
open Propulsion.Streams
open Serilog
open System

module Pruner =

type Outcome =
| Ok of completed : int * deferred : int
| Ok of completed: int * deferred: int
| Nop of int

// Per set of accumulated events per stream (selected via `selectExpired`), attempt to prune up to the high water mark
Expand Down Expand Up @@ -65,7 +64,7 @@ type CosmosStorePruner =
/// DANGER: this API DELETES events - use with care
/// Starts a <c>Sink</c> that prunes _all submitted events from the supplied <c>context</c>_
static member Start
( log : ILogger, maxReadAhead, context, maxConcurrentStreams, stats: CosmosStorePrunerStats,
( log: ILogger, maxReadAhead, context, maxConcurrentStreams, stats: CosmosStorePrunerStats,
?purgeInterval, ?wakeForResults, ?idleDelay,
// Defaults to statsInterval
?ingesterStatsInterval)
Expand Down
20 changes: 10 additions & 10 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module private Impl =
type EventBody = byte[] // V4 defines one directly, here we shim it
module StreamSpan =

let private toNativeEventBody (xs : Propulsion.Sinks.EventBody) : byte[] = xs.ToArray()
let private toNativeEventBody (xs: Propulsion.Sinks.EventBody): byte[] = xs.ToArray()
let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody
// Trimmed edition of what V4 exposes
module internal Equinox =
Expand All @@ -34,7 +34,7 @@ module private Impl =

// v4 and later use JsonElement, but Propulsion is using ReadOnlyMemory<byte> rather than assuming and/or offering optimization for JSON bodies
open System.Text.Json
let private toNativeEventBody (x : EventBody) : JsonElement =
let private toNativeEventBody (x: EventBody): JsonElement =
if x.IsEmpty then JsonElement()
else JsonSerializer.Deserialize(x.Span)
let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody
Expand All @@ -47,11 +47,11 @@ module Internal =
type [<RequireQualifiedAccess>] ResultKind = TimedOut | RateLimited | TooLarge | Malformed | Other

type [<NoComparison; NoEquality; RequireQualifiedAccess>] Result =
| Ok of updatedPos : int64
| Duplicate of updatedPos : int64
| PartialDuplicate of overage : Event[]
| PrefixMissing of batch : Event[] * writePos : int64
let logTo (log : ILogger) malformed (res : StreamName * Result<struct (StreamSpan.Metrics * Result), struct (StreamSpan.Metrics * exn)>) =
| Ok of updatedPos: int64
| Duplicate of updatedPos: int64
| PartialDuplicate of overage: Event[]
| PrefixMissing of batch: Event[] * writePos: int64
let logTo (log: ILogger) malformed (res: StreamName * Result<struct (StreamSpan.Metrics * Result), struct (StreamSpan.Metrics * exn)>) =
match res with
| stream, Ok (_, Result.Ok pos) ->
log.Information("Wrote {stream} up to {pos}", stream, pos)
Expand All @@ -65,7 +65,7 @@ module Internal =
let level = if malformed then LogEventLevel.Warning else Events.LogEventLevel.Information
log.Write(level, exn, "Writing {stream} failed, retrying", stream)

let write (log : ILogger) (ctx : EventsContext) stream (span : Event[]) ct = task {
let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task {
log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length)
#if COSMOSV3
let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _))
Expand Down Expand Up @@ -99,7 +99,7 @@ module Internal =

type Dispatcher =

static member Create(log : ILogger, eventsContext, itemDispatcher, ?maxEvents, ?maxBytes) =
static member Create(log: ILogger, eventsContext, itemDispatcher, ?maxEvents, ?maxBytes) =
let maxEvents, maxBytes = defaultArg maxEvents 16384, defaultArg maxBytes (256 * 1024)
let writerResultLog = log.ForContext<Writer.Result>()
let attemptWrite stream span ct = task {
Expand Down Expand Up @@ -178,7 +178,7 @@ type CosmosStoreSink =

/// Starts a <c>Sink</c> that ingests all submitted events into the supplied <c>context</c>
static member Start
( log : ILogger, maxReadAhead, eventsContext, maxConcurrentStreams, stats: CosmosStoreSinkStats,
( log: ILogger, maxReadAhead, eventsContext, maxConcurrentStreams, stats: CosmosStoreSinkStats,
?purgeInterval, ?wakeForResults, ?idleDelay,
// Default: 16384
?maxEvents,
Expand Down
Loading

0 comments on commit bbfad2f

Please sign in to comment.