From 5f88d0f40876771405cb592db62db7e346260d36 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 6 May 2019 14:44:55 -0500 Subject: [PATCH 1/3] Remove unnecessary locking and serverlist syncing in heartbeats This removes an unnecessary shared lock between discovery and heartbeating which was causing heartbeats to be missed upon retries when a single server fails. Also made a drive by fix to call the periodic server shuffler goroutine. --- client/client.go | 31 +++++++++++++++++++++++-------- dev/cluster/client1.hcl | 3 +++ dev/cluster/client2.hcl | 3 +++ dev/cluster/server1.hcl | 2 +- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/client/client.go b/client/client.go index c8bed1e95a2f..bf767a58fb20 100644 --- a/client/client.go +++ b/client/client.go @@ -315,6 +315,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic // Initialize the server manager c.servers = servers.New(c.logger, c.shutdownCh, c) + // Start server manager rebalancing go routine + go c.servers.Start() + // Initialize the client if err := c.init(); err != nil { return nil, fmt.Errorf("failed to initialize client: %v", err) @@ -1345,7 +1348,6 @@ func (c *Client) registerAndHeartbeat() { case <-c.shutdownCh: return } - if err := c.updateNodeStatus(); err != nil { // The servers have changed such that this node has not been // registered before @@ -2342,13 +2344,6 @@ func (c *Client) consulDiscovery() { func (c *Client) consulDiscoveryImpl() error { consulLogger := c.logger.Named("consul") - // Acquire heartbeat lock to prevent heartbeat from running - // concurrently with discovery. Concurrent execution is safe, however - // discovery is usually triggered when heartbeating has failed so - // there's no point in allowing it. - c.heartbeatLock.Lock() - defer c.heartbeatLock.Unlock() - dcs, err := c.consulCatalog.Datacenters() if err != nil { return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err) @@ -2432,6 +2427,26 @@ DISCOLOOP: consulLogger.Info("discovered following servers", "servers", nomadServers) + // Check if the list of servers discovered is identical to the list we already have + // If so, we don't need to reset the server list unnecessarily + knownServers := make(map[string]struct{}) + serverList := c.servers.GetServers() + for _, s := range serverList { + knownServers[s.Addr.String()] = struct{}{} + } + + allFound := true + for _, s := range nomadServers { + _, known := knownServers[s.Addr.String()] + if !known { + allFound = false + break + } + } + if allFound && len(nomadServers) == len(serverList) { + c.logger.Info("Not replacing server list, current server list is identical to servers discovered in Consul") + return nil + } // Fire the retry trigger if we have updated the set of servers. if c.servers.SetServers(nomadServers) { // Start rebalancing diff --git a/dev/cluster/client1.hcl b/dev/cluster/client1.hcl index 2cab7bba4d3b..ff41a0d74fcc 100644 --- a/dev/cluster/client1.hcl +++ b/dev/cluster/client1.hcl @@ -7,6 +7,9 @@ data_dir = "/tmp/client1" # Give the agent a unique name. Defaults to hostname name = "client1" +# Enable debugging +enable_debug = true + # Enable the client client { enabled = true diff --git a/dev/cluster/client2.hcl b/dev/cluster/client2.hcl index 3385ab0d7750..e5271d6dc576 100644 --- a/dev/cluster/client2.hcl +++ b/dev/cluster/client2.hcl @@ -7,6 +7,9 @@ data_dir = "/tmp/client2" # Give the agent a unique name. Defaults to hostname name = "client2" +# Enable debugging +enable_debug = true + # Enable the client client { enabled = true diff --git a/dev/cluster/server1.hcl b/dev/cluster/server1.hcl index 62b2c792a334..d9a22e364368 100644 --- a/dev/cluster/server1.hcl +++ b/dev/cluster/server1.hcl @@ -16,5 +16,5 @@ server { } # Self-elect, should be 3 or 5 for production - bootstrap_expect = 3 + bootstrap_expect = 1 } From 12e18047335aec2ec6ff52c51407f15fed6fb8aa Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 7 May 2019 16:23:32 -0500 Subject: [PATCH 2/3] code review feedback --- client/client.go | 20 -------------------- client/servers/manager.go | 31 ++++++++++++++++++++++++++----- client/servers/manager_test.go | 8 ++++++++ dev/cluster/client3.hcl | 3 +++ dev/cluster/server1.hcl | 2 +- 5 files changed, 38 insertions(+), 26 deletions(-) diff --git a/client/client.go b/client/client.go index bf767a58fb20..43343f2cd96b 100644 --- a/client/client.go +++ b/client/client.go @@ -2427,26 +2427,6 @@ DISCOLOOP: consulLogger.Info("discovered following servers", "servers", nomadServers) - // Check if the list of servers discovered is identical to the list we already have - // If so, we don't need to reset the server list unnecessarily - knownServers := make(map[string]struct{}) - serverList := c.servers.GetServers() - for _, s := range serverList { - knownServers[s.Addr.String()] = struct{}{} - } - - allFound := true - for _, s := range nomadServers { - _, known := knownServers[s.Addr.String()] - if !known { - allFound = false - break - } - } - if allFound && len(nomadServers) == len(serverList) { - c.logger.Info("Not replacing server list, current server list is identical to servers discovered in Consul") - return nil - } // Fire the retry trigger if we have updated the set of servers. if c.servers.SetServers(nomadServers) { // Start rebalancing diff --git a/client/servers/manager.go b/client/servers/manager.go index fea945dc5e9a..d65fd43e0212 100644 --- a/client/servers/manager.go +++ b/client/servers/manager.go @@ -201,12 +201,16 @@ func (m *Manager) SetServers(servers Servers) bool { m.Lock() defer m.Unlock() - // Sort both the existing and incoming servers - servers.Sort() - m.servers.Sort() - // Determine if they are equal - equal := servers.Equal(m.servers) + equal := m.serversAreEqual(servers) + + // If server list is equal don't change the list and return immediatly + // This prevents unnecessary shuffling of a failed server that was moved to the + // bottom of the list + if equal { + m.logger.Debug("Not replacing server list, current server list is identical to servers discovered in Consul") + return !equal + } // Randomize the incoming servers servers.shuffle() @@ -215,6 +219,23 @@ func (m *Manager) SetServers(servers Servers) bool { return !equal } +// Method to check if the arg list of servers is equal to the one we already have +func (m *Manager) serversAreEqual(servers Servers) bool { + // We use a copy of the server list here because determining + // equality requires a sort step which modifies the order of the server list + var copy Servers + copy = make([]*Server, 0, len(m.servers)) + for _, s := range m.servers { + copy = append(copy, s.Copy()) + } + + // Sort both the existing and incoming servers + copy.Sort() + servers.Sort() + + return copy.Equal(servers) +} + // FindServer returns a server to send an RPC too. If there are no servers, nil // is returned. func (m *Manager) FindServer() *Server { diff --git a/client/servers/manager_test.go b/client/servers/manager_test.go index e33ab4119ff9..8442d1e6f3ce 100644 --- a/client/servers/manager_test.go +++ b/client/servers/manager_test.go @@ -66,6 +66,14 @@ func TestServers_SetServers(t *testing.T) { require.True(m.SetServers([]*servers.Server{s1})) require.Equal(1, m.NumServers()) require.Len(m.GetServers(), 1) + + // Test that the list of servers does not get shuffled + // as a side effect when incoming list is equal + require.True(m.SetServers([]*servers.Server{s1, s2})) + before := m.GetServers() + require.False(m.SetServers([]*servers.Server{s1, s2})) + after := m.GetServers() + require.Equal(before, after) } func TestServers_FindServer(t *testing.T) { diff --git a/dev/cluster/client3.hcl b/dev/cluster/client3.hcl index 2702b79b0794..ceb09715a08a 100644 --- a/dev/cluster/client3.hcl +++ b/dev/cluster/client3.hcl @@ -7,6 +7,9 @@ data_dir = "/tmp/client3" # Give the agent a unique name. Defaults to hostname name = "client3" +# Enable debugging +enable_debug = true + # Enable the client client { enabled = true diff --git a/dev/cluster/server1.hcl b/dev/cluster/server1.hcl index d9a22e364368..62b2c792a334 100644 --- a/dev/cluster/server1.hcl +++ b/dev/cluster/server1.hcl @@ -16,5 +16,5 @@ server { } # Self-elect, should be 3 or 5 for production - bootstrap_expect = 1 + bootstrap_expect = 3 } From 5bfa35ab81e09820c191d20fd8cda44bc64068c7 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 8 May 2019 10:54:22 -0500 Subject: [PATCH 3/3] fix typo and add one more test scenario --- client/servers/manager.go | 2 +- client/servers/manager_test.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/client/servers/manager.go b/client/servers/manager.go index d65fd43e0212..1cc61e8f45db 100644 --- a/client/servers/manager.go +++ b/client/servers/manager.go @@ -204,7 +204,7 @@ func (m *Manager) SetServers(servers Servers) bool { // Determine if they are equal equal := m.serversAreEqual(servers) - // If server list is equal don't change the list and return immediatly + // If server list is equal don't change the list and return immediately // This prevents unnecessary shuffling of a failed server that was moved to the // bottom of the list if equal { diff --git a/client/servers/manager_test.go b/client/servers/manager_test.go index 8442d1e6f3ce..1c13889a7f21 100644 --- a/client/servers/manager_test.go +++ b/client/servers/manager_test.go @@ -74,6 +74,11 @@ func TestServers_SetServers(t *testing.T) { require.False(m.SetServers([]*servers.Server{s1, s2})) after := m.GetServers() require.Equal(before, after) + + // Send a shuffled list, verify original order doesn't change + require.False(m.SetServers([]*servers.Server{s2, s1})) + afterShuffledInput := m.GetServers() + require.Equal(after, afterShuffledInput) } func TestServers_FindServer(t *testing.T) {