Skip to content

Commit

Permalink
Fix RPC retry logic in nomad client's rpc.go for blocking queries (#9266
Browse files Browse the repository at this point in the history
)
  • Loading branch information
benbuzbee committed Nov 30, 2020
1 parent bf225f7 commit 6a6547b
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 39 deletions.
56 changes: 44 additions & 12 deletions client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,20 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
return c.config.RPCHandler.RPC(method, args, reply)
}

// This is subtle but we start measuring the time on the client side
// right at the time of the first request, vs. on the first retry as
// is done on the server side inside forward(). This is because the
// servers may already be applying the RPCHoldTimeout up there, so by
// starting the timer here we won't potentially double up the delay.
firstCheck := time.Now()
// We will try to automatically retry requests that fail due to things like server unavailability,
// but instead of retrying forever, let's have a solid upper bound.
deadline := time.Now()

// A reasonable amount of time for leader election. Note when servers forward() our RPC requests
// to the leader they may also allow for an RPCHoldTimeout while waiting for leader election.
// That's OK, we won't double up because we are using it here not as a sleep but
// as a hint to give up
deadline = deadline.Add(c.config.RPCHoldTimeout)

// If it's a blocking query, allow the time specified by the request
if info, ok := args.(structs.RPCInfo); ok {
deadline = deadline.Add(info.TimeToBlock())
}

TRY:
server := c.servers.FindServer()
Expand All @@ -68,6 +76,7 @@ TRY:

// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)

if rpcErr == nil {
c.fireRpcRetryWatcher()
return nil
Expand All @@ -83,22 +92,45 @@ TRY:
// Move off to another server, and see if we can retry.
c.rpcLogger.Error("error performing RPC to server", "error", rpcErr, "rpc", method, "server", server.Addr)
c.servers.NotifyFailedServer(server)
if retry := canRetry(args, rpcErr, firstCheck, c.config.RPCHoldTimeout); !retry {

if !canRetry(args, rpcErr) {
c.rpcLogger.Error("error performing RPC to server which is not safe to automatically retry", "error", rpcErr, "rpc", method, "server", server.Addr)
return rpcErr
}
if time.Now().After(deadline) {
// Blocking queries are tricky. Jitter and RPCHoldTimeout applied in multiple places can result in our server call taking longer than we wanted it to. For example,
// a block time of 5s may easily turn into the server blocking for 10s since it applies its own RPCHoldTimeout. If the server dies at t=7s we still want to retry,
// so before we give up on blocking queries, make one last attempt for an immediate answer.
if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
info.SetTimeToBlock(0)
return c.RPC(method, args, reply)
}
c.rpcLogger.Error("error performing RPC to server, deadline exceeded, cannot retry", "error", rpcErr, "rpc", method, "server", server.Addr)
return rpcErr
}

// We can wait a bit and retry!
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
// Wait to avoid thundering herd
select {
case <-time.After(jitter):
case <-time.After(lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)):
// If we are going to retry a blocking query we need to update the time to block so it finishes by our deadline.
if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
newBlockTime := deadline.Sub(time.Now())
// We can get below 0 here on slow computers because we slept for jitter so at least try to get an immediate response
if newBlockTime < 0 {
newBlockTime = 0
}
info.SetTimeToBlock(newBlockTime)
return c.RPC(method, args, reply)
}

goto TRY
case <-c.shutdownCh:
}
return rpcErr
}

