
Commit

Misc cleanup
bartelink committed Aug 2, 2023
1 parent 35dfe3f commit a6fb489
Showing 13 changed files with 31 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/ChangeFeedProcessor.fs
@@ -31,7 +31,7 @@ type internal SourcePipeline =
static member Start(log : ILogger, start, maybeStartChild, stop, observer : IDisposable) =
let cts = new CancellationTokenSource()
let triggerStop _disposing =
let level = if cts.IsCancellationRequested then Events.LogEventLevel.Debug else Events.LogEventLevel.Information
let level = if cts.IsCancellationRequested then LogEventLevel.Debug else LogEventLevel.Information
log.Write(level, "Source stopping...")
observer.Dispose()
cts.Cancel()
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/CosmosStorePruner.fs
@@ -88,7 +88,7 @@ type CosmosStorePruner =
struct (metrics, span)
Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil, (fun _ r -> r))
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let stats = Pruner.Stats(log.ForContext<Pruner.Stats>(), statsInterval, stateInterval)
let stats = Pruner.Stats(log, statsInterval, stateInterval)
let dumpStreams logStreamStates _log = logStreamStates Event.storedSize
let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5,
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay)
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
@@ -51,7 +51,7 @@ module Internal =
| stream, Ok (_, Result.PrefixMissing (batch, pos)) ->
log.Information("Waiting {stream} missing {gap} events ({count} events @ {pos})", stream, batch[0].Index - pos, batch.Length, batch[0].Index)
| stream, Error (_, exn) ->
let level = if malformed then Events.LogEventLevel.Warning else Events.LogEventLevel.Information
let level = if malformed then LogEventLevel.Warning else LogEventLevel.Information
log.Write(level, exn, "Writing {stream} failed, retrying", stream)

let write (log : ILogger) (ctx : EventsContext) stream (span : Event[]) ct = task {
@@ -188,7 +188,7 @@ type CosmosStoreSink =
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let dispatcher = Internal.Dispatcher.Create(log, eventsContext, maxConcurrentStreams, ?maxEvents = maxEvents, ?maxBytes = maxBytes)
let scheduler =
let stats = Internal.Stats(log.ForContext<Internal.Stats>(), statsInterval, stateInterval)
let stats = Internal.Stats(log, statsInterval, stateInterval)
let dumpStreams logStreamStates _log = logStreamStates Event.storedSize
Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, prioritizeStreamsBy = Event.storedSize,
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay)
3 changes: 1 addition & 2 deletions src/Propulsion.CosmosStore/CosmosStoreSource.fs
@@ -20,9 +20,8 @@ module Log =
| Read of ReadMetric
| Lag of LagMetric

/// Attach a property to the captured event record to hold the metric information
// Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124
let [<Literal>] PropertyTag = "propulsionCosmosEvent"
/// Attach a property to the captured event record to hold the metric information
let internal withMetric (value : Metric) = Log.withScalarProperty PropertyTag value
let [<return: Struct>] (|MetricEvent|_|) (logEvent : Serilog.Events.LogEvent) : Metric voption =
let mutable p = Unchecked.defaultof<_>
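The withMetric helpers touched here (and their twins in FeedReader.fs and Streams.fs below) all rely on the same shape: attach the metric as a scalar event property rather than via Log.ForContext, which would render it to a string (see serilog/serilog#1124), then recover it with an active pattern on the sink side. A minimal self-contained sketch of that idea, assuming only a Serilog reference; the Metric type and recovery pattern here are illustrative, and Propulsion's real helper (Log.withScalarProperty in Propulsion.Internal) may differ in detail:

    open Serilog
    open Serilog.Core
    open Serilog.Events

    // Illustrative metric payload; the real modules use their own Metric union types
    type Metric = { pages : int; latencyMs : float }

    let [<Literal>] PropertyTag = "propulsionCosmosEvent"

    // Attach the metric as a boxed ScalarValue so it survives intact on the LogEvent
    // (Log.ForContext would capture/stringify it instead; see serilog/serilog#1124)
    let withMetric (value : Metric) (log : ILogger) : ILogger =
        log.ForContext(
            { new ILogEventEnricher with
                member _.Enrich(evt, _) =
                    evt.AddPropertyIfAbsent(LogEventProperty(PropertyTag, ScalarValue value)) })

    // Recover the boxed value in a sink or filter
    let (|MetricEvent|_|) (logEvent : LogEvent) : Metric option =
        match logEvent.Properties.TryGetValue PropertyTag with
        | true, (:? ScalarValue as sv) when (sv.Value :? Metric) -> Some (sv.Value :?> Metric)
        | _ -> None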
4 changes: 2 additions & 2 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
@@ -23,13 +23,13 @@ module private Impl =
return Checkpoint.positionOfEpochAndOffset epochId version }

let logReadFailure (storeLog : Serilog.ILogger) =
let force = storeLog.IsEnabled Serilog.Events.LogEventLevel.Verbose
let force = storeLog.IsEnabled LogEventLevel.Verbose
function
| Exceptions.ProvisionedThroughputExceeded when not force -> ()
| e -> storeLog.Warning(e, "DynamoStoreSource read failure")

let logCommitFailure (storeLog : Serilog.ILogger) =
let force = storeLog.IsEnabled Serilog.Events.LogEventLevel.Verbose
let force = storeLog.IsEnabled LogEventLevel.Verbose
function
| Exceptions.ProvisionedThroughputExceeded when not force -> ()
| e -> storeLog.Warning(e, "DynamoStoreSource commit failure")
10 changes: 3 additions & 7 deletions src/Propulsion.EventStore/Checkpoint.fs
@@ -1,8 +1,7 @@
module Propulsion.EventStore.Checkpoint

open FSharp.UMX
open Propulsion.Internal
open System // must shadow UMX to use DateTimeOffSet
open System // must shadow UMX to use DateTimeOffset
open System.Threading.Tasks

type CheckpointSeriesId = string<checkpointSeriesId>
@@ -116,14 +115,11 @@ type Service internal (resolve: CheckpointSeriesId -> Equinox.DeciderCore<Events
let decider = resolve series
decider.Transact(interpret (Command.Update(DateTimeOffset.UtcNow, pos)), load = Equinox.AnyCachedValue, ct = ct)

let create resolve = Service(streamId >> resolve)

// General pattern is that an Equinox Service is a singleton and calls pass an identifier for a stream per call
// This light wrapper means we can adhere to that general pattern yet still end up with legible code while we in practice only maintain a single checkpoint series per running app
type CheckpointSeries(groupName, resolve, ?log) =
type CheckpointSeries(groupName, resolve) =
let seriesId = CheckpointSeriesId.ofGroupName groupName
let log = match log with Some x -> x | None -> Serilog.Log.ForContext<Service>()
let inner = create (resolve log)
let inner = Service(streamId >> resolve)

member _.Read(ct): Task<Fold.State> = inner.Read(seriesId, ct)
member _.Start(freq, pos, ct): Task<unit> = inner.Start(seriesId, freq, pos, ct)
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/EventStoreSink.fs
@@ -153,7 +153,7 @@ type EventStoreSink =
let dispatcher = Internal.Dispatcher.Create(log, storeLog, connections, maxConcurrentStreams)
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let scheduler =
let stats = Internal.Stats(log.ForContext<Internal.Stats>(), statsInterval, stateInterval)
let stats = Internal.Stats(log, statsInterval, stateInterval)
let dumpStreams logStreamStates _log = logStreamStates Event.storedSize
Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, ?purgeInterval = purgeInterval, ?idleDelay = idleDelay)
Projector.Pipeline.Start( log, scheduler.Pump, maxReadAhead, scheduler, ingesterStatsInterval = defaultArg ingesterStatsInterval statsInterval)
5 changes: 2 additions & 3 deletions src/Propulsion.Feed/FeedReader.fs
@@ -32,9 +32,8 @@ module Log =
token : Nullable<Position>; latency : TimeSpan; pages : int; items : int
ingestLatency : TimeSpan; ingestQueued : int }

/// Attach a property to the captured event record to hold the metric information
// Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124
let [<Literal>] PropertyTag = "propulsionFeedEvent"
/// Attach a property to the captured event record to hold the metric information
let internal withMetric (value : Metric) = Log.withScalarProperty PropertyTag value
let [<return: Struct>] (|MetricEvent|_|) (logEvent : Serilog.Events.LogEvent) : Metric voption =
let mutable p = Unchecked.defaultof<_>
@@ -144,7 +143,7 @@ type FeedReader
stats.RecordBatch(readLatency, batch)
match Array.length batch.items with
| 0 -> log.Verbose("Page {latency:f0}ms Checkpoint {checkpoint} Empty", readLatency.TotalMilliseconds, batch.checkpoint)
| c -> if log.IsEnabled(Serilog.Events.LogEventLevel.Debug) then
| c -> if log.IsEnabled(LogEventLevel.Debug) then
let streamsCount = batch.items |> Seq.distinctBy ValueTuple.fst |> Seq.length
log.Debug("Page {latency:f0}ms Checkpoint {checkpoint} {eventCount}e {streamCount}s",
readLatency.TotalMilliseconds, batch.checkpoint, c, streamsCount)
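One small point worth noting in the hunk above: the distinct-stream count is only computed when Debug logging is actually enabled, so the extra Seq.distinctBy pass is not paid for at the default Information level. A standalone sketch of that guard (the function and message template below are illustrative):

    open Serilog
    open Serilog.Events

    // Sketch: pay for the per-page stream count only when Debug output is enabled
    let logPage (log : ILogger) checkpoint (items : struct (string * int)[]) =
        if log.IsEnabled LogEventLevel.Debug then
            let streams = items |> Seq.distinctBy (fun struct (s, _) -> s) |> Seq.length
            log.Debug("Page Checkpoint {checkpoint} {eventCount}e {streamCount}s", checkpoint, items.Length, streams)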
2 changes: 1 addition & 1 deletion src/Propulsion.Feed/FeedSource.fs
@@ -236,7 +236,7 @@ and FeedMonitor internal (log : Serilog.ILogger, positions : TranchePositions, s
let isDrainedNow () = positions.Current() |> isDrained
let linger = match lingerTime with None -> TimeSpan.Zero | Some lingerF -> lingerF (isDrainedNow ()) propagationDelay propUsed procUsed
let skipLinger = linger = TimeSpan.Zero
let ll = if skipLinger then Serilog.Events.LogEventLevel.Information else Serilog.Events.LogEventLevel.Debug
let ll = if skipLinger then LogEventLevel.Information else LogEventLevel.Debug
let originalCompleted = currentCompleted |> Seq.cache
if log.IsEnabled ll then
let completed = positions.Current() |> choose (fun v -> v.completed)
4 changes: 2 additions & 2 deletions src/Propulsion/Pipeline.fs
@@ -40,7 +40,7 @@ type Pipeline(task : Task<unit>, triggerStop) =
static member Prepare(log : ILogger, pumpScheduler, pumpSubmitter, ?pumpIngester, ?pumpDispatcher) =
let cts = new CancellationTokenSource()
let triggerStop disposing =
let level = if disposing || cts.IsCancellationRequested then Events.LogEventLevel.Debug else Events.LogEventLevel.Information
let level = if disposing || cts.IsCancellationRequested then LogEventLevel.Debug else LogEventLevel.Information
log.Write(level, "Sink stopping...")
cts.Cancel()
let ct = cts.Token
@@ -79,7 +79,7 @@ type Pipeline(task : Task<unit>, triggerStop) =
let ts = Stopwatch.timestamp ()
let finishedAsRequested = scheduler.Wait(TimeSpan.FromSeconds 2)
let ms = let t = Stopwatch.elapsed ts in int t.TotalMilliseconds
let level = if finishedAsRequested && ms < 200 then Events.LogEventLevel.Information else Events.LogEventLevel.Warning
let level = if finishedAsRequested && ms < 200 then LogEventLevel.Information else LogEventLevel.Warning
log.Write(level, "... sink completed {schedulerCleanupMs}ms", ms) }

let task = Task.Run<unit>(supervise)
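The triggerStop tweaks in this file and in ChangeFeedProcessor.fs share one idea: the "stopping..." message is only worth Information the first time a stop is requested from outside; repeat requests, or stops driven purely by disposal, drop to Debug. A minimal sketch of that shape (the names below are illustrative, not the pipeline's actual members):

    open System.Threading
    open Serilog
    open Serilog.Events

    // Sketch: log the first externally requested stop at Information, repeats/disposal at Debug
    let makeTriggerStop (log : ILogger) (cts : CancellationTokenSource) =
        fun (disposing : bool) ->
            let level =
                if disposing || cts.IsCancellationRequested then LogEventLevel.Debug
                else LogEventLevel.Information
            log.Write(level, "Sink stopping...")
            cts.Cancel()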
3 changes: 1 addition & 2 deletions src/Propulsion/Streams.fs
@@ -50,14 +50,13 @@ module Log =
newest : float

let [<Literal>] PropertyTag = "propulsionEvent"
let [<Literal>] GroupTag = "group"
/// Attach a property to the captured event record to hold the metric information
// Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124
let internal withMetric (value : Metric) = Internal.Log.withScalarProperty PropertyTag value
let tryGetScalar<'t> key (logEvent : Serilog.Events.LogEvent) : 't voption =
let mutable p = Unchecked.defaultof<_>
logEvent.Properties.TryGetValue(key, &p) |> ignore
match p with Log.ScalarValue (:? 't as e) -> ValueSome e | _ -> ValueNone
let [<Literal>] GroupTag = "group"
let [<return: Struct>] (|MetricEvent|_|) logEvent =
match tryGetScalar<Metric> PropertyTag logEvent with
| ValueSome m -> ValueSome (m, tryGetScalar<string> GroupTag logEvent)
11 changes: 6 additions & 5 deletions tools/Propulsion.Tool/Infrastructure.fs
@@ -3,11 +3,12 @@ module Propulsion.Tool.Infrastructure

open Serilog

module Log =
module Metrics =

let forMetrics = Log.ForContext("isMetric", true)

let isStoreMetrics x = Serilog.Filters.Matching.WithProperty("isMetric").Invoke x
let [<Literal>] PropertyTag = "isMetric"
let log = Log.ForContext(PropertyTag, true)
/// Allow logging to filter out emission of log messages whose information is also surfaced as metrics
let logEventIsMetric e = Serilog.Filters.Matching.WithProperty(PropertyTag).Invoke e

module Sinks =

@@ -49,4 +50,4 @@ type Logging()

[<System.Runtime.CompilerServices.Extension>]
static member Sinks(configuration : LoggerConfiguration, configureMetricsSinks, verboseStore, verboseConsole) =
configuration.Sinks(configureMetricsSinks, Sinks.console verboseConsole, ?isMetric = if verboseStore then None else Some Log.isStoreMetrics)
configuration.Sinks(configureMetricsSinks, Sinks.console verboseConsole, ?isMetric = if verboseStore then None else Some Metrics.logEventIsMetric)
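The renamed Metrics module keeps the same mechanics as the old Log module: store-metric emissions go through a logger tagged with an isMetric property, and logEventIsMetric gives sink configuration a predicate for keeping those events out of the console unless verbose store logging was requested. A rough sketch of how such a predicate is typically applied; the console wiring below is illustrative rather than the tool's actual Sinks helpers, and assumes the Serilog.Sinks.Console package:

    open Serilog

    module Metrics =
        let [<Literal>] PropertyTag = "isMetric"
        let log = Log.ForContext(PropertyTag, true)
        /// Allow logging to filter out emission of log messages whose information is also surfaced as metrics
        let logEventIsMetric e = Serilog.Filters.Matching.WithProperty(PropertyTag).Invoke e

    // Illustrative wiring: route everything except metric-tagged events to the console
    let configureConsole (c : LoggerConfiguration) : LoggerConfiguration =
        c.WriteTo.Logger(fun l ->
            l.Filter.ByExcluding(fun e -> Metrics.logEventIsMetric e)
             .WriteTo.Console() |> ignore)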
16 changes: 8 additions & 8 deletions tools/Propulsion.Tool/Program.fs
@@ -178,10 +178,10 @@ module Checkpoints =
let cache = Equinox.Cache (appName, sizeMb = 1)
match a.StoreArgs with
| Choice1Of3 a ->
let! store = a.CreateCheckpointStore(group, cache, Log.forMetrics)
let! store = a.CreateCheckpointStore(group, cache, Metrics.log)
return (store : Propulsion.Feed.IFeedCheckpointStore), "cosmos", fun pos -> store.Override(source, tranche, pos, ct)
| Choice2Of3 a ->
let store = a.CreateCheckpointStore(group, cache, Log.forMetrics)
let store = a.CreateCheckpointStore(group, cache, Metrics.log)
return store, $"dynamo -t {a.IndexTable}", fun pos -> store.Override(source, tranche, pos, ct)
| Choice3Of3 a ->
let store = a.CreateCheckpointStore(group)
@@ -234,7 +234,7 @@ module Indexer =
Log.Warning("Gapped stream {stream}@{wp}: Missing {gap} events before {successorEventTypes}", stream, v.writePos, gap, v.spans[0].c)
elif gapped = gapsLimit then
Log.Error("Gapped Streams Dump limit ({gapsLimit}) reached; use commandline flag to show more", gapsLimit)
let level = if gapped > 0 then Serilog.Events.LogEventLevel.Warning else Serilog.Events.LogEventLevel.Information
let level = if gapped > 0 then LogEventLevel.Warning else LogEventLevel.Information
Log.Write(level, "Index {events:n0} events {streams:n0} streams ({spans:n0} spans) Buffered {buffered} Queueing {queuing} Gapped {gapped:n0}",
totalE, totalS, spanCount, buffered, queuing, gapped)

@@ -246,7 +246,7 @@
| None when (not << List.isEmpty) a.ImportJsonFiles ->
missingArg "Must specify a trancheId parameter to import into"
| None ->
let index = AppendsIndex.Reader.create Log.forMetrics context
let index = AppendsIndex.Reader.create Metrics.log context
let! state = index.Read()
Log.Information("Current Partitions / Active Epochs {summary}",
seq { for kvp in state -> struct (kvp.Key, kvp.Value) } |> Seq.sortBy (fun struct (t, _) -> t))
@@ -260,7 +260,7 @@
Log.Information("Inspect Batches in Epoch {epoch} of Index Partition {partition} 👉 {cmd}",
eid, pid, dumpCmd AppendsEpoch.Category (AppendsEpoch.streamId (pid, eid)) "-B ")
| Some trancheId ->
let! buffer, indexedSpans = DynamoStoreIndex.Reader.loadIndex (Log.Logger, Log.forMetrics, context) trancheId a.GapsLimit
let! buffer, indexedSpans = DynamoStoreIndex.Reader.loadIndex (Log.Logger, Metrics.log, context) trancheId a.GapsLimit
let dump ingestedCount = dumpSummary a.GapsLimit buffer.Items (indexedSpans + ingestedCount)
dump 0

@@ -271,7 +271,7 @@
Log.Information("Ingesting {files}...", files)

let ingest =
let ingester = DynamoStoreIngester(Log.Logger, context, storeLog = Log.forMetrics)
let ingester = DynamoStoreIngester(Log.Logger, context, storeLog = Metrics.log)
fun batch -> ingester.Service.IngestWithoutConcurrency(trancheId, batch)
let import = DynamoDbExport.Importer(buffer, ingest, dump)
for file in files do
@@ -362,11 +362,11 @@ module Project =
let (indexStore, indexFilter), loadMode = sa.MonitoringParams()
let checkpoints =
let cache = Equinox.Cache (appName, sizeMb = 1)
sa.CreateCheckpointStore(group, cache, Log.forMetrics)
sa.CreateCheckpointStore(group, cache, Metrics.log)
Propulsion.DynamoStore.DynamoStoreSource(
Log.Logger, stats.StatsInterval,
indexStore, defaultArg maxItems 100, TimeSpan.FromSeconds 0.5,
checkpoints, sink, loadMode, startFromTail = startFromTail, storeLog = Log.forMetrics,
checkpoints, sink, loadMode, startFromTail = startFromTail, storeLog = Metrics.log,
?trancheIds = indexFilter
).Start()
| Choice3Of3 sa ->
