Skip to content

Commit

Permalink
Tidy; add event name
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 27, 2023
1 parent 0712923 commit 68a443e
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 137 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Feed`: `Checkpoint` enables committing progress (and obtaining the achieved positions) without stopping the Sink [#162](https://github.com/jet/propulsion/pull/162)
- `Feed.SinglePassFeedSource`: Coordinates reads of a set of tranches until each reaches its Tail [#179](https://github.com/jet/propulsion/pull/179)
- `Scheduler`: Split out stats re `rateLimited` and `timedOut` vs `exceptions` [#194](https://github.com/jet/propulsion/pull/194)
- `Scheduler`: Added `index`, `eventType` to error logging [#233](https://github.com/jet/propulsion/pull/233)
- `Scheduler`: `purgeInterval` to control memory usage [#97](https://github.com/jet/propulsion/pull/97)
- `Scheduler`: `wakeForResults` option to maximize throughput (without having to drop sleep interval to zero) [#161](https://github.com/jet/propulsion/pull/161)
- `Sinks`, `Sinks.Config`: top level APIs for wiring common sink structures [#208](https://github.com/jet/propulsion/pull/208)
Expand Down
12 changes: 6 additions & 6 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,21 @@ module Internal =
#else
try let! res = Writer.write log eventsContext stream span' ct
#endif
return struct (Events.index span', span'.Length > 0, Ok struct (met, res))
with e -> return struct (Events.index span', false, Error struct (met, e)) }
let interpretWriteResultProgress (streams: Scheduling.StreamStates<_>) stream res =
return Ok struct (met, res)
with e -> return Error struct (met, e) }
let interpretProgress (streams: Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct (_stats, Writer.Result.Ok pos) -> struct (streams.RecordWriteProgress(stream, pos, null), false)
| Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null), false
| Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, overage[0].Index, [| overage |]), false
| Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [|overage|]), false
| Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, Events.index overage, [| overage |]), false
| Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [| overage |]), false
| Error struct (_stats, exn) ->
let malformed = Writer.classify exn |> Writer.isMalformed
streams.SetMalformed(stream, malformed), malformed
let struct (ss, malformed) = applyResultToStreamState res
Writer.logTo writerResultLog malformed (stream, res)
struct (ss.WritePos, res)
Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretWriteResultProgress)
Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress)

type WriterResult = Internal.Writer.Result

Expand Down
10 changes: 5 additions & 5 deletions src/Propulsion.EventStore/EventStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,19 @@ module Internal =
let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096
let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span' ct
return struct (Events.index span', span'.Length > 0, Ok struct (met, res))
with e -> return Events.index span', false, Error struct (met, e) }
let interpretWriteResultProgress (streams : Scheduling.StreamStates<_>) stream res =
return Ok struct (met, res)
with e -> return Error struct (met, e) }
let interpretProgress (streams : Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct (_stats, Writer.Result.Ok pos) -> streams.RecordWriteProgress(stream, pos, null)
| Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null)
| Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, overage[0].Index, [| overage |])
| Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, Events.index overage, [| overage |])
| Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [| overage |])
| Error struct (_stats, _exn) -> streams.SetMalformed(stream, false)
let ss = applyResultToStreamState res
Writer.logTo writerResultLog (stream, res)
struct (ss.WritePos, res)
Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, attemptWrite, interpretWriteResultProgress)
Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, attemptWrite, interpretProgress)

type WriterResult = Internal.Writer.Result

Expand Down
46 changes: 19 additions & 27 deletions src/Propulsion.Kafka/Consumers.fs
Original file line number Diff line number Diff line change
Expand Up @@ -344,33 +344,25 @@ type Factory private () =
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay)

static member StartBatchedAsync<'Info>
( log : ILogger, config : KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents,
select, handle : Func<Scheduling.Item<_>[], CancellationToken, Task<seq<Result<int64, exn>>>>, stats,
( log: ILogger, config: KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents,
select, handle: Func<Scheduling.Item<_>[], CancellationToken, Task<seq<struct (TimeSpan * Result<int64, exn>)>>>, stats,
?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) =
let handle (items : Scheduling.Item<EventBody>[]) ct
: Task<Scheduling.Res<Result<struct (int64 * struct (StreamSpan.Metrics * unit)), struct (StreamSpan.Metrics * exn)>>[]> = task {
let sw = Stopwatch.start ()
let avgElapsed () =
let tot = float sw.ElapsedMilliseconds
TimeSpan.FromMilliseconds(tot / float items.Length)
let handle (items: Scheduling.Item<EventBody>[]) ct
: Task<Scheduling.InternalRes<Result<struct (StreamSpan.Metrics * int64), struct (StreamSpan.Metrics * exn)>>[]> = task {
let start = Stopwatch.timestamp ()
let inline err ts e (x: Scheduling.Item<_>) =
let met = StreamSpan.metrics Event.renderedSize x.span
Scheduling.InternalRes.create (x, ts, Result.Error struct (met, e))
try let! results = handle.Invoke(items, ct)
let ae = avgElapsed ()
return
[| for x in Seq.zip items results ->
match x with
| item, Ok index' ->
let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index' ) |> Array.ofSeq
let metrics = StreamSpan.metrics Event.storedSize used
Scheduling.Res.create (ae, item.stream, Events.index item.span, not (Array.isEmpty used), Ok struct (index', struct (metrics, ())))
| item, Error e ->
let metrics = StreamSpan.metrics Event.renderedSize item.span
Scheduling.Item.createResE (ae, item, metrics, e) |]
return Array.ofSeq (Seq.zip items results |> Seq.map(function
| item, (ts, Ok index') ->
let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index' ) |> Array.ofSeq
let metrics = StreamSpan.metrics Event.storedSize used
Scheduling.InternalRes.create (item, ts, Result.Ok struct (metrics, index'))
| item, (ts, Error e) -> err ts e item))
with e ->
let ae = avgElapsed ()
return
[| for x in items ->
let metrics = StreamSpan.metrics Event.renderedSize x.span
Scheduling.Item.createResE (ae, x, metrics, e) |] }
let ts = Stopwatch.elapsed start
return items |> Array.map (err ts e) }
let dispatcher = Dispatcher.Batched(select, handle)
let dumpStreams logStreamStates log =
logExternalState |> Option.iter (fun f -> f log)
Expand All @@ -394,14 +386,14 @@ type Factory private () =
/// Processor <c>'Outcome<c/>s are passed to be accumulated into the <c>stats</c> for periodic emission.<br/>
/// Processor will run perpetually in a background until `Stop()` is requested.
static member StartBatched<'Info>
( log : ILogger, config : KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents,
select : StreamState seq -> StreamState[],
( log: ILogger, config: KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents,
select: StreamState seq -> StreamState[],
// Handler responses:
// - the result seq is expected to match the ordering of the input <c>Scheduling.Item</c>s
// - Ok: Index at which next processing will proceed (which can trigger discarding of earlier items on that stream)
// - Error: Records the processing of the stream in question as having faulted (the stream's pending events and/or
// new ones that arrived while the handler was processing are then eligible for retry purposes in the next dispatch cycle)
handle : StreamState[] -> Async<seq<Result<int64, exn>>>,
handle: StreamState[] -> Async<seq<struct (TimeSpan * Result<int64, exn>)>>,
// The responses from each <c>handle</c> invocation are passed to <c>stats</c> for periodic emission
stats,
?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) =
Expand Down
10 changes: 5 additions & 5 deletions src/Propulsion/Sinks.fs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ type Factory private () =
/// Project Events sequentially via a <code>handle</code> function that yields a StreamResult per <c>select</c>ed Item
static member StartBatchedAsync<'Outcome>
( log, maxReadAhead,
select : Func<StreamState seq, StreamState[]>,
handle : Func<StreamState[], CancellationToken, Task<seq<Result<StreamResult, exn>>>>,
select: Func<StreamState seq, StreamState[]>,
handle: Func<StreamState[], CancellationToken, Task<seq<struct (TimeSpan * Result<StreamResult, exn>)>>>,
stats,
[<O; D null>] ?pendingBufferSize, [<O; D null>] ?purgeInterval, [<O; D null>] ?wakeForResults, [<O; D null>] ?idleDelay,
[<O; D null>] ?ingesterStatsInterval, [<O; D null>] ?requireCompleteStreams) =
let handle items ct = task {
let! res = handle.Invoke(items, ct)
return seq { for i, r in Seq.zip items res -> Result.map (StreamResult.toIndex i.span) r } }
return seq { for i, (ts, r) in Seq.zip items res -> struct (ts, Result.map (StreamResult.toIndex i.span) r) } }
Streams.Batched.Start(log, maxReadAhead, select, handle, Event.storedSize, stats,
?pendingBufferSize = pendingBufferSize, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay,
?ingesterStatsInterval = ingesterStatsInterval, ?requireCompleteStreams = requireCompleteStreams)
Expand Down Expand Up @@ -143,8 +143,8 @@ type Factory private () =
/// Per handled stream, the result can be either a StreamResult conveying progress, or an exception
static member StartBatched<'Outcome>
( log, maxReadAhead,
select : StreamState seq -> StreamState[],
handle : StreamState[] -> Async<seq<Result<StreamResult, exn>>>,
select: StreamState seq -> StreamState[],
handle: StreamState[] -> Async<seq<struct (TimeSpan * Result<StreamResult, exn>)>>,
stats,
// Configure max number of batches to buffer within the scheduler; Default: Same as maxReadAhead
[<O; D null>] ?pendingBufferSize, [<O; D null>] ?purgeInterval, [<O; D null>] ?wakeForResults, [<O; D null>] ?idleDelay,
Expand Down
Loading

0 comments on commit 68a443e

Please sign in to comment.