diff --git a/client/client.go b/client/client.go index dc45e3a38b5e..6334e3c2aa73 100644 --- a/client/client.go +++ b/client/client.go @@ -9,6 +9,7 @@ import ( "net/rpc" "os" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -23,6 +24,7 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/client/servers" "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/command/agent/consul" @@ -111,8 +113,8 @@ type Client struct { connPool *pool.ConnPool - // servers is the (optionally prioritized) list of nomad servers - servers *serverlist + // servers is the list of nomad servers + servers *servers.Manager // heartbeat related times for tracking how often to heartbeat lastHeartbeat time.Time @@ -198,11 +200,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic allocs: make(map[string]*AllocRunner), allocUpdates: make(chan *structs.Allocation, 64), shutdownCh: make(chan struct{}), - servers: newServerList(), triggerDiscoveryCh: make(chan struct{}), serversDiscoveredCh: make(chan struct{}), } + // Initialize the server manager + c.servers = servers.New(c.logger, c.shutdownCh, c) + // Initialize the client if err := c.init(); err != nil { return nil, fmt.Errorf("failed to initialize client: %v", err) @@ -268,7 +272,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic // Setup Consul discovery if enabled if c.configCopy.ConsulConfig.ClientAutoJoin != nil && *c.configCopy.ConsulConfig.ClientAutoJoin { go c.consulDiscovery() - if len(c.servers.all()) == 0 { + if c.servers.NumServers() == 0 { // No configured servers; trigger discovery manually c.triggerDiscoveryCh <- struct{}{} } @@ -463,7 +467,7 @@ func (c *Client) Stats() map[string]map[string]string { stats := map[string]map[string]string{ "client": { "node_id": c.NodeID(), - "known_servers": c.servers.all().String(), + "known_servers": strings.Join(c.GetServers(), ","), "num_allocations": strconv.Itoa(c.NumAllocs()), "last_heartbeat": fmt.Sprintf("%v", time.Since(c.lastHeartbeat)), "heartbeat_ttl": fmt.Sprintf("%v", c.heartbeatTTL), @@ -548,32 +552,49 @@ func (c *Client) GetClientAlloc(allocID string) (*structs.Allocation, error) { // GetServers returns the list of nomad servers this client is aware of. func (c *Client) GetServers() []string { - endpoints := c.servers.all() + endpoints := c.servers.GetServers() res := make([]string, len(endpoints)) for i := range endpoints { - res[i] = endpoints[i].addr.String() + res[i] = endpoints[i].String() } + sort.Strings(res) return res } // SetServers sets a new list of nomad servers to connect to. As long as one // server is resolvable no error is returned. 
-func (c *Client) SetServers(servers []string) error { - endpoints := make([]*endpoint, 0, len(servers)) +func (c *Client) SetServers(in []string) error { + var mu sync.Mutex + var wg sync.WaitGroup var merr multierror.Error - for _, s := range servers { - addr, err := resolveServer(s) - if err != nil { - c.logger.Printf("[DEBUG] client: ignoring server %s due to resolution error: %v", s, err) - merr.Errors = append(merr.Errors, err) - continue - } - // Valid endpoint, append it without a priority as this API - // doesn't support different priorities for different servers - endpoints = append(endpoints, &endpoint{name: s, addr: addr}) + endpoints := make([]*servers.Server, 0, len(in)) + wg.Add(len(in)) + + for _, s := range in { + go func(srv string) { + defer wg.Done() + addr, err := resolveServer(srv) + if err != nil { + c.logger.Printf("[DEBUG] client: ignoring server %s due to resolution error: %v", srv, err) + merr.Errors = append(merr.Errors, err) + return + } + + // Try to ping to check if it is a real server + if err := c.Ping(addr); err != nil { + merr.Errors = append(merr.Errors, fmt.Errorf("Server at address %s failed ping: %v", addr, err)) + return + } + + mu.Lock() + endpoints = append(endpoints, &servers.Server{Addr: addr}) + mu.Unlock() + }(s) } + wg.Wait() + // Only return errors if no servers are valid if len(endpoints) == 0 { if len(merr.Errors) > 0 { @@ -582,7 +603,7 @@ func (c *Client) SetServers(servers []string) error { return noServersErr } - c.servers.set(endpoints) + c.servers.SetServers(endpoints) return nil } @@ -1228,26 +1249,25 @@ func (c *Client) updateNodeStatus() error { } } - // Convert []*NodeServerInfo to []*endpoints - localdc := c.Datacenter() - servers := make(endpoints, 0, len(resp.Servers)) + // Update the number of nodes in the cluster so we can adjust our server + // rebalance rate. + c.servers.SetNumNodes(resp.NumNodes) + + // Convert []*NodeServerInfo to []*servers.Server + nomadServers := make([]*servers.Server, 0, len(resp.Servers)) for _, s := range resp.Servers { addr, err := resolveServer(s.RPCAdvertiseAddr) if err != nil { c.logger.Printf("[WARN] client: ignoring invalid server %q: %v", s.RPCAdvertiseAddr, err) continue } - e := endpoint{name: s.RPCAdvertiseAddr, addr: addr} - if s.Datacenter != localdc { - // server is non-local; de-prioritize - e.priority = 1 - } - servers = append(servers, &e) + e := &servers.Server{DC: s.Datacenter, Addr: addr} + nomadServers = append(nomadServers, e) } - if len(servers) == 0 { - return fmt.Errorf("server returned no valid servers") + if len(nomadServers) == 0 { + return fmt.Errorf("heartbeat response returned no valid servers") } - c.servers.set(servers) + c.servers.SetServers(nomadServers) // Begin polling Consul if there is no Nomad leader. 
We could be // heartbeating to a Nomad server that is in the minority of a @@ -1840,7 +1860,7 @@ func (c *Client) consulDiscoveryImpl() error { serviceName := c.configCopy.ConsulConfig.ServerServiceName var mErr multierror.Error - var servers endpoints + var nomadServers servers.Servers c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs) DISCOLOOP: for _, dc := range dcs { @@ -1880,22 +1900,23 @@ DISCOLOOP: if err != nil { mErr.Errors = append(mErr.Errors, err) } - servers = append(servers, &endpoint{name: p, addr: addr}) + srv := &servers.Server{Addr: addr} + nomadServers = append(nomadServers, srv) } - if len(servers) > 0 { + if len(nomadServers) > 0 { break DISCOLOOP } } } - if len(servers) == 0 { + if len(nomadServers) == 0 { if len(mErr.Errors) > 0 { return mErr.ErrorOrNil() } return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %+q", serviceName, dcs) } - c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", servers) - c.servers.set(servers) + c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", nomadServers) + c.servers.SetServers(nomadServers) // Notify waiting rpc calls. If a goroutine just failed an RPC call and // isn't receiving on this chan yet they'll still retry eventually. diff --git a/client/client_test.go b/client/client_test.go index 53265962168e..2b93342016da 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -904,3 +904,27 @@ func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) { assert.Equal(c.ValidateMigrateToken("", ""), true) } + +// TestClient_ServerList tests client methods that interact with the internal +// nomad server list. +func TestClient_ServerList(t *testing.T) { + t.Parallel() + client := TestClient(t, func(c *config.Config) {}) + + if s := client.GetServers(); len(s) != 0 { + t.Fatalf("expected server lit to be empty but found: %+q", s) + } + if err := client.SetServers(nil); err != noServersErr { + t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err) + } + if err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil { + t.Fatalf("expected setting a bad server to return an error") + } + if err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil { + t.Fatalf("expected setting at least one good server to succeed but received: %v", err) + } + s := client.GetServers() + if len(s) != 0 { + t.Fatalf("expected 2 servers but received: %+q", s) + } +} diff --git a/client/config/config.go b/client/config/config.go index 1f89e4033c6b..6f56a61fdf0b 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -195,6 +195,13 @@ type Config struct { // BackwardsCompatibleMetrics determines whether to show methods of // displaying metrics for older verions, or to only show the new format BackwardsCompatibleMetrics bool + + // RPCHoldTimeout is how long an RPC can be "held" before it is errored. + // This is used to paper over a loss of leadership by instead holding RPCs, + // so that the caller experiences a slow response rather than an error. + // This period is meant to be long enough for a leader election to take + // place, and a small jitter is applied to avoid a thundering herd. 
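The RPCHoldTimeout comment above describes the hold-and-retry window that client/rpc.go (further down in this diff) implements with lib.RandomStagger and structs.JitterFraction. The following is a rough standalone sketch of that pattern, not the patch's code: randomStagger and retryWithHold are hypothetical helpers, and the jitterFraction value of 16 is an assumption, since the constant's value does not appear in this diff.

// Sketch of the jittered hold-and-retry window described above.
package retrysketch

import (
	"math/rand"
	"time"
)

const (
	holdTimeout    = 5 * time.Second // matches the RPCHoldTimeout default added in client/config
	jitterFraction = 16              // assumed stand-in for structs.JitterFraction
)

// randomStagger approximates lib.RandomStagger: a random duration in [0, interval).
func randomStagger(interval time.Duration) time.Duration {
	if interval <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(interval)))
}

// retryWithHold retries fn with a small random pause until the hold
// timeout elapses, so callers see extra latency instead of an error
// while a leader election completes.
func retryWithHold(fn func() error) error {
	firstCheck := time.Now()
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if time.Since(firstCheck) >= holdTimeout {
			return err
		}
		time.Sleep(randomStagger(holdTimeout / jitterFraction))
	}
}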
+ RPCHoldTimeout time.Duration } func (c *Config) Copy() *Config { @@ -228,6 +235,7 @@ func DefaultConfig() *Config { NoHostUUID: true, DisableTaggedMetrics: false, BackwardsCompatibleMetrics: false, + RPCHoldTimeout: 5 * time.Second, } } diff --git a/client/rpc.go b/client/rpc.go index 8fa24c495fc3..231e1dbb4ab4 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -1,16 +1,17 @@ package client import ( - "fmt" "io" "net" "net/rpc" "strings" + "time" metrics "github.com/armon/go-metrics" - multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/nomad/helper/codec" "github.com/hashicorp/nomad/helper/pool" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/yamux" ) @@ -39,26 +40,60 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { return c.config.RPCHandler.RPC(method, args, reply) } - servers := c.servers.all() - if len(servers) == 0 { + // This is subtle but we start measuring the time on the client side + // right at the time of the first request, vs. on the first retry as + // is done on the server side inside forward(). This is because the + // servers may already be applying the RPCHoldTimeout up there, so by + // starting the timer here we won't potentially double up the delay. + firstCheck := time.Now() + +TRY: + server := c.servers.FindServer() + if server == nil { return noServersErr } - var mErr multierror.Error - for _, s := range servers { - // Make the RPC request - if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil { - errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err) - mErr.Errors = append(mErr.Errors, errmsg) - c.logger.Printf("[DEBUG] client: %v", errmsg) - c.servers.failed(s) - continue - } - c.servers.good(s) + // Make the request. + rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply) + if rpcErr == nil { return nil } - return mErr.ErrorOrNil() + // Move off to another server, and see if we can retry. + c.logger.Printf("[ERR] nomad: %q RPC failed to server %s: %v", method, server.Addr, rpcErr) + c.servers.NotifyFailedServer(server) + if retry := canRetry(args, rpcErr); !retry { + return rpcErr + } + + // We can wait a bit and retry! + if time.Since(firstCheck) < c.config.RPCHoldTimeout { + jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction) + select { + case <-time.After(jitter): + goto TRY + case <-c.shutdownCh: + } + } + return rpcErr +} + +// canRetry returns true if the given situation is safe for a retry. +func canRetry(args interface{}, err error) bool { + // No leader errors are always safe to retry since no state could have + // been changed. + if structs.IsErrNoLeader(err) { + return true + } + + // Reads are safe to retry for stream errors, such as if a server was + // being shut down. + info, ok := args.(structs.RPCInfo) + if ok && info.IsRead() && lib.IsErrEOF(err) { + return true + } + + return false } // setupClientRpc is used to setup the Client's RPC endpoints @@ -159,3 +194,11 @@ func resolveServer(s string) (net.Addr, error) { } return net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port)) } + +// Ping is used to ping a particular server and returns whether it is healthy or +// a potential error. 
+func (c *Client) Ping(srv net.Addr) error { + var reply struct{} + err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply) + return err +} diff --git a/client/serverlist.go b/client/serverlist.go deleted file mode 100644 index 87aec05e61f5..000000000000 --- a/client/serverlist.go +++ /dev/null @@ -1,111 +0,0 @@ -package client - -import ( - "math/rand" - "net" - "sort" - "strings" - "sync" -) - -// serverlist is a prioritized randomized list of nomad servers. Users should -// call all() to retrieve the full list, followed by failed(e) on each endpoint -// that's failed and good(e) when a valid endpoint is found. -type serverlist struct { - e endpoints - mu sync.RWMutex -} - -func newServerList() *serverlist { - return &serverlist{} -} - -// set the server list to a new list. The new list will be shuffled and sorted -// by priority. -func (s *serverlist) set(in endpoints) { - s.mu.Lock() - s.e = in - s.mu.Unlock() -} - -// all returns a copy of the full server list, shuffled and then sorted by -// priority -func (s *serverlist) all() endpoints { - s.mu.RLock() - out := make(endpoints, len(s.e)) - copy(out, s.e) - s.mu.RUnlock() - - // Randomize the order - for i, j := range rand.Perm(len(out)) { - out[i], out[j] = out[j], out[i] - } - - // Sort by priority - sort.Sort(out) - return out -} - -// failed endpoint will be deprioritized if its still in the list. -func (s *serverlist) failed(e *endpoint) { - s.mu.Lock() - defer s.mu.Unlock() - for _, cur := range s.e { - if cur.equal(e) { - cur.priority++ - return - } - } -} - -// good endpoint will get promoted to the highest priority if it's still in the -// list. -func (s *serverlist) good(e *endpoint) { - s.mu.Lock() - defer s.mu.Unlock() - for _, cur := range s.e { - if cur.equal(e) { - cur.priority = 0 - return - } - } -} - -func (e endpoints) Len() int { - return len(e) -} - -func (e endpoints) Less(i int, j int) bool { - // Sort only by priority as endpoints should be shuffled and ordered - // only by priority - return e[i].priority < e[j].priority -} - -func (e endpoints) Swap(i int, j int) { - e[i], e[j] = e[j], e[i] -} - -type endpoints []*endpoint - -func (e endpoints) String() string { - names := make([]string, 0, len(e)) - for _, endpoint := range e { - names = append(names, endpoint.name) - } - return strings.Join(names, ",") -} - -type endpoint struct { - name string - addr net.Addr - - // 0 being the highest priority - priority int -} - -// equal returns true if the name and addr match between two endpoints. -// Priority is ignored because the same endpoint may be added by discovery and -// heartbeating with different priorities. 
-func (e *endpoint) equal(o *endpoint) bool { - return e.name == o.name && e.addr == o.addr -} diff --git a/client/serverlist_test.go b/client/serverlist_test.go deleted file mode 100644 index e23ef4b7fe51..000000000000 --- a/client/serverlist_test.go +++ /dev/null @@ -1,119 +0,0 @@ -package client - -import ( - "log" - "os" - "strings" - "testing" -) - -func TestServerList(t *testing.T) { - t.Parallel() - s := newServerList() - - // New lists should be empty - if e := s.all(); len(e) != 0 { - t.Fatalf("expected empty list to return an empty list, but received: %+q", e) - } - - mklist := func() endpoints { - return endpoints{ - &endpoint{"b", nil, 1}, - &endpoint{"c", nil, 1}, - &endpoint{"g", nil, 2}, - &endpoint{"d", nil, 1}, - &endpoint{"e", nil, 1}, - &endpoint{"f", nil, 1}, - &endpoint{"h", nil, 2}, - &endpoint{"a", nil, 0}, - } - } - s.set(mklist()) - - orig := mklist() - all := s.all() - if len(all) != len(orig) { - t.Fatalf("expected %d endpoints but only have %d", len(orig), len(all)) - } - - // Assert list is properly randomized+sorted - for i, pri := range []int{0, 1, 1, 1, 1, 1, 2, 2} { - if all[i].priority != pri { - t.Errorf("expected endpoint %d (%+q) to be priority %d", i, all[i], pri) - } - } - - // Subsequent sets should reshuffle (try multiple times as they may - // shuffle in the same order) - tries := 0 - max := 3 - for ; tries < max; tries++ { - if s.all().String() == s.all().String() { - // eek, matched; try again in case we just got unlucky - continue - } - break - } - if tries == max { - t.Fatalf("after %d attempts servers were still not random reshuffled", tries) - } - - // Mark an endpoint as failed enough that it should be at the end of the list - sa := &endpoint{"a", nil, 0} - s.failed(sa) - s.failed(sa) - s.failed(sa) - all2 := s.all() - if len(all2) != len(orig) { - t.Fatalf("marking should not have changed list length") - } - if all2[len(all)-1].name != sa.name { - t.Fatalf("failed endpoint should be at end of list: %+q", all2) - } - - // But if the bad endpoint succeeds even once it should be bumped to the top group - s.good(sa) - found := false - for _, e := range s.all() { - if e.name == sa.name { - if e.priority != 0 { - t.Fatalf("server newly marked good should have highest priority") - } - found = true - } - } - if !found { - t.Fatalf("what happened to endpoint A?!") - } -} - -// TestClient_ServerList tests client methods that interact with the internal -// nomad server list. 
-func TestClient_ServerList(t *testing.T) { - t.Parallel() - // manually create a mostly empty client to avoid spinning up a ton of - // goroutines that complicate testing - client := Client{servers: newServerList(), logger: log.New(os.Stderr, "", log.Ltime|log.Lshortfile)} - - if s := client.GetServers(); len(s) != 0 { - t.Fatalf("expected server lit to be empty but found: %+q", s) - } - if err := client.SetServers(nil); err != noServersErr { - t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err) - } - if err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil { - t.Fatalf("expected setting a bad server to return an error") - } - if err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err != nil { - t.Fatalf("expected setting at least one good server to succeed but received: %v", err) - } - s := client.GetServers() - if len(s) != 2 { - t.Fatalf("expected 2 servers but received: %+q", s) - } - for _, host := range s { - if !strings.HasPrefix(host, "127.0.0.1:") { - t.Errorf("expected both servers to be localhost and include port but found: %s", host) - } - } -} diff --git a/client/servers/manager.go b/client/servers/manager.go new file mode 100644 index 000000000000..604b109a2cef --- /dev/null +++ b/client/servers/manager.go @@ -0,0 +1,308 @@ +// Package servers provides an interface for choosing Servers to communicate +// with from a Nomad Client perspective. The package does not provide any API +// guarantees and should be called only by `hashicorp/nomad`. +package servers + +import ( + "log" + "math/rand" + "net" + "strings" + "sync" + "time" + + "github.com/hashicorp/consul/lib" +) + +const ( + // clientRPCMinReuseDuration controls the minimum amount of time RPC + // queries are sent over an established connection to a single server + clientRPCMinReuseDuration = 5 * time.Minute + + // Limit the number of new connections a server receives per second + // for connection rebalancing. This limit caps the load caused by + // continual rebalancing efforts when a cluster is in equilibrium. A + // lower value comes at the cost of increased recovery time after a + // partition. This parameter begins to take effect when there are + // more than ~48K clients querying 5x servers or at lower server + // values when there is a partition. + // + // For example, in a 100K Nomad cluster with 5x servers, it will + // take ~5min for all servers to rebalance their connections. If + // 99,995 agents are in the minority talking to only one server, it + // will take ~26min for all servers to rebalance. A 10K cluster in + // the same scenario will take ~2.6min to rebalance. + newRebalanceConnsPerSecPerServer = 64 +) + +// Pinger is an interface for pinging a server to see if it is healthy. +type Pinger interface { + Ping(addr net.Addr) error +} + +// Server contains the address of a server and metadata that can be used for +// choosing a server to contact. 
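The Server struct and the Manager methods used throughout this diff (New, Start, SetServers, FindServer, NotifyFailedServer, NumServers) are defined in the hunks that follow. A minimal usage sketch of that API, where nullPinger is a hypothetical stand-in for the client's connection pool (the real caller passes the Client itself, which satisfies Pinger via its Ping method):

// Minimal wiring of the servers.Manager added by this package.
package main

import (
	"log"
	"net"
	"os"

	"github.com/hashicorp/nomad/client/servers"
)

// nullPinger pretends every server is healthy; real code uses the RPC
// connection pool to ping candidates.
type nullPinger struct{}

func (nullPinger) Ping(addr net.Addr) error { return nil }

func main() {
	logger := log.New(os.Stderr, "", log.LstdFlags)
	shutdownCh := make(chan struct{})

	m := servers.New(logger, shutdownCh, nullPinger{})
	go m.Start() // shuffles/rebalances periodically until shutdownCh closes

	// Seed the manager with a known server (address resolved elsewhere).
	addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:4647")
	m.SetServers(servers.Servers{{Addr: addr}})

	// Pick a server for an RPC; rotate it to the back of the list on failure.
	if srv := m.FindServer(); srv != nil {
		// ... make the RPC; on error:
		m.NotifyFailedServer(srv)
	}

	logger.Printf("known servers: %d", m.NumServers())
	close(shutdownCh)
}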
+type Server struct { + // Addr is the resolved address of the server + Addr net.Addr + addr string + sync.Mutex + + // DC is the datacenter of the server + DC string +} + +func (s *Server) Copy() *Server { + s.Lock() + defer s.Unlock() + + return &Server{ + Addr: s.Addr, + addr: s.addr, + DC: s.DC, + } +} + +func (s *Server) String() string { + s.Lock() + defer s.Unlock() + + if s.addr == "" { + s.addr = s.Addr.String() + } + + return s.addr +} + +type Servers []*Server + +func (s Servers) String() string { + addrs := make([]string, 0, len(s)) + for _, srv := range s { + addrs = append(addrs, srv.String()) + } + return strings.Join(addrs, ",") +} + +// cycle cycles a list of servers in-place +func (s Servers) cycle() { + numServers := len(s) + if numServers < 2 { + return // No action required + } + + start := s[0] + for i := 1; i < numServers; i++ { + s[i-1] = s[i] + } + s[numServers-1] = start +} + +// removeServerByKey performs an inline removal of the first matching server +func (s Servers) removeServerByKey(targetKey string) { + for i, srv := range s { + if targetKey == srv.String() { + copy(s[i:], s[i+1:]) + s[len(s)-1] = nil + s = s[:len(s)-1] + return + } + } +} + +// shuffle shuffles the server list in place +func (s Servers) shuffle() { + for i := len(s) - 1; i > 0; i-- { + j := rand.Int31n(int32(i + 1)) + s[i], s[j] = s[j], s[i] + } +} + +type Manager struct { + // servers is the list of all known Nomad servers. + servers Servers + + // rebalanceTimer controls the duration of the rebalance interval + rebalanceTimer *time.Timer + + // shutdownCh is a copy of the channel in Nomad.Client + shutdownCh chan struct{} + + logger *log.Logger + + // numNodes is used to estimate the approximate number of nodes in + // a cluster and limit the rate at which it rebalances server + // connections. This should be read and set using atomic. + numNodes int32 + + // connPoolPinger is used to test the health of a server in the connection + // pool. Pinger is an interface that wraps client.ConnPool. + connPoolPinger Pinger + + sync.Mutex +} + +// New is the only way to safely create a new Manager struct. +func New(logger *log.Logger, shutdownCh chan struct{}, connPoolPinger Pinger) (m *Manager) { + return &Manager{ + logger: logger, + connPoolPinger: connPoolPinger, + rebalanceTimer: time.NewTimer(clientRPCMinReuseDuration), + shutdownCh: shutdownCh, + } +} + +// Start is used to start and manage the task of automatically shuffling and +// rebalancing the list of Nomad servers in order to distribute load across +// all known and available Nomad servers. +func (m *Manager) Start() { + for { + select { + case <-m.rebalanceTimer.C: + m.RebalanceServers() + m.refreshServerRebalanceTimer() + + case <-m.shutdownCh: + m.logger.Printf("[DEBUG] manager: shutting down") + return + } + } +} + +func (m *Manager) SetServers(servers Servers) { + m.Lock() + defer m.Unlock() + m.servers = servers +} + +// FindServer returns a server to send an RPC too. If there are no servers, nil +// is returned. +func (m *Manager) FindServer() *Server { + m.Lock() + defer m.Unlock() + + if len(m.servers) == 0 { + m.logger.Printf("[WARN] manager: No servers available") + return nil + } + + // Return whatever is at the front of the list because it is + // assumed to be the oldest in the server list (unless - + // hypothetically - the server list was rotated right after a + // server was added). + return m.servers[0] +} + +// NumNodes returns the number of approximate nodes in the cluster. 
+func (m *Manager) NumNodes() int32 { + m.Lock() + defer m.Unlock() + return m.numNodes +} + +// SetNumNodes stores the number of approximate nodes in the cluster. +func (m *Manager) SetNumNodes(n int32) { + m.Lock() + defer m.Unlock() + m.numNodes = n +} + +// NotifyFailedServer marks the passed in server as "failed" by rotating it +// to the end of the server list. +func (m *Manager) NotifyFailedServer(s *Server) { + m.Lock() + defer m.Unlock() + + // If the server being failed is not the first server on the list, + // this is a noop. If, however, the server is failed and first on + // the list, move the server to the end of the list. + if len(m.servers) > 1 && m.servers[0] == s { + m.servers.cycle() + } +} + +// NumServers returns the total number of known servers whether healthy or not. +func (m *Manager) NumServers() int { + m.Lock() + defer m.Unlock() + return len(m.servers) +} + +// GetServers returns a copy of the current list of servers. +func (m *Manager) GetServers() Servers { + m.Lock() + defer m.Unlock() + + copy := make([]*Server, 0, len(m.servers)) + for _, s := range m.servers { + copy = append(copy, s.Copy()) + } + + return copy +} + +// RebalanceServers shuffles the order in which Servers will be contacted. The +// function will shuffle the set of potential servers to contact and then attempt +// to contact each server. If a server successfully responds it is used, otherwise +// it is rotated such that it will be the last attempted server. +func (m *Manager) RebalanceServers() { + // Shuffle servers so we have a chance of picking a new one. + servers := m.GetServers() + servers.shuffle() + + // Iterate through the shuffled server list to find an assumed + // healthy server. NOTE: Do not iterate on the list directly because + // this loop mutates the server list in-place. + var foundHealthyServer bool + for i := 0; i < len(m.servers); i++ { + // Always test the first server. Failed servers are cycled + // while Serf detects the node has failed. + srv := servers[0] + + err := m.connPoolPinger.Ping(srv.Addr) + if err == nil { + foundHealthyServer = true + break + } + m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, srv, err) + + servers.cycle() + } + + if !foundHealthyServer { + m.logger.Printf("[DEBUG] manager: No healthy servers during rebalance") + return + } + + // Save the servers + m.Lock() + m.servers = servers + m.Unlock() +} + +// refreshServerRebalanceTimer is only called once m.rebalanceTimer expires. +func (m *Manager) refreshServerRebalanceTimer() time.Duration { + m.Lock() + defer m.Unlock() + numServers := len(m.servers) + + // Limit this connection's life based on the size (and health) of the + // cluster. Never rebalance a connection more frequently than + // connReuseLowWatermarkDuration, and make sure we never exceed + // clusterWideRebalanceConnsPerSec operations/s across numLANMembers. + clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer) + + connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, clientRPCMinReuseDuration, int(m.numNodes)) + connRebalanceTimeout += lib.RandomStagger(connRebalanceTimeout) + + m.rebalanceTimer.Reset(connRebalanceTimeout) + return connRebalanceTimeout +} + +// ResetRebalanceTimer resets the rebalance timer. This method exists for +// testing and should not be used directly. 
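refreshServerRebalanceTimer above scales the rebalance interval with lib.RateScaledInterval and lib.RandomStagger from hashicorp/consul/lib. The arithmetic below is a sketch of that scaling under the stated constants (64 new connections per second per server, 5-minute floor), not the library's exact code; for 5 servers and 100,000 nodes it yields roughly 312s, matching the "~5min" figure in the package comment above.

// Sketch of the rebalance-interval math used by refreshServerRebalanceTimer.
package ratesketch

import "time"

const (
	clientRPCMinReuseDuration        = 5 * time.Minute
	newRebalanceConnsPerSecPerServer = 64
)

// rateScaledInterval approximates lib.RateScaledInterval: spread n
// reconnects over rate connections per second, but never drop below min.
func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
	interval := time.Duration(float64(time.Second) * float64(n) / rate)
	if interval < min {
		return min
	}
	return interval
}

// rebalanceTimeout mirrors refreshServerRebalanceTimer without the random
// stagger. numServers=5, numNodes=100000 -> 320 conns/s cluster-wide,
// 100000/320 = 312.5s, i.e. just over the 5-minute floor.
func rebalanceTimeout(numServers, numNodes int) time.Duration {
	clusterWideConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
	return rateScaledInterval(clusterWideConnsPerSec, clientRPCMinReuseDuration, numNodes)
}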
+func (m *Manager) ResetRebalanceTimer() { + m.Lock() + defer m.Unlock() + m.rebalanceTimer.Reset(clientRPCMinReuseDuration) +} diff --git a/client/servers/manager_internal_test.go b/client/servers/manager_internal_test.go new file mode 100644 index 000000000000..e6ad03bb3c6e --- /dev/null +++ b/client/servers/manager_internal_test.go @@ -0,0 +1,158 @@ +package servers + +import ( + "fmt" + "log" + "math/rand" + "net" + "os" + "testing" + "time" +) + +func init() { + // Seed the random number generator + rand.Seed(time.Now().UnixNano()) +} + +type fauxAddr struct { + Addr string +} + +func (fa *fauxAddr) String() string { return fa.Addr } +func (fa *fauxAddr) Network() string { return fa.Addr } + +type fauxConnPool struct { + // failPct between 0.0 and 1.0 == pct of time a Ping should fail + failPct float64 +} + +func (cp *fauxConnPool) Ping(net.Addr) error { + successProb := rand.Float64() + if successProb > cp.failPct { + return nil + } + return fmt.Errorf("bad server") +} + +func testManager(t *testing.T) (m *Manager) { + logger := log.New(os.Stderr, "", 0) + shutdownCh := make(chan struct{}) + m = New(logger, shutdownCh, &fauxConnPool{}) + return m +} + +func testManagerFailProb(failPct float64) (m *Manager) { + logger := log.New(os.Stderr, "", 0) + shutdownCh := make(chan struct{}) + m = New(logger, shutdownCh, &fauxConnPool{failPct: failPct}) + return m +} + +func TestManagerInternal_cycleServer(t *testing.T) { + server0 := &Server{Addr: &fauxAddr{"server1"}} + server1 := &Server{Addr: &fauxAddr{"server2"}} + server2 := &Server{Addr: &fauxAddr{"server3"}} + srvs := Servers([]*Server{server0, server1, server2}) + + srvs.cycle() + if len(srvs) != 3 { + t.Fatalf("server length incorrect: %d/3", len(srvs)) + } + if srvs[0] != server1 && + srvs[1] != server2 && + srvs[2] != server0 { + t.Fatalf("server ordering after one cycle not correct") + } + + srvs.cycle() + if srvs[0] != server2 && + srvs[1] != server0 && + srvs[2] != server1 { + t.Fatalf("server ordering after two cycles not correct") + } + + srvs.cycle() + if srvs[0] != server0 && + srvs[1] != server1 && + srvs[2] != server2 { + t.Fatalf("server ordering after three cycles not correct") + } +} + +func TestManagerInternal_New(t *testing.T) { + m := testManager(t) + if m == nil { + t.Fatalf("Manager nil") + } + + if m.logger == nil { + t.Fatalf("Manager.logger nil") + } + + if m.shutdownCh == nil { + t.Fatalf("Manager.shutdownCh nil") + } +} + +// func (l *serverList) refreshServerRebalanceTimer() { +func TestManagerInternal_refreshServerRebalanceTimer(t *testing.T) { + type clusterSizes struct { + numNodes int32 + numServers int + minRebalance time.Duration + } + clusters := []clusterSizes{ + {1, 0, 5 * time.Minute}, // partitioned cluster + {1, 3, 5 * time.Minute}, + {2, 3, 5 * time.Minute}, + {100, 0, 5 * time.Minute}, // partitioned + {100, 1, 5 * time.Minute}, // partitioned + {100, 3, 5 * time.Minute}, + {1024, 1, 5 * time.Minute}, // partitioned + {1024, 3, 5 * time.Minute}, // partitioned + {1024, 5, 5 * time.Minute}, + {16384, 1, 4 * time.Minute}, // partitioned + {16384, 2, 5 * time.Minute}, // partitioned + {16384, 3, 5 * time.Minute}, // partitioned + {16384, 5, 5 * time.Minute}, + {32768, 0, 5 * time.Minute}, // partitioned + {32768, 1, 8 * time.Minute}, // partitioned + {32768, 2, 3 * time.Minute}, // partitioned + {32768, 3, 5 * time.Minute}, // partitioned + {32768, 5, 3 * time.Minute}, // partitioned + {65535, 7, 5 * time.Minute}, + {65535, 0, 5 * time.Minute}, // partitioned + {65535, 1, 8 * time.Minute}, // 
partitioned + {65535, 2, 3 * time.Minute}, // partitioned + {65535, 3, 5 * time.Minute}, // partitioned + {65535, 5, 3 * time.Minute}, // partitioned + {65535, 7, 5 * time.Minute}, + {1000000, 1, 4 * time.Hour}, // partitioned + {1000000, 2, 2 * time.Hour}, // partitioned + {1000000, 3, 80 * time.Minute}, // partitioned + {1000000, 5, 50 * time.Minute}, // partitioned + {1000000, 11, 20 * time.Minute}, // partitioned + {1000000, 19, 10 * time.Minute}, + } + + logger := log.New(os.Stderr, "", log.LstdFlags) + shutdownCh := make(chan struct{}) + + for _, s := range clusters { + m := New(logger, shutdownCh, &fauxConnPool{}) + m.SetNumNodes(s.numNodes) + servers := make([]*Server, 0, s.numServers) + for i := 0; i < s.numServers; i++ { + nodeName := fmt.Sprintf("s%02d", i) + servers = append(servers, &Server{Addr: &fauxAddr{nodeName}}) + } + m.SetServers(servers) + + d := m.refreshServerRebalanceTimer() + t.Logf("Nodes: %d; Servers: %d; Refresh: %v; Min: %v", s.numNodes, s.numServers, d, s.minRebalance) + if d < s.minRebalance { + t.Errorf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance) + } + } +} diff --git a/client/servers/manager_test.go b/client/servers/manager_test.go new file mode 100644 index 000000000000..deea7f48f009 --- /dev/null +++ b/client/servers/manager_test.go @@ -0,0 +1,238 @@ +package servers_test + +import ( + "fmt" + "log" + "math/rand" + "net" + "os" + "strings" + "testing" + + "github.com/hashicorp/nomad/client/servers" +) + +type fauxAddr struct { + Addr string +} + +func (fa *fauxAddr) String() string { return fa.Addr } +func (fa *fauxAddr) Network() string { return fa.Addr } + +type fauxConnPool struct { + // failPct between 0.0 and 1.0 == pct of time a Ping should fail + failPct float64 +} + +func (cp *fauxConnPool) Ping(net.Addr) error { + successProb := rand.Float64() + if successProb > cp.failPct { + return nil + } + return fmt.Errorf("bad server") +} + +func testManager() (m *servers.Manager) { + logger := log.New(os.Stderr, "", log.LstdFlags) + shutdownCh := make(chan struct{}) + m = servers.New(logger, shutdownCh, &fauxConnPool{}) + return m +} + +func testManagerFailProb(failPct float64) (m *servers.Manager) { + logger := log.New(os.Stderr, "", log.LstdFlags) + shutdownCh := make(chan struct{}) + m = servers.New(logger, shutdownCh, &fauxConnPool{failPct: failPct}) + return m +} + +func TestServers_SetServers(t *testing.T) { + m := testManager() + var num int + num = m.NumServers() + if num != 0 { + t.Fatalf("Expected zero servers to start") + } + + s1 := &servers.Server{Addr: &fauxAddr{"server1"}} + s2 := &servers.Server{Addr: &fauxAddr{"server2"}} + m.SetServers([]*servers.Server{s1, s2}) + num = m.NumServers() + if num != 2 { + t.Fatalf("Expected two servers") + } + + all := m.GetServers() + if l := len(all); l != 2 { + t.Fatalf("expected 2 servers got %d", l) + } + + if all[0] == s1 || all[0] == s2 { + t.Fatalf("expected a copy, got actual server") + } +} + +func TestServers_FindServer(t *testing.T) { + m := testManager() + + if m.FindServer() != nil { + t.Fatalf("Expected nil return") + } + + var srvs []*servers.Server + srvs = append(srvs, &servers.Server{Addr: &fauxAddr{"s1"}}) + m.SetServers(srvs) + if m.NumServers() != 1 { + t.Fatalf("Expected one server") + } + + s1 := m.FindServer() + if s1 == nil { + t.Fatalf("Expected non-nil server") + } + if s1.String() != "s1" { + t.Fatalf("Expected s1 server") + } + + s1 = m.FindServer() + if s1 == nil || s1.String() != "s1" { + t.Fatalf("Expected 
s1 server (still)") + } + + srvs = append(srvs, &servers.Server{Addr: &fauxAddr{"s2"}}) + m.SetServers(srvs) + if m.NumServers() != 2 { + t.Fatalf("Expected two servers") + } + s1 = m.FindServer() + if s1 == nil || s1.String() != "s1" { + t.Fatalf("Expected s1 server (still)") + } + + m.NotifyFailedServer(s1) + s2 := m.FindServer() + if s2 == nil || s2.String() != "s2" { + t.Fatalf("Expected s2 server") + } + + m.NotifyFailedServer(s2) + s1 = m.FindServer() + if s1 == nil || s1.String() != "s1" { + t.Fatalf("Expected s1 server") + } +} + +func TestServers_New(t *testing.T) { + logger := log.New(os.Stderr, "", log.LstdFlags) + shutdownCh := make(chan struct{}) + m := servers.New(logger, shutdownCh, &fauxConnPool{}) + if m == nil { + t.Fatalf("Manager nil") + } +} + +func TestServers_NotifyFailedServer(t *testing.T) { + m := testManager() + + if m.NumServers() != 0 { + t.Fatalf("Expected zero servers to start") + } + + s1 := &servers.Server{Addr: &fauxAddr{"s1"}} + s2 := &servers.Server{Addr: &fauxAddr{"s2"}} + + // Try notifying for a server that is not managed by Manager + m.NotifyFailedServer(s1) + if m.NumServers() != 0 { + t.Fatalf("Expected zero servers to start") + } + m.SetServers([]*servers.Server{s1}) + + // Test again w/ a server not in the list + m.NotifyFailedServer(s2) + if m.NumServers() != 1 { + t.Fatalf("Expected one server") + } + + m.SetServers([]*servers.Server{s1, s2}) + if m.NumServers() != 2 { + t.Fatalf("Expected two servers") + } + + s1 = m.FindServer() + if s1 == nil || s1.String() != "s1" { + t.Fatalf("Expected s1 server") + } + + m.NotifyFailedServer(s2) + s1 = m.FindServer() + if s1 == nil || s1.String() != "s1" { + t.Fatalf("Expected s1 server (still)") + } + + m.NotifyFailedServer(s1) + s2 = m.FindServer() + if s2 == nil || s2.String() != "s2" { + t.Fatalf("Expected s2 server") + } + + m.NotifyFailedServer(s2) + s1 = m.FindServer() + if s1 == nil || s1.String() != "s1" { + t.Fatalf("Expected s1 server") + } +} + +func TestServers_NumServers(t *testing.T) { + m := testManager() + var num int + num = m.NumServers() + if num != 0 { + t.Fatalf("Expected zero servers to start") + } + + s := &servers.Server{Addr: &fauxAddr{"server1"}} + m.SetServers([]*servers.Server{s}) + num = m.NumServers() + if num != 1 { + t.Fatalf("Expected one server after SetServers") + } +} + +func TestServers_RebalanceServers(t *testing.T) { + const failPct = 0.5 + m := testManagerFailProb(failPct) + const maxServers = 100 + const numShuffleTests = 100 + const uniquePassRate = 0.5 + + // Make a huge list of nodes. + var srvs []*servers.Server + for i := 0; i < maxServers; i++ { + nodeName := fmt.Sprintf("s%02d", i) + srvs = append(srvs, &servers.Server{Addr: &fauxAddr{nodeName}}) + } + m.SetServers(srvs) + + // Keep track of how many unique shuffles we get. + uniques := make(map[string]struct{}, maxServers) + for i := 0; i < numShuffleTests; i++ { + m.RebalanceServers() + + var names []string + for j := 0; j < maxServers; j++ { + server := m.FindServer() + m.NotifyFailedServer(server) + names = append(names, server.String()) + } + key := strings.Join(names, "|") + uniques[key] = struct{}{} + } + + // We have to allow for the fact that there won't always be a unique + // shuffle each pass, so we just look for smell here without the test + // being flaky. 
+ if len(uniques) < int(maxServers*uniquePassRate) { + t.Fatalf("unique shuffle ratio too low: %d/%d", len(uniques), maxServers) + } +} diff --git a/client/stats/host.go b/client/stats/host.go index 23826b10a3f1..1da2b4641801 100644 --- a/client/stats/host.go +++ b/client/stats/host.go @@ -1,6 +1,7 @@ package stats import ( + "fmt" "log" "math" "runtime" @@ -133,7 +134,7 @@ func (h *HostStatsCollector) collectLocked() error { // Getting the disk stats for the allocation directory usage, err := disk.Usage(h.allocDir) if err != nil { - return err + return fmt.Errorf("failed to find disk usage of alloc_dir %q: %v", h.allocDir, err) } hs.AllocDirStats = h.toDiskStats(usage, nil) diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 46748bb93aaf..211cd74cf6f0 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -4,14 +4,20 @@ import ( "bytes" "encoding/json" "fmt" + "net" "net/http" "net/http/httptest" + "net/url" "testing" + "time" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/helper/pool" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/stretchr/testify/assert" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" ) func TestHTTP_AgentSelf(t *testing.T) { @@ -44,7 +50,7 @@ func TestHTTP_AgentSelf(t *testing.T) { t.Fatalf("bad: %#v", self) } - // Assign a Vault token and assert it is redacted. + // Assign a Vault token and require it is redacted. s.Config.Vault.Token = "badc0deb-adc0-deba-dc0d-ebadc0debadc" respW = httptest.NewRecorder() obj, err = s.Server.AgentSelfRequest(respW, req) @@ -60,21 +66,21 @@ func TestHTTP_AgentSelf(t *testing.T) { func TestHTTP_AgentSelf_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) httpACLTest(t, nil, func(s *TestAgent) { state := s.Agent.server.State() // Make the HTTP request req, err := http.NewRequest("GET", "/v1/agent/self", nil) - assert.Nil(err) + require.Nil(err) // Try request without a token and expect failure { respW := httptest.NewRecorder() _, err := s.Server.AgentSelfRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with an invalid token and expect failure @@ -83,8 +89,8 @@ func TestHTTP_AgentSelf_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyWrite)) setToken(req, token) _, err := s.Server.AgentSelfRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with a valid token @@ -93,11 +99,11 @@ func TestHTTP_AgentSelf_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.AgentPolicy(acl.PolicyWrite)) setToken(req, token) obj, err := s.Server.AgentSelfRequest(respW, req) - assert.Nil(err) + require.Nil(err) self := obj.(agentSelf) - assert.NotNil(self.Config) - assert.NotNil(self.Stats) + require.NotNil(self.Config) + require.NotNil(self.Stats) } // Try request with a root token @@ -105,18 +111,17 @@ func TestHTTP_AgentSelf_ACL(t *testing.T) { respW := httptest.NewRecorder() setToken(req, s.RootToken) obj, err := s.Server.AgentSelfRequest(respW, req) - assert.Nil(err) + 
require.Nil(err) self := obj.(agentSelf) - assert.NotNil(self.Config) - assert.NotNil(self.Stats) + require.NotNil(self.Config) + require.NotNil(self.Stats) } }) } func TestHTTP_AgentJoin(t *testing.T) { - // TODO(alexdadgar) - // t.Parallel() + t.Parallel() httpTest(t, nil, func(s *TestAgent) { // Determine the join address member := s.Agent.Server().LocalMember() @@ -173,21 +178,21 @@ func TestHTTP_AgentMembers(t *testing.T) { func TestHTTP_AgentMembers_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) httpACLTest(t, nil, func(s *TestAgent) { state := s.Agent.server.State() // Make the HTTP request req, err := http.NewRequest("GET", "/v1/agent/members", nil) - assert.Nil(err) + require.Nil(err) // Try request without a token and expect failure { respW := httptest.NewRecorder() _, err := s.Server.AgentMembersRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with an invalid token and expect failure @@ -196,8 +201,8 @@ func TestHTTP_AgentMembers_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.AgentPolicy(acl.PolicyWrite)) setToken(req, token) _, err := s.Server.AgentMembersRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with a valid token @@ -206,10 +211,10 @@ func TestHTTP_AgentMembers_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.NodePolicy(acl.PolicyRead)) setToken(req, token) obj, err := s.Server.AgentMembersRequest(respW, req) - assert.Nil(err) + require.Nil(err) members := obj.(structs.ServerMembersResponse) - assert.Len(members.Members, 1) + require.Len(members.Members, 1) } // Try request with a root token @@ -217,10 +222,10 @@ func TestHTTP_AgentMembers_ACL(t *testing.T) { respW := httptest.NewRecorder() setToken(req, s.RootToken) obj, err := s.Server.AgentMembersRequest(respW, req) - assert.Nil(err) + require.Nil(err) members := obj.(structs.ServerMembersResponse) - assert.Len(members.Members, 1) + require.Len(members.Members, 1) } }) } @@ -245,21 +250,21 @@ func TestHTTP_AgentForceLeave(t *testing.T) { func TestHTTP_AgentForceLeave_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) httpACLTest(t, nil, func(s *TestAgent) { state := s.Agent.server.State() // Make the HTTP request req, err := http.NewRequest("PUT", "/v1/agent/force-leave?node=foo", nil) - assert.Nil(err) + require.Nil(err) // Try request without a token and expect failure { respW := httptest.NewRecorder() _, err := s.Server.AgentForceLeaveRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with an invalid token and expect failure @@ -268,8 +273,8 @@ func TestHTTP_AgentForceLeave_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyRead)) setToken(req, token) _, err := s.Server.AgentForceLeaveRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with a valid token @@ -278,8 +283,8 @@ 
func TestHTTP_AgentForceLeave_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.AgentPolicy(acl.PolicyWrite)) setToken(req, token) _, err := s.Server.AgentForceLeaveRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) } // Try request with a root token @@ -287,71 +292,113 @@ func TestHTTP_AgentForceLeave_ACL(t *testing.T) { respW := httptest.NewRecorder() setToken(req, s.RootToken) _, err := s.Server.AgentForceLeaveRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) } }) } func TestHTTP_AgentSetServers(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) httpTest(t, nil, func(s *TestAgent) { + addr := s.Config.AdvertiseAddrs.RPC + testutil.WaitForResult(func() (bool, error) { + conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond) + if err != nil { + return false, err + } + defer conn.Close() + + // Write the Consul RPC byte to set the mode + if _, err := conn.Write([]byte{byte(pool.RpcNomad)}); err != nil { + return false, err + } + + codec := pool.NewClientCodec(conn) + args := &structs.GenericRequest{} + var leader string + err = msgpackrpc.CallWithCodec(codec, "Status.Leader", args, &leader) + return leader != "", err + }, func(err error) { + t.Fatalf("failed to find leader: %v", err) + }) + // Create the request req, err := http.NewRequest("PUT", "/v1/agent/servers", nil) - assert.Nil(err) + require.Nil(err) // Send the request respW := httptest.NewRecorder() _, err = s.Server.AgentServersRequest(respW, req) - assert.NotNil(err) - assert.Contains(err.Error(), "missing server address") + require.NotNil(err) + require.Contains(err.Error(), "missing server address") // Create a valid request req, err = http.NewRequest("PUT", "/v1/agent/servers?address=127.0.0.1%3A4647&address=127.0.0.2%3A4647&address=127.0.0.3%3A4647", nil) - assert.Nil(err) + require.Nil(err) - // Send the request + // Send the request which should fail respW = httptest.NewRecorder() _, err = s.Server.AgentServersRequest(respW, req) - assert.Nil(err) + require.NotNil(err) // Retrieve the servers again req, err = http.NewRequest("GET", "/v1/agent/servers", nil) - assert.Nil(err) + require.Nil(err) respW = httptest.NewRecorder() // Make the request and check the result expected := []string{ - "127.0.0.1:4647", - "127.0.0.2:4647", - "127.0.0.3:4647", + s.GetConfig().AdvertiseAddrs.RPC, } out, err := s.Server.AgentServersRequest(respW, req) - assert.Nil(err) + require.Nil(err) servers := out.([]string) - assert.Len(servers, len(expected)) - assert.Equal(expected, servers) + require.Len(servers, len(expected)) + require.Equal(expected, servers) }) } func TestHTTP_AgentSetServers_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) httpACLTest(t, nil, func(s *TestAgent) { state := s.Agent.server.State() + addr := s.Config.AdvertiseAddrs.RPC + testutil.WaitForResult(func() (bool, error) { + conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond) + if err != nil { + return false, err + } + defer conn.Close() + + // Write the Consul RPC byte to set the mode + if _, err := conn.Write([]byte{byte(pool.RpcNomad)}); err != nil { + return false, err + } + + codec := pool.NewClientCodec(conn) + args := &structs.GenericRequest{} + var leader string + err = msgpackrpc.CallWithCodec(codec, "Status.Leader", args, &leader) + return leader != "", err 
+ }, func(err error) { + t.Fatalf("failed to find leader: %v", err) + }) // Make the HTTP request - req, err := http.NewRequest("PUT", "/v1/agent/servers?address=127.0.0.1%3A4647&address=127.0.0.2%3A4647&address=127.0.0.3%3A4647", nil) - assert.Nil(err) + path := fmt.Sprintf("/v1/agent/servers?address=%s", url.QueryEscape(s.GetConfig().AdvertiseAddrs.RPC)) + req, err := http.NewRequest("PUT", path, nil) + require.Nil(err) // Try request without a token and expect failure { respW := httptest.NewRecorder() _, err := s.Server.AgentServersRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with an invalid token and expect failure @@ -360,8 +407,8 @@ func TestHTTP_AgentSetServers_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyRead)) setToken(req, token) _, err := s.Server.AgentServersRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with a valid token @@ -370,8 +417,8 @@ func TestHTTP_AgentSetServers_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.AgentPolicy(acl.PolicyWrite)) setToken(req, token) _, err := s.Server.AgentServersRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) } // Try request with a root token @@ -379,47 +426,33 @@ func TestHTTP_AgentSetServers_ACL(t *testing.T) { respW := httptest.NewRecorder() setToken(req, s.RootToken) _, err := s.Server.AgentServersRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) } }) } func TestHTTP_AgentListServers_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) httpACLTest(t, nil, func(s *TestAgent) { state := s.Agent.server.State() - // Set some servers - { - req, err := http.NewRequest("PUT", "/v1/agent/servers?address=127.0.0.1%3A4647&address=127.0.0.2%3A4647&address=127.0.0.3%3A4647", nil) - assert.Nil(err) - - respW := httptest.NewRecorder() - setToken(req, s.RootToken) - _, err = s.Server.AgentServersRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) - } - // Create list request req, err := http.NewRequest("GET", "/v1/agent/servers", nil) - assert.Nil(err) + require.Nil(err) expected := []string{ - "127.0.0.1:4647", - "127.0.0.2:4647", - "127.0.0.3:4647", + s.GetConfig().AdvertiseAddrs.RPC, } // Try request without a token and expect failure { respW := httptest.NewRecorder() _, err := s.Server.AgentServersRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with an invalid token and expect failure @@ -428,20 +461,27 @@ func TestHTTP_AgentListServers_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.NodePolicy(acl.PolicyRead)) setToken(req, token) _, err := s.Server.AgentServersRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } + // Wait for client to have 
a server + testutil.WaitForResult(func() (bool, error) { + return len(s.client.GetServers()) != 0, fmt.Errorf("no servers") + }, func(err error) { + t.Fatal(err) + }) + // Try request with a valid token { respW := httptest.NewRecorder() token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.AgentPolicy(acl.PolicyRead)) setToken(req, token) out, err := s.Server.AgentServersRequest(respW, req) - assert.Nil(err) + require.Nil(err) servers := out.([]string) - assert.Len(servers, len(expected)) - assert.Equal(expected, servers) + require.Len(servers, len(expected)) + require.Equal(expected, servers) } // Try request with a root token @@ -449,10 +489,10 @@ func TestHTTP_AgentListServers_ACL(t *testing.T) { respW := httptest.NewRecorder() setToken(req, s.RootToken) out, err := s.Server.AgentServersRequest(respW, req) - assert.Nil(err) + require.Nil(err) servers := out.([]string) - assert.Len(servers, len(expected)) - assert.Equal(expected, servers) + require.Len(servers, len(expected)) + require.Equal(expected, servers) } }) } @@ -472,19 +512,15 @@ func TestHTTP_AgentListKeys(t *testing.T) { respW := httptest.NewRecorder() out, err := s.Server.KeyringOperationRequest(respW, req) - if err != nil { - t.Fatalf("err: %s", err) - } + require.Nil(t, err) kresp := out.(structs.KeyringResponse) - if len(kresp.Keys) != 1 { - t.Fatalf("bad: %v", kresp) - } + require.Len(t, kresp.Keys, 1) }) } func TestHTTP_AgentListKeys_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) key1 := "HS5lJ+XuTlYKWaeGYyG+/A==" @@ -497,14 +533,14 @@ func TestHTTP_AgentListKeys_ACL(t *testing.T) { // Make the HTTP request req, err := http.NewRequest("GET", "/v1/agent/keyring/list", nil) - assert.Nil(err) + require.Nil(err) // Try request without a token and expect failure { respW := httptest.NewRecorder() _, err := s.Server.KeyringOperationRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with an invalid token and expect failure @@ -513,8 +549,8 @@ func TestHTTP_AgentListKeys_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1005, "invalid", mock.AgentPolicy(acl.PolicyRead)) setToken(req, token) _, err := s.Server.KeyringOperationRequest(respW, req) - assert.NotNil(err) - assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + require.NotNil(err) + require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) } // Try request with a valid token @@ -523,10 +559,10 @@ func TestHTTP_AgentListKeys_ACL(t *testing.T) { token := mock.CreatePolicyAndToken(t, state, 1007, "valid", mock.AgentPolicy(acl.PolicyWrite)) setToken(req, token) out, err := s.Server.KeyringOperationRequest(respW, req) - assert.Nil(err) + require.Nil(err) kresp := out.(structs.KeyringResponse) - assert.Len(kresp.Keys, 1) - assert.Contains(kresp.Keys, key1) + require.Len(kresp.Keys, 1) + require.Contains(kresp.Keys, key1) } // Try request with a root token @@ -534,17 +570,16 @@ func TestHTTP_AgentListKeys_ACL(t *testing.T) { respW := httptest.NewRecorder() setToken(req, s.RootToken) out, err := s.Server.KeyringOperationRequest(respW, req) - assert.Nil(err) + require.Nil(err) kresp := out.(structs.KeyringResponse) - assert.Len(kresp.Keys, 1) - assert.Contains(kresp.Keys, key1) + require.Len(kresp.Keys, 1) + require.Contains(kresp.Keys, key1) } }) } func TestHTTP_AgentInstallKey(t *testing.T) { - // TODO(alexdadgar) - // t.Parallel() + 
t.Parallel() key1 := "HS5lJ+XuTlYKWaeGYyG+/A==" key2 := "wH1Bn9hlJ0emgWB1JttVRA==" @@ -584,8 +619,7 @@ func TestHTTP_AgentInstallKey(t *testing.T) { } func TestHTTP_AgentRemoveKey(t *testing.T) { - // TODO(alexdadgar) - // t.Parallel() + t.Parallel() key1 := "HS5lJ+XuTlYKWaeGYyG+/A==" key2 := "wH1Bn9hlJ0emgWB1JttVRA==" @@ -635,87 +669,87 @@ func TestHTTP_AgentRemoveKey(t *testing.T) { func TestHTTP_AgentHealth_Ok(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) // Enable ACLs to ensure they're not enforced httpACLTest(t, nil, func(s *TestAgent) { // No ?type= { req, err := http.NewRequest("GET", "/v1/agent/health", nil) - assert.Nil(err) + require.Nil(err) respW := httptest.NewRecorder() healthI, err := s.Server.HealthRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) - assert.NotNil(healthI) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) + require.NotNil(healthI) health := healthI.(*healthResponse) - assert.NotNil(health.Client) - assert.True(health.Client.Ok) - assert.Equal("ok", health.Client.Message) - assert.NotNil(health.Server) - assert.True(health.Server.Ok) - assert.Equal("ok", health.Server.Message) + require.NotNil(health.Client) + require.True(health.Client.Ok) + require.Equal("ok", health.Client.Message) + require.NotNil(health.Server) + require.True(health.Server.Ok) + require.Equal("ok", health.Server.Message) } // type=client { req, err := http.NewRequest("GET", "/v1/agent/health?type=client", nil) - assert.Nil(err) + require.Nil(err) respW := httptest.NewRecorder() healthI, err := s.Server.HealthRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) - assert.NotNil(healthI) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) + require.NotNil(healthI) health := healthI.(*healthResponse) - assert.NotNil(health.Client) - assert.True(health.Client.Ok) - assert.Equal("ok", health.Client.Message) - assert.Nil(health.Server) + require.NotNil(health.Client) + require.True(health.Client.Ok) + require.Equal("ok", health.Client.Message) + require.Nil(health.Server) } // type=server { req, err := http.NewRequest("GET", "/v1/agent/health?type=server", nil) - assert.Nil(err) + require.Nil(err) respW := httptest.NewRecorder() healthI, err := s.Server.HealthRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) - assert.NotNil(healthI) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) + require.NotNil(healthI) health := healthI.(*healthResponse) - assert.NotNil(health.Server) - assert.True(health.Server.Ok) - assert.Equal("ok", health.Server.Message) - assert.Nil(health.Client) + require.NotNil(health.Server) + require.True(health.Server.Ok) + require.Equal("ok", health.Server.Message) + require.Nil(health.Client) } // type=client&type=server { req, err := http.NewRequest("GET", "/v1/agent/health?type=client&type=server", nil) - assert.Nil(err) + require.Nil(err) respW := httptest.NewRecorder() healthI, err := s.Server.HealthRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) - assert.NotNil(healthI) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) + require.NotNil(healthI) health := healthI.(*healthResponse) - assert.NotNil(health.Client) - assert.True(health.Client.Ok) - assert.Equal("ok", health.Client.Message) - assert.NotNil(health.Server) - assert.True(health.Server.Ok) - assert.Equal("ok", health.Server.Message) + require.NotNil(health.Client) + require.True(health.Client.Ok) + 
require.Equal("ok", health.Client.Message) + require.NotNil(health.Server) + require.True(health.Server.Ok) + require.Equal("ok", health.Server.Message) } }) } func TestHTTP_AgentHealth_BadServer(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) // Enable ACLs to ensure they're not enforced httpACLTest(t, nil, func(s *TestAgent) { @@ -726,39 +760,39 @@ func TestHTTP_AgentHealth_BadServer(t *testing.T) { // No ?type= means server is just skipped { req, err := http.NewRequest("GET", "/v1/agent/health", nil) - assert.Nil(err) + require.Nil(err) respW := httptest.NewRecorder() healthI, err := s.Server.HealthRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) - assert.NotNil(healthI) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) + require.NotNil(healthI) health := healthI.(*healthResponse) - assert.NotNil(health.Client) - assert.True(health.Client.Ok) - assert.Equal("ok", health.Client.Message) - assert.Nil(health.Server) + require.NotNil(health.Client) + require.True(health.Client.Ok) + require.Equal("ok", health.Client.Message) + require.Nil(health.Server) } // type=server means server is considered unhealthy { req, err := http.NewRequest("GET", "/v1/agent/health?type=server", nil) - assert.Nil(err) + require.Nil(err) respW := httptest.NewRecorder() _, err = s.Server.HealthRequest(respW, req) - assert.NotNil(err) + require.NotNil(err) httpErr, ok := err.(HTTPCodedError) - assert.True(ok) - assert.Equal(500, httpErr.Code()) - assert.Equal(`{"server":{"ok":false,"message":"server not enabled"}}`, err.Error()) + require.True(ok) + require.Equal(500, httpErr.Code()) + require.Equal(`{"server":{"ok":false,"message":"server not enabled"}}`, err.Error()) } }) } func TestHTTP_AgentHealth_BadClient(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) // Enable ACLs to ensure they're not enforced httpACLTest(t, nil, func(s *TestAgent) { @@ -769,32 +803,32 @@ func TestHTTP_AgentHealth_BadClient(t *testing.T) { // No ?type= means client is just skipped { req, err := http.NewRequest("GET", "/v1/agent/health", nil) - assert.Nil(err) + require.Nil(err) respW := httptest.NewRecorder() healthI, err := s.Server.HealthRequest(respW, req) - assert.Nil(err) - assert.Equal(http.StatusOK, respW.Code) - assert.NotNil(healthI) + require.Nil(err) + require.Equal(http.StatusOK, respW.Code) + require.NotNil(healthI) health := healthI.(*healthResponse) - assert.NotNil(health.Server) - assert.True(health.Server.Ok) - assert.Equal("ok", health.Server.Message) - assert.Nil(health.Client) + require.NotNil(health.Server) + require.True(health.Server.Ok) + require.Equal("ok", health.Server.Message) + require.Nil(health.Client) } // type=client means client is considered unhealthy { req, err := http.NewRequest("GET", "/v1/agent/health?type=client", nil) - assert.Nil(err) + require.Nil(err) respW := httptest.NewRecorder() _, err = s.Server.HealthRequest(respW, req) - assert.NotNil(err) + require.NotNil(err) httpErr, ok := err.(HTTPCodedError) - assert.True(ok) - assert.Equal(500, httpErr.Code()) - assert.Equal(`{"client":{"ok":false,"message":"client not enabled"}}`, err.Error()) + require.True(ok) + require.Equal(500, httpErr.Code()) + require.Equal(`{"client":{"ok":false,"message":"client not enabled"}}`, err.Error()) } }) } diff --git a/command/agent/testagent.go b/command/agent/testagent.go index 2b8fe6c5afc1..4a260f92f96b 100644 --- a/command/agent/testagent.go +++ b/command/agent/testagent.go @@ -19,6 +19,7 @@ import 
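// The BadServer/BadClient cases above assert that the handler's error also
// satisfies HTTPCodedError, i.e. an error that carries the HTTP status to
// return. This is a minimal sketch of that pattern under assumed names
// (codedError, newCodedError); it is not Nomad's actual implementation.
package main

import "fmt"

// httpCodedError pairs an error message with the HTTP status code to send.
type httpCodedError interface {
	error
	Code() int
}

type codedError struct {
	code int
	msg  string
}

func (e *codedError) Error() string { return e.msg }
func (e *codedError) Code() int     { return e.code }

func newCodedError(code int, msg string) error {
	return &codedError{code: code, msg: msg}
}

func main() {
	// The message doubles as the response body, which is why the tests can
	// compare err.Error() against a JSON payload.
	var err error = newCodedError(500, `{"server":{"ok":false,"message":"server not enabled"}}`)
	if coded, ok := err.(httpCodedError); ok {
		fmt.Println(coded.Code(), coded.Error())
	}
}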
( "github.com/hashicorp/consul/lib/freeport" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -115,7 +116,10 @@ func (a *TestAgent) Start() *TestAgent { a.Config.NomadConfig.DataDir = d } - for i := 10; i >= 0; i-- { + i := 10 + +RETRY: + for ; i >= 0; i-- { a.pickRandomPorts(a.Config) if a.Config.NodeName == "" { a.Config.NodeName = fmt.Sprintf("Node %d", a.Config.Ports.RPC) @@ -137,14 +141,14 @@ func (a *TestAgent) Start() *TestAgent { a.Agent = agent break } else if i == 0 { - fmt.Println(a.Name, "Error starting agent:", err) + a.T.Logf("%s: Error starting agent: %v", a.Name, err) runtime.Goexit() } else { if agent != nil { agent.Shutdown() } wait := time.Duration(rand.Int31n(2000)) * time.Millisecond - fmt.Println(a.Name, "retrying in", wait) + a.T.Logf("%s: retrying in %v", a.Name, wait) time.Sleep(wait) } @@ -153,12 +157,13 @@ func (a *TestAgent) Start() *TestAgent { // the data dir, such as in the Raft configuration. if a.DataDir != "" { if err := os.RemoveAll(a.DataDir); err != nil { - fmt.Println(a.Name, "Error resetting data dir:", err) + a.T.Logf("%s: Error resetting data dir: %v", a.Name, err) runtime.Goexit() } } } + failed := false if a.Config.NomadConfig.Bootstrap && a.Config.Server.Enabled { testutil.WaitForResult(func() (bool, error) { args := &structs.GenericRequest{} @@ -166,7 +171,8 @@ func (a *TestAgent) Start() *TestAgent { err := a.RPC("Status.Leader", args, &leader) return leader != "", err }, func(err error) { - a.T.Fatalf("failed to find leader: %v", err) + a.T.Logf("failed to find leader: %v", err) + failed = true }) } else { testutil.WaitForResult(func() (bool, error) { @@ -175,9 +181,14 @@ func (a *TestAgent) Start() *TestAgent { _, err := a.Server.AgentSelfRequest(resp, req) return err == nil && resp.Code == 200, err }, func(err error) { - a.T.Fatalf("failed OK response: %v", err) + a.T.Logf("failed OK response: %v", err) + failed = true }) } + if failed { + a.Agent.Shutdown() + goto RETRY + } // Check if ACLs enabled. Use special value of PolicyTTL 0s // to do a bypass of this step. This is so we can test bootstrap @@ -194,7 +205,7 @@ func (a *TestAgent) Start() *TestAgent { func (a *TestAgent) start() (*Agent, error) { if a.LogOutput == nil { - a.LogOutput = os.Stderr + a.LogOutput = testlog.NewWriter(a.T) } inm := metrics.NewInmemSink(10*time.Second, time.Minute) @@ -264,6 +275,15 @@ func (a *TestAgent) pickRandomPorts(c *Config) { c.Ports.RPC = ports[1] c.Ports.Serf = ports[2] + // Clear out the advertise addresses such that through retries we + // re-normalize the addresses correctly instead of using the values from the + // last port selection that had a port conflict. 
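// TestAgent.start now routes agent logs through testlog.NewWriter(a.T)
// instead of os.Stderr, so output is attributed to the test that produced it.
// A rough, assumed sketch of such an adapter (an io.Writer forwarding to
// t.Logf) follows as a _test.go-style file; it is illustrative, not
// necessarily the helper/testlog implementation.
package testlogging

import (
	"io"
	"log"
	"strings"
	"testing"
)

// testWriter forwards writes to the test's log, so the output is shown only
// when the test fails or -v is passed.
type testWriter struct {
	t testing.TB
}

func newTestWriter(t testing.TB) io.Writer {
	return &testWriter{t: t}
}

func (w *testWriter) Write(p []byte) (int, error) {
	w.t.Logf("%s", strings.TrimRight(string(p), "\n"))
	return len(p), nil
}

// Example usage inside a test body:
func TestAgentLogging(t *testing.T) {
	logger := log.New(newTestWriter(t), "", log.LstdFlags)
	logger.Printf("agent started") // appears as t.Logf output for this test
}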
+ if c.AdvertiseAddrs != nil { + c.AdvertiseAddrs.HTTP = "" + c.AdvertiseAddrs.RPC = "" + c.AdvertiseAddrs.Serf = "" + } + if err := c.normalizeAddrs(); err != nil { a.T.Fatalf("error normalizing config: %v", err) } diff --git a/command/client_config_test.go b/command/client_config_test.go index e00bb6e30517..cb9275ca0fbb 100644 --- a/command/client_config_test.go +++ b/command/client_config_test.go @@ -33,23 +33,16 @@ func TestClientConfigCommand_UpdateServers(t *testing.T) { } ui.ErrorWriter.Reset() - // Set the servers list + // Set the servers list with bad addresses code = cmd.Run([]string{"-address=" + url, "-update-servers", "127.0.0.42", "198.18.5.5"}) - if code != 0 { - t.Fatalf("expected exit 0, got: %d", code) + if code != 1 { + t.Fatalf("expected exit 1, got: %d", code) } - // Query the servers list - code = cmd.Run([]string{"-address=" + url, "-servers"}) + // Set the servers list with good addresses + code = cmd.Run([]string{"-address=" + url, "-update-servers", srv.Config.AdvertiseAddrs.RPC}) if code != 0 { - t.Fatalf("expect exit 0, got: %d", code) - } - out := ui.OutputWriter.String() - if !strings.Contains(out, "127.0.0.42") { - t.Fatalf("missing 127.0.0.42") - } - if !strings.Contains(out, "198.18.5.5") { - t.Fatalf("missing 198.18.5.5") + t.Fatalf("expected exit 0, got: %d", code) } } diff --git a/nomad/rpc.go b/nomad/rpc.go index f86fadc64c76..53fa1dd1c4ed 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -31,12 +31,6 @@ const ( // if no time is specified. Previously we would wait the maxQueryTime. defaultQueryTime = 300 * time.Second - // jitterFraction is a the limit to the amount of jitter we apply - // to a user specified MaxQueryTime. We divide the specified time by - // the fraction. So 16 == 6.25% limit of jitter. This jitter is also - // applied to RPCHoldTimeout. - jitterFraction = 16 - // Warn if the Raft command is larger than this. // If it's over 1MB something is probably being abusive. raftWarnSize = 1024 * 1024 @@ -262,7 +256,7 @@ CHECK_LEADER: firstCheck = time.Now() } if time.Now().Sub(firstCheck) < s.config.RPCHoldTimeout { - jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction) + jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction) select { case <-time.After(jitter): goto CHECK_LEADER @@ -415,7 +409,7 @@ func (s *Server) blockingRPC(opts *blockingOptions) error { } // Apply a small amount of jitter to the request - opts.queryOpts.MaxQueryTime += lib.RandomStagger(opts.queryOpts.MaxQueryTime / jitterFraction) + opts.queryOpts.MaxQueryTime += lib.RandomStagger(opts.queryOpts.MaxQueryTime / structs.JitterFraction) // Setup a query timeout ctx, cancel = context.WithTimeout(context.Background(), opts.queryOpts.MaxQueryTime) diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go new file mode 100644 index 000000000000..1618218573bb --- /dev/null +++ b/nomad/structs/errors.go @@ -0,0 +1,43 @@ +package structs + +import ( + "errors" + "strings" +) + +const ( + errNoLeader = "No cluster leader" + errNoRegionPath = "No path to region" + errTokenNotFound = "ACL token not found" + errPermissionDenied = "Permission denied" +) + +var ( + ErrNoLeader = errors.New(errNoLeader) + ErrNoRegionPath = errors.New(errNoRegionPath) + ErrTokenNotFound = errors.New(errTokenNotFound) + ErrPermissionDenied = errors.New(errPermissionDenied) +) + +// IsErrNoLeader returns whether the error is due to there being no leader. 
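// Both RPC call sites above now compute jitter as a random stagger of at most
// duration/structs.JitterFraction, i.e. up to 1/16 (~6.25%) of the configured
// time. The arithmetic is sketched here; randomStagger is a local stand-in
// for lib.RandomStagger, written out only for illustration.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const jitterFraction = 16 // matches structs.JitterFraction

// randomStagger returns a uniformly random duration in [0, intv).
func randomStagger(intv time.Duration) time.Duration {
	if intv <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(intv)))
}

func main() {
	maxQueryTime := 300 * time.Second

	// Jitter is bounded by 300s/16 = 18.75s, so the blocking query waits at
	// most ~318.75s; the spread keeps many blocked queries from waking at the
	// same instant.
	jitter := randomStagger(maxQueryTime / jitterFraction)
	fmt.Printf("max query time with jitter: %v\n", maxQueryTime+jitter)
}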
+func IsErrNoLeader(err error) bool { + return err != nil && strings.Contains(err.Error(), errNoLeader) +} + +// IsErrNoRegionPath returns whether the error is due to there being no path to +// the given region. +func IsErrNoRegionPath(err error) bool { + return err != nil && strings.Contains(err.Error(), errNoRegionPath) +} + +// IsErrTokenNotFound returns whether the error is due to the passed token not +// being resolvable. +func IsErrTokenNotFound(err error) bool { + return err != nil && strings.Contains(err.Error(), errTokenNotFound) +} + +// IsErrPermissionDenied returns whether the error is due to the operation not +// being allowed due to lack of permissions. +func IsErrPermissionDenied(err error) bool { + return err != nil && strings.Contains(err.Error(), errPermissionDenied) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fe40ce6e6286..07da26f5d0ec 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -39,11 +39,6 @@ import ( ) var ( - ErrNoLeader = fmt.Errorf("No cluster leader") - ErrNoRegionPath = fmt.Errorf("No path to region") - ErrTokenNotFound = errors.New("ACL token not found") - ErrPermissionDenied = errors.New("Permission denied") - // validPolicyName is used to validate a policy name validPolicyName = regexp.MustCompile("^[a-zA-Z0-9-]{1,128}$") @@ -120,6 +115,12 @@ const ( // DefaultNamespace is the default namespace. DefaultNamespace = "default" DefaultNamespaceDescription = "Default shared namespace" + + // JitterFraction is the limit to the amount of jitter we apply + // to a user specified MaxQueryTime. We divide the specified time by + // the fraction. So 16 == 6.25% limit of jitter. This jitter is also + // applied to RPCHoldTimeout. + JitterFraction = 16 ) // Context defines the scope in which a search for Nomad object operates, and
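// The Is* helpers above match on substrings because Nomad RPC errors cross
// the client/server boundary as plain strings and lose their concrete type.
// A sketch of the intended call-site usage follows; callRPC and the retry
// policy are illustrative assumptions, not code from this diff.
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

const errNoLeader = "No cluster leader"

// isErrNoLeader mirrors structs.IsErrNoLeader for this self-contained sketch.
func isErrNoLeader(err error) bool {
	return err != nil && strings.Contains(err.Error(), errNoLeader)
}

// callRPC stands in for a forwarded RPC that can fail during a leader election.
func callRPC(attempt int) error {
	if attempt < 2 {
		// Simulates the string-only error a client would receive.
		return errors.New("rpc error: No cluster leader")
	}
	return nil
}

func main() {
	for attempt := 0; ; attempt++ {
		err := callRPC(attempt)
		if isErrNoLeader(err) {
			// Retriable: wait for the election to settle instead of failing.
			time.Sleep(50 * time.Millisecond)
			continue
		}
		if err != nil {
			fmt.Println("permanent error:", err)
			return
		}
		fmt.Println("rpc succeeded after", attempt, "attempts")
		return
	}
}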