Skip to content

Commit

Permalink
wip: graceful updates
Browse files Browse the repository at this point in the history
  • Loading branch information
endocrimes committed May 2, 2019
1 parent 14c231a commit 429d157
Showing 1 changed file with 202 additions and 55 deletions.
257 changes: 202 additions & 55 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ type operations struct {
regChecks []*api.AgentCheckRegistration
scripts []*scriptCheck

// legacyServiceMapping is populated on RegisterTask and provides a mapping of
// old task ids to new task ids. This is used for expiring legacy tasks on
// restores.
legacyServiceMapping map[string]string

deregServices []string
deregChecks []string
}
Expand Down Expand Up @@ -230,6 +235,12 @@ type ServiceClient struct {
scripts map[string]*scriptCheck
runningScripts map[string]*scriptHandle

// legacyServiceIDMap is a map of old task ids, to new task ids, for use when
// reconciling pre 0.9.2 unstable identifiers with 0.9.2+ stable identifiers.
// COMPAT(0.11): Remove legacy service mapping.
legacyServiceIDMap map[string]string
legacyServiceExpiry map[string]time.Time

// allocRegistrations stores the services and checks that are registered
// with Consul by allocation ID.
allocRegistrations map[string]*AllocRegistration
Expand Down Expand Up @@ -260,24 +271,26 @@ type ServiceClient struct {
func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bool) *ServiceClient {
logger = logger.ResetNamed("consul.sync")
return &ServiceClient{
client: consulClient,
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
scripts: make(map[string]*scriptCheck),
runningScripts: make(map[string]*scriptHandle),
allocRegistrations: make(map[string]*AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, consulClient),
isClientAgent: isNomadClient,
client: consulClient,
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
periodicInterval: defaultPeriodicInterval,
exitCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
shutdownWait: defaultShutdownWait,
opCh: make(chan *operations, 8),
services: make(map[string]*api.AgentServiceRegistration),
checks: make(map[string]*api.AgentCheckRegistration),
scripts: make(map[string]*scriptCheck),
legacyServiceIDMap: make(map[string]string),
legacyServiceExpiry: make(map[string]time.Time),
runningScripts: make(map[string]*scriptHandle),
allocRegistrations: make(map[string]*AllocRegistration),
agentServices: make(map[string]struct{}),
agentChecks: make(map[string]struct{}),
checkWatcher: newCheckWatcher(logger, consulClient),
isClientAgent: isNomadClient,
}
}

Expand Down Expand Up @@ -429,31 +442,34 @@ func (c *ServiceClient) merge(ops *operations) {
}
delete(c.checks, cid)
}

if ops.legacyServiceMapping != nil {
for legacyKey, newKey := range ops.legacyServiceMapping {
c.legacyServiceIDMap[legacyKey] = newKey
}
}

metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services)))
metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks)))
metrics.SetGauge([]string{"client", "consul", "script_checks"}, float32(len(c.runningScripts)))
}

// sync enqueued operations.
func (c *ServiceClient) sync() error {
sreg, creg, sdereg, cdereg := 0, 0, 0, 0

consulServices, err := c.client.Services()
if err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return fmt.Errorf("error querying Consul services: %v", err)
}

consulChecks, err := c.client.Checks()
if err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return fmt.Errorf("error querying Consul checks: %v", err)
}

// Remove Nomad services in Consul but unknown locally
// pruneUnknownServices iterates through the consul services and de-registers any
// that are unknown to the current serviceclient.
// It returns a set of oldIDs for legacy services that exist within the consul
// definition and the count of deregistered services.
func (c *ServiceClient) pruneUnknownServices(consulServices map[string]*api.AgentService) (map[string]struct{}, int, error) {
usedLegacyServiceIDs := make(map[string]struct{})
sdereg := 0
for id := range consulServices {
if _, ok := c.services[id]; ok {
// Known service, skip
// Known service, current generation, skip
continue
}

if _, ok := c.legacyServiceIDMap[id]; ok {
// Known older service, don't remove _yet_, schedule upgrade
usedLegacyServiceIDs[id] = struct{}{}
continue
}

Expand All @@ -473,17 +489,21 @@ func (c *ServiceClient) sync() error {
continue
}

metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
return nil, 0, err
}
sdereg++
metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1)
}

