From 429d157338ea16357752b17f0fc3c9482a635e32 Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 2 May 2019 16:54:06 +0200 Subject: [PATCH] wip: graceful updates --- command/agent/consul/client.go | 257 ++++++++++++++++++++++++++------- 1 file changed, 202 insertions(+), 55 deletions(-) diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index ff5af7282627..0dfb67a93547 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -104,6 +104,11 @@ type operations struct { regChecks []*api.AgentCheckRegistration scripts []*scriptCheck + // legacyServiceMapping is populated on RegisterTask and provides a mapping of + // old task ids to new task ids. This is used for expiring legacy tasks on + // restores. + legacyServiceMapping map[string]string + deregServices []string deregChecks []string } @@ -230,6 +235,12 @@ type ServiceClient struct { scripts map[string]*scriptCheck runningScripts map[string]*scriptHandle + // legacyServiceIDMap is a map of old task ids, to new task ids, for use when + // reconciling pre 0.9.2 unstable identifiers with 0.9.2+ stable identifiers. + // COMPAT(0.11): Remove legacy service mapping. + legacyServiceIDMap map[string]string + legacyServiceExpiry map[string]time.Time + // allocRegistrations stores the services and checks that are registered // with Consul by allocation ID. 
allocRegistrations map[string]*AllocRegistration @@ -260,24 +271,26 @@ type ServiceClient struct { func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bool) *ServiceClient { logger = logger.ResetNamed("consul.sync") return &ServiceClient{ - client: consulClient, - logger: logger, - retryInterval: defaultRetryInterval, - maxRetryInterval: defaultMaxRetryInterval, - periodicInterval: defaultPeriodicInterval, - exitCh: make(chan struct{}), - shutdownCh: make(chan struct{}), - shutdownWait: defaultShutdownWait, - opCh: make(chan *operations, 8), - services: make(map[string]*api.AgentServiceRegistration), - checks: make(map[string]*api.AgentCheckRegistration), - scripts: make(map[string]*scriptCheck), - runningScripts: make(map[string]*scriptHandle), - allocRegistrations: make(map[string]*AllocRegistration), - agentServices: make(map[string]struct{}), - agentChecks: make(map[string]struct{}), - checkWatcher: newCheckWatcher(logger, consulClient), - isClientAgent: isNomadClient, + client: consulClient, + logger: logger, + retryInterval: defaultRetryInterval, + maxRetryInterval: defaultMaxRetryInterval, + periodicInterval: defaultPeriodicInterval, + exitCh: make(chan struct{}), + shutdownCh: make(chan struct{}), + shutdownWait: defaultShutdownWait, + opCh: make(chan *operations, 8), + services: make(map[string]*api.AgentServiceRegistration), + checks: make(map[string]*api.AgentCheckRegistration), + scripts: make(map[string]*scriptCheck), + legacyServiceIDMap: make(map[string]string), + legacyServiceExpiry: make(map[string]time.Time), + runningScripts: make(map[string]*scriptHandle), + allocRegistrations: make(map[string]*AllocRegistration), + agentServices: make(map[string]struct{}), + agentChecks: make(map[string]struct{}), + checkWatcher: newCheckWatcher(logger, consulClient), + isClientAgent: isNomadClient, } } @@ -429,31 +442,34 @@ func (c *ServiceClient) merge(ops *operations) { } delete(c.checks, cid) } + + if ops.legacyServiceMapping != 
nil { + for legacyKey, newKey := range ops.legacyServiceMapping { + c.legacyServiceIDMap[legacyKey] = newKey + } + } + metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services))) metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks))) metrics.SetGauge([]string{"client", "consul", "script_checks"}, float32(len(c.runningScripts))) } -// sync enqueued operations. -func (c *ServiceClient) sync() error { - sreg, creg, sdereg, cdereg := 0, 0, 0, 0 - - consulServices, err := c.client.Services() - if err != nil { - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return fmt.Errorf("error querying Consul services: %v", err) - } - - consulChecks, err := c.client.Checks() - if err != nil { - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return fmt.Errorf("error querying Consul checks: %v", err) - } - - // Remove Nomad services in Consul but unknown locally +// pruneUnknownServices iterates through the consul services and de-registers any +// that are unknown to the current serviceclient. +// It returns a set of oldIDs for legacy services that exist within the consul +// definition and the count of deregistered services. 
+func (c *ServiceClient) pruneUnknownServices(consulServices map[string]*api.AgentService) (map[string]struct{}, int, error) { + usedLegacyServiceIDs := make(map[string]struct{}) + sdereg := 0 for id := range consulServices { if _, ok := c.services[id]; ok { - // Known service, skip + // Known service, current generation, skip + continue + } + + if _, ok := c.legacyServiceIDMap[id]; ok { + // Known older service, don't remove _yet_, schedule upgrade + usedLegacyServiceIDs[id] = struct{}{} continue } @@ -473,17 +489,21 @@ func (c *ServiceClient) sync() error { continue } - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return err + return nil, 0, err } sdereg++ metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1) } - // Add Nomad services missing from Consul, or where the service has been updated. + return usedLegacyServiceIDs, sdereg, nil +} + +// upsertNomadServices registers local nomad services that are not yet present +// in Consul, and updates services that have changed locally. 
+func (c *ServiceClient) upsertNomadServices(consulServices map[string]*api.AgentService) (int, error) { + sreg := 0 for id, locals := range c.services { existingSvc, ok := consulServices[id] - if ok { // There is an existing registration of this service in Consul, so here // we validate to see if the service has been invalidated to see if it @@ -494,17 +514,34 @@ func (c *ServiceClient) sync() error { } } - if err = c.client.ServiceRegister(locals); err != nil { - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return err + if err := c.client.ServiceRegister(locals); err != nil { + return 0, err } sreg++ metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } + return sreg, nil +} + +func (c *ServiceClient) pruneUnknownChecks(consulChecks map[string]*api.AgentCheck, usedLegacyServiceIDs map[string]struct{}) (map[string][]*api.AgentCheck, int, error) { + legacyCheckIDsByServiceID := make(map[string][]*api.AgentCheck) + cdereg := 0 + // Remove Nomad checks in Consul but unknown locally - for id, check := range consulChecks { - if _, ok := c.checks[id]; ok { + for consulID, check := range consulChecks { + if _, ok := usedLegacyServiceIDs[check.ServiceID]; ok { + // Registered against a legacy service, we don't really care about pruning + // these as they'll be deregistered fairly quickly. + // We store a mapping of ServiceID->Check here for computing expiry times. + checks := legacyCheckIDsByServiceID[check.ServiceID] + checks = append(checks, check) + legacyCheckIDsByServiceID[check.ServiceID] = checks + + continue + } + + if _, ok := c.checks[consulID]; ok { // Known check, leave it continue } @@ -519,20 +556,23 @@ func (c *ServiceClient) sync() error { } // Unknown Nomad managed check; remove - if err := c.client.CheckDeregister(id); err != nil { + if err := c.client.CheckDeregister(consulID); err != nil { if isOldNomadService(check.ServiceID) { // Don't hard-fail on old entries. 
continue } - - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return err + return nil, 0, err } cdereg++ metrics.IncrCounter([]string{"client", "consul", "check_deregistrations"}, 1) } - // Add Nomad checks missing from Consul + return legacyCheckIDsByServiceID, cdereg, nil +} + +func (c *ServiceClient) upsertNomadChecks(consulChecks map[string]*api.AgentCheck) (int, error) { + creg := 0 + for id, check := range c.checks { if _, ok := consulChecks[id]; ok { // Already in Consul; skipping @@ -540,9 +580,9 @@ func (c *ServiceClient) sync() error { } if err := c.client.CheckRegister(check); err != nil { - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return err + return 0, err } + creg++ metrics.IncrCounter([]string{"client", "consul", "check_registrations"}, 1) @@ -557,6 +597,95 @@ func (c *ServiceClient) sync() error { } } + return creg, nil +} + +// sync enqueued operations +func (c *ServiceClient) sync() error { + sreg, creg, sdereg, cdereg := 0, 0, 0, 0 + + consulServices, err := c.client.Services() + if err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return fmt.Errorf("error querying Consul services: %v", err) + } + + consulChecks, err := c.client.Checks() + if err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return fmt.Errorf("error querying Consul checks: %v", err) + } + + // Expire legacy services that have passed the check period of the follow up + // service registration. + for serviceID, expiryTime := range c.legacyServiceExpiry { + if expiryTime.Before(time.Now()) { + delete(c.legacyServiceIDMap, serviceID) + } + } + + // Remove unknown Nomad service registrations and compute the used legacy service + // map. 
+ usedLegacyServiceIDs, sdereg, err := c.pruneUnknownServices(consulServices) + if err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return err + } + + // Add Nomad services missing from Consul, or where the service has been updated. + sreg, err = c.upsertNomadServices(consulServices) + if err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return err + } + + // Remove unknown Nomad managed checks from consul and fetch legacy checks + legacyChecksByServiceID, cdereg, err := c.pruneUnknownChecks(consulChecks, usedLegacyServiceIDs) + if err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return err + } + + // Add Nomad checks missing from Consul + creg, err = c.upsertNomadChecks(consulChecks) + if err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return err + } + + // Schedule deregistration of legacy services after the longest check period + // completes + for serviceID := range usedLegacyServiceIDs { + if _, ok := c.legacyServiceExpiry[serviceID]; ok { + // This service is already scheduled for termination + continue + } + + checks, ok := legacyChecksByServiceID[serviceID] + if !ok { + // legacy service definition has no registered checks, prune it immediately + c.legacyServiceExpiry[serviceID] = time.Now() + continue + } + + longestCheckDuration := 0 * time.Second + for _, check := range checks { + duration := time.Duration(check.Definition.Interval + check.Definition.Timeout) + if longestCheckDuration < duration { + longestCheckDuration = duration + } + } + + c.legacyServiceExpiry[serviceID] = time.Now().Add(longestCheckDuration).Add(2 * time.Second) + } + + // Prune unused legacy service IDs + for serviceID := range c.legacyServiceIDMap { + if _, ok := usedLegacyServiceIDs[serviceID]; !ok { + delete(c.legacyServiceIDMap, serviceID) + } + } + // Only log if something was actually synced if sreg > 0 || sdereg > 0 || creg > 0 || 
cdereg > 0 { c.logger.Debug("sync complete", "registered_services", sreg, "deregistered_services", sdereg, @@ -777,6 +906,7 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st // // Actual communication with Consul is done asynchronously (see Run). func (c *ServiceClient) RegisterTask(task *TaskServices) error { + c.logger.Warn("Registering task", "task", task.AllocID) // Fast path numServices := len(task.Services) if numServices == 0 { @@ -786,13 +916,21 @@ func (c *ServiceClient) RegisterTask(task *TaskServices) error { t := new(TaskRegistration) t.Services = make(map[string]*ServiceRegistration, numServices) - ops := &operations{} + ops := &operations{ + legacyServiceMapping: make(map[string]string), + } for _, service := range task.Services { sreg, err := c.serviceRegs(ops, service, task) if err != nil { return err } t.Services[sreg.serviceID] = sreg + + // COMPAT(0.12): This registers allocations with the new and old service ids + // to allow us to migrate pre 0.9.2 registrations to post 0.9.2 registrations. + oldTaskID := makeOldTaskServiceID(task.AllocID, task.Name, service, task.Canary) + ops.legacyServiceMapping[oldTaskID] = sreg.serviceID + c.logger.Warn("registered legacy service for task", "task_id", oldTaskID) } // Add the task to the allocation's registration @@ -1107,14 +1245,23 @@ func makeAgentServiceID(role string, service *structs.Service) string { } // makeTaskServiceID creates a unique ID for identifying a task service in -// Consul. All structs.Service fields are included in the ID's hash except -// Checks. This allows updates to merely compare IDs. +// Consul. Task service IDs are stable, encompassing the allocID, taskName, and +// exposed service name. 
// // Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http func makeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string { return fmt.Sprintf("%s%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name) } +// makeOldTaskServiceID creates a unique ID for identifying a task service in +// Consul. All structs.Service fields are included in the ID's hash except +// Checks. This allows updates to merely compare IDs. +// +// Example Service ID: _nomad-task-ggnjpgl7yn7rgmvxzilmpvrzzvrszc7l +func makeOldTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string { + return fmt.Sprintf("%s%s", nomadTaskPrefix, service.Hash(allocID, taskName, canary)) +} + // makeCheckID creates a unique ID for a check. func makeCheckID(serviceID string, check *structs.ServiceCheck) string { return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID))