Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RPC retry logic in nomad client's rpc.go for blocking queries #9266

Merged
merged 2 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 44 additions & 12 deletions client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,20 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
return c.config.RPCHandler.RPC(method, args, reply)
}

// This is subtle but we start measuring the time on the client side
// right at the time of the first request, vs. on the first retry as
// is done on the server side inside forward(). This is because the
// servers may already be applying the RPCHoldTimeout up there, so by
// starting the timer here we won't potentially double up the delay.
firstCheck := time.Now()
// We will try to automatically retry requests that fail due to things like server unavailability,
// but instead of retrying forever, let's have a solid upper bound.
deadline := time.Now()

// A reasonable amount of time for leader election. Note when servers forward() our RPC requests
// to the leader they may also allow for an RPCHoldTimeout while waiting for leader election.
// That's OK, we won't double up because we are using it here not as a sleep but
// as a hint to give up
deadline = deadline.Add(c.config.RPCHoldTimeout)

// If it's a blocking query, allow the time specified by the request
if info, ok := args.(structs.RPCInfo); ok {
deadline = deadline.Add(info.TimeToBlock())
}

TRY:
server := c.servers.FindServer()
Expand All @@ -68,6 +76,7 @@ TRY:

// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)

if rpcErr == nil {
c.fireRpcRetryWatcher()
return nil
Expand All @@ -83,22 +92,45 @@ TRY:
// Move off to another server, and see if we can retry.
c.rpcLogger.Error("error performing RPC to server", "error", rpcErr, "rpc", method, "server", server.Addr)
c.servers.NotifyFailedServer(server)
if retry := canRetry(args, rpcErr, firstCheck, c.config.RPCHoldTimeout); !retry {

if !canRetry(args, rpcErr) {
c.rpcLogger.Error("error performing RPC to server which is not safe to automatically retry", "error", rpcErr, "rpc", method, "server", server.Addr)
return rpcErr
}
if time.Now().After(deadline) {
// Blocking queries are tricky. Jitter and RPCHoldTimeout applied in multiple places can result in our server call taking longer than we wanted it to. For example,
// a block time of 5s may easily turn into the server blocking for 10s since it applies its own RPCHoldTimeout. If the server dies at t=7s we still want to retry,
// so before we give up on blocking queries make one last attempt for an immediate answer.
if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
info.SetTimeToBlock(0)
return c.RPC(method, args, reply)
}
c.rpcLogger.Error("error performing RPC to server, deadline exceeded, cannot retry", "error", rpcErr, "rpc", method, "server", server.Addr)
return rpcErr
}

// We can wait a bit and retry!
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
// Wait to avoid thundering herd
select {
case <-time.After(jitter):
case <-time.After(lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)):
// If we are going to retry a blocking query we need to update the time to block so it finishes by our deadline.
if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 {
newBlockTime := deadline.Sub(time.Now())
// We can get below 0 here on slow computers because we slept for jitter so at least try to get an immediate response
if newBlockTime < 0 {
newBlockTime = 0
}
info.SetTimeToBlock(newBlockTime)
return c.RPC(method, args, reply)
}

goto TRY
case <-c.shutdownCh:
}
return rpcErr
}

