From f970e3dc9d351be35dce241b347a4b2a09a05bd5 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 16 Feb 2018 17:49:31 -0800 Subject: [PATCH 1/9] Registering back to initializing Fix a bug in which if the node attributes/meta changed, we would re-register the node in status initializing. This would incorrectly trigger the client to log that it missed its heartbeat. It would change the status of the Node to initializing until the next heartbeat occured. --- client/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/client.go b/client/client.go index e27d2d62caf6..a53e89bbf1bc 100644 --- a/client/client.go +++ b/client/client.go @@ -1131,6 +1131,7 @@ func (c *Client) registerNode() error { // Update the node status to ready after we register. c.configLock.Lock() node.Status = structs.NodeStatusReady + c.config.Node.Status = structs.NodeStatusReady c.configLock.Unlock() c.logger.Printf("[INFO] client: node registration complete") From 50ac3f16ffa951be62ff0710f207a62ba3091569 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 20 Feb 2018 10:22:15 -0800 Subject: [PATCH 2/9] Add escape hatches when non-leader --- nomad/deploymentwatcher/deployment_watcher.go | 1 + nomad/heartbeat.go | 28 +++++++++++++++ nomad/heartbeat_test.go | 34 ++++++++++++++----- nomad/leader.go | 3 ++ 4 files changed, 58 insertions(+), 8 deletions(-) diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index 003268916af1..d0479c462527 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -442,6 +442,7 @@ func (w *deploymentWatcher) createEvalBatched(forIndex uint64) { w.outstandingBatch = true + // TODO this isn't safe on deployment watcher shutdown since timer leaks. time.AfterFunc(perJobEvalBatchPeriod, func() { // Create the eval evalCreateIndex, err := w.createEvaluation(w.getEval()) diff --git a/nomad/heartbeat.go b/nomad/heartbeat.go index 89bc86010152..201775cf853f 100644 --- a/nomad/heartbeat.go +++ b/nomad/heartbeat.go @@ -1,6 +1,7 @@ package nomad import ( + "errors" "time" "github.com/armon/go-metrics" @@ -9,6 +10,18 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // heartbeatNotLeader is the error string returned when the heartbeat request + // couldn't be completed since the server is not the leader. + heartbeatNotLeader = "failed to reset heartbeat since server is not leader" +) + +var ( + // heartbeatNotLeaderErr is the error returned when the heartbeat request + // couldn't be completed since the server is not the leader. + heartbeatNotLeaderErr = errors.New(heartbeatNotLeader) +) + // initializeHeartbeatTimers is used when a leader is newly elected to create // a new map to track heartbeat expiration and to reset all the timers from // the previously known set of timers. @@ -50,6 +63,13 @@ func (s *Server) resetHeartbeatTimer(id string) (time.Duration, error) { s.heartbeatTimersLock.Lock() defer s.heartbeatTimersLock.Unlock() + // Do not create a timer for the node since we are not the leader. This + // check avoids the race in which leadership is lost but a timer is created + // on this server since it was servicing an RPC during a leadership loss. + if !s.IsLeader() { + return 0, heartbeatNotLeaderErr + } + // Compute the target TTL value n := len(s.heartbeatTimers) ttl := lib.RateScaledInterval(s.config.MaxHeartbeatsPerSecond, s.config.MinHeartbeatTTL, n) @@ -89,6 +109,14 @@ func (s *Server) invalidateHeartbeat(id string) { s.heartbeatTimersLock.Lock() delete(s.heartbeatTimers, id) s.heartbeatTimersLock.Unlock() + + // Do not invalidate the node since we are not the leader. This check avoids + // the race in which leadership is lost but a timer is created on this + // server since it was servicing an RPC during a leadership loss. + if !s.IsLeader() { + return + } + s.logger.Printf("[WARN] nomad.heartbeat: node '%s' TTL expired", id) // Make a request to update the node status diff --git a/nomad/heartbeat_test.go b/nomad/heartbeat_test.go index 24d8283fdc38..3fa6f9e7cbdb 100644 --- a/nomad/heartbeat_test.go +++ b/nomad/heartbeat_test.go @@ -10,9 +10,10 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" ) -func TestInitializeHeartbeatTimers(t *testing.T) { +func TestHeartbeat_InitializeHeartbeatTimers(t *testing.T) { t.Parallel() s1 := testServer(t, nil) defer s1.Shutdown() @@ -38,7 +39,7 @@ func TestInitializeHeartbeatTimers(t *testing.T) { } } -func TestResetHeartbeatTimer(t *testing.T) { +func TestHeartbeat_ResetHeartbeatTimer(t *testing.T) { t.Parallel() s1 := testServer(t, nil) defer s1.Shutdown() @@ -60,7 +61,24 @@ func TestResetHeartbeatTimer(t *testing.T) { } } -func TestResetHeartbeatTimerLocked(t *testing.T) { +func TestHeartbeat_ResetHeartbeatTimer_Nonleader(t *testing.T) { + t.Parallel() + require := require.New(t) + s1 := testServer(t, func(c *Config) { + c.BootstrapExpect = 3 // Won't become leader + c.DevDisableBootstrap = true + }) + defer s1.Shutdown() + + require.False(s1.IsLeader()) + + // Create a new timer + _, err := s1.resetHeartbeatTimer("test") + require.NotNil(err) + require.EqualError(err, heartbeatNotLeader) +} + +func TestHeartbeat_ResetHeartbeatTimerLocked(t *testing.T) { t.Parallel() s1 := testServer(t, nil) defer s1.Shutdown() @@ -81,7 +99,7 @@ func TestResetHeartbeatTimerLocked(t *testing.T) { } } -func TestResetHeartbeatTimerLocked_Renew(t *testing.T) { +func TestHeartbeat_ResetHeartbeatTimerLocked_Renew(t *testing.T) { t.Parallel() s1 := testServer(t, nil) defer s1.Shutdown() @@ -120,7 +138,7 @@ func TestResetHeartbeatTimerLocked_Renew(t *testing.T) { t.Fatalf("should have expired") } -func TestInvalidateHeartbeat(t *testing.T) { +func TestHeartbeat_InvalidateHeartbeat(t *testing.T) { t.Parallel() s1 := testServer(t, nil) defer s1.Shutdown() @@ -148,7 +166,7 @@ func TestInvalidateHeartbeat(t *testing.T) { } } -func TestClearHeartbeatTimer(t *testing.T) { +func TestHeartbeat_ClearHeartbeatTimer(t *testing.T) { t.Parallel() s1 := testServer(t, nil) defer s1.Shutdown() @@ -168,7 +186,7 @@ func TestClearHeartbeatTimer(t *testing.T) { } } -func TestClearAllHeartbeatTimers(t *testing.T) { +func TestHeartbeat_ClearAllHeartbeatTimers(t *testing.T) { t.Parallel() s1 := testServer(t, nil) defer s1.Shutdown() @@ -190,7 +208,7 @@ func TestClearAllHeartbeatTimers(t *testing.T) { } } -func TestServer_HeartbeatTTL_Failover(t *testing.T) { +func TestHeartbeat_Server_HeartbeatTTL_Failover(t *testing.T) { t.Parallel() s1 := testServer(t, nil) defer s1.Shutdown() diff --git a/nomad/leader.go b/nomad/leader.go index 4d1a571cc01f..aec66f8b7452 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -639,6 +639,9 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) { // revokeLeadership is invoked once we step down as leader. // This is used to cleanup any state that may be specific to a leader. func (s *Server) revokeLeadership() error { + // TODO put metrics here and on establish leadership that time how long the + // whole thing takes so we can detect lock contention or blocking. + // Clear the leader token since we are no longer the leader. s.setLeaderAcl("") From 9f64e360666a9c08d9733ccda2d76ee1064bed6a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 20 Feb 2018 10:23:11 -0800 Subject: [PATCH 3/9] timers --- nomad/leader.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index aec66f8b7452..a8313e6a1bfe 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -157,6 +157,8 @@ WAIT: // previously inflight transactions have been committed and that our // state is up-to-date. func (s *Server) establishLeadership(stopCh chan struct{}) error { + defer metrics.MeasureSince([]string{"nomad", "leader", "establish_leadership"}, time.Now()) + // Generate a leader ACL token. This will allow the leader to issue work // that requires a valid ACL token. s.setLeaderAcl(uuid.Generate()) @@ -639,8 +641,7 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) { // revokeLeadership is invoked once we step down as leader. // This is used to cleanup any state that may be specific to a leader. func (s *Server) revokeLeadership() error { - // TODO put metrics here and on establish leadership that time how long the - // whole thing takes so we can detect lock contention or blocking. + defer metrics.MeasureSince([]string{"nomad", "leader", "revoke_leadership"}, time.Now()) // Clear the leader token since we are no longer the leader. s.setLeaderAcl("") From a62f8aea5eec545527ea938830367e64d9e773ce Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 20 Feb 2018 12:47:43 -0800 Subject: [PATCH 4/9] Fix leaking time.After function --- nomad/deploymentwatcher/deployment_watcher.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index d0479c462527..3ea88ffee24c 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -442,8 +442,15 @@ func (w *deploymentWatcher) createEvalBatched(forIndex uint64) { w.outstandingBatch = true - // TODO this isn't safe on deployment watcher shutdown since timer leaks. time.AfterFunc(perJobEvalBatchPeriod, func() { + // If the timer has been created and then we shutdown, we need to no-op + // the evaluation creation. + select { + case <-w.ctx.Done(): + return + default: + } + // Create the eval evalCreateIndex, err := w.createEvaluation(w.getEval()) if err != nil { From f1877e3d99c197e4ecdc183ffdbbc0184e1a4fbd Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 20 Feb 2018 12:52:00 -0800 Subject: [PATCH 5/9] Need to revoke leadership to clean up in case there was a failure during leadership establishment --- nomad/leader.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/nomad/leader.go b/nomad/leader.go index a8313e6a1bfe..748e292279a2 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -107,8 +107,16 @@ RECONCILE: if !establishedLeader { if err := s.establishLeadership(stopCh); err != nil { s.logger.Printf("[ERR] nomad: failed to establish leadership: %v", err) + + // Immediately revoke leadership since we didn't successfully + // establish leadership. + if err := s.revokeLeadership(); err != nil { + s.logger.Printf("[ERR] nomad: failed to revoke leadership: %v", err) + } + goto WAIT } + establishedLeader = true defer func() { if err := s.revokeLeadership(); err != nil { From cb6a697109979c36200b393c45f58cde62f1b81a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 20 Feb 2018 15:05:16 -0800 Subject: [PATCH 6/9] add a revoke leadership test --- nomad/leader_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 4689cbfcbd59..03252f3993f5 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestLeader_LeftServer(t *testing.T) { @@ -978,3 +979,19 @@ func TestLeader_RollRaftServer(t *testing.T) { }) } } + +func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + testutil.WaitForResult(func() (bool, error) { + return s1.evalBroker.Enabled(), nil + }, func(err error) { + t.Fatalf("should have finished establish leader loop") + }) + + require.Nil(t, s1.revokeLeadership()) + require.Nil(t, s1.revokeLeadership()) + require.Nil(t, s1.revokeLeadership()) +} From 08353e87230a136a6448ba5973e38d350abd48b4 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 20 Feb 2018 16:05:43 -0800 Subject: [PATCH 7/9] Fix flaky test --- nomad/heartbeat_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/nomad/heartbeat_test.go b/nomad/heartbeat_test.go index 3fa6f9e7cbdb..3059db1cfc04 100644 --- a/nomad/heartbeat_test.go +++ b/nomad/heartbeat_test.go @@ -271,9 +271,11 @@ func TestHeartbeat_Server_HeartbeatTTL_Failover(t *testing.T) { leader.Shutdown() // heartbeatTimers should be cleared on leader shutdown - if len(leader.heartbeatTimers) != 0 { + testutil.WaitForResult(func() (bool, error) { + return len(leader.heartbeatTimers) == 0, nil + }, func(err error) { t.Fatalf("heartbeat timers should be empty on the shutdown leader") - } + }) // Find the new leader testutil.WaitForResult(func() (bool, error) { From ebfd203a17dab675d3159f83ca0bf9b53b1f85e0 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 21 Feb 2018 10:23:49 -0800 Subject: [PATCH 8/9] logging --- nomad/heartbeat.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nomad/heartbeat.go b/nomad/heartbeat.go index 201775cf853f..115f13cfeee3 100644 --- a/nomad/heartbeat.go +++ b/nomad/heartbeat.go @@ -67,6 +67,7 @@ func (s *Server) resetHeartbeatTimer(id string) (time.Duration, error) { // check avoids the race in which leadership is lost but a timer is created // on this server since it was servicing an RPC during a leadership loss. if !s.IsLeader() { + s.logger.Printf("[DEBUG] nomad.heartbeat: ignoring resetting node %q TTL since this node is not the leader", id) return 0, heartbeatNotLeaderErr } @@ -114,6 +115,7 @@ func (s *Server) invalidateHeartbeat(id string) { // the race in which leadership is lost but a timer is created on this // server since it was servicing an RPC during a leadership loss. if !s.IsLeader() { + s.logger.Printf("[DEBUG] nomad.heartbeat: ignoring node %q TTL since this node is not the leader", id) return } From a0137821a1cbd51fab755759ac6e150fbc3a83bc Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 21 Feb 2018 10:24:32 -0800 Subject: [PATCH 9/9] Pull in new autopilot --- .../agent/consul/autopilot/autopilot.go | 20 +++++++++++++++++++ vendor/vendor.json | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/vendor/github.com/hashicorp/consul/agent/consul/autopilot/autopilot.go b/vendor/github.com/hashicorp/consul/agent/consul/autopilot/autopilot.go index d6efd11e7d47..b50252ce510c 100644 --- a/vendor/github.com/hashicorp/consul/agent/consul/autopilot/autopilot.go +++ b/vendor/github.com/hashicorp/consul/agent/consul/autopilot/autopilot.go @@ -38,8 +38,10 @@ type Autopilot struct { clusterHealth OperatorHealthReply clusterHealthLock sync.RWMutex + enabled bool removeDeadCh chan struct{} shutdownCh chan struct{} + shutdownLock sync.Mutex waitGroup sync.WaitGroup } @@ -62,6 +64,14 @@ func NewAutopilot(logger *log.Logger, delegate Delegate, interval, healthInterva } func (a *Autopilot) Start() { + a.shutdownLock.Lock() + defer a.shutdownLock.Unlock() + + // Nothing to do + if a.enabled { + return + } + a.shutdownCh = make(chan struct{}) a.waitGroup = sync.WaitGroup{} a.clusterHealth = OperatorHealthReply{} @@ -69,11 +79,21 @@ func (a *Autopilot) Start() { a.waitGroup.Add(2) go a.run() go a.serverHealthLoop() + a.enabled = true } func (a *Autopilot) Stop() { + a.shutdownLock.Lock() + defer a.shutdownLock.Unlock() + + // Nothing to do + if !a.enabled { + return + } + close(a.shutdownCh) a.waitGroup.Wait() + a.enabled = false } // run periodically looks for nonvoting servers to promote and dead servers to remove. diff --git a/vendor/vendor.json b/vendor/vendor.json index 3a5bcc1a9266..e6662b21dd55 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -123,7 +123,7 @@ {"path":"github.com/hashicorp/consul-template/template","checksumSHA1":"N9qobVzScLbTEnGE7MgFnnTbGBw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"}, {"path":"github.com/hashicorp/consul-template/version","checksumSHA1":"NB5+D4AuCNV9Bsqh3YFdPi4AJ6U=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"}, {"path":"github.com/hashicorp/consul-template/watch","checksumSHA1":"b4+Y+02pY2Y5620F9ALzKg8Zmdw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"}, - {"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"/nyemJLkxBXKqI9xpLFyTyvOaYY=","revision":"bfeb09983befa337a3b2ebbafb7567913773e40b","revisionTime":"2018-01-23T20:52:17Z"}, + {"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"+I7fgoQlrnTUGW5krqNLadWwtjg=","revision":"d1ede2c93dec7b4580e37ef41d24371abab9d9e9","revisionTime":"2018-02-21T18:19:48Z"}, {"path":"github.com/hashicorp/consul/api","checksumSHA1":"XLfcIX2qpRr0o26aFMjCOzvw6jo=","revision":"51ea240df8476e02215d53fbfad5838bf0d44d21","revisionTime":"2017-10-16T16:22:40Z"}, {"path":"github.com/hashicorp/consul/command/flags","checksumSHA1":"XTQIYV+DPUVRKpVp0+y/78bWH3I=","revision":"d08ab9fd199434e5220276356ecf9617cfec1eb2","revisionTime":"2017-12-18T20:26:35Z"}, {"path":"github.com/hashicorp/consul/lib","checksumSHA1":"HGljdtVaqi/e3DgIHymLRLfPYhw=","revision":"bcafded4e60982d0b71e730f0b8564d73cb1d715","revisionTime":"2017-10-31T16:39:15Z"},