From 3b300925a2aa5f7efaa26532943b92e066ed3b2d Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 7 Aug 2017 15:54:05 -0700 Subject: [PATCH 1/2] Fix alloc health with checks using interpolation Fixes an issue in which the allocation health watcher was checking for allocations health based on un-interpolated services and checks. Change the interface for retrieving check information from Consul to retrieving all registered services and checks by allocation. In the future this will allow us to output nicer messages. Fixes https://github.com/hashicorp/nomad/issues/2969 --- client/alloc_runner_health_watcher.go | 43 ++-- client/alloc_runner_test.go | 27 ++- client/consul.go | 4 +- client/consul_test.go | 19 +- command/agent/consul/client.go | 301 ++++++++++++++++++++++---- command/agent/consul/int_test.go | 13 +- command/agent/consul/unit_test.go | 105 ++++++++- helper/funcs.go | 13 ++ 8 files changed, 438 insertions(+), 87 deletions(-) diff --git a/client/alloc_runner_health_watcher.go b/client/alloc_runner_health_watcher.go index 31c7f59543a9..b2bd75b563b5 100644 --- a/client/alloc_runner_health_watcher.go +++ b/client/alloc_runner_health_watcher.go @@ -5,6 +5,7 @@ import ( "time" "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -105,7 +106,7 @@ func (r *AllocRunner) watchHealth(ctx context.Context) { // Store whether the last consul checks call was successful or not consulChecksErr := false - var checks []*api.AgentCheck + var allocReg *consul.AllocRegistration first := true OUTER: for { @@ -121,15 +122,15 @@ OUTER: alloc = newAlloc r.logger.Printf("[TRACE] client.alloc_watcher: new alloc version for %q", alloc.ID) case <-checkCh: - newChecks, err := r.consulClient.Checks(alloc) + newAllocReg, err := r.consulClient.AllocRegistrations(alloc.ID) if err != nil { if !consulChecksErr { consulChecksErr = true - r.logger.Printf("[WARN] client.alloc_watcher: failed to lookup consul checks for allocation %q: %v", alloc.ID, err) + r.logger.Printf("[WARN] client.alloc_watcher: failed to lookup consul registrations for allocation %q: %v", alloc.ID, err) } } else { consulChecksErr = false - checks = newChecks + allocReg = newAllocReg } case <-deadline.C: // We have exceeded our deadline without being healthy. @@ -174,23 +175,29 @@ OUTER: } // If we should have checks and they aren't all healthy continue - if len(checks) != desiredChecks { - r.logger.Printf("[TRACE] client.alloc_watcher: continuing since all checks (want %d; got %d) haven't been registered for alloc %q", desiredChecks, len(checks), alloc.ID) - cancelHealthyTimer() - continue OUTER - } - - // Check if all the checks are passing - for _, check := range checks { - if check.Status != api.HealthPassing { - r.logger.Printf("[TRACE] client.alloc_watcher: continuing since check %q isn't passing for alloc %q", check.CheckID, alloc.ID) - latestChecksHealthy = time.Time{} + if desiredChecks > 0 { + if allocReg.NumChecks() != desiredChecks { + r.logger.Printf("[TRACE] client.alloc_watcher: continuing since all checks (want %d; got %d) haven't been registered for alloc %q", desiredChecks, allocReg.NumChecks(), alloc.ID) cancelHealthyTimer() continue OUTER } - } - if latestChecksHealthy.IsZero() { - latestChecksHealthy = time.Now() + + // Check if all the checks are passing + for _, treg := range allocReg.Tasks { + for _, sreg := range treg.Services { + for _, check := range sreg.Checks { + if check.Status != api.HealthPassing { + r.logger.Printf("[TRACE] client.alloc_watcher: continuing since check %q isn't passing for alloc %q", check.CheckID, alloc.ID) + latestChecksHealthy = time.Time{} + cancelHealthyTimer() + continue OUTER + } + } + } + } + if latestChecksHealthy.IsZero() { + latestChecksHealthy = time.Now() + } } // Determine if the allocation is healthy diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 4323b8e1e970..33cf07bad013 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -14,6 +14,7 @@ import ( "github.com/boltdb/bolt" "github.com/hashicorp/consul/api" "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -254,12 +255,32 @@ func TestAllocRunner_DeploymentHealth_Healthy_Checks(t *testing.T) { // Only return the check as healthy after a duration trigger := time.After(500 * time.Millisecond) - ar.consulClient.(*mockConsulServiceClient).checksFn = func(a *structs.Allocation) ([]*api.AgentCheck, error) { + ar.consulClient.(*mockConsulServiceClient).allocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) { select { case <-trigger: - return []*api.AgentCheck{checkHealthy}, nil + return &consul.AllocRegistration{ + Tasks: map[string]*consul.TaskRegistration{ + task.Name: { + Services: map[string]*consul.ServiceRegistration{ + "123": { + Checks: []*api.AgentCheck{checkHealthy}, + }, + }, + }, + }, + }, nil default: - return []*api.AgentCheck{checkUnhealthy}, nil + return &consul.AllocRegistration{ + Tasks: map[string]*consul.TaskRegistration{ + task.Name: { + Services: map[string]*consul.ServiceRegistration{ + "123": { + Checks: []*api.AgentCheck{checkUnhealthy}, + }, + }, + }, + }, + }, nil } } diff --git a/client/consul.go b/client/consul.go index d470e3dea285..89666e41e451 100644 --- a/client/consul.go +++ b/client/consul.go @@ -1,9 +1,9 @@ package client import ( - "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/client/driver" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" ) @@ -13,5 +13,5 @@ type ConsulServiceAPI interface { RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error RemoveTask(allocID string, task *structs.Task) UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error - Checks(alloc *structs.Allocation) ([]*api.AgentCheck, error) + AllocRegistrations(allocID string) (*consul.AllocRegistration, error) } diff --git a/client/consul_test.go b/client/consul_test.go index 897303f04ce3..10d1ebe10d04 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -8,9 +8,9 @@ import ( "sync" "testing" - "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/client/driver" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" ) @@ -24,7 +24,7 @@ type mockConsulOp struct { } func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) mockConsulOp { - if op != "add" && op != "remove" && op != "update" && op != "checks" { + if op != "add" && op != "remove" && op != "update" && op != "alloc_registrations" { panic(fmt.Errorf("invalid consul op: %s", op)) } return mockConsulOp{ @@ -44,8 +44,9 @@ type mockConsulServiceClient struct { logger *log.Logger - // checksFn allows injecting return values for the Checks function. - checksFn func(*structs.Allocation) ([]*api.AgentCheck, error) + // allocRegistrationsFn allows injecting return values for the + // AllocRegistrations function. + allocRegistrationsFn func(allocID string) (*consul.AllocRegistration, error) } func newMockConsulServiceClient() *mockConsulServiceClient { @@ -82,14 +83,14 @@ func (m *mockConsulServiceClient) RemoveTask(allocID string, task *structs.Task) m.ops = append(m.ops, newMockConsulOp("remove", allocID, task, nil, nil)) } -func (m *mockConsulServiceClient) Checks(alloc *structs.Allocation) ([]*api.AgentCheck, error) { +func (m *mockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) { m.mu.Lock() defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: Checks(%q)", alloc.ID) - m.ops = append(m.ops, newMockConsulOp("checks", alloc.ID, nil, nil, nil)) + m.logger.Printf("[TEST] mock_consul: AllocRegistrations(%q)", allocID) + m.ops = append(m.ops, newMockConsulOp("alloc_registrations", allocID, nil, nil, nil)) - if m.checksFn != nil { - return m.checksFn(alloc) + if m.allocRegistrationsFn != nil { + return m.allocRegistrationsFn(allocID) } return nil, nil diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index a1710250cab0..a7ee09d1d8b9 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/client/driver" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -84,6 +85,100 @@ type operations struct { deregChecks []string } +// AllocRegistration holds the status of services registered for a particular +// allocations by task. +type AllocRegistration struct { + // Tasks maps the name of a task to its registered services and checks + Tasks map[string]*TaskRegistration +} + +func (a *AllocRegistration) Copy() *AllocRegistration { + c := &AllocRegistration{ + Tasks: make(map[string]*TaskRegistration, len(a.Tasks)), + } + + for k, v := range a.Tasks { + c.Tasks[k] = v.Copy() + } + + return c +} + +// NumServices returns the number of registered services +func (a *AllocRegistration) NumServices() int { + if a == nil { + return 0 + } + + total := 0 + for _, treg := range a.Tasks { + for _, sreg := range treg.Services { + if sreg.Service != nil { + total++ + } + } + } + + return total +} + +// NumChecks returns the number of registered checks +func (a *AllocRegistration) NumChecks() int { + if a == nil { + return 0 + } + + total := 0 + for _, treg := range a.Tasks { + for _, sreg := range treg.Services { + total += len(sreg.Checks) + } + } + + return total +} + +// TaskRegistration holds the status of services registered for a particular +// task. +type TaskRegistration struct { + Services map[string]*ServiceRegistration +} + +func (t *TaskRegistration) Copy() *TaskRegistration { + c := &TaskRegistration{ + Services: make(map[string]*ServiceRegistration, len(t.Services)), + } + + for k, v := range t.Services { + c.Services[k] = v.Copy() + } + + return c +} + +// ServiceRegistration holds the status of a registered Consul Service and its +// Checks. +type ServiceRegistration struct { + // serviceID and checkIDs are internal fields that track just the IDs of the + // services/checks registered in Consul. It is used to materialize the other + // fields when queried. + serviceID string + checkIDs map[string]struct{} + + // Service is the AgentService registered in Consul. + Service *api.AgentService + + // Checks is the status of the registered checks. + Checks []*api.AgentCheck +} + +func (s *ServiceRegistration) Copy() *ServiceRegistration { + return &ServiceRegistration{ + serviceID: s.serviceID, + checkIDs: helper.CopyMapStringStruct(s.checkIDs), + } +} + // ServiceClient handles task and agent service registration with Consul. type ServiceClient struct { client AgentAPI @@ -111,6 +206,11 @@ type ServiceClient struct { scripts map[string]*scriptCheck runningScripts map[string]*scriptHandle + // allocRegistrations stores the services and checks that are registered + // with Consul by allocation ID. + allocRegistrations map[string]*AllocRegistration + allocRegistrationsLock sync.RWMutex + // agent services and checks record entries for the agent itself which // should be removed on shutdown agentServices map[string]struct{} @@ -126,21 +226,22 @@ type ServiceClient struct { // Client and logger. func NewServiceClient(consulClient AgentAPI, skipVerifySupport bool, logger *log.Logger) *ServiceClient { return &ServiceClient{ - client: consulClient, - skipVerifySupport: skipVerifySupport, - logger: logger, - retryInterval: defaultRetryInterval, - maxRetryInterval: defaultMaxRetryInterval, - exitCh: make(chan struct{}), - shutdownCh: make(chan struct{}), - shutdownWait: defaultShutdownWait, - opCh: make(chan *operations, 8), - services: make(map[string]*api.AgentServiceRegistration), - checks: make(map[string]*api.AgentCheckRegistration), - scripts: make(map[string]*scriptCheck), - runningScripts: make(map[string]*scriptHandle), - agentServices: make(map[string]struct{}), - agentChecks: make(map[string]struct{}), + client: consulClient, + skipVerifySupport: skipVerifySupport, + logger: logger, + retryInterval: defaultRetryInterval, + maxRetryInterval: defaultMaxRetryInterval, + exitCh: make(chan struct{}), + shutdownCh: make(chan struct{}), + shutdownWait: defaultShutdownWait, + opCh: make(chan *operations, 8), + services: make(map[string]*api.AgentServiceRegistration), + checks: make(map[string]*api.AgentCheckRegistration), + scripts: make(map[string]*scriptCheck), + runningScripts: make(map[string]*scriptHandle), + allocRegistrations: make(map[string]*AllocRegistration), + agentServices: make(map[string]struct{}), + agentChecks: make(map[string]struct{}), } } @@ -445,11 +546,19 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) } // serviceRegs creates service registrations, check registrations, and script -// checks from a service. +// checks from a service. It returns a service registration object with the +// service and check IDs populated. func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *structs.Service, - task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { + task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) (*ServiceRegistration, error) { + // Get the services ID id := makeTaskServiceID(allocID, task.Name, service) + sreg := &ServiceRegistration{ + serviceID: id, + checkIDs: make(map[string]struct{}, len(service.Checks)), + } + + // Determine the address to advertise addrMode := service.AddressMode if addrMode == structs.AddressModeAuto { if net.Advertise() { @@ -462,11 +571,13 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st ip, port := task.Resources.Networks.Port(service.PortLabel) if addrMode == structs.AddressModeDriver { if net == nil { - return fmt.Errorf("service %s cannot use driver's IP because driver didn't set one", service.Name) + return nil, fmt.Errorf("service %s cannot use driver's IP because driver didn't set one", service.Name) } ip = net.IP port = net.PortMap[service.PortLabel] } + + // Build the Consul Service registration request serviceReg := &api.AgentServiceRegistration{ ID: id, Name: service.Name, @@ -478,12 +589,30 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st // with tests that may reuse Tasks copy(serviceReg.Tags, service.Tags) ops.regServices = append(ops.regServices, serviceReg) - return c.checkRegs(ops, allocID, id, service, task, exec, net) + + // Build the check registrations + checkIDs, err := c.checkRegs(ops, allocID, id, service, task, exec, net) + if err != nil { + return nil, err + } + for _, cid := range checkIDs { + sreg.checkIDs[cid] = struct{}{} + } + return sreg, nil } +// checkRegs registers the checks for the given service and returns the +// registered check ids. func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, service *structs.Service, - task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { + task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) ([]string, error) { + // Fast path + numChecks := len(service.Checks) + if numChecks == 0 { + return nil, nil + } + + checkIDs := make([]string, 0, numChecks) for _, check := range service.Checks { if check.TLSSkipVerify && !c.skipVerifySupport { c.logger.Printf("[WARN] consul.sync: skipping check %q for task %q alloc %q because Consul doesn't support tls_skip_verify. Please upgrade to Consul >= 0.7.2.", @@ -491,9 +620,10 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se continue } checkID := makeCheckID(serviceID, check) + checkIDs = append(checkIDs, checkID) if check.Type == structs.ServiceCheckScript { if exec == nil { - return fmt.Errorf("driver doesn't support script checks") + return nil, fmt.Errorf("driver doesn't support script checks") } ops.scripts = append(ops.scripts, newScriptCheck( allocID, task.Name, checkID, check, exec, c.client, c.logger, c.shutdownCh)) @@ -509,11 +639,11 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se ip, port := task.Resources.Networks.Port(portLabel) checkReg, err := createCheckReg(serviceID, checkID, check, ip, port) if err != nil { - return fmt.Errorf("failed to add check %q: %v", check.Name, err) + return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err) } ops.regChecks = append(ops.regChecks, checkReg) } - return nil + return checkIDs, nil } // RegisterTask with Consul. Adds all sevice entries and checks to Consul. If @@ -524,12 +654,28 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se // // Actual communication with Consul is done asynchrously (see Run). func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { + // Fast path + numServices := len(task.Services) + if numServices == 0 { + return nil + } + + t := new(TaskRegistration) + t.Services = make(map[string]*ServiceRegistration, numServices) + ops := &operations{} for _, service := range task.Services { - if err := c.serviceRegs(ops, allocID, service, task, exec, net); err != nil { + sreg, err := c.serviceRegs(ops, allocID, service, task, exec, net) + if err != nil { return err } + t.Services[sreg.serviceID] = sreg } + + // Add the task to the allocation's registration + // Add the task to the allocation's registration + c.addTaskRegistration(allocID, task.Name, t) + c.commit(ops) return nil } @@ -541,6 +687,9 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec dr func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { ops := &operations{} + t := new(TaskRegistration) + t.Services = make(map[string]*ServiceRegistration, len(newTask.Services)) + existingIDs := make(map[string]*structs.Service, len(existing.Services)) for _, s := range existing.Services { existingIDs[makeTaskServiceID(allocID, existing.Name, s)] = s @@ -563,6 +712,13 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta continue } + // Service still exists so add it to the task's registration + sreg := &ServiceRegistration{ + serviceID: existingID, + checkIDs: make(map[string]struct{}, len(newSvc.Checks)), + } + t.Services[existingID] = sreg + // PortLabel and AddressMode aren't included in the ID, so we // have to compare manually. serviceUnchanged := newSvc.PortLabel == existingSvc.PortLabel && newSvc.AddressMode == existingSvc.AddressMode @@ -583,12 +739,16 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta if _, exists := existingChecks[checkID]; exists { // Check exists, so don't remove it delete(existingChecks, checkID) + sreg.checkIDs[checkID] = struct{}{} } else if serviceUnchanged { // New check on an unchanged service; add them now - err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net) + newCheckIDs, err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net) if err != nil { return err } + for _, checkID := range newCheckIDs { + sreg.checkIDs[checkID] = struct{}{} + } } } @@ -600,12 +760,17 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta // Any remaining services should just be enqueued directly for _, newSvc := range newIDs { - err := c.serviceRegs(ops, allocID, newSvc, newTask, exec, net) + sreg, err := c.serviceRegs(ops, allocID, newSvc, newTask, exec, net) if err != nil { return err } + + t.Services[sreg.serviceID] = sreg } + // Add the task to the allocation's registration + c.addTaskRegistration(allocID, newTask.Name, t) + c.commit(ops) return nil } @@ -625,43 +790,52 @@ func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) { } } + // Remove the task from the alloc's registrations + c.removeTaskRegistration(allocID, task.Name) + // Now add them to the deregistration fields; main Run loop will update c.commit(&ops) } -// Checks returns the checks registered against the agent for the given -// allocation. -func (c *ServiceClient) Checks(a *structs.Allocation) ([]*api.AgentCheck, error) { - tg := a.Job.LookupTaskGroup(a.TaskGroup) - if tg == nil { - return nil, fmt.Errorf("failed to find task group in alloc") +// AllocRegistrations returns the registrations for the given allocation. If the +// allocation has no reservations, the response is a nil object. +func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, error) { + // Get the internal struct using the lock + c.allocRegistrationsLock.RLock() + regInternal, ok := c.allocRegistrations[allocID] + if !ok { + c.allocRegistrationsLock.RUnlock() + return nil, nil } - // Determine the checks that are relevant - relevant := make(map[string]struct{}, 4) - for _, task := range tg.Tasks { - for _, service := range task.Services { - id := makeTaskServiceID(a.ID, task.Name, service) - for _, check := range service.Checks { - relevant[makeCheckID(id, check)] = struct{}{} - } - } + // Copy so we don't expose internal structs + reg := regInternal.Copy() + c.allocRegistrationsLock.RUnlock() + + // Query the services and checks to populate the allocation registrations. + services, err := c.client.Services() + if err != nil { + return nil, err } - // Query all the checks checks, err := c.client.Checks() if err != nil { return nil, err } - allocChecks := make([]*api.AgentCheck, 0, len(relevant)) - for checkID := range relevant { - if check, ok := checks[checkID]; ok { - allocChecks = append(allocChecks, check) + // Populate the object + for _, treg := range reg.Tasks { + for serviceID, sreg := range treg.Services { + sreg.Service = services[serviceID] + for checkID := range sreg.checkIDs { + if check, ok := checks[checkID]; ok { + sreg.Checks = append(sreg.Checks, check) + } + } } } - return allocChecks, nil + return reg, nil } // Shutdown the Consul client. Update running task registations and deregister @@ -718,6 +892,39 @@ func (c *ServiceClient) Shutdown() error { return nil } +// addTaskRegistration adds the task registration for the given allocation. +func (c *ServiceClient) addTaskRegistration(allocID, taskName string, reg *TaskRegistration) { + c.allocRegistrationsLock.Lock() + defer c.allocRegistrationsLock.Unlock() + + alloc, ok := c.allocRegistrations[allocID] + if !ok { + alloc = &AllocRegistration{ + Tasks: make(map[string]*TaskRegistration), + } + c.allocRegistrations[allocID] = alloc + } + alloc.Tasks[taskName] = reg +} + +// removeTaskRegistration removes the task registration for the given allocation. +func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) { + c.allocRegistrationsLock.Lock() + defer c.allocRegistrationsLock.Unlock() + + alloc, ok := c.allocRegistrations[allocID] + if !ok { + return + } + + // Delete the task and if it is the last one also delete the alloc's + // registration + delete(alloc.Tasks, taskName) + if len(alloc.Tasks) == 0 { + delete(c.allocRegistrations, allocID) + } +} + // makeAgentServiceID creates a unique ID for identifying an agent service in // Consul. // diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index 97cd56072ff6..fc9e0a625ee0 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -208,13 +208,18 @@ func TestConsul_Integration(t *testing.T) { } // Assert the service client returns all the checks for the allocation. - checksOut, err := serviceClient.Checks(alloc) + reg, err := serviceClient.AllocRegistrations(alloc.ID) if err != nil { t.Fatalf("unexpected error retrieving allocation checks: %v", err) } - - if l := len(checksOut); l != 2 { - t.Fatalf("got %d checks; want %d", l, 2) + if reg == nil { + t.Fatalf("Unexpected nil allocation registration") + } + if snum := reg.NumServices(); snum != 2 { + t.Fatalf("Unexpected number of services registered. Got %d; want 2", snum) + } + if cnum := reg.NumChecks(); cnum != 2 { + t.Fatalf("Unexpected number of checks registered. Got %d; want 2", cnum) } logger.Printf("[TEST] consul.test: killing task") diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index c3841b672a39..ce379e22e024 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -215,7 +215,8 @@ func (c *fakeConsul) UpdateTTL(id string, output string, status string) error { func TestConsul_ChangeTags(t *testing.T) { ctx := setupFake() - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, nil, nil); err != nil { + allocID := "allocid" + if err := ctx.ServiceClient.RegisterTask(allocID, ctx.Task, nil, nil); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -227,6 +228,22 @@ func TestConsul_ChangeTags(t *testing.T) { t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) } + // Query the allocs registrations and then again when we update. The IDs + // should change + reg1, err := ctx.ServiceClient.AllocRegistrations(allocID) + if err != nil { + t.Fatalf("Looking up alloc registration failed: %v", err) + } + if reg1 == nil { + t.Fatalf("Nil alloc registrations: %v", err) + } + if num := reg1.NumServices(); num != 1 { + t.Fatalf("Wrong number of servies: got %d; want 1", num) + } + if num := reg1.NumChecks(); num != 0 { + t.Fatalf("Wrong number of checks: got %d; want 0", num) + } + origKey := "" for k, v := range ctx.FakeConsul.services { origKey = k @@ -263,6 +280,34 @@ func TestConsul_ChangeTags(t *testing.T) { t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) } } + + // Check again and ensure the IDs changed + reg2, err := ctx.ServiceClient.AllocRegistrations(allocID) + if err != nil { + t.Fatalf("Looking up alloc registration failed: %v", err) + } + if reg2 == nil { + t.Fatalf("Nil alloc registrations: %v", err) + } + if num := reg2.NumServices(); num != 1 { + t.Fatalf("Wrong number of servies: got %d; want 1", num) + } + if num := reg2.NumChecks(); num != 0 { + t.Fatalf("Wrong number of checks: got %d; want 0", num) + } + + for task, treg := range reg1.Tasks { + otherTaskReg, ok := reg2.Tasks[task] + if !ok { + t.Fatalf("Task %q not in second reg", task) + } + + for sID := range treg.Services { + if _, ok := otherTaskReg.Services[sID]; ok { + t.Fatalf("service ID didn't change") + } + } + } } // TestConsul_ChangePorts asserts that changing the ports on a service updates @@ -461,7 +506,8 @@ func TestConsul_ChangeChecks(t *testing.T) { }, } - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx, nil); err != nil { + allocID := "allocid" + if err := ctx.ServiceClient.RegisterTask(allocID, ctx.Task, ctx, nil); err != nil { t.Fatalf("unexpected error registering task: %v", err) } @@ -473,6 +519,22 @@ func TestConsul_ChangeChecks(t *testing.T) { t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) } + // Query the allocs registrations and then again when we update. The IDs + // should change + reg1, err := ctx.ServiceClient.AllocRegistrations(allocID) + if err != nil { + t.Fatalf("Looking up alloc registration failed: %v", err) + } + if reg1 == nil { + t.Fatalf("Nil alloc registrations: %v", err) + } + if num := reg1.NumServices(); num != 1 { + t.Fatalf("Wrong number of servies: got %d; want 1", num) + } + if num := reg1.NumChecks(); num != 1 { + t.Fatalf("Wrong number of checks: got %d; want 1", num) + } + origServiceKey := "" for k, v := range ctx.FakeConsul.services { origServiceKey = k @@ -493,13 +555,13 @@ func TestConsul_ChangeChecks(t *testing.T) { } } - // Now add a check + // Now add a check and modify the original origTask := ctx.Task.Copy() ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { Name: "c1", Type: "tcp", - Interval: time.Second, + Interval: 2 * time.Second, Timeout: time.Second, PortLabel: "x", }, @@ -545,6 +607,41 @@ func TestConsul_ChangeChecks(t *testing.T) { t.Errorf("Unkown check: %q", k) } } + + // Check again and ensure the IDs changed + reg2, err := ctx.ServiceClient.AllocRegistrations(allocID) + if err != nil { + t.Fatalf("Looking up alloc registration failed: %v", err) + } + if reg2 == nil { + t.Fatalf("Nil alloc registrations: %v", err) + } + if num := reg2.NumServices(); num != 1 { + t.Fatalf("Wrong number of servies: got %d; want 1", num) + } + if num := reg2.NumChecks(); num != 2 { + t.Fatalf("Wrong number of checks: got %d; want 2", num) + } + + for task, treg := range reg1.Tasks { + otherTaskReg, ok := reg2.Tasks[task] + if !ok { + t.Fatalf("Task %q not in second reg", task) + } + + for sID, sreg := range treg.Services { + otherServiceReg, ok := otherTaskReg.Services[sID] + if !ok { + t.Fatalf("service ID changed") + } + + for newID := range sreg.checkIDs { + if _, ok := otherServiceReg.checkIDs[newID]; ok { + t.Fatalf("check IDs should change") + } + } + } + } } // TestConsul_RegServices tests basic service registration. diff --git a/helper/funcs.go b/helper/funcs.go index 3ad918dde049..ae26849164b4 100644 --- a/helper/funcs.go +++ b/helper/funcs.go @@ -173,6 +173,19 @@ func CopyMapStringString(m map[string]string) map[string]string { return c } +func CopyMapStringStruct(m map[string]struct{}) map[string]struct{} { + l := len(m) + if l == 0 { + return nil + } + + c := make(map[string]struct{}, l) + for k, _ := range m { + c[k] = struct{}{} + } + return c +} + func CopyMapStringInt(m map[string]int) map[string]int { l := len(m) if l == 0 { From 4d323e14dfac1aa0f69ca660ccea6fc3437e20b4 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 10 Aug 2017 13:07:03 -0700 Subject: [PATCH 2/2] Address comments --- command/agent/consul/client.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index a7ee09d1d8b9..44f3d3725511 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -92,13 +92,13 @@ type AllocRegistration struct { Tasks map[string]*TaskRegistration } -func (a *AllocRegistration) Copy() *AllocRegistration { +func (a *AllocRegistration) copy() *AllocRegistration { c := &AllocRegistration{ Tasks: make(map[string]*TaskRegistration, len(a.Tasks)), } for k, v := range a.Tasks { - c.Tasks[k] = v.Copy() + c.Tasks[k] = v.copy() } return c @@ -144,13 +144,13 @@ type TaskRegistration struct { Services map[string]*ServiceRegistration } -func (t *TaskRegistration) Copy() *TaskRegistration { +func (t *TaskRegistration) copy() *TaskRegistration { c := &TaskRegistration{ Services: make(map[string]*ServiceRegistration, len(t.Services)), } for k, v := range t.Services { - c.Services[k] = v.Copy() + c.Services[k] = v.copy() } return c @@ -172,7 +172,10 @@ type ServiceRegistration struct { Checks []*api.AgentCheck } -func (s *ServiceRegistration) Copy() *ServiceRegistration { +func (s *ServiceRegistration) copy() *ServiceRegistration { + // Copy does not copy the external fields but only the internal fields. This + // is so that the caller of AllocRegistrations can not access the internal + // fields and that method uses these fields to populate the external fields. return &ServiceRegistration{ serviceID: s.serviceID, checkIDs: helper.CopyMapStringStruct(s.checkIDs), @@ -672,7 +675,6 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec dr t.Services[sreg.serviceID] = sreg } - // Add the task to the allocation's registration // Add the task to the allocation's registration c.addTaskRegistration(allocID, task.Name, t) @@ -809,7 +811,7 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, } // Copy so we don't expose internal structs - reg := regInternal.Copy() + reg := regInternal.copy() c.allocRegistrationsLock.RUnlock() // Query the services and checks to populate the allocation registrations.