Skip to content

Commit

Permalink
RPC Timeout/Retries account for blocking requests (hashicorp#8921)
Browse files Browse the repository at this point in the history
The current implementation measures RPC request timeout only against
config.RPCHoldTimeout, which is fine for non-blocking requests but will
almost surely be exceeded by long-poll requests that block for minutes
at a time.

This adds an HasTimedOut method on the RPCInfo interface that takes into
account whether the request is blocking, its maximum wait time, and the
RPCHoldTimeout.
  • Loading branch information
pierreca authored and fredrikhgrelland committed Sep 28, 2020
1 parent 2cd5824 commit 363c3a6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
18 changes: 8 additions & 10 deletions client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,22 @@ TRY:
// Move off to another server, and see if we can retry.
c.rpcLogger.Error("error performing RPC to server", "error", rpcErr, "rpc", method, "server", server.Addr)
c.servers.NotifyFailedServer(server)
if retry := canRetry(args, rpcErr); !retry {
if retry := canRetry(args, rpcErr, firstCheck, c.config.RPCHoldTimeout); !retry {
return rpcErr
}

// We can wait a bit and retry!
if time.Since(firstCheck) < c.config.RPCHoldTimeout {
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
select {
case <-time.After(jitter):
goto TRY
case <-c.shutdownCh:
}
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
select {
case <-time.After(jitter):
goto TRY
case <-c.shutdownCh:
}
return rpcErr
}

// canRetry returns true if the given situation is safe for a retry.
func canRetry(args interface{}, err error) bool {
func canRetry(args interface{}, err error, start time.Time, rpcHoldTimeout time.Duration) bool {
// No leader errors are always safe to retry since no state could have
// been changed.
if structs.IsErrNoLeader(err) {
Expand All @@ -110,7 +108,7 @@ func canRetry(args interface{}, err error) bool {
// Reads are safe to retry for stream errors, such as if a server was
// being shut down.
info, ok := args.(structs.RPCInfo)
if ok && info.IsRead() && lib.IsErrEOF(err) {
if ok && info.IsRead() && lib.IsErrEOF(err) && !info.HasTimedOut(start, rpcHoldTimeout) {
return true
}

Expand Down
12 changes: 12 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ type RPCInfo interface {
AllowStaleRead() bool
IsForwarded() bool
SetForwarded()
HasTimedOut(since time.Time, rpcHoldTimeout time.Duration) bool
}

// InternalRpcInfo allows adding internal RPC metadata to an RPC. This struct
Expand Down Expand Up @@ -282,6 +283,13 @@ func (q QueryOptions) AllowStaleRead() bool {
return q.AllowStale
}

func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool {
if q.MinQueryIndex > 0 {
return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout)
}
return time.Since(start) > rpcHoldTimeout
}

// AgentPprofRequest is used to request a pprof report for a given node.
type AgentPprofRequest struct {
// ReqType specifies the profile to use
Expand Down Expand Up @@ -368,6 +376,10 @@ func (w WriteRequest) AllowStaleRead() bool {
return false
}

func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool {
return time.Since(start) > rpcHoldTimeout
}

// QueryMeta allows a query response to include potentially
// useful metadata about a query
type QueryMeta struct {
Expand Down

0 comments on commit 363c3a6

Please sign in to comment.