From b672af7ba5c94a6e63ca4fc6e21a9fe24b614fd3 Mon Sep 17 00:00:00 2001 From: Andrew Crump Date: Tue, 5 Dec 2023 18:20:54 +0000 Subject: [PATCH 1/2] Specify that counters are monotonic Sums must be declared as monotonic in order to be represented in prometheus as counters. Note that this causes the otel collector to rename metrics when using the prometheus exporter. Counters have `_total` appended to the name. There are cases where counters aren't monotonic, in particular when counters are emitted only as deltas (V1) and the counter totals are reset in the Forwarder Agent. Signed-off-by: Rebecca Roberts --- src/pkg/otelcolclient/otelcolclient.go | 1 + src/pkg/otelcolclient/otelcolclient_test.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pkg/otelcolclient/otelcolclient.go b/src/pkg/otelcolclient/otelcolclient.go index 30e87ae96..70f19f373 100644 --- a/src/pkg/otelcolclient/otelcolclient.go +++ b/src/pkg/otelcolclient/otelcolclient.go @@ -113,6 +113,7 @@ func (c *Client) addCounterToBatch(e *loggregator_v2.Envelope) { Data: &metricspb.Metric_Sum{ Sum: &metricspb.Sum{ AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + IsMonotonic: true, DataPoints: []*metricspb.NumberDataPoint{ { TimeUnixNano: uint64(e.GetTimestamp()), diff --git a/src/pkg/otelcolclient/otelcolclient_test.go b/src/pkg/otelcolclient/otelcolclient_test.go index 2e0a3c9e6..954baa7d2 100644 --- a/src/pkg/otelcolclient/otelcolclient_test.go +++ b/src/pkg/otelcolclient/otelcolclient_test.go @@ -310,7 +310,7 @@ var _ = Describe("Client", func() { Data: &metricspb.Metric_Sum{ Sum: &metricspb.Sum{ AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, - IsMonotonic: false, + IsMonotonic: true, DataPoints: []*metricspb.NumberDataPoint{ { TimeUnixNano: 1257894000000000000, From 3531ab6466b92d2ec1c24722db23643afed0ac0c Mon Sep 17 00:00:00 2001 From: Rebecca Roberts Date: Tue, 5 Dec 2023 22:16:51 +0000 Subject: [PATCH 2/2] Set IsMonotonic based on presence of a delta - Envelopes that are emitted only with a delta are decorated by the counter aggregator within Forwarder Agent to have both a delta and a total. - The envelope totals are not monotonic as the Forwarder Agent will discard totals once the map of totals grows over a set size. - Envelopes that are emitted only with totals will not have a delta present. Signed-off-by: Andrew Crump --- src/pkg/otelcolclient/otelcolclient.go | 2 +- src/pkg/otelcolclient/otelcolclient_test.go | 179 +++++++++++++++----- 2 files changed, 133 insertions(+), 48 deletions(-) diff --git a/src/pkg/otelcolclient/otelcolclient.go b/src/pkg/otelcolclient/otelcolclient.go index 70f19f373..69e1cbf0e 100644 --- a/src/pkg/otelcolclient/otelcolclient.go +++ b/src/pkg/otelcolclient/otelcolclient.go @@ -113,7 +113,7 @@ func (c *Client) addCounterToBatch(e *loggregator_v2.Envelope) { Data: &metricspb.Metric_Sum{ Sum: &metricspb.Sum{ AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, - IsMonotonic: true, + IsMonotonic: e.GetCounter().GetDelta() == 0, DataPoints: []*metricspb.NumberDataPoint{ { TimeUnixNano: uint64(e.GetTimestamp()), diff --git a/src/pkg/otelcolclient/otelcolclient_test.go b/src/pkg/otelcolclient/otelcolclient_test.go index 954baa7d2..d68bca024 100644 --- a/src/pkg/otelcolclient/otelcolclient_test.go +++ b/src/pkg/otelcolclient/otelcolclient_test.go @@ -284,57 +284,143 @@ var _ = Describe("Client", func() { Message: &loggregator_v2.Envelope_Counter{ Counter: &loggregator_v2.Counter{ Name: "dropped", - Delta: 5, Total: 10, }, }, } }) - It("returns nil", func() { - Expect(returnedErr).NotTo(HaveOccurred()) - }) + Context("when the envelope has a total but no delta", func() { + It("returns nil", func() { + Expect(returnedErr).NotTo(HaveOccurred()) + }) - It("succeeds", func() { - var msr *colmetricspb.ExportMetricsServiceRequest - Expect(spyMSC.requests).To(Receive(&msr)) + It("emits a monotonic sum", func() { + var msr *colmetricspb.ExportMetricsServiceRequest + Expect(spyMSC.requests).To(Receive(&msr)) - expectedReq := &colmetricspb.ExportMetricsServiceRequest{ - ResourceMetrics: []*metricspb.ResourceMetrics{ - { - ScopeMetrics: []*metricspb.ScopeMetrics{ - { - Metrics: []*metricspb.Metric{ - { - Name: "dropped", - Data: &metricspb.Metric_Sum{ - Sum: &metricspb.Sum{ - AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, - IsMonotonic: true, - DataPoints: []*metricspb.NumberDataPoint{ - { - TimeUnixNano: 1257894000000000000, - Attributes: []*commonpb.KeyValue{ - { - Key: "direction", - Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "egress"}}, + expectedReq := &colmetricspb.ExportMetricsServiceRequest{ + ResourceMetrics: []*metricspb.ResourceMetrics{ + { + ScopeMetrics: []*metricspb.ScopeMetrics{ + { + Metrics: []*metricspb.Metric{ + { + Name: "dropped", + Data: &metricspb.Metric_Sum{ + Sum: &metricspb.Sum{ + AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + IsMonotonic: true, + DataPoints: []*metricspb.NumberDataPoint{ + { + TimeUnixNano: 1257894000000000000, + Attributes: []*commonpb.KeyValue{ + { + Key: "direction", + Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "egress"}}, + }, + { + Key: "instance_id", + Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-instance-id"}}, + }, + { + Key: "origin", + Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-origin.some-vm"}}, + }, + { + Key: "source_id", + Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-source-id"}}, + }, }, - { - Key: "instance_id", - Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-instance-id"}}, + Value: &metricspb.NumberDataPoint_AsInt{ + AsInt: 10, }, - { - Key: "origin", - Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-origin.some-vm"}}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + s1 := protocmp.SortRepeated(func(x *commonpb.KeyValue, y *commonpb.KeyValue) bool { + return x.Key < y.Key + }) + s2 := protocmp.SortRepeated(func(x *metricspb.Metric, y *metricspb.Metric) bool { + return x.Name < y.Name + }) + Expect(cmp.Diff(msr, expectedReq, protocmp.Transform(), s1, s2)).To(BeEmpty()) + }) + }) + Context("when the envelope has a delta and a calculated total", func() { + BeforeEach(func() { + envelope = &loggregator_v2.Envelope{ + Timestamp: 1257894000000000000, + SourceId: "fake-source-id", + InstanceId: "fake-instance-id", + Tags: map[string]string{ + "direction": "egress", + "origin": "fake-origin.some-vm", + }, + Message: &loggregator_v2.Envelope_Counter{ + Counter: &loggregator_v2.Counter{ + Name: "dropped", + Delta: 5, + Total: 10, + }, + }, + } + }) + + It("returns nil", func() { + Expect(returnedErr).NotTo(HaveOccurred()) + }) + + It("emits a non-monotonic sum", func() { + var msr *colmetricspb.ExportMetricsServiceRequest + Expect(spyMSC.requests).To(Receive(&msr)) + + expectedReq := &colmetricspb.ExportMetricsServiceRequest{ + ResourceMetrics: []*metricspb.ResourceMetrics{ + { + ScopeMetrics: []*metricspb.ScopeMetrics{ + { + Metrics: []*metricspb.Metric{ + { + Name: "dropped", + Data: &metricspb.Metric_Sum{ + Sum: &metricspb.Sum{ + AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + IsMonotonic: false, + DataPoints: []*metricspb.NumberDataPoint{ + { + TimeUnixNano: 1257894000000000000, + Attributes: []*commonpb.KeyValue{ + { + Key: "direction", + Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "egress"}}, + }, + { + Key: "instance_id", + Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-instance-id"}}, + }, + { + Key: "origin", + Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-origin.some-vm"}}, + }, + { + Key: "source_id", + Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-source-id"}}, + }, }, - { - Key: "source_id", - Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-source-id"}}, + Value: &metricspb.NumberDataPoint_AsInt{ + AsInt: 10, }, }, - Value: &metricspb.NumberDataPoint_AsInt{ - AsInt: 10, - }, }, }, }, @@ -344,18 +430,17 @@ var _ = Describe("Client", func() { }, }, }, - }, - } + } - s1 := protocmp.SortRepeated(func(x *commonpb.KeyValue, y *commonpb.KeyValue) bool { - return x.Key < y.Key - }) - s2 := protocmp.SortRepeated(func(x *metricspb.Metric, y *metricspb.Metric) bool { - return x.Name < y.Name + s1 := protocmp.SortRepeated(func(x *commonpb.KeyValue, y *commonpb.KeyValue) bool { + return x.Key < y.Key + }) + s2 := protocmp.SortRepeated(func(x *metricspb.Metric, y *metricspb.Metric) bool { + return x.Name < y.Name + }) + Expect(cmp.Diff(msr, expectedReq, protocmp.Transform(), s1, s2)).To(BeEmpty()) }) - Expect(cmp.Diff(msr, expectedReq, protocmp.Transform(), s1, s2)).To(BeEmpty()) }) - Context("when Metric Service Client returns an error", func() { BeforeEach(func() { spyMSC.responseErr = errors.New("test-error")