Metric AggregatorStore optimization for sorting Tag keys #2777
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,13 +26,20 @@ namespace OpenTelemetry.Metrics | |
internal sealed class AggregatorStore | ||
{ | ||
private static readonly ObjectArrayEqualityComparer ObjectArrayComparer = new ObjectArrayEqualityComparer(); | ||
private static readonly StringArrayEqualityComparer StringArrayComparer = new StringArrayEqualityComparer(); | ||
private readonly object lockZeroTags = new object(); | ||
private readonly HashSet<string> tagKeysInteresting; | ||
private readonly int tagsKeysInterestingCount; | ||
|
||
private readonly ConcurrentDictionary<string[], string[]> tagKeyCombinations = | ||
new ConcurrentDictionary<string[], string[]>(StringArrayComparer); | ||
|
||
private readonly ConcurrentDictionary<object[], object[]> tagValueCombinations = | ||
new ConcurrentDictionary<object[], object[]>(ObjectArrayComparer); | ||
|
||
// Two-Level lookup. TagKeys x [ TagValues x Metrics ] | ||
private readonly ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>> keyValue2MetricAggs = | ||
new ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>>(new StringArrayEqualityComparer()); | ||
new ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>>(StringArrayComparer); | ||
Another option that doesn't risk too many entries (when the user keeps providing keys in a different order): have the dictionary as before, so that we store at most 2 entries per key set. And we do only a single lookup in the hot path, as opposed to 2 lookups.

That's a good suggestion. The only issue with this would be if the user provides the sorted combination as the very first combination and uses some random combination later on. In this case, we would always be sorting the keys.

Yes, if the same order is reused, then you get max performance. Otherwise, lower perf.

I agree this is a good optimization to try out. Reusing the same order is probably the most common scenario. I suppose it's possible for a library to use a single instrument in multiple code paths which add dimensions in a different order, but that's probably an edge case. A different order may be likely in the event I have two libraries emitting the same metric name, but since they're different libraries they'd be separate Metric instances, right?

Based on offline sync, this may not be feasible.

They cannot emit with the same metric name (unless with a different Meter). So it'll be different instances, yes.

I was wondering about this myself. Was the issue that synchronizing the two inserts (sorted and original order) would be tough?

The problem here is in keeping the two entries for a given combination in sync. For example:
Here, since ("A","C","B") is not present in the dictionary, we add these two entries:
("A","C","B") -> (1,3,2) -> MetricPointIndex1
("A","B","C") -> (1,2,3) -> MetricPointIndex1
Now, if we encounter a
we will add another entry to the inner dictionary:
("A","B","C") -> (1,2,3) -> MetricPointIndex1
              -> (10,20,30) -> MetricPointIndex2
This is fine, but we also need to add this entry to ("A","C","B"):
("A","C","B") -> (1,3,2) -> MetricPointIndex1
              -> (10,30,20) -> MetricPointIndex2
(This is the difficult part.) When we get a new set of tag values we have to find if there is another tag key combination present in the dictionary, and if it's present we have to add the same

Ugh... right, gotta deal with the values as well.
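The synchronization problem described in this thread can be sketched in a few lines. This is only an illustration under simplifying assumptions, not code from the PR: joined strings stand in for the `string[]` / `object[]` dictionary keys, plain `Dictionary` replaces `ConcurrentDictionary`, and the `Add` helper is hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Outer key: tag keys in a given order. Inner key: tag values in that same order.
// Inner value: metric point index. Strings stand in for the array keys used in the PR.
var store = new Dictionary<string, Dictionary<string, int>>();

// Hypothetical helper for this sketch only.
void Add(string keyOrder, string values, int metricPointIndex)
{
    if (!store.TryGetValue(keyOrder, out var inner))
    {
        inner = new Dictionary<string, int>();
        store[keyOrder] = inner;
    }

    inner[values] = metricPointIndex;
}

// First measurement arrives as ("A","C","B") with values (1,3,2):
// store it under both the original and the sorted key order.
Add("A,C,B", "1,3,2", 1);
Add("A,B,C", "1,2,3", 1);

// A later measurement arrives already sorted, with new values.
// Only the ("A","B","C") entry is touched.
Add("A,B,C", "10,20,30", 2);

// The same tag set in the original order now misses, unless the
// insert above had also been mirrored into the ("A","C","B") entry.
bool mirrored = store["A,C,B"].ContainsKey("10,30,20");
Console.WriteLine(mirrored); // prints False
```

Keeping the two inner dictionaries consistent would mean reordering every new value set for each stored key order, which is the extra work the thread calls the difficult part.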
||
|
||
private readonly AggregationTemporality temporality; | ||
private readonly string name; | ||
|
@@ -179,26 +186,77 @@ private void InitializeZeroTagPointIfNotInitialized() | |
private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int length) | ||
{ | ||
int aggregatorIndex; | ||
string[] seqKey = null; | ||
string[] sortedTagKeys = null; | ||
object[] sortedTagValues = null; | ||
ConcurrentDictionary<object[], int> value2metrics; | ||
|
||
// GetOrAdd by TagKeys at 1st Level of 2-level dictionary structure. | ||
// Get back a Dictionary of [ Values x Metrics[] ]. | ||
if (!this.keyValue2MetricAggs.TryGetValue(tagKeys, out var value2metrics)) | ||
// We only need to sort if there is more than one Tag Key. | ||
if (!this.tagKeyCombinations.TryGetValue(tagKeys, out sortedTagKeys)) | ||
{ | ||
// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. | ||
seqKey = new string[length]; | ||
var seqKey = new string[length]; | ||
tagKeys.CopyTo(seqKey, 0); | ||
|
||
var seqValue = new object[length]; | ||
tagValues.CopyTo(seqValue, 0); | ||
|
||
if (length > 1) | ||
{ | ||
// Create a new array for the sorted Tag keys. | ||
sortedTagKeys = new string[length]; | ||
tagKeys.CopyTo(sortedTagKeys, 0); | ||
|
||
// Create a new array for the sorted Tag values. | ||
sortedTagValues = new object[length]; | ||
tagValues.CopyTo(sortedTagValues, 0); | ||
|
||
Array.Sort(sortedTagKeys, sortedTagValues); | ||
} | ||
else | ||
{ | ||
sortedTagKeys = seqKey; | ||
sortedTagValues = seqValue; | ||
} | ||
|
||
this.tagValueCombinations.TryAdd(seqValue, sortedTagValues); | ||
this.tagKeyCombinations.TryAdd(seqKey, sortedTagKeys); | ||
} | ||
|
||
// GetOrAdd by the sorted Tag keys at 1st Level of 2-level dictionary structure. | ||
// Get back a Dictionary of [ Values x Metrics[] ]. | ||
if (!this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics)) | ||
{ | ||
value2metrics = new ConcurrentDictionary<object[], int>(ObjectArrayComparer); | ||
if (!this.keyValue2MetricAggs.TryAdd(seqKey, value2metrics)) | ||
if (!this.keyValue2MetricAggs.TryAdd(sortedTagKeys, value2metrics)) | ||
{ | ||
this.keyValue2MetricAggs.TryGetValue(seqKey, out value2metrics); | ||
this.keyValue2MetricAggs.TryGetValue(sortedTagKeys, out value2metrics); | ||
} | ||
} | ||
|
||
if (!this.tagValueCombinations.TryGetValue(tagValues, out sortedTagValues)) | ||
These new dictionaries may grow quite large. Could we do our check of

That check won't help as we are storing different possible combinations of the tag keys. So if we have

Ah right, I wasn't thinking about this correctly, so then maybe do the sort as you're doing, but defer
||
{ | ||
var seqValue = new object[length]; | ||
tagValues.CopyTo(seqValue, 0); | ||
|
||
if (length > 1) | ||
{ | ||
// Create a new array for the sorted Tag values. | ||
sortedTagValues = new object[length]; | ||
tagValues.CopyTo(sortedTagValues, 0); | ||
|
||
Array.Sort(tagKeys, sortedTagValues); | ||
} | ||
else | ||
{ | ||
sortedTagValues = seqValue; | ||
} | ||
|
||
this.tagValueCombinations.TryAdd(seqValue, sortedTagValues); | ||
} | ||
|
||
// GetOrAdd by TagValues at 2nd Level of 2-level dictionary structure. | ||
// Get back Metrics[]. | ||
if (!value2metrics.TryGetValue(tagValues, out aggregatorIndex)) | ||
if (!value2metrics.TryGetValue(sortedTagValues, out aggregatorIndex)) | ||
{ | ||
aggregatorIndex = this.metricPointIndex; | ||
if (aggregatorIndex >= this.maxMetricPoints) | ||
|
@@ -213,7 +271,7 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng | |
lock (value2metrics) | ||
{ | ||
// check again after acquiring lock. | ||
if (!value2metrics.TryGetValue(tagValues, out aggregatorIndex)) | ||
if (!value2metrics.TryGetValue(sortedTagValues, out aggregatorIndex)) | ||
{ | ||
aggregatorIndex = Interlocked.Increment(ref this.metricPointIndex); | ||
if (aggregatorIndex >= this.maxMetricPoints) | ||
|
@@ -225,24 +283,14 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng | |
return -1; | ||
} | ||
|
||
// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. | ||
if (seqKey == null) | ||
{ | ||
seqKey = new string[length]; | ||
tagKeys.CopyTo(seqKey, 0); | ||
} | ||
|
||
var seqVal = new object[length]; | ||
tagValues.CopyTo(seqVal, 0); | ||
|
||
ref var metricPoint = ref this.metricPoints[aggregatorIndex]; | ||
var dt = DateTimeOffset.UtcNow; | ||
metricPoint = new MetricPoint(this.aggType, dt, seqKey, seqVal, this.histogramBounds); | ||
metricPoint = new MetricPoint(this.aggType, dt, sortedTagKeys, sortedTagValues, this.histogramBounds); | ||
|
||
// Add to dictionary *after* initializing MetricPoint | ||
// as other threads can start writing to the | ||
// MetricPoint, if dictionary entry found. | ||
value2metrics.TryAdd(seqVal, aggregatorIndex); | ||
value2metrics.TryAdd(sortedTagValues, aggregatorIndex); | ||
} | ||
} | ||
} | ||
|
@@ -355,11 +403,6 @@ private int FindMetricAggregatorsDefault(ReadOnlySpan<KeyValuePair<string, objec | |
|
||
storage.SplitToKeysAndValues(tags, tagLength, out var tagKeys, out var tagValues); | ||
|
||
if (tagLength > 1) | ||
{ | ||
Array.Sort(tagKeys, tagValues); | ||
} | ||
|
||
return this.LookupAggregatorStore(tagKeys, tagValues, tagLength); | ||
} | ||
|
||
|
@@ -388,11 +431,6 @@ private int FindMetricAggregatorsCustomTag(ReadOnlySpan<KeyValuePair<string, obj | |
return 0; | ||
} | ||
|
||
if (actualLength > 1) | ||
{ | ||
Array.Sort(tagKeys, tagValues); | ||
} | ||
|
||
return this.LookupAggregatorStore(tagKeys, tagValues, actualLength); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -382,6 +382,109 @@ public void ObservableCounterAggregationTest(bool exportDelta) | |
} | ||
} | ||
|
||
[Theory] | ||
[InlineData(false, false)] | ||
[InlineData(false, true)] | ||
[InlineData(true, false)] | ||
[InlineData(true, true)] | ||
public void DimensionsAreOrderInsensitive(bool exportDelta, bool hasView) | ||
{ | ||
var exportedItems = new List<Metric>(); | ||
|
||
using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{exportDelta}.{hasView}"); | ||
var counterLong = meter.CreateCounter<long>("Counter"); | ||
var meterProviderBuilder = Sdk.CreateMeterProviderBuilder() | ||
.AddMeter(meter.Name) | ||
.AddReader(new BaseExportingMetricReader(new InMemoryExporter<Metric>(exportedItems)) | ||
{ | ||
Temporality = exportDelta ? AggregationTemporality.Delta : AggregationTemporality.Cumulative, | ||
}); | ||
|
||
if (hasView) | ||
{ | ||
meterProviderBuilder.AddView(instrumentName: "Counter", new MetricStreamConfiguration() { TagKeys = new string[] { "Key1", "Key2" } }); | ||
} | ||
|
||
using var meterProvider = meterProviderBuilder.Build(); | ||
|
||
counterLong.Add(10, new("Key1", "Value1"), new("Key2", "Value2"), new("Key3", "Value3")); | ||
counterLong.Add(10, new("Key1", "Value1"), new("Key3", "Value3"), new("Key2", "Value2")); | ||
meterProvider.ForceFlush(MaxTimeToAllowForFlush); | ||
|
||
List<KeyValuePair<string, object>> tags; | ||
if (hasView) | ||
{ | ||
tags = new List<KeyValuePair<string, object>>() | ||
{ | ||
new("Key1", "Value1"), | ||
new("Key2", "Value2"), | ||
}; | ||
} | ||
else | ||
{ | ||
tags = new List<KeyValuePair<string, object>>() | ||
{ | ||
new("Key1", "Value1"), | ||
new("Key2", "Value2"), | ||
new("Key3", "Value3"), | ||
}; | ||
} | ||
|
||
Assert.True(OnlyOneMetricPointExists(exportedItems)); | ||
CheckTagsForFirstMetricPoint(exportedItems, tags); | ||
long sumReceived = GetLongSum(exportedItems); | ||
Assert.Equal(20, sumReceived); | ||
|
||
exportedItems.Clear(); | ||
counterLong.Add(5, new("Key2", "Value2"), new("Key1", "Value1"), new("Key3", "Value3")); | ||
counterLong.Add(5, new("Key2", "Value2"), new("Key1", "Value1"), new("Key3", "Value3")); | ||
counterLong.Add(10, new("Key2", "Value2"), new("Key3", "Value3"), new("Key1", "Value1")); | ||
meterProvider.ForceFlush(MaxTimeToAllowForFlush); | ||
|
||
Assert.True(OnlyOneMetricPointExists(exportedItems)); | ||
CheckTagsForFirstMetricPoint(exportedItems, tags); | ||
sumReceived = GetLongSum(exportedItems); | ||
Is that really going to tell if we exported one MetricPoint or more than one? IIRC, this method simply sums up all metric points, so it won't really validate what you are after.

I think we should validate that only one Metric is received, and that the metric has a single MetricPoint, with the tags (key1,key2,key3).

Ahh sorry, my bad. I thought this I'll update this.
||
if (exportDelta) | ||
{ | ||
Assert.Equal(20, sumReceived); | ||
} | ||
else | ||
{ | ||
Assert.Equal(40, sumReceived); | ||
} | ||
|
||
exportedItems.Clear(); | ||
meterProvider.ForceFlush(MaxTimeToAllowForFlush); | ||
sumReceived = GetLongSum(exportedItems); | ||
if (exportDelta) | ||
{ | ||
Assert.Equal(0, sumReceived); | ||
} | ||
else | ||
{ | ||
Assert.True(OnlyOneMetricPointExists(exportedItems)); | ||
CheckTagsForFirstMetricPoint(exportedItems, tags); | ||
Assert.Equal(40, sumReceived); | ||
} | ||
|
||
exportedItems.Clear(); | ||
counterLong.Add(40, new("Key3", "Value3"), new("Key1", "Value1"), new("Key2", "Value2")); | ||
counterLong.Add(20, new("Key3", "Value3"), new("Key2", "Value2"), new("Key1", "Value1")); | ||
meterProvider.ForceFlush(MaxTimeToAllowForFlush); | ||
|
||
Assert.True(OnlyOneMetricPointExists(exportedItems)); | ||
CheckTagsForFirstMetricPoint(exportedItems, tags); | ||
sumReceived = GetLongSum(exportedItems); | ||
if (exportDelta) | ||
{ | ||
Assert.Equal(60, sumReceived); | ||
} | ||
else | ||
{ | ||
Assert.Equal(100, sumReceived); | ||
} | ||
} | ||
|
||
[Theory] | ||
[InlineData(AggregationTemporality.Cumulative)] | ||
[InlineData(AggregationTemporality.Delta)] | ||
|
@@ -658,8 +761,39 @@ private static double GetDoubleSum(List<Metric> metrics) | |
return sum; | ||
} | ||
|
||
private static bool OnlyOneMetricPointExists(List<Metric> metrics) | ||
{ | ||
int count = 0; | ||
foreach (var metric in metrics) | ||
{ | ||
foreach (ref readonly var metricPoint in metric.GetMetricPoints()) | ||
{ | ||
count++; | ||
} | ||
} | ||
|
||
return count == 1; | ||
} | ||
|
||
// Provide tags input sorted by Key | ||
private static void CheckTagsForFirstMetricPoint(List<Metric> metrics, List<KeyValuePair<string, object>> tags) | ||
{ | ||
var metric = metrics[0]; | ||
var metricPointEnumerator = metric.GetMetricPoints().GetEnumerator(); | ||
Assert.True(metricPointEnumerator.MoveNext()); | ||
|
||
int index = 0; | ||
var metricPoint = metricPointEnumerator.Current; | ||
foreach (var tag in metricPoint.Tags) | ||
{ | ||
Assert.Equal(tags[index].Key, tag.Key); | ||
Assert.Equal(tags[index].Value, tag.Value); | ||
index++; | ||
} | ||
} | ||
|
||
private static void CounterUpdateThread<T>(object obj) | ||
where T : struct, IComparable | ||
where T : struct, IComparable | ||
{ | ||
if (obj is not UpdateThreadArguments<T> arguments) | ||
{ | ||
|
I think this should be maintained per tagKeyCombination.
This crossed my mind too, but I considered something like
I think this would actually work fine as is - resulting in [1,3,2] inserted once, but it does seem a little goofy. So, maybe makes sense to scope it to tagKeyCombination for clarity.
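One way to read the suggestion to scope the value cache per tag key combination: the same raw value sequence can arrive under two different key orders, and a cache keyed by the raw values alone can remember only one sorted result. The sketch below is a hypothetical illustration, not the PR's code and not necessarily the case the reviewers had in mind; `SortValuesByKeys`, the joined-string keys, and the tuple-keyed dictionary are all assumptions made for brevity.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: copies both arrays, then reorders the values to follow the sorted keys.
static object[] SortValuesByKeys(string[] keys, object[] values)
{
    var sortedKeys = (string[])keys.Clone();
    var sortedValues = (object[])values.Clone();
    Array.Sort(sortedKeys, sortedValues); // paired sort: each value moves with its key
    return sortedValues;
}

// The same raw value sequence, seen under two different key orders.
var rawValues = new object[] { 1, 2, 3 };
var forAcb = SortValuesByKeys(new[] { "A", "C", "B" }, rawValues); // 1, 3, 2
var forAbc = SortValuesByKeys(new[] { "A", "B", "C" }, rawValues); // 1, 2, 3

// Cache keyed by the raw values only: the first writer wins.
var byValuesOnly = new Dictionary<string, object[]>();
string valueKey = string.Join(",", rawValues);
byValuesOnly.TryAdd(valueKey, forAcb);
bool secondStored = byValuesOnly.TryAdd(valueKey, forAbc);

// Cache keyed by key order and raw values: both results are kept.
var perKeyOrder = new Dictionary<(string KeyOrder, string Values), object[]>();
perKeyOrder.TryAdd(("A,C,B", valueKey), forAcb);
perKeyOrder.TryAdd(("A,B,C", valueKey), forAbc);

Console.WriteLine(secondStored);                             // False
Console.WriteLine(string.Join(",", byValuesOnly[valueKey])); // 1,3,2
Console.WriteLine(perKeyOrder.Count);                        // 2
```

In the values-only cache, a later lookup for keys ("A","B","C") with values (1,2,3) would get back (1,3,2), which pairs the values with the wrong keys. Scoping the cache by key order avoids that at the cost of a composite key or a nested dictionary.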