[connector/spanmetrics] Discard counter span metric exemplars after flushing #32210

27 changes: 27 additions & 0 deletions .chloggen/span-metric-exemplar-memory-leak.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetrics

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Discard counter span metric exemplars after each flush interval to avoid unbounded memory growth

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31683]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This aligns exemplar discarding for counter span metrics with the existing logic for histogram span metrics

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
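
The changelog note above attributes unbounded memory growth to exemplars that are never discarded. As a rough illustration only, the sketch below simulates a counter that appends one exemplar per span and compares how many exemplars are still held after many flush intervals, with and without clearing. The `exemplar` and `counter` types and the `retainedAfter` helper are hypothetical stand-ins, not the connector's actual types.

```go
package main

import "fmt"

// exemplar is a hypothetical stand-in for a metric exemplar: it only
// records the trace that produced a measurement.
type exemplar struct{ traceID string }

// counter is a hypothetical cumulative counter that keeps every exemplar
// attached to it since the exemplars were last cleared.
type counter struct {
	count     uint64
	exemplars []exemplar
}

// add records one span: the count grows and one exemplar is attached.
func (c *counter) add(traceID string) {
	c.count++
	c.exemplars = append(c.exemplars, exemplar{traceID: traceID})
}

// retainedAfter simulates `intervals` flushes with `spansPerInterval` spans
// each and reports how many exemplars are still held afterwards.
func retainedAfter(intervals, spansPerInterval int, clearOnFlush bool) int {
	c := &counter{}
	for i := 0; i < intervals; i++ {
		for s := 0; s < spansPerInterval; s++ {
			c.add(fmt.Sprintf("trace-%d-%d", i, s))
		}
		// Flush point: a real connector would export the metrics here.
		if clearOnFlush {
			c.exemplars = nil
		}
	}
	return len(c.exemplars)
}

func main() {
	fmt.Println("retained without clearing:", retainedAfter(100, 10, false))
	fmt.Println("retained with clearing:", retainedAfter(100, 10, true))
}
```

Without clearing, the number of retained exemplars grows with the total number of spans seen; with clearing, it is bounded by the number of spans in a single flush interval.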
4 changes: 2 additions & 2 deletions connector/spanmetricsconnector/README.md
@@ -114,8 +114,8 @@ The following settings can be optionally configured:
- `namespace`: Defines the namespace of the generated metrics. If `namespace` provided, generated metric name will be added `namespace.` prefix.
- `metrics_flush_interval` (default: `60s`): Defines the flush interval of the generated metrics.
- `metrics_expiration` (default: `0`): Defines the expiration time as `time.Duration`, after which, if no new spans are received, metrics will no longer be exported. Setting to `0` means the metrics will never expire (default behavior).
- `exemplars`: Use to configure how to attach exemplars to histograms
- `enabled` (default: `false`): enabling will add spans as Exemplars.
- `exemplars`: Use to configure how to attach exemplars to metrics.
- `enabled` (default: `false`): enabling will add spans as Exemplars to all metrics. Exemplars are only kept for one flush interval.
- `events`: Use to configure the events metric.
- `enabled`: (default: `false`): enabling will add the events metric.
- `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.
12 changes: 8 additions & 4 deletions connector/spanmetricsconnector/connector.go
@@ -291,17 +291,21 @@ func (p *connectorImp) resetState() {
p.resourceMetrics.RemoveEvictedItems()
p.metricKeyToDimensions.RemoveEvictedItems()

// If no histogram and no metrics expiration is configured, we can skip the remaining operations.
// If none of these features are enabled then we can skip the remaining operations.
// Enabling either of these features requires to go over resource metrics and do operation on each.
if p.config.Histogram.Disable && p.config.MetricsExpiration == 0 {
if p.config.Histogram.Disable && p.config.MetricsExpiration == 0 && !p.config.Exemplars.Enabled {
return
}

now := time.Now()
p.resourceMetrics.ForEach(func(k resourceKey, m *resourceMetrics) {
// Exemplars are only relevant to this batch of traces, so must be cleared within the lock
if !p.config.Histogram.Disable {
m.histograms.Reset(true)
if p.config.Exemplars.Enabled {
m.sums.ClearExemplars()
m.events.ClearExemplars()
if !p.config.Histogram.Disable {
m.histograms.ClearExemplars()
}
}

// If metrics expiration is configured, remove metrics that haven't been seen for longer than the expiration period.
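
The early-return guard in `resetState` above can be read as a three-way check. The sketch below restates the old and new conditions from the diff as standalone predicates, to show the one configuration where they differ: histograms disabled, no expiration, exemplars enabled. The `settings` struct and both function names are hypothetical; only the boolean logic is taken from the diff.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical, flattened view of the three options that the
// guard in resetState inspects.
type settings struct {
	histogramDisabled bool
	metricsExpiration time.Duration
	exemplarsEnabled  bool
}

// needsResourcePassBefore mirrors the old guard: the per-resource loop was
// skipped whenever histograms were disabled and no expiration was set.
func needsResourcePassBefore(s settings) bool {
	return !(s.histogramDisabled && s.metricsExpiration == 0)
}

// needsResourcePass mirrors the new guard: enabled exemplars are a third
// reason to visit every resource's metrics.
func needsResourcePass(s settings) bool {
	return !(s.histogramDisabled && s.metricsExpiration == 0 && !s.exemplarsEnabled)
}

func main() {
	// Histograms off, no expiration, exemplars on: sum and event exemplars
	// still accumulate, so the loop has work to do.
	s := settings{histogramDisabled: true, exemplarsEnabled: true}
	fmt.Println("old guard runs the loop:", needsResourcePassBefore(s))
	fmt.Println("new guard runs the loop:", needsResourcePass(s))

	// With an expiration configured, both guards agree.
	s = settings{histogramDisabled: true, metricsExpiration: time.Minute}
	fmt.Println("both agree:", needsResourcePassBefore(s) == needsResourcePass(s))
}
```

In this reading, the old guard would return early in the first configuration, so exemplars on sum and event metrics would never be visited by the loop.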
114 changes: 103 additions & 11 deletions connector/spanmetricsconnector/connector_test.go
@@ -1519,18 +1519,89 @@ func TestSpanMetrics_Events(t *testing.T) {
})
}
}
func TestExemplarsForSumMetrics(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, 0, []string{})
require.NoError(t, err)
traces := buildSampleTrace()
func TestExemplarsAreDiscardedAfterFlushing(t *testing.T) {
tests := []struct {
name string
temporality string
histogramConfig func() HistogramConfig
}{
{
name: "cumulative explicit histogram",
temporality: cumulative,
histogramConfig: explicitHistogramsConfig,
},
{
name: "cumulative exponential histogram",
temporality: cumulative,
histogramConfig: exponentialHistogramsConfig,
},
{
name: "delta explicit histogram",
temporality: delta,
histogramConfig: explicitHistogramsConfig,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), tt.histogramConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{})
p.metricsConsumer = &consumertest.MetricsSink{}
require.NoError(t, err)

// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)
traces := ptrace.NewTraces()
trace1ID := [16]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x10}
initServiceSpans(
serviceSpans{
serviceName: "service-b",
spans: []span{
{
name: "/ping",
kind: ptrace.SpanKindServer,
statusCode: ptrace.StatusCodeError,
traceID: trace1ID,
spanID: [8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18},
},
},
}, traces.ResourceSpans().AppendEmpty())

err = p.ConsumeTraces(ctx, traces)
require.NoError(t, err)
metrics := p.buildMetrics()
// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)

// Verify exactly 1 exemplar is added to all data points when flushing
err = p.ConsumeTraces(ctx, traces)
require.NoError(t, err)

p.exportMetrics(ctx)
m := p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[0]
assertDataPointsHaveExactlyOneExemplarForTrace(t, m, trace1ID)

// Verify exemplars from previous batch's trace are replaced with exemplars for the new batch's trace
traces = ptrace.NewTraces()
trace2ID := [16]byte{0x00, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x10}
initServiceSpans(
serviceSpans{
serviceName: "service-b",
spans: []span{
{
name: "/ping",
kind: ptrace.SpanKindServer,
statusCode: ptrace.StatusCodeError,
traceID: trace2ID,
spanID: [8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18},
},
},
}, traces.ResourceSpans().AppendEmpty())

err = p.ConsumeTraces(ctx, traces)
require.NoError(t, err)

p.exportMetrics(ctx)
m = p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[1]
assertDataPointsHaveExactlyOneExemplarForTrace(t, m, trace2ID)
})
}
}

func assertDataPointsHaveExactlyOneExemplarForTrace(t *testing.T, metrics pmetric.Metrics, traceID pcommon.TraceID) {
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
ism := rm.ScopeMetrics()
@@ -1539,12 +1610,33 @@ func TestExemplarsForSumMetrics(t *testing.T) {
m := ism.At(ilmC).Metrics()
for mC := 0; mC < m.Len(); mC++ {
metric := m.At(mC)
if metric.Type() == pmetric.MetricTypeSum {
switch metric.Type() {
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
assert.Greater(t, dps.Len(), 0)
for dpi := 0; dpi < dps.Len(); dpi++ {
dp := dps.At(dpi)
assert.Equal(t, dp.Exemplars().Len(), 1)
assert.Equal(t, dp.Exemplars().At(0).TraceID(), traceID)
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
assert.Greater(t, dps.Len(), 0)
for dpi := 0; dpi < dps.Len(); dpi++ {
dp := dps.At(dpi)
assert.Equal(t, dp.Exemplars().Len(), 1)
assert.Equal(t, dp.Exemplars().At(0).TraceID(), traceID)
}
case pmetric.MetricTypeExponentialHistogram:
dps := metric.ExponentialHistogram().DataPoints()
assert.Greater(t, dps.Len(), 0)
for dpi := 0; dpi < dps.Len(); dpi++ {
dp := dps.At(dpi)
assert.Greater(t, dp.Exemplars().Len(), 0)
assert.Equal(t, dp.Exemplars().Len(), 1)
assert.Equal(t, dp.Exemplars().At(0).TraceID(), traceID)
}
default:
t.Fatalf("Unexpected metric type %s", metric.Type())
}
}
}
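
The test above checks that each flush carries exactly one exemplar, and that the second flush carries the exemplar for the second trace rather than the first. A minimal, dependency-free sketch of that behaviour follows. It assumes a cumulative counter whose count survives a flush while its exemplars do not; `sum`, `sums`, `point`, `newSums`, and `flush` are hypothetical names that do not appear in the connector.

```go
package main

import "fmt"

// sum is a hypothetical cumulative counter with attached exemplar trace IDs.
type sum struct {
	count     uint64
	exemplars []string
}

// sums is a hypothetical collection of counters keyed by series name.
type sums struct{ metrics map[string]*sum }

// point is the snapshot of one series emitted by a flush.
type point struct {
	count     uint64
	exemplars []string
}

func newSums() *sums { return &sums{metrics: make(map[string]*sum)} }

// add records one span for the series identified by key.
func (m *sums) add(key, traceID string) {
	s, ok := m.metrics[key]
	if !ok {
		s = &sum{}
		m.metrics[key] = s
	}
	s.count++
	s.exemplars = append(s.exemplars, traceID)
}

// flush snapshots every series and then discards its exemplars. The
// cumulative count is kept, so only the exemplars start fresh.
func (m *sums) flush() map[string]point {
	out := make(map[string]point, len(m.metrics))
	for k, s := range m.metrics {
		out[k] = point{count: s.count, exemplars: s.exemplars}
		s.exemplars = nil
	}
	return out
}

func main() {
	m := newSums()

	m.add("/ping", "trace-1")
	first := m.flush()

	m.add("/ping", "trace-2")
	second := m.flush()

	fmt.Println("first flush:", first["/ping"].count, first["/ping"].exemplars)
	fmt.Println("second flush:", second["/ping"].count, second["/ping"].exemplars)
}
```

The second snapshot holds the accumulated count for both spans but only the exemplar recorded since the previous flush.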
30 changes: 11 additions & 19 deletions connector/spanmetricsconnector/internal/metrics/metrics.go
@@ -17,7 +17,7 @@ type Key string
type HistogramMetrics interface {
GetOrCreate(key Key, attributes pcommon.Map) Histogram
BuildMetrics(pmetric.Metric, pcommon.Timestamp, pmetric.AggregationTemporality)
Reset(onlyExemplars bool)
ClearExemplars()
}

type Histogram interface {
@@ -116,15 +116,10 @@ func (m *explicitHistogramMetrics) BuildMetrics(
}
}

func (m *explicitHistogramMetrics) Reset(onlyExemplars bool) {
if onlyExemplars {
for _, h := range m.metrics {
h.exemplars = pmetric.NewExemplarSlice()
}
return
func (m *explicitHistogramMetrics) ClearExemplars() {
for _, h := range m.metrics {
h.exemplars = pmetric.NewExemplarSlice()
}

m.metrics = make(map[Key]*explicitHistogram)
}

func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
@@ -202,15 +197,10 @@ func expoHistToExponentialDataPoint(agg *structure.Histogram[float64], dp pmetri
}
}

func (m *exponentialHistogramMetrics) Reset(onlyExemplars bool) {
if onlyExemplars {
for _, m := range m.metrics {
m.exemplars = pmetric.NewExemplarSlice()
}
return
func (m *exponentialHistogramMetrics) ClearExemplars() {
for _, m := range m.metrics {
m.exemplars = pmetric.NewExemplarSlice()
}

m.metrics = make(map[Key]*exponentialHistogram)
Review comment (Contributor): Is there no situation where we need to retain this functionality? This is the only bit that stands out.

Reply (Contributor Author): Yes, this was dead code. `Reset(onlyExemplars bool)` was only ever called with `true` as the parameter, so the function always returned early before reaching the code path I deleted.

}

func (h *explicitHistogram) Observe(value float64) {
@@ -316,6 +306,8 @@ func (m *SumMetrics) BuildMetrics(
}
}

func (m *SumMetrics) Reset() {
m.metrics = make(map[Key]*Sum)
func (m *SumMetrics) ClearExemplars() {
for _, sum := range m.metrics {
sum.exemplars = pmetric.NewExemplarSlice()
}
}
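
The review thread in this file argues that the map-recreating branch of `Reset(onlyExemplars bool)` was dead code because the only call passed `true`. The sketch below restates that argument with simplified, hypothetical types (`hist`, `hists`, `resetOld`, `clearExemplars`): when the flag is always `true`, the old method and the new one leave the state identical.

```go
package main

import "fmt"

// hist is a hypothetical histogram entry: an observation count plus the
// trace IDs of its exemplars.
type hist struct {
	count     uint64
	exemplars []string
}

// hists is a hypothetical collection of histograms keyed by series name.
type hists struct{ metrics map[string]*hist }

// resetOld mirrors the shape of the removed method. The branch after the
// early return is only reachable when onlyExemplars is false.
func (m *hists) resetOld(onlyExemplars bool) {
	if onlyExemplars {
		for _, h := range m.metrics {
			h.exemplars = nil
		}
		return
	}
	m.metrics = make(map[string]*hist)
}

// clearExemplars mirrors the shape of the replacement: it drops exemplars
// and keeps every series and its accumulated count.
func (m *hists) clearExemplars() {
	for _, h := range m.metrics {
		h.exemplars = nil
	}
}

// sample builds a collection holding one series with three observations.
func sample() *hists {
	return &hists{metrics: map[string]*hist{
		"/ping": {count: 3, exemplars: []string{"a", "b", "c"}},
	}}
}

func main() {
	before, after := sample(), sample()
	before.resetOld(true) // the only way the old method was called
	after.clearExemplars()

	fmt.Println("old:", len(before.metrics), before.metrics["/ping"].count, len(before.metrics["/ping"].exemplars))
	fmt.Println("new:", len(after.metrics), after.metrics["/ping"].count, len(after.metrics["/ping"].exemplars))
}
```

Dropping the boolean also removes the risk of a future caller passing `false` and discarding accumulated series by accident.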