Skip to content

Commit

Permalink
Another implementation of RetryIfErr
Browse files Browse the repository at this point in the history
* Support for delayed retry logic
* Support for resetting the request timeout before retrying
* Users can decide whether to retry based on the request error and the number of retries.
Estimating the time required for a request to complete is relatively straightforward, but when retries are factored in, it becomes challenging to gauge this time accurately.
  • Loading branch information
newacorn committed Aug 31, 2024
1 parent 19c50cd commit 2a94229
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 85 deletions.
95 changes: 59 additions & 36 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,17 @@ type Client struct {
// RetryIf controls whether a retry should be attempted after an error.
//
// By default will use isIdempotent function.
//
// Deprecated: Use RetryIfErr instead.
// This field is only effective when the `RetryIfErr` field is not set.
RetryIf RetryIfFunc

// When the client encounters an error during a request, the behavior—whether to retry,
// whether to retry later, or whether to reset the request timeout—should be determined
// based on the return value of this field.
// This field is only effective within the range of MaxIdemponentCallAttempts.
RetryIfErr RetryIfErrFunc

// ConfigureClient configures the fasthttp.HostClient.
ConfigureClient func(hc *HostClient) error

Expand Down Expand Up @@ -537,6 +546,7 @@ func (c *Client) Do(req *Request, resp *Response) error {
DisablePathNormalizing: c.DisablePathNormalizing,
MaxConnWaitTimeout: c.MaxConnWaitTimeout,
RetryIf: c.RetryIf,
RetryIfErr: c.RetryIfErr,
ConnPoolStrategy: c.ConnPoolStrategy,
StreamResponseBody: c.StreamResponseBody,
clientReaderPool: &c.readerPool,
Expand Down Expand Up @@ -659,9 +669,23 @@ type DialFuncWithTimeout func(addr string, timeout time.Duration) (net.Conn, err
// Request argument passed to RetryIfFunc, if there are any request errors.
type RetryIfFunc func(request *Request) bool

// RetryIfErrFunc defines the signature of the retry if error function.
// request and error passed to RetryIfErrFunc, if there are any request errors.
type RetryIfErrFunc func(request *Request, err error) bool
// RetryIfErrFunc defines an interface used for implementing the following functionality:
// When the client encounters an error during a request, the behavior—whether to retry,
// or whether to reset the request timeout—should be determined
// based on the return value of this interface.
//
// attempt indicates which attempt the current retry is due to a failure of.
// The first request counts as the first attempt.
//
// err represents the error encountered while attempting the `attempts`-th request.
//
// resetTimeout indicates whether to reuse the `Request`'s timeout as the timeout interval,
// rather than using the timeout after subtracting the time spent on previous failed requests.
// This return value is meaningful only when you use `Request.SetTimeout`, `DoTimeout`, or `DoDeadline`.
//
// retry indicates whether to retry the current request. If it is false,
// the request function will immediately return with the `err`.
type RetryIfErrFunc func(request *Request, attempts int, err error) (resetTimeout bool, retry bool)

// RoundTripper wraps every request/response.
type RoundTripper interface {
Expand Down Expand Up @@ -715,12 +739,16 @@ type HostClient struct {
// By default, it uses the isIdempotent function.
//
// Deprecated: Use RetryIfErr instead.
// Panics if both RetryIf and RetryIfErr are set.
// This field is only effective when the `RetryIfErr` field is not set.
RetryIf RetryIfFunc

// RetryIfErr controls whether a retry should be attempted after an error.
// By default, it uses the isIdempotent function.
// When the client encounters an error during a request, the behavior—whether to retry,
// or whether to reset the request timeout—should be determined
// based on the return value of this field.
// This field is only effective within the range of MaxIdemponentCallAttempts.
RetryIfErr RetryIfErrFunc
// Whether the timeout resets before retrying.
RetryResetTimeout bool

connsWait *wantConnQueue

Expand Down Expand Up @@ -768,7 +796,10 @@ type HostClient struct {

// Maximum number of attempts for idempotent calls.
//
// DefaultMaxIdemponentCallAttempts is used if not set.
// A value of 0 or a negative value represents using DefaultMaxIdemponentCallAttempts.
// For example, a value of 1 means the request will be executed only once,
// while 2 means the request will be executed at most twice.
// The RetryIfErr and RetryIf fields can invalidate remaining attempts.
MaxIdemponentCallAttempts int

// Per-connection buffer size for responses' reading.
Expand Down Expand Up @@ -1285,14 +1316,11 @@ func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func (c *HostClient) Do(req *Request, resp *Response) error {
var err error
var retry bool
// Based on issue #1744, we added the RetryIfErr field.
// To avoid ambiguous semantics due to mixed usage,
// only one of the fields can be set at a time.
if c.RetryIf != nil && c.RetryIfErr != nil {
panic("RetryIf and RetryIfErr must not be set at the same time")
}
var (
err error
retry bool
resetTimeout bool
)
maxAttempts := c.MaxIdemponentCallAttempts
if maxAttempts <= 0 {
maxAttempts = DefaultMaxIdemponentCallAttempts
Expand All @@ -1308,6 +1336,10 @@ func (c *HostClient) Do(req *Request, resp *Response) error {
if timeout > 0 {
deadline = time.Now().Add(timeout)
}
retryFunc := c.RetryIf
if retryFunc == nil {
retryFunc = isIdempotent
}

atomic.AddInt32(&c.pendingRequests, 1)
for {
Expand All @@ -1329,32 +1361,23 @@ func (c *HostClient) Do(req *Request, resp *Response) error {
if hasBodyStream {
break
}
// Path prioritization based on ease of computation
attempts++

switch {
case c.RetryIf != nil:
retry = c.RetryIf(req)
case c.RetryIfErr != nil:
retry = c.RetryIfErr(req, err)
default:
retry = isIdempotent(req)
if attempts >= maxAttempts {
break
}

if !retry {
// Retry non-idempotent requests if the server closes
// the connection before sending the response.
//
// This case is possible if the server closes the idle
// keep-alive connection on timeout.
//
// Apache and nginx usually do this.
if err != io.EOF {
break
}
if c.RetryIfErr != nil {
resetTimeout, retry = c.RetryIfErr(req, attempts, err)
} else {
retry = retryFunc(req)
}
attempts++
if attempts >= maxAttempts {
if !retry {
break
}
if timeout > 0 && resetTimeout {
deadline = time.Now().Add(timeout)
}
}
atomic.AddInt32(&c.pendingRequests, -1)

Expand Down
49 changes: 0 additions & 49 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1992,55 +1992,6 @@ func (r *readTimeoutConn) SetWriteDeadline(d time.Time) error {
return nil
}

func TestClientNonIdempotentRetry(t *testing.T) {
t.Parallel()

dialsCount := 0
c := &Client{
Dial: func(_ string) (net.Conn, error) {
dialsCount++
switch dialsCount {
case 1, 2:
return &readErrorConn{}, nil
case 3:
return &singleReadConn{
s: "HTTP/1.1 345 OK\r\nContent-Type: foobar\r\nContent-Length: 7\r\n\r\n0123456",
}, nil
default:
return nil, fmt.Errorf("unexpected number of dials: %d", dialsCount)
}
},
}

// This POST must succeed, since the readErrorConn closes
// the connection before sending any response.
// So the client must retry non-idempotent request.
dialsCount = 0
statusCode, body, err := c.Post(nil, "http://foobar/a/b", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if statusCode != 345 {
t.Fatalf("unexpected status code: %d. Expecting 345", statusCode)
}
if string(body) != "0123456" {
t.Fatalf("unexpected body: %q. Expecting %q", body, "0123456")
}

// Verify that idempotent GET succeeds.
dialsCount = 0
statusCode, body, err = c.Get(nil, "http://foobar/a/b")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if statusCode != 345 {
t.Fatalf("unexpected status code: %d. Expecting 345", statusCode)
}
if string(body) != "0123456" {
t.Fatalf("unexpected body: %q. Expecting %q", body, "0123456")
}
}

func TestClientNonIdempotentRetry_BodyStream(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 2a94229

Please sign in to comment.