fix(storage): add backoff to gRPC write retries #11200

Merged · 5 commits · Dec 4, 2024
Changes from 1 commit
103 changes: 95 additions & 8 deletions storage/grpc_client.go
@@ -29,6 +29,7 @@ import (
"cloud.google.com/go/internal/trace"
gapic "cloud.google.com/go/storage/internal/apiv2"
"cloud.google.com/go/storage/internal/apiv2/storagepb"
"github.com/google/uuid"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
@@ -1183,6 +1184,8 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
gw.ctx = setUserProjectMetadata(gw.ctx, s.userProject)
}

gw.initializeRetryConfig()

// This function reads the data sent to the pipe and sends sets of messages
// on the gRPC client-stream as the buffer is filled.
go func() {
@@ -2044,6 +2047,12 @@ type gRPCWriter struct {

// The Resumable Upload ID started by a gRPC-based Writer.
upid string

// Retry tracking vars.
backoff gax.Backoff // backoff for all retries of write calls
attempts int
invocationID string
lastErr error
}
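
For context, these new fields mirror the retry configuration that callers supply through the storage package's public Retryer options (WithBackoff, WithPolicy, WithMaxAttempts), which populate w.settings.retry. A minimal sketch of that caller-side wiring, with a hypothetical function name and placeholder bucket/object names:

import (
	"context"
	"time"

	"cloud.google.com/go/storage"
	"github.com/googleapis/gax-go/v2"
)

// writeWithRetries is an illustrative caller; the options below feed the
// retryConfig that the gRPC writer reads when retrying uploads.
func writeWithRetries(ctx context.Context, client *storage.Client, data []byte) error {
	o := client.Bucket("example-bucket").Object("example-object").Retryer(
		storage.WithBackoff(gax.Backoff{
			Initial:    time.Second,
			Max:        30 * time.Second,
			Multiplier: 2,
		}),
		storage.WithPolicy(storage.RetryAlways),
		storage.WithMaxAttempts(5),
	)
	w := o.NewWriter(ctx)
	if _, err := w.Write(data); err != nil {
		w.Close()
		return err
	}
	return w.Close()
}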

// startResumableUpload initializes a Resumable Upload with gRPC and sets the
@@ -2092,11 +2101,6 @@ func (w *gRPCWriter) queryProgress() (int64, error) {
//
// Returns object, persisted size, and any error that is not retriable.
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, error) {
var shouldRetry = ShouldRetry
if w.settings.retry != nil && w.settings.retry.shouldRetry != nil {
shouldRetry = w.settings.retry.shouldRetry
}

var err error
var lastWriteOfEntireObject bool

@@ -2143,6 +2147,7 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
if w.stream == nil {
hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))}
ctx := gax.InsertMetadataIntoOutgoingContext(w.ctx, hds...)
ctx = setInvocationHeaders(ctx, w.invocationID, w.attempts)

w.stream, err = w.c.raw.BidiWriteObject(ctx)
if err != nil {
@@ -2167,6 +2172,7 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
}
}
w.stream.Context()

err = w.stream.Send(req)
if err == io.EOF {
@@ -2188,7 +2194,9 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received.
if shouldRetry(err) {
var shouldRetry bool
shouldRetry, err = w.shouldRetry(w.ctx, err)
if shouldRetry {
// TODO: Add test case for failure modes of querying progress.
writeOffset, err = w.determineOffset(start)
if err != nil {
@@ -2234,7 +2242,9 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if shouldRetry(err) {
var shouldRetry bool
shouldRetry, err = w.shouldRetry(w.ctx, err)
if shouldRetry {
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
@@ -2274,7 +2284,9 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
var obj *storagepb.Object
for obj == nil {
resp, err := w.stream.Recv()
if shouldRetry(err) {
var shouldRetry bool
shouldRetry, err = w.shouldRetry(w.ctx, err)
if shouldRetry {
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
@@ -2370,3 +2382,78 @@ func checkCanceled(err error) error {

return err
}

func (w *gRPCWriter) initializeRetryConfig() {
if w.attempts == 0 {
Review comment (Contributor):
So, are we tracking attempts across the entire upload? I would think we should track this per-chunk as we do for JSON.

Reply (Author):
Changed this.

w.attempts = 1

if w.settings.retry == nil {
w.settings.retry = defaultRetry
}

if w.settings.retry.shouldRetry == nil {
w.settings.retry.shouldRetry = ShouldRetry
}

w.invocationID = uuid.New().String()

if w.settings.retry.backoff != nil {
w.backoff.Multiplier = w.settings.retry.backoff.Multiplier
w.backoff.Initial = w.settings.retry.backoff.Initial
w.backoff.Max = w.settings.retry.backoff.Max
}
}
}
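
Per the review exchange above about tracking attempts per chunk (as the JSON path does) rather than across the whole upload, here is a minimal sketch of what a per-chunk reset could look like; the helper name is hypothetical and this single-commit view does not show the final change:

// resetRetryStateForChunk is a hypothetical helper that restarts retry tracking
// at the beginning of each chunk upload, so attempts and backoff are scoped to
// the current chunk rather than the entire object.
func (w *gRPCWriter) resetRetryStateForChunk() {
	w.attempts = 1
	w.invocationID = uuid.New().String()
	w.lastErr = nil
	// Rebuild the backoff so each chunk starts from the configured initial pause.
	w.backoff = gax.Backoff{}
	if w.settings.retry != nil && w.settings.retry.backoff != nil {
		w.backoff.Initial = w.settings.retry.backoff.Initial
		w.backoff.Max = w.settings.retry.backoff.Max
		w.backoff.Multiplier = w.settings.retry.backoff.Multiplier
	}
}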

// shouldRetry determines whether a retry is necessary and, if so, pauses for the
// appropriate backoff period. It returns true if the error is retryable, and
// otherwise returns the error that should be surfaced to the user.
func (w *gRPCWriter) shouldRetry(ctx context.Context, err error) (bool, error) {
Review comment (Contributor):
Can we name this something else? Just confusing to conflate this with the ShouldRetry that only takes an err and returns a bool, especially since the backoff pause happens inside this func.

Reply (Author):
Yeah, that's fair. I'll think of something better.

if err == nil {
// a nil err does not need to be retried
return false, nil
}
if err != context.Canceled && err != context.DeadlineExceeded {
w.lastErr = err
}

retryConfig := w.settings.retry

if retryConfig.policy == RetryNever {
return false, err
}

if retryConfig.maxAttempts != nil && w.attempts >= *retryConfig.maxAttempts {
return false, fmt.Errorf("storage: retry failed after %v attempts; last error: %w", w.attempts, err)
}

w.attempts++

retryable := retryConfig.shouldRetry(err)
// Explicitly check context cancellation so that we can distinguish between a
// DEADLINE_EXCEEDED error from the server and a user-set context deadline.
// Unfortunately gRPC returns codes.DeadlineExceeded (which may be retryable if it's
// sent by the server) in both cases.
ctxErr := ctx.Err()
if errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
retryable = false

if w.lastErr != nil {
return false, fmt.Errorf("retry failed with %v; last error: %w", ctxErr, w.lastErr)
}
return false, ctxErr
}

if retryable {
p := w.backoff.Pause()
Review comment (Contributor):
As discussed, maybe we can add a mock test in this PR to ensure that backoff.Pause is actually called?

Reply (Author):
I made a mock for the backoff and had to change more than anticipated to be able to inject it, PTAL.


if ctxErr := gax.Sleep(ctx, p); ctxErr != nil {
if w.lastErr != nil {
return false, fmt.Errorf("retry failed with %v; last error: %w", ctxErr, w.lastErr)
}
return false, ctxErr
}
}

return retryable, err
}
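
On the review thread above about verifying that backoff.Pause is actually called: because the field is a concrete gax.Backoff, injecting a mock means introducing a small interface, which lines up with the author's note that more had to change than anticipated. A hedged sketch of that shape, with hypothetical names:

// pauser is a hypothetical interface for the writer's backoff so tests can
// substitute a mock; *gax.Backoff already satisfies it via Pause() time.Duration.
type pauser interface {
	Pause() time.Duration
}

// mockBackoff records how often Pause is called and returns zero so tests
// never actually sleep.
type mockBackoff struct {
	pauseCalls int
}

func (m *mockBackoff) Pause() time.Duration {
	m.pauseCalls++
	return 0
}

A test would then set the writer's backoff field (retyped as the interface) to a *mockBackoff, force a retryable error from the stream, and assert that pauseCalls is greater than zero.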