// Add Nomad services missing from Consul, or where the service has been updated.
return usedLegacyServiceIDs, sdereg, nil
}

// upsertNomadServices registers local nomad services that are not yet present
// in Consul, and updates services that have changed locally.
func (c *ServiceClient) upsertNomadServices(consulServices map[string]*api.AgentService) (int, error) {
sreg := 0
for id, locals := range c.services {
existingSvc, ok := consulServices[id]

if ok {
// There is an existing registration of this service in Consul, so here
// we validate to see if the service has been invalidated to see if it
Expand All @@ -494,17 +514,34 @@ func (c *ServiceClient) sync() error {
}
}

if err = c.client.ServiceRegister(locals); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
if err := c.client.ServiceRegister(locals); err != nil {
return 0, err
}
sreg++
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
}

return sreg, nil
}

func (c *ServiceClient) pruneUnknownChecks(consulChecks map[string]*api.AgentCheck, usedLegacyServiceIDs map[string]struct{}) (map[string][]*api.AgentCheck, int, error) {
legacyCheckIDsByServiceID := make(map[string][]*api.AgentCheck)
cdereg := 0

// Remove Nomad checks in Consul but unknown locally
for id, check := range consulChecks {
if _, ok := c.checks[id]; ok {
for consulID, check := range consulChecks {
if _, ok := usedLegacyServiceIDs[check.ServiceID]; ok {
// Registered against a legacy service, we don't really care about pruning
// these as they'll be deregistered fairly quickly.
// We store a mapping of ServiceID->Check here for computing expiry times.
checks := legacyCheckIDsByServiceID[check.ServiceID]
checks = append(checks, check)
legacyCheckIDsByServiceID[check.ServiceID] = checks

continue
}

if _, ok := c.checks[consulID]; ok {
// Known check, leave it
continue
}
Expand All @@ -519,30 +556,33 @@ func (c *ServiceClient) sync() error {
}

// Unknown Nomad managed check; remove
if err := c.client.CheckDeregister(id); err != nil {
if err := c.client.CheckDeregister(consulID); err != nil {
if isOldNomadService(check.ServiceID) {
// Don't hard-fail on old entries.
continue
}

metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
return nil, 0, err
}
cdereg++
metrics.IncrCounter([]string{"client", "consul", "check_deregistrations"}, 1)
}

// Add Nomad checks missing from Consul
return legacyCheckIDsByServiceID, cdereg, nil
}

func (c *ServiceClient) upsertNomadChecks(consulChecks map[string]*api.AgentCheck) (int, error) {
creg := 0

for id, check := range c.checks {
if _, ok := consulChecks[id]; ok {
// Already in Consul; skipping
continue
}

if err := c.client.CheckRegister(check); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
return 0, err
}

creg++
metrics.IncrCounter([]string{"client", "consul", "check_registrations"}, 1)

Expand All @@ -557,6 +597,95 @@ func (c *ServiceClient) sync() error {
}
}

return creg, nil
}

// sync enqueued operations
func (c *ServiceClient) sync() error {
sreg, creg, sdereg, cdereg := 0, 0, 0, 0

consulServices, err := c.client.Services()
if err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return fmt.Errorf("error querying Consul services: %v", err)
}

consulChecks, err := c.client.Checks()
if err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return fmt.Errorf("error querying Consul checks: %v", err)
}

// Expire legacy services that have passed the check period of the follow up
// service registration.
for serviceID, expiryTime := range c.legacyServiceExpiry {
if expiryTime.Before(time.Now()) {
delete(c.legacyServiceIDMap, serviceID)
}
}

// Remove unknown Nomad service registrations and compute the used legacy service
// map.
usedLegacyServiceIDs, sdereg, err := c.pruneUnknownServices(consulServices)
if err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}

// Add Nomad services missing from Consul, or where the service has been updated.
sreg, err = c.upsertNomadServices(consulServices)
if err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}

// Remove unknown Nomad managed checks from Consul and fetch legacy checks
legacyChecksByServiceID, cdereg, err := c.pruneUnknownChecks(consulChecks, usedLegacyServiceIDs)
if err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}

// Add Nomad checks missing from Consul
creg, err = c.upsertNomadChecks(consulChecks)
if err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}

