feat(Streams)!: Stats/error logging polish (#237)
bartelink authored Sep 27, 2023
1 parent 4596002 commit b684e72
Showing 13 changed files with 329 additions and 284 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
  - `Feed`: `Checkpoint` enables committing progress (and obtaining the achieved positions) without stopping the Sink [#162](https://github.com/jet/propulsion/pull/162)
  - `Feed.SinglePassFeedSource`: Coordinates reads of a set of tranches until each reaches its Tail [#179](https://github.com/jet/propulsion/pull/179)
  - `Scheduler`: Split out stats re `rateLimited` and `timedOut` vs `exceptions` [#194](https://github.com/jet/propulsion/pull/194)
+ - `Scheduler`: Added `index`, `eventType` to error logging [#234](https://github.com/jet/propulsion/pull/234)
  - `Scheduler`: `purgeInterval` to control memory usage [#97](https://github.com/jet/propulsion/pull/97)
  - `Scheduler`: `wakeForResults` option to maximize throughput (without having to drop sleep interval to zero) [#161](https://github.com/jet/propulsion/pull/161)
  - `Sinks`, `Sinks.Config`: top level APIs for wiring common sink structures [#208](https://github.com/jet/propulsion/pull/208)
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/CosmosStorePruner.fs
@@ -32,8 +32,8 @@ module Pruner =
let writePos = max trimmedPos (untilIndex + 1L)
return struct (writePos, res) }

- type CosmosStorePrunerStats(log, statsInterval, stateInterval) =
- inherit Propulsion.Streams.Stats<Pruner.Outcome>(log, statsInterval, stateInterval)
+ type CosmosStorePrunerStats(log, statsInterval, stateInterval, [<O; D null>] ?failThreshold) =
+ inherit Propulsion.Streams.Stats<Pruner.Outcome>(log, statsInterval, stateInterval, ?failThreshold = failThreshold)

let mutable nops, totalRedundant, ops, totalDeletes, totalDeferred = 0, 0, 0, 0, 0
override _.HandleOk outcome =
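The new `?failThreshold` argument is simply threaded through to the base `Stats` type. A minimal standalone sketch of that forwarding pattern (hypothetical `BaseStats`/`PrunerStats` names; the threshold being a `TimeSpan` is an assumption, as the diff does not show its type):

```fsharp
open System

// Hypothetical stand-in for Propulsion.Streams.Stats
type BaseStats(statsInterval: TimeSpan, ?failThreshold: TimeSpan) =
    // assumed default, purely for the sketch
    member _.FailThreshold = defaultArg failThreshold (TimeSpan.FromMinutes 5.)

// `?param = param` forwards the caller's optional argument (or its absence)
// to the base constructor without forcing a default at this level
type PrunerStats(statsInterval, ?failThreshold) =
    inherit BaseStats(statsInterval, ?failThreshold = failThreshold)

let explicit = PrunerStats(TimeSpan.FromMinutes 1., failThreshold = TimeSpan.FromMinutes 10.)
let defaulted = PrunerStats(TimeSpan.FromMinutes 1.)  // base sees None
```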
55 changes: 26 additions & 29 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
@@ -109,36 +109,33 @@ module Internal =
#else
try let! res = Writer.write log eventsContext stream span' ct
#endif
- return struct (span'.Length > 0, Ok struct (met, res))
- with e -> return struct (false, Error struct (met, e)) }
- let interpretWriteResultProgress (streams: Scheduling.StreamStates<_>) stream res =
+ return Ok struct (met, res)
+ with e -> return Error struct (met, e) }
+ let interpretProgress (streams: Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct (_stats, Writer.Result.Ok pos) -> struct (streams.RecordWriteProgress(stream, pos, null), false)
| Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null), false
| Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, overage[0].Index, [| overage |]), false
- | Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [|overage|]), false
+ | Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [| overage |]), false
| Error struct (_stats, exn) ->
let malformed = Writer.classify exn |> Writer.isMalformed
streams.SetMalformed(stream, malformed), malformed
let struct (ss, malformed) = applyResultToStreamState res
Writer.logTo writerResultLog malformed (stream, res)
struct (ss.WritePos, res)
- Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretWriteResultProgress)
+ Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress)

type WriterResult = Internal.Writer.Result

- type CosmosStoreSinkStats(log : ILogger, statsInterval, stateInterval) =
- inherit Scheduling.Stats<struct (StreamSpan.Metrics * WriterResult), struct (StreamSpan.Metrics * exn)>(log, statsInterval, stateInterval)
- let mutable okStreams, resultOk, resultDup, resultPartialDup, resultPrefix, resultExnOther = HashSet(), 0, 0, 0, 0, 0
- let mutable badCats, failStreams, rateLimited, timedOut, tooLarge, malformed = Stats.CatStats(), HashSet(), 0, 0, 0, 0
- let rlStreams, toStreams, tlStreams, mfStreams, oStreams = HashSet(), HashSet(), HashSet(), HashSet(), HashSet()
- let mutable okEvents, okBytes, exnEvents, exnBytes = 0, 0L, 0, 0L
+ type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold) =
+ inherit Scheduling.Stats<struct (StreamSpan.Metrics * WriterResult), struct (StreamSpan.Metrics * exn)>(log, statsInterval, stateInterval, ?failThreshold = failThreshold)
+ let mutable okStreams, okEvents, okBytes = HashSet(), 0, 0L
+ let mutable exnCats, exnStreams, exnEvents, exnBytes = Stats.CatStats(), HashSet(), 0, 0L
+ let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0
override _.Handle message =
- let inline adds x (set:HashSet<_>) = set.Add x |> ignore
- let inline bads x (set:HashSet<_>) = badCats.Ingest(StreamName.categorize x); adds x set
match message with
| { stream = stream; result = Ok ((es, bs), res) } ->
- adds stream okStreams
+ okStreams.Add stream |> ignore
okEvents <- okEvents + es
okBytes <- okBytes + int64 bs
match res with
@@ -148,31 +145,31 @@ type CosmosStoreSinkStats(log : ILogger, statsInterval, stateInterval) =
| WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1
base.RecordOk(message)
| { stream = stream; result = Error ((es, bs), Exception.Inner exn) } ->
- adds stream failStreams
+ exnCats.Ingest(StreamName.categorize stream)
+ exnStreams.Add stream |> ignore
exnEvents <- exnEvents + es
exnBytes <- exnBytes + int64 bs
+ resultExn <- resultExn + 1
let kind =
match Internal.Writer.classify exn with
- | Internal.Writer.ResultKind.RateLimited -> adds stream rlStreams; rateLimited <- rateLimited + 1; OutcomeKind.RateLimited
- | Internal.Writer.ResultKind.TimedOut -> adds stream toStreams; timedOut <- timedOut + 1; OutcomeKind.Timeout
- | Internal.Writer.ResultKind.TooLarge -> bads stream tlStreams; tooLarge <- tooLarge + 1; OutcomeKind.Failed
- | Internal.Writer.ResultKind.Malformed -> bads stream mfStreams; malformed <- malformed + 1; OutcomeKind.Failed
- | Internal.Writer.ResultKind.Other -> adds stream toStreams; timedOut <- timedOut + 1; OutcomeKind.Exception
+ | Internal.Writer.ResultKind.RateLimited -> OutcomeKind.RateLimited
+ | Internal.Writer.ResultKind.TimedOut -> OutcomeKind.Timeout
+ | Internal.Writer.ResultKind.TooLarge -> OutcomeKind.Tagged "tooLarge"
+ | Internal.Writer.ResultKind.Malformed -> OutcomeKind.Tagged "malformed"
+ | Internal.Writer.ResultKind.Other -> OutcomeKind.Exn
base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es), exn)
override _.DumpStats() =
let results = resultOk + resultDup + resultPartialDup + resultPrefix
log.Information("Completed {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e ({ok:n0} ok {dup:n0} redundant {partial:n0} partial {prefix:n0} waiting)",
Log.miB okBytes, results, okStreams.Count, okEvents, resultOk, resultDup, resultPartialDup, resultPrefix)
okStreams.Clear(); resultOk <- 0; resultDup <- 0; resultPartialDup <- 0; resultPrefix <- 0; okEvents <- 0; okBytes <- 0L
- if rateLimited <> 0 || timedOut <> 0 || tooLarge <> 0 || malformed <> 0 || badCats.Any then
- let fails = rateLimited + timedOut + tooLarge + malformed + resultExnOther
- log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e Rate-limited {rateLimited:n0}r {rlStreams:n0}s Timed out {toCount:n0}r {toStreams:n0}s",
- Log.miB exnBytes, fails, failStreams.Count, exnEvents, rateLimited, rlStreams.Count, timedOut, toStreams.Count)
- rateLimited <- 0; timedOut <- 0; resultExnOther <- 0; failStreams.Clear(); rlStreams.Clear(); toStreams.Clear(); exnBytes <- 0L; exnEvents <- 0
- if badCats.Any then
- log.Warning(" Affected cats {@badCats} Too large {tooLarge:n0}r {@tlStreams} Malformed {malformed:n0}r {@mfStreams} Other {other:n0}r {@oStreams}",
- badCats.StatsDescending |> Seq.truncate 50, tooLarge, tlStreams |> Seq.truncate 100, malformed, mfStreams |> Seq.truncate 100, resultExnOther, oStreams |> Seq.truncate 100)
- badCats.Clear(); tooLarge <- 0; malformed <- 0; resultExnOther <- 0; tlStreams.Clear(); mfStreams.Clear(); oStreams.Clear()
+ if exnCats.Any then
+ log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e",
+ Log.miB exnBytes, resultExn, exnStreams.Count, exnEvents)
+ resultExn <- 0; exnBytes <- 0L; exnEvents <- 0
+ log.Warning(" Affected cats {@exnCats} Streams {@exnStreams}",
+ exnCats.StatsDescending |> Seq.truncate 50, exnStreams |> Seq.truncate 100)
+ exnCats.Clear(); exnStreams.Clear()
Equinox.CosmosStore.Core.Log.InternalMetrics.dump log

override _.HandleExn(log, exn) = log.Warning(exn, "Unhandled")
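The net effect of the stats rewrite: the sink no longer maintains a counter plus a `HashSet` per failure kind; it classifies each exception once and hands the result to the base class, which owns the counting and logging. A sketch of that shape, with standalone stand-ins rather than the real Propulsion `OutcomeKind` (the message-based tests are illustrative only):

```fsharp
type OutcomeKind =
    | RateLimited
    | Timeout
    | Tagged of string   // e.g. "tooLarge", "malformed"
    | Exn                // unclassified exception

// Classify once; shared stats infrastructure does the per-kind bookkeeping
let classify (e: exn): OutcomeKind =
    match e with
    | :? System.TimeoutException -> Timeout
    | e when e.Message.Contains "TooManyRequests" -> RateLimited
    | e when e.Message.Contains "RequestEntityTooLarge" -> Tagged "tooLarge"
    | _ -> Exn
```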
5 changes: 4 additions & 1 deletion src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
@@ -38,7 +38,10 @@ module EquinoxSystemTextJsonParser =

/// Enumerates the events represented within a batch
let enumEquinoxCosmosEvents (batch: Batch): Event seq =
- batch.e |> Seq.mapi (fun offset x -> FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, batch.MapData x.d, batch.MapData x.m, timestamp = x.t))
+ batch.e |> Seq.mapi (fun offset x ->
+ let d = batch.MapData x.d
+ let m = batch.MapData x.m
+ FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, d, m, timestamp = x.t, size = x.c.Length + d.Length + m.Length + 80))

/// Attempts to parse a Document/Item from the Store
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
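The parser now supplies a `size` for each event. A standalone sketch of the heuristic (assuming the mapped data exposes a byte-oriented `Length`; the flat 80-byte allowance mirrors the constant in the diff and is a tuning heuristic, not an exact measurement):

```fsharp
// Approximate wire size: event type + data + metadata payload lengths,
// plus ~80 bytes for index, timestamp and property-name overhead
let estimateEventSize (eventType: string) (data: byte[]) (meta: byte[]) =
    eventType.Length + data.Length + meta.Length + 80
```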
48 changes: 20 additions & 28 deletions src/Propulsion.EventStore/EventStoreSink.fs
@@ -71,9 +71,9 @@ module Internal =
let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096
let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span' ct
- return struct (span'.Length > 0, Ok struct (met, res))
- with e -> return false, Error struct (met, e) }
- let interpretWriteResultProgress (streams : Scheduling.StreamStates<_>) stream res =
+ return Ok struct (met, res)
+ with e -> return Error struct (met, e) }
+ let interpretProgress (streams : Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct (_stats, Writer.Result.Ok pos) -> streams.RecordWriteProgress(stream, pos, null)
| Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null)
@@ -83,22 +83,18 @@ module Internal =
let ss = applyResultToStreamState res
Writer.logTo writerResultLog (stream, res)
struct (ss.WritePos, res)
- Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, attemptWrite, interpretWriteResultProgress)
+ Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, attemptWrite, interpretProgress)

type WriterResult = Internal.Writer.Result

- type EventStoreSinkStats(log : ILogger, statsInterval, stateInterval) =
- inherit Scheduling.Stats<struct (StreamSpan.Metrics * WriterResult), struct (StreamSpan.Metrics * exn)>(log, statsInterval, stateInterval)
-
- let mutable okStreams, badCats, failStreams, toStreams, oStreams = HashSet(), Stats.CatStats(), HashSet(), HashSet(), HashSet()
- let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExnOther, timedOut = 0, 0, 0, 0, 0, 0
- let mutable okEvents, okBytes, exnEvents, exnBytes = 0, 0L, 0, 0L
+ type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold) =
+ inherit Scheduling.Stats<struct (StreamSpan.Metrics * WriterResult), struct (StreamSpan.Metrics * exn)>(log, statsInterval, stateInterval, ?failThreshold = failThreshold)
+ let mutable okStreams, okEvents, okBytes, exnCats, exnStreams, exnEvents, exnBytes = HashSet(), 0, 0L, Stats.CatStats(), HashSet(), 0, 0L
+ let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0
override _.Handle message =
- let inline adds x (set : HashSet<_>) = set.Add x |> ignore
- let inline bads streamName (set : HashSet<_>) = badCats.Ingest(StreamName.categorize streamName); adds streamName set
match message with
| { stream = stream; result = Ok ((es, bs), res) } ->
- adds stream okStreams
+ okStreams.Add stream |> ignore
okEvents <- okEvents + es
okBytes <- okBytes + int64 bs
match res with
@@ -108,28 +104,24 @@ type EventStoreSinkStats(log : ILogger, statsInterval, stateInterval) =
| WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1
base.RecordOk(message)
| { stream = stream; result = Error ((es, bs), Exception.Inner exn) } ->
- adds stream failStreams
+ exnCats.Ingest(StreamName.categorize stream)
+ exnStreams.Add stream |> ignore
exnEvents <- exnEvents + es
exnBytes <- exnBytes + int64 bs
- let kind = OutcomeKind.classify exn
- match kind with
- | OutcomeKind.Timeout -> adds stream toStreams; timedOut <- timedOut + 1
- | _ -> bads stream oStreams; resultExnOther <- resultExnOther + 1
- base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es), exn)
+ resultExn <- resultExn + 1
+ base.RecordExn(message, OutcomeKind.classify exn, log.ForContext("stream", stream).ForContext("events", es), exn)
override _.DumpStats() =
let results = resultOk + resultDup + resultPartialDup + resultPrefix
log.Information("Completed {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e ({ok:n0} ok {dup:n0} redundant {partial:n0} partial {prefix:n0} waiting)",
Log.miB okBytes, results, okStreams.Count, okEvents, resultOk, resultDup, resultPartialDup, resultPrefix)
okStreams.Clear(); resultOk <- 0; resultDup <- 0; resultPartialDup <- 0; resultPrefix <- 0; okEvents <- 0; okBytes <- 0L
- if timedOut <> 0 || badCats.Any then
- let fails = timedOut + resultExnOther
- log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e Timed out {toCount:n0}r {toStreams:n0}s",
- Log.miB exnBytes, fails, failStreams.Count, exnEvents, timedOut, toStreams.Count)
- timedOut <- 0; resultExnOther <- 0; failStreams.Clear(); toStreams.Clear(); exnBytes <- 0L; exnEvents <- 0
- if badCats.Any then
- log.Warning(" Affected cats {@badCats} Other {other:n0}r {@oStreams}",
- badCats.StatsDescending |> Seq.truncate 50, resultExnOther, oStreams |> Seq.truncate 100)
- badCats.Clear(); resultExnOther <- 0; oStreams.Clear()
+ if exnCats.Any then
+ log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e",
+ Log.miB exnBytes, resultExn, exnStreams.Count, exnEvents)
+ resultExn <- 0; exnBytes <- 0L; exnEvents <- 0
+ log.Warning(" Affected cats {@exnCats} Streams {@exnStreams}",
+ exnCats.StatsDescending |> Seq.truncate 50, exnStreams |> Seq.truncate 100)
+ exnCats.Clear(); exnStreams.Clear()
Log.InternalMetrics.dump log

override _.HandleExn(log, exn) = log.Warning(exn, "Unhandled")
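Both sinks match on `Exception.Inner` when handling failures. A sketch of an unwrapping active pattern in that spirit (an assumption about its behavior, not the actual Propulsion definition):

```fsharp
open System

// Peel single-child AggregateExceptions so classification sees the root cause
let rec (|Inner|) (e: exn): exn =
    match e with
    | :? AggregateException as ae when ae.InnerExceptions.Count = 1 -> (|Inner|) ae.InnerExceptions.[0]
    | e -> e

// usage: let (Inner root) = someExn in root.Message
```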
2 changes: 1 addition & 1 deletion src/Propulsion.Feed/FeedReader.fs
@@ -66,7 +66,7 @@ type internal Stats(partition : int, source : SourceId, tranche : TrancheId, ren
elif lastCommittedPosition = batchLastPosition then "COMPLETE"
else if finishedReading then "End" else "Tail"
(Log.withMetric m log).ForContext("tail", lastWasTail).Information(
"Reader {partition} {state} @ {lastCommittedPosition}/{readPosition} Pages {pagesRead} empty {pagesEmpty} events {events} | Recent {l:f1}s Pages {recentPagesRead} empty {recentPagesEmpty} events {recentEvents} | Wait {pausedS:f1}s Ahead {cur}/{max}",
"Reader {partition} {state} @ {lastCommittedPosition}/{readPosition} Pages {pagesRead} empty {pagesEmpty} events {events} | Recent {l:f1}s Pages {recentPagesRead} empty {recentPagesEmpty} events {recentEvents} Wait {pausedS:f1}s Ahead {cur}/{max}",
partition, state, r lastCommittedPosition, r batchLastPosition, pagesRead, pagesEmpty, events, readS, recentPagesRead, recentPagesEmpty, recentEvents, postS, currentBatches, maxReadAhead)
readLatency <- TimeSpan.Zero; ingestLatency <- TimeSpan.Zero
recentPagesRead <- 0; recentEvents <- 0; recentPagesEmpty <- 0
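The FeedReader change only drops a `|` separator from the message template. For reference, a minimal Serilog sketch (assumes the `Serilog` and `Serilog.Sinks.Console` packages) showing that template properties bind positionally to the argument list, so removing literal text changes the rendered line but not the captured properties:

```fsharp
open Serilog

let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
// {partition}, {state} and {pausedS} capture 0, "Tail" and 1.5 respectively
log.Information("Reader {partition} {state} Wait {pausedS:f1}s", 0, "Tail", 1.5)
```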
