Skip to content

Commit

Permalink
Merge pull request #109050 from MadhavJivrajani/client-go-retry
Browse files Browse the repository at this point in the history
rest: Ensure response body is fully read and closed before retry

Kubernetes-commit: 97bf2986cdeae0e7da70659d70375e0770b14a5e
  • Loading branch information
k8s-publishing-bot committed Apr 4, 2022
2 parents 8a672f0 + 01ab7fb commit 33011f1
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 59 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/protobuf v1.27.1
k8s.io/api v0.0.0-20220331140502-02c2207317b5
k8s.io/api v0.0.0-20220402025220-2de699698342
k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444
k8s.io/klog/v2 v2.60.1
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42
Expand All @@ -44,6 +44,6 @@ require (
)

replace (
k8s.io/api => k8s.io/api v0.0.0-20220331140502-02c2207317b5
k8s.io/api => k8s.io/api v0.0.0-20220402025220-2de699698342
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -628,8 +628,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20220331140502-02c2207317b5 h1:shLc1jkM9dNz8zPSm2YeE/XOpp1UP36AZOt5DjspI+0=
k8s.io/api v0.0.0-20220331140502-02c2207317b5/go.mod h1:69QWTzqWVlGn0rU+x3dmk3WAsUQHmeQwIBWMbK1ZEyE=
k8s.io/api v0.0.0-20220402025220-2de699698342 h1:xFpsdy7RmF2niTyB76yUyPubqMa5m9/L9sswkdyhONo=
k8s.io/api v0.0.0-20220402025220-2de699698342/go.mod h1:69QWTzqWVlGn0rU+x3dmk3WAsUQHmeQwIBWMbK1ZEyE=
k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444 h1:whQmS3GtF822OUer+LPJnMFKn6kPfuJOCM/3xUuATIY=
k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444/go.mod h1:82Bi4sCzVBdpYjyI4jY6aHX+YCUchUIrZrXKedjd2UM=
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
Expand Down
25 changes: 11 additions & 14 deletions rest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,15 +614,14 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
}
url := r.URL().String()
for {
req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}

if err := r.retry.Before(ctx, r); err != nil {
return nil, r.retry.WrapPreviousError(err)
}

req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
r.retry.After(ctx, r, resp, err)
Expand Down Expand Up @@ -722,18 +721,17 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {

url := r.URL().String()
for {
if err := r.retry.Before(ctx, r); err != nil {
return nil, err
}

req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}
if r.body != nil {
req.Body = ioutil.NopCloser(r.body)
}

if err := r.retry.Before(ctx, r); err != nil {
return nil, err
}

resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
r.retry.After(ctx, r, resp, err)
Expand Down Expand Up @@ -859,14 +857,13 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp

// Right now we make about ten retry attempts if we get a Retry-After response.
for {
if err := r.retry.Before(ctx, r); err != nil {
return r.retry.WrapPreviousError(err)
}
req, err := r.newHTTPRequest(ctx)
if err != nil {
return err
}

if err := r.retry.Before(ctx, r); err != nil {
return r.retry.WrapPreviousError(err)
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
Expand Down
127 changes: 118 additions & 9 deletions rest/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ func TestRequestWatch(t *testing.T) {
},
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsInternalError(err)
return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF")
},
},
{
Expand All @@ -954,7 +954,10 @@ func TestRequestWatch(t *testing.T) {
serverReturns: []responseErr{
{response: nil, err: io.EOF},
},
Empty: true,
Err: true,
ErrFn: func(err error) bool {
return !apierrors.IsInternalError(err)
},
},
{
name: "max retries 2, server always returns a response with Retry-After header",
Expand Down Expand Up @@ -1130,7 +1133,7 @@ func TestRequestStream(t *testing.T) {
},
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsInternalError(err)
return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF")
},
},
{
Expand Down Expand Up @@ -1371,8 +1374,6 @@ func (b *testBackoffManager) Sleep(d time.Duration) {
}

func TestCheckRetryClosesBody(t *testing.T) {
// unblock CI until http://issue.k8s.io/108906 is resolved in 1.24
t.Skip("http://issue.k8s.io/108906")
count := 0
ch := make(chan struct{})
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -2435,6 +2436,7 @@ func TestRequestWithRetry(t *testing.T) {
body io.Reader
serverReturns responseErr
errExpected error
errContains string
transformFuncInvokedExpected int
roundTripInvokedExpected int
}{
Expand All @@ -2451,7 +2453,7 @@ func TestRequestWithRetry(t *testing.T) {
body: &readSeeker{err: io.EOF},
serverReturns: responseErr{response: retryAfterResponse(), err: nil},
errExpected: nil,
transformFuncInvokedExpected: 1,
transformFuncInvokedExpected: 0,
roundTripInvokedExpected: 1,
},
{
Expand All @@ -2474,7 +2476,7 @@ func TestRequestWithRetry(t *testing.T) {
name: "server returns retryable err, request body Seek returns error, retry aborted",
body: &readSeeker{err: io.EOF},
serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
errExpected: io.ErrUnexpectedEOF,
errContains: "failed to reset the request body while retrying a request: EOF",
transformFuncInvokedExpected: 0,
roundTripInvokedExpected: 1,
},
Expand Down Expand Up @@ -2517,8 +2519,15 @@ func TestRequestWithRetry(t *testing.T) {
if test.transformFuncInvokedExpected != transformFuncInvoked {
t.Errorf("Expected transform func to be invoked %d times, but got: %d", test.transformFuncInvokedExpected, transformFuncInvoked)
}
if test.errExpected != unWrap(err) {
t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err))
switch {
case test.errExpected != nil:
if test.errExpected != unWrap(err) {
t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err))
}
case len(test.errContains) > 0:
if !strings.Contains(err.Error(), test.errContains) {
t.Errorf("Expected error message to caontain: %q, but got: %q", test.errContains, err.Error())
}
}
})
}
Expand Down Expand Up @@ -3531,3 +3540,103 @@ func TestTransportConcurrency(t *testing.T) {
})
}
}

