Skip to content

Commit

Permalink
Add monitor facility
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 18, 2018
1 parent 5cdcb14 commit b56e8e0
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 7 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ The default memory buffer feeding the worker thread is capped to 10,000 items, a
.WriteTo.Async(a => a.File("logs/myapp.log"), bufferSize: 500)
```

### Monitoring

Typically, one should assign adequate buffer capacity to enable the wrapped sinks to ingest the events as they are processed without ever approaching the limit. In order to gain awareness of the processing backlog becoming abnormal, it's possible to instrument the Async sink by suppling a `monitor` callback that allows for periodic inspection of the backlog

```csharp
void LogBufferMonitor(buffer : BlockingQueue<Serilog.Events.LogEvent> queue)
{
var usagePct = queue.Count * 100 / queue.BoundedCapacity;
if (usagePct > 50) SelfLog.WriteLine("Log buffer exceeded {0:p0} usage (limit: {1})", usage, queue.BoundedCapacity);
}

// Wait for any queued event to be accepted by the `File` log before allowing the calling thread
// to resume its application work after a logging call when there are 10,000 LogEvents waiting
.WriteTo.Async(a => a.File("logs/myapp.log"), monitorIntervalSeconds: 60, monitor: LogBufferMonitor)
```

### Blocking

Warning: For the same reason one typically does not want exceptions from logging to leak into the execution path, one typically does not want a logger to be able to have the side-efect of actually interrupting application processing until the log propagation has been unblocked.
Expand Down
8 changes: 6 additions & 2 deletions src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,20 @@ public static LoggerConfiguration Async(
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <param name="monitorIntervalSeconds">Interval between invocations of <paramref name="monitor"/>.</param>
/// <param name="monitor">Callback to facilitate health checking the internal queue. Frequency is controlled by <paramref name="monitorIntervalSeconds"/>.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
int bufferSize = 10000,
bool blockWhenFull = false)
bool blockWhenFull = false,
int monitorIntervalSeconds = 10,
Action<System.Collections.Concurrent.BlockingCollection<Events.LogEvent>> monitor = null)
{
return LoggerSinkConfiguration.Wrap(
loggerSinkConfiguration,
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull),
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull, monitorIntervalSeconds, monitor),
configure);
}

Expand Down
11 changes: 10 additions & 1 deletion src/Serilog.Sinks.Async/Serilog.Sinks.Async.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<AssemblyVersion>1.0.0</AssemblyVersion>
<VersionPrefix>1.2.0</VersionPrefix>
<Authors>Jezz Santos;Serilog Contributors</Authors>
<TargetFrameworks>net45;netstandard1.1</TargetFrameworks>
<TargetFrameworks>net45;netstandard1.1;netstandard1.2</TargetFrameworks>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<AssemblyName>Serilog.Sinks.Async</AssemblyName>
Expand All @@ -26,6 +26,10 @@
<PackageReference Include="Serilog" Version="2.5.0" />
</ItemGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard1.1' ">
<DefineConstants>$(DefineConstants);NETSTANDARD_NO_TIMER</DefineConstants>
</PropertyGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net45' ">
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
Expand All @@ -35,4 +39,9 @@
<PackageReference Include="System.Collections.Concurrent" Version="4.0.12" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.2' ">
<PackageReference Include="System.Collections.Concurrent" Version="4.0.12" />
<PackageReference Include="System.Threading.Timer" Version="4.0.1" />
</ItemGroup>

</Project>
30 changes: 26 additions & 4 deletions src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,32 @@ sealed class BackgroundWorkerSink : ILogEventSink, IDisposable
readonly bool _blockWhenFull;
readonly BlockingCollection<LogEvent> _queue;
readonly Task _worker;

public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull)
#if! NETSTANDARD_NO_TIMER
readonly Timer _monitorCallbackInvocationTimer;
#endif
public BackgroundWorkerSink(
ILogEventSink pipeline, int bufferCapacity,
bool blockWhenFull,
int monitorIntervalSeconds = 1, Action<BlockingCollection<LogEvent>> monitor = null)
{
if (pipeline == null) throw new ArgumentNullException(nameof(pipeline));
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
if (monitorIntervalSeconds < 0) throw new ArgumentOutOfRangeException(nameof(monitorIntervalSeconds));
_pipeline = pipeline;
_blockWhenFull = blockWhenFull;
_queue = new BlockingCollection<LogEvent>(bufferCapacity);
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

if (monitor != null)
{
if (monitorIntervalSeconds <= 0) throw new ArgumentOutOfRangeException(nameof(monitorIntervalSeconds), "must be >=1");
#if! NETSTANDARD_NO_TIMER
var interval = TimeSpan.FromSeconds(monitorIntervalSeconds);
_monitorCallbackInvocationTimer = new Timer(queue => monitor((BlockingCollection<LogEvent>)queue), _queue, interval, interval);
#else
throw new PlatformNotSupportedException($"Please use a platform supporting .netstandard1.2 or later to use the ${nameof(monitor)} facility.");
#endif
}
}

public void Emit(LogEvent logEvent)
Expand Down Expand Up @@ -55,8 +72,13 @@ public void Dispose()
_queue.CompleteAdding();

// Allow queued events to be flushed
_worker.Wait();

_worker.Wait();

#if! NETSTANDARD_NO_TIMER
// Only stop monitoring when we've actually completed flushing
_monitorCallbackInvocationTimer?.Dispose();
#endif

(_pipeline as IDisposable)?.Dispose();
}

Expand Down
25 changes: 25 additions & 0 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand Down Expand Up @@ -156,6 +157,30 @@ public async Task WhenQueueFull_ThenBlocks()
}
}

#if !NETSTANDARD_NO_TIMER
[Fact]
public void MonitorArgumentAffordsBacklogHealthMonitoringFacility()
{
bool logWasObservedToHaveReachedHalfFull = false;
void inspectBuffer(BlockingCollection<LogEvent> queue) =>

logWasObservedToHaveReachedHalfFull = logWasObservedToHaveReachedHalfFull
|| queue.Count * 100 / queue.BoundedCapacity >= 50;

var collector = new MemorySink { DelayEmit = TimeSpan.FromSeconds(3) };
using (var logger = new LoggerConfiguration()
.WriteTo.Async(w => w.Sink(collector), bufferSize: 2, monitorIntervalSeconds: 1, monitor: inspectBuffer)
.CreateLogger())
{
logger.Information("Something to block the pipe");
logger.Information("I'll just leave this here pending for a few seconds so I can observe it");
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2));
}

Assert.True(logWasObservedToHaveReachedHalfFull);
}
#endif

private BackgroundWorkerSink CreateSinkWithDefaultOptions()
{
return new BackgroundWorkerSink(_logger, 10000, false);
Expand Down

0 comments on commit b56e8e0

Please sign in to comment.