From b56e8e0dd8e56c4abeb3e3d7cfe5d44d50cdf4fb Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 18 Apr 2018 16:57:21 +0100 Subject: [PATCH] Add monitor facility --- README.md | 16 ++++++++++ .../LoggerConfigurationAsyncExtensions.cs | 8 +++-- .../Serilog.Sinks.Async.csproj | 11 ++++++- .../Sinks/Async/BackgroundWorkerSink.cs | 30 ++++++++++++++++--- .../BackgroundWorkerSinkSpec.cs | 25 ++++++++++++++++ 5 files changed, 83 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 91abfb2..930dc0c 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,22 @@ The default memory buffer feeding the worker thread is capped to 10,000 items, a .WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500) ``` +### Monitoring + +Typically, one should assign adequate buffer capacity to enable the wrapped sinks to ingest the events as they are processed without ever approaching the limit. In order to gain awareness of the processing backlog becoming abnormal, it's possible to instrument the Async sink by suppling a `monitor` callback that allows for periodic inspection of the backlog + +```csharp + void LogBufferMonitor(buffer : BlockingQueue queue) + { + var usagePct = queue.Count * 100 / queue.BoundedCapacity; + if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usage, queue.BoundedCapacity); + } + + // Wait for any queued event to be accepted by the `File` log before allowing the calling thread + // to resume its application work after a logging call when there are 10,000 LogEvents waiting + .WriteTo.Async(a => a.File("logs/myapp.log"), monitorIntervalSeconds: 60, monitor: LogBufferMonitor) +``` + ### Blocking Warning: For the same reason one typically does not want exceptions from logging to leak into the execution path, one typically does not want a logger to be able to have the side-efect of actually interrupting application processing until the log propagation has been unblocked. diff --git a/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs b/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs index c1a7681..9ccd829 100644 --- a/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs +++ b/src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs @@ -38,16 +38,20 @@ public static LoggerConfiguration Async( /// the queue will block or subsequent events will be dropped until /// room is made in the queue. /// Block when the queue is full, instead of dropping events. + /// Interval between invocations of . + /// Callback to facilitate health checking the internal queue. Frequency is controlled by . /// A allowing configuration to continue. public static LoggerConfiguration Async( this LoggerSinkConfiguration loggerSinkConfiguration, Action configure, int bufferSize = 10000, - bool blockWhenFull = false) + bool blockWhenFull = false, + int monitorIntervalSeconds = 10, + Action> monitor = null) { return LoggerSinkConfiguration.Wrap( loggerSinkConfiguration, - wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull), + wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitorIntervalSeconds, monitor), configure); } diff --git a/src/Serilog.Sinks.Async/Serilog.Sinks.Async.csproj b/src/Serilog.Sinks.Async/Serilog.Sinks.Async.csproj index ace381e..d597b4f 100644 --- a/src/Serilog.Sinks.Async/Serilog.Sinks.Async.csproj +++ b/src/Serilog.Sinks.Async/Serilog.Sinks.Async.csproj @@ -5,7 +5,7 @@ 1.0.0 1.2.0 Jezz Santos;Serilog Contributors - net45;netstandard1.1 + net45;netstandard1.1;netstandard1.2 true true Serilog.Sinks.Async @@ -26,6 +26,10 @@ + + $(DefineConstants);NETSTANDARD_NO_TIMER + + @@ -35,4 +39,9 @@ + + + + + diff --git a/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs b/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs index 29175fd..62d9cfe 100644 --- a/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs +++ b/src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs @@ -14,15 +14,32 @@ sealed class BackgroundWorkerSink : ILogEventSink, IDisposable readonly bool _blockWhenFull; readonly BlockingCollection _queue; readonly Task _worker; - - public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull) +#if! NETSTANDARD_NO_TIMER + readonly Timer _monitorCallbackInvocationTimer; +#endif + public BackgroundWorkerSink( + ILogEventSink pipeline, int bufferCapacity, + bool blockWhenFull, + int monitorIntervalSeconds = 1, Action> monitor = null) { if (pipeline == null) throw new ArgumentNullException(nameof(pipeline)); if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity)); + if (monitorIntervalSeconds < 0) throw new ArgumentOutOfRangeException(nameof(monitorIntervalSeconds)); _pipeline = pipeline; _blockWhenFull = blockWhenFull; _queue = new BlockingCollection(bufferCapacity); _worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); + + if (monitor != null) + { + if (monitorIntervalSeconds <= 0) throw new ArgumentOutOfRangeException(nameof(monitorIntervalSeconds), "must be >=1"); +#if! NETSTANDARD_NO_TIMER + var interval = TimeSpan.FromSeconds(monitorIntervalSeconds); + _monitorCallbackInvocationTimer = new Timer(queue => monitor((BlockingCollection)queue), _queue, interval, interval); +#else + throw new PlatformNotSupportedException($"Please use a platform supporting .netstandard1.2 or later to use the ${nameof(monitor)} facility."); +#endif + } } public void Emit(LogEvent logEvent) @@ -55,8 +72,13 @@ public void Dispose() _queue.CompleteAdding(); // Allow queued events to be flushed - _worker.Wait(); - + _worker.Wait(); + +#if! NETSTANDARD_NO_TIMER + // Only stop monitoring when we've actually completed flushing + _monitorCallbackInvocationTimer?.Dispose(); +#endif + (_pipeline as IDisposable)?.Dispose(); } diff --git a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs index dd121f6..97442f4 100644 --- a/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs +++ b/test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -156,6 +157,30 @@ public async Task WhenQueueFull_ThenBlocks() } } +#if !NETSTANDARD_NO_TIMER + [Fact] + public void MonitorArgumentAffordsBacklogHealthMonitoringFacility() + { + bool logWasObservedToHaveReachedHalfFull = false; + void inspectBuffer(BlockingCollection queue) => + + logWasObservedToHaveReachedHalfFull = logWasObservedToHaveReachedHalfFull + || queue.Count * 100 / queue.BoundedCapacity >= 50; + + var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(3) }; + using (var logger = new LoggerConfiguration() + .WriteTo.Async(w => w.Sink(collector), bufferSize: 2, monitorIntervalSeconds: 1, monitor: inspectBuffer) + .CreateLogger()) + { + logger.Information("Something to block the pipe"); + logger.Information("I'll just leave this here pending for a few seconds so I can observe it"); + System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2)); + } + + Assert.True(logWasObservedToHaveReachedHalfFull); + } +#endif + private BackgroundWorkerSink CreateSinkWithDefaultOptions() { return new BackgroundWorkerSink(_logger, 10000, false);