Skip to content

Commit

Permalink
Performance: Optimize MetricsEventManager.QueuedEvents
Browse files Browse the repository at this point in the history
Part of #7908. This is a contention hot path under load for 2 reasons: allocations on each key access (start and end of each event) and the `Monitor.Wait()` around the update portion of `.AddOrUpdate()`. There's also a string replication which isn't used on the `Data` string itself, but that's upstream and will pitch it in another PR.

This change overall: optimizes the key access using a `readonly record struct` (for fast hashing), `Lazy<T>` (so we don't lose initial metrics), and moves the update outside of the dictionary path since each of the attributes on a metric are atomicly viable.

Note though, this breaks StyleCop sporadically - before this can go in we'd need to update StyleCop and pull in DotNetAnalyzers/StyleCopAnalyzers#3401 which will be in the next 1.2.x beta. I've pinged Sam about getting a release there we can use.
  • Loading branch information
NickCraver authored and liliankasem committed Feb 24, 2023
1 parent 4a2b480 commit 03e9fb2
Showing 1 changed file with 22 additions and 33 deletions.
55 changes: 22 additions & 33 deletions src/WebJobs.Script.WebHost/Diagnostics/MetricsEventManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public MetricsEventManager(IOptionsMonitor<AppServiceOptions> appServiceOptionsM

_eventGenerator = generator;
_functionActivityFlushIntervalSeconds = functionActivityFlushIntervalSeconds;
QueuedEvents = new ConcurrentDictionary<string, SystemMetricEvent>(StringComparer.OrdinalIgnoreCase);
QueuedEvents = new ConcurrentDictionary<EventKey, Lazy<SystemMetricEvent>>();

// Initialize the periodic log flush timer
_metricsFlushTimer = new Timer(TimerFlush, null, metricsFlushIntervalMS, metricsFlushIntervalMS);
Expand All @@ -54,7 +54,9 @@ public MetricsEventManager(IOptionsMonitor<AppServiceOptions> appServiceOptionsM
/// <summary>
/// Gets the collection of events that will be flushed on the next flush interval.
/// </summary>
public ConcurrentDictionary<string, SystemMetricEvent> QueuedEvents { get; }
public ConcurrentDictionary<EventKey, Lazy<SystemMetricEvent>> QueuedEvents { get; }

public readonly record struct EventKey(string eventName, string functionName);

public object BeginEvent(string eventName, string functionName = null, string data = null)
{
Expand Down Expand Up @@ -83,10 +85,8 @@ public void EndEvent(object eventHandle)
// event aggregation is based on this key
// for each unique key, there will be only 1
// queued event that we aggregate into
string key = GetAggregateKey(evt.EventName, evt.FunctionName);

QueuedEvents.AddOrUpdate(key,
(name) =>
var queuedEvent = QueuedEvents.GetOrAdd(new EventKey(evt.EventName, evt.FunctionName),
name => new Lazy<SystemMetricEvent>(() =>
{
// create the default event that will be added
// if an event isn't already queued for this key
Expand All @@ -96,23 +96,17 @@ public void EndEvent(object eventHandle)
EventName = evt.EventName,
Minimum = latencyMS,
Maximum = latencyMS,
Average = latencyMS,
Count = 1,
// Intentionally 0, as they'll immediately be incremented blindly belo2
Average = 0,
Count = 0,
Data = evt.Data
};
},
(name, evtToUpdate) =>
{
// Aggregate into the existing event
// While we'll be performing an aggregation later,
// we retain the count so weighted averages can be performed
evtToUpdate.Maximum = Math.Max(evtToUpdate.Maximum, latencyMS);
evtToUpdate.Minimum = Math.Min(evtToUpdate.Minimum, latencyMS);
evtToUpdate.Average += latencyMS; // the average is calculated later - for now we sum
evtToUpdate.Count++;
return evtToUpdate;
});
})).Value;

queuedEvent.Maximum = Math.Max(queuedEvent.Maximum, latencyMS);
queuedEvent.Minimum = Math.Min(queuedEvent.Minimum, latencyMS);
queuedEvent.Average += latencyMS; // the average is calculated later - for now we sum
queuedEvent.Count++;
}
}

Expand All @@ -122,27 +116,22 @@ public void LogEvent(string eventName, string functionName = null, string data =

eventName = Sanitizer.Sanitize(eventName);

string key = GetAggregateKey(eventName, functionName);
QueuedEvents.AddOrUpdate(key,
(name) =>
var queuedEvent = QueuedEvents.GetOrAdd(new EventKey(eventName, functionName), (name) =>
new Lazy<SystemMetricEvent>(() =>
{
// create the default event that will be added
// if an event isn't already queued for this key
// note the count is 0, so that we can always blindly increment below outside the concurrency monitor waits
return new SystemMetricEvent
{
FunctionName = functionName,
EventName = eventName.ToLowerInvariant(),
Count = 1,
Count = 0,
Data = data
};
},
(name, evtToUpdate) =>
{
// update the existing event
evtToUpdate.Count++;
}));

return evtToUpdate;
});
queuedEvent.Value.Count++;
}

internal void FunctionStarted(FunctionStartedEvent startedEvent)
Expand Down Expand Up @@ -223,7 +212,7 @@ private void FlushMetrics()
return;
}

SystemMetricEvent[] eventsToFlush = QueuedEvents.Values.ToArray();
SystemMetricEvent[] eventsToFlush = QueuedEvents.Values.Select(e => e.Value).ToArray();
QueuedEvents.Clear();

// Use the same timestamp for all events. Since these are
Expand Down

0 comments on commit 03e9fb2

Please sign in to comment.