Skip to content

Commit

Permalink
Logging: Use pre-allocated structs for logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
CptMoore committed Dec 5, 2024
1 parent 8148970 commit c4e8573
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 167 deletions.
55 changes: 26 additions & 29 deletions ModTek/Features/Logging/LightWeightBlockingQueue.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
#nullable enable
using System.Diagnostics.CodeAnalysis;
using System;
using System.Runtime.CompilerServices;
using System.Threading;

namespace ModTek.Features.Logging;

// (this)RingBuffer with reference types=38ns
// RingBuffer with pre-allocated structs=32ns (pre-allocation also reduces DTO setup times from 150-300ns to 50ns)
// Other variants:
// RingBuffer with nullable reference types=38ns
// ConcurrentQueue+custom size tracking+addingComplete=40ns
// BlockingCollection=170ns
internal class LightWeightBlockingQueue<T> where T : class
internal class LightWeightBlockingQueue
{
private volatile bool _addingCompleted; // some way to shut down the thread
internal void Shutdown() => _shutdown = true;
private volatile bool _shutdown; // some way to break the waiting

// 100k leads to about ~30MB
// 100k leads to about ~30MB, pre-allocation uses much less due to absent strings
private const int MaxRingBufferSize = 1 << 16; // power of 2 required by FastModuloMaskForBitwiseAnd
private const int MaxQueueSize = MaxRingBufferSize - 1; // Start=End need to be distinguishable
// ring buffer is used by Disruptor(.NET), seems to work well for them
// typed based exchanges are 56ns (fixed as of .NET 7) hence why we use object based ones
private readonly object?[] _ringBuffer = new object?[MaxRingBufferSize];
private readonly MTLoggerMessageDto[] _ringBuffer = new MTLoggerMessageDto[MaxRingBufferSize];
// end - start = size // all indexes are "excluding" basically, 0 means 0 not yet written (or read)
// TODO how to avoid double indexes?
private volatile int _nextWritingIndex; // sync in between writers
Expand All @@ -41,63 +44,57 @@ private static int Size(int startIndex, int endIndex)
}

// returns false if nothing can be dequeued anymore (empty + _addingCompleted)
internal bool TryDequeueOrWait([NotNullWhen(true)] out T? item)
internal ref MTLoggerMessageDto AcquireCommittedOrWait()
{
var spinWait = new SpinWait();
while (true)
{
var index = _nextReadIndex;
if (Size(index, _nextWritingIndex) > 0)
{
item = Unsafe.As<T?>(Interlocked.Exchange(ref _ringBuffer[index], default));
if (!ReferenceEquals(item, default))
ref var item = ref _ringBuffer[index];
if (item.CommittedToQueue)
{
_nextReadIndex = Next(index);
return true;
if (Interlocked.CompareExchange(ref _nextReadIndex, Next(index), index) == index)
{
return ref item;
}
}
}

spinWait.SpinOnce(); // reader should yield and sleep if nothing comes in after some time
if (_addingCompleted)

if (_shutdown)
{
// this can still drop logs, very unlikely but possible
Thread.Sleep(1);
if (Size(_nextReadIndex, _nextWritingIndex) == 0)
{
item = default;
return false;
throw new ObjectDisposedException("Shutting down logging thread using an exception, this is harmless");
}
}
}
}

// returns false if nothing can be enqueued anymore (_addingCompleted)
internal bool TryEnqueueOrWait(T item)
internal ref MTLoggerMessageDto AcquireUncommitedOrWait()
{
while (true)
{
if (_addingCompleted)
{
return false;
}

var index = _nextWritingIndex;
if (Size(_nextReadIndex, index) < MaxQueueSize)
{
if (Interlocked.CompareExchange(ref _nextWritingIndex, Next(index), index) == index)
ref var item = ref _ringBuffer[index];
if (!item.CommittedToQueue)
{
_ringBuffer[index] = item;
return true;
if (Interlocked.CompareExchange(ref _nextWritingIndex, Next(index), index) == index)
{
return ref item;
}
}
}

Thread.SpinWait(4); // main thread should always try to dispatch asap, never wait that much!
}
}

internal void CompleteAdding()
{
_addingCompleted = true;
}
}
58 changes: 36 additions & 22 deletions ModTek/Features/Logging/LoggingFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using HBS.Logging;
using ModTek.Misc;
using UnityEngine;

