diff --git a/client/driver/driver.go b/client/driver/driver.go
index 112626585192..2ad38835ab3e 100644
--- a/client/driver/driver.go
+++ b/client/driver/driver.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"log"
 	"path/filepath"
-	"sync"
 
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/config"
@@ -121,8 +120,6 @@ type DriverHandle interface {
 
 // ExecContext is shared between drivers within an allocation
 type ExecContext struct {
-	sync.Mutex
-
 	// AllocDir contains information about the alloc directory structure.
 	AllocDir *allocdir.AllocDir
 
diff --git a/command/agent/consul/syncer.go b/command/agent/consul/syncer.go
index 8ac66fd9fc36..604070fd89fd 100644
--- a/command/agent/consul/syncer.go
+++ b/command/agent/consul/syncer.go
@@ -119,10 +119,6 @@ type Syncer struct {
 	// Checks all guarded by the registryLock.
 	registryLock sync.RWMutex
 
-	// trackedChecks and trackedServices are registered with consul
-	trackedChecks   map[consulCheckID]*consul.AgentCheckRegistration
-	trackedServices map[consulServiceID]*consul.AgentServiceRegistration
-
 	// checkRunners are delegated Consul checks being ran by the Syncer
 	checkRunners map[consulCheckID]*CheckRunner
 
@@ -170,10 +166,10 @@ func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logg
 		shutdownCh:        shutdownCh,
 		servicesGroups:    make(map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration),
 		checkGroups:       make(map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration),
-		trackedServices:   make(map[consulServiceID]*consul.AgentServiceRegistration),
-		trackedChecks:     make(map[consulCheckID]*consul.AgentCheckRegistration),
 		checkRunners:      make(map[consulCheckID]*CheckRunner),
 		periodicCallbacks: make(map[string]types.PeriodicCallback),
+		// default noop implementation of addrFinder
+		addrFinder:        func(string) (string, int) { return "", 0 },
 	}
 
 	return &consulSyncer, nil
@@ -264,22 +260,47 @@ func (c *Syncer) SetServices(domain ServiceDomain, services map[ServiceKey]*stru
 		return mErr.ErrorOrNil()
 	}
 
+	// Update the services and checks groups for this domain
 	c.groupsLock.Lock()
-	for serviceKey, service := range registeredServices {
-		serviceKeys, ok := c.servicesGroups[domain]
-		if !ok {
-			serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
-			c.servicesGroups[domain] = serviceKeys
+
+	// Create map for service group if it doesn't exist
+	serviceKeys, ok := c.servicesGroups[domain]
+	if !ok {
+		serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
+		c.servicesGroups[domain] = serviceKeys
+	}
+
+	// Remove stale services
+	for existingServiceKey := range serviceKeys {
+		if _, ok := registeredServices[existingServiceKey]; !ok {
+			// Existing service needs to be removed
+			delete(serviceKeys, existingServiceKey)
 		}
+	}
+
+	// Add registered services
+	for serviceKey, service := range registeredServices {
 		serviceKeys[serviceKey] = service
 	}
-	for serviceKey, checks := range registeredChecks {
-		serviceKeys, ok := c.checkGroups[domain]
-		if !ok {
-			serviceKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
-			c.checkGroups[domain] = serviceKeys
+
+	// Create map for check group if it doesn't exist
+	checkKeys, ok := c.checkGroups[domain]
+	if !ok {
+		checkKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
+		c.checkGroups[domain] = checkKeys
+	}
+
+	// Remove stale checks
+	for existingCheckKey := range checkKeys {
+		if _, ok := registeredChecks[existingCheckKey]; !ok {
+			// Existing check needs to be removed
+			delete(checkKeys, existingCheckKey)
 		}
-		serviceKeys[serviceKey] = checks
+	}
+
+	// Add registered checks
+	for checkKey, checks := range registeredChecks {
+		checkKeys[checkKey] = checks
 	}
 	c.groupsLock.Unlock()
 
@@ -354,8 +375,13 @@ func (c *Syncer) Shutdown() error {
 		cr.Stop()
 	}
 
-	// De-register all the services from Consul
-	for serviceID := range c.trackedServices {
+	// De-register all the services registered by this syncer from Consul
+	services, err := c.queryAgentServices()
+	if err != nil {
+		c.logger.Printf("[WARN] consul.syncer: failed to fetch services for deregistering due to error: %v", err)
+		mErr.Errors = append(mErr.Errors, err)
+	}
+	for serviceID := range services {
 		convertedID := string(serviceID)
 		if err := c.client.Agent().ServiceDeregister(convertedID); err != nil {
 			c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", convertedID, err)
@@ -399,9 +425,6 @@ func (c *Syncer) syncChecks() error {
 		if err := c.registerCheck(check); err != nil {
 			mErr.Errors = append(mErr.Errors, err)
 		}
-		c.registryLock.Lock()
-		c.trackedChecks[consulCheckID(check.ID)] = check
-		c.registryLock.Unlock()
 	}
 	for _, check := range existingChecks {
 		c.ensureCheckRunning(check)
@@ -423,9 +446,6 @@ func (c *Syncer) syncChecks() error {
 		if err := c.deregisterCheck(consulCheckID(check.ID)); err != nil {
 			mErr.Errors = append(mErr.Errors, err)
 		}
-		c.registryLock.Lock()
-		delete(c.trackedChecks, consulCheckID(check.ID))
-		c.registryLock.Unlock()
 	}
 	return mErr.ErrorOrNil()
 }
@@ -444,8 +464,8 @@ func compareConsulCheck(localCheck *consul.AgentCheckRegistration, consulCheck *
 }
 
 // calcChecksDiff takes the argument (consulChecks) and calculates the delta
-// between the consul.Syncer's list of known checks (c.trackedChecks). Three
-// arrays are returned:
+// between the consul.Syncer's list of known checks (c.flattenedChecks()).
+// Four arrays are returned:
 //
 // 1) a slice of checks that exist only locally in the Syncer and are missing
 // from the Consul Agent (consulChecks) and therefore need to be registered.
@@ -479,13 +499,12 @@ func (c *Syncer) calcChecksDiff(consulChecks map[consulCheckID]*consul.AgentChec
 		changedChecksCount = 0
 		agentChecks        = 0
 	)
-	c.registryLock.RLock()
-	localChecks := make(map[string]*mergedCheck, len(c.trackedChecks)+len(consulChecks))
-	for _, localCheck := range c.flattenedChecks() {
+	flattenedChecks := c.flattenedChecks()
+	localChecks := make(map[string]*mergedCheck, len(flattenedChecks)+len(consulChecks))
+	for _, localCheck := range flattenedChecks {
 		localChecksCount++
 		localChecks[localCheck.ID] = &mergedCheck{localCheck, 'l'}
 	}
-	c.registryLock.RUnlock()
 	for _, consulCheck := range consulChecks {
 		if localCheck, found := localChecks[consulCheck.CheckID]; found {
 			localChecksCount--
@@ -561,7 +580,7 @@ func compareConsulService(localService *consul.AgentServiceRegistration, consulS
 // calcServicesDiff takes the argument (consulServices) and calculates the
 // delta between the consul.Syncer's list of known services
-// (c.trackedServices). Four arrays are returned:
+// (c.flattenedServices()). Four arrays are returned:
 //
 // 1) a slice of services that exist only locally in the Syncer and are
 // missing from the Consul Agent (consulServices) and therefore need to be
 // registered.
@@ -591,10 +610,9 @@ func (c *Syncer) calcServicesDiff(consulServices map[consulServiceID]*consul.Age
 		changedServicesCount = 0
 		agentServices        = 0
 	)
-	c.registryLock.RLock()
-	localServices := make(map[string]*mergedService, len(c.trackedServices)+len(consulServices))
-	c.registryLock.RUnlock()
-	for _, localService := range c.flattenedServices() {
+	flattenedServices := c.flattenedServices()
+	localServices := make(map[string]*mergedService, len(flattenedServices)+len(consulServices))
+	for _, localService := range flattenedServices {
 		localServicesCount++
 		localServices[localService.ID] = &mergedService{localService, 'l'}
 	}
@@ -656,9 +674,6 @@ func (c *Syncer) syncServices() error {
 		if err := c.client.Agent().ServiceRegister(service); err != nil {
 			mErr.Errors = append(mErr.Errors, err)
 		}
-		c.registryLock.Lock()
-		c.trackedServices[consulServiceID(service.ID)] = service
-		c.registryLock.Unlock()
 	}
 	for _, service := range changedServices {
 		// Re-register the local service
@@ -670,9 +685,6 @@ func (c *Syncer) syncServices() error {
 		if err := c.deregisterService(service.ID); err != nil {
 			mErr.Errors = append(mErr.Errors, err)
 		}
-		c.registryLock.Lock()
-		delete(c.trackedServices, consulServiceID(service.ID))
-		c.registryLock.Unlock()
 	}
 	return mErr.ErrorOrNil()
 }
diff --git a/command/agent/consul/syncer_test.go b/command/agent/consul/syncer_test.go
index 21fa327a5d69..6eb52911e150 100644
--- a/command/agent/consul/syncer_test.go
+++ b/command/agent/consul/syncer_test.go
@@ -1,14 +1,14 @@
 package consul
 
 import (
-	"fmt"
 	"log"
+	"net"
 	"os"
+	"reflect"
 	"testing"
 	"time"
 
 	"github.com/hashicorp/consul/api"
-	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/nomad/structs/config"
 )
@@ -19,23 +19,29 @@ const (
 	serviceGroupName = "executor"
 )
 
-var (
-	logger = log.New(os.Stdout, "", log.LstdFlags)
-	check1 = structs.ServiceCheck{
+var logger = log.New(os.Stdout, "", log.LstdFlags)
+
+func TestCheckRegistration(t *testing.T) {
+	cs, err := NewSyncer(config.DefaultConsulConfig(), make(chan struct{}), logger)
+	if err != nil {
+		t.Fatalf("Err: %v", err)
+	}
+
+	check1 := structs.ServiceCheck{
 		Name:          "check-foo-1",
 		Type:          structs.ServiceCheckTCP,
 		Interval:      30 * time.Second,
 		Timeout:       5 * time.Second,
 		InitialStatus: api.HealthPassing,
 	}
-	check2 = structs.ServiceCheck{
+	check2 := structs.ServiceCheck{
 		Name:      "check1",
 		Type:      "tcp",
 		PortLabel: "port2",
 		Interval:  3 * time.Second,
 		Timeout:   1 * time.Second,
 	}
-	check3 = structs.ServiceCheck{
+	check3 := structs.ServiceCheck{
 		Name:      "check3",
 		Type:      "http",
 		PortLabel: "port3",
@@ -43,7 +49,7 @@ var (
 		Interval:  3 * time.Second,
 		Timeout:   1 * time.Second,
 	}
-	service1 = structs.Service{
+	service1 := structs.Service{
 		Name:      "foo-1",
 		Tags:      []string{"tag1", "tag2"},
 		PortLabel: "port1",
@@ -51,23 +57,32 @@ var (
 			&check1, &check2,
 		},
 	}
-
-	service2 = structs.Service{
-		Name:      "foo-2",
-		Tags:      []string{"tag1", "tag2"},
-		PortLabel: "port2",
-	}
-)
-
-func TestCheckRegistration(t *testing.T) {
-	cs, err := NewSyncer(config.DefaultConsulConfig(), make(chan struct{}), logger)
-	if err != nil {
-		t.Fatalf("Err: %v", err)
+	task := structs.Task{
+		Name:     "foo",
+		Services: []*structs.Service{&service1},
+		Resources: &structs.Resources{
+			Networks: []*structs.NetworkResource{
+				&structs.NetworkResource{
+					IP: "10.10.11.5",
+					DynamicPorts: []structs.Port{
+						structs.Port{
+							Label: "port1",
+							Value: 20002,
+						},
+						structs.Port{
+							Label: "port2",
+							Value: 20003,
+						},
+						structs.Port{
+							Label: "port3",
+							Value: 20004,
+						},
+					},
+				},
+			},
+		},
 	}
-
-	task := mockTask()
 	cs.SetAddrFinder(task.FindHostAndPortFor)
-
 	srvReg, _ := cs.createService(&service1, "domain", "key")
 	check1Reg, _ := cs.createCheckReg(&check1, srvReg)
 	check2Reg, _ := cs.createCheckReg(&check2, srvReg)
@@ -95,148 +110,170 @@
 }
 
 func TestConsulServiceRegisterServices(t *testing.T) {
-	t.Skip()
-
-	shutdownCh := make(chan struct{})
-	cs, err := NewSyncer(config.DefaultConsulConfig(), shutdownCh, logger)
+	cs, err := NewSyncer(config.DefaultConsulConfig(), nil, logger)
 	if err != nil {
 		t.Fatalf("Err: %v", err)
 	}
+	defer cs.Shutdown()
 	// Skipping the test if consul isn't present
 	if !cs.consulPresent() {
-		return
+		t.Skip("skipping because consul isn't present")
 	}
-	task := mockTask()
-	//cs.SetServiceRegPrefix(serviceRegPrefix)
-	cs.SetAddrFinder(task.FindHostAndPortFor)
+
+	service1 := &structs.Service{Name: "foo", Tags: []string{"a", "b"}}
+	service2 := &structs.Service{Name: "foo"}
+	services := map[ServiceKey]*structs.Service{
+		GenerateServiceKey(service1): service1,
+		GenerateServiceKey(service2): service2,
+	}
+
+	// Call SetServices to update services in consul
+	if err := cs.SetServices(serviceGroupName, services); err != nil {
+		t.Fatalf("error setting services: %v", err)
+	}
+
+	// Manually call SyncServices to cause a synchronous consul update
 	if err := cs.SyncServices(); err != nil {
-		t.Fatalf("err: %v", err)
+		t.Fatalf("error syncing services: %v", err)
 	}
-	defer cs.Shutdown()
-	// service1 := &structs.Service{Name: task.Name}
-	// service2 := &structs.Service{Name: task.Name}
-	//services := []*structs.Service{service1, service2}
-	//service1.ServiceID = fmt.Sprintf("%s-%s:%s/%s", cs.GenerateServiceID(serviceGroupName, service1), task.Name, allocID)
-	//service2.ServiceID = fmt.Sprintf("%s-%s:%s/%s", cs.GenerateServiceID(serviceGroupName, service2), task.Name, allocID)
-
-	//cs.SetServices(serviceGroupName, services)
-	// if err := servicesPresent(t, services, cs); err != nil {
-	// 	t.Fatalf("err : %v", err)
-	// }
-	// FIXME(sean@)
-	// if err := checksPresent(t, []string{check1.Hash(service1ID)}, cs); err != nil {
-	// 	t.Fatalf("err : %v", err)
-	// }
+	numservices := len(cs.flattenedServices())
+	if numservices != 2 {
+		t.Fatalf("expected 2 services but found %d", numservices)
+	}
+
+	numchecks := len(cs.flattenedChecks())
+	if numchecks != 0 {
+		t.Fatalf("expected 0 checks but found %d", numchecks)
+	}
+
+	// Assert services are in consul
+	agentServices, err := cs.client.Agent().Services()
+	if err != nil {
+		t.Fatalf("error querying consul services: %v", err)
+	}
+	found := 0
+	for id, as := range agentServices {
+		if id == "consul" {
+			found++
+			continue
+		}
+		if _, ok := services[ServiceKey(as.Service)]; ok {
+			found++
+			continue
+		}
+		t.Errorf("unexpected service in consul: %s", id)
+	}
+	if found != 3 {
+		t.Fatalf("expected 3 services in consul but found %d:\nconsul: %#v", len(agentServices), agentServices)
+	}
+
+	agentChecks, err := cs.queryChecks()
+	if err != nil {
+		t.Fatalf("error querying consul checks: %v", err)
+	}
+	if len(agentChecks) != numchecks {
+		t.Fatalf("expected %d checks in consul but found %d:\n%#v", numservices, len(agentChecks), agentChecks)
+	}
 }
 
 func TestConsulServiceUpdateService(t *testing.T) {
-	t.Skip()
-
-	shutdownCh := make(chan struct{})
-	cs, err := NewSyncer(config.DefaultConsulConfig(), shutdownCh, logger)
+	cs, err := NewSyncer(config.DefaultConsulConfig(), nil, logger)
 	if err != nil {
 		t.Fatalf("Err: %v", err)
 	}
+	defer cs.Shutdown()
 	// Skipping the test if consul isn't present
 	if !cs.consulPresent() {
-		return
+		t.Skip("skipping because consul isn't present")
 	}
+	cs.SetAddrFinder(func(h string) (string, int) {
+		a, pstr, _ := net.SplitHostPort(h)
+		p, _ := net.LookupPort("tcp", pstr)
+		return a, p
+	})
-	task := mockTask()
-	//cs.SetServiceRegPrefix(serviceRegPrefix)
-	cs.SetAddrFinder(task.FindHostAndPortFor)
+	service1 := &structs.Service{Name: "foo1", Tags: []string{"a", "b"}}
+	service2 := &structs.Service{Name: "foo2"}
+	services := map[ServiceKey]*structs.Service{
+		GenerateServiceKey(service1): service1,
+		GenerateServiceKey(service2): service2,
+	}
+	if err := cs.SetServices(serviceGroupName, services); err != nil {
+		t.Fatalf("error setting services: %v", err)
+	}
 	if err := cs.SyncServices(); err != nil {
-		t.Fatalf("err: %v", err)
+		t.Fatalf("error syncing services: %v", err)
 	}
-	defer cs.Shutdown()
-	//Update Service defn 1
-	newTags := []string{"tag3"}
-	task.Services[0].Tags = newTags
+	// Now update both services
+	service1 = &structs.Service{Name: "foo1", Tags: []string{"a", "z"}}
+	service2 = &structs.Service{Name: "foo2", PortLabel: ":8899"}
+	service3 := &structs.Service{Name: "foo3"}
+	services = map[ServiceKey]*structs.Service{
+		GenerateServiceKey(service1): service1,
+		GenerateServiceKey(service2): service2,
+		GenerateServiceKey(service3): service3,
+	}
+	if err := cs.SetServices(serviceGroupName, services); err != nil {
+		t.Fatalf("error setting services: %v", err)
+	}
 	if err := cs.SyncServices(); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	// Make sure all the services and checks are still present
-	// service1 := &structs.Service{Name: task.Name}
-	// service2 := &structs.Service{Name: task.Name}
-	//services := []*structs.Service{service1, service2}
-	//service1.ServiceID = fmt.Sprintf("%s-%s:%s/%s", cs.GenerateServiceID(serviceGroupName, service1), task.Name, allocID)
-	//service2.ServiceID = fmt.Sprintf("%s-%s:%s/%s", cs.GenerateServiceID(serviceGroupName, service2), task.Name, allocID)
-	// if err := servicesPresent(t, services, cs); err != nil {
-	// 	t.Fatalf("err : %v", err)
-	// }
-	// FIXME(sean@)
-	// if err := checksPresent(t, []string{check1.Hash(service1ID)}, cs); err != nil {
-	// 	t.Fatalf("err : %v", err)
-	// }
-
-	// check if service defn 1 has been updated
-	// consulServices, err := cs.client.Agent().Services()
-	// if err != nil {
-	// 	t.Fatalf("errL: %v", err)
-	// }
-	// srv, _ := consulServices[service1.ServiceID]
-	// if !reflect.DeepEqual(srv.Tags, newTags) {
-	// 	t.Fatalf("expected tags: %v, actual: %v", newTags, srv.Tags)
-	// }
-}
+		t.Fatalf("error syncing services: %v", err)
+	}
-
-// func servicesPresent(t *testing.T, configuredServices []*structs.Service, syncer *Syncer) error {
-// 	var mErr multierror.Error
-// 	// services, err := syncer.client.Agent().Services()
-// 	// if err != nil {
-// 	// 	t.Fatalf("err: %v", err)
-// 	// }
-// 	// for _, configuredService := range configuredServices {
-// 	// 	if _, ok := services[configuredService.ServiceID]; !ok {
-// 	// 		mErr.Errors = append(mErr.Errors, fmt.Errorf("service ID %q not synced", configuredService.ServiceID))
-// 	// 	}
-// 	// }
-// 	return mErr.ErrorOrNil()
-// }
-
-func checksPresent(t *testing.T, checkIDs []string, syncer *Syncer) error {
-	var mErr multierror.Error
-	checks, err := syncer.client.Agent().Checks()
+	agentServices, err := cs.queryAgentServices()
 	if err != nil {
-		t.Fatalf("err: %v", err)
+		t.Fatalf("error querying consul services: %v", err)
+	}
+	if len(agentServices) != 3 {
+		t.Fatalf("expected 3 services in consul but found %d:\n%#v", len(agentServices), agentServices)
+	}
+	consulServices := make(map[string]*api.AgentService, 3)
+	for _, as := range agentServices {
+		consulServices[as.ID] = as
 	}
 
-	for _, checkID := range checkIDs {
-		if _, ok := checks[checkID]; !ok {
-			mErr.Errors = append(mErr.Errors, fmt.Errorf("check ID %q not synced", checkID))
+	found := 0
+	for _, s := range cs.flattenedServices() {
+		// Assert changes were applied to internal state
+		switch s.Name {
+		case "foo1":
+			found++
+			if !reflect.DeepEqual(service1.Tags, s.Tags) {
+				t.Errorf("incorrect tags on foo1:\n expected: %v\n found: %v", service1.Tags, s.Tags)
+			}
+		case "foo2":
+			found++
+			if s.Address != "" {
+				t.Errorf("expected empty host on foo2 but found %q", s.Address)
+			}
+			if s.Port != 8899 {
+				t.Errorf("expected port 8899 on foo2 but found %d", s.Port)
+			}
+		case "foo3":
+			found++
+		default:
+			t.Errorf("unexpected service: %s", s.Name)
 		}
-	}
-	return mErr.ErrorOrNil()
-}
-
-func mockTask() *structs.Task {
-	task := structs.Task{
-		Name:     "foo",
-		Services: []*structs.Service{&service1, &service2},
-		Resources: &structs.Resources{
-			Networks: []*structs.NetworkResource{
-				&structs.NetworkResource{
-					IP: "10.10.11.5",
-					DynamicPorts: []structs.Port{
-						structs.Port{
-							Label: "port1",
-							Value: 20002,
-						},
-						structs.Port{
-							Label: "port2",
-							Value: 20003,
-						},
-						structs.Port{
-							Label: "port3",
-							Value: 20004,
-						},
-					},
-				},
-			},
-		},
+		// Assert internal state equals consul's state
+		cs, ok := consulServices[s.ID]
+		if !ok {
+			t.Errorf("service not in consul: %s id: %s", s.Name, s.ID)
+			continue
+		}
+		if !reflect.DeepEqual(s.Tags, cs.Tags) {
+			t.Errorf("mismatched tags in syncer state and consul for %s:\nsyncer: %v\nconsul: %v", s.Name, s.Tags, cs.Tags)
+		}
+		if cs.Port != s.Port {
+			t.Errorf("mismatched port in syncer state and consul for %s\nsyncer: %v\nconsul: %v", s.Name, s.Port, cs.Port)
+		}
+		if cs.Address != s.Address {
+			t.Errorf("mismatched address in syncer state and consul for %s\nsyncer: %v\nconsul: %v", s.Name, s.Address, cs.Address)
+		}
+	}
+	if found != 3 {
+		t.Fatalf("expected 3 services locally but found %d", found)
 	}
-	return &task
 }
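
Usage note (editor's illustrative sketch, not part of the patch): after this change the Syncer no longer keeps separate trackedServices/trackedChecks maps; callers hand it the complete set for a domain via SetServices, stale entries are pruned from that domain's group, and SyncServices reconciles the flattened groups against the local Consul agent. A minimal in-package example, assuming a reachable local Consul agent; the "web" service and its tag are hypothetical, while NewSyncer, GenerateServiceKey, SetServices, SyncServices, and Shutdown are the identifiers used in the diff above:

	logger := log.New(os.Stdout, "", log.LstdFlags)
	cs, err := NewSyncer(config.DefaultConsulConfig(), make(chan struct{}), logger)
	if err != nil {
		// handle error
	}
	defer cs.Shutdown()

	web := &structs.Service{Name: "web", Tags: []string{"v1"}}
	services := map[ServiceKey]*structs.Service{
		GenerateServiceKey(web): web,
	}
	// SetServices replaces the "executor" domain's group: keys missing from
	// the supplied map are removed, new keys are added.
	if err := cs.SetServices("executor", services); err != nil {
		// handle error
	}
	// SyncServices flattens all groups and reconciles them with the agent.
	if err := cs.SyncServices(); err != nil {
		// handle error
	}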