Skip to content

Commit

Permalink
Set IsMonotonic based on presence of a delta
Browse files Browse the repository at this point in the history
- Envelopes that are emitted only with a delta are decorated by the
  counter aggregator within Forwarder Agent to have both a delta and a
  total.
- The envelope totals are not monotonic as the Forwarder Agent will
  discard totals once the map of totals grows over a set size.
- Envelopes that are emitted only with totals will not have a delta
  present.

Signed-off-by: Andrew Crump <crumpan@vmware.com>
  • Loading branch information
rroberts2222 authored and acrmp committed Dec 6, 2023
1 parent f5fa560 commit bbbd515
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/pkg/otelcolclient/otelcolclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *Client) addCounterToBatch(e *loggregator_v2.Envelope) {
Data: &metricspb.Metric_Sum{
Sum: &metricspb.Sum{
AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
IsMonotonic: true,
IsMonotonic: e.GetCounter().GetDelta() == 0,
DataPoints: []*metricspb.NumberDataPoint{
{
TimeUnixNano: uint64(e.GetTimestamp()),
Expand Down
179 changes: 132 additions & 47 deletions src/pkg/otelcolclient/otelcolclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,57 +284,143 @@ var _ = Describe("Client", func() {
Message: &loggregator_v2.Envelope_Counter{
Counter: &loggregator_v2.Counter{
Name: "dropped",
Delta: 5,
Total: 10,
},
},
}
})

It("returns nil", func() {
Expect(returnedErr).NotTo(HaveOccurred())
})
Context("when the envelope has a total but no delta", func() {
It("returns nil", func() {
Expect(returnedErr).NotTo(HaveOccurred())
})

It("succeeds", func() {
var msr *colmetricspb.ExportMetricsServiceRequest
Expect(spyMSC.requests).To(Receive(&msr))
It("emits a monotonic sum", func() {
var msr *colmetricspb.ExportMetricsServiceRequest
Expect(spyMSC.requests).To(Receive(&msr))

expectedReq := &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricspb.ResourceMetrics{
{
ScopeMetrics: []*metricspb.ScopeMetrics{
{
Metrics: []*metricspb.Metric{
{
Name: "dropped",
Data: &metricspb.Metric_Sum{
Sum: &metricspb.Sum{
AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
IsMonotonic: true,
DataPoints: []*metricspb.NumberDataPoint{
{
TimeUnixNano: 1257894000000000000,
Attributes: []*commonpb.KeyValue{
{
Key: "direction",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "egress"}},
expectedReq := &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricspb.ResourceMetrics{
{
ScopeMetrics: []*metricspb.ScopeMetrics{
{
Metrics: []*metricspb.Metric{
{
Name: "dropped",
Data: &metricspb.Metric_Sum{
Sum: &metricspb.Sum{
AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
IsMonotonic: true,
DataPoints: []*metricspb.NumberDataPoint{
{
TimeUnixNano: 1257894000000000000,
Attributes: []*commonpb.KeyValue{
{
Key: "direction",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "egress"}},
},
{
Key: "instance_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-instance-id"}},
},
{
Key: "origin",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-origin.some-vm"}},
},
{
Key: "source_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-source-id"}},
},
},
{
Key: "instance_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-instance-id"}},
Value: &metricspb.NumberDataPoint_AsInt{
AsInt: 10,
},
{
Key: "origin",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-origin.some-vm"}},
},
},
},
},
},
},
},
},
},
},
}

s1 := protocmp.SortRepeated(func(x *commonpb.KeyValue, y *commonpb.KeyValue) bool {
return x.Key < y.Key
})
s2 := protocmp.SortRepeated(func(x *metricspb.Metric, y *metricspb.Metric) bool {
return x.Name < y.Name
})
Expect(cmp.Diff(msr, expectedReq, protocmp.Transform(), s1, s2)).To(BeEmpty())
})
})
Context("when the envelope has a delta and a calculated total", func() {
BeforeEach(func() {
envelope = &loggregator_v2.Envelope{
Timestamp: 1257894000000000000,
SourceId: "fake-source-id",
InstanceId: "fake-instance-id",
Tags: map[string]string{
"direction": "egress",
"origin": "fake-origin.some-vm",
},
Message: &loggregator_v2.Envelope_Counter{
Counter: &loggregator_v2.Counter{
Name: "dropped",
Delta: 5,
Total: 10,
},
},
}
})

It("returns nil", func() {
Expect(returnedErr).NotTo(HaveOccurred())
})

It("emits a non-monotonic sum", func() {
var msr *colmetricspb.ExportMetricsServiceRequest
Expect(spyMSC.requests).To(Receive(&msr))

expectedReq := &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricspb.ResourceMetrics{
{
ScopeMetrics: []*metricspb.ScopeMetrics{
{
Metrics: []*metricspb.Metric{
{
Name: "dropped",
Data: &metricspb.Metric_Sum{
Sum: &metricspb.Sum{
AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
IsMonotonic: false,
DataPoints: []*metricspb.NumberDataPoint{
{
TimeUnixNano: 1257894000000000000,
Attributes: []*commonpb.KeyValue{
{
Key: "direction",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "egress"}},
},
{
Key: "instance_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-instance-id"}},
},
{
Key: "origin",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-origin.some-vm"}},
},
{
Key: "source_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-source-id"}},
},
},
{
Key: "source_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-source-id"}},
Value: &metricspb.NumberDataPoint_AsInt{
AsInt: 10,
},
},
Value: &metricspb.NumberDataPoint_AsInt{
AsInt: 10,
},
},
},
},
Expand All @@ -344,18 +430,17 @@ var _ = Describe("Client", func() {
},
},
},
},
}
}

s1 := protocmp.SortRepeated(func(x *commonpb.KeyValue, y *commonpb.KeyValue) bool {
return x.Key < y.Key
})
s2 := protocmp.SortRepeated(func(x *metricspb.Metric, y *metricspb.Metric) bool {
return x.Name < y.Name
s1 := protocmp.SortRepeated(func(x *commonpb.KeyValue, y *commonpb.KeyValue) bool {
return x.Key < y.Key
})
s2 := protocmp.SortRepeated(func(x *metricspb.Metric, y *metricspb.Metric) bool {
return x.Name < y.Name
})
Expect(cmp.Diff(msr, expectedReq, protocmp.Transform(), s1, s2)).To(BeEmpty())
})
Expect(cmp.Diff(msr, expectedReq, protocmp.Transform(), s1, s2)).To(BeEmpty())
})

Context("when Metric Service Client returns an error", func() {
BeforeEach(func() {
spyMSC.responseErr = errors.New("test-error")
Expand Down

0 comments on commit bbbd515

Please sign in to comment.