From 59f7c0168b138efbb87352131e2006e26acba7f6 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 28 Apr 2022 10:35:56 -0700 Subject: [PATCH] prevent cs.attempt.s from ever being nil --- clientconn.go | 6 +- internal/transport/http2_client.go | 20 ++-- stream.go | 180 ++++++++++++++++------------- test/end2end_test.go | 7 +- test/retry_test.go | 99 +++++++++++++++- 5 files changed, 212 insertions(+), 100 deletions(-) diff --git a/clientconn.go b/clientconn.go index 3ed6eb8e75e3..ea9836d28b3c 100644 --- a/clientconn.go +++ b/clientconn.go @@ -907,14 +907,10 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { } func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { - t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ + return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ Ctx: ctx, FullMethodName: method, }) - if err != nil { - return nil, nil, toRPCErr(err) - } - return t, done, nil } func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) { diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 38ed3d566fff..24ca59084b43 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -631,8 +631,8 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call // the wire. However, there are two notable exceptions: // // 1. If the stream headers violate the max header list size allowed by the -// server. In this case there is no reason to retry at all, as it is -// assumed the RPC would continue to fail on subsequent attempts. +// server. It's possible this could succeed on another transport, even if +// it's unlikely, but do not transparently retry. // 2. If the credentials errored when requesting their headers. In this case, // it's possible a retry can fix the problem, but indefinitely transparently // retrying is not appropriate as it is likely the credentials, if they can @@ -640,8 +640,7 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call type NewStreamError struct { Err error - DoNotRetry bool - DoNotTransparentRetry bool + AllowTransparentRetry bool } func (e NewStreamError) Error() string { @@ -650,11 +649,11 @@ func (e NewStreamError) Error() string { // NewStream creates a stream and registers it into the transport as "active" // streams. All non-nil errors returned will be *NewStreamError. -func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { +func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) { ctx = peer.NewContext(ctx, t.getPeer()) headerFields, err := t.createHeaderFields(ctx, callHdr) if err != nil { - return nil, &NewStreamError{Err: err, DoNotTransparentRetry: true} + return nil, &NewStreamError{Err: err, AllowTransparentRetry: false} } s := t.newStream(ctx, callHdr) cleanup := func(err error) { @@ -754,13 +753,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return true }, hdr) if err != nil { - return nil, &NewStreamError{Err: err} + // Connection closed. 
+ return nil, &NewStreamError{Err: err, AllowTransparentRetry: true} } if success { break } if hdrListSizeErr != nil { - return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true} + return nil, &NewStreamError{Err: hdrListSizeErr} } firstTry = false select { @@ -768,9 +768,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea case <-ctx.Done(): return nil, &NewStreamError{Err: ContextErr(ctx.Err())} case <-t.goAway: - return nil, &NewStreamError{Err: errStreamDrain} + return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true} case <-t.ctx.Done(): - return nil, &NewStreamError{Err: ErrConnClosing} + return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true} } } if t.statsHandler != nil { diff --git a/stream.go b/stream.go index 8285c8891010..fdf4e1d6697c 100644 --- a/stream.go +++ b/stream.go @@ -308,9 +308,21 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client return nil, err } - op := func(a *csAttempt) error { return a.newStream() } + // Because this operation is always called either here (while creating the + // clientStream) or by the retry code while locked when replaying the + // operation, it is safe to access cs.attempt and cs.newAttempt directly. + op := func(a *csAttempt) error { + if err := cs.newAttempt.getTransport(); err != nil { + return err + } + if err := cs.newAttempt.newStream(); err != nil { + return err + } + cs.attempt = cs.newAttempt + cs.newAttempt = nil + return nil + } if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil { - cs.finish(err) return nil, err } @@ -349,9 +361,15 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client return cs, nil } -// newAttemptLocked creates a new attempt with a transport. -// If it succeeds, then it replaces clientStream's attempt with this new attempt. -func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) { +// newAttemptLocked creates a new csAttempt without a transport or stream. +func (cs *clientStream) newAttemptLocked(isTransparent bool) error { + if err := cs.ctx.Err(); err != nil { + return toRPCErr(err) + } + if err := cs.cc.ctx.Err(); err != nil { + return ErrClientConnClosing + } + ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp) method := cs.callHdr.Method sh := cs.cc.dopts.copts.StatsHandler @@ -385,7 +403,15 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) { ctx = trace.NewContext(ctx, trInfo.tr) } - newAttempt := &csAttempt{ + if cs.cc.parsedTarget.Scheme == "xds" { + // Add extra metadata (metadata that will be added by transport) to context + // so the balancer can see them. + ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs( + "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype), + )) + } + + cs.newAttempt = &csAttempt{ ctx: ctx, beginTime: beginTime, cs: cs, @@ -393,36 +419,20 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) { statsHandler: sh, trInfo: trInfo, } - defer func() { - if retErr != nil { - // This attempt is not set in the clientStream, so it's finish won't - // be called. Call it here for stats and trace in case they are not - // nil. 
- newAttempt.finish(retErr) - } - }() + return nil +} - if err := ctx.Err(); err != nil { - return toRPCErr(err) - } +func (a *csAttempt) getTransport() error { + cs := a.cs - if cs.cc.parsedTarget.Scheme == "xds" { - // Add extra metadata (metadata that will be added by transport) to context - // so the balancer can see them. - ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs( - "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype), - )) - } - t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method) + var err error + a.t, a.done, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method) if err != nil { return err } - if trInfo != nil { - trInfo.firstLine.SetRemoteAddr(t.RemoteAddr()) + if a.trInfo != nil { + a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr()) } - newAttempt.t = t - newAttempt.done = done - cs.attempt = newAttempt return nil } @@ -431,12 +441,21 @@ func (a *csAttempt) newStream() error { cs.callHdr.PreviousAttempts = cs.numRetries s, err := a.t.NewStream(a.ctx, cs.callHdr) if err != nil { - // Return without converting to an RPC error so retry code can - // inspect. - return err + nse, ok := err.(*transport.NewStreamError) + if !ok { + // Unexpected. + return err + } + + if nse.AllowTransparentRetry { + a.allowTransparentRetry = true + } + + // Unwrap and convert error. + return toRPCErr(nse.Err) } - cs.attempt.s = s - cs.attempt.p = &parser{r: s} + a.s = s + a.p = &parser{r: s} return nil } @@ -484,6 +503,9 @@ type clientStream struct { // then newClientStream calls finish on the clientStream and returns. So, finish method is the only // place where we need to check if the attempt is nil. attempt *csAttempt + // newAttempt is a partially-created attempt; it is promoted to the attempt + // field when a transport stream exists. + newAttempt *csAttempt // TODO(hedging): hedging will have multiple attempts simultaneously. committed bool // active attempt committed for retry? onCommit func() @@ -514,6 +536,9 @@ type csAttempt struct { statsHandler stats.Handler beginTime time.Time + + // set for newStream errors that may be transparently retried + allowTransparentRetry bool } func (cs *clientStream) commitAttemptLocked() { @@ -534,40 +559,23 @@ func (cs *clientStream) commitAttempt() { // the error that should be returned by the operation. If the RPC should be // retried, the bool indicates whether it is being retried transparently. func (cs *clientStream) shouldRetry(err error) (bool, error) { + attempt := cs.newAttempt + if attempt == nil { + attempt = cs.attempt + } + if cs.finished || cs.committed { // RPC is finished or committed; cannot retry. return false, err } - if cs.attempt.s == nil { - // Error from NewClientStream. - nse, ok := err.(*transport.NewStreamError) - if !ok { - // Unexpected, but assume no I/O was performed and the RPC is not - // fatal, so retry indefinitely. - return true, nil - } - - // Unwrap and convert error. - err = toRPCErr(nse.Err) - - // Never retry DoNotRetry errors, which indicate the RPC should not be - // retried due to max header list size violation, etc. - if nse.DoNotRetry { - return false, err - } - - // In the event of a non-IO operation error from NewStream, we never - // attempted to write anything to the wire, so we can retry - // indefinitely. - if !nse.DoNotTransparentRetry { - return true, nil - } + if attempt.s == nil && attempt.allowTransparentRetry { + return true, nil } // Wait for the trailers. 
unprocessed := false - if cs.attempt.s != nil { - <-cs.attempt.s.Done() - unprocessed = cs.attempt.s.Unprocessed() + if attempt.s != nil { + <-attempt.s.Done() + unprocessed = attempt.s.Unprocessed() } if cs.firstAttempt && unprocessed { // First attempt, stream unprocessed: transparently retry. @@ -579,14 +587,14 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) { pushback := 0 hasPushback := false - if cs.attempt.s != nil { - if !cs.attempt.s.TrailersOnly() { + if attempt.s != nil { + if !attempt.s.TrailersOnly() { return false, err } // TODO(retry): Move down if the spec changes to not check server pushback // before considering this a failure for throttling. - sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"] + sps := attempt.s.Trailer()["grpc-retry-pushback-ms"] if len(sps) == 1 { var e error if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 { @@ -603,10 +611,10 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) { } var code codes.Code - if cs.attempt.s != nil { - code = cs.attempt.s.Status().Code() + if attempt.s != nil { + code = attempt.s.Status().Code() } else { - code = status.Convert(err).Code() + code = status.Code(err) } rp := cs.methodConfig.RetryPolicy @@ -653,7 +661,11 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) { // Returns nil if a retry was performed and succeeded; error otherwise. func (cs *clientStream) retryLocked(lastErr error) error { for { - cs.attempt.finish(toRPCErr(lastErr)) + attempt := cs.newAttempt + if attempt == nil { + attempt = cs.attempt + } + attempt.finish(toRPCErr(lastErr)) isTransparent, err := cs.shouldRetry(lastErr) if err != nil { cs.commitAttemptLocked() @@ -661,6 +673,9 @@ func (cs *clientStream) retryLocked(lastErr error) error { } cs.firstAttempt = false if err := cs.newAttemptLocked(isTransparent); err != nil { + // Only returns error if the clientconn is closed or the context of + // the stream is canceled. + cs.finish(err) return err } if lastErr = cs.replayBufferLocked(); lastErr == nil { @@ -673,7 +688,10 @@ func (cs *clientStream) Context() context.Context { cs.commitAttempt() // No need to lock before using attempt, since we know it is committed and // cannot change. - return cs.attempt.s.Context() + if cs.attempt.s != nil { + return cs.attempt.s.Context() + } + return cs.ctx } func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error { @@ -687,11 +705,14 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) // already be status errors. return toRPCErr(op(cs.attempt)) } - a := cs.attempt + a := cs.newAttempt + if a == nil { + a = cs.attempt + } cs.mu.Unlock() err := op(a) cs.mu.Lock() - if a != cs.attempt { + if (cs.newAttempt == nil && a != cs.attempt) || (cs.newAttempt != nil && a != cs.newAttempt) { // We started another attempt already. continue } @@ -734,7 +755,7 @@ func (cs *clientStream) Header() (metadata.MD, error) { cs.binlog.Log(logEntry) cs.serverHeaderBinlogged = true } - return m, err + return m, nil } func (cs *clientStream) Trailer() metadata.MD { @@ -753,8 +774,11 @@ func (cs *clientStream) Trailer() metadata.MD { } func (cs *clientStream) replayBufferLocked() error { - a := cs.attempt for _, f := range cs.buffer { + a := cs.newAttempt + if a == nil { + a = cs.attempt + } if err := f(a); err != nil { return err } @@ -805,11 +829,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } msgBytes := data // Store the pointer before setting to nil. For binary logging. 
 	op := func(a *csAttempt) error {
-		err := a.sendMsg(m, hdr, payload, data)
-		// nil out the message and uncomp when replaying; they are only needed for
-		// stats which is disabled for subsequent attempts.
-		m, data = nil, nil
-		return err
+		return a.sendMsg(m, hdr, payload, data)
 	}
 	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
 	if cs.binlog != nil && err == nil {
@@ -818,7 +838,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
 			Message: msgBytes,
 		})
 	}
-	return
+	return err
 }
 
 func (cs *clientStream) RecvMsg(m interface{}) error {
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 766d568b9277..3c38e00006af 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -1508,7 +1508,7 @@ func testFailFast(t *testing.T, e env) {
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
 	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
 	}
@@ -1517,9 +1517,10 @@
 	te.srv.Stop()
 	// Loop until the server teardown is propagated to the client.
 	for {
-		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		if err := ctx.Err(); err != nil {
+			t.Fatalf("EmptyCall did not return UNAVAILABLE before timeout")
+		}
 		_, err := tc.EmptyCall(ctx, &testpb.Empty{})
-		cancel()
 		if status.Code(err) == codes.Unavailable {
 			break
 		}
diff --git a/test/retry_test.go b/test/retry_test.go
index 1013e54ce051..38dbcb6d3ad9 100644
--- a/test/retry_test.go
+++ b/test/retry_test.go
@@ -34,6 +34,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/stubserver"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/stats"
@@ -44,7 +45,8 @@
 func (s) TestRetryUnary(t *testing.T) {
 	i := -1
 	ss := &stubserver.StubServer{
-		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
+		EmptyCallF: func(context.Context, *testpb.Empty) (r *testpb.Empty, err error) {
+			defer func() { t.Logf("server call %v returning err %v", i, err) }()
 			i++
 			switch i {
 			case 0, 2, 5:
@@ -95,7 +97,8 @@
 		{codes.Internal, 11},
 		{codes.AlreadyExists, 15},
 	}
-	for _, tc := range testCases {
+	for num, tc := range testCases {
+		t.Log("Case", num)
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 		_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
 		cancel()
@@ -644,3 +647,95 @@
 		t.Fatalf("pushback time before final attempt = %v; want ~10ms", diff)
 	}
 }
+
+func (s) TestRetryTransparentWhenCommitted(t *testing.T) {
+	// With MaxConcurrentStreams=1:
+	//
+	// 1. Create stream 1 that is retriable.
+	// 2. Stream 1 is created and fails with a retriable code.
+	// 3. Create dummy stream 2, blocking indefinitely.
+	// 4. Stream 1 retries (and blocks until stream 2 finishes).
+	// 5. Stream 1 is canceled manually.
+	//
+	// If there is no bug, the stream is done and errors with CANCELED. With a bug:
+	//
+	// 6. Stream 1 has a nil stream (attempt.s). Operations like CloseSend will panic.
+
+	first := grpcsync.NewEvent()
+	ss := &stubserver.StubServer{
+		FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
+			// First call fails with a retriable status; later calls block until canceled.
+ if !first.HasFired() { + first.Fire() + t.Log("returned first error") + return status.Error(codes.AlreadyExists, "first attempt fails and is retriable") + } + t.Log("blocking") + <-stream.Context().Done() + return stream.Context().Err() + }, + } + + if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + ss.NewServiceConfig(`{ + "methodConfig": [{ + "name": [{"service": "grpc.testing.TestService"}], + "waitForReady": true, + "retryPolicy": { + "MaxAttempts": 2, + "InitialBackoff": ".1s", + "MaxBackoff": ".1s", + "BackoffMultiplier": 1.0, + "RetryableStatusCodes": [ "ALREADY_EXISTS" ] + } + }]}`) + + ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel1() + ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel2() + + for { + if ctx1.Err() != nil { + t.Fatalf("Timed out waiting for service config update") + } + if ss.CC.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { + break + } + time.Sleep(time.Millisecond) + } + + stream1, err := ss.Client.FullDuplexCall(ctx1) + if err != nil { + t.Fatalf("Error creating stream 1: %v", err) + } + + // Create dummy stream to block indefinitely. + _, err = ss.Client.FullDuplexCall(ctx2) + if err != nil { + t.Errorf("Error creating stream 2: %v", err) + } + + stream1Closed := grpcsync.NewEvent() + go func() { + _, err := stream1.Recv() + // Will trigger a retry when it sees the ALREADY_EXISTS error + if status.Code(err) != codes.Canceled { + t.Errorf("Expected stream1 to be canceled; got error: %v", err) + } + stream1Closed.Fire() + }() + + // Wait longer than the retry backoff timer. + time.Sleep(200 * time.Millisecond) + cancel1() + + // Operations on the stream should not panic. + <-stream1Closed.Done() + stream1.CloseSend() + stream1.Recv() + stream1.Send(&testpb.StreamingOutputCallRequest{}) +}