// TODO: see if we can consolidate the other trackers into one.
type requestBodyTracker struct {
io.ReadSeeker
f func(string)
}

func (t *requestBodyTracker) Read(p []byte) (int, error) {
t.f("Request.Body.Read")
return t.ReadSeeker.Read(p)
}

func (t *requestBodyTracker) Seek(offset int64, whence int) (int64, error) {
t.f("Request.Body.Seek")
return t.ReadSeeker.Seek(offset, whence)
}

type responseBodyTracker struct {
io.ReadCloser
f func(string)
}

func (t *responseBodyTracker) Read(p []byte) (int, error) {
t.f("Response.Body.Read")
return t.ReadCloser.Read(p)
}

func (t *responseBodyTracker) Close() error {
t.f("Response.Body.Close")
return t.ReadCloser.Close()
}

type recorder struct {
order []string
}

func (r *recorder) record(call string) {
r.order = append(r.order, call)
}

func TestRequestBodyResetOrder(t *testing.T) {
recorder := &recorder{}
respBodyTracker := &responseBodyTracker{
ReadCloser: nil, // the server will fill it
f: recorder.record,
}

var attempts int
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
defer func() {
attempts++
}()

// read the request body.
ioutil.ReadAll(req.Body)

// first attempt, we send a retry-after
if attempts == 0 {
resp := retryAfterResponse()
respBodyTracker.ReadCloser = ioutil.NopCloser(bytes.NewReader([]byte{}))
resp.Body = respBodyTracker
return resp, nil
}

return &http.Response{StatusCode: http.StatusOK}, nil
})

reqBodyTracker := &requestBodyTracker{
ReadSeeker: bytes.NewReader([]byte{}), // empty body ensures one Read operation at most.
f: recorder.record,
}
req := &Request{
verb: "POST",
body: reqBodyTracker,
c: &RESTClient{
content: defaultContentConfig(),
Client: client,
},
backoff: &noSleepBackOff{},
retry: &withRetry{maxRetries: 1},
}

req.Do(context.Background())

expected := []string{
// 1st attempt: the server handler reads the request body
"Request.Body.Read",
// the server sends a retry-after, client reads the
// response body, and closes it
"Response.Body.Read",
"Response.Body.Close",
// client retry logic seeks to the beginning of the request body
"Request.Body.Seek",
// 2nd attempt: the server reads the request body
"Request.Body.Read",
}
if !reflect.DeepEqual(expected, recorder.order) {
t.Errorf("Expected invocation request and response body operations for retry do not match: %s", cmp.Diff(expected, recorder.order))
}
}
53 changes: 21 additions & 32 deletions rest/with_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ type WithRetry interface {
IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool

// Before should be invoked prior to each attempt, including
// the first one. if an error is returned, the request
// should be aborted immediately.
// the first one. If an error is returned, the request should
// be aborted immediately.
//
// Before may also be additionally responsible for preparing
// the request for the next retry, namely in terms of resetting
// the request body in case it has been read.
Before(ctx context.Context, r *Request) error

// After should be invoked immediately after an attempt is made.
Expand Down Expand Up @@ -194,46 +198,18 @@ func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *
r.retryAfter.Wait = time.Duration(seconds) * time.Second
r.retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err)

if err := r.prepareForNextRetry(ctx, restReq); err != nil {
klog.V(4).Infof("Could not retry request - %v", err)
return false
}

return true
}

// prepareForNextRetry is responsible for carrying out operations that need
// to be completed before the next retry is initiated:
// - if the request context is already canceled there is no need to
// retry, the function will return ctx.Err().
// - we need to seek to the beginning of the request body before we
// initiate the next retry, the function should return an error if
// it fails to do so.
func (r *withRetry) prepareForNextRetry(ctx context.Context, request *Request) error {
if ctx.Err() != nil {
return ctx.Err()
}

// Ensure the response body is fully read and closed before
// we reconnect, so that we reuse the same TCP connection.
if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
if _, err := seeker.Seek(0, 0); err != nil {
return fmt.Errorf("can't Seek() back to beginning of body for %T", request)
}
}

klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
return nil
}

func (r *withRetry) Before(ctx context.Context, request *Request) error {
// If the request context is already canceled there
// is no need to retry.
if ctx.Err() != nil {
r.trackPreviousError(ctx.Err())
return ctx.Err()
}

url := request.URL()

// r.retryAfter represents the retry after parameters calculated
// from the (response, err) tuple from the last attempt, so 'Before'
// can apply these retry after parameters prior to the next attempt.
Expand All @@ -245,6 +221,18 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
return nil
}

// At this point we've made atleast one attempt, post which the response
// body should have been fully read and closed in order for it to be safe
// to reset the request body before we reconnect, in order for us to reuse
// the same TCP connection.
if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
if _, err := seeker.Seek(0, io.SeekStart); err != nil {
err = fmt.Errorf("failed to reset the request body while retrying a request: %v", err)
r.trackPreviousError(err)
return err
}
}

// if we are here, we have made attempt(s) al least once before.
if request.backoff != nil {
// TODO(tkashem) with default set to use exponential backoff
Expand All @@ -263,6 +251,7 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
return err
}

klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
return nil
}

Expand Down

0 comments on commit 33011f1

Please sign in to comment.