diff --git a/exporter/datadogexporter/README.md b/exporter/datadogexporter/README.md index 85c68bc49b20..df94f1262835 100644 --- a/exporter/datadogexporter/README.md +++ b/exporter/datadogexporter/README.md @@ -33,6 +33,7 @@ The exporter will try to retrieve a hostname following the OpenTelemetry semanti See the sample configuration files under the `example` folder for other available options, as well as an example K8s Manifest. This exporter also supports the `exporterhelper` queuing, retry and timeout settings documented [here](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/exporterhelper#configuration). +Retry settings will only affect metrics. ## Trace exporter ### **Important Pipeline Setup Details** diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index 810f04972dc3..d4a65ea9d4a1 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -156,7 +156,8 @@ func createMetricsExporter( pushMetricsFn, // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0 * time.Second}), - exporterhelper.WithRetry(cfg.RetrySettings), + // We use our own custom mechanism for retries, since we hit several endpoints. + exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}), exporterhelper.WithQueue(cfg.QueueSettings), exporterhelper.WithShutdown(func(context.Context) error { cancel() @@ -210,7 +211,8 @@ func createTracesExporter( pushTracesFn, // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0 * time.Second}), - exporterhelper.WithRetry(cfg.RetrySettings), + // We don't do retries on traces because of deduping concerns on APM Events. + exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}), exporterhelper.WithQueue(cfg.QueueSettings), exporterhelper.WithShutdown(func(context.Context) error { cancel() diff --git a/exporter/datadogexporter/internal/metadata/metadata.go b/exporter/datadogexporter/internal/metadata/metadata.go index f6b5f90082db..1d74db870027 100644 --- a/exporter/datadogexporter/internal/metadata/metadata.go +++ b/exporter/datadogexporter/internal/metadata/metadata.go @@ -34,6 +34,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes/gcp" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metadata/ec2" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metadata/system" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/utils" ) @@ -174,19 +175,17 @@ func pushMetadata(cfg *config.Config, buildInfo component.BuildInfo, metadata *H return nil } -func pushMetadataWithRetry(params component.ExporterCreateSettings, cfg *config.Config, hostMetadata *HostMetadata) { - const maxRetries = 5 - +func pushMetadataWithRetry(retrier *utils.Retrier, params component.ExporterCreateSettings, cfg *config.Config, hostMetadata *HostMetadata) { params.Logger.Debug("Sending host metadata payload", zap.Any("payload", hostMetadata)) - numRetries, err := utils.DoWithRetries(maxRetries, func() error { + err := retrier.DoWithRetries(context.Background(), func(context.Context) error { return pushMetadata(cfg, params.BuildInfo, hostMetadata) }) if err != nil { params.Logger.Warn("Sending host metadata failed", zap.Error(err)) } else { - params.Logger.Info("Sent host metadata", zap.Int("retries", numRetries)) + params.Logger.Info("Sent host metadata") } } @@ -197,6 +196,7 @@ func Pusher(ctx context.Context, params component.ExporterCreateSettings, cfg *c ticker := time.NewTicker(30 * time.Minute) defer ticker.Stop() defer params.Logger.Debug("Shut down host metadata routine") + retrier := utils.NewRetrier(params.Logger, cfg.RetrySettings, scrub.NewScrubber()) // Get host metadata from resources and fill missing info using our exporter. // Currently we only retrieve it once but still send the same payload @@ -212,14 +212,14 @@ func Pusher(ctx context.Context, params component.ExporterCreateSettings, cfg *c fillHostMetadata(params, cfg, hostMetadata) // Run one first time at startup - pushMetadataWithRetry(params, cfg, hostMetadata) + pushMetadataWithRetry(retrier, params, cfg, hostMetadata) for { select { case <-ctx.Done(): return case <-ticker.C: // Send host metadata - pushMetadataWithRetry(params, cfg, hostMetadata) + pushMetadataWithRetry(retrier, params, cfg, hostMetadata) } } } diff --git a/exporter/datadogexporter/internal/utils/http.go b/exporter/datadogexporter/internal/utils/http.go index da4b33cc0d72..949085b379a3 100644 --- a/exporter/datadogexporter/internal/utils/http.go +++ b/exporter/datadogexporter/internal/utils/http.go @@ -72,19 +72,3 @@ func SetDDHeaders(reqHeader http.Header, buildInfo component.BuildInfo, apiKey s reqHeader.Set("DD-Api-Key", apiKey) reqHeader.Set("User-Agent", UserAgent(buildInfo)) } - -// DoWithRetries repeats a fallible action up to `maxRetries` times -// with exponential backoff -func DoWithRetries(maxRetries int, fn func() error) (i int, err error) { - wait := 1 * time.Second - for i = 0; i < maxRetries; i++ { - err = fn() - if err == nil { - return - } - time.Sleep(wait) - wait = 2 * wait - } - - return -} diff --git a/exporter/datadogexporter/internal/utils/http_test.go b/exporter/datadogexporter/internal/utils/http_test.go index 1e807120f32a..843f42a3a3c8 100644 --- a/exporter/datadogexporter/internal/utils/http_test.go +++ b/exporter/datadogexporter/internal/utils/http_test.go @@ -15,12 +15,10 @@ package utils import ( - "errors" "net/http" "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" ) @@ -44,12 +42,3 @@ func TestDDHeaders(t *testing.T) { assert.Equal(t, header.Get("USer-Agent"), "otelcontribcol/1.0") } - -func TestDoWithRetries(t *testing.T) { - i, err := DoWithRetries(3, func() error { return nil }) - require.NoError(t, err) - assert.Equal(t, i, 0) - - _, err = DoWithRetries(1, func() error { return errors.New("action failed") }) - require.Error(t, err) -} diff --git a/exporter/datadogexporter/internal/utils/retrier.go b/exporter/datadogexporter/internal/utils/retrier.go new file mode 100644 index 000000000000..d2f1ba649c85 --- /dev/null +++ b/exporter/datadogexporter/internal/utils/retrier.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/utils" + +import ( + "context" + "fmt" + "time" + + "github.com/cenkalti/backoff/v4" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub" +) + +type Retrier struct { + cfg exporterhelper.RetrySettings + logger *zap.Logger + scrubber scrub.Scrubber +} + +func NewRetrier(logger *zap.Logger, settings exporterhelper.RetrySettings, scrubber scrub.Scrubber) *Retrier { + return &Retrier{ + cfg: settings, + logger: logger, + scrubber: scrubber, + } +} + +// DoWithRetries does a function with retries. This is a condensed version of the code on +// the exporterhelper, which we reuse here since we want custom retry logic. +func (r *Retrier) DoWithRetries(ctx context.Context, fn func(context.Context) error) error { + if !r.cfg.Enabled { + return fn(ctx) + } + + // Do not use NewExponentialBackOff since it calls Reset and the code here must + // call Reset after changing the InitialInterval (this saves an unnecessary call to Now). + expBackoff := backoff.ExponentialBackOff{ + InitialInterval: r.cfg.InitialInterval, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: r.cfg.MaxInterval, + MaxElapsedTime: r.cfg.MaxElapsedTime, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } + expBackoff.Reset() + retryNum := int64(0) + for { + err := fn(ctx) + if err == nil { + return nil + } + + err = r.scrubber.Scrub(err) + + if consumererror.IsPermanent(err) { + return err + } + + backoffDelay := expBackoff.NextBackOff() + if backoffDelay == backoff.Stop { + err = fmt.Errorf("max elapsed time expired %w", err) + return err + } + + backoffDelayStr := backoffDelay.String() + r.logger.Info( + "Request failed. Will retry the request after interval.", + zap.Error(err), + zap.String("interval", backoffDelayStr), + ) + retryNum++ + + // back-off, but get interrupted when shutting down or request is cancelled or timed out. + select { + case <-ctx.Done(): + return fmt.Errorf("request is cancelled or timed out %w", err) + case <-time.After(backoffDelay): + } + } +} diff --git a/exporter/datadogexporter/internal/utils/retrier_test.go b/exporter/datadogexporter/internal/utils/retrier_test.go new file mode 100644 index 000000000000..653dd2a7ea39 --- /dev/null +++ b/exporter/datadogexporter/internal/utils/retrier_test.go @@ -0,0 +1,49 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub" +) + +func TestDoWithRetries(t *testing.T) { + scrubber := scrub.NewScrubber() + retrier := NewRetrier(zap.NewNop(), exporterhelper.DefaultRetrySettings(), scrubber) + ctx := context.Background() + + err := retrier.DoWithRetries(ctx, func(context.Context) error { return nil }) + require.NoError(t, err) + + retrier = NewRetrier(zap.NewNop(), + exporterhelper.RetrySettings{ + Enabled: true, + InitialInterval: 5 * time.Millisecond, + MaxInterval: 30 * time.Millisecond, + MaxElapsedTime: 100 * time.Millisecond, + }, + scrubber, + ) + err = retrier.DoWithRetries(ctx, func(context.Context) error { return errors.New("action failed") }) + require.Error(t, err) +} diff --git a/exporter/datadogexporter/metrics_exporter.go b/exporter/datadogexporter/metrics_exporter.go index 554744237f9c..5112dd84fee8 100644 --- a/exporter/datadogexporter/metrics_exporter.go +++ b/exporter/datadogexporter/metrics_exporter.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/multierr" "go.uber.org/zap" "gopkg.in/zorkian/go-datadog-api.v2" @@ -42,6 +43,7 @@ type metricsExporter struct { client *datadog.Client tr *translator.Translator scrubber scrub.Scrubber + retrier *utils.Retrier } // assert `hostProvider` implements HostnameProvider interface @@ -104,13 +106,15 @@ func newMetricsExporter(ctx context.Context, params component.ExporterCreateSett return nil, err } + scrubber := scrub.NewScrubber() return &metricsExporter{ params: params, cfg: cfg, ctx: ctx, client: client, tr: tr, - scrubber: scrub.NewScrubber(), + scrubber: scrubber, + retrier: utils.NewRetrier(params.Logger, cfg.RetrySettings, scrubber), }, nil } @@ -173,17 +177,24 @@ func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pdata.Metric ms, sl := consumer.All(pushTime, exp.params.BuildInfo) metrics.ProcessMetrics(ms, exp.cfg) + err = nil if len(ms) > 0 { - if err := exp.client.PostMetrics(ms); err != nil { - return err - } + err = multierr.Append( + err, + exp.retrier.DoWithRetries(ctx, func(context.Context) error { + return exp.client.PostMetrics(ms) + }), + ) } if len(sl) > 0 { - if err := exp.pushSketches(ctx, sl); err != nil { - return err - } + err = multierr.Append( + err, + exp.retrier.DoWithRetries(ctx, func(ctx context.Context) error { + return exp.pushSketches(ctx, sl) + }), + ) } - return nil + return err }