From cdd11e7228ea646dd6df9094ea7062f03563046a Mon Sep 17 00:00:00 2001
From: Stepan Rakitin
Date: Tue, 19 Apr 2022 15:59:08 +0200
Subject: [PATCH] [exporter/otlp] Retry RESOURCE_EXHAUSTED only if the server
 returns RetryInfo (#5147)

This makes us retry on `ResourceExhausted` only if retry info is provided.
It also makes the code a bit more explicit.

In my case the old behavior caused an issue in production where the upstream
denied requests above `max_recv_msg_size` and we kept retrying.

**Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector/issues/1123
---
 CHANGELOG.md                       |  1 +
 exporter/otlpexporter/otlp.go      | 37 ++++++++++++-------
 exporter/otlpexporter/otlp_test.go | 59 +++++++++++++++++++++++++++++-
 3 files changed, 83 insertions(+), 14 deletions(-)
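Note for context: the retry decision now keys entirely on whether the server attaches `RetryInfo` to the gRPC status details. A minimal server-side sketch of that pattern follows; the `exhaustedWithRetryInfo` helper, its package name, and the delay value are illustrative assumptions and not part of this patch, while the `status`, `errdetails`, and `durationpb` calls are the same ones the new test uses.

```go
package server // illustrative package name, not part of this patch

import (
	"time"

	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/durationpb"
)

// exhaustedWithRetryInfo builds a RESOURCE_EXHAUSTED status that carries
// RetryInfo. With this patch the exporter treats such a status as retryable
// and waits for the requested delay before retrying.
func exhaustedWithRetryInfo(delay time.Duration) error {
	st := status.New(codes.ResourceExhausted, "resource exhausted")
	st, err := st.WithDetails(&errdetails.RetryInfo{RetryDelay: durationpb.New(delay)})
	if err != nil {
		// If the detail cannot be attached, return the bare status;
		// the patched exporter treats that as a permanent error.
		return status.Error(codes.ResourceExhausted, "resource exhausted")
	}
	return st.Err()
}
```

A server that rejects a request outright (for example, one larger than `max_recv_msg_size`) can simply return `ResourceExhausted` without `RetryInfo`, and the exporter now treats that as a permanent error instead of retrying indefinitely.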
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 467e5df5dfb..576af61833c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,6 +29,7 @@
 - Do not set MeterProvider to global otel (#5146)
 - Make `InstrumentationLibraryToScope` helper functions unexported (#5164)
 - Remove Log's "ShortName" from logging exporter output (#5172)
+- `exporter/otlp`: Retry RESOURCE_EXHAUSTED only if the server returns RetryInfo (#5147)
 
 ### 🚩 Deprecations 🚩
 
diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go
index 83138775b3c..ce5a489020f 100644
--- a/exporter/otlpexporter/otlp.go
+++ b/exporter/otlpexporter/otlp.go
@@ -147,48 +147,59 @@ func processError(err error) error {
 	// Now, this is this a real error.
 
-	if !shouldRetry(st.Code()) {
+	retryInfo := getRetryInfo(st)
+
+	if !shouldRetry(st.Code(), retryInfo) {
 		// It is not a retryable error, we should not retry.
 		return consumererror.NewPermanent(err)
 	}
 
-	// Need to retry.
-	// Check if server returned throttling information.
-	throttleDuration := getThrottleDuration(st)
+	throttleDuration := getThrottleDuration(retryInfo)
 	if throttleDuration != 0 {
+		// We are throttled. Wait before retrying as requested by the server.
 		return exporterhelper.NewThrottleRetry(err, throttleDuration)
 	}
 
+	// Need to retry.
+
 	return err
 }
 
-func shouldRetry(code codes.Code) bool {
+func shouldRetry(code codes.Code, retryInfo *errdetails.RetryInfo) bool {
 	switch code {
 	case codes.Canceled,
 		codes.DeadlineExceeded,
-		codes.ResourceExhausted,
 		codes.Aborted,
 		codes.OutOfRange,
 		codes.Unavailable,
 		codes.DataLoss:
 		// These are retryable errors.
 		return true
+	case codes.ResourceExhausted:
+		// Retry only if RetryInfo was supplied by the server.
+		// This indicates that the server can still recover from resource exhaustion.
+		return retryInfo != nil
 	}
 	// Don't retry on any other code.
 	return false
 }
 
-func getThrottleDuration(status *status.Status) time.Duration {
-	// See if throttling information is available.
+func getRetryInfo(status *status.Status) *errdetails.RetryInfo {
 	for _, detail := range status.Details() {
 		if t, ok := detail.(*errdetails.RetryInfo); ok {
-			if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
-				// We are throttled. Wait before retrying as requested by the server.
-				return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
-			}
-			return 0
+			return t
 		}
 	}
+	return nil
+}
+
+func getThrottleDuration(t *errdetails.RetryInfo) time.Duration {
+	if t == nil || t.RetryDelay == nil {
+		return 0
+	}
+	if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
+		return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
+	}
 	return 0
 }
diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go
index 60620adf122..8cd952641af 100644
--- a/exporter/otlpexporter/otlp_test.go
+++ b/exporter/otlpexporter/otlp_test.go
@@ -26,9 +26,13 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
+	"google.golang.org/genproto/googleapis/rpc/errdetails"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/types/known/durationpb"
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
@@ -59,6 +63,7 @@ func (r *mockReceiver) GetMetadata() metadata.MD {
 
 type mockTracesReceiver struct {
 	mockReceiver
+	exportError error
 	lastRequest ptrace.Traces
 }
 
@@ -70,7 +75,7 @@ func (r *mockTracesReceiver) Export(ctx context.Context, req ptraceotlp.Request)
 	defer r.mux.Unlock()
 	r.lastRequest = td
 	r.metadata, _ = metadata.FromIncomingContext(ctx)
-	return ptraceotlp.NewResponse(), nil
+	return ptraceotlp.NewResponse(), r.exportError
 }
 
 func (r *mockTracesReceiver) GetLastRequest() ptrace.Traces {
@@ -523,6 +528,58 @@ func TestSendTraceDataServerStartWhileRequest(t *testing.T) {
 	cancel()
 }
 
+func TestSendTracesOnResourceExhaustion(t *testing.T) {
+	ln, err := net.Listen("tcp", "localhost:")
+	require.NoError(t, err)
+	rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false)
+	rcv.exportError = status.Error(codes.ResourceExhausted, "resource exhausted")
+	defer rcv.srv.GracefulStop()
+
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.RetrySettings.InitialInterval = 0
+	cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{
+		Endpoint: ln.Addr().String(),
+		TLSSetting: configtls.TLSClientSetting{
+			Insecure: true,
+		},
+	}
+	set := componenttest.NewNopExporterCreateSettings()
+	exp, err := factory.CreateTracesExporter(context.Background(), set, cfg)
+	require.NoError(t, err)
+	require.NotNil(t, exp)
+
+	defer func() {
+		assert.NoError(t, exp.Shutdown(context.Background()))
+	}()
+
+	host := componenttest.NewNopHost()
+	assert.NoError(t, exp.Start(context.Background(), host))
+
+	assert.EqualValues(t, 0, rcv.requestCount.Load())
+
+	td := ptrace.NewTraces()
+	assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
+
+	assert.Never(t, func() bool {
+		return rcv.requestCount.Load() > 1
+	}, 1*time.Second, 5*time.Millisecond, "Should not retry if RetryInfo is not included into status details by the server.")
+
+	rcv.requestCount.Swap(0)
+
+	st := status.New(codes.ResourceExhausted, "resource exhausted")
+	st, _ = st.WithDetails(&errdetails.RetryInfo{
+		RetryDelay: durationpb.New(100 * time.Millisecond),
+	})
+	rcv.exportError = st.Err()
+
+	assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
+
+	assert.Eventually(t, func() bool {
+		return rcv.requestCount.Load() > 1
+	}, 10*time.Second, 5*time.Millisecond, "Should retry if RetryInfo is included into status details by the server.")
+}
+
 func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td ptrace.Traces, ln net.Listener) {
 	rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false)
 	defer rcv.srv.GracefulStop()