Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support configurable AggregationTemporality in exporters; add OTLP missing sum point temporality/monotonic fields #1296

Merged
merged 10 commits into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `ErrorOption` has been changed to an interface to conform with project design standards which included adding a `NewErrorConfig` function.
- `EmptySpanContext` is removed.
- Move the `go.opentelemetry.io/otel/api/trace/tracetest` package into `go.opentelemetry.io/otel/oteltest`. (#1229)
- OTLP Exporter supports OTLP v0.5.0. (#1230)
- OTLP Exporter updates:
- supports OTLP v0.5.0 (#1230)
- supports configurable aggregation temporality (default: Cumulative, optional: Stateless). (#1296)
- The Sampler is now called on local child spans. (#1233)
- The `Kind` type from the `go.opentelemetry.io/otel/api/metric` package was renamed to `InstrumentKind` to more specifically describe what it is and avoid semantic ambiguity. (#1240)
- The `MetricKind` method of the `Descriptor` type in the `go.opentelemetry.io/otel/api/metric` package was renamed to `Descriptor.InstrumentKind`.
Expand Down
4 changes: 2 additions & 2 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ func (e *Exporter) Controller() *pull.Controller {
return e.controller
}

func (e *Exporter) ExportKindFor(*otel.Descriptor, aggregation.Kind) export.ExportKind {
func (e *Exporter) ExportKindFor(desc *otel.Descriptor, kind aggregation.Kind) export.ExportKind {
// NOTE: Summary values should use Delta aggregation, then be
// combined into a sliding window, see the TODO below.
// NOTE: Prometheus also supports a "GaugeDelta" exposition format,
// which is expressed as a delta histogram. Need to understand if this
// should be a default behavior for ValueRecorder/ValueObserver.
return export.CumulativeExporter
return export.CumulativeExportKindSelector().ExportKindFor(desc, kind)
}

func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down
38 changes: 26 additions & 12 deletions exporters/otlp/internal/transform/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func CheckpointSet(ctx context.Context, exportSelector export.ExportKindSelector
for i := uint(0); i < numWorkers; i++ {
go func() {
defer wg.Done()
transformer(ctx, records, transformed)
transformer(ctx, exportSelector, records, transformed)
}()
}
go func() {
Expand Down Expand Up @@ -131,9 +131,9 @@ func source(ctx context.Context, exportSelector export.ExportKindSelector, cps e

// transformer transforms records read from the passed in chan into
// OTLP Metrics which are sent on the out chan.
func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) {
func transformer(ctx context.Context, exportSelector export.ExportKindSelector, in <-chan export.Record, out chan<- result) {
for r := range in {
m, err := Record(r)
m, err := Record(exportSelector, r)
// Propagate errors, but do not send empty results.
if err == nil && m == nil {
continue
Expand Down Expand Up @@ -250,7 +250,7 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, e

// Record transforms a Record into an OTLP Metric. An ErrIncompatibleAgg
// error is returned if the Record Aggregator is not supported.
func Record(r export.Record) (*metricpb.Metric, error) {
func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricpb.Metric, error) {
agg := r.Aggregation()
switch agg.Kind() {
case aggregation.MinMaxSumCountKind:
Expand All @@ -265,7 +265,7 @@ func Record(r export.Record) (*metricpb.Metric, error) {
if !ok {
return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg)
}
return histogram(r, h)
return histogramPoint(r, exportSelector.ExportKindFor(r.Descriptor(), aggregation.HistogramKind), h)

case aggregation.SumKind:
s, ok := agg.(aggregation.Sum)
Expand All @@ -276,7 +276,7 @@ func Record(r export.Record) (*metricpb.Metric, error) {
if err != nil {
return nil, err
}
return scalar(r, sum, r.StartTime(), r.EndTime())
return sumPoint(r, sum, r.StartTime(), r.EndTime(), exportSelector.ExportKindFor(r.Descriptor(), aggregation.SumKind), r.Descriptor().InstrumentKind().Monotonic())

case aggregation.LastValueKind:
lv, ok := agg.(aggregation.LastValue)
Expand All @@ -287,14 +287,14 @@ func Record(r export.Record) (*metricpb.Metric, error) {
if err != nil {
return nil, err
}
return gauge(r, value, time.Time{}, tm)
return gaugePoint(r, value, time.Time{}, tm)

default:
return nil, fmt.Errorf("%w: %T", ErrUnimplementedAgg, agg)
}
}

func gauge(record export.Record, num otel.Number, start, end time.Time) (*metricpb.Metric, error) {
func gaugePoint(record export.Record, num otel.Number, start, end time.Time) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()

Expand Down Expand Up @@ -338,9 +338,17 @@ func gauge(record export.Record, num otel.Number, start, end time.Time) (*metric
return m, nil
}

// scalar transforms a Sum or LastValue Aggregator into an OTLP Metric.
// For LastValue (Gauge), use start==time.Time{}.
func scalar(record export.Record, num otel.Number, start, end time.Time) (*metricpb.Metric, error) {
func exportKindToTemporality(ek export.ExportKind) metricpb.AggregationTemporality {
switch ek {
case export.DeltaExportKind:
return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA
case export.CumulativeExportKind:
return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE
}
return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED
}

func sumPoint(record export.Record, num otel.Number, start, end time.Time, ek export.ExportKind, monotonic bool) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()

Expand All @@ -354,6 +362,8 @@ func scalar(record export.Record, num otel.Number, start, end time.Time) (*metri
case otel.Int64NumberKind:
m.Data = &metricpb.Metric_IntSum{
IntSum: &metricpb.IntSum{
IsMonotonic: monotonic,
AggregationTemporality: exportKindToTemporality(ek),
DataPoints: []*metricpb.IntDataPoint{
{
Value: num.CoerceToInt64(n),
Expand All @@ -367,6 +377,8 @@ func scalar(record export.Record, num otel.Number, start, end time.Time) (*metri
case otel.Float64NumberKind:
m.Data = &metricpb.Metric_DoubleSum{
DoubleSum: &metricpb.DoubleSum{
IsMonotonic: monotonic,
AggregationTemporality: exportKindToTemporality(ek),
DataPoints: []*metricpb.DoubleDataPoint{
{
Value: num.CoerceToFloat64(n),
Expand Down Expand Up @@ -473,7 +485,7 @@ func histogramValues(a aggregation.Histogram) (boundaries []float64, counts []fl
}

// histogram transforms a Histogram Aggregator into an OTLP Metric.
func histogram(record export.Record, a aggregation.Histogram) (*metricpb.Metric, error) {
func histogramPoint(record export.Record, ek export.ExportKind, a aggregation.Histogram) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()
boundaries, counts, err := histogramValues(a)
Expand Down Expand Up @@ -504,6 +516,7 @@ func histogram(record export.Record, a aggregation.Histogram) (*metricpb.Metric,
case otel.Int64NumberKind:
m.Data = &metricpb.Metric_IntHistogram{
IntHistogram: &metricpb.IntHistogram{
AggregationTemporality: exportKindToTemporality(ek),
DataPoints: []*metricpb.IntHistogramDataPoint{
{
Sum: sum.CoerceToInt64(n),
Expand All @@ -520,6 +533,7 @@ func histogram(record export.Record, a aggregation.Histogram) (*metricpb.Metric,
case otel.Float64NumberKind:
m.Data = &metricpb.Metric_DoubleHistogram{
DoubleHistogram: &metricpb.DoubleHistogram{
AggregationTemporality: exportKindToTemporality(ek),
DataPoints: []*metricpb.DoubleHistogramDataPoint{
{
Sum: sum.CoerceToFloat64(n),
Expand Down
43 changes: 27 additions & 16 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ var (
intervalEnd = intervalStart.Add(time.Hour)
)

const (
otelCumulative = metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE
otelDelta = metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA
)

func TestStringKeyValues(t *testing.T) {
tests := []struct {
kvs []label.KeyValue
Expand Down Expand Up @@ -167,14 +172,17 @@ func TestSumIntDataPoints(t *testing.T) {
value, err := sum.Sum()
require.NoError(t, err)

if m, err := scalar(record, value, record.StartTime(), record.EndTime()); assert.NoError(t, err) {
if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true); assert.NoError(t, err) {
assert.Nil(t, m.GetIntGauge())
assert.Nil(t, m.GetIntHistogram())
assert.Equal(t, []*metricpb.IntDataPoint{{
Value: 1,
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
TimeUnixNano: uint64(intervalEnd.UnixNano()),
}}, m.GetIntSum().DataPoints)
assert.Equal(t, &metricpb.IntSum{
AggregationTemporality: otelCumulative,
IsMonotonic: true,
DataPoints: []*metricpb.IntDataPoint{{
Value: 1,
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
TimeUnixNano: uint64(intervalEnd.UnixNano()),
}}}, m.GetIntSum())
assert.Nil(t, m.GetDoubleGauge())
assert.Nil(t, m.GetDoubleHistogram())
}
Expand All @@ -192,17 +200,20 @@ func TestSumFloatDataPoints(t *testing.T) {
value, err := sum.Sum()
require.NoError(t, err)

if m, err := scalar(record, value, record.StartTime(), record.EndTime()); assert.NoError(t, err) {
if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.DeltaExportKind, false); assert.NoError(t, err) {
assert.Nil(t, m.GetIntGauge())
assert.Nil(t, m.GetIntHistogram())
assert.Nil(t, m.GetIntSum())
assert.Nil(t, m.GetDoubleGauge())
assert.Nil(t, m.GetDoubleHistogram())
assert.Equal(t, []*metricpb.DoubleDataPoint{{
Value: 1,
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
TimeUnixNano: uint64(intervalEnd.UnixNano()),
}}, m.GetDoubleSum().DataPoints)
assert.Equal(t, &metricpb.DoubleSum{
IsMonotonic: false,
AggregationTemporality: otelDelta,
DataPoints: []*metricpb.DoubleDataPoint{{
Value: 1,
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
TimeUnixNano: uint64(intervalEnd.UnixNano()),
}}}, m.GetDoubleSum())
}
}

Expand All @@ -218,7 +229,7 @@ func TestLastValueIntDataPoints(t *testing.T) {
value, timestamp, err := sum.LastValue()
require.NoError(t, err)

if m, err := gauge(record, value, time.Time{}, timestamp); assert.NoError(t, err) {
if m, err := gaugePoint(record, value, time.Time{}, timestamp); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.IntDataPoint{{
Value: 100,
StartTimeUnixNano: 0,
Expand All @@ -240,7 +251,7 @@ func TestSumErrUnknownValueType(t *testing.T) {
value, err := s.Sum()
require.NoError(t, err)

_, err = scalar(record, value, record.StartTime(), record.EndTime())
_, err = sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true)
assert.Error(t, err)
if !errors.Is(err, ErrUnknownValueType) {
t.Errorf("expected ErrUnknownValueType, got %v", err)
Expand Down Expand Up @@ -325,7 +336,7 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) {
kind: kind,
agg: agg,
}
return Record(export.NewRecord(&desc, &labels, res, test, intervalStart, intervalEnd))
return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, res, test, intervalStart, intervalEnd))
}

mpb, err := makeMpb(aggregation.SumKind, &lastvalue.New(1)[0])
Expand Down Expand Up @@ -358,7 +369,7 @@ func TestRecordAggregatorUnexpectedErrors(t *testing.T) {
desc := otel.NewDescriptor("things", otel.CounterInstrumentKind, otel.Int64NumberKind)
labels := label.NewSet()
res := resource.Empty()
return Record(export.NewRecord(&desc, &labels, res, agg, intervalStart, intervalEnd))
return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, res, agg, intervalStart, intervalEnd))
}

errEx := fmt.Errorf("timeout")
Expand Down
11 changes: 11 additions & 0 deletions exporters/otlp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
)

const (
Expand Down Expand Up @@ -83,6 +85,7 @@ type config struct {
headers map[string]string
clientCredentials credentials.TransportCredentials
numWorkers uint
exportKindSelector metricsdk.ExportKindSelector
}

// WorkerCount sets the number of Goroutines to use when processing telemetry.
Expand Down Expand Up @@ -165,3 +168,11 @@ func WithGRPCDialOption(opts ...grpc.DialOption) ExporterOption {
cfg.grpcDialOptions = opts
}
}

// WithMetricExportKindSelector defines the ExportKindSelector used for selecting
// AggregationTemporality (i.e., Cumulative vs. Delta aggregation).
func WithMetricExportKindSelector(selector metricsdk.ExportKindSelector) ExporterOption {
return func(cfg *config) {
cfg.exportKindSelector = selector
}
}
14 changes: 8 additions & 6 deletions exporters/otlp/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func newConfig(opts ...ExporterOption) config {
cfg := config{
numWorkers: DefaultNumWorkers,
grpcServiceConfig: DefaultGRPCServiceConfig,

// Note: the default ExportKindSelector is specified
// as Cumulative:
// https://github.com/open-telemetry/opentelemetry-specification/issues/731
exportKindSelector: metricsdk.CumulativeExportKindSelector(),
}
for _, opt := range opts {
opt(&cfg)
Expand All @@ -93,9 +98,6 @@ func NewUnstartedExporter(opts ...ExporterOption) *Exporter {
if len(e.c.headers) > 0 {
e.metadata = metadata.New(e.c.headers)
}

// TODO (rghetia): add resources

return e
}

Expand Down Expand Up @@ -286,9 +288,9 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e
}

// ExportKindFor reports back to the OpenTelemetry SDK sending this Exporter
// metric telemetry that it needs to be provided in a pass-through format.
func (e *Exporter) ExportKindFor(*otel.Descriptor, aggregation.Kind) metricsdk.ExportKind {
return metricsdk.PassThroughExporter
// metric telemetry that it needs to be provided in a cumulative format.
func (e *Exporter) ExportKindFor(desc *otel.Descriptor, kind aggregation.Kind) metricsdk.ExportKind {
return e.c.exportKindSelector.ExportKindFor(desc, kind)
}

// ExportSpans exports a batch of SpanData.
Expand Down
4 changes: 2 additions & 2 deletions exporters/otlp/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
}

selector := simple.NewWithInexpensiveDistribution()
processor := processor.New(selector, metricsdk.PassThroughExporter)
processor := processor.New(selector, metricsdk.StatelessExportKindSelector())
pusher := push.New(processor, exp)
pusher.Start()

Expand Down Expand Up @@ -509,7 +509,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) {
span.End()

selector := simple.NewWithInexpensiveDistribution()
processor := processor.New(selector, metricsdk.PassThroughExporter)
processor := processor.New(selector, metricsdk.StatelessExportKindSelector())
pusher := push.New(processor, exp)
pusher.Start()

Expand Down
Loading