// Schedule deregistration of legacy services after the longest check period
// completes
for serviceID := range usedLegacyServiceIDs {
if _, ok := c.legacyServiceExpiry[serviceID]; ok {
// This service is already scheduled for termination
continue
}

checks, ok := legacyChecksByServiceID[serviceID]
if !ok {
// legacy service definition has no registered checks, prune it immediately
c.legacyServiceExpiry[serviceID] = time.Now()
continue
}

longestCheckDuration := 0 * time.Second
for _, check := range checks {
duration := time.Duration(check.Definition.Interval + check.Definition.Timeout)
if longestCheckDuration < duration {
longestCheckDuration = duration
}
}

c.legacyServiceExpiry[serviceID] = time.Now().Add(longestCheckDuration).Add(2 * time.Second)
}

// Prune unused legacy service IDs
for serviceID := range c.legacyServiceIDMap {
if _, ok := usedLegacyServiceIDs[serviceID]; !ok {
delete(c.legacyServiceIDMap, serviceID)
}
}

// Only log if something was actually synced
if sreg > 0 || sdereg > 0 || creg > 0 || cdereg > 0 {
c.logger.Debug("sync complete", "registered_services", sreg, "deregistered_services", sdereg,
Expand Down Expand Up @@ -777,6 +906,7 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
//
// Actual communication with Consul is done asynchronously (see Run).
func (c *ServiceClient) RegisterTask(task *TaskServices) error {
c.logger.Warn("Registering task", "task", task.AllocID)
// Fast path
numServices := len(task.Services)
if numServices == 0 {
Expand All @@ -786,13 +916,21 @@ func (c *ServiceClient) RegisterTask(task *TaskServices) error {
t := new(TaskRegistration)
t.Services = make(map[string]*ServiceRegistration, numServices)

ops := &operations{}
ops := &operations{
legacyServiceMapping: make(map[string]string),
}
for _, service := range task.Services {
sreg, err := c.serviceRegs(ops, service, task)
if err != nil {
return err
}
t.Services[sreg.serviceID] = sreg

// COMPAT(0.12): This registers allocations with the new and old service ids
// to allow us to migrate pre 0.9.2 registrations to post 0.9.2 registrations.
oldTaskID := makeOldTaskServiceID(task.AllocID, task.Name, service, task.Canary)
ops.legacyServiceMapping[oldTaskID] = sreg.serviceID
c.logger.Warn("registered legacy service for task", "task_id", oldTaskID)
}

// Add the task to the allocation's registration
Expand Down Expand Up @@ -1107,14 +1245,23 @@ func makeAgentServiceID(role string, service *structs.Service) string {
}

// makeTaskServiceID creates a unique ID for identifying a task service in
// Consul. All structs.Service fields are included in the ID's hash except
// Checks. This allows updates to merely compare IDs.
// Consul. TaskServiceIDs are stable, encompassing the allocID, taskName, and
// exposed service name.
//
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http
func makeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string {
	// The ID is assembled from the task prefix, the allocation ID, the task
	// name and the service name only; canary is accepted but does not
	// contribute to the result.
	const layout = "%s%s-%s-%s"
	return fmt.Sprintf(layout, nomadTaskPrefix, allocID, taskName, service.Name)
}

// makeOldTaskServiceID builds the legacy, hash-based ID of a task service in
// Consul: nomadTaskPrefix followed by service.Hash(allocID, taskName, canary).
// RegisterTask records this ID next to the current one so that a registration
// made under the old scheme can be mapped to its replacement.
//
// NOTE(review): the previous documentation of this scheme states that every
// structs.Service field except Checks feeds the hash — confirm against
// structs.Service.Hash.
//
// Example Service ID: _nomad-task-ggnjpgl7yn7rgmvxzilmpvrzzvrszc7l
func makeOldTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string {
	legacyHash := service.Hash(allocID, taskName, canary)
	return fmt.Sprintf("%s%s", nomadTaskPrefix, legacyHash)
}

// makeCheckID creates a unique ID for a check.
func makeCheckID(serviceID string, check *structs.ServiceCheck) string {
return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID))
Expand Down

0 comments on commit 429d157

Please sign in to comment.