Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/datadogexporter] Retry per network call #6412

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exporter/datadogexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The exporter will try to retrieve a hostname following the OpenTelemetry semanti

See the sample configuration files under the `example` folder for other available options, as well as an example K8s Manifest.
This exporter also supports the `exporterhelper` queuing, retry and timeout settings documented [here](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/exporterhelper#configuration).
Retry settings will only affect metrics.

## Trace exporter
### **Important Pipeline Setup Details**
Expand Down
6 changes: 4 additions & 2 deletions exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func createMetricsExporter(
pushMetricsFn,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0 * time.Second}),
exporterhelper.WithRetry(cfg.RetrySettings),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a breaking change. Could you add this to the changelog? Make sure also to add this to the readme. Is there a way to determine whether the retry settings were customized by users, and warn them?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry settings will still be used here

expBackoff := backoff.ExponentialBackOff{
InitialInterval: r.cfg.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: r.cfg.MaxInterval,
MaxElapsedTime: r.cfg.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
, what this PR changes is that we now retry on each network call we do, instead of retrying the whole ConsumeMetrics function (so, we disable the exporterhelper retry mechanism). I believe this change should be transparent to users and it will probably align closer with what one would expect from the retry settings.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! Makes sense then.

// We use our own custom mechanism for retries, since we hit several endpoints.
exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithShutdown(func(context.Context) error {
cancel()
Expand Down Expand Up @@ -210,7 +211,8 @@ func createTracesExporter(
pushTracesFn,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0 * time.Second}),
exporterhelper.WithRetry(cfg.RetrySettings),
// We don't do retries on traces because of deduping concerns on APM Events.
exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithShutdown(func(context.Context) error {
cancel()
Expand Down
14 changes: 7 additions & 7 deletions exporter/datadogexporter/internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes/gcp"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metadata/ec2"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metadata/system"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/utils"
)

Expand Down Expand Up @@ -174,19 +175,17 @@ func pushMetadata(cfg *config.Config, buildInfo component.BuildInfo, metadata *H
return nil
}

func pushMetadataWithRetry(params component.ExporterCreateSettings, cfg *config.Config, hostMetadata *HostMetadata) {
const maxRetries = 5

func pushMetadataWithRetry(retrier *utils.Retrier, params component.ExporterCreateSettings, cfg *config.Config, hostMetadata *HostMetadata) {
params.Logger.Debug("Sending host metadata payload", zap.Any("payload", hostMetadata))

numRetries, err := utils.DoWithRetries(maxRetries, func() error {
err := retrier.DoWithRetries(context.Background(), func(context.Context) error {
return pushMetadata(cfg, params.BuildInfo, hostMetadata)
})

if err != nil {
params.Logger.Warn("Sending host metadata failed", zap.Error(err))
} else {
params.Logger.Info("Sent host metadata", zap.Int("retries", numRetries))
params.Logger.Info("Sent host metadata")
}

}
Expand All @@ -197,6 +196,7 @@ func Pusher(ctx context.Context, params component.ExporterCreateSettings, cfg *c
ticker := time.NewTicker(30 * time.Minute)
defer ticker.Stop()
defer params.Logger.Debug("Shut down host metadata routine")
retrier := utils.NewRetrier(params.Logger, cfg.RetrySettings, scrub.NewScrubber())

// Get host metadata from resources and fill missing info using our exporter.
// Currently we only retrieve it once but still send the same payload
Expand All @@ -212,14 +212,14 @@ func Pusher(ctx context.Context, params component.ExporterCreateSettings, cfg *c
fillHostMetadata(params, cfg, hostMetadata)

// Run one first time at startup
pushMetadataWithRetry(params, cfg, hostMetadata)
pushMetadataWithRetry(retrier, params, cfg, hostMetadata)

for {
select {
case <-ctx.Done():
return
case <-ticker.C: // Send host metadata
pushMetadataWithRetry(params, cfg, hostMetadata)
pushMetadataWithRetry(retrier, params, cfg, hostMetadata)
}
}
}
16 changes: 0 additions & 16 deletions exporter/datadogexporter/internal/utils/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,3 @@ func SetDDHeaders(reqHeader http.Header, buildInfo component.BuildInfo, apiKey s
reqHeader.Set("DD-Api-Key", apiKey)
reqHeader.Set("User-Agent", UserAgent(buildInfo))
}

// DoWithRetries repeats a fallible action up to `maxRetries` times
// with exponential backoff
func DoWithRetries(maxRetries int, fn func() error) (i int, err error) {
wait := 1 * time.Second
for i = 0; i < maxRetries; i++ {
err = fn()
if err == nil {
return
}
time.Sleep(wait)
wait = 2 * wait
}

return
}
11 changes: 0 additions & 11 deletions exporter/datadogexporter/internal/utils/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
package utils

import (
"errors"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
)

Expand All @@ -44,12 +42,3 @@ func TestDDHeaders(t *testing.T) {
assert.Equal(t, header.Get("USer-Agent"), "otelcontribcol/1.0")

}

func TestDoWithRetries(t *testing.T) {
i, err := DoWithRetries(3, func() error { return nil })
require.NoError(t, err)
assert.Equal(t, i, 0)

_, err = DoWithRetries(1, func() error { return errors.New("action failed") })
require.Error(t, err)
}
97 changes: 97 additions & 0 deletions exporter/datadogexporter/internal/utils/retrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/utils"

import (
"context"
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub"
)

type Retrier struct {
cfg exporterhelper.RetrySettings
logger *zap.Logger
scrubber scrub.Scrubber
}

func NewRetrier(logger *zap.Logger, settings exporterhelper.RetrySettings, scrubber scrub.Scrubber) *Retrier {
return &Retrier{
cfg: settings,
logger: logger,
scrubber: scrubber,
}
}

// DoWithRetries does a function with retries. This is a condensed version of the code on
// the exporterhelper, which we reuse here since we want custom retry logic.
func (r *Retrier) DoWithRetries(ctx context.Context, fn func(context.Context) error) error {
if !r.cfg.Enabled {
return fn(ctx)
}

// Do not use NewExponentialBackOff since it calls Reset and the code here must
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
expBackoff := backoff.ExponentialBackOff{
InitialInterval: r.cfg.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: r.cfg.MaxInterval,
MaxElapsedTime: r.cfg.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
expBackoff.Reset()
retryNum := int64(0)
for {
err := fn(ctx)
if err == nil {
return nil
}

err = r.scrubber.Scrub(err)

if consumererror.IsPermanent(err) {
return err
}

backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
err = fmt.Errorf("max elapsed time expired %w", err)
return err
}

backoffDelayStr := backoffDelay.String()
r.logger.Info(
"Request failed. Will retry the request after interval.",
zap.Error(err),
zap.String("interval", backoffDelayStr),
)
retryNum++

// back-off, but get interrupted when shutting down or request is cancelled or timed out.
select {
case <-ctx.Done():
return fmt.Errorf("request is cancelled or timed out %w", err)
case <-time.After(backoffDelay):
}
}
}
49 changes: 49 additions & 0 deletions exporter/datadogexporter/internal/utils/retrier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub"
)

func TestDoWithRetries(t *testing.T) {
scrubber := scrub.NewScrubber()
retrier := NewRetrier(zap.NewNop(), exporterhelper.DefaultRetrySettings(), scrubber)
ctx := context.Background()

err := retrier.DoWithRetries(ctx, func(context.Context) error { return nil })
require.NoError(t, err)

retrier = NewRetrier(zap.NewNop(),
exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 5 * time.Millisecond,
MaxInterval: 30 * time.Millisecond,
MaxElapsedTime: 100 * time.Millisecond,
},
scrubber,
)
err = retrier.DoWithRetries(ctx, func(context.Context) error { return errors.New("action failed") })
require.Error(t, err)
}
27 changes: 19 additions & 8 deletions exporter/datadogexporter/metrics_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/multierr"
"go.uber.org/zap"
"gopkg.in/zorkian/go-datadog-api.v2"

Expand All @@ -42,6 +43,7 @@ type metricsExporter struct {
client *datadog.Client
tr *translator.Translator
scrubber scrub.Scrubber
retrier *utils.Retrier
}

// assert `hostProvider` implements HostnameProvider interface
Expand Down Expand Up @@ -104,13 +106,15 @@ func newMetricsExporter(ctx context.Context, params component.ExporterCreateSett
return nil, err
}

scrubber := scrub.NewScrubber()
return &metricsExporter{
params: params,
cfg: cfg,
ctx: ctx,
client: client,
tr: tr,
scrubber: scrub.NewScrubber(),
scrubber: scrubber,
retrier: utils.NewRetrier(params.Logger, cfg.RetrySettings, scrubber),
}, nil
}

Expand Down Expand Up @@ -173,17 +177,24 @@ func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pdata.Metric
ms, sl := consumer.All(pushTime, exp.params.BuildInfo)
metrics.ProcessMetrics(ms, exp.cfg)

err = nil
if len(ms) > 0 {
if err := exp.client.PostMetrics(ms); err != nil {
return err
}
err = multierr.Append(
err,
exp.retrier.DoWithRetries(ctx, func(context.Context) error {
return exp.client.PostMetrics(ms)
}),
)
}

if len(sl) > 0 {
if err := exp.pushSketches(ctx, sl); err != nil {
return err
}
err = multierr.Append(
err,
exp.retrier.DoWithRetries(ctx, func(ctx context.Context) error {
return exp.pushSketches(ctx, sl)
}),
)
}

return nil
return err
}