From e50b20271637e415ab4b425748dfd8fa4486fa09 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 28 Apr 2022 14:47:01 -0700 Subject: [PATCH] remove newAttempt field --- stream.go | 83 ++++++++++++++++++++++--------------------------------- 1 file changed, 33 insertions(+), 50 deletions(-) diff --git a/stream.go b/stream.go index 9ab6c738a3a0..6221e98b9841 100644 --- a/stream.go +++ b/stream.go @@ -303,23 +303,23 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client } cs.binlog = binarylog.GetMethodLogger(method) - if err := cs.newAttemptLocked(false /* isTransparent */); err != nil { + cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */) + if err != nil { cs.finish(err) return nil, err } // Because this operation is always called either here (while creating the // clientStream) or by the retry code while locked when replaying the - // operation, it is safe to access cs.attempt and cs.newAttempt directly. + // operation, it is safe to access cs.attempt directly. op := func(a *csAttempt) error { - if err := cs.newAttempt.getTransport(); err != nil { + if err := a.getTransport(); err != nil { return err } - if err := cs.newAttempt.newStream(); err != nil { + if err := a.newStream(); err != nil { return err } - cs.attempt = cs.newAttempt - cs.newAttempt = nil + cs.attempt = a return nil } if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil { @@ -362,12 +362,12 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client } // newAttemptLocked creates a new csAttempt without a transport or stream. -func (cs *clientStream) newAttemptLocked(isTransparent bool) error { +func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) { if err := cs.ctx.Err(); err != nil { - return toRPCErr(err) + return nil, toRPCErr(err) } if err := cs.cc.ctx.Err(); err != nil { - return ErrClientConnClosing + return nil, ErrClientConnClosing } ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp) @@ -411,15 +411,14 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) error { )) } - cs.newAttempt = &csAttempt{ + return &csAttempt{ ctx: ctx, beginTime: beginTime, cs: cs, dc: cs.cc.dopts.dc, statsHandler: sh, trInfo: trInfo, - } - return nil + }, nil } func (a *csAttempt) getTransport() error { @@ -507,9 +506,6 @@ type clientStream struct { // then newClientStream calls finish on the clientStream and returns. So, finish method is the only // place where we need to check if the attempt is nil. attempt *csAttempt - // newAttempt is a partially-created attempt; it is promoted to the attempt - // field when a transport stream exists. - newAttempt *csAttempt // TODO(hedging): hedging will have multiple attempts simultaneously. committed bool // active attempt committed for retry? onCommit func() @@ -564,24 +560,21 @@ func (cs *clientStream) commitAttempt() { // shouldRetry returns nil if the RPC should be retried; otherwise it returns // the error that should be returned by the operation. If the RPC should be // retried, the bool indicates whether it is being retried transparently. -func (cs *clientStream) shouldRetry(err error) (bool, error) { - attempt := cs.newAttempt - if attempt == nil { - attempt = cs.attempt - } +func (a *csAttempt) shouldRetry(err error) (bool, error) { + cs := a.cs - if cs.finished || cs.committed || attempt.drop { + if cs.finished || cs.committed || a.drop { // RPC is finished or committed or was dropped by the picker; cannot retry. return false, err } - if attempt.s == nil && attempt.allowTransparentRetry { + if a.s == nil && a.allowTransparentRetry { return true, nil } // Wait for the trailers. unprocessed := false - if attempt.s != nil { - <-attempt.s.Done() - unprocessed = attempt.s.Unprocessed() + if a.s != nil { + <-a.s.Done() + unprocessed = a.s.Unprocessed() } if cs.firstAttempt && unprocessed { // First attempt, stream unprocessed: transparently retry. @@ -593,14 +586,14 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) { pushback := 0 hasPushback := false - if attempt.s != nil { - if !attempt.s.TrailersOnly() { + if a.s != nil { + if !a.s.TrailersOnly() { return false, err } // TODO(retry): Move down if the spec changes to not check server pushback // before considering this a failure for throttling. - sps := attempt.s.Trailer()["grpc-retry-pushback-ms"] + sps := a.s.Trailer()["grpc-retry-pushback-ms"] if len(sps) == 1 { var e error if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 { @@ -617,8 +610,8 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) { } var code codes.Code - if attempt.s != nil { - code = attempt.s.Status().Code() + if a.s != nil { + code = a.s.Status().Code() } else { code = status.Code(err) } @@ -665,26 +658,23 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) { } // Returns nil if a retry was performed and succeeded; error otherwise. -func (cs *clientStream) retryLocked(lastErr error) error { +func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error { for { - attempt := cs.newAttempt - if attempt == nil { - attempt = cs.attempt - } attempt.finish(toRPCErr(lastErr)) - isTransparent, err := cs.shouldRetry(lastErr) + isTransparent, err := attempt.shouldRetry(lastErr) if err != nil { cs.commitAttemptLocked() return err } cs.firstAttempt = false - if err := cs.newAttemptLocked(isTransparent); err != nil { + attempt, err = cs.newAttemptLocked(isTransparent) + if err != nil { // Only returns error if the clientconn is closed or the context of // the stream is canceled. cs.finish(err) return err } - if lastErr = cs.replayBufferLocked(); lastErr == nil { + if lastErr = cs.replayBufferLocked(attempt); lastErr == nil { return nil } } @@ -711,14 +701,11 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) // already be status errors. return toRPCErr(op(cs.attempt)) } - a := cs.newAttempt - if a == nil { - a = cs.attempt - } + a := cs.attempt cs.mu.Unlock() err := op(a) cs.mu.Lock() - if (cs.newAttempt == nil && a != cs.attempt) || (cs.newAttempt != nil && a != cs.newAttempt) { + if a != cs.attempt { // We started another attempt already. continue } @@ -730,7 +717,7 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) cs.mu.Unlock() return err } - if err := cs.retryLocked(err); err != nil { + if err := cs.retryLocked(a, err); err != nil { cs.mu.Unlock() return err } @@ -779,13 +766,9 @@ func (cs *clientStream) Trailer() metadata.MD { return cs.attempt.s.Trailer() } -func (cs *clientStream) replayBufferLocked() error { +func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error { for _, f := range cs.buffer { - a := cs.newAttempt - if a == nil { - a = cs.attempt - } - if err := f(a); err != nil { + if err := f(attempt); err != nil { return err } }