// canRetry returns true if the given situation is safe for a retry.
func canRetry(args interface{}, err error, start time.Time, rpcHoldTimeout time.Duration) bool {
func canRetry(args interface{}, err error) bool {
// No leader errors are always safe to retry since no state could have
// been changed.
if structs.IsErrNoLeader(err) {
Expand All @@ -108,7 +140,7 @@ func canRetry(args interface{}, err error, start time.Time, rpcHoldTimeout time.
// Reads are safe to retry for stream errors, such as if a server was
// being shut down.
info, ok := args.(structs.RPCInfo)
if ok && info.IsRead() && lib.IsErrEOF(err) && !info.HasTimedOut(start, rpcHoldTimeout) {
if ok && info.IsRead() && lib.IsErrEOF(err) {
return true
}

Expand Down
7 changes: 1 addition & 6 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,12 +780,7 @@ func (r *rpcHandler) blockingRPC(opts *blockingOptions) error {
goto RUN_QUERY
}

// Restrict the max query time, and ensure there is always one
if opts.queryOpts.MaxQueryTime > structs.MaxBlockingRPCQueryTime {
opts.queryOpts.MaxQueryTime = structs.MaxBlockingRPCQueryTime
} else if opts.queryOpts.MaxQueryTime <= 0 {
opts.queryOpts.MaxQueryTime = structs.DefaultBlockingRPCQueryTime
}
opts.queryOpts.MaxQueryTime = opts.queryOpts.TimeToBlock()

// Apply a small amount of jitter to the request
opts.queryOpts.MaxQueryTime += lib.RandomStagger(opts.queryOpts.MaxQueryTime / structs.JitterFraction)
Expand Down
51 changes: 30 additions & 21 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"strings"
"time"

"github.com/hashicorp/consul/lib"
"github.com/hashicorp/cronexpr"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -232,7 +231,11 @@ type RPCInfo interface {
AllowStaleRead() bool
IsForwarded() bool
SetForwarded()
HasTimedOut(since time.Time, rpcHoldTimeout time.Duration) bool
TimeToBlock() time.Duration
// SetTimeToBlock sets how long this request can block. The requested time may not be possible,
// so callers should read back TimeToBlock. E.g. you cannot set time to block at all on WriteRequests
// and it cannot exceed MaxBlockingRPCQueryTime.
SetTimeToBlock(t time.Duration)
}

// InternalRpcInfo allows adding internal RPC metadata to an RPC. This struct
Expand Down Expand Up @@ -287,6 +290,24 @@ type QueryOptions struct {
InternalRpcInfo
}

// TimeToBlock reports how long this query is allowed to block.
//
// A query whose MinQueryIndex is zero is treated as non-blocking and yields 0.
// Otherwise MaxQueryTime is returned, capped at MaxBlockingRPCQueryTime and
// replaced by DefaultBlockingRPCQueryTime when it is zero or negative.
func (q QueryOptions) TimeToBlock() time.Duration {
	switch {
	case q.MinQueryIndex == 0:
		// Not a blocking query.
		return 0
	case q.MaxQueryTime > MaxBlockingRPCQueryTime:
		return MaxBlockingRPCQueryTime
	case q.MaxQueryTime <= 0:
		return DefaultBlockingRPCQueryTime
	default:
		return q.MaxQueryTime
	}
}

// SetTimeToBlock sets MaxQueryTime, the requested blocking time for this
// query, to t.
//
// The receiver is a pointer so that the assignment is visible to the caller;
// with a value receiver the method would only modify a copy of the options
// and silently have no effect. Callers should read the effective value back
// through TimeToBlock, which applies the default and the
// MaxBlockingRPCQueryTime cap.
func (q *QueryOptions) SetTimeToBlock(t time.Duration) {
	q.MaxQueryTime = t
}

// RequestRegion returns the Region field of these query options.
func (q QueryOptions) RequestRegion() string {
return q.Region
}
Expand All @@ -312,21 +333,6 @@ func (q QueryOptions) AllowStaleRead() bool {
return q.AllowStale
}

// HasTimedOut reports whether more time has elapsed since start than this
// request is allowed.
//
// Every request is allowed rpcHoldTimeout. When MinQueryIndex is greater than
// zero the allowance is extended by the query's block time: MaxQueryTime
// capped at MaxBlockingRPCQueryTime (or DefaultBlockingRPCQueryTime when it
// is zero or negative), plus a random stagger of up to that block time
// divided by JitterFraction.
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool {
	allowed := rpcHoldTimeout

	if q.MinQueryIndex > 0 {
		// Restrict the max query time, and ensure there is always one.
		blockTime := q.MaxQueryTime
		switch {
		case blockTime > MaxBlockingRPCQueryTime:
			blockTime = MaxBlockingRPCQueryTime
		case blockTime <= 0:
			blockTime = DefaultBlockingRPCQueryTime
		}
		blockTime += lib.RandomStagger(blockTime / JitterFraction)

		allowed += blockTime
	}

	return time.Since(start) > allowed
}

// AgentPprofRequest is used to request a pprof report for a given node.
type AgentPprofRequest struct {
// ReqType specifies the profile to use
Expand Down Expand Up @@ -387,6 +393,13 @@ type WriteRequest struct {
InternalRpcInfo
}

// TimeToBlock always returns 0 for a WriteRequest.
func (w WriteRequest) TimeToBlock() time.Duration {
return 0
}

// SetTimeToBlock is a no-op for a WriteRequest; the supplied duration is
// ignored and TimeToBlock keeps returning 0.
func (w WriteRequest) SetTimeToBlock(_ time.Duration) {
}

func (w WriteRequest) RequestRegion() string {
// The target region for this request
return w.Region
Expand All @@ -413,10 +426,6 @@ func (w WriteRequest) AllowStaleRead() bool {
return false
}

// HasTimedOut reports whether the time elapsed since start exceeds
// rpcHoldTimeout.
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool {
	elapsed := time.Since(start)
	return elapsed > rpcHoldTimeout
}

// QueryMeta allows a query response to include potentially
// useful metadata about a query
type QueryMeta struct {
Expand Down

3 comments on commit 6a6547b

@vercel
Copy link

@vercel vercel bot commented on 6a6547b Nov 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

nomad-storybook – ./ui/stories

nomad-storybook.hashicorp.vercel.app
nomad-storybook.vercel.app
nomad-storybook-git-master.hashicorp.vercel.app

@vercel
Copy link

@vercel vercel bot commented on 6a6547b Nov 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

nomad-ui – ./ui

nomad-ui.vercel.app
nomad-ui-git-master.hashicorp.vercel.app
nomad-ui.hashicorp.vercel.app

@vercel
Copy link

@vercel vercel bot commented on 6a6547b Nov 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.