diff --git a/client/alloc_runner_health_watcher.go b/client/alloc_runner_health_watcher.go index 31c7f59543a9..b2bd75b563b5 100644 --- a/client/alloc_runner_health_watcher.go +++ b/client/alloc_runner_health_watcher.go @@ -5,6 +5,7 @@ import ( "time" "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -105,7 +106,7 @@ func (r *AllocRunner) watchHealth(ctx context.Context) { // Store whether the last consul checks call was successful or not consulChecksErr := false - var checks []*api.AgentCheck + var allocReg *consul.AllocRegistration first := true OUTER: for { @@ -121,15 +122,15 @@ OUTER: alloc = newAlloc r.logger.Printf("[TRACE] client.alloc_watcher: new alloc version for %q", alloc.ID) case <-checkCh: - newChecks, err := r.consulClient.Checks(alloc) + newAllocReg, err := r.consulClient.AllocRegistrations(alloc.ID) if err != nil { if !consulChecksErr { consulChecksErr = true - r.logger.Printf("[WARN] client.alloc_watcher: failed to lookup consul checks for allocation %q: %v", alloc.ID, err) + r.logger.Printf("[WARN] client.alloc_watcher: failed to lookup consul registrations for allocation %q: %v", alloc.ID, err) } } else { consulChecksErr = false - checks = newChecks + allocReg = newAllocReg } case <-deadline.C: // We have exceeded our deadline without being healthy. 
@@ -174,23 +175,29 @@ OUTER: } // If we should have checks and they aren't all healthy continue - if len(checks) != desiredChecks { - r.logger.Printf("[TRACE] client.alloc_watcher: continuing since all checks (want %d; got %d) haven't been registered for alloc %q", desiredChecks, len(checks), alloc.ID) - cancelHealthyTimer() - continue OUTER - } - - // Check if all the checks are passing - for _, check := range checks { - if check.Status != api.HealthPassing { - r.logger.Printf("[TRACE] client.alloc_watcher: continuing since check %q isn't passing for alloc %q", check.CheckID, alloc.ID) - latestChecksHealthy = time.Time{} + if desiredChecks > 0 { + if allocReg.NumChecks() != desiredChecks { + r.logger.Printf("[TRACE] client.alloc_watcher: continuing since all checks (want %d; got %d) haven't been registered for alloc %q", desiredChecks, allocReg.NumChecks(), alloc.ID) cancelHealthyTimer() continue OUTER } - } - if latestChecksHealthy.IsZero() { - latestChecksHealthy = time.Now() + + // Check if all the checks are passing + for _, treg := range allocReg.Tasks { + for _, sreg := range treg.Services { + for _, check := range sreg.Checks { + if check.Status != api.HealthPassing { + r.logger.Printf("[TRACE] client.alloc_watcher: continuing since check %q isn't passing for alloc %q", check.CheckID, alloc.ID) + latestChecksHealthy = time.Time{} + cancelHealthyTimer() + continue OUTER + } + } + } + } + if latestChecksHealthy.IsZero() { + latestChecksHealthy = time.Now() + } } // Determine if the allocation is healthy diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 4323b8e1e970..33cf07bad013 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -14,6 +14,7 @@ import ( "github.com/boltdb/bolt" "github.com/hashicorp/consul/api" "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" 
"github.com/hashicorp/nomad/testutil" @@ -254,12 +255,32 @@ func TestAllocRunner_DeploymentHealth_Healthy_Checks(t *testing.T) { // Only return the check as healthy after a duration trigger := time.After(500 * time.Millisecond) - ar.consulClient.(*mockConsulServiceClient).checksFn = func(a *structs.Allocation) ([]*api.AgentCheck, error) { + ar.consulClient.(*mockConsulServiceClient).allocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) { select { case <-trigger: - return []*api.AgentCheck{checkHealthy}, nil + return &consul.AllocRegistration{ + Tasks: map[string]*consul.TaskRegistration{ + task.Name: { + Services: map[string]*consul.ServiceRegistration{ + "123": { + Checks: []*api.AgentCheck{checkHealthy}, + }, + }, + }, + }, + }, nil default: - return []*api.AgentCheck{checkUnhealthy}, nil + return &consul.AllocRegistration{ + Tasks: map[string]*consul.TaskRegistration{ + task.Name: { + Services: map[string]*consul.ServiceRegistration{ + "123": { + Checks: []*api.AgentCheck{checkUnhealthy}, + }, + }, + }, + }, + }, nil } } diff --git a/client/consul.go b/client/consul.go index d470e3dea285..89666e41e451 100644 --- a/client/consul.go +++ b/client/consul.go @@ -1,9 +1,9 @@ package client import ( - "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/client/driver" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" ) @@ -13,5 +13,5 @@ type ConsulServiceAPI interface { RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error RemoveTask(allocID string, task *structs.Task) UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error - Checks(alloc *structs.Allocation) ([]*api.AgentCheck, error) + AllocRegistrations(allocID string) (*consul.AllocRegistration, error) } diff --git a/client/consul_test.go b/client/consul_test.go 
index 897303f04ce3..10d1ebe10d04 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -8,9 +8,9 @@ import ( "sync" "testing" - "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/client/driver" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" ) @@ -24,7 +24,7 @@ type mockConsulOp struct { } func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) mockConsulOp { - if op != "add" && op != "remove" && op != "update" && op != "checks" { + if op != "add" && op != "remove" && op != "update" && op != "alloc_registrations" { panic(fmt.Errorf("invalid consul op: %s", op)) } return mockConsulOp{ @@ -44,8 +44,9 @@ type mockConsulServiceClient struct { logger *log.Logger - // checksFn allows injecting return values for the Checks function. - checksFn func(*structs.Allocation) ([]*api.AgentCheck, error) + // allocRegistrationsFn allows injecting return values for the + // AllocRegistrations function. 
+ allocRegistrationsFn func(allocID string) (*consul.AllocRegistration, error) } func newMockConsulServiceClient() *mockConsulServiceClient { @@ -82,14 +83,14 @@ func (m *mockConsulServiceClient) RemoveTask(allocID string, task *structs.Task) m.ops = append(m.ops, newMockConsulOp("remove", allocID, task, nil, nil)) } -func (m *mockConsulServiceClient) Checks(alloc *structs.Allocation) ([]*api.AgentCheck, error) { +func (m *mockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) { m.mu.Lock() defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: Checks(%q)", alloc.ID) - m.ops = append(m.ops, newMockConsulOp("checks", alloc.ID, nil, nil, nil)) + m.logger.Printf("[TEST] mock_consul: AllocRegistrations(%q)", allocID) + m.ops = append(m.ops, newMockConsulOp("alloc_registrations", allocID, nil, nil, nil)) - if m.checksFn != nil { - return m.checksFn(alloc) + if m.allocRegistrationsFn != nil { + return m.allocRegistrationsFn(allocID) } return nil, nil diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index a1710250cab0..44f3d3725511 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/client/driver" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -84,6 +85,103 @@ type operations struct { deregChecks []string } +// AllocRegistration holds the status of services registered for a particular +// allocation by task.
+type AllocRegistration struct { + // Tasks maps the name of a task to its registered services and checks + Tasks map[string]*TaskRegistration +} + +func (a *AllocRegistration) copy() *AllocRegistration { + c := &AllocRegistration{ + Tasks: make(map[string]*TaskRegistration, len(a.Tasks)), + } + + for k, v := range a.Tasks { + c.Tasks[k] = v.copy() + } + + return c +} + +// NumServices returns the number of registered services +func (a *AllocRegistration) NumServices() int { + if a == nil { + return 0 + } + + total := 0 + for _, treg := range a.Tasks { + for _, sreg := range treg.Services { + if sreg.Service != nil { + total++ + } + } + } + + return total +} + +// NumChecks returns the number of registered checks +func (a *AllocRegistration) NumChecks() int { + if a == nil { + return 0 + } + + total := 0 + for _, treg := range a.Tasks { + for _, sreg := range treg.Services { + total += len(sreg.Checks) + } + } + + return total +} + +// TaskRegistration holds the status of services registered for a particular +// task. +type TaskRegistration struct { + Services map[string]*ServiceRegistration +} + +func (t *TaskRegistration) copy() *TaskRegistration { + c := &TaskRegistration{ + Services: make(map[string]*ServiceRegistration, len(t.Services)), + } + + for k, v := range t.Services { + c.Services[k] = v.copy() + } + + return c +} + +// ServiceRegistration holds the status of a registered Consul Service and its +// Checks. +type ServiceRegistration struct { + // serviceID and checkIDs are internal fields that track just the IDs of the + // services/checks registered in Consul. It is used to materialize the other + // fields when queried. + serviceID string + checkIDs map[string]struct{} + + // Service is the AgentService registered in Consul. + Service *api.AgentService + + // Checks is the status of the registered checks. 
+ Checks []*api.AgentCheck +} + +func (s *ServiceRegistration) copy() *ServiceRegistration { + // Copy does not copy the external fields but only the internal fields. This + // is so that the caller of AllocRegistrations can not access the internal + // fields and that method uses these fields to populate the external fields. + return &ServiceRegistration{ + serviceID: s.serviceID, + checkIDs: helper.CopyMapStringStruct(s.checkIDs), + } +} + // ServiceClient handles task and agent service registration with Consul. type ServiceClient struct { client AgentAPI @@ -111,6 +209,11 @@ type ServiceClient struct { scripts map[string]*scriptCheck runningScripts map[string]*scriptHandle + // allocRegistrations stores the services and checks that are registered + // with Consul by allocation ID. + allocRegistrations map[string]*AllocRegistration + allocRegistrationsLock sync.RWMutex + // agent services and checks record entries for the agent itself which // should be removed on shutdown agentServices map[string]struct{} @@ -126,21 +229,22 @@ type ServiceClient struct { // Client and logger. 
func NewServiceClient(consulClient AgentAPI, skipVerifySupport bool, logger *log.Logger) *ServiceClient { return &ServiceClient{ - client: consulClient, - skipVerifySupport: skipVerifySupport, - logger: logger, - retryInterval: defaultRetryInterval, - maxRetryInterval: defaultMaxRetryInterval, - exitCh: make(chan struct{}), - shutdownCh: make(chan struct{}), - shutdownWait: defaultShutdownWait, - opCh: make(chan *operations, 8), - services: make(map[string]*api.AgentServiceRegistration), - checks: make(map[string]*api.AgentCheckRegistration), - scripts: make(map[string]*scriptCheck), - runningScripts: make(map[string]*scriptHandle), - agentServices: make(map[string]struct{}), - agentChecks: make(map[string]struct{}), + client: consulClient, + skipVerifySupport: skipVerifySupport, + logger: logger, + retryInterval: defaultRetryInterval, + maxRetryInterval: defaultMaxRetryInterval, + exitCh: make(chan struct{}), + shutdownCh: make(chan struct{}), + shutdownWait: defaultShutdownWait, + opCh: make(chan *operations, 8), + services: make(map[string]*api.AgentServiceRegistration), + checks: make(map[string]*api.AgentCheckRegistration), + scripts: make(map[string]*scriptCheck), + runningScripts: make(map[string]*scriptHandle), + allocRegistrations: make(map[string]*AllocRegistration), + agentServices: make(map[string]struct{}), + agentChecks: make(map[string]struct{}), } } @@ -445,11 +549,19 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) } // serviceRegs creates service registrations, check registrations, and script -// checks from a service. +// checks from a service. It returns a service registration object with the +// service and check IDs populated. 
func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *structs.Service, - task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { + task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) (*ServiceRegistration, error) { + // Get the services ID id := makeTaskServiceID(allocID, task.Name, service) + sreg := &ServiceRegistration{ + serviceID: id, + checkIDs: make(map[string]struct{}, len(service.Checks)), + } + + // Determine the address to advertise addrMode := service.AddressMode if addrMode == structs.AddressModeAuto { if net.Advertise() { @@ -462,11 +574,13 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st ip, port := task.Resources.Networks.Port(service.PortLabel) if addrMode == structs.AddressModeDriver { if net == nil { - return fmt.Errorf("service %s cannot use driver's IP because driver didn't set one", service.Name) + return nil, fmt.Errorf("service %s cannot use driver's IP because driver didn't set one", service.Name) } ip = net.IP port = net.PortMap[service.PortLabel] } + + // Build the Consul Service registration request serviceReg := &api.AgentServiceRegistration{ ID: id, Name: service.Name, @@ -478,12 +592,30 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st // with tests that may reuse Tasks copy(serviceReg.Tags, service.Tags) ops.regServices = append(ops.regServices, serviceReg) - return c.checkRegs(ops, allocID, id, service, task, exec, net) + + // Build the check registrations + checkIDs, err := c.checkRegs(ops, allocID, id, service, task, exec, net) + if err != nil { + return nil, err + } + for _, cid := range checkIDs { + sreg.checkIDs[cid] = struct{}{} + } + return sreg, nil } +// checkRegs registers the checks for the given service and returns the +// registered check ids. 
func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, service *structs.Service, - task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { + task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) ([]string, error) { + // Fast path + numChecks := len(service.Checks) + if numChecks == 0 { + return nil, nil + } + + checkIDs := make([]string, 0, numChecks) for _, check := range service.Checks { if check.TLSSkipVerify && !c.skipVerifySupport { c.logger.Printf("[WARN] consul.sync: skipping check %q for task %q alloc %q because Consul doesn't support tls_skip_verify. Please upgrade to Consul >= 0.7.2.", @@ -491,9 +623,10 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se continue } checkID := makeCheckID(serviceID, check) + checkIDs = append(checkIDs, checkID) if check.Type == structs.ServiceCheckScript { if exec == nil { - return fmt.Errorf("driver doesn't support script checks") + return nil, fmt.Errorf("driver doesn't support script checks") } ops.scripts = append(ops.scripts, newScriptCheck( allocID, task.Name, checkID, check, exec, c.client, c.logger, c.shutdownCh)) @@ -509,11 +642,11 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se ip, port := task.Resources.Networks.Port(portLabel) checkReg, err := createCheckReg(serviceID, checkID, check, ip, port) if err != nil { - return fmt.Errorf("failed to add check %q: %v", check.Name, err) + return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err) } ops.regChecks = append(ops.regChecks, checkReg) } - return nil + return checkIDs, nil } // RegisterTask with Consul. Adds all sevice entries and checks to Consul. If @@ -524,12 +657,27 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se // // Actual communication with Consul is done asynchrously (see Run). 
func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { + // Fast path + numServices := len(task.Services) + if numServices == 0 { + return nil + } + + t := new(TaskRegistration) + t.Services = make(map[string]*ServiceRegistration, numServices) + ops := &operations{} for _, service := range task.Services { - if err := c.serviceRegs(ops, allocID, service, task, exec, net); err != nil { + sreg, err := c.serviceRegs(ops, allocID, service, task, exec, net) + if err != nil { return err } + t.Services[sreg.serviceID] = sreg } + + // Add the task to the allocation's registration + c.addTaskRegistration(allocID, task.Name, t) + c.commit(ops) return nil } @@ -541,6 +689,9 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec dr func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { ops := &operations{} + t := new(TaskRegistration) + t.Services = make(map[string]*ServiceRegistration, len(newTask.Services)) + existingIDs := make(map[string]*structs.Service, len(existing.Services)) for _, s := range existing.Services { existingIDs[makeTaskServiceID(allocID, existing.Name, s)] = s @@ -563,6 +714,13 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta continue } + // Service still exists so add it to the task's registration + sreg := &ServiceRegistration{ + serviceID: existingID, + checkIDs: make(map[string]struct{}, len(newSvc.Checks)), + } + t.Services[existingID] = sreg + // PortLabel and AddressMode aren't included in the ID, so we // have to compare manually. 
serviceUnchanged := newSvc.PortLabel == existingSvc.PortLabel && newSvc.AddressMode == existingSvc.AddressMode @@ -583,12 +741,16 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta if _, exists := existingChecks[checkID]; exists { // Check exists, so don't remove it delete(existingChecks, checkID) + sreg.checkIDs[checkID] = struct{}{} } else if serviceUnchanged { // New check on an unchanged service; add them now - err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net) + newCheckIDs, err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net) if err != nil { return err } + for _, checkID := range newCheckIDs { + sreg.checkIDs[checkID] = struct{}{} + } } } @@ -600,12 +762,17 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta // Any remaining services should just be enqueued directly for _, newSvc := range newIDs { - err := c.serviceRegs(ops, allocID, newSvc, newTask, exec, net) + sreg, err := c.serviceRegs(ops, allocID, newSvc, newTask, exec, net) if err != nil { return err } + + t.Services[sreg.serviceID] = sreg } + // Add the task to the allocation's registration + c.addTaskRegistration(allocID, newTask.Name, t) + c.commit(ops) return nil } @@ -625,43 +792,52 @@ func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) { } } + // Remove the task from the alloc's registrations + c.removeTaskRegistration(allocID, task.Name) + // Now add them to the deregistration fields; main Run loop will update c.commit(&ops) } -// Checks returns the checks registered against the agent for the given -// allocation. -func (c *ServiceClient) Checks(a *structs.Allocation) ([]*api.AgentCheck, error) { - tg := a.Job.LookupTaskGroup(a.TaskGroup) - if tg == nil { - return nil, fmt.Errorf("failed to find task group in alloc") +// AllocRegistrations returns the registrations for the given allocation. If the +// allocation has no registrations, the response is a nil object.
+func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, error) { + // Get the internal struct using the lock + c.allocRegistrationsLock.RLock() + regInternal, ok := c.allocRegistrations[allocID] + if !ok { + c.allocRegistrationsLock.RUnlock() + return nil, nil } - // Determine the checks that are relevant - relevant := make(map[string]struct{}, 4) - for _, task := range tg.Tasks { - for _, service := range task.Services { - id := makeTaskServiceID(a.ID, task.Name, service) - for _, check := range service.Checks { - relevant[makeCheckID(id, check)] = struct{}{} - } - } + // Copy so we don't expose internal structs + reg := regInternal.copy() + c.allocRegistrationsLock.RUnlock() + + // Query the services and checks to populate the allocation registrations. + services, err := c.client.Services() + if err != nil { + return nil, err } - // Query all the checks checks, err := c.client.Checks() if err != nil { return nil, err } - allocChecks := make([]*api.AgentCheck, 0, len(relevant)) - for checkID := range relevant { - if check, ok := checks[checkID]; ok { - allocChecks = append(allocChecks, check) + // Populate the object + for _, treg := range reg.Tasks { + for serviceID, sreg := range treg.Services { + sreg.Service = services[serviceID] + for checkID := range sreg.checkIDs { + if check, ok := checks[checkID]; ok { + sreg.Checks = append(sreg.Checks, check) + } + } } } - return allocChecks, nil + return reg, nil } // Shutdown the Consul client. Update running task registations and deregister @@ -718,6 +894,39 @@ func (c *ServiceClient) Shutdown() error { return nil } +// addTaskRegistration adds the task registration for the given allocation. 
+func (c *ServiceClient) addTaskRegistration(allocID, taskName string, reg *TaskRegistration) { + c.allocRegistrationsLock.Lock() + defer c.allocRegistrationsLock.Unlock() + + alloc, ok := c.allocRegistrations[allocID] + if !ok { + alloc = &AllocRegistration{ + Tasks: make(map[string]*TaskRegistration), + } + c.allocRegistrations[allocID] = alloc + } + alloc.Tasks[taskName] = reg +} + +// removeTaskRegistration removes the task registration for the given allocation. +func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) { + c.allocRegistrationsLock.Lock() + defer c.allocRegistrationsLock.Unlock() + + alloc, ok := c.allocRegistrations[allocID] + if !ok { + return + } + + // Delete the task and if it is the last one also delete the alloc's + // registration + delete(alloc.Tasks, taskName) + if len(alloc.Tasks) == 0 { + delete(c.allocRegistrations, allocID) + } +} + // makeAgentServiceID creates a unique ID for identifying an agent service in // Consul. // diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index 97cd56072ff6..fc9e0a625ee0 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -208,13 +208,18 @@ func TestConsul_Integration(t *testing.T) { } // Assert the service client returns all the checks for the allocation. - checksOut, err := serviceClient.Checks(alloc) + reg, err := serviceClient.AllocRegistrations(alloc.ID) if err != nil { t.Fatalf("unexpected error retrieving allocation checks: %v", err) } - - if l := len(checksOut); l != 2 { - t.Fatalf("got %d checks; want %d", l, 2) + if reg == nil { + t.Fatalf("Unexpected nil allocation registration") + } + if snum := reg.NumServices(); snum != 2 { + t.Fatalf("Unexpected number of services registered. Got %d; want 2", snum) + } + if cnum := reg.NumChecks(); cnum != 2 { + t.Fatalf("Unexpected number of checks registered. 
Got %d; want 2", cnum) } logger.Printf("[TEST] consul.test: killing task") diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index c3841b672a39..ce379e22e024 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -215,7 +215,8 @@ func (c *fakeConsul) UpdateTTL(id string, output string, status string) error { func TestConsul_ChangeTags(t *testing.T) { ctx := setupFake() - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, nil, nil); err != nil { + allocID := "allocid" + if err := ctx.ServiceClient.RegisterTask(allocID, ctx.Task, nil, nil); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -227,6 +228,22 @@ func TestConsul_ChangeTags(t *testing.T) { t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) } + // Query the allocs registrations and then again when we update. The IDs + // should change + reg1, err := ctx.ServiceClient.AllocRegistrations(allocID) + if err != nil { + t.Fatalf("Looking up alloc registration failed: %v", err) + } + if reg1 == nil { + t.Fatalf("Nil alloc registrations: %v", err) + } + if num := reg1.NumServices(); num != 1 { + t.Fatalf("Wrong number of servies: got %d; want 1", num) + } + if num := reg1.NumChecks(); num != 0 { + t.Fatalf("Wrong number of checks: got %d; want 0", num) + } + origKey := "" for k, v := range ctx.FakeConsul.services { origKey = k @@ -263,6 +280,34 @@ func TestConsul_ChangeTags(t *testing.T) { t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) } } + + // Check again and ensure the IDs changed + reg2, err := ctx.ServiceClient.AllocRegistrations(allocID) + if err != nil { + t.Fatalf("Looking up alloc registration failed: %v", err) + } + if reg2 == nil { + t.Fatalf("Nil alloc registrations: %v", err) + } + if num := reg2.NumServices(); num != 1 { + t.Fatalf("Wrong number of servies: got %d; want 1", num) + } + if num := reg2.NumChecks(); num != 0 { + t.Fatalf("Wrong number of 
checks: got %d; want 0", num) + } + + for task, treg := range reg1.Tasks { + otherTaskReg, ok := reg2.Tasks[task] + if !ok { + t.Fatalf("Task %q not in second reg", task) + } + + for sID := range treg.Services { + if _, ok := otherTaskReg.Services[sID]; ok { + t.Fatalf("service ID didn't change") + } + } + } } // TestConsul_ChangePorts asserts that changing the ports on a service updates @@ -461,7 +506,8 @@ func TestConsul_ChangeChecks(t *testing.T) { }, } - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx, nil); err != nil { + allocID := "allocid" + if err := ctx.ServiceClient.RegisterTask(allocID, ctx.Task, ctx, nil); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -473,6 +519,22 @@ func TestConsul_ChangeChecks(t *testing.T) { t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) } + // Query the allocs registrations and then again when we update. The IDs + // should change + reg1, err := ctx.ServiceClient.AllocRegistrations(allocID) + if err != nil { + t.Fatalf("Looking up alloc registration failed: %v", err) + } + if reg1 == nil { + t.Fatalf("Nil alloc registrations: %v", err) + } + if num := reg1.NumServices(); num != 1 { + t.Fatalf("Wrong number of servies: got %d; want 1", num) + } + if num := reg1.NumChecks(); num != 1 { + t.Fatalf("Wrong number of checks: got %d; want 1", num) + } + origServiceKey := "" for k, v := range ctx.FakeConsul.services { origServiceKey = k @@ -493,13 +555,13 @@ func TestConsul_ChangeChecks(t *testing.T) { } } - // Now add a check + // Now add a check and modify the original origTask := ctx.Task.Copy() ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { Name: "c1", Type: "tcp", - Interval: time.Second, + Interval: 2 * time.Second, Timeout: time.Second, PortLabel: "x", }, @@ -545,6 +607,41 @@ func TestConsul_ChangeChecks(t *testing.T) { t.Errorf("Unkown check: %q", k) } } + + // Check again and ensure the IDs changed + reg2, err := 
ctx.ServiceClient.AllocRegistrations(allocID) + if err != nil { + t.Fatalf("Looking up alloc registration failed: %v", err) + } + if reg2 == nil { + t.Fatalf("Nil alloc registrations: %v", err) + } + if num := reg2.NumServices(); num != 1 { + t.Fatalf("Wrong number of servies: got %d; want 1", num) + } + if num := reg2.NumChecks(); num != 2 { + t.Fatalf("Wrong number of checks: got %d; want 2", num) + } + + for task, treg := range reg1.Tasks { + otherTaskReg, ok := reg2.Tasks[task] + if !ok { + t.Fatalf("Task %q not in second reg", task) + } + + for sID, sreg := range treg.Services { + otherServiceReg, ok := otherTaskReg.Services[sID] + if !ok { + t.Fatalf("service ID changed") + } + + for newID := range sreg.checkIDs { + if _, ok := otherServiceReg.checkIDs[newID]; ok { + t.Fatalf("check IDs should change") + } + } + } + } } // TestConsul_RegServices tests basic service registration. diff --git a/helper/funcs.go b/helper/funcs.go index 3ad918dde049..ae26849164b4 100644 --- a/helper/funcs.go +++ b/helper/funcs.go @@ -173,6 +173,19 @@ func CopyMapStringString(m map[string]string) map[string]string { return c } +func CopyMapStringStruct(m map[string]struct{}) map[string]struct{} { + l := len(m) + if l == 0 { + return nil + } + + c := make(map[string]struct{}, l) + for k, _ := range m { + c[k] = struct{}{} + } + return c +} + func CopyMapStringInt(m map[string]int) map[string]int { l := len(m) if l == 0 {