fix(storage): add backoff to gRPC write retries #11200

Merged · 5 commits · Dec 4, 2024
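For context, a minimal sketch of how a caller configures the backoff that retried gRPC writes now honor, using the existing Retryer options (WithBackoff, WithPolicy, WithMaxAttempts) exercised in the tests below. The snippet is illustrative only and is not part of this change; the bucket and object names are placeholders.

package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/storage"
	"github.com/googleapis/gax-go/v2"
)

func main() {
	ctx := context.Background()

	// gRPC transport; this PR makes its write retries pause with the configured backoff.
	client, err := storage.NewGRPCClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	o := client.Bucket("my-bucket").Object("my-object").Retryer(
		storage.WithBackoff(gax.Backoff{
			Initial:    2 * time.Second,
			Max:        30 * time.Second,
			Multiplier: 3,
		}),
		storage.WithPolicy(storage.RetryAlways),
		storage.WithMaxAttempts(5),
	)

	w := o.NewWriter(ctx)
	if _, err := w.Write([]byte("hello")); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
}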
8 changes: 4 additions & 4 deletions storage/bucket_test.go
@@ -1116,11 +1116,11 @@ func TestBucketRetryer(t *testing.T) {
WithErrorFunc(func(err error) bool { return false }))
},
want: &retryConfig{
backoff: &gax.Backoff{
backoff: gaxBackoffFromStruct(&gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
},
}),
policy: RetryAlways,
maxAttempts: expectedAttempts(5),
shouldRetry: func(err error) bool { return false },
@@ -1135,9 +1135,9 @@ func TestBucketRetryer(t *testing.T) {
}))
},
want: &retryConfig{
backoff: &gax.Backoff{
backoff: gaxBackoffFromStruct(&gax.Backoff{
Multiplier: 3,
}},
})},
},
{
name: "set policy only",
6 changes: 3 additions & 3 deletions storage/client_test.go
@@ -1396,7 +1396,7 @@ func TestRetryMaxAttemptsEmulated(t *testing.T) {
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: gaxBackoffFromStruct(&gax.Backoff{Initial: 10 * time.Millisecond})}
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))

var ae *apierror.APIError
@@ -1421,7 +1421,7 @@ func TestTimeoutErrorEmulated(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
defer cancel()
time.Sleep(5 * time.Nanosecond)
config := &retryConfig{backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
config := &retryConfig{backoff: gaxBackoffFromStruct(&gax.Backoff{Initial: 10 * time.Millisecond})}
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))

// Error may come through as a context.DeadlineExceeded (HTTP) or status.DeadlineExceeded (gRPC)
@@ -1447,7 +1447,7 @@ func TestRetryDeadlineExceedeEmulated(t *testing.T) {
instructions := map[string][]string{"storage.buckets.get": {"return-504", "return-504"}}
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: gaxBackoffFromStruct(&gax.Backoff{Initial: 10 * time.Millisecond})}
if _, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config)); err != nil {
t.Fatalf("GetBucket: got unexpected error %v, want nil", err)
}
134 changes: 114 additions & 20 deletions storage/grpc_client.go
@@ -29,6 +29,7 @@ import (
"cloud.google.com/go/internal/trace"
gapic "cloud.google.com/go/storage/internal/apiv2"
"cloud.google.com/go/storage/internal/apiv2/storagepb"
"github.com/google/uuid"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
@@ -1223,7 +1224,7 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
}
}

o, off, err := gw.uploadBuffer(recvd, offset, doneReading)
o, off, err := gw.uploadBuffer(recvd, offset, doneReading, newUploadBufferRetryConfig(gw.settings))
if err != nil {
err = checkCanceled(err)
errorf(err)
@@ -2091,12 +2092,7 @@ func (w *gRPCWriter) queryProgress() (int64, error) {
// completed.
//
// Returns object, persisted size, and any error that is not retriable.
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, error) {
var shouldRetry = ShouldRetry
if w.settings.retry != nil && w.settings.retry.shouldRetry != nil {
shouldRetry = w.settings.retry.shouldRetry
}

func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool, retryConfig *uploadBufferRetryConfig) (*storagepb.Object, int64, error) {
var err error
var lastWriteOfEntireObject bool

@@ -2143,6 +2139,7 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
if w.stream == nil {
hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))}
ctx := gax.InsertMetadataIntoOutgoingContext(w.ctx, hds...)
ctx = setInvocationHeaders(ctx, retryConfig.invocationID, retryConfig.attempts)

w.stream, err = w.c.raw.BidiWriteObject(ctx)
if err != nil {
@@ -2188,7 +2185,11 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received.
if shouldRetry(err) {
err = retryConfig.retriable(w.ctx, err)

if err == nil {
retryConfig.doBackOff(w.ctx)

// TODO: Add test case for failure modes of querying progress.
writeOffset, err = w.determineOffset(start)
if err != nil {
@@ -2230,11 +2231,17 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
if !lastWriteOfEntireObject {
resp, err := w.stream.Recv()

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if shouldRetry(err) {
if err != nil {
// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
err = retryConfig.retriable(w.ctx, err)
if err != nil {
return nil, 0, err
}

retryConfig.doBackOff(w.ctx)
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
@@ -2246,9 +2253,6 @@ sendBytes: // label this loop so that we can use a continue statement from a nes

continue sendBytes
}
if err != nil {
return nil, 0, err
}

if resp.GetPersistedSize() != writeOffset {
// Retry if not all bytes were persisted.
@@ -2274,7 +2278,14 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
var obj *storagepb.Object
for obj == nil {
resp, err := w.stream.Recv()
if shouldRetry(err) {

if err != nil {
err = retryConfig.retriable(w.ctx, err)
if err != nil {
return nil, 0, err
}
retryConfig.doBackOff(w.ctx)

writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
@@ -2283,9 +2294,6 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
w.stream = nil
continue sendBytes
}
if err != nil {
return nil, 0, err
}

obj = resp.GetResource()
}
@@ -2370,3 +2378,89 @@ func checkCanceled(err error) error {

return err
}

type uploadBufferRetryConfig struct {
attempts int
invocationID string
config *retryConfig
lastErr error
}

func newUploadBufferRetryConfig(settings *settings) *uploadBufferRetryConfig {
config := settings.retry

if config == nil {
config = defaultRetry.clone()
}

if config.shouldRetry == nil {
config.shouldRetry = ShouldRetry
}

if config.backoff == nil {
config.backoff = &gaxBackoff{}
} else {
config.backoff.SetMultiplier(settings.retry.backoff.GetMultiplier())
config.backoff.SetInitial(settings.retry.backoff.GetInitial())
config.backoff.SetMax(settings.retry.backoff.GetMax())
}

return &uploadBufferRetryConfig{
attempts: 1,
invocationID: uuid.New().String(),
config: config,
}
}

// retriable determines if a retry is necessary and if so returns a nil error;
// otherwise it returns the error to be surfaced to the user.
func (retry *uploadBufferRetryConfig) retriable(ctx context.Context, err error) error {
Contributor:
It seems a little odd to have this return the error to be resurfaced. Maybe just have it return a bool, and you can use retry.lastErr to set the error to return if need be? (See the sketch after this function.)

Also not a big deal if you want to leave this as-is for now, since we'll be overwriting with the refactored version anyway.

Contributor Author:
That's fair; I think I may have been too deep in this. Although, this returns a formatted error for when max attempts are reached, which you wouldn't know outside this method unless you returned two bools... but I guess adding the number of attempts to the error may be useful for all cases anyway. I'll leave as-is and consider this feedback if relevant to the other version.

if err == nil {
// a nil err does not need to be retried
return nil
}
if err != context.Canceled && err != context.DeadlineExceeded {
retry.lastErr = err
}

if retry.config.policy == RetryNever {
return err
}

if retry.config.maxAttempts != nil && retry.attempts >= *retry.config.maxAttempts {
return fmt.Errorf("storage: retry failed after %v attempts; last error: %w", retry.attempts, err)
}

retry.attempts++

// Explicitly check context cancellation so that we can distinguish between a
// DEADLINE_EXCEEDED error from the server and a user-set context deadline.
// Unfortunately gRPC will return codes.DeadlineExceeded (which may be retryable if it's
// sent by the server) in both cases.
ctxErr := ctx.Err()
if errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
if retry.lastErr != nil {
return fmt.Errorf("retry failed with %v; last error: %w", ctxErr, retry.lastErr)
}
return ctxErr
}

if !retry.config.shouldRetry(err) {
return err
}
return nil
}
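
A minimal sketch of the bool-returning shape suggested in the review thread above, assuming the same fields on uploadBufferRetryConfig; the method name shouldRetryAttempt is hypothetical and not part of this PR.

// Hypothetical alternative to retriable: report only whether to retry and
// stash whatever error the caller should surface in lastErr.
func (retry *uploadBufferRetryConfig) shouldRetryAttempt(ctx context.Context, err error) bool {
	if err == nil {
		return false
	}
	if err != context.Canceled && err != context.DeadlineExceeded {
		retry.lastErr = err
	}
	if retry.config.policy == RetryNever {
		retry.lastErr = err
		return false
	}
	if retry.config.maxAttempts != nil && retry.attempts >= *retry.config.maxAttempts {
		// The attempt count has to be folded into lastErr here, or the caller
		// cannot report it -- the trade-off discussed in the thread above.
		retry.lastErr = fmt.Errorf("storage: retry failed after %v attempts; last error: %w", retry.attempts, err)
		return false
	}
	retry.attempts++
	ctxErr := ctx.Err()
	if errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
		retry.lastErr = fmt.Errorf("retry failed with %v; last error: %w", ctxErr, err)
		return false
	}
	if !retry.config.shouldRetry(err) {
		retry.lastErr = err
		return false
	}
	return true
}

Call sites would then read roughly: if !retryConfig.shouldRetryAttempt(w.ctx, err) { return nil, 0, retryConfig.lastErr }.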

// doBackOff pauses for the appropriate amount of time; it should be called after
// encountering a retriable error.
func (retry *uploadBufferRetryConfig) doBackOff(ctx context.Context) error {
p := retry.config.backoff.Pause()

if ctxErr := gax.Sleep(ctx, p); ctxErr != nil {
if retry.lastErr != nil {
return fmt.Errorf("retry failed with %v; last error: %w", ctxErr, retry.lastErr)
}
return ctxErr
}
return nil
}
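
Condensed, the three call sites in uploadBuffer combine these helpers in the same loop shape. A rough illustration follows; attempt and reconnect are hypothetical stand-ins for the stream Send/Recv and stream re-open logic in the diff above.

// Illustrative only: the retry loop that retriable and doBackOff support.
func retryLoopSketch(ctx context.Context, retry *uploadBufferRetryConfig, attempt, reconnect func() error) error {
	for {
		err := attempt()
		if err == nil {
			return nil
		}
		// retriable returns nil when the attempt should be retried; otherwise
		// it returns the (possibly wrapped) error to surface to the caller.
		if err = retry.retriable(ctx, err); err != nil {
			return err
		}
		retry.doBackOff(ctx)
		if err := reconnect(); err != nil {
			return err
		}
	}
}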