Skip to content

Commit

Permalink
Discard counter span metric exemplars after flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
swar8080 committed Apr 6, 2024
1 parent 191c2c9 commit a695ee0
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 36 deletions.
27 changes: 27 additions & 0 deletions .chloggen/span-metric-exemplar-memory-leak.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetrics

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Discard counter span metric exemplars after each flush interval to avoid unbounded memory growth

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31683]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This aligns exemplar discarding for counter span metrics with the existing logic for histogram span metrics

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
4 changes: 2 additions & 2 deletions connector/spanmetricsconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ The following settings can be optionally configured:
- `namespace`: Defines the namespace of the generated metrics. If `namespace` is provided, the generated metric names will be prefixed with `namespace.`.
- `metrics_flush_interval` (default: `60s`): Defines the flush interval of the generated metrics.
- `metrics_expiration` (default: `0`): Defines the expiration time as `time.Duration`, after which, if no new spans are received, metrics will no longer be exported. Setting to `0` means the metrics will never expire (default behavior).
- `exemplars`: Use to configure how to attach exemplars to histograms
- `enabled` (default: `false`): enabling will add spans as Exemplars.
- `exemplars`: Use to configure how to attach exemplars to metrics.
- `enabled` (default: `false`): enabling will add spans as Exemplars to all metrics. Exemplars are only kept for one flush interval.
- `events`: Use to configure the events metric.
- `enabled`: (default: `false`): enabling will add the events metric.
- `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.
Expand Down
12 changes: 8 additions & 4 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,17 +291,21 @@ func (p *connectorImp) resetState() {
p.resourceMetrics.RemoveEvictedItems()
p.metricKeyToDimensions.RemoveEvictedItems()

// If no histogram and no metrics expiration is configured, we can skip the remaining operations.
// If none of these features are enabled then we can skip the remaining operations.
// Enabling any of these features requires iterating over the resource metrics and operating on each one.
if p.config.Histogram.Disable && p.config.MetricsExpiration == 0 {
if p.config.Histogram.Disable && p.config.MetricsExpiration == 0 && !p.config.Exemplars.Enabled {
return
}

now := time.Now()
p.resourceMetrics.ForEach(func(k resourceKey, m *resourceMetrics) {
// Exemplars are only relevant to this batch of traces, so must be cleared within the lock
if !p.config.Histogram.Disable {
m.histograms.Reset(true)
if p.config.Exemplars.Enabled {
m.sums.ClearExemplars()
m.events.ClearExemplars()
if !p.config.Histogram.Disable {
m.histograms.ClearExemplars()
}
}

// If metrics expiration is configured, remove metrics that haven't been seen for longer than the expiration period.
Expand Down
114 changes: 103 additions & 11 deletions connector/spanmetricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,18 +1519,89 @@ func TestSpanMetrics_Events(t *testing.T) {
})
}
}
func TestExemplarsForSumMetrics(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, 0, []string{})
require.NoError(t, err)
traces := buildSampleTrace()
func TestExemplarsAreDiscardedAfterFlushing(t *testing.T) {
tests := []struct {
name string
temporality string
histogramConfig func() HistogramConfig
}{
{
name: "cumulative explicit histogram",
temporality: cumulative,
histogramConfig: explicitHistogramsConfig,
},
{
name: "cumulative exponential histogram",
temporality: cumulative,
histogramConfig: exponentialHistogramsConfig,
},
{
name: "delta explicit histogram",
temporality: delta,
histogramConfig: explicitHistogramsConfig,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), tt.histogramConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{})
p.metricsConsumer = &consumertest.MetricsSink{}
require.NoError(t, err)

// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)
traces := ptrace.NewTraces()
trace1ID := [16]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x10}
initServiceSpans(
serviceSpans{
serviceName: "service-b",
spans: []span{
{
name: "/ping",
kind: ptrace.SpanKindServer,
statusCode: ptrace.StatusCodeError,
traceID: trace1ID,
spanID: [8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18},
},
},
}, traces.ResourceSpans().AppendEmpty())

err = p.ConsumeTraces(ctx, traces)
require.NoError(t, err)
metrics := p.buildMetrics()
// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)

// Verify exactly 1 exemplar is added to all data points when flushing
err = p.ConsumeTraces(ctx, traces)
require.NoError(t, err)

p.exportMetrics(ctx)
m := p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[0]
assertDataPointsHaveExactlyOneExemplarForTrace(t, m, trace1ID)

// Verify exemplars from previous batch's trace are replaced with exemplars for the new batch's trace
traces = ptrace.NewTraces()
trace2ID := [16]byte{0x00, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x10}
initServiceSpans(
serviceSpans{
serviceName: "service-b",
spans: []span{
{
name: "/ping",
kind: ptrace.SpanKindServer,
statusCode: ptrace.StatusCodeError,
traceID: trace2ID,
spanID: [8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18},
},
},
}, traces.ResourceSpans().AppendEmpty())

err = p.ConsumeTraces(ctx, traces)
require.NoError(t, err)

p.exportMetrics(ctx)
m = p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[1]
assertDataPointsHaveExactlyOneExemplarForTrace(t, m, trace2ID)
})
}
}

func assertDataPointsHaveExactlyOneExemplarForTrace(t *testing.T, metrics pmetric.Metrics, traceID pcommon.TraceID) {
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
ism := rm.ScopeMetrics()
Expand All @@ -1539,12 +1610,33 @@ func TestExemplarsForSumMetrics(t *testing.T) {
m := ism.At(ilmC).Metrics()
for mC := 0; mC < m.Len(); mC++ {
metric := m.At(mC)
if metric.Type() == pmetric.MetricTypeSum {
switch metric.Type() {
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
assert.Greater(t, dps.Len(), 0)
for dpi := 0; dpi < dps.Len(); dpi++ {
dp := dps.At(dpi)
assert.Equal(t, dp.Exemplars().Len(), 1)
assert.Equal(t, dp.Exemplars().At(0).TraceID(), traceID)
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
assert.Greater(t, dps.Len(), 0)
for dpi := 0; dpi < dps.Len(); dpi++ {
dp := dps.At(dpi)
assert.Equal(t, dp.Exemplars().Len(), 1)
assert.Equal(t, dp.Exemplars().At(0).TraceID(), traceID)
}
case pmetric.MetricTypeExponentialHistogram:
dps := metric.ExponentialHistogram().DataPoints()
assert.Greater(t, dps.Len(), 0)
for dpi := 0; dpi < dps.Len(); dpi++ {
dp := dps.At(dpi)
assert.Greater(t, dp.Exemplars().Len(), 0)
assert.Equal(t, dp.Exemplars().Len(), 1)
assert.Equal(t, dp.Exemplars().At(0).TraceID(), traceID)
}
default:
t.Fatalf("Unexpected metric type %s", metric.Type())
}
}
}
Expand Down
30 changes: 11 additions & 19 deletions connector/spanmetricsconnector/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Key string
type HistogramMetrics interface {
GetOrCreate(key Key, attributes pcommon.Map) Histogram
BuildMetrics(pmetric.Metric, pcommon.Timestamp, pmetric.AggregationTemporality)
Reset(onlyExemplars bool)
ClearExemplars()
}

type Histogram interface {
Expand Down Expand Up @@ -116,15 +116,10 @@ func (m *explicitHistogramMetrics) BuildMetrics(
}
}

func (m *explicitHistogramMetrics) Reset(onlyExemplars bool) {
if onlyExemplars {
for _, h := range m.metrics {
h.exemplars = pmetric.NewExemplarSlice()
}
return
func (m *explicitHistogramMetrics) ClearExemplars() {
for _, h := range m.metrics {
h.exemplars = pmetric.NewExemplarSlice()
}

m.metrics = make(map[Key]*explicitHistogram)
}

func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
Expand Down Expand Up @@ -202,15 +197,10 @@ func expoHistToExponentialDataPoint(agg *structure.Histogram[float64], dp pmetri
}
}

func (m *exponentialHistogramMetrics) Reset(onlyExemplars bool) {
if onlyExemplars {
for _, m := range m.metrics {
m.exemplars = pmetric.NewExemplarSlice()
}
return
func (m *exponentialHistogramMetrics) ClearExemplars() {
for _, m := range m.metrics {
m.exemplars = pmetric.NewExemplarSlice()
}

m.metrics = make(map[Key]*exponentialHistogram)
}

func (h *explicitHistogram) Observe(value float64) {
Expand Down Expand Up @@ -316,6 +306,8 @@ func (m *SumMetrics) BuildMetrics(
}
}

func (m *SumMetrics) Reset() {
m.metrics = make(map[Key]*Sum)
func (m *SumMetrics) ClearExemplars() {
for _, sum := range m.metrics {
sum.exemplars = pmetric.NewExemplarSlice()
}
}

0 comments on commit a695ee0

Please sign in to comment.