From df8dd0d44097a8c87956a7f92c2031aa748cdeae Mon Sep 17 00:00:00 2001 From: Ismail Gjevori Date: Sun, 24 Nov 2024 11:27:00 +0100 Subject: [PATCH 1/2] stream: update retry delay Update the delay calculation to reflect the updated gRFC --- service_config.go | 5 +++-- stream.go | 11 +++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/service_config.go b/service_config.go index 2671c5ef69f0..7e83027d1994 100644 --- a/service_config.go +++ b/service_config.go @@ -168,6 +168,7 @@ func init() { return parseServiceConfig(js, defaultMaxCallAttempts) } } + func parseServiceConfig(js string, maxAttempts int) *serviceconfig.ParseResult { if len(js) == 0 { return &serviceconfig.ParseResult{Err: fmt.Errorf("no JSON service config provided")} @@ -297,7 +298,7 @@ func convertRetryPolicy(jrp *jsonRetryPolicy, maxAttempts int) (p *internalservi return rp, nil } -func min(a, b *int) *int { +func minPointers(a, b *int) *int { if *a < *b { return a } @@ -309,7 +310,7 @@ func getMaxSize(mcMax, doptMax *int, defaultVal int) *int { return &defaultVal } if mcMax != nil && doptMax != nil { - return min(mcMax, doptMax) + return minPointers(mcMax, doptMax) } if mcMax != nil { return mcMax diff --git a/stream.go b/stream.go index 6d10d0ac8713..1f95fc97ccd6 100644 --- a/stream.go +++ b/stream.go @@ -218,7 +218,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth var mc serviceconfig.MethodConfig var onCommit func() - var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) { + newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) { return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...) } @@ -708,11 +708,10 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) { cs.numRetriesSincePushback = 0 } else { fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback)) - cur := float64(rp.InitialBackoff) * fact - if max := float64(rp.MaxBackoff); cur > max { - cur = max - } - dur = time.Duration(rand.Int64N(int64(cur))) + cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff)) + // apply a jitter of plus or minus 0.2 + cur *= 0.8 + 0.4*rand.Float64() + dur = time.Duration(int64(cur)) cs.numRetriesSincePushback++ } From 81ae30ba6e93a2fc96619fd1093f6bc6ef6411e9 Mon Sep 17 00:00:00 2001 From: Ismail Gjevori Date: Mon, 25 Nov 2024 20:38:28 +0100 Subject: [PATCH 2/2] strea.go: update comment --- stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream.go b/stream.go index 1f95fc97ccd6..17e2267b3320 100644 --- a/stream.go +++ b/stream.go @@ -709,7 +709,7 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) { } else { fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback)) cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff)) - // apply a jitter of plus or minus 0.2 + // Apply jitter by multiplying with a random factor between 0.8 and 1.2 cur *= 0.8 + 0.4*rand.Float64() dur = time.Duration(int64(cur)) cs.numRetriesSincePushback++