Skip to content

Commit

Permalink
Delta exporters will Export only those points which received new upda…
Browse files Browse the repository at this point in the history
…te (#2629)
  • Loading branch information
cijothomas authored Nov 17, 2021
1 parent 5325185 commit ac98506
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 49 deletions.
3 changes: 3 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

* Metrics SDK will not provide inactive Metrics to delta exporter.
([#2629](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2629))

* Histogram bounds are validated when added to a View.
([#2573](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2573))

Expand Down
49 changes: 39 additions & 10 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ internal sealed class AggregatorStore
private readonly AggregationTemporality temporality;
private readonly bool outputDelta;
private readonly MetricPoint[] metricPoints;
private readonly int[] currentMetricPointBatch;
private readonly AggregationType aggType;
private readonly double[] histogramBounds;
private readonly UpdateLongDelegate updateLongCallback;
private readonly UpdateDoubleDelegate updateDoubleCallback;
private int metricPointIndex = 0;
private int batchSize = 0;
private bool zeroTagMetricPointInitialized;
private DateTimeOffset startTimeExclusive;
private DateTimeOffset endTimeInclusive;
Expand All @@ -53,6 +55,7 @@ internal AggregatorStore(
string[] tagKeysInteresting = null)
{
this.metricPoints = new MetricPoint[MaxMetricPoints];
this.currentMetricPointBatch = new int[MaxMetricPoints];
this.aggType = aggType;
this.temporality = temporality;
this.outputDelta = temporality == AggregationTemporality.Delta ? true : false;
Expand Down Expand Up @@ -92,37 +95,63 @@ internal void Update(double value, ReadOnlySpan<KeyValuePair<string, object>> ta
this.updateDoubleCallback(value, tags);
}

internal void SnapShot()
internal int SnapShot()
{
this.batchSize = 0;
var indexSnapShot = Math.Min(this.metricPointIndex, MaxMetricPoints - 1);
if (this.temporality == AggregationTemporality.Delta)
{
this.SnapShotDelta(indexSnapShot);
}
else
{
this.SnapShotCumulative(indexSnapShot);
}

this.endTimeInclusive = DateTimeOffset.UtcNow;
return this.batchSize;
}

internal void SnapShotDelta(int indexSnapShot)
{
for (int i = 0; i <= indexSnapShot; i++)
{
ref var metricPoint = ref this.metricPoints[i];
if (metricPoint.StartTime == default)
if (metricPoint.MetricPointStatus == MetricPointStatus.NoCollectPending)
{
continue;
}

metricPoint.TakeSnapShot(this.outputDelta);
this.currentMetricPointBatch[this.batchSize] = i;
this.batchSize++;
}

if (this.temporality == AggregationTemporality.Delta)
if (this.endTimeInclusive != default)
{
this.startTimeExclusive = this.endTimeInclusive;
}
}

internal void SnapShotCumulative(int indexSnapShot)
{
for (int i = 0; i <= indexSnapShot; i++)
{
if (this.endTimeInclusive != default)
ref var metricPoint = ref this.metricPoints[i];
if (metricPoint.StartTime == default)
{
this.startTimeExclusive = this.endTimeInclusive;
continue;
}
}

DateTimeOffset dt = DateTimeOffset.UtcNow;
this.endTimeInclusive = dt;
metricPoint.TakeSnapShot(this.outputDelta);
this.currentMetricPointBatch[this.batchSize] = i;
this.batchSize++;
}
}

internal BatchMetricPoint GetMetricPoints()
{
var indexSnapShot = Math.Min(this.metricPointIndex, MaxMetricPoints - 1);
return new BatchMetricPoint(this.metricPoints, indexSnapShot + 1, this.startTimeExclusive, this.endTimeInclusive);
return new BatchMetricPoint(this.metricPoints, this.currentMetricPointBatch, this.batchSize, this.startTimeExclusive, this.endTimeInclusive);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
23 changes: 10 additions & 13 deletions src/OpenTelemetry/Metrics/BatchMetricPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,25 @@

using System;
using System.Collections;
using System.Diagnostics;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Metrics
{
public readonly struct BatchMetricPoint : IDisposable
{
private readonly MetricPoint[] metricsPoints;
private readonly int[] metricPointsToProcess;
private readonly long targetCount;
private readonly DateTimeOffset start;
private readonly DateTimeOffset end;

internal BatchMetricPoint(MetricPoint[] metricsPoints, int maxSize, DateTimeOffset start, DateTimeOffset end)
internal BatchMetricPoint(MetricPoint[] metricsPoints, int[] metricPointsToProcess, long targetCount, DateTimeOffset start, DateTimeOffset end)
{
Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number.");
Guard.Null(metricsPoints, nameof(metricsPoints));

this.metricsPoints = metricsPoints;
this.targetCount = maxSize;
this.metricPointsToProcess = metricPointsToProcess;
this.targetCount = targetCount;
this.start = start;
this.end = end;
}
Expand All @@ -50,7 +50,7 @@ public void Dispose()
/// <returns><see cref="Enumerator"/>.</returns>
public Enumerator GetEnumerator()
{
return new Enumerator(this.metricsPoints, this.targetCount, this.start, this.end);
return new Enumerator(this.metricsPoints, this.metricPointsToProcess, this.targetCount, this.start, this.end);
}

/// <summary>
Expand All @@ -59,14 +59,16 @@ public Enumerator GetEnumerator()
public struct Enumerator : IEnumerator
{
private readonly MetricPoint[] metricsPoints;
private readonly int[] metricPointsToProcess;
private readonly DateTimeOffset start;
private readonly DateTimeOffset end;
private long targetCount;
private long index;

internal Enumerator(MetricPoint[] metricsPoints, long targetCount, DateTimeOffset start, DateTimeOffset end)
internal Enumerator(MetricPoint[] metricsPoints, int[] metricPointsToProcess, long targetCount, DateTimeOffset start, DateTimeOffset end)
{
this.metricsPoints = metricsPoints;
this.metricPointsToProcess = metricPointsToProcess;
this.targetCount = targetCount;
this.index = -1;
this.start = start;
Expand All @@ -77,7 +79,7 @@ public ref MetricPoint Current
{
get
{
return ref this.metricsPoints[this.index];
return ref this.metricsPoints[this.metricPointsToProcess[this.index]];
}
}

Expand All @@ -93,12 +95,7 @@ public bool MoveNext()
{
while (++this.index < this.targetCount)
{
ref var metricPoint = ref this.metricsPoints[this.index];
if (metricPoint.StartTime == default)
{
continue;
}

ref var metricPoint = ref this.metricsPoints[this.metricPointsToProcess[this.index]];
metricPoint.StartTime = this.start;
metricPoint.EndTime = this.end;
return true;
Expand Down
10 changes: 7 additions & 3 deletions src/OpenTelemetry/Metrics/MeterProviderSdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -443,19 +443,23 @@ internal Batch<Metric> Collect()
for (int i = 0; i < target; i++)
{
var metric = this.metrics[i];
int metricPointSize = 0;
if (metric != null)
{
if (metric.InstrumentDisposed)
{
metric.SnapShot();
metricPointSize = metric.SnapShot();
this.metrics[i] = null;
}
else
{
metric.SnapShot();
metricPointSize = metric.SnapShot();
}

this.metricsCurrentBatch[metricCountCurrentBatch++] = metric;
if (metricPointSize > 0)
{
this.metricsCurrentBatch[metricCountCurrentBatch++] = metric;
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/OpenTelemetry/Metrics/Metric.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ internal void UpdateDouble(double value, ReadOnlySpan<KeyValuePair<string, objec
this.aggStore.Update(value, tags);
}

internal void SnapShot()
internal int SnapShot()
{
this.aggStore.SnapShot();
return this.aggStore.SnapShot();
}
}
}
67 changes: 67 additions & 0 deletions src/OpenTelemetry/Metrics/MetricPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ internal MetricPoint(
this.DoubleValue = default;
this.doubleVal = default;
this.lastDoubleSum = default;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

if (this.AggType == AggregationType.Histogram)
{
Expand Down Expand Up @@ -86,6 +87,8 @@ internal MetricPoint(

public double[] ExplicitBounds { get; internal set; }

internal MetricPointStatus MetricPointStatus { get; private set; }

private readonly AggregationType AggType { get; }

internal void Update(long number)
Expand Down Expand Up @@ -117,6 +120,19 @@ internal void Update(long number)
break;
}
}

// There is a race with Snapshot:
// Update() updates the value
// Snapshot snapshots the value
// Snapshot sets status to NoCollectPending
// Update sets status to CollectPending -- this is not right as the Snapshot
// already included the updated value.
// In the absence of any new Update call until next Snapshot,
// this results in exporting an Update even though
// it had no update.
// TODO: For Delta, this can be mitigated
// by ignoring Zero points
this.MetricPointStatus = MetricPointStatus.CollectPending;
}

internal void Update(double number)
Expand Down Expand Up @@ -180,6 +196,19 @@ internal void Update(double number)
break;
}
}

// There is a race with Snapshot:
// Update() updates the value
// Snapshot snapshots the value
// Snapshot sets status to NoCollectPending
// Update sets status to CollectPending -- this is not right as the Snapshot
// already included the updated value.
// In the absence of any new Update call until next Snapshot,
// this results in exporting an Update even though
// it had no update.
// TODO: For Delta, this can be mitigated
// by ignoring Zero points
this.MetricPointStatus = MetricPointStatus.CollectPending;
}

internal void TakeSnapShot(bool outputDelta)
Expand All @@ -194,6 +223,14 @@ internal void TakeSnapShot(bool outputDelta)
long initValue = Interlocked.Read(ref this.longVal);
this.LongValue = initValue - this.lastLongSum;
this.lastLongSum = initValue;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (initValue != Interlocked.Read(ref this.longVal))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
}
else
{
Expand All @@ -216,6 +253,14 @@ internal void TakeSnapShot(bool outputDelta)
double initValue = Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity);
this.DoubleValue = initValue - this.lastDoubleSum;
this.lastDoubleSum = initValue;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (initValue != Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
}
else
{
Expand All @@ -233,6 +278,15 @@ internal void TakeSnapShot(bool outputDelta)
case AggregationType.LongGauge:
{
this.LongValue = Interlocked.Read(ref this.longVal);
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (this.LongValue != Interlocked.Read(ref this.longVal))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}

break;
}

Expand All @@ -244,6 +298,15 @@ internal void TakeSnapShot(bool outputDelta)
// the exchange (to 0.0) will never occur,
// but we get the original value atomically.
this.DoubleValue = Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity);
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (this.DoubleValue != Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}

break;
}

Expand All @@ -267,6 +330,8 @@ internal void TakeSnapShot(bool outputDelta)
this.bucketCounts[i] = 0;
}
}

this.MetricPointStatus = MetricPointStatus.NoCollectPending;
}

break;
Expand All @@ -283,6 +348,8 @@ internal void TakeSnapShot(bool outputDelta)
this.longVal = 0;
this.doubleVal = 0;
}

this.MetricPointStatus = MetricPointStatus.NoCollectPending;
}

break;
Expand Down
33 changes: 33 additions & 0 deletions src/OpenTelemetry/Metrics/MetricPointStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// <copyright file="MetricPointStatus.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

namespace OpenTelemetry.Metrics
{
internal enum MetricPointStatus
{
/// <summary>
/// This status is applied to <see cref="MetricPoint"/>s with status <see cref="CollectPending"/> after a Collect.
/// If an update occurs, status will be moved to <see cref="CollectPending"/>.
/// </summary>
NoCollectPending,

/// <summary>
/// The <see cref="MetricPoint"/> has been updated since the previous Collect cycle.
/// Collect will move it to <see cref="NoCollectPending"/>.
/// </summary>
CollectPending,
}
}
Loading

0 comments on commit ac98506

Please sign in to comment.