Skip to content

Commit

Permalink
Log and report missing metric values. (#5835)
Browse files Browse the repository at this point in the history
* Log and report missing metric values.

* Remove TODO. Slight refactor in a test.

* Small refactor to prepare for future concurrent pull requests.

* Refactorings that Peter Stone Proposed.

* Get rid of mutex in favor of atomic operations.
  • Loading branch information
keep94 authored Nov 18, 2021
1 parent ef0b89a commit e4c1fa0
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 51 deletions.
142 changes: 119 additions & 23 deletions exporter/tanzuobservabilityexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,26 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"

"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/multierr"
"go.uber.org/zap"
)

const (
missingValueMetricName = "~sdk.otel.collector.missing_values"
metricNameString = "metric name"
metricTypeString = "metric type"
)

var (
typeIsGaugeTags = map[string]string{"type": "gauge"}
)

// metricsConsumer instances consume OTEL metrics
type metricsConsumer struct {
consumerMap map[pdata.MetricDataType]metricConsumer
consumerMap map[pdata.MetricDataType]typedMetricConsumer
sender flushCloser
}

Expand All @@ -34,8 +46,8 @@ type metricsConsumer struct {
// of returned consumer calls the Flush method on sender after consuming
// all the metrics. Calling Close on the returned metricsConsumer calls Close
// on sender. sender can be nil.
func newMetricsConsumer(consumers []metricConsumer, sender flushCloser) *metricsConsumer {
consumerMap := make(map[pdata.MetricDataType]metricConsumer, len(consumers))
func newMetricsConsumer(consumers []typedMetricConsumer, sender flushCloser) *metricsConsumer {
consumerMap := make(map[pdata.MetricDataType]typedMetricConsumer, len(consumers))
for _, consumer := range consumers {
if consumerMap[consumer.Type()] != nil {
panic("duplicate consumer type detected: " + consumer.Type().String())
Expand All @@ -49,7 +61,7 @@ func newMetricsConsumer(consumers []metricConsumer, sender flushCloser) *metrics
}

// Consume consumes OTEL metrics. For each metric in md, it delegates to the
// metricConsumer that consumes that type of metric. Once Consume consumes
// typedMetricConsumer that consumes that type of metric. Once Consume consumes
// all the metrics, it calls Flush() on the sender passed to
// newMetricsConsumer.
func (c *metricsConsumer) Consume(ctx context.Context, md pdata.Metrics) error {
Expand All @@ -70,6 +82,7 @@ func (c *metricsConsumer) Consume(ctx context.Context, md pdata.Metrics) error {
}
}
}
c.pushInternalMetrics(&errs)
if c.sender != nil {
if err := c.sender.Flush(); err != nil {
errs = append(errs, err)
Expand All @@ -86,6 +99,12 @@ func (c *metricsConsumer) Close() {
}
}

func (c *metricsConsumer) pushInternalMetrics(errs *[]error) {
for _, consumer := range c.consumerMap {
consumer.PushInternalMetrics(errs)
}
}

func (c *metricsConsumer) pushSingleMetric(m pdata.Metric, errs *[]error) {
dataType := m.DataType()
consumer := c.consumerMap[dataType]
Expand All @@ -98,15 +117,21 @@ func (c *metricsConsumer) pushSingleMetric(m pdata.Metric, errs *[]error) {
}
}

// metricConsumer consumes one specific type of OTEL metric
type metricConsumer interface {
// typedMetricConsumer consumes one specific type of OTEL metric
type typedMetricConsumer interface {

// Type returns the type of metric this consumer consumes. For example
// Gauge, Sum, or Histogram
Type() pdata.MetricDataType

// Consume consumes the metric and appends any errors encountered to errs
Consume(m pdata.Metric, errs *[]error)

// PushInternalMetrics sends internal metrics for this consumer to tanzu observability
// and appends any errors encountered to errs. The Consume method of metricsConsumer calls
// PushInternalMetrics on each registered typedMetricConsumer after it has consumed all the
// metrics but before it calls Flush on the sender.
PushInternalMetrics(errs *[]error)
}

// flushCloser is the interface for the Flush and Close method
Expand All @@ -115,19 +140,95 @@ type flushCloser interface {
Close()
}

// counter represents an internal counter metric. The zero value is ready to use
type counter struct {
count int64
}

// Report reports this counter to tanzu observability. name is the name of
// the metric to be reported. tags is the tags for the metric. sender is what
// sends the metric to tanzu observability. Any errors get added to errs.
func (c *counter) Report(
name string, tags map[string]string, sender gaugeSender, errs *[]error) {
err := sender.SendMetric(name, float64(c.Get()), 0, "", tags)
if err != nil {
*errs = append(*errs, err)
}
}

// Inc increments this counter by one.
func (c *counter) Inc() {
atomic.AddInt64(&c.count, 1)
}

// Get gets the value of this counter.
func (c *counter) Get() int64 {
return atomic.LoadInt64(&c.count)
}

// logMissingValue keeps track of metrics with missing values. metric is the
// metric with the missing value. logger is the logger. count counts
// metrics with missing values.
func logMissingValue(metric pdata.Metric, logger *zap.Logger, count *counter) {
namef := zap.String(metricNameString, metric.Name())
typef := zap.String(metricTypeString, metric.DataType().String())
logger.Debug("Metric missing value", namef, typef)
count.Inc()
}

// getValue gets the floating point value out of a NumberDataPoint
func getValue(numberDataPoint pdata.NumberDataPoint) (float64, error) {
switch numberDataPoint.Type() {
case pdata.MetricValueTypeInt:
return float64(numberDataPoint.IntVal()), nil
case pdata.MetricValueTypeDouble:
return numberDataPoint.DoubleVal(), nil
default:
return 0.0, errors.New("unsupported metric value type")
}
}

// gaugeSender sends gauge metrics to tanzu observability
type gaugeSender interface {
SendMetric(name string, value float64, ts int64, source string, tags map[string]string) error
}

// consumerOptions is general options for consumers
type consumerOptions struct {

// The zap logger to use, nil means no logging
Logger *zap.Logger

// If true, report internal metrics to wavefront
ReportInternalMetrics bool
}

func (c *consumerOptions) replaceZeroFieldsWithDefaults() {
if c.Logger == nil {
c.Logger = zap.NewNop()
}
}

type gaugeConsumer struct {
sender gaugeSender
sender gaugeSender
logger *zap.Logger
missingValues counter
reportInternalMetrics bool
}

// newGaugeConsumer returns a metricConsumer that consumes gauge metrics
// by sending them to tanzu observability
func newGaugeConsumer(sender gaugeSender) metricConsumer {
return &gaugeConsumer{sender: sender}
// newGaugeConsumer returns a typedMetricConsumer that consumes gauge metrics
// by sending them to tanzu observability. Caller can pass nil for options to get the defaults.
func newGaugeConsumer(sender gaugeSender, options *consumerOptions) typedMetricConsumer {
var fixedOptions consumerOptions
if options != nil {
fixedOptions = *options
}
fixedOptions.replaceZeroFieldsWithDefaults()
return &gaugeConsumer{
sender: sender,
logger: fixedOptions.Logger,
reportInternalMetrics: fixedOptions.ReportInternalMetrics,
}
}

func (g *gaugeConsumer) Type() pdata.MetricDataType {
Expand All @@ -142,28 +243,23 @@ func (g *gaugeConsumer) Consume(metric pdata.Metric, errs *[]error) {
}
}

func (g *gaugeConsumer) PushInternalMetrics(errs *[]error) {
if g.reportInternalMetrics {
g.missingValues.Report(missingValueMetricName, typeIsGaugeTags, g.sender, errs)
}
}

func (g *gaugeConsumer) pushSingleNumberDataPoint(
metric pdata.Metric, numberDataPoint pdata.NumberDataPoint, errs *[]error) {
tags := attributesToTags(numberDataPoint.Attributes())
ts := numberDataPoint.Timestamp().AsTime().Unix()
value, err := getValue(numberDataPoint)
if err != nil {
*errs = append(*errs, err)
logMissingValue(metric, g.logger, &g.missingValues)
return
}
err = g.sender.SendMetric(metric.Name(), value, ts, "", tags)
if err != nil {
*errs = append(*errs, err)
}
}

func getValue(numberDataPoint pdata.NumberDataPoint) (float64, error) {
switch numberDataPoint.Type() {
case pdata.MetricValueTypeInt:
return float64(numberDataPoint.IntVal()), nil
case pdata.MetricValueTypeDouble:
return numberDataPoint.DoubleVal(), nil
default:
return 0.0, errors.New("unsupported metric value type")
}
}
Loading

0 comments on commit e4c1fa0

Please sign in to comment.