Skip to content

Commit

Permalink
Another implementation of RetryIfErr
Browse files Browse the repository at this point in the history
* Support for delayed retry logic
* Support for resetting the request timeout before retrying
* Users can decide whether to retry based on the request error and the number of retries.
Estimating the time required for a request to complete is relatively straightforward, but when retries are factored in, it becomes challenging to gauge this time accurately.
  • Loading branch information
newacorn committed Aug 27, 2024
1 parent 19c50cd commit 66b1d6d
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 78 deletions.
95 changes: 66 additions & 29 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,17 @@ type Client struct {
// RetryIf controls whether a retry should be attempted after an error.
//
// By default will use isIdempotent function.
//
// Deprecated: Use RetryIfErr instead.
// This field is only effective when the `RetryIfErr` field is not set.
RetryIf RetryIfFunc

// When the client encounters an error during a request, the behavior—whether to retry,
// whether to retry later, or whether to reset the request timeout—should be determined
// based on the return value of this interface.
// This field is only effective within the range of MaxIdemponentCallAttempts.
RetryIfErr RetryIfErrFunc

// ConfigureClient configures the fasthttp.HostClient.
ConfigureClient func(hc *HostClient) error

Expand Down Expand Up @@ -537,6 +546,7 @@ func (c *Client) Do(req *Request, resp *Response) error {
DisablePathNormalizing: c.DisablePathNormalizing,
MaxConnWaitTimeout: c.MaxConnWaitTimeout,
RetryIf: c.RetryIf,
RetryIfErr: c.RetryIfErr,
ConnPoolStrategy: c.ConnPoolStrategy,
StreamResponseBody: c.StreamResponseBody,
clientReaderPool: &c.readerPool,
Expand Down Expand Up @@ -659,9 +669,18 @@ type DialFuncWithTimeout func(addr string, timeout time.Duration) (net.Conn, err
// Request argument passed to RetryIfFunc, if there are any request errors.
type RetryIfFunc func(request *Request) bool

// RetryIfErrFunc defines the signature of the retry if error function.
// request and error passed to RetryIfErrFunc, if there are any request errors.
type RetryIfErrFunc func(request *Request, err error) bool
// RetryIfErrFunc defines an interface used for implementing the following functionality:
// When the client encounters an error during a request, the behavior—whether to retry,
// whether to retry later, or whether to reset the request timeout—should be determined
// based on the return value of this interface.
//
// retryCount indicates which retry attempt this is, with retry counting starting from 1.
// The first request execution is not considered a retry.
// err represents the error encountered during the previous request.
// sleepDuration if is greater than 0, the retry will be executed after this interval.
// resetReqTime indicates whether to reset the request timeout to its initial value before executing this retry.
// retry indicates whether to perform this retry. If false, the request function will return immediately.
type RetryIfErrFunc func(request *Request, retryCount int, err error) (sleepDuration time.Duration, resetReqTime bool, retry bool)

// RoundTripper wraps every request/response.
type RoundTripper interface {
Expand Down Expand Up @@ -715,12 +734,16 @@ type HostClient struct {
// By default, it uses the isIdempotent function.
//
// Deprecated: Use RetryIfErr instead.
// Panics if both RetryIf and RetryIfErr are set.
// This field is only effective when the `RetryIfErr` field is not set.
RetryIf RetryIfFunc

// RetryIfErr controls whether a retry should be attempted after an error.
// By default, it uses the isIdempotent function.
// When the client encounters an error during a request, the behavior—whether to retry,
// whether to retry later, or whether to reset the request timeout—should be determined
// based on the return value of this interface.
// This field is only effective within the range of MaxIdemponentCallAttempts.
RetryIfErr RetryIfErrFunc
// Whether the timeout resets before retrying.
RetryResetTimeout bool

connsWait *wantConnQueue

Expand Down Expand Up @@ -768,7 +791,10 @@ type HostClient struct {

// Maximum number of attempts for idempotent calls.
//
// DefaultMaxIdemponentCallAttempts is used if not set.
// A value of 0 or a negative value represents using DefaultMaxIdemponentCallAttempts.
// For example, a value of 1 means the request will be executed only once,
// while 2 means the request will be executed at most twice.
// The RetryIfErr and RetryIf fields can invalidate remaining attempts.
MaxIdemponentCallAttempts int

// Per-connection buffer size for responses' reading.
Expand Down Expand Up @@ -1285,14 +1311,12 @@ func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func (c *HostClient) Do(req *Request, resp *Response) error {
var err error
var retry bool
// Based on issue #1744, we added the RetryIfErr field.
// To avoid ambiguous semantics due to mixed usage,
// only one of the fields can be set at a time.
if c.RetryIf != nil && c.RetryIfErr != nil {
panic("RetryIf and RetryIfErr must not be set at the same time")
}
var (
err error
retry bool
resetReqTimeout bool
sleepDuration time.Duration
)
maxAttempts := c.MaxIdemponentCallAttempts
if maxAttempts <= 0 {
maxAttempts = DefaultMaxIdemponentCallAttempts
Expand All @@ -1308,6 +1332,10 @@ func (c *HostClient) Do(req *Request, resp *Response) error {
if timeout > 0 {
deadline = time.Now().Add(timeout)
}
retryFunc := c.RetryIf
if retryFunc == nil {
retryFunc = isIdempotent
}

atomic.AddInt32(&c.pendingRequests, 1)
for {
Expand All @@ -1329,31 +1357,40 @@ func (c *HostClient) Do(req *Request, resp *Response) error {
if hasBodyStream {
break
}
// Path prioritization based on ease of computation
attempts++

switch {
case c.RetryIf != nil:
retry = c.RetryIf(req)
case c.RetryIfErr != nil:
retry = c.RetryIfErr(req, err)
default:
retry = isIdempotent(req)
if attempts >= maxAttempts {
break
}
if c.RetryIfErr != nil {
sleepDuration, resetReqTimeout, retry = c.RetryIfErr(req, attempts, err)
} else {
retry = retryFunc(req)
}

if !retry {
break
// Retry non-idempotent requests if the server closes
// the connection before sending the response.
//
// This case is possible if the server closes the idle
// keep-alive connection on timeout.
//
// Apache and nginx usually do this.
if err != io.EOF {
break
}
//
// This assertion cannot ensure that the server has not read our request.
// Even if part of the request has been read, retrying for a non-idempotent
// request is still unsafe. It is better to leave this decision to the user.
//
// if err != io.EOF {
// break
// }
}
attempts++
if attempts >= maxAttempts {
break
if sleepDuration > 0 {
time.Sleep(sleepDuration)
}
if resetReqTimeout && timeout > 0 {
deadline = time.Now().Add(timeout)
}
}
atomic.AddInt32(&c.pendingRequests, -1)
Expand Down
49 changes: 0 additions & 49 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1992,55 +1992,6 @@ func (r *readTimeoutConn) SetWriteDeadline(d time.Time) error {
return nil
}

func TestClientNonIdempotentRetry(t *testing.T) {
t.Parallel()

dialsCount := 0
c := &Client{
Dial: func(_ string) (net.Conn, error) {
dialsCount++
switch dialsCount {
case 1, 2:
return &readErrorConn{}, nil
case 3:
return &singleReadConn{
s: "HTTP/1.1 345 OK\r\nContent-Type: foobar\r\nContent-Length: 7\r\n\r\n0123456",
}, nil
default:
return nil, fmt.Errorf("unexpected number of dials: %d", dialsCount)
}
},
}

// This POST must succeed, since the readErrorConn closes
// the connection before sending any response.
// So the client must retry non-idempotent request.
dialsCount = 0
statusCode, body, err := c.Post(nil, "http://foobar/a/b", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if statusCode != 345 {
t.Fatalf("unexpected status code: %d. Expecting 345", statusCode)
}
if string(body) != "0123456" {
t.Fatalf("unexpected body: %q. Expecting %q", body, "0123456")
}

// Verify that idempotent GET succeeds.
dialsCount = 0
statusCode, body, err = c.Get(nil, "http://foobar/a/b")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if statusCode != 345 {
t.Fatalf("unexpected status code: %d. Expecting 345", statusCode)
}
if string(body) != "0123456" {
t.Fatalf("unexpected body: %q. Expecting %q", body, "0123456")
}
}

func TestClientNonIdempotentRetry_BodyStream(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 66b1d6d

Please sign in to comment.