Skip to content

Commit

Permalink
Merge pull request #1403 from hashicorp/f-hold-rpc
Browse files Browse the repository at this point in the history
Gracefully handle short lived outages by holding RPC calls
  • Loading branch information
dadgar committed Jul 12, 2016
2 parents 5798a23 + 33e655b commit 3f511eb
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 15 deletions.
8 changes: 8 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ type Config struct {

// ConsulConfig is this Agent's Consul configuration
ConsulConfig *config.ConsulConfig

// RPCHoldTimeout is how long an RPC can be "held" before it is errored.
// This is used to paper over a loss of leadership by instead holding RPCs,
// so that the caller experiences a slow response rather than an error.
// This period is meant to be long enough for a leader election to take
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration
}

// CheckVersion is used to check if the ProtocolVersion is valid
Expand Down Expand Up @@ -227,6 +234,7 @@ func DefaultConfig() *Config {
HeartbeatGrace: 10 * time.Second,
FailoverHeartbeatTTL: 300 * time.Second,
ConsulConfig: config.DefaultConsulConfig(),
RPCHoldTimeout: 5 * time.Second,
}

// Enable all known schedulers by default
Expand Down
56 changes: 48 additions & 8 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ const (

// jitterFraction is a the limit to the amount of jitter we apply
// to a user specified MaxQueryTime. We divide the specified time by
// the fraction. So 16 == 6.25% limit of jitter
// the fraction. So 16 == 6.25% limit of jitter. This jitter is also
// applied to RPCHoldTimeout.
jitterFraction = 16

// Warn if the Raft command is larger than this.
Expand Down Expand Up @@ -175,6 +176,8 @@ func (s *Server) handleNomadConn(conn net.Conn) {
// forward is used to forward to a remote region or to forward to the local leader
// Returns a bool of if forwarding was performed, as well as any error
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
var firstCheck time.Time

region := info.RequestRegion()
if region == "" {
return true, fmt.Errorf("missing target RPC")
Expand All @@ -191,27 +194,64 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
return false, nil
}

// Handle leader forwarding
if !s.IsLeader() {
err := s.forwardLeader(method, args, reply)
CHECK_LEADER:
// Find the leader
isLeader, remoteServer := s.getLeader()

// Handle the case we are the leader
if isLeader {
return false, nil
}

// Handle the case of a known leader
if remoteServer != nil {
err := s.forwardLeader(remoteServer, method, args, reply)
return true, err
}
return false, nil

// Gate the request until there is a leader
if firstCheck.IsZero() {
firstCheck = time.Now()
}
if time.Now().Sub(firstCheck) < s.config.RPCHoldTimeout {
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
select {
case <-time.After(jitter):
goto CHECK_LEADER
case <-s.shutdownCh:
}
}

// No leader found and hold time exceeded
return true, structs.ErrNoLeader
}

// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error {
// getLeader returns if the current node is the leader, and if not
// then it returns the leader which is potentially nil if the cluster
// has not yet elected a leader.
func (s *Server) getLeader() (bool, *serverParts) {
// Check if we are the leader
if s.IsLeader() {
return true, nil
}

// Get the leader
leader := s.raft.Leader()
if leader == "" {
return structs.ErrNoLeader
return false, nil
}

// Lookup the server
s.peerLock.RLock()
server := s.localPeers[leader]
s.peerLock.RUnlock()

// Server could be nil
return false, server
}

// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
func (s *Server) forwardLeader(server *serverParts, method string, args interface{}, reply interface{}) error {
// Handle a missing server
if server == nil {
return structs.ErrNoLeader
Expand Down
29 changes: 22 additions & 7 deletions nomad/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,30 @@ func TestRPC_forwardLeader(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)

var out struct{}
err := s1.forwardLeader("Status.Ping", struct{}{}, &out)
if err != nil {
t.Fatalf("err: %v", err)
isLeader, remote := s1.getLeader()
if !isLeader && remote == nil {
t.Fatalf("missing leader")
}

err = s2.forwardLeader("Status.Ping", struct{}{}, &out)
if err != nil {
t.Fatalf("err: %v", err)
if remote != nil {
var out struct{}
err := s1.forwardLeader(remote, "Status.Ping", struct{}{}, &out)
if err != nil {
t.Fatalf("err: %v", err)
}
}

isLeader, remote = s2.getLeader()
if !isLeader && remote == nil {
t.Fatalf("missing leader")
}

if remote != nil {
var out struct{}
err := s2.forwardLeader(remote, "Status.Ping", struct{}{}, &out)
if err != nil {
t.Fatalf("err: %v", err)
}
}
}

Expand Down

0 comments on commit 3f511eb

Please sign in to comment.