Skip to content

Commit

Permalink
[tanzuobservability exporter] Add consumer for sum metrics. (open-tel…
Browse files Browse the repository at this point in the history
…emetry#6385)

* [tanzuobservability exporter] Add consumer for sum metrics.

* Function headers to be more readable.

pushGaugeSingleNumberDataPoint->pushGaugeNumberDataPoint
pushSingleNumberDataPoint->pushNumberDataPoint

* Break verifySumConsumer into four separate tests.

* Minor changes to tests.
  • Loading branch information
keep94 authored and jamesmoessis committed Dec 6, 2021
1 parent acba7b7 commit d436189
Show file tree
Hide file tree
Showing 2 changed files with 327 additions and 10 deletions.
96 changes: 89 additions & 7 deletions exporter/tanzuobservabilityexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sync/atomic"

"github.com/wavefronthq/wavefront-sdk-go/senders"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand All @@ -33,6 +34,7 @@ const (

var (
typeIsGaugeTags = map[string]string{"type": "gauge"}
typeIsSumTags = map[string]string{"type": "sum"}
)

// metricsConsumer instances consume OTEL metrics
Expand Down Expand Up @@ -149,7 +151,8 @@ type counter struct {
// the metric to be reported. tags is the tags for the metric. sender is what
// sends the metric to tanzu observability. Any errors get added to errs.
func (c *counter) Report(
name string, tags map[string]string, sender gaugeSender, errs *[]error) {
name string, tags map[string]string, sender gaugeSender, errs *[]error,
) {
err := sender.SendMetric(name, float64(c.Get()), 0, "", tags)
if err != nil {
*errs = append(*errs, err)
Expand Down Expand Up @@ -188,6 +191,32 @@ func getValue(numberDataPoint pdata.NumberDataPoint) (float64, error) {
}
}

// pushGaugeNumberDataPoint sends a metric as a gauge metric to tanzu
// observability. metric is the metric to send. numberDataPoint is the value
// of the metric. Any errors get appended to errs. sender is what sends the
// gauge metric to tanzu observability. logger is the logger. missingValues
// keeps track of metrics with missing values.
func pushGaugeNumberDataPoint(
metric pdata.Metric,
numberDataPoint pdata.NumberDataPoint,
errs *[]error,
sender gaugeSender,
logger *zap.Logger,
missingValues *counter,
) {
tags := attributesToTags(numberDataPoint.Attributes())
ts := numberDataPoint.Timestamp().AsTime().Unix()
value, err := getValue(numberDataPoint)
if err != nil {
logMissingValue(metric, logger, missingValues)
return
}
err = sender.SendMetric(metric.Name(), value, ts, "", tags)
if err != nil {
*errs = append(*errs, err)
}
}

// gaugeSender sends gauge metrics to tanzu observability
type gaugeSender interface {
SendMetric(name string, value float64, ts int64, source string, tags map[string]string) error
Expand Down Expand Up @@ -239,7 +268,13 @@ func (g *gaugeConsumer) Consume(metric pdata.Metric, errs *[]error) {
gauge := metric.Gauge()
numberDataPoints := gauge.DataPoints()
for i := 0; i < numberDataPoints.Len(); i++ {
g.pushSingleNumberDataPoint(metric, numberDataPoints.At(i), errs)
pushGaugeNumberDataPoint(
metric,
numberDataPoints.At(i),
errs,
g.sender,
g.logger,
&g.missingValues)
}
}

Expand All @@ -249,16 +284,63 @@ func (g *gaugeConsumer) PushInternalMetrics(errs *[]error) {
}
}

func (g *gaugeConsumer) pushSingleNumberDataPoint(
metric pdata.Metric, numberDataPoint pdata.NumberDataPoint, errs *[]error) {
type sumConsumer struct {
sender senders.MetricSender
logger *zap.Logger
missingValues counter
reportInternalMetrics bool
}

func newSumConsumer(sender senders.MetricSender, options *consumerOptions) typedMetricConsumer {
var fixedOptions consumerOptions
if options != nil {
fixedOptions = *options
}
fixedOptions.replaceZeroFieldsWithDefaults()
return &sumConsumer{
sender: sender,
logger: fixedOptions.Logger,
reportInternalMetrics: fixedOptions.ReportInternalMetrics,
}
}

func (s *sumConsumer) Type() pdata.MetricDataType {
return pdata.MetricDataTypeSum
}

func (s *sumConsumer) Consume(metric pdata.Metric, errs *[]error) {
sum := metric.Sum()
isDelta := sum.AggregationTemporality() == pdata.MetricAggregationTemporalityDelta
numberDataPoints := sum.DataPoints()
for i := 0; i < numberDataPoints.Len(); i++ {
// If sum metric is a delta type, send it to tanzu observability as a
// delta counter. Otherwise, send it to tanzu observability as a gauge
// metric.
if isDelta {
s.pushNumberDataPoint(metric, numberDataPoints.At(i), errs)
} else {
pushGaugeNumberDataPoint(
metric, numberDataPoints.At(i), errs, s.sender, s.logger, &s.missingValues)
}
}
}

func (s *sumConsumer) PushInternalMetrics(errs *[]error) {
if s.reportInternalMetrics {
s.missingValues.Report(missingValueMetricName, typeIsSumTags, s.sender, errs)
}
}

func (s *sumConsumer) pushNumberDataPoint(
metric pdata.Metric, numberDataPoint pdata.NumberDataPoint, errs *[]error,
) {
tags := attributesToTags(numberDataPoint.Attributes())
ts := numberDataPoint.Timestamp().AsTime().Unix()
value, err := getValue(numberDataPoint)
if err != nil {
logMissingValue(metric, g.logger, &g.missingValues)
logMissingValue(metric, s.logger, &s.missingValues)
return
}
err = g.sender.SendMetric(metric.Name(), value, ts, "", tags)
err = s.sender.SendDeltaCounter(metric.Name(), value, "", tags)
if err != nil {
*errs = append(*errs, err)
}
Expand Down
Loading

0 comments on commit d436189

Please sign in to comment.