Improved Client handling of failed RPCs #4106

Merged · 8 commits · Apr 5, 2018
23 changes: 12 additions & 11 deletions CHANGELOG.md
@@ -40,30 +40,31 @@ IMPROVEMENTS:
 * cli: Clearer task event descriptions in `nomad alloc-status` when there are server side failures authenticating to Vault [[GH-3968](https://github.com/hashicorp/nomad/issues/3968)]
 * client: Allow '.' in environment variable names [[GH-3760](https://github.com/hashicorp/nomad/issues/3760)]
 * client: Refactor client fingerprint methods to a request/response format
+* client: Improved handling of failed RPCs and heartbeat retry logic [[GH-4106](https://github.com/hashicorp/nomad/issues/4106)]
   [[GH-3781](https://github.com/hashicorp/nomad/issues/3781)]
-* discovery: Allow `check_restart` to be specified in the `service` stanza.
+* discovery: Allow `check_restart` to be specified in the `service` stanza
   [[GH-3718](https://github.com/hashicorp/nomad/issues/3718)]
-* discovery: Allow configuring names of Nomad client and server health checks.
+* discovery: Allow configuring names of Nomad client and server health checks
   [[GH-4003](https://github.com/hashicorp/nomad/issues/4003)]
 * discovery: Only log if Consul does not support TLSSkipVerify instead of
-  dropping checks which relied on it. Consul has had this feature since 0.7.2. [[GH-3983](https://github.com/hashicorp/nomad/issues/3983)]
+  dropping checks which relied on it. Consul has had this feature since 0.7.2 [[GH-3983](https://github.com/hashicorp/nomad/issues/3983)]
 * driver/docker: Support hard CPU limits [[GH-3825](https://github.com/hashicorp/nomad/issues/3825)]
 * driver/docker: Support advertising IPv6 addresses [[GH-3790](https://github.com/hashicorp/nomad/issues/3790)]
 * driver/docker: Support overriding image entrypoint [[GH-3788](https://github.com/hashicorp/nomad/issues/3788)]
 * driver/docker: Support adding or dropping capabilities [[GH-3754](https://github.com/hashicorp/nomad/issues/3754)]
 * driver/docker: Support mounting root filesystem as read-only [[GH-3802](https://github.com/hashicorp/nomad/issues/3802)]
-* driver/docker: Retry on Portworx "volume is attached on another node" errors.
+* driver/docker: Retry on Portworx "volume is attached on another node" errors
   [[GH-3993](https://github.com/hashicorp/nomad/issues/3993)]
 * driver/lxc: Add volumes config to LXC driver [[GH-3687](https://github.com/hashicorp/nomad/issues/3687)]
 * driver/rkt: Allow overriding group [[GH-3990](https://github.com/hashicorp/nomad/issues/3990)]
 * telemetry: Support DataDog tags [[GH-3839](https://github.com/hashicorp/nomad/issues/3839)]
-* ui: Specialized job detail pages for each job type (system, service, batch, periodic, parameterized, periodic instance, parameterized instance). [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
-* ui: Allocation stats requests are made through the server instead of directly through clients. [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
-* ui: Allocation log requests fall back to using the server when the client can't be reached. [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
-* ui: All views poll for changes using long-polling via blocking queries. [[GH-3936](https://github.com/hashicorp/nomad/issues/3936)]
-* ui: Dispatch payload on the parameterized instance job detail page. [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
-* ui: Periodic force launch button on the periodic job detail page. [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
-* ui: Allocation breadcrumbs now extend job breadcrumbs. [[GH-3974](https://github.com/hashicorp/nomad/issues/3974)]
+* ui: Specialized job detail pages for each job type (system, service, batch, periodic, parameterized, periodic instance, parameterized instance) [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
+* ui: Allocation stats requests are made through the server instead of directly through clients [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
+* ui: Allocation log requests fall back to using the server when the client can't be reached [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
+* ui: All views poll for changes using long-polling via blocking queries [[GH-3936](https://github.com/hashicorp/nomad/issues/3936)]
+* ui: Dispatch payload on the parameterized instance job detail page [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
+* ui: Periodic force launch button on the periodic job detail page [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
+* ui: Allocation breadcrumbs now extend job breadcrumbs [[GH-3974](https://github.com/hashicorp/nomad/issues/3974)]
 * vault: Allow Nomad to create orphaned tokens for allocations [[GH-3992](https://github.com/hashicorp/nomad/issues/3992)]

BUG FIXES:
89 changes: 69 additions & 20 deletions client/client.go
@@ -137,9 +137,11 @@ type Client struct {
 	// server for the node event
 	triggerEmitNodeEvent chan *structs.NodeEvent

-	// discovered will be ticked whenever Consul discovery completes
-	// successfully
-	serversDiscoveredCh chan struct{}
+	// rpcRetryCh is closed when an event occurs, such as server discovery or a
+	// successful RPC, after which a retry should happen. Access should only
+	// occur via the getter method.
+	rpcRetryCh   chan struct{}
+	rpcRetryLock sync.Mutex

 	// allocs maps alloc IDs to their AllocRunner. This map includes all
 	// AllocRunners - running and GC'd - until the server GCs them.
@@ -217,7 +219,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		shutdownCh:           make(chan struct{}),
 		triggerDiscoveryCh:   make(chan struct{}),
 		triggerNodeUpdate:    make(chan struct{}, 8),
-		serversDiscoveredCh:  make(chan struct{}),
 		triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
 	}

@@ -1154,7 +1155,7 @@ func (c *Client) registerAndHeartbeat() {

	for {
		select {
-		case <-c.serversDiscoveredCh:
+		case <-c.rpcRetryWatcher():
		case <-heartbeat:
		case <-c.shutdownCh:
			return
@@ -1169,11 +1170,11 @@ func (c *Client) registerAndHeartbeat() {
			c.retryRegisterNode()
			heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
		} else {
-			intv := c.retryIntv(registerRetryIntv)
+			intv := c.getHeartbeatRetryIntv(err)
			c.logger.Printf("[ERR] client: heartbeating failed. Retrying in %v: %v", intv, err)
			heartbeat = time.After(intv)

-			// if heartbeating fails, trigger Consul discovery
+			// If heartbeating fails, trigger Consul discovery
			c.triggerDiscovery()
		}
	} else {
@@ -1184,6 +1185,56 @@ func (c *Client) registerAndHeartbeat() {
	}
}

+// getHeartbeatRetryIntv is used to retrieve the time to wait before attempting
+// another heartbeat.
+func (c *Client) getHeartbeatRetryIntv(err error) time.Duration {
+	if c.config.DevMode {
+		return devModeRetryIntv
+	}
+
+	// Collect the useful heartbeat info
+	c.heartbeatLock.Lock()
+	haveHeartbeated := c.haveHeartbeated
+	last := c.lastHeartbeat
+	ttl := c.heartbeatTTL
+	c.heartbeatLock.Unlock()
+
+	// If we haven't even successfully heartbeated once or there is no leader,
+	// treat it as a registration. In the case that there is a leadership loss,
+	// we will have our heartbeat timer reset to a much larger threshold, so
+	// do not put unnecessary pressure on the new leader.
+	if !haveHeartbeated || err == structs.ErrNoLeader {
+		return c.retryIntv(registerRetryIntv)
+	}
+
+	// Determine how much time we have left to heartbeat
+	left := last.Add(ttl).Sub(time.Now())
+
+	// The retry logic is:
+	// * Do not retry faster than once a second
+	// * Do not retry less often than once every 30 seconds
+	// * If we have missed the heartbeat by more than 30 seconds, start to use
+	//   the absolute time since we do not want to retry indefinitely
+	switch {
+	case left < -30*time.Second:
+		// Make left the absolute value so we delay and jitter properly.
+		left *= -1
+	case left < 0:
+		return time.Second + lib.RandomStagger(time.Second)
+	default:
+	}
+
+	stagger := lib.RandomStagger(left)
+	switch {
+	case stagger < time.Second:
+		return time.Second + lib.RandomStagger(time.Second)
+	case stagger > 30*time.Second:
+		return 25*time.Second + lib.RandomStagger(5*time.Second)
+	default:
+		return stagger
+	}
+}
+
 // periodicSnapshot is a long lived goroutine used to periodically snapshot the
 // state of the client
 func (c *Client) periodicSnapshot() {
@@ -1307,7 +1358,7 @@ func (c *Client) retryRegisterNode() {
c.logger.Printf("[ERR] client: registration failure: %v", err)
}
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
case <-time.After(c.retryIntv(registerRetryIntv)):
case <-c.shutdownCh:
return
@@ -1567,7 +1618,7 @@ OUTER:
		}
		retry := c.retryIntv(getAllocRetryIntv)
		select {
-		case <-c.serversDiscoveredCh:
+		case <-c.rpcRetryWatcher():
			continue
		case <-time.After(retry):
			continue
@@ -1622,7 +1673,7 @@ OUTER:
			c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
			retry := c.retryIntv(getAllocRetryIntv)
			select {
-			case <-c.serversDiscoveredCh:
+			case <-c.rpcRetryWatcher():
				continue
			case <-time.After(retry):
				continue
@@ -2085,18 +2136,16 @@ DISCOLOOP:
	}

	c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", nomadServers)
-	c.servers.SetServers(nomadServers)

-	// Notify waiting rpc calls. If a goroutine just failed an RPC call and
-	// isn't receiving on this chan yet they'll still retry eventually.
-	// This is a shortcircuit for the longer retry intervals.
-	for {
-		select {
-		case c.serversDiscoveredCh <- struct{}{}:
-		default:
-			return nil
-		}
+	// Fire the retry trigger if we have updated the set of servers.
+	if c.servers.SetServers(nomadServers) {
+		// Notify waiting RPC calls. If a goroutine just failed an RPC call and
+		// isn't receiving on this chan yet, it will still retry eventually.
+		// This is a short circuit for the longer retry intervals.
+		c.fireRpcRetryWatcher()
	}
+
+	return nil
}

 // emitStats collects host resource usage stats periodically
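To make the interval selection in `getHeartbeatRetryIntv` concrete, here is a minimal, self-contained sketch of the same jittered-retry policy. `randomStagger` stands in for Nomad's `lib.RandomStagger` (a random duration in `[0, intv)`), and the sample `left` values are illustrative, not taken from the PR:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomStagger mimics lib.RandomStagger: a random duration in [0, intv).
func randomStagger(intv time.Duration) time.Duration {
	if intv <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(intv)))
}

// heartbeatRetry applies the policy above: clamp the retry to roughly
// [1s, 30s], and once the TTL has been missed by more than 30s, jitter over
// the absolute overshoot instead of hammering the servers every second.
func heartbeatRetry(left time.Duration) time.Duration {
	switch {
	case left < -30*time.Second:
		left *= -1 // missed badly; stagger over the absolute value
	case left < 0:
		return time.Second + randomStagger(time.Second)
	}

	stagger := randomStagger(left)
	switch {
	case stagger < time.Second:
		return time.Second + randomStagger(time.Second)
	case stagger > 30*time.Second:
		return 25*time.Second + randomStagger(5*time.Second)
	default:
		return stagger
	}
}

func main() {
	for _, left := range []time.Duration{90 * time.Second, 500 * time.Millisecond, -5 * time.Second, -2 * time.Minute} {
		fmt.Printf("time left %10s -> retry in %s\n", left, heartbeatRetry(left))
	}
}
```

Note how an overshoot beyond 30 seconds is jittered over the absolute overshoot (capped near 30s) rather than retried every second indefinitely.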
28 changes: 28 additions & 0 deletions client/client_test.go
@@ -89,6 +89,34 @@ func TestClient_RPC(t *testing.T) {
	})
}

+func TestClient_RPC_FireRetryWatchers(t *testing.T) {
+	t.Parallel()
+	s1, addr := testServer(t, nil)
+	defer s1.Shutdown()
+
+	c1 := TestClient(t, func(c *config.Config) {
+		c.Servers = []string{addr}
+	})
+	defer c1.Shutdown()
+
+	watcher := c1.rpcRetryWatcher()
+
+	// RPC should succeed
+	testutil.WaitForResult(func() (bool, error) {
+		var out struct{}
+		err := c1.RPC("Status.Ping", struct{}{}, &out)
+		return err == nil, err
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	select {
+	case <-watcher:
+	default:
+		t.Fatal("watcher should be fired")
+	}
+}
+
 func TestClient_RPC_Passthrough(t *testing.T) {
	t.Parallel()
	s1, _ := testServer(t, nil)
25 changes: 25 additions & 0 deletions client/rpc.go
@@ -67,6 +67,7 @@ TRY:
	// Make the request.
	rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
	if rpcErr == nil {
+		c.fireRpcRetryWatcher()
		return nil
	}

@@ -382,3 +383,27 @@ func (c *Client) Ping(srv net.Addr) error {
	err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply)
	return err
}
+
+// rpcRetryWatcher returns a channel that will be closed if an event happens
+// such that we expect the next RPC to be successful.
+func (c *Client) rpcRetryWatcher() <-chan struct{} {
+	c.rpcRetryLock.Lock()
+	defer c.rpcRetryLock.Unlock()
+
+	if c.rpcRetryCh == nil {
+		c.rpcRetryCh = make(chan struct{})
+	}
+
+	return c.rpcRetryCh
+}
+
+// fireRpcRetryWatcher causes any RPC retry loops to retry their RPCs because
+// we believe they will be successful.
+func (c *Client) fireRpcRetryWatcher() {
+	c.rpcRetryLock.Lock()
+	defer c.rpcRetryLock.Unlock()
+	if c.rpcRetryCh != nil {
+		close(c.rpcRetryCh)
+		c.rpcRetryCh = nil
+	}
+}
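The getter/fire pair above is a standard close-a-channel broadcast. Here is a minimal, self-contained sketch of how a retry loop consumes such a watcher; the `retryNotifier` type and the timings are illustrative stand-ins, not part of the PR:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// retryNotifier reproduces the rpcRetryWatcher/fireRpcRetryWatcher pattern:
// waiters grab the current channel, and a fire closes it to wake them all.
type retryNotifier struct {
	mu sync.Mutex
	ch chan struct{}
}

// watcher returns the channel the next fire will close, lazily creating it.
func (n *retryNotifier) watcher() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.ch == nil {
		n.ch = make(chan struct{})
	}
	return n.ch
}

// fire wakes every current watcher exactly once by closing the channel.
func (n *retryNotifier) fire() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.ch != nil {
		close(n.ch)
		n.ch = nil
	}
}

func main() {
	var n retryNotifier

	done := make(chan struct{})
	go func() {
		defer close(done)
		// A retry loop waits on whichever comes first: the backoff timer
		// or the notifier short-circuiting the wait.
		select {
		case <-n.watcher():
			fmt.Println("retrying early: notifier fired")
		case <-time.After(30 * time.Second):
			fmt.Println("retrying after full backoff")
		}
	}()

	time.Sleep(100 * time.Millisecond) // let the waiter block
	n.fire()                           // e.g. discovery found new servers
	<-done
}
```

Closing the channel wakes every waiter at once, and nil-ing it out means the next `watcher()` call starts a fresh round; a waiter that arrives after the fire simply waits for the next event or its own timeout.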
55 changes: 53 additions & 2 deletions client/servers/manager.go
@@ -7,6 +7,7 @@ import (
"log"
"math/rand"
"net"
"sort"
"strings"
"sync"
"time"
@@ -74,6 +75,16 @@ func (s *Server) String() string {
	return s.addr
}

+func (s *Server) Equal(o *Server) bool {
+	if s == nil && o == nil {
+		return true
+	} else if s == nil && o != nil || s != nil && o == nil {
+		return false
+	}
+
+	return s.Addr.String() == o.Addr.String() && s.DC == o.DC
+}

> Review comment (Member): Can be shortened to `if s == nil || o == nil { return s == o }` (h/t to @chelseakomlo).
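For reference, a sketch of the reviewer's suggested simplification as a drop-in replacement for the method above; behavior should be identical, since pointer comparison handles the both-nil and one-nil cases in a single branch:

```go
// Equal compares two servers by address and datacenter. When either side is
// nil, the pointer comparison returns true only if both are nil.
func (s *Server) Equal(o *Server) bool {
	if s == nil || o == nil {
		return s == o
	}
	return s.Addr.String() == o.Addr.String() && s.DC == o.DC
}
```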

type Servers []*Server

func (s Servers) String() string {
@@ -106,6 +117,32 @@ func (s Servers) shuffle() {
	}
}

+func (s Servers) Sort() {
+	sort.Slice(s, func(i, j int) bool {
+		a, b := s[i], s[j]
+		if addr1, addr2 := a.Addr.String(), b.Addr.String(); addr1 == addr2 {
+			return a.DC < b.DC
+		} else {
+			return addr1 < addr2
+		}
+	})
+}
+
+// Equal returns whether the two server lists are equal, including the
+// ordering.
+func (s Servers) Equal(o Servers) bool {
+	if len(s) != len(o) {
+		return false
+	}
+
+	for i, v := range s {
+		if !v.Equal(o[i]) {
+			return false
+		}
+	}
+
+	return true
+}
+
type Manager struct {
	// servers is the list of all known Nomad servers.
	servers Servers
@@ -157,10 +194,24 @@ func (m *Manager) Start() {
	}
}

-func (m *Manager) SetServers(servers Servers) {
+// SetServers sets the servers and returns whether the new server list is
+// different from the existing server set
+func (m *Manager) SetServers(servers Servers) bool {
	m.Lock()
	defer m.Unlock()
+
+	// Sort both the existing and incoming servers
+	servers.Sort()
+	m.servers.Sort()
+
+	// Determine if they are equal
+	equal := servers.Equal(m.servers)
+
+	// Randomize the incoming servers
+	servers.shuffle()
	m.servers = servers
+
+	return !equal
}

> Review comment (Member), on `servers.shuffle()`: Do you still want to shuffle if they're equal?
> Reply (Contributor, author): Yeah, otherwise we will have deterministic ordering through Consul discovery.

// FindServer returns a server to send an RPC to. If there are no servers, nil
@@ -204,7 +255,7 @@ func (m *Manager) NotifyFailedServer(s *Server) {
	// If the server being failed is not the first server on the list,
	// this is a noop. If, however, the server is failed and first on
	// the list, move the server to the end of the list.
-	if len(m.servers) > 1 && m.servers[0] == s {
+	if len(m.servers) > 1 && m.servers[0].Equal(s) {
		m.servers.cycle()
	}
}
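To see the `SetServers` contract end to end, here is a minimal, self-contained sketch of the sort/compare/shuffle sequence; the trimmed-down `server` and `serverList` types are illustrative stand-ins for the package's real `Server`/`Servers` types:

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// server is a trimmed-down stand-in for servers.Server.
type server struct {
	addr string
	dc   string
}

type serverList []server

// sortByAddr mirrors Servers.Sort: order by address, then datacenter.
func (s serverList) sortByAddr() {
	sort.Slice(s, func(i, j int) bool {
		if s[i].addr == s[j].addr {
			return s[i].dc < s[j].dc
		}
		return s[i].addr < s[j].addr
	})
}

// equal mirrors Servers.Equal: element-wise comparison, ordering included.
func (s serverList) equal(o serverList) bool {
	if len(s) != len(o) {
		return false
	}
	for i := range s {
		if s[i] != o[i] {
			return false
		}
	}
	return true
}

// setServers mirrors Manager.SetServers: report whether the membership
// changed, but always shuffle so the preferred-server order stays random.
func setServers(current, incoming serverList) (serverList, bool) {
	incoming.sortByAddr()
	current.sortByAddr()
	changed := !incoming.equal(current)

	rand.Shuffle(len(incoming), func(i, j int) {
		incoming[i], incoming[j] = incoming[j], incoming[i]
	})
	return incoming, changed
}

func main() {
	cur := serverList{{"10.0.0.1:4647", "dc1"}, {"10.0.0.2:4647", "dc1"}}

	// Same membership in a different order: no change reported.
	_, changed := setServers(cur, serverList{{"10.0.0.2:4647", "dc1"}, {"10.0.0.1:4647", "dc1"}})
	fmt.Println("reordered only, changed =", changed) // false

	// A new server joins: change reported, so retry watchers would fire.
	_, changed = setServers(cur, serverList{{"10.0.0.1:4647", "dc1"}, {"10.0.0.3:4647", "dc1"}})
	fmt.Println("new member, changed =", changed) // true
}
```

The shuffle runs even when nothing changed, matching the author's point above: skipping it would let Consul discovery impose a fixed preference order on every client.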