Fix RPC retry logic in nomad client's rpc.go for blocking queries #9266

Merged · 2 commits · Nov 30, 2020
Changes from 1 commit
86 changes: 74 additions & 12 deletions client/rpc.go
@@ -5,6 +5,7 @@ import (
"io"
"net"
"net/rpc"
"reflect"
"strings"
"time"

@@ -46,19 +47,53 @@ func (c *Client) StreamingRpcHandler(method string) (structs.StreamingRpcHandler
return c.streamingRpcs.GetHandler(method)
}

// Given a type that is or eventually points to a concrete type with an embedded QueryOptions
Member commented:

These reflection-based functions are really here because we've removed HasTimedOut from the RPCInfo interface, which was implemented by both QueryOptions and WriteRequest. It has been replaced only on the QueryOptions side by TimeToBlock. But it looks to me like in all cases where we call TimeToBlock, we are testing for a non-zero result.

So couldn't we avoid this by having TimeToBlock on the RPCInfo interface, having WriteRequest implement it, but have WriteRequest always return 0? That seems cheaper to me if we can make it work.

Contributor (author) replied:

I originally thought about this, but for some reason decided it was better to use reflection; maybe that was early in my thinking, before it evolved a bit. Either way, I don't see any reason why not now, so I updated the PR.

// returns the reflect.Value of that QueryOptions. Otherwise returns reflect.Value{}
func getEmbeddedQueryOptsValue(arg interface{}) reflect.Value {
// Dereference interfaces or pointers for their concrete types
maybeOpts := reflect.ValueOf(arg)
for maybeOpts.Kind() == reflect.Ptr || maybeOpts.Kind() == reflect.Interface {
maybeOpts = maybeOpts.Elem()
}
return maybeOpts.FieldByName("QueryOptions")
}

// Given a type that is or eventually points to a concrete type with an embedded QueryOptions
// returns a copy of that QueryOptions.
func getEmbeddedQueryOpts(arg interface{}) (structs.QueryOptions, bool) {
if maybeOpts := getEmbeddedQueryOptsValue(arg); maybeOpts != (reflect.Value{}) {
return maybeOpts.Interface().(structs.QueryOptions), true
}
return structs.QueryOptions{}, false
}

// Sets the QueryOptions embedded in the concrete type backing arg.
// It panics if there is no embedded QueryOptions.
func setEmbeddedQueryOpts(arg interface{}, opts structs.QueryOptions) {
getEmbeddedQueryOptsValue(arg).Set(reflect.ValueOf(opts))
}

// RPC is used to forward an RPC call to a nomad server, or fail if no servers.
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Invoke the RPCHandler if it exists
if c.config.RPCHandler != nil {
return c.config.RPCHandler.RPC(method, args, reply)
}

// This is subtle but we start measuring the time on the client side
// right at the time of the first request, vs. on the first retry as
// is done on the server side inside forward(). This is because the
// servers may already be applying the RPCHoldTimeout up there, so by
// starting the timer here we won't potentially double up the delay.
firstCheck := time.Now()
// We will try to automatically retry requests that fail due to things like server unavailability
// but instead of retrying forever, let's have a solid upper bound
deadline := time.Now()

// A reasonable amount of time for leader election. Note that when servers forward() our RPC requests
// to the leader they may also allow for an RPCHoldTimeout while waiting for leader election.
// That's OK; we won't double up because we are using it here not as a sleep but
// as a hint to give up.
deadline = deadline.Add(c.config.RPCHoldTimeout)

// If it's a blocking query, allow the time specified by the request
if opts, ok := getEmbeddedQueryOpts(args); ok {
deadline = deadline.Add(opts.TimeToBlock())
}

TRY:
server := c.servers.FindServer()
@@ -68,6 +103,7 @@ TRY:

// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)

if rpcErr == nil {
c.fireRpcRetryWatcher()
return nil
@@ -83,22 +119,48 @@ TRY:
// Move off to another server, and see if we can retry.
c.rpcLogger.Error("error performing RPC to server", "error", rpcErr, "rpc", method, "server", server.Addr)
c.servers.NotifyFailedServer(server)
if retry := canRetry(args, rpcErr, firstCheck, c.config.RPCHoldTimeout); !retry {

if !canRetry(args, rpcErr) {
c.rpcLogger.Error("error performing RPC to server which is not safe to automatically retry", "error", rpcErr, "rpc", method, "server", server.Addr)
return rpcErr
}
if time.Now().After(deadline) {
// Blocking queries are tricky. Jitters and RPC hold timeouts in multiple places can result in our server call taking longer than we wanted it to. For example:
// a block time of 5s may easily turn into the server blocking for 10s since it applies its own RPCHoldTimeout. If the server dies at t=7s we still want to retry,
// so before we give up on a blocking query, make one last attempt for an immediate answer
if opts, ok := getEmbeddedQueryOpts(args); ok && opts.TimeToBlock() > 0 {
opts.MinQueryIndex = 0
opts.MaxQueryTime = 0
setEmbeddedQueryOpts(args, opts)
return c.RPC(method, args, reply)
}
c.rpcLogger.Error("error performing RPC to server, deadline exceeded, cannot retry", "error", rpcErr, "rpc", method, "server", server.Addr)
return rpcErr
}

// We can wait a bit and retry!
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
// Wait to avoid thundering herd
select {
case <-time.After(jitter):
case <-time.After(lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)):
// If we are going to retry a blocking query we need to update the time to block so it finishes by our deadline.
if opts, ok := getEmbeddedQueryOpts(args); ok && opts.TimeToBlock() > 0 {
opts.MaxQueryTime = deadline.Sub(time.Now())
// We can get below 0 here on slow computers because we slept for jitter, so at least try to get an immediate response
if opts.MaxQueryTime <= 0 {
opts.MinQueryIndex = 0
opts.MaxQueryTime = 0
}
setEmbeddedQueryOpts(args, opts)
return c.RPC(method, args, reply)
}

goto TRY
case <-c.shutdownCh:
}
return rpcErr
}

// canRetry returns true if the given situation is safe for a retry.
func canRetry(args interface{}, err error, start time.Time, rpcHoldTimeout time.Duration) bool {
func canRetry(args interface{}, err error) bool {
// No leader errors are always safe to retry since no state could have
// been changed.
if structs.IsErrNoLeader(err) {
@@ -108,7 +170,7 @@ func canRetry(args interface{}, err error, start time.Time, rpcHoldTimeout time.
// Reads are safe to retry for stream errors, such as if a server was
// being shut down.
info, ok := args.(structs.RPCInfo)
if ok && info.IsRead() && lib.IsErrEOF(err) && !info.HasTimedOut(start, rpcHoldTimeout) {
if ok && info.IsRead() && lib.IsErrEOF(err) {
return true
}

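For readers following the review thread above: it suggests dropping the reflection helpers in favor of TimeToBlock on the RPCInfo interface itself, with WriteRequest always returning 0 (the approach the author says the PR was updated to use). Below is a minimal, illustrative sketch of that shape; the `blocker` interface, the `queryOptions`/`writeRequest` types, and the timeout constants are stand-ins for this example, not the real nomad/structs definitions.

```go
package main

import (
	"fmt"
	"time"
)

// Placeholder bounds standing in for nomad/structs' MaxBlockingRPCQueryTime
// and DefaultBlockingRPCQueryTime; the real values may differ.
const (
	maxBlockingRPCQueryTime     = 10 * time.Minute
	defaultBlockingRPCQueryTime = 5 * time.Minute
)

// blocker is the relevant slice of the RPCInfo interface under the suggested
// design: every request type can report how long it is allowed to block.
type blocker interface {
	TimeToBlock() time.Duration
}

// queryOptions mimics the fields TimeToBlock needs from structs.QueryOptions.
type queryOptions struct {
	MinQueryIndex uint64
	MaxQueryTime  time.Duration
}

// TimeToBlock returns MaxQueryTime clamped to the default and maximum,
// or 0 when this is not a blocking query (MinQueryIndex == 0).
func (q queryOptions) TimeToBlock() time.Duration {
	if q.MinQueryIndex == 0 {
		return 0
	}
	if q.MaxQueryTime > maxBlockingRPCQueryTime {
		return maxBlockingRPCQueryTime
	}
	if q.MaxQueryTime <= 0 {
		return defaultBlockingRPCQueryTime
	}
	return q.MaxQueryTime
}

// writeRequest mimics structs.WriteRequest: writes never block, so it always
// reports 0, which lets callers branch on a non-zero result without reflection.
type writeRequest struct{}

func (w writeRequest) TimeToBlock() time.Duration { return 0 }

func main() {
	reqs := []blocker{
		queryOptions{MinQueryIndex: 7, MaxQueryTime: 5 * time.Second}, // blocking query
		queryOptions{}, // plain read, not blocking
		writeRequest{}, // writes never block
	}
	for _, r := range reqs {
		fmt.Println(r.TimeToBlock()) // 5s, 0s, 0s
	}
}
```

With every request able to report its block time, callers can simply test for a non-zero result instead of reflecting over the request struct, which is the cheaper route the reviewer describes.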
7 changes: 1 addition & 6 deletions nomad/rpc.go
@@ -780,12 +780,7 @@ func (r *rpcHandler) blockingRPC(opts *blockingOptions) error {
goto RUN_QUERY
}

// Restrict the max query time, and ensure there is always one
if opts.queryOpts.MaxQueryTime > structs.MaxBlockingRPCQueryTime {
opts.queryOpts.MaxQueryTime = structs.MaxBlockingRPCQueryTime
} else if opts.queryOpts.MaxQueryTime <= 0 {
opts.queryOpts.MaxQueryTime = structs.DefaultBlockingRPCQueryTime
}
opts.queryOpts.MaxQueryTime = opts.queryOpts.TimeToBlock()

// Apply a small amount of jitter to the request
opts.queryOpts.MaxQueryTime += lib.RandomStagger(opts.queryOpts.MaxQueryTime / structs.JitterFraction)
35 changes: 14 additions & 21 deletions nomad/structs/structs.go
@@ -24,7 +24,6 @@ import (
"strings"
"time"

"github.com/hashicorp/consul/lib"
"github.com/hashicorp/cronexpr"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
@@ -230,7 +229,6 @@ type RPCInfo interface {
AllowStaleRead() bool
IsForwarded() bool
SetForwarded()
HasTimedOut(since time.Time, rpcHoldTimeout time.Duration) bool
}

// InternalRpcInfo allows adding internal RPC metadata to an RPC. This struct
@@ -285,6 +283,20 @@ type QueryOptions struct {
InternalRpcInfo
}

// TimeToBlock returns MaxQueryTime adjusted for maximums and defaults.
// It returns 0 if this is not a blocking query.
func (q QueryOptions) TimeToBlock() time.Duration {
if q.MinQueryIndex == 0 {
return 0
}
if q.MaxQueryTime > MaxBlockingRPCQueryTime {
return MaxBlockingRPCQueryTime
} else if q.MaxQueryTime <= 0 {
return DefaultBlockingRPCQueryTime
}
return q.MaxQueryTime
}

func (q QueryOptions) RequestRegion() string {
return q.Region
}
@@ -310,21 +322,6 @@ func (q QueryOptions) AllowStaleRead() bool {
return q.AllowStale
}

func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool {
if q.MinQueryIndex > 0 {
// Restrict the max query time, and ensure there is always one
if q.MaxQueryTime > MaxBlockingRPCQueryTime {
q.MaxQueryTime = MaxBlockingRPCQueryTime
} else if q.MaxQueryTime <= 0 {
q.MaxQueryTime = DefaultBlockingRPCQueryTime
}
q.MaxQueryTime += lib.RandomStagger(q.MaxQueryTime / JitterFraction)

return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout)
}
return time.Since(start) > rpcHoldTimeout
}

// AgentPprofRequest is used to request a pprof report for a given node.
type AgentPprofRequest struct {
// ReqType specifies the profile to use
@@ -411,10 +408,6 @@ func (w WriteRequest) AllowStaleRead() bool {
return false
}

func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool {
return time.Since(start) > rpcHoldTimeout
}

// QueryMeta allows a query response to include potentially
// useful metadata about a query
type QueryMeta struct {
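Putting the pieces together, here is a simplified, illustrative sketch of the client-side deadline arithmetic the new retry loop in client/rpc.go relies on: the deadline is the RPC hold timeout plus whatever TimeToBlock allows, each retry recomputes the remaining block time against that deadline, and once it is exhausted the client falls back to one last non-blocking attempt. The timeout values below are made up, and jitter handling is omitted.

```go
package main

import (
	"fmt"
	"time"
)

// remainingBlockTime recomputes how long a retried blocking query may still
// block so that it finishes by the original deadline. A result of 0 means
// "fall back to a non-blocking query for an immediate answer", mirroring the
// MinQueryIndex = 0 / MaxQueryTime = 0 reset in client/rpc.go above.
func remainingBlockTime(deadline, now time.Time) time.Duration {
	if left := deadline.Sub(now); left > 0 {
		return left
	}
	return 0
}

func main() {
	// Placeholder values; the real hold timeout comes from the client config
	// and the block time from QueryOptions.TimeToBlock().
	rpcHoldTimeout := 5 * time.Second
	timeToBlock := 10 * time.Second

	start := time.Now()
	deadline := start.Add(rpcHoldTimeout).Add(timeToBlock)

	// First attempt fails 12s in (say the server died mid-block):
	// 3s remain, so the retry blocks for at most 3s.
	fmt.Println(remainingBlockTime(deadline, start.Add(12*time.Second)))

	// A later retry fails after the 15s deadline: 0 remains, so the client
	// makes one last non-blocking attempt before giving up.
	fmt.Println(remainingBlockTime(deadline, start.Add(16*time.Second)))
}
```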