Skip to content

Commit

Permalink
Merge pull request #2203 from aaronlehmann/manager-stop-hanging
Browse files Browse the repository at this point in the history
manager: Fix hanging Stop method
  • Loading branch information
aaronlehmann committed May 25, 2017
2 parents b325d40 + cc882cf commit ffd009f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 31 deletions.
10 changes: 8 additions & 2 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ func New(config *Config) (*Manager, error) {

m := &Manager{
config: *config,
collector: metrics.NewCollector(raftNode.MemoryStore()),
caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths),
dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig()),
logbroker: logbroker.New(raftNode.MemoryStore()),
Expand Down Expand Up @@ -502,12 +501,16 @@ func (m *Manager) Run(parent context.Context) error {
healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)

if err := m.raftNode.JoinAndStart(ctx); err != nil {
// Don't block future calls to Stop.
close(m.started)
return errors.Wrap(err, "can't initialize raft node")
}

localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

// Start metrics collection.

m.collector = metrics.NewCollector(m.raftNode.MemoryStore())
go func(collector *metrics.Collector) {
if err := collector.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("collector failed with an error")
Expand Down Expand Up @@ -590,7 +593,10 @@ func (m *Manager) Stop(ctx context.Context, clearData bool) {

m.raftNode.Cancel()

m.collector.Stop()
if m.collector != nil {
m.collector.Stop()
}

m.dispatcher.Stop()
m.logbroker.Stop()
m.caserver.Stop()
Expand Down
2 changes: 1 addition & 1 deletion manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
if err != nil {
n.stopMu.Lock()
// to shutdown transport
close(n.stopped)
n.cancelFunc()
n.stopMu.Unlock()
n.done()
} else {
Expand Down
74 changes: 46 additions & 28 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -252,6 +251,7 @@ func TestLoadSecurityConfigDownloadAllCerts(t *testing.T) {
func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "manager-root-ca-update")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

// don't bother with a listening socket
cAddr := filepath.Join(tmpDir, "control.sock")
Expand All @@ -264,13 +264,7 @@ func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) {
node, err := New(cfg)
require.NoError(t, err)

var nodeErr error
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
nodeErr = node.Start(context.Background())
wg.Done()
}()
require.NoError(t, node.Start(context.Background()))

select {
case <-node.Ready():
Expand All @@ -297,13 +291,12 @@ func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) {
require.Equal(t, currentCACerts, caCerts)

require.NoError(t, node.Stop(context.Background()))
wg.Wait()
require.NoError(t, nodeErr)
}

func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "manager-root-ca-update")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

// bootstrap worker TLS certificates
paths := ca.NewConfigPaths(filepath.Join(tmpDir, certDirectory))
Expand All @@ -329,13 +322,7 @@ func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) {
node, err := New(cfg)
require.NoError(t, err)

var nodeErr error
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
nodeErr = node.Start(context.Background())
wg.Done()
}()
require.NoError(t, node.Start(context.Background()))

select {
case <-node.Ready():
Expand Down Expand Up @@ -371,13 +358,12 @@ func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) {
}, time.Second))

require.NoError(t, node.Stop(context.Background()))
wg.Wait()
require.NoError(t, nodeErr)
}

func TestCertRenewals(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "no-top-level-role")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

paths := ca.NewConfigPaths(filepath.Join(tmpDir, "certificates"))

Expand All @@ -391,13 +377,7 @@ func TestCertRenewals(t *testing.T) {
node, err := New(cfg)
require.NoError(t, err)

var nodeErr error
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
nodeErr = node.Start(context.Background())
wg.Done()
}()
require.NoError(t, node.Start(context.Background()))

select {
case <-node.Ready():
Expand Down Expand Up @@ -449,6 +429,44 @@ func TestCertRenewals(t *testing.T) {
}, 5*time.Second))

require.NoError(t, node.Stop(context.Background()))
wg.Wait()
require.NoError(t, nodeErr)
}

// TestManagerFailedStartup verifies that a node whose manager fails to
// initialize raft reports the failure through node.closed/node.err rather
// than becoming ready or hanging.
func TestManagerFailedStartup(t *testing.T) {
	tmpDir, err := ioutil.TempDir("", "manager-root-ca-update")
	require.NoError(t, err)
	defer os.RemoveAll(tmpDir)

	paths := ca.NewConfigPaths(filepath.Join(tmpDir, certDirectory))

	// Create a root CA and persist it under the node's state directory.
	rootCA, err := ca.CreateRootCA(ca.DefaultRootCN)
	require.NoError(t, err)
	require.NoError(t, ca.SaveRootCA(rootCA, paths.RootCA))

	// Issue and save a certificate carrying the manager role. Constructing
	// the key read/writer yields no error value, so there is nothing to
	// assert between these two calls.
	krw := ca.NewKeyReadWriter(paths.Node, nil, nil)
	_, _, err = rootCA.IssueAndSaveNewCertificates(krw, identity.NewID(), ca.ManagerRole, identity.NewID())
	require.NoError(t, err)

	// don't bother with a listening socket
	cAddr := filepath.Join(tmpDir, "control.sock")
	cfg := &Config{
		ListenControlAPI: cAddr,
		StateDir:         tmpDir,
		Executor:         &agentutils.TestExecutor{},
		// A join address is configured; the expected error below shows the
		// raft join is rejected because the node does not know its own
		// address.
		JoinAddr: "127.0.0.1",
	}

	node, err := New(cfg)
	require.NoError(t, err)

	require.NoError(t, node.Start(context.Background()))

	// The node must close with the raft initialization error; becoming
	// ready, or doing neither within the timeout, is a test failure.
	select {
	case <-node.Ready():
		require.FailNow(t, "node should not become ready")
	case <-time.After(5 * time.Second):
		require.FailNow(t, "node neither became ready nor encountered an error")
	case <-node.closed:
		require.EqualError(t, node.err, "manager stopped: can't initialize raft node: attempted to join raft cluster without knowing own address")
	}
}

0 comments on commit ffd009f

Please sign in to comment.