Skip to content

Commit

Permalink
Do not block Temporality/Aggregation on OTLP metric export (#4395)
Browse files Browse the repository at this point in the history
* otlpmetricgrpc - no block temp/agg selection with export

* otlpmetrichttp - no block temp/agg selection with export

* Add test Export doesn't block Temporality or Aggregation

* Deprecate internal New and Exporter

* Add changes to changelog

* Apply suggestions from code review
  • Loading branch information
MrAlias committed Aug 2, 2023
1 parent 528a0cb commit 378e51e
Show file tree
Hide file tree
Showing 10 changed files with 473 additions and 77 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Log a suggested view that fixes instrument conflicts in `go.opentelemetry.io/otel/sdk/metric`. (#4349)
- Fix possible panic, deadlock and race condition in batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#4353)
- Improve context cancelation handling in batch span processor's `ForceFlush` in `go.opentelemetry.io/otel/sdk/trace`. (#4369)
- Do not block the metric SDK when OTLP metric exports are blocked in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` and `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#3925, #4395)

## [1.16.0/0.39.0] 2023-05-18

Expand Down
10 changes: 10 additions & 0 deletions exporters/otlp/otlpmetric/internal/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
)

// Exporter exports metrics data as OTLP.
//
// Deprecated: Exporter exists for historical compatibility, it should not be
// used. Do not remove Exporter unless the whole
// "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" module is
// removed.
type Exporter struct {
// Ensure synchronous access to the client across all functionality.
clientMu sync.Mutex
Expand Down Expand Up @@ -96,6 +101,11 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
// New return an Exporter that uses client to transmits the OTLP data it
// produces. The client is assumed to be fully started and able to communicate
// with its OTLP receiving endpoint.
//
// Deprecated: New exists for historical compatibility, it should not be used.
// Do not remove New unless the whole
// "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" module is
// removed.
func New(client Client) *Exporter {
return &Exporter{client: client}
}
Expand Down
27 changes: 1 addition & 26 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
Expand All @@ -41,9 +37,6 @@ type client struct {
exportTimeout time.Duration
requestFunc retry.RequestFunc

temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector

// ourConn keeps track of where conn was created: true if created here in
// NewClient, or false if passed with an option. This is important on
// Shutdown as the conn should only be closed if we created it. Otherwise,
Expand All @@ -54,16 +47,11 @@ type client struct {
}

// newClient creates a new gRPC metric client.
func newClient(ctx context.Context, options ...Option) (ominternal.Client, error) {
cfg := oconf.NewGRPCConfig(asGRPCOptions(options)...)

func newClient(ctx context.Context, cfg oconf.Config) (*client, error) {
c := &client{
exportTimeout: cfg.Metrics.Timeout,
requestFunc: cfg.RetryConfig.RequestFunc(retryable),
conn: cfg.GRPCConn,

temporalitySelector: cfg.Metrics.TemporalitySelector,
aggregationSelector: cfg.Metrics.AggregationSelector,
}

if len(cfg.Metrics.Headers) > 0 {
Expand All @@ -88,19 +76,6 @@ func newClient(ctx context.Context, options ...Option) (ominternal.Client, error
return c, nil
}

// Temporality returns the Temporality to use for an instrument kind.
func (c *client) Temporality(k metric.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (c *client) Aggregation(k metric.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}

// ForceFlush does nothing, the client holds no state.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }

// Shutdown shuts down the client, freeing all resource.
//
// Any active connections to a remote endpoint are closed if they were created
Expand Down
22 changes: 20 additions & 2 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"google.golang.org/protobuf/types/known/durationpb"

ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand Down Expand Up @@ -129,16 +131,32 @@ func TestRetryable(t *testing.T) {
}
}

type clientShim struct {
*client
}

func (clientShim) Temporality(metric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}
func (clientShim) Aggregation(metric.InstrumentKind) aggregation.Aggregation {
return nil
}
func (clientShim) ForceFlush(ctx context.Context) error {
return ctx.Err()
}

func TestClient(t *testing.T) {
factory := func(rCh <-chan otest.ExportResult) (ominternal.Client, otest.Collector) {
coll, err := otest.NewGRPCCollector("", rCh)
require.NoError(t, err)

ctx := context.Background()
addr := coll.Addr().String()
client, err := newClient(ctx, WithEndpoint(addr), WithInsecure())
opts := []Option{WithEndpoint(addr), WithInsecure()}
cfg := oconf.NewGRPCConfig(asGRPCOptions(opts)...)
client, err := newClient(ctx, cfg)
require.NoError(t, err)
return client, coll
return clientShim{client}, coll
}

t.Run("Integration", otest.RunClientTests(factory))
Expand Down
97 changes: 86 additions & 11 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,83 @@ package otlpmetricgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpme

import (
"context"
"fmt"
"sync"

ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

// Exporter is a OpenTelemetry metric Exporter using gRPC.
type Exporter struct {
wrapped *ominternal.Exporter
// Ensure synchronous access to the client across all functionality.
clientMu sync.Mutex
client interface {
UploadMetrics(context.Context, *metricpb.ResourceMetrics) error
Shutdown(context.Context) error
}

temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector

shutdownOnce sync.Once
}

func newExporter(c *client, cfg oconf.Config) (*Exporter, error) {
ts := cfg.Metrics.TemporalitySelector
if ts == nil {
ts = func(metric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}
}

as := cfg.Metrics.AggregationSelector
if as == nil {
as = metric.DefaultAggregationSelector
}

return &Exporter{
client: c,

temporalitySelector: ts,
aggregationSelector: as,
}, nil
}

// Temporality returns the Temporality to use for an instrument kind.
func (e *Exporter) Temporality(k metric.InstrumentKind) metricdata.Temporality {
return e.wrapped.Temporality(k)
return e.temporalitySelector(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (e *Exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation {
return e.wrapped.Aggregation(k)
return e.aggregationSelector(k)
}

// Export transforms and transmits metric data to an OTLP receiver.
//
// This method returns an error if called after Shutdown.
// This method returns an error if the method is canceled by the passed context.
func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
err := e.wrapped.Export(ctx, rm)
global.Debug("OTLP/gRPC exporter export", "Data", rm)
defer global.Debug("OTLP/gRPC exporter export", "Data", rm)

otlpRm, err := transform.ResourceMetrics(rm)
// Best effort upload of transformable metrics.
e.clientMu.Lock()
upErr := e.client.UploadMetrics(ctx, otlpRm)
e.clientMu.Unlock()
if upErr != nil {
if err == nil {
return fmt.Errorf("failed to upload metrics: %w", upErr)
}
// Merge the two errors.
return fmt.Errorf("failed to upload incomplete metrics (%s): %w", err, upErr)
}
return err
}

Expand All @@ -56,7 +103,8 @@ func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) e
//
// This method is safe to call concurrently.
func (e *Exporter) ForceFlush(ctx context.Context) error {
return e.wrapped.ForceFlush(ctx)
// The exporter and client hold no state, nothing to flush.
return ctx.Err()
}

// Shutdown flushes all metric data held by an exporter and releases any held
Expand All @@ -67,7 +115,34 @@ func (e *Exporter) ForceFlush(ctx context.Context) error {
//
// This method is safe to call concurrently.
func (e *Exporter) Shutdown(ctx context.Context) error {
return e.wrapped.Shutdown(ctx)
err := errShutdown
e.shutdownOnce.Do(func() {
e.clientMu.Lock()
client := e.client
e.client = shutdownClient{}
e.clientMu.Unlock()
err = client.Shutdown(ctx)
})
return err
}

var errShutdown = fmt.Errorf("gRPC exporter is shutdown")

type shutdownClient struct{}

func (c shutdownClient) err(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return err
}
return errShutdown
}

func (c shutdownClient) UploadMetrics(ctx context.Context, _ *metricpb.ResourceMetrics) error {
return c.err(ctx)
}

func (c shutdownClient) Shutdown(ctx context.Context) error {
return c.err(ctx)
}

// MarshalLog returns logging data about the Exporter.
Expand All @@ -84,10 +159,10 @@ func (e *Exporter) MarshalLog() interface{} {
// on options. If a connection cannot be establishes in the lifetime of ctx,
// an error will be returned.
func New(ctx context.Context, options ...Option) (*Exporter, error) {
c, err := newClient(ctx, options...)
cfg := oconf.NewGRPCConfig(asGRPCOptions(options)...)
c, err := newClient(ctx, cfg)
if err != nil {
return nil, err
}
exp := ominternal.New(c)
return &Exporter{exp}, nil
return newExporter(c, cfg)
}
Loading

0 comments on commit 378e51e

Please sign in to comment.