Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delta exporters will Export only those points which received new update #2629

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

* Metrics SDK will not provide inactive Metrics to delta exporter.
([#2629](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2629))

* Histogram bounds are validated when added to a View.
([#2573](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2573))

Expand Down
49 changes: 39 additions & 10 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ internal sealed class AggregatorStore
private readonly AggregationTemporality temporality;
private readonly bool outputDelta;
private readonly MetricPoint[] metricPoints;
private readonly int[] currentMetricPointBatch;
private readonly AggregationType aggType;
private readonly double[] histogramBounds;
private readonly UpdateLongDelegate updateLongCallback;
private readonly UpdateDoubleDelegate updateDoubleCallback;
private int metricPointIndex = 0;
private int batchSize = 0;
private bool zeroTagMetricPointInitialized;
private DateTimeOffset startTimeExclusive;
private DateTimeOffset endTimeInclusive;
Expand All @@ -53,6 +55,7 @@ internal AggregatorStore(
string[] tagKeysInteresting = null)
{
this.metricPoints = new MetricPoint[MaxMetricPoints];
this.currentMetricPointBatch = new int[MaxMetricPoints];
this.aggType = aggType;
this.temporality = temporality;
this.outputDelta = temporality == AggregationTemporality.Delta ? true : false;
Expand Down Expand Up @@ -92,37 +95,63 @@ internal void Update(double value, ReadOnlySpan<KeyValuePair<string, object>> ta
this.updateDoubleCallback(value, tags);
}

internal void SnapShot()
internal int SnapShot()
{
this.batchSize = 0;
var indexSnapShot = Math.Min(this.metricPointIndex, MaxMetricPoints - 1);
if (this.temporality == AggregationTemporality.Delta)
{
this.SnapShotDelta(indexSnapShot);
}
else
{
this.SnapShotCumulative(indexSnapShot);
}

this.endTimeInclusive = DateTimeOffset.UtcNow;
return this.batchSize;
}

internal void SnapShotDelta(int indexSnapShot)
{
for (int i = 0; i <= indexSnapShot; i++)
{
ref var metricPoint = ref this.metricPoints[i];
if (metricPoint.StartTime == default)
if (metricPoint.MetricPointStatus == MetricPointStatus.NoCollectPending)
{
continue;
}

metricPoint.TakeSnapShot(this.outputDelta);
this.currentMetricPointBatch[this.batchSize] = i;
this.batchSize++;
}

if (this.temporality == AggregationTemporality.Delta)
if (this.endTimeInclusive != default)
{
this.startTimeExclusive = this.endTimeInclusive;
}
}

internal void SnapShotCumulative(int indexSnapShot)
{
for (int i = 0; i <= indexSnapShot; i++)
{
if (this.endTimeInclusive != default)
ref var metricPoint = ref this.metricPoints[i];
if (metricPoint.StartTime == default)
{
this.startTimeExclusive = this.endTimeInclusive;
continue;
}
}

DateTimeOffset dt = DateTimeOffset.UtcNow;
this.endTimeInclusive = dt;
metricPoint.TakeSnapShot(this.outputDelta);
this.currentMetricPointBatch[this.batchSize] = i;
this.batchSize++;
}
}

internal BatchMetricPoint GetMetricPoints()
{
var indexSnapShot = Math.Min(this.metricPointIndex, MaxMetricPoints - 1);
return new BatchMetricPoint(this.metricPoints, indexSnapShot + 1, this.startTimeExclusive, this.endTimeInclusive);
return new BatchMetricPoint(this.metricPoints, this.currentMetricPointBatch, this.batchSize, this.startTimeExclusive, this.endTimeInclusive);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
23 changes: 10 additions & 13 deletions src/OpenTelemetry/Metrics/BatchMetricPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,25 @@

using System;
using System.Collections;
using System.Diagnostics;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Metrics
{
public readonly struct BatchMetricPoint : IDisposable
{
private readonly MetricPoint[] metricsPoints;
private readonly int[] metricPointsToProcess;
private readonly long targetCount;
private readonly DateTimeOffset start;
private readonly DateTimeOffset end;

internal BatchMetricPoint(MetricPoint[] metricsPoints, int maxSize, DateTimeOffset start, DateTimeOffset end)
internal BatchMetricPoint(MetricPoint[] metricsPoints, int[] metricPointsToProcess, long targetCount, DateTimeOffset start, DateTimeOffset end)
{
Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number.");
Guard.Null(metricsPoints, nameof(metricsPoints));

this.metricsPoints = metricsPoints;
this.targetCount = maxSize;
this.metricPointsToProcess = metricPointsToProcess;
this.targetCount = targetCount;
this.start = start;
this.end = end;
}
Expand All @@ -50,7 +50,7 @@ public void Dispose()
/// <returns><see cref="Enumerator"/>.</returns>
public Enumerator GetEnumerator()
{
return new Enumerator(this.metricsPoints, this.targetCount, this.start, this.end);
return new Enumerator(this.metricsPoints, this.metricPointsToProcess, this.targetCount, this.start, this.end);
}

/// <summary>
Expand All @@ -59,14 +59,16 @@ public Enumerator GetEnumerator()
public struct Enumerator : IEnumerator
{
private readonly MetricPoint[] metricsPoints;
private readonly int[] metricPointsToProcess;
private readonly DateTimeOffset start;
private readonly DateTimeOffset end;
private long targetCount;
private long index;

internal Enumerator(MetricPoint[] metricsPoints, long targetCount, DateTimeOffset start, DateTimeOffset end)
internal Enumerator(MetricPoint[] metricsPoints, int[] metricPointsToProcess, long targetCount, DateTimeOffset start, DateTimeOffset end)
{
this.metricsPoints = metricsPoints;
this.metricPointsToProcess = metricPointsToProcess;
this.targetCount = targetCount;
this.index = -1;
this.start = start;
Expand All @@ -77,7 +79,7 @@ public ref MetricPoint Current
{
get
{
return ref this.metricsPoints[this.index];
return ref this.metricsPoints[this.metricPointsToProcess[this.index]];
}
}

Expand All @@ -93,12 +95,7 @@ public bool MoveNext()
{
while (++this.index < this.targetCount)
{
ref var metricPoint = ref this.metricsPoints[this.index];
if (metricPoint.StartTime == default)
{
continue;
}

ref var metricPoint = ref this.metricsPoints[this.metricPointsToProcess[this.index]];
metricPoint.StartTime = this.start;
metricPoint.EndTime = this.end;
return true;
Expand Down
10 changes: 7 additions & 3 deletions src/OpenTelemetry/Metrics/MeterProviderSdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -443,19 +443,23 @@ internal Batch<Metric> Collect()
for (int i = 0; i < target; i++)
{
var metric = this.metrics[i];
int metricPointSize = 0;
if (metric != null)
{
if (metric.InstrumentDisposed)
{
metric.SnapShot();
metricPointSize = metric.SnapShot();
this.metrics[i] = null;
}
else
{
metric.SnapShot();
metricPointSize = metric.SnapShot();
}

this.metricsCurrentBatch[metricCountCurrentBatch++] = metric;
if (metricPointSize > 0)
{
this.metricsCurrentBatch[metricCountCurrentBatch++] = metric;
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/OpenTelemetry/Metrics/Metric.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ internal void UpdateDouble(double value, ReadOnlySpan<KeyValuePair<string, objec
this.aggStore.Update(value, tags);
}

internal void SnapShot()
internal int SnapShot()
{
this.aggStore.SnapShot();
return this.aggStore.SnapShot();
}
}
}
67 changes: 67 additions & 0 deletions src/OpenTelemetry/Metrics/MetricPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ internal MetricPoint(
this.DoubleValue = default;
this.doubleVal = default;
this.lastDoubleSum = default;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

if (this.AggType == AggregationType.Histogram)
{
Expand Down Expand Up @@ -86,6 +87,8 @@ internal MetricPoint(

public double[] ExplicitBounds { get; internal set; }

internal MetricPointStatus MetricPointStatus { get; private set; }

private readonly AggregationType AggType { get; }

internal void Update(long number)
Expand Down Expand Up @@ -117,6 +120,19 @@ internal void Update(long number)
break;
}
}

// There is a race with Snapshot:
// Update() updates the value
// Snapshot snapshots the value
// Snapshot sets status to NoCollectPending
// Update sets status to CollectPending -- this is not right as the Snapshot
// already included the updated value.
// In the absence of any new Update call until next Snapshot,
// this results in exporting an Update even though
// it had no update.
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
// TODO: For Delta, this can be mitigated
// by ignoring Zero points
this.MetricPointStatus = MetricPointStatus.CollectPending;
}

internal void Update(double number)
Expand Down Expand Up @@ -180,6 +196,19 @@ internal void Update(double number)
break;
}
}

// There is a race with Snapshot:
// Update() updates the value
// Snapshot snapshots the value
// Snapshot sets status to NoCollectPending
// Update sets status to CollectPending -- this is not right as the Snapshot
// already included the updated value.
// In the absence of any new Update call until next Snapshot,
// this results in exporting an Update even though
// it had no update.
// TODO: For Delta, this can be mitigated
// by ignoring Zero points
this.MetricPointStatus = MetricPointStatus.CollectPending;
}

internal void TakeSnapShot(bool outputDelta)
Expand All @@ -194,6 +223,14 @@ internal void TakeSnapShot(bool outputDelta)
long initValue = Interlocked.Read(ref this.longVal);
this.LongValue = initValue - this.lastLongSum;
this.lastLongSum = initValue;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (initValue != Interlocked.Read(ref this.longVal))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
}
else
{
Expand All @@ -216,6 +253,14 @@ internal void TakeSnapShot(bool outputDelta)
double initValue = Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity);
this.DoubleValue = initValue - this.lastDoubleSum;
this.lastDoubleSum = initValue;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (initValue != Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
}
else
{
Expand All @@ -233,6 +278,15 @@ internal void TakeSnapShot(bool outputDelta)
case AggregationType.LongGauge:
{
this.LongValue = Interlocked.Read(ref this.longVal);
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (this.LongValue != Interlocked.Read(ref this.longVal))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}

break;
}

Expand All @@ -244,6 +298,15 @@ internal void TakeSnapShot(bool outputDelta)
// the exchange (to 0.0) will never occur,
// but we get the original value atomically.
this.DoubleValue = Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity);
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (this.DoubleValue != Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}

break;
}

Expand All @@ -267,6 +330,8 @@ internal void TakeSnapShot(bool outputDelta)
this.bucketCounts[i] = 0;
}
}

this.MetricPointStatus = MetricPointStatus.NoCollectPending;
}

break;
Expand All @@ -283,6 +348,8 @@ internal void TakeSnapShot(bool outputDelta)
this.longVal = 0;
this.doubleVal = 0;
}

this.MetricPointStatus = MetricPointStatus.NoCollectPending;
}

break;
Expand Down
33 changes: 33 additions & 0 deletions src/OpenTelemetry/Metrics/MetricPointStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// <copyright file="MetricPointStatus.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

namespace OpenTelemetry.Metrics
{
internal enum MetricPointStatus
{
/// <summary>
/// This status is applied to <see cref="MetricPoint"/>s with status <see cref="CollectPending"/> after a Collect.
/// If an update occurs, status will be moved to <see cref="CollectPending"/>.
/// </summary>
NoCollectPending,

/// <summary>
/// The <see cref="MetricPoint"/> has been updated since the previous Collect cycle.
/// Collect will move it to <see cref="NoCollectPending"/>.
/// </summary>
CollectPending,
}
}
Loading