namespace ModTek.Features.Logging;

Expand Down Expand Up @@ -50,6 +51,7 @@ internal static void Init()

if (_settings.AsynchronousLoggingEnabled)
{
Application.quitting += () => _queue = null;
_queue = new MTLoggerAsyncQueue(ProcessLoggerMessage);
}

Expand Down Expand Up @@ -100,12 +102,17 @@ internal static void AddModLogAppender(string logPath, string loggerName)
}

internal static readonly MTStopwatch MessageDtoStopWatch = new();
// tracks all overhead on the main thread that happens due to async logging
internal static readonly MTStopwatch DispatchStopWatch = new();
// used for intercepting all logging attempts and to log centrally
internal static void LogAtLevel(string loggerName, LogLevel logLevel, object message, Exception exception, IStackTrace location)
{
// capture timestamp as early as possible, to be as close to the callers intended time
var timestamp = MTLoggerMessageDto.GetTimestamp();


// convert message to string while still in caller thread
var messageAsString = message?.ToString(); // do this asap too, as this can throw exceptions

// fill out location if not already filled out while still on caller stack
if (location == null && (_settings.LogStackTraces || (_settings.LogStackTracesOnExceptions && exception != null)))
{
Expand All @@ -115,30 +122,37 @@ internal static void LogAtLevel(string loggerName, LogLevel logLevel, object mes
// capture caller thread
var threadId = Thread.CurrentThread.ManagedThreadId;

// convert message to string while still in caller thread
var messageAsString = message?.ToString();

var messageDto = new MTLoggerMessageDto
(
timestamp,
loggerName,
logLevel,
messageAsString,
exception,
location,
threadId
);

MessageDtoStopWatch.AddMeasurement(MTLoggerMessageDto.GetTimestamp() - timestamp);

if (
_queue == null
|| _queue.LogWriterThreadId == threadId
|| !_queue.Add(messageDto)
)
if (_queue == null || _queue.LogWriterThreadId == threadId)
{
var messageDto = new MTLoggerMessageDto
(
timestamp,
loggerName,
logLevel,
messageAsString,
exception,
location,
threadId
);
ProcessLoggerMessage(messageDto);
return;
}

DispatchStopWatch.Start();
ref var updateDto = ref _queue.AcquireUncommitedOrWait();
DispatchStopWatch.Stop();

updateDto.Timestamp = timestamp;
updateDto.LoggerName = loggerName;
updateDto.LogLevel = logLevel;
updateDto.Message = messageAsString;
updateDto.Exception = exception;
updateDto.Location = location;
updateDto.ThreadId = threadId;

updateDto.Commit();

MessageDtoStopWatch.AddMeasurement(MTLoggerMessageDto.GetTimestamp() - timestamp);
}

private static DiagnosticsStackTrace GrabStackTrace()
Expand Down
93 changes: 13 additions & 80 deletions ModTek/Features/Logging/MTLoggerAsyncQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ namespace ModTek.Features.Logging;
internal class MTLoggerAsyncQueue
{
private readonly Action<MTLoggerMessageDto> _processor;
private readonly LightWeightBlockingQueue<MTLoggerMessageDto> _queue;
private readonly LightWeightBlockingQueue _queue;
internal readonly int LogWriterThreadId;

internal MTLoggerAsyncQueue(Action<MTLoggerMessageDto> processor)
{
_processor = processor;
_queue = new LightWeightBlockingQueue<MTLoggerMessageDto>();
Application.quitting += () => _queue.CompleteAdding();
_queue = new LightWeightBlockingQueue();
Application.quitting += () => _queue.Shutdown();
var thread = new Thread(LoggingLoop)
{
Name = nameof(MTLoggerAsyncQueue),
Expand All @@ -26,11 +26,11 @@ internal MTLoggerAsyncQueue(Action<MTLoggerMessageDto> processor)
LogWriterThreadId = thread.ManagedThreadId;
}

private static readonly MTStopwatch _loggingStopwatch = new()
private static readonly MTStopwatch s_loggingStopwatch = new()
{
Callback = stats =>
{
var dispatchStats = _dispatchStopWatch.GetStats(); // fetch the overhead introduced by async logging
var dispatchStats = LoggingFeature.DispatchStopWatch.GetStats(); // fetch the overhead introduced by async logging
var offloadedTime = stats.TotalTime.Subtract(dispatchStats.TotalTime);
Log.Main.Debug?.Log($"Asynchronous logging offloaded {offloadedTime} from the main thread.");

Expand All @@ -43,7 +43,7 @@ internal MTLoggerAsyncQueue(Action<MTLoggerMessageDto> processor)
trace.Log($"Dispatched {dispatchStats.Count} times, taking a total of {dispatchStats.TotalTime} with an average of {dispatchStats.AverageNanoseconds}ns.");

var filterStats = AppenderFile.FiltersStopWatch.GetStats();
trace.Log($"Filters took at total of {filterStats.TotalTime} with an average of {filterStats.AverageNanoseconds}ns.");
trace.Log($"Filters took a total of {filterStats.TotalTime} with an average of {filterStats.AverageNanoseconds}ns.");

var formatterStats = AppenderFile.FormatterStopWatch.GetStats();
trace.Log($"Formatter took a total of {formatterStats.TotalTime} with an average of {formatterStats.AverageNanoseconds}ns.");
Expand All @@ -53,10 +53,6 @@ internal MTLoggerAsyncQueue(Action<MTLoggerMessageDto> processor)

var writeStats = AppenderFile.WriteStopwatch.GetStats();
trace.Log($"Write called {writeStats.Count} times, taking a total of {writeStats.TotalTime} with an average of {writeStats.AverageNanoseconds}ns.");

#if MEMORY_TRACE
trace.Log($"An estimated maximum of {s_memoryEstimatedUsageMax / 1_000_000} MB was ever used by {s_memoryObjectCountMax} objects.");
#endif
}
},
CallbackForEveryNumberOfMeasurements = 50_000
Expand All @@ -66,12 +62,9 @@ private void LoggingLoop()
{
while (true)
{
if (!_queue.TryDequeueOrWait(out var message))
{
return;
}

_loggingStopwatch.Start();
ref var message = ref _queue.AcquireCommittedOrWait();

s_loggingStopwatch.Start();
try
{
_processor(message);
Expand All @@ -82,74 +75,14 @@ private void LoggingLoop()
}
finally
{
_loggingStopwatch.Stop();
#if MEMORY_TRACE
UnTrackMemory(message);
#endif
message.Reset();
s_loggingStopwatch.Stop();
}
}
}

// tracks all overhead on the main thread that happens due to async logging
private static readonly MTStopwatch _dispatchStopWatch = new();
// return false if, for example, the queue was already "completed"
internal bool Add(MTLoggerMessageDto messageDto)
{
_dispatchStopWatch.Start();
try
{
if (_queue.TryEnqueueOrWait(messageDto))
{
#if MEMORY_TRACE
TrackMemory(messageDto);
#endif
return true;
}
return false;
}
finally
{
_dispatchStopWatch.Stop();
}
}

#if MEMORY_TRACE
// memory tracking
private static long s_memoryEstimatedUsage;
private static long s_memoryEstimatedUsageMax;
private static long s_memoryObjectCount;
private static long s_memoryObjectCountMax;
private static void TrackMemory(MTLoggerMessageDto messageDto)
{
var currentCount = Interlocked.Increment(ref s_memoryObjectCount);
var currentMemoryUse = Interlocked.Add(ref s_memoryEstimatedUsage, messageDto.EstimatedSizeInMemory);

var knownMax = Interlocked.Read(ref s_memoryEstimatedUsageMax);
if (knownMax >= currentMemoryUse)
{
return;
}

while (true)
{
var setMax = Interlocked.CompareExchange(ref s_memoryEstimatedUsageMax, currentMemoryUse, knownMax);
if (setMax == knownMax) // our attempt did set a new max
{
// no CAS here, as we don't care if slightly wrong stats are saved
Volatile.Write(ref s_memoryObjectCountMax, currentCount);
break;
}
if (setMax >= currentMemoryUse) // another thread already set a higher max than we estimated
{
break;
}
knownMax = setMax; // let's retry
}
}
private static void UnTrackMemory(MTLoggerMessageDto messageDto)
internal ref MTLoggerMessageDto AcquireUncommitedOrWait()
{
Interlocked.Add(ref s_memoryEstimatedUsage, -messageDto.EstimatedSizeInMemory);
Interlocked.Decrement(ref s_memoryObjectCount);
return ref _queue.AcquireUncommitedOrWait();
}
#endif
}
Loading

0 comments on commit c4e8573

Please sign in to comment.