[exporter/otlp] Retry RESOURCE_EXHAUSTED only if the server returns RetryInfo (#5147)

This makes us retry on `ResourceExhausted` only if retry info is provided, and makes the code a bit more explicit.

In my case, this caused an issue in production where the upstream denied requests above `max_recv_msg_size` and we kept retrying.

**Link to tracking Issue:**
#1123
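
For context, the sketch below shows the server-side distinction this change keys on. It is illustrative only (not part of this commit): the `recoverable` flag and helper name are hypothetical, but `status.WithDetails` with `errdetails.RetryInfo` is the standard gRPC-Go way to attach retry information.

```go
package main

import (
	"fmt"
	"time"

	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/durationpb"
)

// resourceExhaustedErr is a hypothetical server-side helper. A server that
// can recover (e.g. from transient memory pressure) attaches RetryInfo; a
// hard limit such as a request above max_recv_msg_size returns a bare
// status, which the exporter now treats as permanent.
func resourceExhaustedErr(recoverable bool) error {
	st := status.New(codes.ResourceExhausted, "resource exhausted")
	if !recoverable {
		return st.Err() // no RetryInfo: the client should not retry
	}
	st, err := st.WithDetails(&errdetails.RetryInfo{
		RetryDelay: durationpb.New(250 * time.Millisecond),
	})
	if err != nil {
		return status.Error(codes.ResourceExhausted, "resource exhausted")
	}
	return st.Err() // RetryInfo present: the client may retry after the delay
}

func main() {
	fmt.Println(resourceExhaustedErr(false)) // bare RESOURCE_EXHAUSTED
	fmt.Println(resourceExhaustedErr(true))  // RESOURCE_EXHAUSTED + RetryInfo
}
```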
svrakitin committed Apr 19, 2022
1 parent ef7754a commit cdd11e7
Showing 3 changed files with 83 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -29,6 +29,7 @@
 - Do not set MeterProvider to global otel (#5146)
 - Make `InstrumentationLibrary<signal>ToScope` helper functions unexported (#5164)
 - Remove Log's "ShortName" from logging exporter output (#5172)
+- `exporter/otlp`: Retry RESOURCE_EXHAUSTED only if the server returns RetryInfo (#5147)

 ### 🚩 Deprecations 🚩
37 changes: 24 additions & 13 deletions exporter/otlpexporter/otlp.go
@@ -147,48 +147,59 @@ func processError(err error) error {

 	// Now, this is a real error.

-	if !shouldRetry(st.Code()) {
+	retryInfo := getRetryInfo(st)
+
+	if !shouldRetry(st.Code(), retryInfo) {
 		// It is not a retryable error, we should not retry.
 		return consumererror.NewPermanent(err)
 	}

-	// Need to retry.
-
 	// Check if server returned throttling information.
-	throttleDuration := getThrottleDuration(st)
+	throttleDuration := getThrottleDuration(retryInfo)
 	if throttleDuration != 0 {
 		// We are throttled. Wait before retrying as requested by the server.
 		return exporterhelper.NewThrottleRetry(err, throttleDuration)
 	}

+	// Need to retry.
+
 	return err
 }

-func shouldRetry(code codes.Code) bool {
+func shouldRetry(code codes.Code, retryInfo *errdetails.RetryInfo) bool {
 	switch code {
 	case codes.Canceled,
 		codes.DeadlineExceeded,
-		codes.ResourceExhausted,
 		codes.Aborted,
 		codes.OutOfRange,
 		codes.Unavailable,
 		codes.DataLoss:
 		// These are retryable errors.
 		return true
+	case codes.ResourceExhausted:
+		// Retry only if RetryInfo was supplied by the server.
+		// This indicates that the server can still recover from resource exhaustion.
+		return retryInfo != nil
 	}
 	// Don't retry on any other code.
 	return false
 }

-func getThrottleDuration(status *status.Status) time.Duration {
-	// See if throttling information is available.
+func getRetryInfo(status *status.Status) *errdetails.RetryInfo {
 	for _, detail := range status.Details() {
 		if t, ok := detail.(*errdetails.RetryInfo); ok {
-			if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
-				// We are throttled. Wait before retrying as requested by the server.
-				return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
-			}
-			return 0
+			return t
 		}
 	}
-	return 0
+	return nil
+}
+
+func getThrottleDuration(t *errdetails.RetryInfo) time.Duration {
+	if t == nil || t.RetryDelay == nil {
+		return 0
+	}
+	if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
+		return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
+	}
 	return 0
 }
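
To make the new decision path concrete, here is a small illustrative snippet (not part of the commit; it assumes it lives inside the `otlpexporter` package, since the helpers are unexported). A bare RESOURCE_EXHAUSTED now becomes a permanent error, while one carrying RetryInfo is retried after the server-requested delay:

```go
package otlpexporter

import (
	"fmt"
	"time"

	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/durationpb"
)

// exampleRetryDecision exercises the unexported helpers above with both
// RESOURCE_EXHAUSTED variants. Illustrative only.
func exampleRetryDecision() {
	// Bare RESOURCE_EXHAUSTED, e.g. a request above max_recv_msg_size.
	bare := status.New(codes.ResourceExhausted, "resource exhausted")
	fmt.Println(shouldRetry(bare.Code(), getRetryInfo(bare))) // false: treated as permanent

	// With RetryInfo the server signals it can recover, so the exporter
	// retries and honors the server-requested delay.
	withInfo, _ := bare.WithDetails(&errdetails.RetryInfo{
		RetryDelay: durationpb.New(100 * time.Millisecond),
	})
	ri := getRetryInfo(withInfo)
	fmt.Println(shouldRetry(withInfo.Code(), ri)) // true
	fmt.Println(getThrottleDuration(ri))          // 100ms -> NewThrottleRetry
}
```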
59 changes: 58 additions & 1 deletion exporter/otlpexporter/otlp_test.go
@@ -26,9 +26,13 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
+	"google.golang.org/genproto/googleapis/rpc/errdetails"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/types/known/durationpb"

 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
@@ -59,6 +63,7 @@ func (r *mockReceiver) GetMetadata() metadata.MD {

 type mockTracesReceiver struct {
 	mockReceiver
+	exportError error
 	lastRequest ptrace.Traces
 }

@@ -70,7 +75,7 @@ func (r *mockTracesReceiver) Export(ctx context.Context, req ptraceotlp.Request)
 	defer r.mux.Unlock()
 	r.lastRequest = td
 	r.metadata, _ = metadata.FromIncomingContext(ctx)
-	return ptraceotlp.NewResponse(), nil
+	return ptraceotlp.NewResponse(), r.exportError
 }

func (r *mockTracesReceiver) GetLastRequest() ptrace.Traces {
@@ -523,6 +528,58 @@ func TestSendTraceDataServerStartWhileRequest(t *testing.T) {
 	cancel()
 }

+func TestSendTracesOnResourceExhaustion(t *testing.T) {
+	ln, err := net.Listen("tcp", "localhost:")
+	require.NoError(t, err)
+	rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false)
+	rcv.exportError = status.Error(codes.ResourceExhausted, "resource exhausted")
+	defer rcv.srv.GracefulStop()
+
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.RetrySettings.InitialInterval = 0
+	cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{
+		Endpoint: ln.Addr().String(),
+		TLSSetting: configtls.TLSClientSetting{
+			Insecure: true,
+		},
+	}
+	set := componenttest.NewNopExporterCreateSettings()
+	exp, err := factory.CreateTracesExporter(context.Background(), set, cfg)
+	require.NoError(t, err)
+	require.NotNil(t, exp)
+
+	defer func() {
+		assert.NoError(t, exp.Shutdown(context.Background()))
+	}()
+
+	host := componenttest.NewNopHost()
+	assert.NoError(t, exp.Start(context.Background(), host))
+
+	assert.EqualValues(t, 0, rcv.requestCount.Load())
+
+	td := ptrace.NewTraces()
+	assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
+
+	assert.Never(t, func() bool {
+		return rcv.requestCount.Load() > 1
+	}, 1*time.Second, 5*time.Millisecond, "Should not retry if RetryInfo is not included into status details by the server.")
+
+	rcv.requestCount.Swap(0)
+
+	st := status.New(codes.ResourceExhausted, "resource exhausted")
+	st, _ = st.WithDetails(&errdetails.RetryInfo{
+		RetryDelay: durationpb.New(100 * time.Millisecond),
+	})
+	rcv.exportError = st.Err()
+
+	assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
+
+	assert.Eventually(t, func() bool {
+		return rcv.requestCount.Load() > 1
+	}, 10*time.Second, 5*time.Millisecond, "Should retry if RetryInfo is included into status details by the server.")
+}
+
 func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td ptrace.Traces, ln net.Listener) {
 	rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false)
 	defer rcv.srv.GracefulStop()
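If you have the repository checked out at this commit, the new behavior can be exercised in isolation with `go test -run TestSendTracesOnResourceExhaustion ./exporter/otlpexporter`.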
