Skip to content

Commit

Permalink
remove newAttempt field
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Apr 28, 2022
1 parent 0a4d344 commit e50b202
Showing 1 changed file with 33 additions and 50 deletions.
83 changes: 33 additions & 50 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,23 +303,23 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}
cs.binlog = binarylog.GetMethodLogger(method)

if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */)
if err != nil {
cs.finish(err)
return nil, err
}

// Because this operation is always called either here (while creating the
// clientStream) or by the retry code while locked when replaying the
// operation, it is safe to access cs.attempt and cs.newAttempt directly.
// operation, it is safe to access cs.attempt directly.
op := func(a *csAttempt) error {
if err := cs.newAttempt.getTransport(); err != nil {
if err := a.getTransport(); err != nil {
return err
}
if err := cs.newAttempt.newStream(); err != nil {
if err := a.newStream(); err != nil {
return err
}
cs.attempt = cs.newAttempt
cs.newAttempt = nil
cs.attempt = a
return nil
}
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
Expand Down Expand Up @@ -362,12 +362,12 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}

// newAttemptLocked creates a new csAttempt without a transport or stream.
func (cs *clientStream) newAttemptLocked(isTransparent bool) error {
func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
if err := cs.ctx.Err(); err != nil {
return toRPCErr(err)
return nil, toRPCErr(err)
}
if err := cs.cc.ctx.Err(); err != nil {
return ErrClientConnClosing
return nil, ErrClientConnClosing
}

ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
Expand Down Expand Up @@ -411,15 +411,14 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) error {
))
}

cs.newAttempt = &csAttempt{
return &csAttempt{
ctx: ctx,
beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
}
return nil
}, nil
}

func (a *csAttempt) getTransport() error {
Expand Down Expand Up @@ -507,9 +506,6 @@ type clientStream struct {
// then newClientStream calls finish on the clientStream and returns. So, finish method is the only
// place where we need to check if the attempt is nil.
attempt *csAttempt
// newAttempt is a partially-created attempt; it is promoted to the attempt
// field when a transport stream exists.
newAttempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
committed bool // active attempt committed for retry?
onCommit func()
Expand Down Expand Up @@ -564,24 +560,21 @@ func (cs *clientStream) commitAttempt() {
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation. If the RPC should be
// retried, the bool indicates whether it is being retried transparently.
func (cs *clientStream) shouldRetry(err error) (bool, error) {
attempt := cs.newAttempt
if attempt == nil {
attempt = cs.attempt
}
func (a *csAttempt) shouldRetry(err error) (bool, error) {
cs := a.cs

if cs.finished || cs.committed || attempt.drop {
if cs.finished || cs.committed || a.drop {
// RPC is finished or committed or was dropped by the picker; cannot retry.
return false, err
}
if attempt.s == nil && attempt.allowTransparentRetry {
if a.s == nil && a.allowTransparentRetry {
return true, nil
}
// Wait for the trailers.
unprocessed := false
if attempt.s != nil {
<-attempt.s.Done()
unprocessed = attempt.s.Unprocessed()
if a.s != nil {
<-a.s.Done()
unprocessed = a.s.Unprocessed()
}
if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
Expand All @@ -593,14 +586,14 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {

pushback := 0
hasPushback := false
if attempt.s != nil {
if !attempt.s.TrailersOnly() {
if a.s != nil {
if !a.s.TrailersOnly() {
return false, err
}

// TODO(retry): Move down if the spec changes to not check server pushback
// before considering this a failure for throttling.
sps := attempt.s.Trailer()["grpc-retry-pushback-ms"]
sps := a.s.Trailer()["grpc-retry-pushback-ms"]
if len(sps) == 1 {
var e error
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
Expand All @@ -617,8 +610,8 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
}

var code codes.Code
if attempt.s != nil {
code = attempt.s.Status().Code()
if a.s != nil {
code = a.s.Status().Code()
} else {
code = status.Code(err)
}
Expand Down Expand Up @@ -665,26 +658,23 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
}

// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error {
func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
for {
attempt := cs.newAttempt
if attempt == nil {
attempt = cs.attempt
}
attempt.finish(toRPCErr(lastErr))
isTransparent, err := cs.shouldRetry(lastErr)
isTransparent, err := attempt.shouldRetry(lastErr)
if err != nil {
cs.commitAttemptLocked()
return err
}
cs.firstAttempt = false
if err := cs.newAttemptLocked(isTransparent); err != nil {
attempt, err = cs.newAttemptLocked(isTransparent)
if err != nil {
// Only returns error if the clientconn is closed or the context of
// the stream is canceled.
cs.finish(err)
return err
}
if lastErr = cs.replayBufferLocked(); lastErr == nil {
if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
return nil
}
}
Expand All @@ -711,14 +701,11 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
// already be status errors.
return toRPCErr(op(cs.attempt))
}
a := cs.newAttempt
if a == nil {
a = cs.attempt
}
a := cs.attempt
cs.mu.Unlock()
err := op(a)
cs.mu.Lock()
if (cs.newAttempt == nil && a != cs.attempt) || (cs.newAttempt != nil && a != cs.newAttempt) {
if a != cs.attempt {
// We started another attempt already.
continue
}
Expand All @@ -730,7 +717,7 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
cs.mu.Unlock()
return err
}
if err := cs.retryLocked(err); err != nil {
if err := cs.retryLocked(a, err); err != nil {
cs.mu.Unlock()
return err
}
Expand Down Expand Up @@ -779,13 +766,9 @@ func (cs *clientStream) Trailer() metadata.MD {
return cs.attempt.s.Trailer()
}

func (cs *clientStream) replayBufferLocked() error {
func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
for _, f := range cs.buffer {
a := cs.newAttempt
if a == nil {
a = cs.attempt
}
if err := f(a); err != nil {
if err := f(attempt); err != nil {
return err
}
}
Expand Down

0 comments on commit e50b202

Please sign in to comment.