diff --git a/command/agent/consul/catalog_testing.go b/command/agent/consul/catalog_testing.go index 29abb5d90a23..7cd2b833f816 100644 --- a/command/agent/consul/catalog_testing.go +++ b/command/agent/consul/catalog_testing.go @@ -33,7 +33,12 @@ type MockAgent struct { // maps of what services and checks have been registered services map[string]*api.AgentServiceRegistration checks map[string]*api.AgentCheckRegistration - mu sync.Mutex + + // hits is the total number of times agent methods have been called + hits int + + // mu guards above fields + mu sync.Mutex // when UpdateTTL is called the check ID will have its counter inc'd checkTTLs map[string]int @@ -52,6 +57,13 @@ func NewMockAgent() *MockAgent { } } +// getHits returns how many Consul Agent API calls have been made. +func (c *MockAgent) getHits() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.hits +} + // SetStatus that Checks() should return. Returns old status value. func (c *MockAgent) SetStatus(s string) string { c.mu.Lock() @@ -62,6 +74,10 @@ func (c *MockAgent) SetStatus(s string) string { } func (c *MockAgent) Self() (map[string]map[string]interface{}, error) { + c.mu.Lock() + defer c.mu.Unlock() + c.hits++ + s := map[string]map[string]interface{}{ "Member": { "Addr": "127.0.0.1", @@ -85,6 +101,7 @@ func (c *MockAgent) Self() (map[string]map[string]interface{}, error) { func (c *MockAgent) Services() (map[string]*api.AgentService, error) { c.mu.Lock() defer c.mu.Unlock() + c.hits++ r := make(map[string]*api.AgentService, len(c.services)) for k, v := range c.services { @@ -105,6 +122,7 @@ func (c *MockAgent) Services() (map[string]*api.AgentService, error) { func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) { c.mu.Lock() defer c.mu.Unlock() + c.hits++ r := make(map[string]*api.AgentCheck, len(c.checks)) for k, v := range c.checks { @@ -125,7 +143,6 @@ func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) { func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration { c.mu.Lock() defer c.mu.Unlock() - regs := make([]*api.AgentCheckRegistration, 0, len(c.checks)) for _, check := range c.checks { regs = append(regs, check) @@ -136,6 +153,8 @@ func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration { func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error { c.mu.Lock() defer c.mu.Unlock() + c.hits++ + c.checks[check.ID] = check // Be nice and make checks reachable-by-service @@ -147,6 +166,7 @@ func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error { func (c *MockAgent) CheckDeregister(checkID string) error { c.mu.Lock() defer c.mu.Unlock() + c.hits++ delete(c.checks, checkID) delete(c.checkTTLs, checkID) return nil @@ -155,6 +175,7 @@ func (c *MockAgent) CheckDeregister(checkID string) error { func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error { c.mu.Lock() defer c.mu.Unlock() + c.hits++ c.services[service.ID] = service return nil } @@ -162,6 +183,7 @@ func (c *MockAgent) ServiceRegister(service *api.AgentServiceRegistration) error func (c *MockAgent) ServiceDeregister(serviceID string) error { c.mu.Lock() defer c.mu.Unlock() + c.hits++ delete(c.services, serviceID) return nil } @@ -169,6 +191,8 @@ func (c *MockAgent) ServiceDeregister(serviceID string) error { func (c *MockAgent) UpdateTTL(id string, output string, status string) error { c.mu.Lock() defer c.mu.Unlock() + c.hits++ + check, ok := c.checks[id] if !ok { return fmt.Errorf("unknown check id: %q", id) diff --git a/command/agent/consul/check_watcher_test.go b/command/agent/consul/check_watcher_test.go index ccc06b5e6fa2..7e0baf9e75c4 100644 --- a/command/agent/consul/check_watcher_test.go +++ b/command/agent/consul/check_watcher_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -113,9 +114,9 @@ func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) { // testWatcherSetup sets up a fakeChecksAPI and a real checkWatcher with a test // logger and faster poll frequency. -func testWatcherSetup() (*fakeChecksAPI, *checkWatcher) { +func testWatcherSetup(t *testing.T) (*fakeChecksAPI, *checkWatcher) { fakeAPI := newFakeChecksAPI() - cw := newCheckWatcher(testLogger(), fakeAPI) + cw := newCheckWatcher(testlog.Logger(t), fakeAPI) cw.pollFreq = 10 * time.Millisecond return fakeAPI, cw } @@ -141,7 +142,7 @@ func TestCheckWatcher_Skip(t *testing.T) { check := testCheck() check.CheckRestart = nil - cw := newCheckWatcher(testLogger(), newFakeChecksAPI()) + cw := newCheckWatcher(testlog.Logger(t), newFakeChecksAPI()) restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check) cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1) @@ -155,7 +156,7 @@ func TestCheckWatcher_Skip(t *testing.T) { func TestCheckWatcher_Healthy(t *testing.T) { t.Parallel() - fakeAPI, cw := testWatcherSetup() + fakeAPI, cw := testWatcherSetup(t) check1 := testCheck() restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check1) @@ -190,7 +191,7 @@ func TestCheckWatcher_Healthy(t *testing.T) { func TestCheckWatcher_HealthyWarning(t *testing.T) { t.Parallel() - fakeAPI, cw := testWatcherSetup() + fakeAPI, cw := testWatcherSetup(t) check1 := testCheck() check1.CheckRestart.Limit = 1 @@ -218,7 +219,7 @@ func TestCheckWatcher_HealthyWarning(t *testing.T) { func TestCheckWatcher_Flapping(t *testing.T) { t.Parallel() - fakeAPI, cw := testWatcherSetup() + fakeAPI, cw := testWatcherSetup(t) check1 := testCheck() check1.CheckRestart.Grace = 0 @@ -247,7 +248,7 @@ func TestCheckWatcher_Flapping(t *testing.T) { func TestCheckWatcher_Unwatch(t *testing.T) { t.Parallel() - fakeAPI, cw := testWatcherSetup() + fakeAPI, cw := testWatcherSetup(t) // Unwatch immediately check1 := testCheck() @@ -276,7 +277,7 @@ func TestCheckWatcher_Unwatch(t *testing.T) { func TestCheckWatcher_MultipleChecks(t *testing.T) { t.Parallel() - fakeAPI, cw := testWatcherSetup() + fakeAPI, cw := testWatcherSetup(t) check1 := testCheck() check1.CheckRestart.Limit = 1 diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 13f44c5a5eda..cc0a26d411ea 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -36,6 +36,13 @@ const ( // defaultMaxRetryInterval is the default max retry interval. defaultMaxRetryInterval = 30 * time.Second + // defaultPeriodicalInterval is the interval at which the service + // client reconciles state between the desired services and checks and + // what's actually registered in Consul. This is done at an interval, + // rather than being purely edge triggered, to handle the case that the + // Consul agent's state may change underneath us + defaultPeriodicInterval = 30 * time.Second + // ttlCheckBuffer is the time interval that Nomad can take to report Consul // the check result ttlCheckBuffer = 31 * time.Second @@ -190,6 +197,7 @@ type ServiceClient struct { logger *log.Logger retryInterval time.Duration maxRetryInterval time.Duration + periodicInterval time.Duration // exitCh is closed when the main Run loop exits exitCh chan struct{} @@ -235,6 +243,7 @@ func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient logger: logger, retryInterval: defaultRetryInterval, maxRetryInterval: defaultMaxRetryInterval, + periodicInterval: defaultPeriodicInterval, exitCh: make(chan struct{}), shutdownCh: make(chan struct{}), shutdownWait: defaultShutdownWait, @@ -279,7 +288,6 @@ func (c *ServiceClient) Run() { // Process operations while waiting for initial contact with Consul but // do not sync until contact has been made. - hasOps := false INIT: for { select { @@ -289,7 +297,6 @@ INIT: case <-c.shutdownCh: return case ops := <-c.opCh: - hasOps = true c.merge(ops) } } @@ -299,11 +306,8 @@ INIT: // Start checkWatcher go c.checkWatcher.Run(ctx) + // Always immediately sync to reconcile Nomad and Consul's state retryTimer := time.NewTimer(0) - if !hasOps { - // No pending operations so don't immediately sync - <-retryTimer.C - } failures := 0 for { @@ -345,6 +349,16 @@ INIT: c.logger.Printf("[INFO] consul.sync: successfully updated services in Consul") failures = 0 } + + // Reset timer to periodic interval to periodically + // reconile with Consul + if !retryTimer.Stop() { + select { + case <-retryTimer.C: + default: + } + } + retryTimer.Reset(c.periodicInterval) } select { diff --git a/command/agent/consul/script_test.go b/command/agent/consul/script_test.go index 46f90a8ca4a7..3d4331ef09bf 100644 --- a/command/agent/consul/script_test.go +++ b/command/agent/consul/script_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/testtask" "github.com/hashicorp/nomad/nomad/structs" ) @@ -59,7 +60,7 @@ func TestConsulScript_Exec_Cancel(t *testing.T) { exec := newBlockingScriptExec() // pass nil for heartbeater as it shouldn't be called - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testLogger(), nil) + check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.Logger(t), nil) handle := check.run() // wait until Exec is called @@ -111,7 +112,7 @@ func TestConsulScript_Exec_Timeout(t *testing.T) { exec := newBlockingScriptExec() hb := newFakeHeartbeater() - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), nil) + check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), nil) handle := check.run() defer handle.cancel() // just-in-case cleanup <-exec.running @@ -160,7 +161,7 @@ func TestConsulScript_Exec_TimeoutCritical(t *testing.T) { Timeout: time.Nanosecond, } hb := newFakeHeartbeater() - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testLogger(), nil) + check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.Logger(t), nil) handle := check.run() defer handle.cancel() // just-in-case cleanup @@ -205,7 +206,7 @@ func TestConsulScript_Exec_Shutdown(t *testing.T) { hb := newFakeHeartbeater() shutdown := make(chan struct{}) exec := newSimpleExec(0, nil) - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown) + check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown) handle := check.run() defer handle.cancel() // just-in-case cleanup @@ -242,7 +243,7 @@ func TestConsulScript_Exec_Codes(t *testing.T) { hb := newFakeHeartbeater() shutdown := make(chan struct{}) exec := newSimpleExec(code, err) - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testLogger(), shutdown) + check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown) handle := check.run() defer handle.cancel() diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 91ec3a2b3d05..9f72ee2ea218 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -3,9 +3,6 @@ package consul import ( "context" "fmt" - "io/ioutil" - "log" - "os" "reflect" "strings" "sync/atomic" @@ -14,6 +11,7 @@ import ( "github.com/hashicorp/consul/api" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" @@ -27,13 +25,6 @@ const ( yPort = 1235 ) -func testLogger() *log.Logger { - if testing.Verbose() { - return log.New(os.Stderr, "", log.LstdFlags) - } - return log.New(ioutil.Discard, "", 0) -} - func testTask() *structs.Task { return &structs.Task{ Name: "taskname", @@ -112,10 +103,10 @@ func (t *testFakeCtx) syncOnce() error { // setupFake creates a testFakeCtx with a ServiceClient backed by a fakeConsul. // A test Task is also provided. -func setupFake() *testFakeCtx { +func setupFake(t *testing.T) *testFakeCtx { fc := NewMockAgent() return &testFakeCtx{ - ServiceClient: NewServiceClient(fc, testLogger()), + ServiceClient: NewServiceClient(fc, testlog.Logger(t)), FakeConsul: fc, Task: testTask(), Restarter: &restartRecorder{}, @@ -124,7 +115,7 @@ func setupFake() *testFakeCtx { } func TestConsul_ChangeTags(t *testing.T) { - ctx := setupFake() + ctx := setupFake(t) allocID := "allocid" if err := ctx.ServiceClient.RegisterTask(allocID, ctx.Task, ctx.Restarter, nil, nil); err != nil { @@ -225,7 +216,7 @@ func TestConsul_ChangeTags(t *testing.T) { // it in Consul. Pre-0.7.1 ports were not part of the service ID and this was a // slightly different code path than changing tags. func TestConsul_ChangePorts(t *testing.T) { - ctx := setupFake() + ctx := setupFake(t) ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { Name: "c1", @@ -406,7 +397,7 @@ func TestConsul_ChangePorts(t *testing.T) { // TestConsul_ChangeChecks asserts that updating only the checks on a service // properly syncs with Consul. func TestConsul_ChangeChecks(t *testing.T) { - ctx := setupFake() + ctx := setupFake(t) ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { Name: "c1", @@ -641,7 +632,7 @@ func TestConsul_ChangeChecks(t *testing.T) { // TestConsul_RegServices tests basic service registration. func TestConsul_RegServices(t *testing.T) { - ctx := setupFake() + ctx := setupFake(t) // Add a check w/restarting ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ @@ -778,7 +769,7 @@ func TestConsul_RegServices(t *testing.T) { // ServiceClient. func TestConsul_ShutdownOK(t *testing.T) { require := require.New(t) - ctx := setupFake() + ctx := setupFake(t) // Add a script check to make sure its TTL gets updated ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ @@ -840,8 +831,8 @@ func TestConsul_ShutdownOK(t *testing.T) { // TestConsul_ShutdownSlow tests the slow but ok path for the shutdown logic in // ServiceClient. func TestConsul_ShutdownSlow(t *testing.T) { - t.Parallel() // run the slow tests in parallel - ctx := setupFake() + t.Parallel() + ctx := setupFake(t) // Add a script check to make sure its TTL gets updated ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ @@ -912,8 +903,8 @@ func TestConsul_ShutdownSlow(t *testing.T) { // TestConsul_ShutdownBlocked tests the blocked past deadline path for the // shutdown logic in ServiceClient. func TestConsul_ShutdownBlocked(t *testing.T) { - t.Parallel() // run the slow tests in parallel - ctx := setupFake() + t.Parallel() + ctx := setupFake(t) // Add a script check to make sure its TTL gets updated ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ @@ -981,7 +972,7 @@ func TestConsul_ShutdownBlocked(t *testing.T) { // TestConsul_RemoveScript assert removing a script check removes all objects // related to that check. func TestConsul_CancelScript(t *testing.T) { - ctx := setupFake() + ctx := setupFake(t) ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { Name: "scriptcheckDel", @@ -1069,7 +1060,8 @@ func TestConsul_CancelScript(t *testing.T) { // auto-use set then services should advertise it unless explicitly set to // host. Checks should always use host. func TestConsul_DriverNetwork_AutoUse(t *testing.T) { - ctx := setupFake() + t.Parallel() + ctx := setupFake(t) ctx.Task.Services = []*structs.Service{ { @@ -1195,7 +1187,8 @@ func TestConsul_DriverNetwork_AutoUse(t *testing.T) { // set auto-use only services which request the driver's network should // advertise it. func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) { - ctx := setupFake() + t.Parallel() + ctx := setupFake(t) ctx.Task.Services = []*structs.Service{ { @@ -1268,7 +1261,8 @@ func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) { // TestConsul_DriverNetwork_Change asserts that if a driver network is // specified and a service updates its use its properly updated in Consul. func TestConsul_DriverNetwork_Change(t *testing.T) { - ctx := setupFake() + t.Parallel() + ctx := setupFake(t) ctx.Task.Services = []*structs.Service{ { @@ -1337,9 +1331,38 @@ func TestConsul_DriverNetwork_Change(t *testing.T) { syncAndAssertPort(net.PortMap["x"]) } +// TestConsul_PeriodicSync asserts that Nomad periodically reconciles with +// Consul. +func TestConsul_PeriodicSync(t *testing.T) { + t.Parallel() + + ctx := setupFake(t) + defer ctx.ServiceClient.Shutdown() + + // Lower periodic sync interval to speed up test + ctx.ServiceClient.periodicInterval = 2 * time.Millisecond + + // Run for 10ms and assert hits >= 5 because each sync() calls multiple + // Consul APIs + go ctx.ServiceClient.Run() + + select { + case <-ctx.ServiceClient.exitCh: + t.Fatalf("exited unexpectedly") + case <-time.After(10 * time.Millisecond): + } + + minHits := 5 + if hits := ctx.FakeConsul.getHits(); hits < minHits { + t.Fatalf("expected at least %d hits but found %d", minHits, hits) + } +} + // TestIsNomadService asserts the isNomadService helper returns true for Nomad // task IDs and false for unknown IDs and Nomad agent IDs (see #2827). func TestIsNomadService(t *testing.T) { + t.Parallel() + tests := []struct { id string result bool