Skip to content

Commit

Permalink
implement histogram conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
huyan0 committed Aug 31, 2020
1 parent 09d5ee7 commit ebb3f11
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 51 deletions.
90 changes: 80 additions & 10 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"strconv"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1old"
Expand Down Expand Up @@ -87,7 +89,7 @@ func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int
default:
tsMap := map[string]*prompb.TimeSeries{}
dropped := 0
errs := []string{}
errs := []error{}

resourceMetrics := dataold.MetricDataToOtlp(pdatautil.MetricsToOldInternalMetrics(md))
for _, resourceMetric := range resourceMetrics {
Expand All @@ -107,7 +109,7 @@ func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int
// check for valid type and temporality combination
if ok := validateMetrics(metric.MetricDescriptor); !ok {
dropped++
errs = append(errs, "invalid temporality and type combination")
errs = append(errs, errors.New("invalid temporality and type combination"))
continue
}
// handle individual metric based on type
Expand All @@ -116,19 +118,28 @@ func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int
otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE:
if err := prwe.handleScalarMetric(tsMap, metric); err != nil {
dropped++
errs = append(errs, err.Error())
errs = append(errs, err)
}
case otlp.MetricDescriptor_HISTOGRAM:
if err := prwe.handleHistogramMetric(tsMap, metric); err != nil {
dropped++
errs = append(errs, err)
}
default:
dropped++
errs = append(errs, errors.New("unsupported metric type"))
}
}
}
}

if err := prwe.export(ctx, tsMap); err != nil {
return pdatautil.MetricCount(md), err
dropped = pdatautil.MetricCount(md)
errs = append(errs, err)
}

if dropped != 0 {
return dropped, errors.New(strings.Join(errs, "\n"))
return dropped, componenterror.CombineErrors(errs)
}

return 0, nil
Expand All @@ -146,11 +157,10 @@ func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries,
// int points
case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64:
if metric.Int64DataPoints == nil {
return errors.New("nil data point field in metric" + metric.GetMetricDescriptor().Name)
return fmt.Errorf("nil data point field in metric %s", metric.GetMetricDescriptor().Name)
}

for _, pt := range metric.Int64DataPoints {

// create parameters for addSample
name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)
labels := createLabelSet(pt.GetLabels(), nameStr, name)
Expand All @@ -167,7 +177,7 @@ func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries,
// double points
case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE:
if metric.DoubleDataPoints == nil {
return errors.New("nil data point field in metric" + metric.GetMetricDescriptor().Name)
return fmt.Errorf("nil data point field in metric %s", metric.GetMetricDescriptor().Name)
}
for _, pt := range metric.DoubleDataPoints {

Expand All @@ -183,10 +193,70 @@ func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries,
}
return nil
}

return errors.New("invalid metric type: wants int or double data points")
}

// handleHistogramMetric processes data points in a single OTLP histogram metric by mapping the sum, count and each
// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil.
func (prwe *prwExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {

if metric.HistogramDataPoints == nil {
return fmt.Errorf("nil data point field in metric %s", metric.GetMetricDescriptor().Name)
}

for _, pt := range metric.HistogramDataPoints {
if pt == nil {
continue
}
time := convertTimeStamp(pt.TimeUnixNano)
mType := metric.GetMetricDescriptor().GetType()

// sum, count, and buckets of the histogram should append suffix to baseName
baseName := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)

// treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{
Value: pt.GetSum(),
Timestamp: time,
}
sumlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+sumStr)
addSample(tsMap, sum, sumlabels, mType)

// treat count as a sample in an individual TimeSeries
count := &prompb.Sample{
Value: float64(pt.GetCount()),
Timestamp: time,
}
countlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+countStr)
addSample(tsMap, count, countlabels, mType)

// count for +Inf bound
var totalCount uint64

// process each bucket
for le, bk := range pt.GetBuckets() {
bucket := &prompb.Sample{
Value: float64(bk.Count),
Timestamp: time,
}
boundStr := strconv.FormatFloat(pt.GetExplicitBounds()[le], 'f', -1, 64)
labels := createLabelSet(pt.GetLabels(), nameStr, baseName+bucketStr, leStr, boundStr)
addSample(tsMap, bucket, labels, mType)

totalCount += bk.GetCount()
}
// add le=+Inf bucket
infBucket := &prompb.Sample{
Value: float64(totalCount),
Timestamp: time,
}
infLabels := createLabelSet(pt.GetLabels(), nameStr, baseName+bucketStr, leStr, pInfStr)
addSample(tsMap, infBucket, infLabels, mType)
}
return nil
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
//Calls the helper function to convert the TsMap to the desired format
Expand Down
Loading

0 comments on commit ebb3f11

Please sign in to comment.