diff --git a/manager/manager.go b/manager/manager.go index 900e9e843d..9bd33cdded 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -217,7 +217,6 @@ func New(config *Config) (*Manager, error) { m := &Manager{ config: *config, - collector: metrics.NewCollector(raftNode.MemoryStore()), caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths), dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig()), logbroker: logbroker.New(raftNode.MemoryStore()), @@ -502,12 +501,16 @@ func (m *Manager) Run(parent context.Context) error { healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING) if err := m.raftNode.JoinAndStart(ctx); err != nil { + // Don't block future calls to Stop. + close(m.started) return errors.Wrap(err, "can't initialize raft node") } localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING) // Start metrics collection. + + m.collector = metrics.NewCollector(m.raftNode.MemoryStore()) go func(collector *metrics.Collector) { if err := collector.Run(ctx); err != nil { log.G(ctx).WithError(err).Error("collector failed with an error") @@ -590,7 +593,10 @@ func (m *Manager) Stop(ctx context.Context, clearData bool) { m.raftNode.Cancel() - m.collector.Stop() + if m.collector != nil { + m.collector.Stop() + } + m.dispatcher.Stop() m.logbroker.Stop() m.caserver.Stop() diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go index 82a550d0f6..b793374095 100644 --- a/manager/state/raft/raft.go +++ b/manager/state/raft/raft.go @@ -361,7 +361,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { if err != nil { n.stopMu.Lock() // to shutdown transport - close(n.stopped) + n.cancelFunc() n.stopMu.Unlock() n.done() } else { diff --git a/node/node_test.go b/node/node_test.go index 83653673c3..5bfa5b7252 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -8,7 +8,6 @@ import ( "io/ioutil" "os" "path/filepath" - "sync" "testing" "time" @@ -252,6 +251,7 @@ func TestLoadSecurityConfigDownloadAllCerts(t *testing.T) { func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) { tmpDir, err := ioutil.TempDir("", "manager-root-ca-update") require.NoError(t, err) + defer os.RemoveAll(tmpDir) // don't bother with a listening socket cAddr := filepath.Join(tmpDir, "control.sock") @@ -264,13 +264,7 @@ func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) { node, err := New(cfg) require.NoError(t, err) - var nodeErr error - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - nodeErr = node.Start(context.Background()) - wg.Done() - }() + require.NoError(t, node.Start(context.Background())) select { case <-node.Ready(): @@ -297,13 +291,12 @@ func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) { require.Equal(t, currentCACerts, caCerts) require.NoError(t, node.Stop(context.Background())) - wg.Wait() - require.NoError(t, nodeErr) } func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) { tmpDir, err := ioutil.TempDir("", "manager-root-ca-update") require.NoError(t, err) + defer os.RemoveAll(tmpDir) // bootstrap worker TLS certificates paths := ca.NewConfigPaths(filepath.Join(tmpDir, certDirectory)) @@ -329,13 +322,7 @@ func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) { node, err := New(cfg) require.NoError(t, err) - var nodeErr error - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - nodeErr = node.Start(context.Background()) - wg.Done() - }() + require.NoError(t, node.Start(context.Background())) select { case <-node.Ready(): @@ -371,13 +358,12 @@ func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) { }, time.Second)) require.NoError(t, node.Stop(context.Background())) - wg.Wait() - require.NoError(t, nodeErr) } func TestCertRenewals(t *testing.T) { tmpDir, err := ioutil.TempDir("", "no-top-level-role") require.NoError(t, err) + defer os.RemoveAll(tmpDir) paths := ca.NewConfigPaths(filepath.Join(tmpDir, "certificates")) @@ -391,13 +377,7 @@ func TestCertRenewals(t *testing.T) { node, err := New(cfg) require.NoError(t, err) - var nodeErr error - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - nodeErr = node.Start(context.Background()) - wg.Done() - }() + require.NoError(t, node.Start(context.Background())) select { case <-node.Ready(): @@ -449,6 +429,44 @@ func TestCertRenewals(t *testing.T) { }, 5*time.Second)) require.NoError(t, node.Stop(context.Background())) - wg.Wait() - require.NoError(t, nodeErr) +} + +func TestManagerFailedStartup(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "manager-root-ca-update") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + paths := ca.NewConfigPaths(filepath.Join(tmpDir, certDirectory)) + + rootCA, err := ca.CreateRootCA(ca.DefaultRootCN) + require.NoError(t, err) + require.NoError(t, ca.SaveRootCA(rootCA, paths.RootCA)) + + krw := ca.NewKeyReadWriter(paths.Node, nil, nil) + require.NoError(t, err) + _, _, err = rootCA.IssueAndSaveNewCertificates(krw, identity.NewID(), ca.ManagerRole, identity.NewID()) + require.NoError(t, err) + + // don't bother with a listening socket + cAddr := filepath.Join(tmpDir, "control.sock") + cfg := &Config{ + ListenControlAPI: cAddr, + StateDir: tmpDir, + Executor: &agentutils.TestExecutor{}, + JoinAddr: "127.0.0.1", + } + + node, err := New(cfg) + require.NoError(t, err) + + require.NoError(t, node.Start(context.Background())) + + select { + case <-node.Ready(): + require.FailNow(t, "node should not become ready") + case <-time.After(5 * time.Second): + require.FailNow(t, "node neither became ready nor encountered an error") + case <-node.closed: + require.EqualError(t, node.err, "manager stopped: can't initialize raft node: attempted to join raft cluster without knowing own address") + } }