diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index ad605690c8be..9b85fe5f2b5e 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -68,6 +68,11 @@ const ( // ServiceTagSerf is the tag assigned to Serf services ServiceTagSerf = "serf" + + // deregisterProbationPeriod is the period after a ServiceClient is created during which + // services registered in Consul but unknown to Nomad don't get deregistered (unless explicitly removed), + // to give Nomad time to restore tasks. + deregisterProbationPeriod = time.Minute ) // CatalogAPI is the consul/api.Catalog API used by Nomad. @@ -230,6 +235,9 @@ type ServiceClient struct { scripts map[string]*scriptCheck runningScripts map[string]*scriptHandle + explicitlyDeregisteredServices map[string]bool // IDs of services deregistered through merge since the last successful sync + explicitlyDeregisteredChecks map[string]bool // IDs of checks deregistered through merge since the last successful sync + // allocRegistrations stores the services and checks that are registered // with Consul by allocation ID. allocRegistrations map[string]*AllocRegistration @@ -245,6 +253,11 @@ type ServiceClient struct { // atomics. seen int32 + // deregisterProbationExpiry is the time before which Consul sync shouldn't deregister + // unknown services and checks, except those that were explicitly deregistered. + // Used to mitigate the risk of deleting restored services upon client restart. + deregisterProbationExpiry time.Time + // checkWatcher restarts checks that are unhealthy. 
checkWatcher *checkWatcher @@ -260,24 +273,27 @@ type ServiceClient struct { func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bool) *ServiceClient { logger = logger.ResetNamed("consul.sync") return &ServiceClient{ - client: consulClient, - logger: logger, - retryInterval: defaultRetryInterval, - maxRetryInterval: defaultMaxRetryInterval, - periodicInterval: defaultPeriodicInterval, - exitCh: make(chan struct{}), - shutdownCh: make(chan struct{}), - shutdownWait: defaultShutdownWait, - opCh: make(chan *operations, 8), - services: make(map[string]*api.AgentServiceRegistration), - checks: make(map[string]*api.AgentCheckRegistration), - scripts: make(map[string]*scriptCheck), - runningScripts: make(map[string]*scriptHandle), - allocRegistrations: make(map[string]*AllocRegistration), - agentServices: make(map[string]struct{}), - agentChecks: make(map[string]struct{}), - checkWatcher: newCheckWatcher(logger, consulClient), - isClientAgent: isNomadClient, + client: consulClient, + logger: logger, + retryInterval: defaultRetryInterval, + maxRetryInterval: defaultMaxRetryInterval, + periodicInterval: defaultPeriodicInterval, + exitCh: make(chan struct{}), + shutdownCh: make(chan struct{}), + shutdownWait: defaultShutdownWait, + opCh: make(chan *operations, 8), + services: make(map[string]*api.AgentServiceRegistration), + checks: make(map[string]*api.AgentCheckRegistration), + scripts: make(map[string]*scriptCheck), + runningScripts: make(map[string]*scriptHandle), + explicitlyDeregisteredServices: make(map[string]bool), + explicitlyDeregisteredChecks: make(map[string]bool), + allocRegistrations: make(map[string]*AllocRegistration), + agentServices: make(map[string]struct{}), + agentChecks: make(map[string]struct{}), + checkWatcher: newCheckWatcher(logger, consulClient), + isClientAgent: isNomadClient, + deregisterProbationExpiry: time.Now().Add(deregisterProbationPeriod), // the probation window starts when the client is constructed } } @@ -372,6 +388,9 @@ INIT: failures = 0 } + // on successful sync, 
clear deregistered consul entities + c.clearExplicitlyDeregistered() + // Reset timer to periodic interval to periodically // reconile with Consul if !retryTimer.Stop() { @@ -407,6 +426,11 @@ func (c *ServiceClient) commit(ops *operations) { } } +func (c *ServiceClient) clearExplicitlyDeregistered() { // forgets every recorded explicit deregistration by swapping in fresh empty maps + c.explicitlyDeregisteredServices = map[string]bool{} + c.explicitlyDeregisteredChecks = map[string]bool{} +} + // merge registrations into state map prior to sync'ing with Consul func (c *ServiceClient) merge(ops *operations) { for _, s := range ops.regServices { @@ -420,6 +444,7 @@ func (c *ServiceClient) merge(ops *operations) { } for _, sid := range ops.deregServices { delete(c.services, sid) + c.explicitlyDeregisteredServices[sid] = true // remembered so sync deregisters it even during probation } for _, cid := range ops.deregChecks { if script, ok := c.runningScripts[cid]; ok { @@ -428,6 +453,7 @@ func (c *ServiceClient) merge(ops *operations) { delete(c.runningScripts, cid) } delete(c.checks, cid) + c.explicitlyDeregisteredChecks[cid] = true // remembered so sync deregisters it even during probation } metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services))) metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks))) @@ -450,6 +476,8 @@ func (c *ServiceClient) sync() error { return fmt.Errorf("error querying Consul checks: %v", err) } + inProbation := time.Now().Before(c.deregisterProbationExpiry) + // Remove Nomad services in Consul but unknown locally for id := range consulServices { if _, ok := c.services[id]; ok { @@ -466,6 +494,11 @@ func (c *ServiceClient) sync() error { continue } + // Ignore unknown services during probation, unless they were explicitly deregistered + if inProbation && !c.explicitlyDeregisteredServices[id] { + continue + } + // Unknown Nomad managed service; kill if err := c.client.ServiceDeregister(id); err != nil { if isOldNomadService(id) { @@ -518,6 +551,11 @@ func (c *ServiceClient) sync() error { continue } + // Ignore unknown checks during probation, unless they were explicitly deregistered + if inProbation && !c.explicitlyDeregisteredChecks[id] { + continue + } + // Unknown 
Nomad managed check; remove if err := c.client.CheckDeregister(id); err != nil { if isOldNomadService(check.ServiceID) { diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index cb9606e80b91..9bc1cc690cd5 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -107,7 +107,11 @@ func (t *testFakeCtx) syncOnce() error { select { case ops := <-t.ServiceClient.opCh: t.ServiceClient.merge(ops) - return t.ServiceClient.sync() + err := t.ServiceClient.sync() + if err == nil { // as in the client's sync loop, explicit deregistrations are cleared only after a successful sync + t.ServiceClient.clearExplicitlyDeregistered() + } + return err default: return errNoOps } @@ -118,8 +122,13 @@ func (t *testFakeCtx) syncOnce() error { func setupFake(t *testing.T) *testFakeCtx { fc := NewMockAgent() tt := testTask() + + // by default, start the fake client already out of probation + sc := NewServiceClient(fc, testlog.HCLogger(t), true) + sc.deregisterProbationExpiry = time.Now().Add(-1 * time.Minute) + return &testFakeCtx{ - ServiceClient: NewServiceClient(fc, testlog.HCLogger(t), true), + ServiceClient: sc, FakeConsul: fc, Task: tt, MockExec: tt.DriverExec.(*mockExec), @@ -1676,3 +1685,235 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) { } } } + +// TestConsul_ServiceDeregistration_OutProbation asserts that in steady +// state we remove any services we don't recognize locally +func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { + t.Parallel() + ctx := setupFake(t) + require := require.New(t) + + ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour) + + remainingTask := testTask() + remainingTask.Services = []*structs.Service{ + { + Name: "remaining-service", + PortLabel: "x", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + remainingTaskServiceID := makeTaskServiceID(remainingTask.AllocID, + remainingTask.Name, remainingTask.Services[0], false) + + 
require.NoError(ctx.ServiceClient.RegisterTask(remainingTask)) + require.NoError(ctx.syncOnce()) + require.Len(ctx.FakeConsul.services, 1) + require.Len(ctx.FakeConsul.checks, 1) + + explicitlyRemovedTask := testTask() + explicitlyRemovedTask.Services = []*structs.Service{ + { + Name: "explicitly-removed-service", + PortLabel: "y", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + explicitlyRemovedTaskServiceID := makeTaskServiceID(explicitlyRemovedTask.AllocID, + explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false) + + require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask)) + + require.NoError(ctx.syncOnce()) + require.Len(ctx.FakeConsul.services, 2) + require.Len(ctx.FakeConsul.checks, 2) + + // register a task through the ServiceClient, then drop it from the client's local state out of band + outofbandTask := testTask() + outofbandTask.Services = []*structs.Service{ + { + Name: "unknown-service", + PortLabel: "x", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + outofbandTaskServiceID := makeTaskServiceID(outofbandTask.AllocID, + outofbandTask.Name, outofbandTask.Services[0], false) + + require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask)) + require.NoError(ctx.syncOnce()) + + require.Len(ctx.FakeConsul.services, 3) + + // remove outofbandTask from the client's local services and checks so it appears unknown to the client + require.Len(ctx.ServiceClient.services, 3) + require.Len(ctx.ServiceClient.checks, 3) + + delete(ctx.ServiceClient.services, outofbandTaskServiceID) + delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + + require.Len(ctx.ServiceClient.services, 2) + require.Len(ctx.ServiceClient.checks, 2) + + // Sync and ensure that the explicitly removed service as well as outofbandTask are removed from the fake Consul + + 
ctx.ServiceClient.RemoveTask(explicitlyRemovedTask) + require.NoError(ctx.syncOnce()) + require.NoError(ctx.ServiceClient.sync()) // sync once more; syncOnce has already cleared the explicit-deregistration sets + require.Len(ctx.FakeConsul.services, 1) + require.Len(ctx.FakeConsul.checks, 1) + + require.Contains(ctx.FakeConsul.services, remainingTaskServiceID) + require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID) + require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID) + + require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) +} + +// TestConsul_ServiceDeregistration_InProbation asserts that during the probation period +// we only deregister services that were explicitly removed and leave unknown +// services untouched. This adds a grace period for restoring recovered tasks +// before deregistering them. +func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { + t.Parallel() + ctx := setupFake(t) + require := require.New(t) + + ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(1 * time.Hour) // keep the client in probation + + remainingTask := testTask() + remainingTask.Services = []*structs.Service{ + { + Name: "remaining-service", + PortLabel: "x", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + remainingTaskServiceID := makeTaskServiceID(remainingTask.AllocID, + remainingTask.Name, remainingTask.Services[0], false) + + require.NoError(ctx.ServiceClient.RegisterTask(remainingTask)) + require.NoError(ctx.syncOnce()) + require.Len(ctx.FakeConsul.services, 1) + require.Len(ctx.FakeConsul.checks, 1) + + explicitlyRemovedTask := testTask() + explicitlyRemovedTask.Services = []*structs.Service{ + { + Name: 
"explicitly-removed-service", + PortLabel: "y", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + explicitlyRemovedTaskServiceID := makeTaskServiceID(explicitlyRemovedTask.AllocID, + explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false) + + require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask)) + + require.NoError(ctx.syncOnce()) + require.Len(ctx.FakeConsul.services, 2) + require.Len(ctx.FakeConsul.checks, 2) + + // register a task through the ServiceClient, then drop it from the client's local state out of band + outofbandTask := testTask() + outofbandTask.Services = []*structs.Service{ + { + Name: "unknown-service", + PortLabel: "x", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + outofbandTaskServiceID := makeTaskServiceID(outofbandTask.AllocID, + outofbandTask.Name, outofbandTask.Services[0], false) + + require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask)) + require.NoError(ctx.syncOnce()) + + require.Len(ctx.FakeConsul.services, 3) + + // remove outofbandTask from the client's local services and checks so it appears unknown to the client + require.Len(ctx.ServiceClient.services, 3) + require.Len(ctx.ServiceClient.checks, 3) + + delete(ctx.ServiceClient.services, outofbandTaskServiceID) + delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + + require.Len(ctx.ServiceClient.services, 2) + require.Len(ctx.ServiceClient.checks, 2) + + // Sync and ensure that the explicitly removed service was removed, but outofbandTask remains + + ctx.ServiceClient.RemoveTask(explicitlyRemovedTask) + require.NoError(ctx.syncOnce()) + require.NoError(ctx.ServiceClient.sync()) // sync once more; syncOnce has already cleared the explicit-deregistration sets + require.Len(ctx.FakeConsul.services, 2) + require.Len(ctx.FakeConsul.checks, 2) + + require.Contains(ctx.FakeConsul.services, remainingTaskServiceID) + require.Contains(ctx.FakeConsul.services, 
outofbandTaskServiceID) + require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID) + + require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) + require.Contains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) + + // once probation has expired, the out-of-band service and check are removed on the next sync + ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour) + + require.NoError(ctx.ServiceClient.sync()) + require.Len(ctx.FakeConsul.services, 1) + require.Len(ctx.FakeConsul.checks, 1) + + require.Contains(ctx.FakeConsul.services, remainingTaskServiceID) + require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID) + require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID) + + require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) + +}