diff --git a/exporter/tanzuobservabilityexporter/metrics.go b/exporter/tanzuobservabilityexporter/metrics.go index 09207c345920..a0b8442cf9a8 100644 --- a/exporter/tanzuobservabilityexporter/metrics.go +++ b/exporter/tanzuobservabilityexporter/metrics.go @@ -18,14 +18,26 @@ import ( "context" "errors" "fmt" + "sync/atomic" "go.opentelemetry.io/collector/model/pdata" "go.uber.org/multierr" + "go.uber.org/zap" +) + +const ( + missingValueMetricName = "~sdk.otel.collector.missing_values" + metricNameString = "metric name" + metricTypeString = "metric type" +) + +var ( + typeIsGaugeTags = map[string]string{"type": "gauge"} ) // metricsConsumer instances consume OTEL metrics type metricsConsumer struct { - consumerMap map[pdata.MetricDataType]metricConsumer + consumerMap map[pdata.MetricDataType]typedMetricConsumer sender flushCloser } @@ -34,8 +46,8 @@ type metricsConsumer struct { // of returned consumer calls the Flush method on sender after consuming // all the metrics. Calling Close on the returned metricsConsumer calls Close // on sender. sender can be nil. -func newMetricsConsumer(consumers []metricConsumer, sender flushCloser) *metricsConsumer { - consumerMap := make(map[pdata.MetricDataType]metricConsumer, len(consumers)) +func newMetricsConsumer(consumers []typedMetricConsumer, sender flushCloser) *metricsConsumer { + consumerMap := make(map[pdata.MetricDataType]typedMetricConsumer, len(consumers)) for _, consumer := range consumers { if consumerMap[consumer.Type()] != nil { panic("duplicate consumer type detected: " + consumer.Type().String()) @@ -49,7 +61,7 @@ func newMetricsConsumer(consumers []metricConsumer, sender flushCloser) *metrics } // Consume consumes OTEL metrics. For each metric in md, it delegates to the -// metricConsumer that consumes that type of metric. Once Consume consumes +// typedMetricConsumer that consumes that type of metric. Once Consume consumes // all the metrics, it calls Flush() on the sender passed to // newMetricsConsumer. func (c *metricsConsumer) Consume(ctx context.Context, md pdata.Metrics) error { @@ -70,6 +82,7 @@ func (c *metricsConsumer) Consume(ctx context.Context, md pdata.Metrics) error { } } } + c.pushInternalMetrics(&errs) if c.sender != nil { if err := c.sender.Flush(); err != nil { errs = append(errs, err) @@ -86,6 +99,12 @@ func (c *metricsConsumer) Close() { } } +func (c *metricsConsumer) pushInternalMetrics(errs *[]error) { + for _, consumer := range c.consumerMap { + consumer.PushInternalMetrics(errs) + } +} + func (c *metricsConsumer) pushSingleMetric(m pdata.Metric, errs *[]error) { dataType := m.DataType() consumer := c.consumerMap[dataType] @@ -98,8 +117,8 @@ func (c *metricsConsumer) pushSingleMetric(m pdata.Metric, errs *[]error) { } } -// metricConsumer consumes one specific type of OTEL metric -type metricConsumer interface { +// typedMetricConsumer consumes one specific type of OTEL metric +type typedMetricConsumer interface { // Type returns the type of metric this consumer consumes. For example // Gauge, Sum, or Histogram @@ -107,6 +126,12 @@ type metricConsumer interface { // Consume consumes the metric and appends any errors encountered to errs Consume(m pdata.Metric, errs *[]error) + + // PushInternalMetrics sends internal metrics for this consumer to tanzu observability + // and appends any errors encountered to errs. The Consume method of metricsConsumer calls + // PushInternalMetrics on each registered typedMetricConsumer after it has consumed all the + // metrics but before it calls Flush on the sender. + PushInternalMetrics(errs *[]error) } // flushCloser is the interface for the Flush and Close method @@ -115,19 +140,95 @@ type flushCloser interface { Close() } +// counter represents an internal counter metric. The zero value is ready to use +type counter struct { + count int64 +} + +// Report reports this counter to tanzu observability. name is the name of +// the metric to be reported. tags is the tags for the metric. sender is what +// sends the metric to tanzu observability. Any errors get added to errs. +func (c *counter) Report( + name string, tags map[string]string, sender gaugeSender, errs *[]error) { + err := sender.SendMetric(name, float64(c.Get()), 0, "", tags) + if err != nil { + *errs = append(*errs, err) + } +} + +// Inc increments this counter by one. +func (c *counter) Inc() { + atomic.AddInt64(&c.count, 1) +} + +// Get gets the value of this counter. +func (c *counter) Get() int64 { + return atomic.LoadInt64(&c.count) +} + +// logMissingValue keeps track of metrics with missing values. metric is the +// metric with the missing value. logger is the logger. count counts +// metrics with missing values. +func logMissingValue(metric pdata.Metric, logger *zap.Logger, count *counter) { + namef := zap.String(metricNameString, metric.Name()) + typef := zap.String(metricTypeString, metric.DataType().String()) + logger.Debug("Metric missing value", namef, typef) + count.Inc() +} + +// getValue gets the floating point value out of a NumberDataPoint +func getValue(numberDataPoint pdata.NumberDataPoint) (float64, error) { + switch numberDataPoint.Type() { + case pdata.MetricValueTypeInt: + return float64(numberDataPoint.IntVal()), nil + case pdata.MetricValueTypeDouble: + return numberDataPoint.DoubleVal(), nil + default: + return 0.0, errors.New("unsupported metric value type") + } +} + // gaugeSender sends gauge metrics to tanzu observability type gaugeSender interface { SendMetric(name string, value float64, ts int64, source string, tags map[string]string) error } +// consumerOptions is general options for consumers +type consumerOptions struct { + + // The zap logger to use, nil means no logging + Logger *zap.Logger + + // If true, report internal metrics to wavefront + ReportInternalMetrics bool +} + +func (c *consumerOptions) replaceZeroFieldsWithDefaults() { + if c.Logger == nil { + c.Logger = zap.NewNop() + } +} + type gaugeConsumer struct { - sender gaugeSender + sender gaugeSender + logger *zap.Logger + missingValues counter + reportInternalMetrics bool } -// newGaugeConsumer returns a metricConsumer that consumes gauge metrics -// by sending them to tanzu observability -func newGaugeConsumer(sender gaugeSender) metricConsumer { - return &gaugeConsumer{sender: sender} +// newGaugeConsumer returns a typedMetricConsumer that consumes gauge metrics +// by sending them to tanzu observability. Caller can pass nil for options to get the defaults. +func newGaugeConsumer(sender gaugeSender, options *consumerOptions) typedMetricConsumer { + var fixedOptions consumerOptions + if options != nil { + fixedOptions = *options + } + fixedOptions.replaceZeroFieldsWithDefaults() + return &gaugeConsumer{ + sender: sender, + logger: fixedOptions.Logger, + reportInternalMetrics: fixedOptions.ReportInternalMetrics, + } } func (g *gaugeConsumer) Type() pdata.MetricDataType { @@ -142,13 +243,19 @@ func (g *gaugeConsumer) Consume(metric pdata.Metric, errs *[]error) { } } +func (g *gaugeConsumer) PushInternalMetrics(errs *[]error) { + if g.reportInternalMetrics { + g.missingValues.Report(missingValueMetricName, typeIsGaugeTags, g.sender, errs) + } +} + func (g *gaugeConsumer) pushSingleNumberDataPoint( metric pdata.Metric, numberDataPoint pdata.NumberDataPoint, errs *[]error) { tags := attributesToTags(numberDataPoint.Attributes()) ts := numberDataPoint.Timestamp().AsTime().Unix() value, err := getValue(numberDataPoint) if err != nil { - *errs = append(*errs, err) + logMissingValue(metric, g.logger, &g.missingValues) return } err = g.sender.SendMetric(metric.Name(), value, ts, "", tags) @@ -156,14 +263,3 @@ func (g *gaugeConsumer) pushSingleNumberDataPoint( *errs = append(*errs, err) } } - -func getValue(numberDataPoint pdata.NumberDataPoint) (float64, error) { - switch numberDataPoint.Type() { - case pdata.MetricValueTypeInt: - return float64(numberDataPoint.IntVal()), nil - case pdata.MetricValueTypeDouble: - return numberDataPoint.DoubleVal(), nil - default: - return 0.0, errors.New("unsupported metric value type") - } -} diff --git a/exporter/tanzuobservabilityexporter/metrics_test.go b/exporter/tanzuobservabilityexporter/metrics_test.go index d684645fa51d..bf6e876e416f 100644 --- a/exporter/tanzuobservabilityexporter/metrics_test.go +++ b/exporter/tanzuobservabilityexporter/metrics_test.go @@ -21,7 +21,10 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" ) func TestMetricsConsumerNormal(t *testing.T) { @@ -29,16 +32,18 @@ func TestMetricsConsumerNormal(t *testing.T) { sum1 := newMetric("sum1", pdata.MetricDataTypeSum) gauge2 := newMetric("gauge2", pdata.MetricDataTypeGauge) sum2 := newMetric("sum2", pdata.MetricDataTypeSum) - mockGaugeConsumer := &mockMetricConsumer{typ: pdata.MetricDataTypeGauge} - mockSumConsumer := &mockMetricConsumer{typ: pdata.MetricDataTypeSum} + mockGaugeConsumer := &mockTypedMetricConsumer{typ: pdata.MetricDataTypeGauge} + mockSumConsumer := &mockTypedMetricConsumer{typ: pdata.MetricDataTypeSum} sender := &mockFlushCloser{} metrics := constructMetrics(gauge1, sum1, gauge2, sum2) - consumer := newMetricsConsumer([]metricConsumer{mockGaugeConsumer, mockSumConsumer}, sender) + consumer := newMetricsConsumer([]typedMetricConsumer{mockGaugeConsumer, mockSumConsumer}, sender) assert.NoError(t, consumer.Consume(context.Background(), metrics)) assert.ElementsMatch(t, []string{"gauge1", "gauge2"}, mockGaugeConsumer.names) assert.ElementsMatch(t, []string{"sum1", "sum2"}, mockSumConsumer.names) + assert.Equal(t, 1, mockGaugeConsumer.pushInternalMetricsCallCount) + assert.Equal(t, 1, mockSumConsumer.pushInternalMetricsCallCount) assert.Equal(t, 1, sender.numFlushCalls) assert.Equal(t, 0, sender.numCloseCalls) @@ -56,11 +61,11 @@ func TestMetricsConsumerNone(t *testing.T) { } func TestNewMetricsConsumerPanicsWithDuplicateMetricType(t *testing.T) { - mockGaugeConsumer1 := &mockMetricConsumer{typ: pdata.MetricDataTypeGauge} - mockGaugeConsumer2 := &mockMetricConsumer{typ: pdata.MetricDataTypeGauge} + mockGaugeConsumer1 := &mockTypedMetricConsumer{typ: pdata.MetricDataTypeGauge} + mockGaugeConsumer2 := &mockTypedMetricConsumer{typ: pdata.MetricDataTypeGauge} assert.Panics(t, func() { - newMetricsConsumer([]metricConsumer{mockGaugeConsumer1, mockGaugeConsumer2}, nil) + newMetricsConsumer([]typedMetricConsumer{mockGaugeConsumer1, mockGaugeConsumer2}, nil) }) } @@ -83,19 +88,32 @@ func TestMetricsConsumerErrorsWithUnregisteredMetricType(t *testing.T) { func TestMetricsConsumerErrorConsuming(t *testing.T) { gauge1 := newMetric("gauge1", pdata.MetricDataTypeGauge) - mockGaugeConsumer := &mockMetricConsumer{typ: pdata.MetricDataTypeGauge, errorOnConsume: true} + mockGaugeConsumer := &mockTypedMetricConsumer{typ: pdata.MetricDataTypeGauge, errorOnConsume: true} metrics := constructMetrics(gauge1) - consumer := newMetricsConsumer([]metricConsumer{mockGaugeConsumer}, nil) + consumer := newMetricsConsumer([]typedMetricConsumer{mockGaugeConsumer}, nil) assert.Error(t, consumer.Consume(context.Background(), metrics)) assert.Len(t, mockGaugeConsumer.names, 1) + assert.Equal(t, 1, mockGaugeConsumer.pushInternalMetricsCallCount) +} + +func TestMetricsConsumerErrorConsumingInternal(t *testing.T) { + gauge1 := newMetric("gauge1", pdata.MetricDataTypeGauge) + mockGaugeConsumer := &mockTypedMetricConsumer{ + typ: pdata.MetricDataTypeGauge, errorOnPushInternalMetrics: true} + metrics := constructMetrics(gauge1) + consumer := newMetricsConsumer([]typedMetricConsumer{mockGaugeConsumer}, nil) + + assert.Error(t, consumer.Consume(context.Background(), metrics)) + assert.Len(t, mockGaugeConsumer.names, 1) + assert.Equal(t, 1, mockGaugeConsumer.pushInternalMetricsCallCount) } func TestMetricsConsumerRespectContext(t *testing.T) { sender := &mockFlushCloser{} gauge1 := newMetric("gauge1", pdata.MetricDataTypeGauge) - mockGaugeConsumer := &mockMetricConsumer{typ: pdata.MetricDataTypeGauge} - consumer := newMetricsConsumer([]metricConsumer{mockGaugeConsumer}, sender) + mockGaugeConsumer := &mockTypedMetricConsumer{typ: pdata.MetricDataTypeGauge} + consumer := newMetricsConsumer([]typedMetricConsumer{mockGaugeConsumer}, sender) ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -103,6 +121,7 @@ func TestMetricsConsumerRespectContext(t *testing.T) { assert.Zero(t, sender.numFlushCalls) assert.Empty(t, mockGaugeConsumer.names) + assert.Zero(t, mockGaugeConsumer.pushInternalMetricsCallCount) } func TestGaugeConsumerNormal(t *testing.T) { @@ -113,7 +132,7 @@ func TestGaugeConsumerErrorSending(t *testing.T) { verifyGaugeConsumer(t, true) } -func TestGaugeConsumerBadValue(t *testing.T) { +func TestGaugeConsumerMissingValueNoLogging(t *testing.T) { metric := newMetric("bad.metric", pdata.MetricDataTypeGauge) dataPoints := metric.Gauge().DataPoints() dataPoints.EnsureCapacity(1) @@ -124,13 +143,60 @@ func TestGaugeConsumerBadValue(t *testing.T) { dataPoints, ) sender := &mockGaugeSender{} - consumer := newGaugeConsumer(sender) + consumer := newGaugeConsumer(sender, nil) var errs []error + consumer.Consume(metric, &errs) - assert.Len(t, errs, 1) + consumer.Consume(metric, &errs) + consumer.PushInternalMetrics(&errs) + + assert.Empty(t, errs) assert.Empty(t, sender.metrics) } +func TestGaugeConsumerMissingValue(t *testing.T) { + metric := newMetric("bad.metric", pdata.MetricDataTypeGauge) + dataPoints := metric.Gauge().DataPoints() + dataPoints.EnsureCapacity(1) + addDataPoint( + nil, + 1633123456, + nil, + dataPoints, + ) + // Sending to tanzu observability should fail + sender := &mockGaugeSender{errorOnSend: true} + observedZapCore, observedLogs := observer.New(zap.DebugLevel) + consumer := newGaugeConsumer(sender, &consumerOptions{ + Logger: zap.New(observedZapCore), + ReportInternalMetrics: true, + }) + var errs []error + expectedMissingValueCount := 2 + for i := 0; i < expectedMissingValueCount; i++ { + // This call to Consume does not emit any metrics to tanzuobservability + // because the metric is missing its value. + consumer.Consume(metric, &errs) + } + assert.Empty(t, errs) + + // This call adds one error to errs because it emits a metric to + // tanzu observability and emitting there is set up to fail. + consumer.PushInternalMetrics(&errs) + + // One error from emitting the internal metric + assert.Len(t, errs, 1) + // Only the internal metric was sent + require.Len(t, sender.metrics, 1) + assert.Equal(t, tobsMetric{ + Name: missingValueMetricName, + Value: float64(expectedMissingValueCount), + Tags: map[string]string{"type": "gauge"}}, + sender.metrics[0]) + allLogs := observedLogs.All() + assert.Len(t, allLogs, expectedMissingValueCount) +} + func verifyGaugeConsumer(t *testing.T, errorOnSend bool) { metric := newMetric("test.metric", pdata.MetricDataTypeGauge) dataPoints := metric.Gauge().DataPoints() @@ -162,11 +228,12 @@ func verifyGaugeConsumer(t *testing.T, errorOnSend bool) { }, } sender := &mockGaugeSender{errorOnSend: errorOnSend} - consumer := newGaugeConsumer(sender) + consumer := newGaugeConsumer(sender, nil) assert.Equal(t, pdata.MetricDataTypeGauge, consumer.Type()) var errs []error consumer.Consume(metric, &errs) + consumer.PushInternalMetrics(&errs) assert.ElementsMatch(t, expected, sender.metrics) if errorOnSend { assert.Len(t, errs, len(expected)) @@ -205,7 +272,7 @@ func addDataPoint( setDataPointValue(value, dataPoint) } setDataPointTimestamp(ts, dataPoint) - setTags(tags, dataPoint) + setTags(tags, dataPoint.Attributes()) } func setDataPointTimestamp(ts int64, dataPoint pdata.NumberDataPoint) { @@ -226,7 +293,7 @@ func setDataPointValue(value interface{}, dataPoint pdata.NumberDataPoint) { } } -func setTags(tags map[string]interface{}, dataPoint pdata.NumberDataPoint) { +func setTags(tags map[string]interface{}, attributes pdata.AttributeMap) { valueMap := make(map[string]pdata.AttributeValue, len(tags)) for key, value := range tags { switch v := value.(type) { @@ -243,7 +310,7 @@ func setTags(tags map[string]interface{}, dataPoint pdata.NumberDataPoint) { } } attributeMap := pdata.NewAttributeMapFromMap(valueMap) - attributeMap.CopyTo(dataPoint.Attributes()) + attributeMap.CopyTo(attributes) } type tobsMetric struct { @@ -261,16 +328,12 @@ type mockGaugeSender struct { func (m *mockGaugeSender) SendMetric( name string, value float64, ts int64, source string, tags map[string]string) error { - tagsCopy := make(map[string]string, len(tags)) - for k, v := range tags { - tagsCopy[k] = v - } m.metrics = append(m.metrics, tobsMetric{ Name: name, Value: value, Ts: ts, Source: source, - Tags: tagsCopy, + Tags: copyTags(tags), }) if m.errorOnSend { return errors.New("error sending") @@ -278,23 +341,32 @@ func (m *mockGaugeSender) SendMetric( return nil } -type mockMetricConsumer struct { - typ pdata.MetricDataType - errorOnConsume bool - names []string +type mockTypedMetricConsumer struct { + typ pdata.MetricDataType + errorOnConsume bool + errorOnPushInternalMetrics bool + names []string + pushInternalMetricsCallCount int } -func (m *mockMetricConsumer) Type() pdata.MetricDataType { +func (m *mockTypedMetricConsumer) Type() pdata.MetricDataType { return m.typ } -func (m *mockMetricConsumer) Consume(metric pdata.Metric, errs *[]error) { +func (m *mockTypedMetricConsumer) Consume(metric pdata.Metric, errs *[]error) { m.names = append(m.names, metric.Name()) if m.errorOnConsume { *errs = append(*errs, errors.New("error in consume")) } } +func (m *mockTypedMetricConsumer) PushInternalMetrics(errs *[]error) { + m.pushInternalMetricsCallCount++ + if m.errorOnPushInternalMetrics { + *errs = append(*errs, errors.New("error in consume")) + } +} + type mockFlushCloser struct { errorOnFlush bool numFlushCalls int @@ -312,3 +384,14 @@ func (m *mockFlushCloser) Flush() error { func (m *mockFlushCloser) Close() { m.numCloseCalls++ } + +func copyTags(tags map[string]string) map[string]string { + if tags == nil { + return nil + } + tagsCopy := make(map[string]string, len(tags)) + for k, v := range tags { + tagsCopy[k] = v + } + return tagsCopy +}