From cc882cf56d1d1365b637825110eaa54ba679cfbf Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Wed, 24 May 2017 11:36:45 -0700 Subject: [PATCH] manager: Fix hanging Stop method If raftNode.JoinAndStart failed, Stop will block forever because it waits for the manager to start up. To fix this, close the "started" channel even if Run exits early due to an error. Fix the way the collector is initialized so its Stop method won't hang either. Add a test that makes sure the node shuts down cleanly after a failed manager initialization. Signed-off-by: Aaron Lehmann --- manager/manager.go | 10 ++++-- manager/state/raft/raft.go | 2 +- node/node_test.go | 74 +++++++++++++++++++++++--------------- 3 files changed, 55 insertions(+), 31 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index 005a56c33d..01c296acfe 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -217,7 +217,6 @@ func New(config *Config) (*Manager, error) { m := &Manager{ config: *config, - collector: metrics.NewCollector(raftNode.MemoryStore()), caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths), dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig()), logbroker: logbroker.New(raftNode.MemoryStore()), @@ -502,12 +501,16 @@ func (m *Manager) Run(parent context.Context) error { healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING) if err := m.raftNode.JoinAndStart(ctx); err != nil { + // Don't block future calls to Stop. + close(m.started) return errors.Wrap(err, "can't initialize raft node") } localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING) // Start metrics collection. + + m.collector = metrics.NewCollector(m.raftNode.MemoryStore()) go func(collector *metrics.Collector) { if err := collector.Run(ctx); err != nil { log.G(ctx).WithError(err).Error("collector failed with an error") @@ -590,7 +593,10 @@ func (m *Manager) Stop(ctx context.Context, clearData bool) { m.raftNode.Cancel() - m.collector.Stop() + if m.collector != nil { + m.collector.Stop() + } + m.dispatcher.Stop() m.logbroker.Stop() m.caserver.Stop() diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go index 82a550d0f6..b793374095 100644 --- a/manager/state/raft/raft.go +++ b/manager/state/raft/raft.go @@ -361,7 +361,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { if err != nil { n.stopMu.Lock() // to shutdown transport - close(n.stopped) + n.cancelFunc() n.stopMu.Unlock() n.done() } else { diff --git a/node/node_test.go b/node/node_test.go index 83653673c3..5bfa5b7252 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -8,7 +8,6 @@ import ( "io/ioutil" "os" "path/filepath" - "sync" "testing" "time" @@ -252,6 +251,7 @@ func TestLoadSecurityConfigDownloadAllCerts(t *testing.T) { func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) { tmpDir, err := ioutil.TempDir("", "manager-root-ca-update") require.NoError(t, err) + defer os.RemoveAll(tmpDir) // don't bother with a listening socket cAddr := filepath.Join(tmpDir, "control.sock") @@ -264,13 +264,7 @@ func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) { node, err := New(cfg) require.NoError(t, err) - var nodeErr error - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - nodeErr = node.Start(context.Background()) - wg.Done() - }() + require.NoError(t, node.Start(context.Background())) select { case <-node.Ready(): @@ -297,13 +291,12 @@ func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) { require.Equal(t, currentCACerts, caCerts) require.NoError(t, node.Stop(context.Background())) - wg.Wait() - require.NoError(t, nodeErr) } func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) { tmpDir, err := ioutil.TempDir("", "manager-root-ca-update") require.NoError(t, err) + defer os.RemoveAll(tmpDir) // bootstrap worker TLS certificates paths := ca.NewConfigPaths(filepath.Join(tmpDir, certDirectory)) @@ -329,13 +322,7 @@ func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) { node, err := New(cfg) require.NoError(t, err) - var nodeErr error - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - nodeErr = node.Start(context.Background()) - wg.Done() - }() + require.NoError(t, node.Start(context.Background())) select { case <-node.Ready(): @@ -371,13 +358,12 @@ func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) { }, time.Second)) require.NoError(t, node.Stop(context.Background())) - wg.Wait() - require.NoError(t, nodeErr) } func TestCertRenewals(t *testing.T) { tmpDir, err := ioutil.TempDir("", "no-top-level-role") require.NoError(t, err) + defer os.RemoveAll(tmpDir) paths := ca.NewConfigPaths(filepath.Join(tmpDir, "certificates")) @@ -391,13 +377,7 @@ func TestCertRenewals(t *testing.T) { node, err := New(cfg) require.NoError(t, err) - var nodeErr error - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - nodeErr = node.Start(context.Background()) - wg.Done() - }() + require.NoError(t, node.Start(context.Background())) select { case <-node.Ready(): @@ -449,6 +429,44 @@ func TestCertRenewals(t *testing.T) { }, 5*time.Second)) require.NoError(t, node.Stop(context.Background())) - wg.Wait() - require.NoError(t, nodeErr) +} + +func TestManagerFailedStartup(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "manager-root-ca-update") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + paths := ca.NewConfigPaths(filepath.Join(tmpDir, certDirectory)) + + rootCA, err := ca.CreateRootCA(ca.DefaultRootCN) + require.NoError(t, err) + require.NoError(t, ca.SaveRootCA(rootCA, paths.RootCA)) + + krw := ca.NewKeyReadWriter(paths.Node, nil, nil) + require.NoError(t, err) + _, _, err = rootCA.IssueAndSaveNewCertificates(krw, identity.NewID(), ca.ManagerRole, identity.NewID()) + require.NoError(t, err) + + // don't bother with a listening socket + cAddr := filepath.Join(tmpDir, "control.sock") + cfg := &Config{ + ListenControlAPI: cAddr, + StateDir: tmpDir, + Executor: &agentutils.TestExecutor{}, + JoinAddr: "127.0.0.1", + } + + node, err := New(cfg) + require.NoError(t, err) + + require.NoError(t, node.Start(context.Background())) + + select { + case <-node.Ready(): + require.FailNow(t, "node should not become ready") + case <-time.After(5 * time.Second): + require.FailNow(t, "node neither became ready nor encountered an error") + case <-node.closed: + require.EqualError(t, node.err, "manager stopped: can't initialize raft node: attempted to join raft cluster without knowing own address") + } }