From dcd24ac49091e5dfc0c74e796e66ed1c15e2f978 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 20 Apr 2018 01:54:48 +0100 Subject: [PATCH] Add IQueueState inspector --- README.md | 21 ++++--- .../LoggerConfigurationAsyncExtensions.cs | 60 +++++++++++++++++-- .../Serilog.Sinks.Async.csproj | 11 +--- .../Sinks/Async/BackgroundWorkerSink.cs | 35 +++-------- .../BackgroundWorkerSinkSpec.cs | 41 ++++++------- 5 files changed, 97 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index 930dc0c..f167bcf 100644 --- a/README.md +++ b/README.md @@ -39,20 +39,25 @@ The default memory buffer feeding the worker thread is capped to 10,000 items, a .WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500) ``` -### Monitoring +### Health Monitoring via the Buffer Inspection interface -Typically, one should assign adequate buffer capacity to enable the wrapped sinks to ingest the events as they are processed without ever approaching the limit. In order to gain awareness of the processing backlog becoming abnormal, it's possible to instrument the Async sink by suppling a `monitor` callback that allows for periodic inspection of the backlog +The Async wrapper is primarily intended to allow one to achieve minimal logging latency at all times, even when writing to sinks that may momentarily block during the course of their processing (e.g., a File sink might block for a low number of ms while flushing). The dropping behavior is an important failsafe in that it avoids having an unbounded buffering behaviour should logging frequency overwhelm the sink, or the sink ingestion throughput degrades to a major degree. + +In practice, this configuration (assuming one provisions an adequate `bufferSize`) achieves an efficient and resilient logging configuration that can handle load gracefully. The key risk is of course that events may be dropped when the buffer threshold gets breached. The `inspector` allows one to arrange for your Application's health monitoring mechanism to actively validate that the buffer allocation is not being exceeded in practice. ```csharp - void LogBufferMonitor(buffer : BlockingQueue queue) + // Example check: log message to an out of band alarm channel if logging is showing signs of getting overwhelmed + void PeriodicMonitorCheck(IQueueState inspector) { - var usagePct = queue.Count * 100 / queue.BoundedCapacity; - if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usage, queue.BoundedCapacity); + var usagePct = inspector.Count * 100 / inspector.BoundedCapacity; + if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usagePct, inspector.BoundedCapacity); } - // Wait for any queued event to be accepted by the `File` log before allowing the calling thread - // to resume its application work after a logging call when there are 10,000 LogEvents waiting - .WriteTo.Async(a => a.File("logs/myapp.log"), monitorIntervalSeconds: 60, monitor: LogBufferMonitor) + // Allow a backlog of up to 10,000 items to be maintained (dropping extras if full) + .WriteTo.Async(a => a.File("logs/myapp.log"), inspector: out IQueueState inspector) ... + + // Wire the inspector through to health monitoring and/or metrics in order to periodically emit a metric, raise an alarm, etc. + ... healthMonitoring.RegisterCheck(() => new PeriodicMonitorCheck(inspector)); ``` ### Blocking diff --git a/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs b/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs index 9ccd829..77d630b 100644 --- a/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs +++ b/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs @@ -38,22 +38,70 @@ public static LoggerConfiguration Async( /// the queue will block or subsequent events will be dropped until /// room is made in the queue. /// Block when the queue is full, instead of dropping events. - /// Interval between invocations of . - /// Callback to facilitate health checking the internal queue. Frequency is controlled by . /// A allowing configuration to continue. public static LoggerConfiguration Async( this LoggerSinkConfiguration loggerSinkConfiguration, Action configure, int bufferSize = 10000, - bool blockWhenFull = false, - int monitorIntervalSeconds = 10, - Action> monitor = null) + bool blockWhenFull = false) { return LoggerSinkConfiguration.Wrap( loggerSinkConfiguration, - wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitorIntervalSeconds, monitor), + wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull), configure); } + /// + /// Configure a sink to be invoked asynchronously, on a background worker thread. + /// Provides an that can be used to check the live state of the buffer for health monitoring purposes. + /// + /// The being configured. + /// An action that configures the wrapped sink. + /// The size of the concurrent queue used to feed the background worker thread. If + /// the thread is unable to process events quickly enough and the queue is filled, depending on + /// the queue will block or subsequent events will be dropped until + /// room is made in the queue. + /// Block when the queue is full, instead of dropping events. + /// Provides a way to inspect the state of the queue for health monitoring purposes. + /// A allowing configuration to continue. + public static LoggerConfiguration Async( + this LoggerSinkConfiguration loggerSinkConfiguration, + Action configure, + out IQueueState inspector, + int bufferSize = 10000, + bool blockWhenFull = false) + { + // Cannot assign directly to the out param from within the lambda, so we need a temp + IQueueState stateLens = null; + var result = LoggerSinkConfiguration.Wrap( + loggerSinkConfiguration, + wrappedSink => + { + var sink = new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull); + stateLens = sink; + return sink; + }, + configure); + inspector = stateLens; + return result; + } + } + + /// + /// Provides a way to inspect the current state of Async wrapper's ingestion queue. + /// + public interface IQueueState + { + /// + /// Count of items currently awaiting ingestion. + /// + /// The Sink has been disposed. + int Count { get; } + + /// + /// Maximum number of items permitted to be held in the buffer awaiting ingestion. + /// + /// The Sink has been disposed. + int BufferSize { get; } } } diff --git a/src/Serilog.Sinks.Async/Serilog.Sinks.Async.csproj b/src/Serilog.Sinks.Async/Serilog.Sinks.Async.csproj index d597b4f..ace381e 100644 --- a/src/Serilog.Sinks.Async/Serilog.Sinks.Async.csproj +++ b/src/Serilog.Sinks.Async/Serilog.Sinks.Async.csproj @@ -5,7 +5,7 @@ 1.0.0 1.2.0 Jezz Santos;Serilog Contributors - net45;netstandard1.1;netstandard1.2 + net45;netstandard1.1 true true Serilog.Sinks.Async @@ -26,10 +26,6 @@ - - $(DefineConstants);NETSTANDARD_NO_TIMER - - @@ -39,9 +35,4 @@ - - - - - diff --git a/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs b/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs index a3bc896..8097571 100644 --- a/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs +++ b/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs @@ -8,38 +8,20 @@ namespace Serilog.Sinks.Async { - sealed class BackgroundWorkerSink : ILogEventSink, IDisposable + sealed class BackgroundWorkerSink : ILogEventSink, IQueueState, IDisposable { readonly ILogEventSink _pipeline; readonly bool _blockWhenFull; readonly BlockingCollection _queue; readonly Task _worker; -#if! NETSTANDARD_NO_TIMER - readonly Timer _monitorCallbackInvocationTimer; -#endif - public BackgroundWorkerSink( - ILogEventSink pipeline, int bufferCapacity, - bool blockWhenFull, - int monitorIntervalSeconds = 0, Action> monitor = null) + + public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull) { - if (pipeline == null) throw new ArgumentNullException(nameof(pipeline)); if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity)); - if (monitorIntervalSeconds < 0) throw new ArgumentOutOfRangeException(nameof(monitorIntervalSeconds)); - _pipeline = pipeline; + _pipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline)); _blockWhenFull = blockWhenFull; _queue = new BlockingCollection(bufferCapacity); _worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); - - if (monitor != null) - { - if (monitorIntervalSeconds < 1) throw new ArgumentOutOfRangeException(nameof(monitorIntervalSeconds), "must be >=1"); -#if! NETSTANDARD_NO_TIMER - var interval = TimeSpan.FromSeconds(monitorIntervalSeconds); - _monitorCallbackInvocationTimer = new Timer(queue => monitor((BlockingCollection)queue), _queue, interval, interval); -#else - throw new PlatformNotSupportedException($"Please use a platform supporting .netstandard1.2 or later to avail of the ${nameof(monitor)} facility."); -#endif - } } public void Emit(LogEvent logEvent) @@ -74,11 +56,6 @@ public void Dispose() // Allow queued events to be flushed _worker.Wait(); -#if! NETSTANDARD_NO_TIMER - // Only stop monitoring when we've actually completed flushing - _monitorCallbackInvocationTimer?.Dispose(); -#endif - (_pipeline as IDisposable)?.Dispose(); } @@ -96,5 +73,9 @@ void Pump() SelfLog.WriteLine("{0} fatal error in worker thread: {1}", typeof(BackgroundWorkerSink), ex); } } + + int IQueueState.Count => _queue.Count; + + int IQueueState.BufferSize => _queue.BoundedCapacity; } } diff --git a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs index d4d347f..b43706e 100644 --- a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs +++ b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -85,12 +84,12 @@ public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink() } [Fact] - public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock() + public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_DoesNotBlock() { var batchTiming = Stopwatch.StartNew(); using (var sink = new BackgroundWorkerSink(_logger, 1, blockWhenFull: false /*default*/)) { - // Cause a delay when emmitting to the inner sink, allowing us to easily fill the queue to capacity + // Cause a delay when emitting to the inner sink, allowing us to easily fill the queue to capacity // while the first event is being propagated var acceptInterval = TimeSpan.FromMilliseconds(500); _innerSink.DelayEmit = acceptInterval; @@ -115,7 +114,7 @@ public async Task GivenDefaultConfig_WhenQueueOverCapacity_DoesNotBlock() } [Fact] - public async Task GivenDefaultConfig_WhenRequestsOverCapacity_ThenDropsEventsAndRecovers() + public async Task GivenDefaultConfig_WhenRequestsExceedCapacity_ThenDropsEventsAndRecovers() { using (var sink = new BackgroundWorkerSink(_logger, 1, blockWhenFull: false /*default*/)) { @@ -186,29 +185,31 @@ public async Task GivenConfiguredToBlock_WhenQueueFilled_ThenBlocks() } } -#if !NETSTANDARD_NO_TIMER [Fact] - public void MonitorArgumentAffordsBacklogHealthMonitoringFacility() + public async Task InspectorOutParameterAffordsHealthMonitoringHook() { - bool logWasObservedToHaveReachedHalfFull = false; - void inspectBuffer(BlockingCollection queue) => - - logWasObservedToHaveReachedHalfFull = logWasObservedToHaveReachedHalfFull - || queue.Count * 100 / queue.BoundedCapacity >= 50; - - var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(3) }; + var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(2) }; + // 2 spaces in queue; 1 would make the second log entry eligible for dropping if consumer does not activate instantaneously + var bufferSize = 2; using (var logger = new LoggerConfiguration() - .WriteTo.Async(w => w.Sink(collector), bufferSize: 2, monitorIntervalSeconds: 1, monitor: inspectBuffer) + .WriteTo.Async(w => w.Sink(collector), bufferSize: 2, inspector: out IQueueState inspector) .CreateLogger()) { - logger.Information("Something to block the pipe"); - logger.Information("I'll just leave this here pending for a few seconds so I can observe it"); - System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2)); + Assert.Equal(bufferSize, inspector.BufferSize); + Assert.Equal(0, inspector.Count); + logger.Information("Something to freeze the processing for 2s"); + await Task.Delay(TimeSpan.FromMilliseconds(200)); + // Can be taken from queue either instantanously or be awaiting consumer to take + Assert.InRange(inspector.Count, 0, 1); + logger.Information("Something that will sit in the queue"); + // Unless we are put to sleep for a Rip Van Winkle period, either: + // a) the BackgroundWorker will be emitting the item [and incurring the 2s delay we established], leaving a single item in the buffer + // or b) neither will have been picked out of the buffer yet. + await Task.Delay(TimeSpan.FromMilliseconds(200)); + Assert.InRange(inspector.Count, 1, 2); + Assert.Equal(bufferSize, inspector.BufferSize); } - - Assert.True(logWasObservedToHaveReachedHalfFull); } -#endif private BackgroundWorkerSink CreateSinkWithDefaultOptions() {