Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not block Temporality/Aggregation on OTLP metric export #4395

Merged
merged 7 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Log a suggested view that fixes instrument conflicts in `go.opentelemetry.io/otel/sdk/metric`. (#4349)
- Fix possible panic, deadlock and race condition in batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#4353)
- Improve context cancelation handling in batch span processor's `ForceFlush` in `go.opentelemetry.io/otel/sdk/trace`. (#4369)
- Do not block the metric SDK when OTLP metric exports are blocked in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` and `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#3925, #4395)

## [1.16.0/0.39.0] 2023-05-18

Expand Down
10 changes: 10 additions & 0 deletions exporters/otlp/otlpmetric/internal/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
)

// Exporter exports metrics data as OTLP.
//
// Deprecated: Exporter exists for historical compatibility, it should not be
// used. Do not remove Exporter unless the whole
// "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" module is
// removed.
type Exporter struct {
// Ensure synchronous access to the client across all functionality.
clientMu sync.Mutex
Expand Down Expand Up @@ -96,6 +101,11 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
// New return an Exporter that uses client to transmits the OTLP data it
// produces. The client is assumed to be fully started and able to communicate
// with its OTLP receiving endpoint.
//
// Deprecated: New exists for historical compatibility, it should not be used.
// Do not remove New unless the whole
// "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal" module is
// removed.
func New(client Client) *Exporter {
return &Exporter{client: client}
}
Expand Down
27 changes: 1 addition & 26 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
Expand All @@ -41,9 +37,6 @@ type client struct {
exportTimeout time.Duration
requestFunc retry.RequestFunc

temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector

// ourConn keeps track of where conn was created: true if created here in
// NewClient, or false if passed with an option. This is important on
// Shutdown as the conn should only be closed if we created it. Otherwise,
Expand All @@ -54,16 +47,11 @@ type client struct {
}

// newClient creates a new gRPC metric client.
func newClient(ctx context.Context, options ...Option) (ominternal.Client, error) {
cfg := oconf.NewGRPCConfig(asGRPCOptions(options)...)

func newClient(ctx context.Context, cfg oconf.Config) (*client, error) {
c := &client{
exportTimeout: cfg.Metrics.Timeout,
requestFunc: cfg.RetryConfig.RequestFunc(retryable),
conn: cfg.GRPCConn,

temporalitySelector: cfg.Metrics.TemporalitySelector,
aggregationSelector: cfg.Metrics.AggregationSelector,
}

if len(cfg.Metrics.Headers) > 0 {
Expand All @@ -88,19 +76,6 @@ func newClient(ctx context.Context, options ...Option) (ominternal.Client, error
return c, nil
}

// Temporality returns the Temporality to use for an instrument kind.
func (c *client) Temporality(k metric.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (c *client) Aggregation(k metric.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}

// ForceFlush does nothing, the client holds no state.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }

// Shutdown shuts down the client, freeing all resource.
//
// Any active connections to a remote endpoint are closed if they were created
Expand Down
22 changes: 20 additions & 2 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"google.golang.org/protobuf/types/known/durationpb"

ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand Down Expand Up @@ -129,16 +131,32 @@ func TestRetryable(t *testing.T) {
}
}

type clientShim struct {
*client
}

func (clientShim) Temporality(metric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}
func (clientShim) Aggregation(metric.InstrumentKind) aggregation.Aggregation {
return nil
}
func (clientShim) ForceFlush(ctx context.Context) error {
return ctx.Err()
}

func TestClient(t *testing.T) {
factory := func(rCh <-chan otest.ExportResult) (ominternal.Client, otest.Collector) {
coll, err := otest.NewGRPCCollector("", rCh)
require.NoError(t, err)

ctx := context.Background()
addr := coll.Addr().String()
client, err := newClient(ctx, WithEndpoint(addr), WithInsecure())
opts := []Option{WithEndpoint(addr), WithInsecure()}
cfg := oconf.NewGRPCConfig(asGRPCOptions(opts)...)
client, err := newClient(ctx, cfg)
require.NoError(t, err)
return client, coll
return clientShim{client}, coll
}

t.Run("Integration", otest.RunClientTests(factory))
Expand Down
97 changes: 86 additions & 11 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,83 @@

import (
"context"
"fmt"
"sync"

ominternal "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

// Exporter is a OpenTelemetry metric Exporter using gRPC.
type Exporter struct {
wrapped *ominternal.Exporter
// Ensure synchronous access to the client across all functionality.
clientMu sync.Mutex
client interface {
UploadMetrics(context.Context, *metricpb.ResourceMetrics) error
Shutdown(context.Context) error
}

temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector

shutdownOnce sync.Once
}

func newExporter(c *client, cfg oconf.Config) (*Exporter, error) {
ts := cfg.Metrics.TemporalitySelector
if ts == nil {
ts = func(metric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}
}

as := cfg.Metrics.AggregationSelector
if as == nil {
as = metric.DefaultAggregationSelector
}

return &Exporter{
client: c,

temporalitySelector: ts,
aggregationSelector: as,
}, nil
}

// Temporality returns the Temporality to use for an instrument kind.
func (e *Exporter) Temporality(k metric.InstrumentKind) metricdata.Temporality {
return e.wrapped.Temporality(k)
return e.temporalitySelector(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (e *Exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation {
return e.wrapped.Aggregation(k)
return e.aggregationSelector(k)
}

// Export transforms and transmits metric data to an OTLP receiver.
//
// This method returns an error if called after Shutdown.
// This method returns an error if the method is canceled by the passed context.
func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
err := e.wrapped.Export(ctx, rm)
global.Debug("OTLP/gRPC exporter export", "Data", rm)
defer global.Debug("OTLP/gRPC exporter export", "Data", rm)

otlpRm, err := transform.ResourceMetrics(rm)
// Best effort upload of transformable metrics.
e.clientMu.Lock()
upErr := e.client.UploadMetrics(ctx, otlpRm)
e.clientMu.Unlock()
if upErr != nil {
if err == nil {
return fmt.Errorf("failed to upload metrics: %w", upErr)
}
// Merge the two errors.
return fmt.Errorf("failed to upload incomplete metrics (%s): %w", err, upErr)

Check warning on line 94 in exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go#L94

Added line #L94 was not covered by tests
}
return err
}

Expand All @@ -56,7 +103,8 @@
//
// This method is safe to call concurrently.
func (e *Exporter) ForceFlush(ctx context.Context) error {
return e.wrapped.ForceFlush(ctx)
// The exporter and client hold no state, nothing to flush.
return ctx.Err()
}

// Shutdown flushes all metric data held by an exporter and releases any held
Expand All @@ -67,7 +115,34 @@
//
// This method is safe to call concurrently.
func (e *Exporter) Shutdown(ctx context.Context) error {
return e.wrapped.Shutdown(ctx)
err := errShutdown
e.shutdownOnce.Do(func() {
e.clientMu.Lock()
client := e.client
e.client = shutdownClient{}
e.clientMu.Unlock()
err = client.Shutdown(ctx)
})
return err
}

var errShutdown = fmt.Errorf("gRPC exporter is shutdown")

type shutdownClient struct{}

func (c shutdownClient) err(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return err
}

Check warning on line 136 in exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go#L135-L136

Added lines #L135 - L136 were not covered by tests
return errShutdown
}

func (c shutdownClient) UploadMetrics(ctx context.Context, _ *metricpb.ResourceMetrics) error {
return c.err(ctx)
}

func (c shutdownClient) Shutdown(ctx context.Context) error {
return c.err(ctx)

Check warning on line 145 in exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go#L144-L145

Added lines #L144 - L145 were not covered by tests
}

// MarshalLog returns logging data about the Exporter.
Expand All @@ -84,10 +159,10 @@
// on options. If a connection cannot be establishes in the lifetime of ctx,
// an error will be returned.
func New(ctx context.Context, options ...Option) (*Exporter, error) {
c, err := newClient(ctx, options...)
cfg := oconf.NewGRPCConfig(asGRPCOptions(options)...)
c, err := newClient(ctx, cfg)
if err != nil {
return nil, err
}
exp := ominternal.New(c)
return &Exporter{exp}, nil
return newExporter(c, cfg)
}
Loading