// canRetry returns true if the given situation is safe for a retry.
func canRetry(args interface{}, err error, start time.Time, rpcHoldTimeout time.Duration) bool {
func canRetry(args interface{}, err error) bool {
// No leader errors are always safe to retry since no state could have
// been changed.
if structs.IsErrNoLeader(err) {
Expand All @@ -108,7 +140,7 @@ func canRetry(args interface{}, err error, start time.Time, rpcHoldTimeout time.
// Reads are safe to retry for stream errors, such as if a server was
// being shut down.
info, ok := args.(structs.RPCInfo)
if ok && info.IsRead() && lib.IsErrEOF(err) && !info.HasTimedOut(start, rpcHoldTimeout) {
if ok && info.IsRead() && lib.IsErrEOF(err) {
return true
}

Expand Down
7 changes: 1 addition & 6 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,12 +780,7 @@ func (r *rpcHandler) blockingRPC(opts *blockingOptions) error {
goto RUN_QUERY
}

// Restrict the max query time, and ensure there is always one
if opts.queryOpts.MaxQueryTime > structs.MaxBlockingRPCQueryTime {
opts.queryOpts.MaxQueryTime = structs.MaxBlockingRPCQueryTime
} else if opts.queryOpts.MaxQueryTime <= 0 {
opts.queryOpts.MaxQueryTime = structs.DefaultBlockingRPCQueryTime
}
opts.queryOpts.MaxQueryTime = opts.queryOpts.TimeToBlock()

// Apply a small amount of jitter to the request
opts.queryOpts.MaxQueryTime += lib.RandomStagger(opts.queryOpts.MaxQueryTime / structs.JitterFraction)
Expand Down
51 changes: 30 additions & 21 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"strings"
"time"

"github.com/hashicorp/consul/lib"
"github.com/hashicorp/cronexpr"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -230,7 +229,11 @@ type RPCInfo interface {
AllowStaleRead() bool
IsForwarded() bool
SetForwarded()
HasTimedOut(since time.Time, rpcHoldTimeout time.Duration) bool
TimeToBlock() time.Duration
// SetTimeToBlock sets how long this request can block. The requested time may not be possible,
// so callers should read back TimeToBlock. E.g. you cannot set time to block at all on WriteRequests
// and it cannot exceed MaxBlockingRPCQueryTime.
SetTimeToBlock(t time.Duration)
}

// InternalRpcInfo allows adding internal RPC metadata to an RPC. This struct
Expand Down Expand Up @@ -285,6 +288,24 @@ type QueryOptions struct {
InternalRpcInfo
}

// TimeToBlock reports how long this query is allowed to block. A query with
// a MinQueryIndex of zero is not a blocking query and gets 0. Otherwise the
// result is MaxQueryTime, capped at MaxBlockingRPCQueryTime, with
// DefaultBlockingRPCQueryTime substituted when MaxQueryTime is zero or
// negative.
func (q QueryOptions) TimeToBlock() time.Duration {
	switch {
	case q.MinQueryIndex == 0:
		// Not a blocking query.
		return 0
	case q.MaxQueryTime > MaxBlockingRPCQueryTime:
		return MaxBlockingRPCQueryTime
	case q.MaxQueryTime <= 0:
		return DefaultBlockingRPCQueryTime
	default:
		return q.MaxQueryTime
	}
}

// SetTimeToBlock records t as the requested blocking time by storing it in
// MaxQueryTime.
//
// The receiver is a pointer so that the assignment is visible to the caller.
// With a value receiver the write lands on a copy of the options and is
// discarded when the method returns, which turns the setter into a silent
// no-op.
//
// The stored value is not adjusted here; TimeToBlock applies the maximum and
// default limits when the value is read back.
func (q *QueryOptions) SetTimeToBlock(t time.Duration) {
	q.MaxQueryTime = t
}

// RequestRegion returns the Region field of these query options.
func (q QueryOptions) RequestRegion() string {
	region := q.Region
	return region
}
Expand All @@ -310,21 +331,6 @@ func (q QueryOptions) AllowStaleRead() bool {
return q.AllowStale
}

// HasTimedOut reports whether the time elapsed since start exceeds the
// allowance for this query.
//
// The allowance is always at least rpcHoldTimeout. When MinQueryIndex is
// greater than zero, the query's MaxQueryTime is added on top after being
// capped at MaxBlockingRPCQueryTime, replaced by DefaultBlockingRPCQueryTime
// when it is zero or negative, and extended by a random stagger of up to
// 1/JitterFraction of itself.
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool {
	allowance := rpcHoldTimeout

	if q.MinQueryIndex > 0 {
		// Restrict the max query time, and ensure there is always one.
		blockFor := q.MaxQueryTime
		switch {
		case blockFor > MaxBlockingRPCQueryTime:
			blockFor = MaxBlockingRPCQueryTime
		case blockFor <= 0:
			blockFor = DefaultBlockingRPCQueryTime
		}
		blockFor += lib.RandomStagger(blockFor / JitterFraction)

		allowance += blockFor
	}

	return time.Since(start) > allowance
}

// AgentPprofRequest is used to request a pprof report for a given node.
type AgentPprofRequest struct {
// ReqType specifies the profile to use
Expand Down Expand Up @@ -385,6 +391,13 @@ type WriteRequest struct {
InternalRpcInfo
}

// TimeToBlock always reports a zero duration for a WriteRequest.
func (w WriteRequest) TimeToBlock() time.Duration {
	return time.Duration(0)
}

// SetTimeToBlock is a no-op for a WriteRequest: the requested duration is
// discarded and nothing on the request is modified.
func (w WriteRequest) SetTimeToBlock(_ time.Duration) {
}

func (w WriteRequest) RequestRegion() string {
// The target region for this request
return w.Region
Expand All @@ -411,10 +424,6 @@ func (w WriteRequest) AllowStaleRead() bool {
return false
}

// HasTimedOut reports whether more than rpcHoldTimeout has elapsed since
// start.
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool {
	elapsed := time.Since(start)
	return elapsed > rpcHoldTimeout
}

// QueryMeta allows a query response to include potentially
// useful metadata about a query
type QueryMeta struct {
Expand Down