diff --git a/exporter/tanzuobservabilityexporter/metrics.go b/exporter/tanzuobservabilityexporter/metrics.go
index 5b15e0fc1006..600330727c26 100644
--- a/exporter/tanzuobservabilityexporter/metrics.go
+++ b/exporter/tanzuobservabilityexporter/metrics.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"sync/atomic"
 
+	"github.com/wavefronthq/wavefront-sdk-go/senders"
 	"go.opentelemetry.io/collector/model/pdata"
 	"go.uber.org/multierr"
 	"go.uber.org/zap"
@@ -33,6 +34,7 @@ const (
 
 var (
 	typeIsGaugeTags = map[string]string{"type": "gauge"}
+	typeIsSumTags   = map[string]string{"type": "sum"}
 )
 
 // metricsConsumer instances consume OTEL metrics
@@ -149,7 +151,8 @@ type counter struct {
 // the metric to be reported. tags is the tags for the metric. sender is what
 // sends the metric to tanzu observability. Any errors get added to errs.
 func (c *counter) Report(
-	name string, tags map[string]string, sender gaugeSender, errs *[]error) {
+	name string, tags map[string]string, sender gaugeSender, errs *[]error,
+) {
 	err := sender.SendMetric(name, float64(c.Get()), 0, "", tags)
 	if err != nil {
 		*errs = append(*errs, err)
@@ -188,6 +191,32 @@ func getValue(numberDataPoint pdata.NumberDataPoint) (float64, error) {
 	}
 }
 
+// pushGaugeNumberDataPoint sends a metric as a gauge metric to tanzu
+// observability. metric is the metric to send. numberDataPoint is the value
+// of the metric. Any errors get appended to errs. sender is what sends the
+// gauge metric to tanzu observability. logger is the logger. missingValues
+// keeps track of metrics with missing values.
+func pushGaugeNumberDataPoint(
+	metric pdata.Metric,
+	numberDataPoint pdata.NumberDataPoint,
+	errs *[]error,
+	sender gaugeSender,
+	logger *zap.Logger,
+	missingValues *counter,
+) {
+	tags := attributesToTags(numberDataPoint.Attributes())
+	ts := numberDataPoint.Timestamp().AsTime().Unix()
+	value, err := getValue(numberDataPoint)
+	if err != nil {
+		logMissingValue(metric, logger, missingValues)
+		return
+	}
+	err = sender.SendMetric(metric.Name(), value, ts, "", tags)
+	if err != nil {
+		*errs = append(*errs, err)
+	}
+}
+
 // gaugeSender sends gauge metrics to tanzu observability
 type gaugeSender interface {
 	SendMetric(name string, value float64, ts int64, source string, tags map[string]string) error
@@ -239,7 +268,13 @@ func (g *gaugeConsumer) Consume(metric pdata.Metric, errs *[]error) {
 	gauge := metric.Gauge()
 	numberDataPoints := gauge.DataPoints()
 	for i := 0; i < numberDataPoints.Len(); i++ {
-		g.pushSingleNumberDataPoint(metric, numberDataPoints.At(i), errs)
+		pushGaugeNumberDataPoint(
+			metric,
+			numberDataPoints.At(i),
+			errs,
+			g.sender,
+			g.logger,
+			&g.missingValues)
 	}
 }
 
@@ -249,16 +284,63 @@ func (g *gaugeConsumer) PushInternalMetrics(errs *[]error) {
 	}
 }
 
-func (g *gaugeConsumer) pushSingleNumberDataPoint(
-	metric pdata.Metric, numberDataPoint pdata.NumberDataPoint, errs *[]error) {
+type sumConsumer struct {
+	sender                senders.MetricSender
+	logger                *zap.Logger
+	missingValues         counter
+	reportInternalMetrics bool
+}
+
+func newSumConsumer(sender senders.MetricSender, options *consumerOptions) typedMetricConsumer {
+	var fixedOptions consumerOptions
+	if options != nil {
+		fixedOptions = *options
+	}
+	fixedOptions.replaceZeroFieldsWithDefaults()
+	return &sumConsumer{
+		sender:                sender,
+		logger:                fixedOptions.Logger,
+		reportInternalMetrics: fixedOptions.ReportInternalMetrics,
+	}
+}
+
+func (s *sumConsumer) Type() pdata.MetricDataType {
+	return pdata.MetricDataTypeSum
+}
+
+func (s *sumConsumer) Consume(metric pdata.Metric, errs *[]error) {
+	sum := metric.Sum()
+	isDelta := sum.AggregationTemporality() == pdata.MetricAggregationTemporalityDelta
+	numberDataPoints := sum.DataPoints()
+	for i := 0; i < numberDataPoints.Len(); i++ {
+		// If sum metric is a delta type, send it to tanzu observability as a
+		// delta counter. Otherwise, send it to tanzu observability as a gauge
+		// metric.
+		if isDelta {
+			s.pushNumberDataPoint(metric, numberDataPoints.At(i), errs)
+		} else {
+			pushGaugeNumberDataPoint(
+				metric, numberDataPoints.At(i), errs, s.sender, s.logger, &s.missingValues)
+		}
+	}
+}
+
+func (s *sumConsumer) PushInternalMetrics(errs *[]error) {
+	if s.reportInternalMetrics {
+		s.missingValues.Report(missingValueMetricName, typeIsSumTags, s.sender, errs)
+	}
+}
+
+func (s *sumConsumer) pushNumberDataPoint(
+	metric pdata.Metric, numberDataPoint pdata.NumberDataPoint, errs *[]error,
+) {
 	tags := attributesToTags(numberDataPoint.Attributes())
-	ts := numberDataPoint.Timestamp().AsTime().Unix()
 	value, err := getValue(numberDataPoint)
 	if err != nil {
-		logMissingValue(metric, g.logger, &g.missingValues)
+		logMissingValue(metric, s.logger, &s.missingValues)
 		return
 	}
-	err = g.sender.SendMetric(metric.Name(), value, ts, "", tags)
+	err = s.sender.SendDeltaCounter(metric.Name(), value, "", tags)
 	if err != nil {
 		*errs = append(*errs, err)
 	}
diff --git a/exporter/tanzuobservabilityexporter/metrics_test.go b/exporter/tanzuobservabilityexporter/metrics_test.go
index bf6e876e416f..a713c7d66b40 100644
--- a/exporter/tanzuobservabilityexporter/metrics_test.go
+++ b/exporter/tanzuobservabilityexporter/metrics_test.go
@@ -146,7 +146,6 @@ func TestGaugeConsumerMissingValueNoLogging(t *testing.T) {
 	consumer := newGaugeConsumer(sender, nil)
 	var errs []error
 
-	consumer.Consume(metric, &errs)
 	consumer.Consume(metric, &errs)
 	consumer.PushInternalMetrics(&errs)
 
@@ -197,6 +196,203 @@ func TestGaugeConsumerMissingValue(t *testing.T) {
 	assert.Len(t, allLogs, expectedMissingValueCount)
 }
 
+func TestSumConsumerDelta(t *testing.T) {
+	deltaMetric := newMetric(
+		"test.delta.metric", pdata.MetricDataTypeSum)
+	sum := deltaMetric.Sum()
+	sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta)
+	dataPoints := sum.DataPoints()
+	dataPoints.EnsureCapacity(2)
+	addDataPoint(
+		35,
+		1635205001,
+		map[string]interface{}{
+			"env": "dev",
+		},
+		dataPoints,
+	)
+	addDataPoint(
+		52.375,
+		1635205002,
+		map[string]interface{}{
+			"env": "prod",
+		},
+		dataPoints,
+	)
+
+	sender := &mockSumSender{}
+	consumer := newSumConsumer(sender, nil)
+	assert.Equal(t, pdata.MetricDataTypeSum, consumer.Type())
+	var errs []error
+
+	// delta sums get treated as delta counters
+	consumer.Consume(deltaMetric, &errs)
+	consumer.PushInternalMetrics(&errs)
+
+	expected := []tobsMetric{
+		{
+			Name:  "test.delta.metric",
+			Value: 35.0,
+			Tags:  map[string]string{"env": "dev"},
+		},
+		{
+			Name:  "test.delta.metric",
+			Value: 52.375,
+			Tags:  map[string]string{"env": "prod"},
+		},
+	}
+	assert.ElementsMatch(t, expected, sender.deltaMetrics)
+	assert.Empty(t, sender.metrics)
+	assert.Empty(t, errs)
+}
+
+func TestSumConsumerErrorOnSend(t *testing.T) {
+	deltaMetric := newMetric(
+		"test.delta.metric", pdata.MetricDataTypeSum)
+	sum := deltaMetric.Sum()
+	sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta)
+	dataPoints := sum.DataPoints()
+	dataPoints.EnsureCapacity(2)
+	addDataPoint(
+		35,
+		1635205001,
+		map[string]interface{}{
+			"env": "dev",
+		},
+		dataPoints,
+	)
+	addDataPoint(
+		52.375,
+		1635205002,
+		map[string]interface{}{
+			"env": "prod",
+		},
+		dataPoints,
+	)
+
+	sender := &mockSumSender{errorOnSend: true}
+	consumer := newSumConsumer(sender, nil)
+	assert.Equal(t, pdata.MetricDataTypeSum, consumer.Type())
+	var errs []error
+
+	// delta sums get treated as delta counters
+	consumer.Consume(deltaMetric, &errs)
+	consumer.PushInternalMetrics(&errs)
+
+	assert.Len(t, errs, 2)
+}
+
+func TestSumConsumerCumulative(t *testing.T) {
+	cumulativeMetric := newMetric(
+		"test.cumulative.metric", pdata.MetricDataTypeSum)
+	sum := cumulativeMetric.Sum()
+	sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
+	dataPoints := sum.DataPoints()
+	dataPoints.EnsureCapacity(1)
+	addDataPoint(
+		62.25,
+		1634205001,
+		map[string]interface{}{
+			"env": "dev",
+		},
+		dataPoints,
+	)
+	sender := &mockSumSender{}
+	consumer := newSumConsumer(sender, nil)
+	assert.Equal(t, pdata.MetricDataTypeSum, consumer.Type())
+	var errs []error
+
+	// cumulative sums get treated as regular wavefront metrics
+	consumer.Consume(cumulativeMetric, &errs)
+	consumer.PushInternalMetrics(&errs)
+
+	expected := []tobsMetric{
+		{
+			Name:  "test.cumulative.metric",
+			Value: 62.25,
+			Ts:    1634205001,
+			Tags:  map[string]string{"env": "dev"},
+		},
+	}
+	assert.ElementsMatch(t, expected, sender.metrics)
+	assert.Empty(t, sender.deltaMetrics)
+	assert.Empty(t, errs)
+}
+
+func TestSumConsumerUnspecified(t *testing.T) {
+	cumulativeMetric := newMetric(
+		"test.unspecified.metric", pdata.MetricDataTypeSum)
+	sum := cumulativeMetric.Sum()
+	sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityUnspecified)
+	dataPoints := sum.DataPoints()
+	dataPoints.EnsureCapacity(1)
+	addDataPoint(
+		72.25,
+		1634206001,
+		map[string]interface{}{
+			"env": "qa",
+		},
+		dataPoints,
+	)
+	sender := &mockSumSender{}
+	consumer := newSumConsumer(sender, nil)
+	assert.Equal(t, pdata.MetricDataTypeSum, consumer.Type())
+	var errs []error
+
+	// unspecified sums get treated as regular wavefront metrics
+	consumer.Consume(cumulativeMetric, &errs)
+	consumer.PushInternalMetrics(&errs)
+
+	expected := []tobsMetric{
+		{
+			Name:  "test.unspecified.metric",
+			Value: 72.25,
+			Ts:    1634206001,
+			Tags:  map[string]string{"env": "qa"},
+		},
+	}
+	assert.ElementsMatch(t, expected, sender.metrics)
+	assert.Empty(t, sender.deltaMetrics)
+	assert.Empty(t, errs)
+}
+
+func TestSumConsumerMissingValue(t *testing.T) {
+	metric := newMetric("bad.metric", pdata.MetricDataTypeSum)
+	sum := metric.Sum()
+	sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta)
+	dataPoints := sum.DataPoints()
+	dataPoints.EnsureCapacity(1)
+	addDataPoint(
+		nil,
+		1633123456,
+		nil,
+		dataPoints,
+	)
+	sender := &mockSumSender{}
+	observedZapCore, observedLogs := observer.New(zap.DebugLevel)
+	consumer := newSumConsumer(sender, &consumerOptions{
+		Logger:                zap.New(observedZapCore),
+		ReportInternalMetrics: true,
+	})
+	var errs []error
+
+	expectedMissingValueCount := 2
+	for i := 0; i < expectedMissingValueCount; i++ {
+		consumer.Consume(metric, &errs)
+	}
+	consumer.PushInternalMetrics(&errs)
+
+	assert.Len(t, errs, 0)
+	assert.Empty(t, sender.deltaMetrics)
+	assert.Contains(t, sender.metrics, tobsMetric{
+		Name:  missingValueMetricName,
+		Value: float64(expectedMissingValueCount),
+		Tags:  map[string]string{"type": "sum"},
+	})
+	allLogs := observedLogs.All()
+	assert.Len(t, allLogs, expectedMissingValueCount)
+}
+
 func verifyGaugeConsumer(t *testing.T, errorOnSend bool) {
 	metric := newMetric("test.metric", pdata.MetricDataTypeGauge)
 	dataPoints := metric.Gauge().DataPoints()
@@ -266,7 +462,8 @@ func addDataPoint(
 	value interface{},
 	ts int64,
 	tags map[string]interface{},
-	slice pdata.NumberDataPointSlice) {
+	slice pdata.NumberDataPointSlice,
+) {
 	dataPoint := slice.AppendEmpty()
 	if value != nil {
 		setDataPointValue(value, dataPoint)
@@ -327,7 +524,8 @@ type mockGaugeSender struct {
 }
 
 func (m *mockGaugeSender) SendMetric(
-	name string, value float64, ts int64, source string, tags map[string]string) error {
+	name string, value float64, ts int64, source string, tags map[string]string,
+) error {
 	m.metrics = append(m.metrics, tobsMetric{
 		Name:  name,
 		Value: value,
@@ -395,3 +593,40 @@ func copyTags(tags map[string]string) map[string]string {
 	}
 	return tagsCopy
 }
+
+type mockSumSender struct {
+	errorOnSend  bool
+	metrics      []tobsMetric
+	deltaMetrics []tobsMetric
+}
+
+func (m *mockSumSender) SendMetric(
+	name string, value float64, ts int64, source string, tags map[string]string,
+) error {
+	m.metrics = append(m.metrics, tobsMetric{
+		Name:   name,
+		Value:  value,
+		Ts:     ts,
+		Source: source,
+		Tags:   copyTags(tags),
+	})
+	if m.errorOnSend {
+		return errors.New("error sending")
+	}
+	return nil
+}
+
+func (m *mockSumSender) SendDeltaCounter(
+	name string, value float64, source string, tags map[string]string,
+) error {
+	m.deltaMetrics = append(m.deltaMetrics, tobsMetric{
+		Name:   name,
+		Value:  value,
+		Source: source,
+		Tags:   copyTags(tags),
+	})
+	if m.errorOnSend {
+		return errors.New("error sending")
+	}
+	return nil
+}