Skip to content

Commit

Permalink
Merge pull request #1668 from hashicorp/b-fix-consul-updates
Browse files Browse the repository at this point in the history
Fix old services not getting removed from consul on update
  • Loading branch information
schmichael committed Sep 1, 2016
2 parents 67481cd + ff21ded commit 5f04e2a
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 182 deletions.
3 changes: 0 additions & 3 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"log"
"path/filepath"
"sync"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
Expand Down Expand Up @@ -121,8 +120,6 @@ type DriverHandle interface {

// ExecContext is shared between drivers within an allocation
type ExecContext struct {
sync.Mutex

// AllocDir contains information about the alloc directory structure.
AllocDir *allocdir.AllocDir

Expand Down
96 changes: 54 additions & 42 deletions command/agent/consul/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,6 @@ type Syncer struct {
// Checks all guarded by the registryLock.
registryLock sync.RWMutex

// trackedChecks and trackedServices are registered with consul
trackedChecks map[consulCheckID]*consul.AgentCheckRegistration
trackedServices map[consulServiceID]*consul.AgentServiceRegistration

// checkRunners are delegated Consul checks being ran by the Syncer
checkRunners map[consulCheckID]*CheckRunner

Expand Down Expand Up @@ -170,10 +166,10 @@ func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logg
shutdownCh: shutdownCh,
servicesGroups: make(map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration),
checkGroups: make(map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration),
trackedServices: make(map[consulServiceID]*consul.AgentServiceRegistration),
trackedChecks: make(map[consulCheckID]*consul.AgentCheckRegistration),
checkRunners: make(map[consulCheckID]*CheckRunner),
periodicCallbacks: make(map[string]types.PeriodicCallback),
// default noop implementation of addrFinder
addrFinder: func(string) (string, int) { return "", 0 },
}

return &consulSyncer, nil
Expand Down Expand Up @@ -264,22 +260,47 @@ func (c *Syncer) SetServices(domain ServiceDomain, services map[ServiceKey]*stru
return mErr.ErrorOrNil()
}

// Update the services and checks groups for this domain
c.groupsLock.Lock()
for serviceKey, service := range registeredServices {
serviceKeys, ok := c.servicesGroups[domain]
if !ok {
serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
c.servicesGroups[domain] = serviceKeys

// Create map for service group if it doesn't exist
serviceKeys, ok := c.servicesGroups[domain]
if !ok {
serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
c.servicesGroups[domain] = serviceKeys
}

// Remove stale services
for existingServiceKey := range serviceKeys {
if _, ok := registeredServices[existingServiceKey]; !ok {
// Exisitng service needs to be removed
delete(serviceKeys, existingServiceKey)
}
}

// Add registered services
for serviceKey, service := range registeredServices {
serviceKeys[serviceKey] = service
}
for serviceKey, checks := range registeredChecks {
serviceKeys, ok := c.checkGroups[domain]
if !ok {
serviceKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
c.checkGroups[domain] = serviceKeys

// Create map for check group if it doesn't exist
checkKeys, ok := c.checkGroups[domain]
if !ok {
checkKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
c.checkGroups[domain] = checkKeys
}

// Remove stale checks
for existingCheckKey := range checkKeys {
if _, ok := registeredChecks[existingCheckKey]; !ok {
// Exisitng check needs to be removed
delete(checkKeys, existingCheckKey)
}
serviceKeys[serviceKey] = checks
}

// Add registered checks
for checkKey, checks := range registeredChecks {
checkKeys[checkKey] = checks
}
c.groupsLock.Unlock()

Expand Down Expand Up @@ -354,8 +375,13 @@ func (c *Syncer) Shutdown() error {
cr.Stop()
}

// De-register all the services from Consul
for serviceID := range c.trackedServices {
// De-register all the services registered by this syncer from Consul
services, err := c.queryAgentServices()
if err != nil {
c.logger.Printf("[WARN] consul.syncer: failed to fetch services for deregistering due to error: %v", err)
mErr.Errors = append(mErr.Errors, err)
}
for serviceID := range services {
convertedID := string(serviceID)
if err := c.client.Agent().ServiceDeregister(convertedID); err != nil {
c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", convertedID, err)
Expand Down Expand Up @@ -399,9 +425,6 @@ func (c *Syncer) syncChecks() error {
if err := c.registerCheck(check); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
c.registryLock.Lock()
c.trackedChecks[consulCheckID(check.ID)] = check
c.registryLock.Unlock()
}
for _, check := range existingChecks {
c.ensureCheckRunning(check)
Expand All @@ -423,9 +446,6 @@ func (c *Syncer) syncChecks() error {
if err := c.deregisterCheck(consulCheckID(check.ID)); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
c.registryLock.Lock()
delete(c.trackedChecks, consulCheckID(check.ID))
c.registryLock.Unlock()
}
return mErr.ErrorOrNil()
}
Expand All @@ -444,8 +464,8 @@ func compareConsulCheck(localCheck *consul.AgentCheckRegistration, consulCheck *
}

// calcChecksDiff takes the argument (consulChecks) and calculates the delta
// between the consul.Syncer's list of known checks (c.trackedChecks). Three
// arrays are returned:
// between the consul.Syncer's list of known checks (c.flattenedChecks()).
// Four arrays are returned:
//
// 1) a slice of checks that exist only locally in the Syncer and are missing
// from the Consul Agent (consulChecks) and therefore need to be registered.
Expand Down Expand Up @@ -479,13 +499,12 @@ func (c *Syncer) calcChecksDiff(consulChecks map[consulCheckID]*consul.AgentChec
changedChecksCount = 0
agentChecks = 0
)
c.registryLock.RLock()
localChecks := make(map[string]*mergedCheck, len(c.trackedChecks)+len(consulChecks))
for _, localCheck := range c.flattenedChecks() {
flattenedChecks := c.flattenedChecks()
localChecks := make(map[string]*mergedCheck, len(flattenedChecks)+len(consulChecks))
for _, localCheck := range flattenedChecks {
localChecksCount++
localChecks[localCheck.ID] = &mergedCheck{localCheck, 'l'}
}
c.registryLock.RUnlock()
for _, consulCheck := range consulChecks {
if localCheck, found := localChecks[consulCheck.CheckID]; found {
localChecksCount--
Expand Down Expand Up @@ -561,7 +580,7 @@ func compareConsulService(localService *consul.AgentServiceRegistration, consulS

// calcServicesDiff takes the argument (consulServices) and calculates the
// delta between the consul.Syncer's list of known services
// (c.trackedServices). Four arrays are returned:
// (c.flattenedServices()). Four arrays are returned:
//
// 1) a slice of services that exist only locally in the Syncer and are
// missing from the Consul Agent (consulServices) and therefore need to be
Expand Down Expand Up @@ -591,10 +610,9 @@ func (c *Syncer) calcServicesDiff(consulServices map[consulServiceID]*consul.Age
changedServicesCount = 0
agentServices = 0
)
c.registryLock.RLock()
localServices := make(map[string]*mergedService, len(c.trackedServices)+len(consulServices))
c.registryLock.RUnlock()
for _, localService := range c.flattenedServices() {
flattenedServices := c.flattenedServices()
localServices := make(map[string]*mergedService, len(flattenedServices)+len(consulServices))
for _, localService := range flattenedServices {
localServicesCount++
localServices[localService.ID] = &mergedService{localService, 'l'}
}
Expand Down Expand Up @@ -656,9 +674,6 @@ func (c *Syncer) syncServices() error {
if err := c.client.Agent().ServiceRegister(service); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
c.registryLock.Lock()
c.trackedServices[consulServiceID(service.ID)] = service
c.registryLock.Unlock()
}
for _, service := range changedServices {
// Re-register the local service
Expand All @@ -670,9 +685,6 @@ func (c *Syncer) syncServices() error {
if err := c.deregisterService(service.ID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
c.registryLock.Lock()
delete(c.trackedServices, consulServiceID(service.ID))
c.registryLock.Unlock()
}
return mErr.ErrorOrNil()
}
Expand Down
Loading

0 comments on commit 5f04e2a

Please sign in to comment.