Retry RESOURCE_EXHAUSTED only if the server can recover
svrakitin committed Apr 8, 2022
1 parent 620c88c commit fea9bf6
Showing 2 changed files with 84 additions and 13 deletions.
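
Background for the change: gRPC lets a server attach structured details to an error status. A server that can still recover from resource exhaustion attaches an errdetails.RetryInfo (optionally carrying a requested delay) to its RESOURCE_EXHAUSTED status, while a server that cannot recover returns the bare code. The stand-alone Go sketch below shows the two shapes of status the exporter now distinguishes; it is not part of this commit, and the main wrapper is purely illustrative.

package main

import (
    "fmt"
    "time"

    "google.golang.org/genproto/googleapis/rpc/errdetails"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/durationpb"
)

func main() {
    // Bare RESOURCE_EXHAUSTED: no RetryInfo, so the server gives no hint that it can recover.
    permanent := status.New(codes.ResourceExhausted, "resource exhausted")

    // RESOURCE_EXHAUSTED with RetryInfo attached: the server signals that it can recover
    // and asks the client to wait before retrying.
    recoverable, err := status.New(codes.ResourceExhausted, "resource exhausted").
        WithDetails(&errdetails.RetryInfo{RetryDelay: durationpb.New(100 * time.Millisecond)})
    if err != nil {
        panic(err)
    }

    fmt.Println(len(permanent.Details()), len(recoverable.Details())) // prints "0 1"
}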
37 changes: 24 additions & 13 deletions exporter/otlpexporter/otlp.go
@@ -143,48 +143,59 @@ func processError(err error) error {
 
     // Now, this is this a real error.
 
-    if !shouldRetry(st.Code()) {
+    retryInfo := getRetryInfo(st)
+
+    if !shouldRetry(st.Code(), retryInfo) {
         // It is not a retryable error, we should not retry.
         return consumererror.NewPermanent(err)
     }
 
-    // Need to retry.
-
     // Check if server returned throttling information.
-    throttleDuration := getThrottleDuration(st)
+    throttleDuration := getThrottleDuration(retryInfo)
     if throttleDuration != 0 {
         // We are throttled. Wait before retrying as requested by the server.
         return exporterhelper.NewThrottleRetry(err, throttleDuration)
     }
 
+    // Need to retry.
+
     return err
 }
 
-func shouldRetry(code codes.Code) bool {
+func shouldRetry(code codes.Code, retryInfo *errdetails.RetryInfo) bool {
     switch code {
     case codes.Canceled,
         codes.DeadlineExceeded,
-        codes.ResourceExhausted,
         codes.Aborted,
         codes.OutOfRange,
         codes.Unavailable,
         codes.DataLoss:
         // These are retryable errors.
         return true
+    case codes.ResourceExhausted:
+        // Retry only if RetryInfo was supplied by the server.
+        // This indicates that the server can still recover from resource exhaustion.
+        return retryInfo != nil
     }
     // Don't retry on any other code.
     return false
 }
 
-func getThrottleDuration(status *status.Status) time.Duration {
-    // See if throttling information is available.
+func getRetryInfo(status *status.Status) *errdetails.RetryInfo {
     for _, detail := range status.Details() {
         if t, ok := detail.(*errdetails.RetryInfo); ok {
-            if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
-                // We are throttled. Wait before retrying as requested by the server.
-                return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
-            }
-            return 0
+            return t
         }
     }
+    return nil
+}
+
+func getThrottleDuration(t *errdetails.RetryInfo) time.Duration {
+    if t == nil || t.RetryDelay == nil {
+        return 0
+    }
+    if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
+        return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
+    }
     return 0
 }
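
Taken together, the helpers above classify a RESOURCE_EXHAUSTED status as permanent when no RetryInfo is present, and as retryable (throttled by the server-requested delay) when it is. The following is a minimal sketch of that flow, not part of the commit; because the helpers are unexported it assumes a file inside the exporter/otlpexporter package, and the function name exampleClassify is invented for illustration.

package otlpexporter

import (
    "fmt"
    "time"

    "google.golang.org/genproto/googleapis/rpc/errdetails"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/durationpb"
)

func exampleClassify() {
    // Without RetryInfo the new shouldRetry rejects RESOURCE_EXHAUSTED,
    // so processError wraps the error with consumererror.NewPermanent.
    bare := status.New(codes.ResourceExhausted, "resource exhausted")
    fmt.Println(shouldRetry(bare.Code(), getRetryInfo(bare))) // false

    // With RetryInfo the status is retryable, and the server-requested delay
    // reaches exporterhelper.NewThrottleRetry via getThrottleDuration.
    withInfo, _ := bare.WithDetails(&errdetails.RetryInfo{
        RetryDelay: durationpb.New(250 * time.Millisecond),
    })
    ri := getRetryInfo(withInfo)
    fmt.Println(shouldRetry(withInfo.Code(), ri)) // true
    fmt.Println(getThrottleDuration(ri))          // 250ms
}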
60 changes: 60 additions & 0 deletions exporter/otlpexporter/otlp_test.go
@@ -26,9 +26,13 @@ import (
 
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
+    "google.golang.org/genproto/googleapis/rpc/errdetails"
     "google.golang.org/grpc"
+    "google.golang.org/grpc/codes"
     "google.golang.org/grpc/credentials"
     "google.golang.org/grpc/metadata"
+    "google.golang.org/grpc/status"
+    "google.golang.org/protobuf/types/known/durationpb"
 
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/component/componenttest"
@@ -55,6 +59,7 @@ func (r *mockReceiver) GetMetadata() metadata.MD {
 
 type mockTracesReceiver struct {
     mockReceiver
+    exportError error
     lastRequest pdata.Traces
 }
 
@@ -66,6 +71,9 @@ func (r *mockTracesReceiver) Export(ctx context.Context, req otlpgrpc.TracesRequ
     defer r.mux.Unlock()
     r.lastRequest = td
     r.metadata, _ = metadata.FromIncomingContext(ctx)
+    if r.exportError != nil {
+        return otlpgrpc.TracesResponse{}, r.exportError
+    }
     return otlpgrpc.NewTracesResponse(), nil
 }
 
@@ -513,6 +521,58 @@ func TestSendTraceDataServerStartWhileRequest(t *testing.T) {
     cancel()
 }
 
+func TestSendTracesOnResourceExhaustion(t *testing.T) {
+    ln, err := net.Listen("tcp", "localhost:")
+    require.NoError(t, err)
+    rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false)
+    rcv.exportError = status.Error(codes.ResourceExhausted, "resource exhausted")
+    defer rcv.srv.GracefulStop()
+
+    factory := NewFactory()
+    cfg := factory.CreateDefaultConfig().(*Config)
+    cfg.RetrySettings.InitialInterval = 0
+    cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{
+        Endpoint: ln.Addr().String(),
+        TLSSetting: configtls.TLSClientSetting{
+            Insecure: true,
+        },
+    }
+    set := componenttest.NewNopExporterCreateSettings()
+    exp, err := factory.CreateTracesExporter(context.Background(), set, cfg)
+    require.NoError(t, err)
+    require.NotNil(t, exp)
+
+    defer func() {
+        assert.NoError(t, exp.Shutdown(context.Background()))
+    }()
+
+    host := componenttest.NewNopHost()
+    assert.NoError(t, exp.Start(context.Background(), host))
+
+    assert.EqualValues(t, 0, atomic.LoadInt32(&rcv.requestCount))
+
+    td := pdata.NewTraces()
+    assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
+
+    assert.Never(t, func() bool {
+        return atomic.LoadInt32(&rcv.requestCount) > 1
+    }, 1*time.Second, 5*time.Millisecond, "Should not retry if RetryInfo is not included into status details by the server.")
+
+    rcv.requestCount = 0
+
+    st := status.New(codes.ResourceExhausted, "resource exhausted")
+    st, _ = st.WithDetails(&errdetails.RetryInfo{
+        RetryDelay: durationpb.New(100 * time.Millisecond),
+    })
+    rcv.exportError = st.Err()
+
+    assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
+
+    assert.Eventually(t, func() bool {
+        return atomic.LoadInt32(&rcv.requestCount) > 1
+    }, 10*time.Second, 5*time.Millisecond, "Should retry if RetryInfo is included into status details by the server.")
+}
+
 func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td pdata.Traces, ln net.Listener) {
     rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false)
     defer rcv.srv.GracefulStop()
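Assuming a standard checkout of the collector repository, the new test can be exercised on its own with the usual Go tooling:

    go test -run TestSendTracesOnResourceExhaustion ./exporter/otlpexporter/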
