Gracefully handle short lived outages by holding RPC calls #1403

Merged 2 commits on Jul 12, 2016
8 changes: 8 additions & 0 deletions nomad/config.go
@@ -181,6 +181,13 @@ type Config struct {

// ConsulConfig is this Agent's Consul configuration
ConsulConfig *config.ConsulConfig

// RPCHoldTimeout is how long an RPC can be "held" before it is errored.
// This is used to paper over a loss of leadership by instead holding RPCs,
// so that the caller experiences a slow response rather than an error.
// This period is meant to be long enough for a leader election to take
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration
@diptanu (Contributor) commented on Jul 10, 2016:
@armon Don't we want this to be configurable using the server config? We will have to add the key in ServerConfig if we want operators to be able to modify the value.

}

// CheckVersion is used to check if the ProtocolVersion is valid
@@ -227,6 +234,7 @@ func DefaultConfig() *Config {
HeartbeatGrace: 10 * time.Second,
FailoverHeartbeatTTL: 300 * time.Second,
ConsulConfig: config.DefaultConsulConfig(),
RPCHoldTimeout: 5 * time.Second,
}

// Enable all known schedulers by default
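To make the hold/jitter interplay concrete: with the 5 second default set above and the jitterFraction of 16 defined in nomad/rpc.go (next file), each retry while waiting for a leader sleeps for a random duration of at most 5s / 16 = 312.5ms, so on the order of sixteen or more leader checks fit inside the hold window. A rough, self-contained sketch of that arithmetic (not code from this PR):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	// Mirrors the defaults above: RPCHoldTimeout of 5s and jitterFraction of 16.
	rpcHoldTimeout := 5 * time.Second
	jitterFraction := 16

	// Upper bound on each retry's sleep, as in lib.RandomStagger(timeout / fraction).
	maxJitter := rpcHoldTimeout / time.Duration(jitterFraction)
	fmt.Println("max jitter per retry:", maxJitter) // 312.5ms

	// One illustrative random wait in [0, maxJitter).
	wait := time.Duration(rand.Int63n(int64(maxJitter)))
	fmt.Println("example wait before re-checking for a leader:", wait)
}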
56 changes: 48 additions & 8 deletions nomad/rpc.go
@@ -39,7 +39,8 @@ const (

// jitterFraction is the limit to the amount of jitter we apply
// to a user specified MaxQueryTime. We divide the specified time by
// the fraction. So 16 == 6.25% limit of jitter
// the fraction. So 16 == 6.25% limit of jitter. This jitter is also
// applied to RPCHoldTimeout.
jitterFraction = 16

// Warn if the Raft command is larger than this.
@@ -175,6 +176,8 @@ func (s *Server) handleNomadConn(conn net.Conn) {
// forward is used to forward to a remote region or to forward to the local leader
// Returns a bool of if forwarding was performed, as well as any error
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
var firstCheck time.Time

region := info.RequestRegion()
if region == "" {
return true, fmt.Errorf("missing target RPC")
@@ -191,27 +194,64 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
return false, nil
}

// Handle leader forwarding
if !s.IsLeader() {
err := s.forwardLeader(method, args, reply)
CHECK_LEADER:
// Find the leader
isLeader, remoteServer := s.getLeader()

// Handle the case we are the leader
if isLeader {
return false, nil
}

// Handle the case of a known leader
if remoteServer != nil {
err := s.forwardLeader(remoteServer, method, args, reply)
return true, err
}
return false, nil

// Gate the request until there is a leader
if firstCheck.IsZero() {
firstCheck = time.Now()
}
if time.Now().Sub(firstCheck) < s.config.RPCHoldTimeout {
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
select {
case <-time.After(jitter):
goto CHECK_LEADER
case <-s.shutdownCh:
}
}

// No leader found and hold time exceeded
return true, structs.ErrNoLeader
}

// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error {
// getLeader returns if the current node is the leader, and if not
// then it returns the leader which is potentially nil if the cluster
// has not yet elected a leader.
func (s *Server) getLeader() (bool, *serverParts) {
// Check if we are the leader
if s.IsLeader() {
return true, nil
}

// Get the leader
leader := s.raft.Leader()
if leader == "" {
return structs.ErrNoLeader
return false, nil
}

// Lookup the server
s.peerLock.RLock()
server := s.localPeers[leader]
s.peerLock.RUnlock()

// Server could be nil
return false, server
}

// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
func (s *Server) forwardLeader(server *serverParts, method string, args interface{}, reply interface{}) error {
// Handle a missing server
if server == nil {
return structs.ErrNoLeader
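For context on how this gating is reached, RPC endpoints typically call forward at the top of each handler and return early when the call was forwarded. A rough sketch of that caller pattern (the endpoint, request, and reply types are illustrative, not taken from this PR):

// Illustrative endpoint; Example, ExampleRequest, and ExampleResponse are
// hypothetical names, but the forward() call mirrors the signature above.
func (e *Example) Get(args *structs.ExampleRequest, reply *structs.ExampleResponse) error {
	// args satisfies structs.RPCInfo, so forward can read the target region.
	// With this change, forward blocks for up to RPCHoldTimeout (plus jitter)
	// while no leader is known, so a brief leadership loss surfaces as added
	// latency rather than an immediate structs.ErrNoLeader.
	if done, err := e.srv.forward("Example.Get", args, args, reply); done {
		return err
	}
	// ... handle the request locally on the leader ...
	return nil
}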
29 changes: 22 additions & 7 deletions nomad/rpc_test.go
@@ -33,15 +33,30 @@ func TestRPC_forwardLeader(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)

var out struct{}
err := s1.forwardLeader("Status.Ping", struct{}{}, &out)
if err != nil {
t.Fatalf("err: %v", err)
isLeader, remote := s1.getLeader()
if !isLeader && remote == nil {
t.Fatalf("missing leader")
}

err = s2.forwardLeader("Status.Ping", struct{}{}, &out)
if err != nil {
t.Fatalf("err: %v", err)
if remote != nil {
var out struct{}
err := s1.forwardLeader(remote, "Status.Ping", struct{}{}, &out)
if err != nil {
t.Fatalf("err: %v", err)
}
}

isLeader, remote = s2.getLeader()
if !isLeader && remote == nil {
t.Fatalf("missing leader")
}

if remote != nil {
var out struct{}
err := s2.forwardLeader(remote, "Status.Ping", struct{}{}, &out)
if err != nil {
t.Fatalf("err: %v", err)
}
}
}
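The updated test exercises the two outcomes that occur once a leader exists. For reference, getLeader() has three possible outcomes and forward() treats them as follows (a summary of the diff above, not new code from this PR):

// Summary of getLeader()'s outcomes as consumed by forward(), per the diff above.
isLeader, remote := s.getLeader()
switch {
case isLeader:
	// This server is the leader: handle the RPC locally (forward returns false, nil).
case remote != nil:
	// A remote leader is known: forward the call to it via forwardLeader.
default:
	// No leader yet: hold and retry with jitter until RPCHoldTimeout elapses,
	// then fail with structs.ErrNoLeader.
}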
