Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix old services not getting removed from consul on update #1668

Merged
merged 7 commits into from
Sep 1, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"log"
"path/filepath"
"sync"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
Expand Down Expand Up @@ -121,8 +120,6 @@ type DriverHandle interface {

// ExecContext is shared between drivers within an allocation
type ExecContext struct {
sync.Mutex

// AllocDir contains information about the alloc directory structure.
AllocDir *allocdir.AllocDir

Expand Down
94 changes: 53 additions & 41 deletions command/agent/consul/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,6 @@ type Syncer struct {
// Checks all guarded by the registryLock.
registryLock sync.RWMutex

// trackedChecks and trackedServices are registered with consul
trackedChecks map[consulCheckID]*consul.AgentCheckRegistration
trackedServices map[consulServiceID]*consul.AgentServiceRegistration

// checkRunners are delegated Consul checks being ran by the Syncer
checkRunners map[consulCheckID]*CheckRunner

Expand Down Expand Up @@ -170,10 +166,10 @@ func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logg
shutdownCh: shutdownCh,
servicesGroups: make(map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration),
checkGroups: make(map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration),
trackedServices: make(map[consulServiceID]*consul.AgentServiceRegistration),
trackedChecks: make(map[consulCheckID]*consul.AgentCheckRegistration),
checkRunners: make(map[consulCheckID]*CheckRunner),
periodicCallbacks: make(map[string]types.PeriodicCallback),
// default noop implementation of addrFinder
addrFinder: func(string) (string, int) { return "", 0 },
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is a safe or wise thing to do.

}

return &consulSyncer, nil
Expand Down Expand Up @@ -264,22 +260,47 @@ func (c *Syncer) SetServices(domain ServiceDomain, services map[ServiceKey]*stru
return mErr.ErrorOrNil()
}

// Update the services and checks groups for this domain
c.groupsLock.Lock()
for serviceKey, service := range registeredServices {
serviceKeys, ok := c.servicesGroups[domain]
if !ok {
serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
c.servicesGroups[domain] = serviceKeys

// Create map for service group if it doesn't exist
serviceKeys, ok := c.servicesGroups[domain]
if !ok {
serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
c.servicesGroups[domain] = serviceKeys
}

// Remove stale services
for existingServiceKey := range serviceKeys {
if _, ok := registeredServices[existingServiceKey]; !ok {
// Exisitng service needs to be removed
delete(serviceKeys, existingServiceKey)
}
}

// Add registered services
for serviceKey, service := range registeredServices {
serviceKeys[serviceKey] = service
}
for serviceKey, checks := range registeredChecks {
serviceKeys, ok := c.checkGroups[domain]
if !ok {
serviceKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
c.checkGroups[domain] = serviceKeys

// Create map for check group if it doesn't exist
checkKeys, ok := c.checkGroups[domain]
if !ok {
checkKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
c.checkGroups[domain] = checkKeys
}

// Remove stale checks
for existingCheckKey := range checkKeys {
if _, ok := registeredChecks[existingCheckKey]; !ok {
// Exisitng check needs to be removed
delete(checkKeys, existingCheckKey)
}
serviceKeys[serviceKey] = checks
}

// Add registered checks
for checkKey, checks := range registeredChecks {
checkKeys[checkKey] = checks
}
c.groupsLock.Unlock()

Expand Down Expand Up @@ -355,7 +376,12 @@ func (c *Syncer) Shutdown() error {
}

// De-register all the services from Consul
for serviceID := range c.trackedServices {
services, err := c.queryAgentServices()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would change the comment above:

// De-register all services registered by this syncer from Consul

if err != nil {
c.logger.Printf("[WARN] consul.syncer: failed to register services due to error: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message is wrong

mErr.Errors = append(mErr.Errors, err)
}
for serviceID := range services {
convertedID := string(serviceID)
if err := c.client.Agent().ServiceDeregister(convertedID); err != nil {
c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", convertedID, err)
Expand Down Expand Up @@ -399,9 +425,6 @@ func (c *Syncer) syncChecks() error {
if err := c.registerCheck(check); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
c.registryLock.Lock()
c.trackedChecks[consulCheckID(check.ID)] = check
c.registryLock.Unlock()
}
for _, check := range existingChecks {
c.ensureCheckRunning(check)
Expand All @@ -423,9 +446,6 @@ func (c *Syncer) syncChecks() error {
if err := c.deregisterCheck(consulCheckID(check.ID)); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
c.registryLock.Lock()
delete(c.trackedChecks, consulCheckID(check.ID))
c.registryLock.Unlock()
}
return mErr.ErrorOrNil()
}
Expand All @@ -444,8 +464,8 @@ func compareConsulCheck(localCheck *consul.AgentCheckRegistration, consulCheck *
}

// calcChecksDiff takes the argument (consulChecks) and calculates the delta
// between the consul.Syncer's list of known checks (c.trackedChecks). Three
// arrays are returned:
// between the consul.Syncer's list of known checks (c.flattenedChecks()).
// Four arrays are returned:
//
// 1) a slice of checks that exist only locally in the Syncer and are missing
// from the Consul Agent (consulChecks) and therefore need to be registered.
Expand Down Expand Up @@ -479,13 +499,12 @@ func (c *Syncer) calcChecksDiff(consulChecks map[consulCheckID]*consul.AgentChec
changedChecksCount = 0
agentChecks = 0
)
c.registryLock.RLock()
localChecks := make(map[string]*mergedCheck, len(c.trackedChecks)+len(consulChecks))
for _, localCheck := range c.flattenedChecks() {
flattenedChecks := c.flattenedChecks()
localChecks := make(map[string]*mergedCheck, len(flattenedChecks)+len(consulChecks))
for _, localCheck := range flattenedChecks {
localChecksCount++
localChecks[localCheck.ID] = &mergedCheck{localCheck, 'l'}
}
c.registryLock.RUnlock()
for _, consulCheck := range consulChecks {
if localCheck, found := localChecks[consulCheck.CheckID]; found {
localChecksCount--
Expand Down Expand Up @@ -561,7 +580,7 @@ func compareConsulService(localService *consul.AgentServiceRegistration, consulS

// calcServicesDiff takes the argument (consulServices) and calculates the
// delta between the consul.Syncer's list of known services
// (c.trackedServices). Four arrays are returned:
// (c.flattenedServices()). Four arrays are returned:
//
// 1) a slice of services that exist only locally in the Syncer and are
// missing from the Consul Agent (consulServices) and therefore need to be
Expand Down Expand Up @@ -591,10 +610,9 @@ func (c *Syncer) calcServicesDiff(consulServices map[consulServiceID]*consul.Age
changedServicesCount = 0
agentServices = 0
)
c.registryLock.RLock()
localServices := make(map[string]*mergedService, len(c.trackedServices)+len(consulServices))
c.registryLock.RUnlock()
for _, localService := range c.flattenedServices() {
flattenedServices := c.flattenedServices()
localServices := make(map[string]*mergedService, len(flattenedServices)+len(consulServices))
for _, localService := range flattenedServices {
localServicesCount++
localServices[localService.ID] = &mergedService{localService, 'l'}
}
Expand Down Expand Up @@ -656,9 +674,6 @@ func (c *Syncer) syncServices() error {
if err := c.client.Agent().ServiceRegister(service); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
c.registryLock.Lock()
c.trackedServices[consulServiceID(service.ID)] = service
c.registryLock.Unlock()
}
for _, service := range changedServices {
// Re-register the local service
Expand All @@ -670,9 +685,6 @@ func (c *Syncer) syncServices() error {
if err := c.deregisterService(service.ID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
c.registryLock.Lock()
delete(c.trackedServices, consulServiceID(service.ID))
c.registryLock.Unlock()
}
return mErr.ErrorOrNil()
}
Expand Down
Loading