Commit

feat(outputs.stackdriver): Enable histogram support (#14275)
powersj authored Dec 7, 2023
1 parent 41b7a3d commit 3172fd5
Showing 5 changed files with 375 additions and 41 deletions.
6 changes: 4 additions & 2 deletions plugins/outputs/stackdriver/README.md
@@ -74,6 +74,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## metric type set to the corresponding type.
# metric_counter = []
# metric_gauge = []
# metric_histogram = []

## NOTE: Due to the way TOML is parsed, tables must be at the END of the
## plugin definition, otherwise additional config options are read as part of
@@ -101,8 +102,9 @@ Points collected with greater than 1 minute precision may need to be aggregated
before they can be written. Consider using the [basicstats][] aggregator to do
this.

Histogram / distribution and delta metrics are not yet supported. These will be
dropped silently unless debugging is on.
Histograms are supported only via metrics generated by the Prometheus metric
version 1 parser. The version 2 parser generates sparse metrics that would need
to be heavily transformed before sending to Stackdriver.
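
For context, a histogram scraped with the metric version 1 parser reaches this
output as a single Telegraf metric whose fields hold the sum, the total count,
and one cumulative count per bucket bound (including `+Inf`). A minimal sketch
of that shape, assuming the `metric.New` helper and made-up bounds and values:

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
)

func main() {
	// Field layout produced by the Prometheus metric version 1 parser for a
	// histogram: bucket bounds become field keys, values are cumulative counts.
	fields := map[string]interface{}{
		"sum":   45.3, // total of all observed values
		"count": 25.0, // total number of observations
		"0.05":  5.0,  // observations <= 0.05
		"0.1":   10.0, // observations <= 0.1
		"+Inf":  25.0, // all observations
	}
	m := metric.New(
		"http_request_duration_seconds", // hypothetical metric name
		map[string]string{"handler": "/api"},
		fields,
		time.Now(),
		telegraf.Histogram,
	)
	fmt.Println(m.Name(), m.Fields())
}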

Note that the plugin keeps an in-memory cache of the start times and last
observed values of all COUNTER metrics in order to comply with the requirements
6 changes: 5 additions & 1 deletion plugins/outputs/stackdriver/counter_cache.go
@@ -91,5 +91,9 @@ func GetCounterCacheKey(m telegraf.Metric, f *telegraf.Field) string {
tags = append(tags, strings.Join([]string{t.Key, t.Value}, "="))
}
sort.Strings(tags)
return path.Join(m.Name(), strings.Join(tags, "/"), f.Key)
key := ""
if f != nil {
key = f.Key
}
return path.Join(m.Name(), strings.Join(tags, "/"), key)
}
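
Because a histogram is written as one distribution time series rather than one
series per field, the plugin now looks up the counter cache with a nil field;
the key then simply omits the field component. A small usage sketch, assuming a
hypothetical metric built with the `metric.New` helper and the exported
`GetCounterCacheKey` function:

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/outputs/stackdriver"
)

func main() {
	m := metric.New(
		"http_request_duration_seconds", // hypothetical metric
		map[string]string{"handler": "/api"},
		map[string]interface{}{"count": 25.0},
		time.Now(),
	)

	// Regular counters are cached per field: name/tags/field.
	fmt.Println(stackdriver.GetCounterCacheKey(m, &telegraf.Field{Key: "count", Value: 25.0}))

	// A histogram's distribution is cached per metric: name/tags, no field part.
	fmt.Println(stackdriver.GetCounterCacheKey(m, nil))
}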
1 change: 1 addition & 0 deletions plugins/outputs/stackdriver/sample.conf
@@ -39,6 +39,7 @@
## metric type set to the corresponding type.
# metric_counter = []
# metric_gauge = []
# metric_histogram = []

## NOTE: Due to the way TOML is parsed, tables must be at the END of the
## plugin definition, otherwise additional config options are read as part of
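
The new `metric_histogram` option takes the same glob patterns as
`metric_counter` and `metric_gauge`; metrics whose names match are forced to
the histogram type before being converted to a distribution. A short sketch of
how such patterns behave, using Telegraf's `filter` package and hypothetical
metric names:

package main

import (
	"fmt"

	"github.com/influxdata/telegraf/filter"
)

func main() {
	// The plugin compiles metric_histogram with filter.Compile and then
	// matches on each metric's name; these patterns are illustrative only.
	f, err := filter.Compile([]string{"http_request_duration_seconds", "rpc_*_latency"})
	if err != nil {
		panic(err)
	}

	fmt.Println(f.Match("rpc_lookup_latency")) // true: glob match
	fmt.Println(f.Match("cpu"))                // false: not listed
}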
226 changes: 189 additions & 37 deletions plugins/outputs/stackdriver/stackdriver.go
Expand Up @@ -8,11 +8,13 @@ import (
"hash/fnv"
"path"
"sort"
"strconv"
"strings"

monitoring "cloud.google.com/go/monitoring/apiv3/v2"
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"google.golang.org/api/option"
"google.golang.org/genproto/googleapis/api/distribution"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/grpc/status"
@@ -39,18 +41,23 @@ type Stackdriver struct {
TagsAsResourceLabels []string `toml:"tags_as_resource_label"`
MetricCounter []string `toml:"metric_counter"`
MetricGauge []string `toml:"metric_gauge"`
MetricHistogram []string `toml:"metric_histogram"`
Log telegraf.Logger `toml:"-"`

client *monitoring.MetricClient
counterCache *counterCache
filterCounter filter.Filter
filterGauge filter.Filter
client *monitoring.MetricClient
counterCache *counterCache
filterCounter filter.Filter
filterGauge filter.Filter
filterHistogram filter.Filter
}

const (
// The user-defined limits are documented below:
// https://cloud.google.com/monitoring/quotas#custom_metrics_quotas

// QuotaLabelsPerMetricDescriptor is the limit
// to labels (tags) per metric descriptor.
QuotaLabelsPerMetricDescriptor = 10
QuotaLabelsPerMetricDescriptor = 30
// QuotaStringLengthForLabelKey is the limit
// to string length for label key.
QuotaStringLengthForLabelKey = 100
@@ -92,6 +99,10 @@ func (s *Stackdriver) Init() error {
if err != nil {
return fmt.Errorf("creating gauge filter failed: %w", err)
}
s.filterHistogram, err = filter.Compile(s.MetricHistogram)
if err != nil {
return fmt.Errorf("creating histogram filter failed: %w", err)
}

return nil
}
@@ -152,12 +163,14 @@ func sorted(metrics []telegraf.Metric) []telegraf.Metric {

type timeSeriesBuckets map[uint64][]*monitoringpb.TimeSeries

func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monitoringpb.TimeSeries) {
func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f []*telegraf.Field, ts *monitoringpb.TimeSeries) {
h := fnv.New64a()
h.Write([]byte(m.Name()))
h.Write([]byte{'\n'})
h.Write([]byte(f.Key))
h.Write([]byte{'\n'})
for _, field := range f {
h.Write([]byte(field.Key))
h.Write([]byte{'\n'})
}
for key, value := range m.Tags() {
h.Write([]byte(key))
h.Write([]byte{'\n'})
@@ -205,34 +218,88 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {

buckets := make(timeSeriesBuckets)
for _, m := range batch {
for _, f := range m.FieldList() {
value, err := s.getStackdriverTypedValue(f.Value)
// Set metric types based on user-provided filter
metricType := m.Type()
if s.filterCounter != nil && s.filterCounter.Match(m.Name()) {
metricType = telegraf.Counter
}
if s.filterGauge != nil && s.filterGauge.Match(m.Name()) {
metricType = telegraf.Gauge
}
if s.filterHistogram != nil && s.filterHistogram.Match(m.Name()) {
metricType = telegraf.Histogram
}

metricKind, err := getStackdriverMetricKind(metricType)
if err != nil {
s.Log.Errorf("Get kind for metric %q (%T) failed: %s", m.Name(), metricType, err)
continue
}

// Convert any declared tag to a resource label and remove it from
// the metric
resourceLabels := make(map[string]string, len(s.ResourceLabels)+len(s.TagsAsResourceLabels))
for k, v := range s.ResourceLabels {
resourceLabels[k] = v
}
for _, tag := range s.TagsAsResourceLabels {
if val, ok := m.GetTag(tag); ok {
resourceLabels[tag] = val
m.RemoveTag(tag)
}
}

if metricType == telegraf.Histogram {
value, err := s.buildHistogram(m)
if err != nil {
s.Log.Errorf("Get type failed: %q", err)
s.Log.Errorf("Unable to build distribution from metric %s: %s", m, err)
continue
}

if value == nil {
startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, nil, s.counterCache)
timeInterval, err := getStackdriverTimeInterval(metricKind, startTime, endTime)
if err != nil {
s.Log.Errorf("Get time interval failed: %s", err)
continue
}

// Set metric types based on user-provided filter
metricType := m.Type()
if s.filterCounter != nil && s.filterCounter.Match(m.Name()) {
metricType = telegraf.Counter
// Prepare an individual data point.
dataPoint := &monitoringpb.Point{
Interval: timeInterval,
Value: value,
}
if s.filterGauge != nil && s.filterGauge.Match(m.Name()) {
metricType = telegraf.Gauge

// Prepare time series.
timeSeries := &monitoringpb.TimeSeries{
Metric: &metricpb.Metric{
Type: s.generateHistogramName(m),
Labels: s.getStackdriverLabels(m.TagList()),
},
MetricKind: metricKind,
Resource: &monitoredrespb.MonitoredResource{
Type: s.ResourceType,
Labels: resourceLabels,
},
Points: []*monitoringpb.Point{
dataPoint,
},
}

metricKind, err := getStackdriverMetricKind(metricType)
buckets.Add(m, m.FieldList(), timeSeries)
continue
}

for _, f := range m.FieldList() {
value, err := s.getStackdriverTypedValue(f.Value)
if err != nil {
s.Log.Errorf("Get kind for metric %q (%T) field %q failed: %s", m.Name(), metricType, f, err)
s.Log.Errorf("Get type failed: %q", err)
continue
}
if value == nil {
continue
}

startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, f, s.counterCache)

timeInterval, err := getStackdriverTimeInterval(metricKind, startTime, endTime)
if err != nil {
s.Log.Errorf("Get time interval failed: %s", err)
@@ -245,19 +312,6 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
Value: value,
}

// Convert any declared tag to a resource label and remove it from
// the metric
resourceLabels := make(map[string]string, len(s.ResourceLabels)+len(s.TagsAsResourceLabels))
for k, v := range s.ResourceLabels {
resourceLabels[k] = v
}
for _, tag := range s.TagsAsResourceLabels {
if val, ok := m.GetTag(tag); ok {
resourceLabels[tag] = val
m.RemoveTag(tag)
}
}

// Prepare time series.
timeSeries := &monitoringpb.TimeSeries{
Metric: &metricpb.Metric{
Expand All @@ -274,7 +328,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
},
}

buckets.Add(m, f, timeSeries)
buckets.Add(m, []*telegraf.Field{f}, timeSeries)

// If the metric is untyped, it will end with unknown. We will also
// send another metric with the unknown:counter suffix. Google will
@@ -307,7 +361,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
dataPoint,
},
}
buckets.Add(m, f, counterTimeSeries)
buckets.Add(m, []*telegraf.Field{f}, counterTimeSeries)
}
}
}
@@ -388,6 +442,19 @@ func (s *Stackdriver) generateMetricName(m telegraf.Metric, metricType telegraf.
return path.Join(s.MetricTypePrefix, name, kind)
}

func (s *Stackdriver) generateHistogramName(m telegraf.Metric) string {
if s.MetricNameFormat == "path" {
return path.Join(s.MetricTypePrefix, s.Namespace, m.Name())
}

name := m.Name()
if s.Namespace != "" {
name = s.Namespace + "_" + m.Name()
}

return path.Join(s.MetricTypePrefix, name, "histogram")
}
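
A hypothetical table-style check of the two naming branches above, assuming a
prefix of "custom.googleapis.com", a namespace of "telegraf", and the
testutil.MustMetric helper; the expected strings follow directly from the
path.Join calls in generateHistogramName:

package stackdriver

import (
	"testing"
	"time"

	"github.com/influxdata/telegraf/testutil"
)

// Illustrative sketch only, not part of the commit's test suite.
func TestGenerateHistogramNameSketch(t *testing.T) {
	cases := []struct {
		format string
		want   string
	}{
		// "path" joins prefix, namespace and metric name with slashes.
		{"path", "custom.googleapis.com/telegraf/http_request_duration_seconds"},
		// Any other format joins namespace and name with an underscore and
		// appends a "histogram" kind segment.
		{"official", "custom.googleapis.com/telegraf_http_request_duration_seconds/histogram"},
	}
	for _, c := range cases {
		s := &Stackdriver{
			MetricTypePrefix: "custom.googleapis.com",
			Namespace:        "telegraf",
			MetricNameFormat: c.format,
		}
		m := testutil.MustMetric("http_request_duration_seconds", nil, map[string]interface{}{"count": 1.0}, time.Now())
		if got := s.generateHistogramName(m); got != c.want {
			t.Errorf("format %q: got %q, want %q", c.format, got, c.want)
		}
	}
}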

func getStackdriverIntervalEndpoints(
kind metricpb.MetricDescriptor_MetricKind,
value *monitoringpb.TypedValue,
@@ -436,7 +503,9 @@ func getStackdriverMetricKind(vt telegraf.ValueType) (metricpb.MetricDescriptor_
return metricpb.MetricDescriptor_GAUGE, nil
case telegraf.Counter:
return metricpb.MetricDescriptor_CUMULATIVE, nil
case telegraf.Histogram, telegraf.Summary:
case telegraf.Histogram:
return metricpb.MetricDescriptor_CUMULATIVE, nil
case telegraf.Summary:
fallthrough
default:
return metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, fmt.Errorf("unsupported telegraf value type: %T", vt)
@@ -497,6 +566,89 @@ func (s *Stackdriver) getStackdriverTypedValue(value interface{}) (*monitoringpb
}
}

func (s *Stackdriver) buildHistogram(m telegraf.Metric) (*monitoringpb.TypedValue, error) {
sumInter, ok := m.GetField("sum")
if !ok {
return nil, fmt.Errorf("no sum field present")
}
sum, err := internal.ToFloat64(sumInter)
if err != nil {
return nil, fmt.Errorf("unable to convert sum value to float64: %w", err)
}
m.RemoveField("sum")

countInter, ok := m.GetField("count")
if !ok {
return nil, fmt.Errorf("no count field present")
}
count, err := internal.ToFloat64(countInter)
if err != nil {
return nil, fmt.Errorf("unable to convert count value to float64: %w", err)
}
m.RemoveField("count")

// Build map of the buckets and their values
buckets := make([]float64, 0)
bucketCounts := make([]int64, 0)
for _, field := range m.FieldList() {
// Add the +inf value to bucket counts, no need to define a bound
if strings.Contains(strings.ToLower(field.Key), "+inf") {
count, err := internal.ToInt64(field.Value)
if err != nil {
continue
}
bucketCounts = append(bucketCounts, count)
continue
}

bucket, err := strconv.ParseFloat(field.Key, 64)
if err != nil {
continue
}

count, err := internal.ToInt64(field.Value)
if err != nil {
continue
}

buckets = append(buckets, bucket)
bucketCounts = append(bucketCounts, count)
}

sort.Slice(buckets, func(i, j int) bool {
return buckets[i] < buckets[j]
})
sort.Slice(bucketCounts, func(i, j int) bool {
return bucketCounts[i] < bucketCounts[j]
})

// Bucket counts contain the count for a specific bucket, not the running
// total like Prometheus histograms use. Loop backwards to determine the
// count of each bucket rather than the running total count.
for i := len(bucketCounts) - 1; i > 0; i-- {
bucketCounts[i] = bucketCounts[i] - bucketCounts[i-1]
}

v := &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_DistributionValue{
DistributionValue: &distribution.Distribution{
Count: int64(count),
Mean: sum / count,
BucketCounts: bucketCounts,
BucketOptions: &distribution.Distribution_BucketOptions{
Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{
ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{
Bounds: buckets,
},
},
},
},
},
}

return v, nil
}
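
Since Prometheus buckets are cumulative while a Stackdriver distribution wants
the count per individual bucket, the backwards loop above subtracts each
bucket's predecessor. A standalone sketch of that step with made-up values:

package main

import "fmt"

func main() {
	// Cumulative counts for bounds 0.05, 0.1, 0.5 and the +Inf bucket.
	bounds := []float64{0.05, 0.1, 0.5}
	bucketCounts := []int64{5, 10, 20, 25}

	// Walk backwards so each slot keeps only its own bucket's observations.
	for i := len(bucketCounts) - 1; i > 0; i-- {
		bucketCounts[i] -= bucketCounts[i-1]
	}

	fmt.Println(bounds)       // [0.05 0.1 0.5]
	fmt.Println(bucketCounts) // [5 5 10 5], which sums to the total count of 25
}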

func (s *Stackdriver) getStackdriverLabels(tags []*telegraf.Tag) map[string]string {
labels := make(map[string]string)
for _, t := range tags {
