From 90d488bcb5dd2bc8f78e01d163071eb49fc23ff3 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Fri, 14 Jun 2019 10:57:46 -0400 Subject: [PATCH 1/2] Avoid de-registering slowly restored services When a nomad client restarts/upgraded, nomad restores state from running task and starts the sync loop. If sync loop runs early, it may deregister services from Consul prematurely even when Consul has the running service as healthy. This is not ideal, as re-registering the service means potentially waiting a whole service health check interval before declaring the service healthy. We attempt to mitigate this by introducing an initialization probation period. During this time, we only deregister services and checks that were explicitly deregistered, and leave unrecognized ones alone. This serves as a grace period for restoring to complete, or for operators to restore should they recognize they restored with the wrong nomad data directory. --- command/agent/consul/client.go | 74 ++++++--- command/agent/consul/unit_test.go | 245 +++++++++++++++++++++++++++++- 2 files changed, 299 insertions(+), 20 deletions(-) diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index ad605690c8be..8dc10fcccdea 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -68,6 +68,11 @@ const ( // ServiceTagSerf is the tag assigned to Serf services ServiceTagSerf = "serf" + + // deregisterProbationPeriod is the initialization period where + // services registered in Consul but not in Nomad don't get registered, + // to allow for nomad restoring tasks + deregisterProbationPeriod = 10 * time.Minute ) // CatalogAPI is the consul/api.Catalog API used by Nomad. @@ -230,6 +235,9 @@ type ServiceClient struct { scripts map[string]*scriptCheck runningScripts map[string]*scriptHandle + explicitlyDeregisteredServices map[string]bool + explicitlyDeregisteredChecks map[string]bool + // allocRegistrations stores the services and checks that are registered // with Consul by allocation ID. allocRegistrations map[string]*AllocRegistration @@ -245,6 +253,11 @@ type ServiceClient struct { // atomics. seen int32 + // deregisterProbationExpiry is the time before which consul sync shouldn't deregister + // unknown services. + // Used to mitigate risk of deleting restored services upon client restart. + deregisterProbationExpiry time.Time + // checkWatcher restarts checks that are unhealthy. checkWatcher *checkWatcher @@ -260,24 +273,27 @@ type ServiceClient struct { func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bool) *ServiceClient { logger = logger.ResetNamed("consul.sync") return &ServiceClient{ - client: consulClient, - logger: logger, - retryInterval: defaultRetryInterval, - maxRetryInterval: defaultMaxRetryInterval, - periodicInterval: defaultPeriodicInterval, - exitCh: make(chan struct{}), - shutdownCh: make(chan struct{}), - shutdownWait: defaultShutdownWait, - opCh: make(chan *operations, 8), - services: make(map[string]*api.AgentServiceRegistration), - checks: make(map[string]*api.AgentCheckRegistration), - scripts: make(map[string]*scriptCheck), - runningScripts: make(map[string]*scriptHandle), - allocRegistrations: make(map[string]*AllocRegistration), - agentServices: make(map[string]struct{}), - agentChecks: make(map[string]struct{}), - checkWatcher: newCheckWatcher(logger, consulClient), - isClientAgent: isNomadClient, + client: consulClient, + logger: logger, + retryInterval: defaultRetryInterval, + maxRetryInterval: defaultMaxRetryInterval, + periodicInterval: defaultPeriodicInterval, + exitCh: make(chan struct{}), + shutdownCh: make(chan struct{}), + shutdownWait: defaultShutdownWait, + opCh: make(chan *operations, 8), + services: make(map[string]*api.AgentServiceRegistration), + checks: make(map[string]*api.AgentCheckRegistration), + scripts: make(map[string]*scriptCheck), + runningScripts: make(map[string]*scriptHandle), + explicitlyDeregisteredServices: make(map[string]bool), + explicitlyDeregisteredChecks: make(map[string]bool), + allocRegistrations: make(map[string]*AllocRegistration), + agentServices: make(map[string]struct{}), + agentChecks: make(map[string]struct{}), + checkWatcher: newCheckWatcher(logger, consulClient), + isClientAgent: isNomadClient, + deregisterProbationExpiry: time.Now().Add(deregisterProbationPeriod), } } @@ -372,6 +388,9 @@ INIT: failures = 0 } + // on successful sync, clear deregistered consul entities + c.clearExplicitlyDeregistered() + // Reset timer to periodic interval to periodically // reconile with Consul if !retryTimer.Stop() { @@ -407,6 +426,11 @@ func (c *ServiceClient) commit(ops *operations) { } } +func (c *ServiceClient) clearExplicitlyDeregistered() { + c.explicitlyDeregisteredServices = map[string]bool{} + c.explicitlyDeregisteredChecks = map[string]bool{} +} + // merge registrations into state map prior to sync'ing with Consul func (c *ServiceClient) merge(ops *operations) { for _, s := range ops.regServices { @@ -420,6 +444,7 @@ func (c *ServiceClient) merge(ops *operations) { } for _, sid := range ops.deregServices { delete(c.services, sid) + c.explicitlyDeregisteredServices[sid] = true } for _, cid := range ops.deregChecks { if script, ok := c.runningScripts[cid]; ok { @@ -428,6 +453,7 @@ func (c *ServiceClient) merge(ops *operations) { delete(c.runningScripts, cid) } delete(c.checks, cid) + c.explicitlyDeregisteredChecks[cid] = true } metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services))) metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks))) @@ -450,6 +476,8 @@ func (c *ServiceClient) sync() error { return fmt.Errorf("error querying Consul checks: %v", err) } + inProbation := time.Now().Before(c.deregisterProbationExpiry) + // Remove Nomad services in Consul but unknown locally for id := range consulServices { if _, ok := c.services[id]; ok { @@ -466,6 +494,11 @@ func (c *ServiceClient) sync() error { continue } + // Ignore unknown services during probation + if inProbation && !c.explicitlyDeregisteredServices[id] { + continue + } + // Unknown Nomad managed service; kill if err := c.client.ServiceDeregister(id); err != nil { if isOldNomadService(id) { @@ -518,6 +551,11 @@ func (c *ServiceClient) sync() error { continue } + // Ignore unknown services during probation + if inProbation && !c.explicitlyDeregisteredChecks[id] { + continue + } + // Unknown Nomad managed check; remove if err := c.client.CheckDeregister(id); err != nil { if isOldNomadService(check.ServiceID) { diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index cb9606e80b91..9bc1cc690cd5 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -107,7 +107,11 @@ func (t *testFakeCtx) syncOnce() error { select { case ops := <-t.ServiceClient.opCh: t.ServiceClient.merge(ops) - return t.ServiceClient.sync() + err := t.ServiceClient.sync() + if err == nil { + t.ServiceClient.clearExplicitlyDeregistered() + } + return err default: return errNoOps } @@ -118,8 +122,13 @@ func (t *testFakeCtx) syncOnce() error { func setupFake(t *testing.T) *testFakeCtx { fc := NewMockAgent() tt := testTask() + + // by default start fake client being out of probation + sc := NewServiceClient(fc, testlog.HCLogger(t), true) + sc.deregisterProbationExpiry = time.Now().Add(-1 * time.Minute) + return &testFakeCtx{ - ServiceClient: NewServiceClient(fc, testlog.HCLogger(t), true), + ServiceClient: sc, FakeConsul: fc, Task: tt, MockExec: tt.DriverExec.(*mockExec), @@ -1676,3 +1685,235 @@ func TestConsul_ServiceName_Duplicates(t *testing.T) { } } } + +// TestConsul_ServiceDeregistration_OutOfProbation asserts that during in steady +// state we remove any services we don't reconize locally +func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { + t.Parallel() + ctx := setupFake(t) + require := require.New(t) + + ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour) + + remainingTask := testTask() + remainingTask.Services = []*structs.Service{ + { + Name: "remaining-service", + PortLabel: "x", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + remainingTaskServiceID := makeTaskServiceID(remainingTask.AllocID, + remainingTask.Name, remainingTask.Services[0], false) + + require.NoError(ctx.ServiceClient.RegisterTask(remainingTask)) + require.NoError(ctx.syncOnce()) + require.Len(ctx.FakeConsul.services, 1) + require.Len(ctx.FakeConsul.checks, 1) + + explicitlyRemovedTask := testTask() + explicitlyRemovedTask.Services = []*structs.Service{ + { + Name: "explicitly-removed-service", + PortLabel: "y", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + explicitlyRemovedTaskServiceID := makeTaskServiceID(explicitlyRemovedTask.AllocID, + explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false) + + require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask)) + + require.NoError(ctx.syncOnce()) + require.Len(ctx.FakeConsul.services, 2) + require.Len(ctx.FakeConsul.checks, 2) + + // we register a task through nomad API then remove it out of band + outofbandTask := testTask() + outofbandTask.Services = []*structs.Service{ + { + Name: "unknown-service", + PortLabel: "x", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + outofbandTaskServiceID := makeTaskServiceID(outofbandTask.AllocID, + outofbandTask.Name, outofbandTask.Services[0], false) + + require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask)) + require.NoError(ctx.syncOnce()) + + require.Len(ctx.FakeConsul.services, 3) + + // remove outofbandTask from local services so it appears unknown to client + require.Len(ctx.ServiceClient.services, 3) + require.Len(ctx.ServiceClient.checks, 3) + + delete(ctx.ServiceClient.services, outofbandTaskServiceID) + delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + + require.Len(ctx.ServiceClient.services, 2) + require.Len(ctx.ServiceClient.checks, 2) + + // Sync and ensure that explicitly removed service as well as outofbandTask were removed + + ctx.ServiceClient.RemoveTask(explicitlyRemovedTask) + require.NoError(ctx.syncOnce()) + require.NoError(ctx.ServiceClient.sync()) + require.Len(ctx.FakeConsul.services, 1) + require.Len(ctx.FakeConsul.checks, 1) + + require.Contains(ctx.FakeConsul.services, remainingTaskServiceID) + require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID) + require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID) + + require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) +} + +// TestConsul_ServiceDeregistration_InProbation asserts that during initialization +// we only deregister services that were explicitly removed and leave unknown +// services untouched. This adds a grace period for restoring recovered tasks +// before deregistering them +func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { + t.Parallel() + ctx := setupFake(t) + require := require.New(t) + + ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(1 * time.Hour) + + remainingTask := testTask() + remainingTask.Services = []*structs.Service{ + { + Name: "remaining-service", + PortLabel: "x", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + remainingTaskServiceID := makeTaskServiceID(remainingTask.AllocID, + remainingTask.Name, remainingTask.Services[0], false) + + require.NoError(ctx.ServiceClient.RegisterTask(remainingTask)) + require.NoError(ctx.syncOnce()) + require.Len(ctx.FakeConsul.services, 1) + require.Len(ctx.FakeConsul.checks, 1) + + explicitlyRemovedTask := testTask() + explicitlyRemovedTask.Services = []*structs.Service{ + { + Name: "explicitly-removed-service", + PortLabel: "y", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + explicitlyRemovedTaskServiceID := makeTaskServiceID(explicitlyRemovedTask.AllocID, + explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false) + + require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask)) + + require.NoError(ctx.syncOnce()) + require.Len(ctx.FakeConsul.services, 2) + require.Len(ctx.FakeConsul.checks, 2) + + // we register a task through nomad API then remove it out of band + outofbandTask := testTask() + outofbandTask.Services = []*structs.Service{ + { + Name: "unknown-service", + PortLabel: "x", + Checks: []*structs.ServiceCheck{ + { + Name: "check", + Type: "tcp", + Interval: time.Second, + Timeout: time.Second, + }, + }, + }, + } + outofbandTaskServiceID := makeTaskServiceID(outofbandTask.AllocID, + outofbandTask.Name, outofbandTask.Services[0], false) + + require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask)) + require.NoError(ctx.syncOnce()) + + require.Len(ctx.FakeConsul.services, 3) + + // remove outofbandTask from local services so it appears unknown to client + require.Len(ctx.ServiceClient.services, 3) + require.Len(ctx.ServiceClient.checks, 3) + + delete(ctx.ServiceClient.services, outofbandTaskServiceID) + delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + + require.Len(ctx.ServiceClient.services, 2) + require.Len(ctx.ServiceClient.checks, 2) + + // Sync and ensure that explicitly removed service was removed, but outofbandTask remains + + ctx.ServiceClient.RemoveTask(explicitlyRemovedTask) + require.NoError(ctx.syncOnce()) + require.NoError(ctx.ServiceClient.sync()) + require.Len(ctx.FakeConsul.services, 2) + require.Len(ctx.FakeConsul.checks, 2) + + require.Contains(ctx.FakeConsul.services, remainingTaskServiceID) + require.Contains(ctx.FakeConsul.services, outofbandTaskServiceID) + require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID) + + require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) + require.Contains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) + + // after probation, outofband services and checks are removed + ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour) + + require.NoError(ctx.ServiceClient.sync()) + require.Len(ctx.FakeConsul.services, 1) + require.Len(ctx.FakeConsul.checks, 1) + + require.Contains(ctx.FakeConsul.services, remainingTaskServiceID) + require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID) + require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID) + + require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) + +} From 121c97475d532d7fbe762d83647d11282f714cfd Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 17 Jul 2019 10:43:13 +0700 Subject: [PATCH 2/2] address review feedback --- command/agent/consul/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 8dc10fcccdea..9b85fe5f2b5e 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -70,9 +70,9 @@ const ( ServiceTagSerf = "serf" // deregisterProbationPeriod is the initialization period where - // services registered in Consul but not in Nomad don't get registered, + // services registered in Consul but not in Nomad don't get deregistered, // to allow for nomad restoring tasks - deregisterProbationPeriod = 10 * time.Minute + deregisterProbationPeriod = time.Minute ) // CatalogAPI is the consul/api.Catalog API used by Nomad.