diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 0069bb559002..37d9b4a49735 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -153,7 +153,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv), newGroupServiceHook(groupServiceHookConfig{ alloc: alloc, - namespace: alloc.ServiceProviderNamespace(), + providerNamespace: alloc.ServiceProviderNamespace(), serviceRegWrapper: ar.serviceRegWrapper, restarter: ar, taskEnvBuilder: envBuilder, diff --git a/client/allocrunner/group_service_hook.go b/client/allocrunner/group_service_hook.go index 3442a76476c0..49fe5889438f 100644 --- a/client/allocrunner/group_service_hook.go +++ b/client/allocrunner/group_service_hook.go @@ -25,16 +25,16 @@ type groupServiceHook struct { allocID string jobID string group string - nomadNamespace string + namespace string restarter agentconsul.WorkloadRestarter prerun bool deregistered bool networkStatus structs.NetworkStatus shutdownDelayCtx context.Context - // namespace is the Nomad or Consul namespace in which service + // providerNamespace is the Nomad or Consul namespace in which service // registrations will be made. This field may be updated. - namespace string + providerNamespace string // serviceRegWrapper is the handler wrapper that is used to perform service // and check registration and deregistration. @@ -63,9 +63,9 @@ type groupServiceHookConfig struct { shutdownDelayCtx context.Context logger log.Logger - // namespace is the Nomad or Consul namespace in which service + // providerNamespace is the Nomad or Consul namespace in which service // registrations will be made. - namespace string + providerNamespace string // serviceRegWrapper is the handler wrapper that is used to perform service // and check registration and deregistration. @@ -84,9 +84,9 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook { allocID: cfg.alloc.ID, jobID: cfg.alloc.JobID, group: cfg.alloc.TaskGroup, - nomadNamespace: cfg.alloc.Namespace, + namespace: cfg.alloc.Namespace, restarter: cfg.restarter, - namespace: cfg.namespace, + providerNamespace: cfg.providerNamespace, taskEnvBuilder: cfg.taskEnvBuilder, delay: shutdownDelay, networkStatus: cfg.networkStatus, @@ -164,7 +164,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error { // An update may change the service provider, therefore we need to account // for how namespaces work across providers also. - h.namespace = req.Alloc.ServiceProviderNamespace() + h.providerNamespace = req.Alloc.ServiceProviderNamespace() // Create new task services struct with those new values newWorkloadServices := h.getWorkloadServices() @@ -247,18 +247,22 @@ func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadSe netStatus = h.networkStatus.NetworkStatus() } + info := structs.AllocInfo{ + AllocID: h.allocID, + JobID: h.jobID, + Group: h.group, + Namespace: h.namespace, + } + // Create task services struct with request's driver metadata return &serviceregistration.WorkloadServices{ - AllocID: h.allocID, - JobID: h.jobID, - Group: h.group, - NomadNamespace: h.nomadNamespace, - Namespace: h.namespace, - Restarter: h.restarter, - Services: interpolatedServices, - Networks: h.networks, - NetworkStatus: netStatus, - Ports: h.ports, - Canary: h.canary, + AllocInfo: info, + ProviderNamespace: h.providerNamespace, + Restarter: h.restarter, + Services: interpolatedServices, + Networks: h.networks, + NetworkStatus: netStatus, + Ports: h.ports, + Canary: h.canary, } } diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go index 5a29a82e6f06..8db45479f1c6 100644 --- a/client/allocrunner/taskrunner/service_hook.go +++ b/client/allocrunner/taskrunner/service_hook.go @@ -32,7 +32,7 @@ type serviceHookConfig struct { // namespace is the Nomad or Consul namespace in which service // registrations will be made. - namespace string + providerNamespace string // serviceRegWrapper is the handler wrapper that is used to perform service // and check registration and deregistration. @@ -48,6 +48,7 @@ type serviceHook struct { allocID string jobID string taskName string + namespace string restarter agentconsul.WorkloadRestarter logger log.Logger @@ -60,9 +61,9 @@ type serviceHook struct { ports structs.AllocatedPorts taskEnv *taskenv.TaskEnv - // namespace is the Nomad or Consul namespace in which service + // providerNamespace is the Nomad or Consul namespace in which service // registrations will be made. This field may be updated. - namespace string + providerNamespace string // serviceRegWrapper is the handler wrapper that is used to perform service // and check registration and deregistration. @@ -86,7 +87,8 @@ func newServiceHook(c serviceHookConfig) *serviceHook { allocID: c.alloc.ID, jobID: c.alloc.JobID, taskName: c.task.Name, - namespace: c.namespace, + namespace: c.alloc.Namespace, + providerNamespace: c.providerNamespace, serviceRegWrapper: c.serviceRegWrapper, services: c.task.Services, restarter: c.restarter, @@ -175,7 +177,7 @@ func (h *serviceHook) updateHookFields(req *interfaces.TaskUpdateRequest) error // An update may change the service provider, therefore we need to account // for how namespaces work across providers also. - h.namespace = req.Alloc.ServiceProviderNamespace() + h.providerNamespace = req.Alloc.ServiceProviderNamespace() return nil } @@ -218,18 +220,23 @@ func (h *serviceHook) getWorkloadServices() *serviceregistration.WorkloadService // Interpolate with the task's environment interpolatedServices := taskenv.InterpolateServices(h.taskEnv, h.services) + info := structs.AllocInfo{ + AllocID: h.allocID, + JobID: h.jobID, + Task: h.taskName, + Namespace: h.namespace, + } + // Create task services struct with request's driver metadata return &serviceregistration.WorkloadServices{ - AllocID: h.allocID, - JobID: h.jobID, - Task: h.taskName, - Namespace: h.namespace, - Restarter: h.restarter, - Services: interpolatedServices, - DriverExec: h.driverExec, - DriverNetwork: h.driverNet, - Networks: h.networks, - Canary: h.canary, - Ports: h.ports, + AllocInfo: info, + ProviderNamespace: h.providerNamespace, + Restarter: h.restarter, + Services: interpolatedServices, + DriverExec: h.driverExec, + DriverNetwork: h.driverNet, + Networks: h.networks, + Canary: h.canary, + Ports: h.ports, } } diff --git a/client/allocrunner/taskrunner/service_hook_test.go b/client/allocrunner/taskrunner/service_hook_test.go index ae59fb649f30..6125f65b8a31 100644 --- a/client/allocrunner/taskrunner/service_hook_test.go +++ b/client/allocrunner/taskrunner/service_hook_test.go @@ -177,7 +177,7 @@ func Test_serviceHook_Nomad(t *testing.T) { h := newServiceHook(serviceHookConfig{ alloc: alloc, task: alloc.LookupTask("web"), - namespace: "default", + providerNamespace: "default", serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), logger: logger, diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 5c15e3a793a1..089e834d711a 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -123,7 +123,7 @@ func (tr *TaskRunner) initHooks() { tr.runnerHooks = append(tr.runnerHooks, newServiceHook(serviceHookConfig{ alloc: tr.Alloc(), task: tr.Task(), - namespace: serviceProviderNamespace, + providerNamespace: serviceProviderNamespace, serviceRegWrapper: tr.serviceRegWrapper, restarter: tr, logger: hookLogger, diff --git a/client/serviceregistration/mock/mock.go b/client/serviceregistration/mock/mock.go index 899c4f8246fd..da7e885adf36 100644 --- a/client/serviceregistration/mock/mock.go +++ b/client/serviceregistration/mock/mock.go @@ -42,10 +42,10 @@ func (h *ServiceRegistrationHandler) RegisterWorkload(services *serviceregistrat h.mu.Lock() defer h.mu.Unlock() - h.log.Trace("RegisterWorkload", "alloc_id", services.AllocID, + h.log.Trace("RegisterWorkload", "alloc_id", services.AllocInfo.AllocID, "name", services.Name(), "services", len(services.Services)) - h.ops = append(h.ops, newOperation("add", services.AllocID, services.Name())) + h.ops = append(h.ops, newOperation("add", services.AllocInfo.AllocID, services.Name())) return nil } @@ -53,20 +53,20 @@ func (h *ServiceRegistrationHandler) RemoveWorkload(services *serviceregistratio h.mu.Lock() defer h.mu.Unlock() - h.log.Trace("RemoveWorkload", "alloc_id", services.AllocID, + h.log.Trace("RemoveWorkload", "alloc_id", services.AllocInfo.AllocID, "name", services.Name(), "services", len(services.Services)) - h.ops = append(h.ops, newOperation("remove", services.AllocID, services.Name())) + h.ops = append(h.ops, newOperation("remove", services.AllocInfo.AllocID, services.Name())) } func (h *ServiceRegistrationHandler) UpdateWorkload(old, newServices *serviceregistration.WorkloadServices) error { h.mu.Lock() defer h.mu.Unlock() - h.log.Trace("UpdateWorkload", "alloc_id", newServices.AllocID, "name", newServices.Name(), + h.log.Trace("UpdateWorkload", "alloc_id", newServices.AllocInfo.AllocID, "name", newServices.Name(), "old_services", len(old.Services), "new_services", len(newServices.Services)) - h.ops = append(h.ops, newOperation("update", newServices.AllocID, newServices.Name())) + h.ops = append(h.ops, newOperation("update", newServices.AllocInfo.AllocID, newServices.Name())) return nil } diff --git a/client/serviceregistration/nsd/nsd.go b/client/serviceregistration/nsd/nsd.go index 32312d626472..b883c4631773 100644 --- a/client/serviceregistration/nsd/nsd.go +++ b/client/serviceregistration/nsd/nsd.go @@ -124,13 +124,13 @@ func (s *ServiceRegistrationHandler) removeWorkload( workload *serviceregistration.WorkloadServices, serviceSpec *structs.Service) { // Generate the consistent ID for this service, so we know what to remove. - id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), serviceSpec) + id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), serviceSpec) deleteArgs := structs.ServiceRegistrationDeleteByIDRequest{ ID: id, WriteRequest: structs.WriteRequest{ Region: s.cfg.Region, - Namespace: workload.Namespace, + Namespace: workload.ProviderNamespace, AuthToken: s.cfg.NodeSecret, }, } @@ -149,14 +149,14 @@ func (s *ServiceRegistrationHandler) removeWorkload( // while ensuring the operator can see. if strings.Contains(err.Error(), "service registration not found") { s.log.Info("attempted to delete non-existent service registration", - "service_id", id, "namespace", workload.Namespace) + "service_id", id, "namespace", workload.ProviderNamespace) return } // Log the error as there is nothing left to do, so the operator can see it // and identify any problems. s.log.Error("failed to delete service registration", - "error", err, "service_id", id, "namespace", workload.Namespace) + "error", err, "service_id", id, "namespace", workload.ProviderNamespace) } func (s *ServiceRegistrationHandler) UpdateWorkload(old, new *serviceregistration.WorkloadServices) error { @@ -202,7 +202,7 @@ func (s *ServiceRegistrationHandler) dedupUpdatedWorkload( newIDs := make(map[string]*structs.Service, len(newWork.Services)) for _, s := range newWork.Services { - newIDs[serviceregistration.MakeAllocServiceID(newWork.AllocID, newWork.Name(), s)] = s + newIDs[serviceregistration.MakeAllocServiceID(newWork.AllocInfo.AllocID, newWork.Name(), s)] = s } // Iterate through the old services in order to identify whether they can @@ -211,7 +211,7 @@ func (s *ServiceRegistrationHandler) dedupUpdatedWorkload( // Generate the service ID of the old service. If this is not found // within the new mapping then we need to remove it. - oldID := serviceregistration.MakeAllocServiceID(oldWork.AllocID, oldWork.Name(), oldService) + oldID := serviceregistration.MakeAllocServiceID(oldWork.AllocInfo.AllocID, oldWork.Name(), oldService) newSvc, ok := newIDs[oldID] if !ok { oldCopy.Services = append(oldCopy.Services, oldService) @@ -290,12 +290,12 @@ func (s *ServiceRegistrationHandler) generateNomadServiceRegistration( } return &structs.ServiceRegistration{ - ID: serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), serviceSpec), + ID: serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), serviceSpec), ServiceName: serviceSpec.Name, NodeID: s.cfg.NodeID, - JobID: workload.JobID, - AllocID: workload.AllocID, - Namespace: workload.Namespace, + JobID: workload.AllocInfo.JobID, + AllocID: workload.AllocInfo.AllocID, + Namespace: workload.ProviderNamespace, Datacenter: s.cfg.Datacenter, Tags: tags, Address: ip, diff --git a/client/serviceregistration/nsd/nsd_test.go b/client/serviceregistration/nsd/nsd_test.go index 935c247b73bf..90bd0e973098 100644 --- a/client/serviceregistration/nsd/nsd_test.go +++ b/client/serviceregistration/nsd/nsd_test.go @@ -127,12 +127,14 @@ func TestServiceRegistrationHandler_UpdateWorkload(t *testing.T) { }, inputOldWorkload: mockWorkload(), inputNewWorkload: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "changed-redis-db", @@ -171,12 +173,14 @@ func TestServiceRegistrationHandler_UpdateWorkload(t *testing.T) { }, inputOldWorkload: mockWorkload(), inputNewWorkload: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", @@ -246,12 +250,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { { inputOldWorkload: mockWorkload(), inputNewWorkload: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "changed-redis-db", @@ -279,12 +285,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { }, expectedOldOutput: mockWorkload(), expectedNewOutput: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "changed-redis-db", @@ -315,12 +323,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { { inputOldWorkload: mockWorkload(), inputNewWorkload: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", @@ -349,13 +359,15 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { }, }, expectedOldOutput: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", - Services: []*structs.Service{}, + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", + Services: []*structs.Service{}, Ports: []structs.AllocatedPortMapping{ { Label: "db", @@ -370,12 +382,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { }, }, expectedNewOutput: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", @@ -408,12 +422,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { { inputOldWorkload: mockWorkload(), inputNewWorkload: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", @@ -441,12 +457,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { }, expectedOldOutput: mockWorkload(), expectedNewOutput: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", @@ -489,12 +507,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { func mockWorkload() *serviceregistration.WorkloadServices { return &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", diff --git a/client/serviceregistration/workload.go b/client/serviceregistration/workload.go index 7af994a04987..7123b7e4e2bb 100644 --- a/client/serviceregistration/workload.go +++ b/client/serviceregistration/workload.go @@ -9,32 +9,15 @@ import ( // WorkloadServices describes services defined in either a Task or TaskGroup // that need to be syncronized with a service registration provider. type WorkloadServices struct { - AllocID string - - // Group in which the service belongs for a group-level service, or the - // group in which task belongs for a task-level service. - Group string - - // Task in which the service belongs for task-level service. Will be empty - // for a group-level service. - Task string - - // JobID provides additional context for providers regarding which job - // caused this registration. - JobID string - - // NomadNamespace provides additional context for providers regarding which - // nomad namespace caused this registration. - // Note: this can be different to Namespace if the provider is not Nomad - NomadNamespace string + AllocInfo structs.AllocInfo // Canary indicates whether, or not the allocation is a canary. This is // used to build the correct tags mapping. Canary bool - // Namespace is the provider namespace in which services will be + // ProviderNamespace is the provider namespace in which services will be // registered, if the provider supports this functionality. - Namespace string + ProviderNamespace string // Restarter allows restarting the task or task group depending on the // check_restart stanzas. @@ -93,8 +76,8 @@ func (ws *WorkloadServices) Copy() *WorkloadServices { } func (ws *WorkloadServices) Name() string { - if ws.Task != "" { - return ws.Task + if ws.AllocInfo.Task != "" { + return ws.AllocInfo.Task } - return "group-" + ws.Group + return "group-" + ws.AllocInfo.Group } diff --git a/command/agent/consul/connect.go b/command/agent/consul/connect.go index 7dcee478c83c..0429ff0e4405 100644 --- a/command/agent/consul/connect.go +++ b/command/agent/consul/connect.go @@ -11,17 +11,10 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -type allocInfo struct { - Group string - JobID string - Namespace string - AllocID string -} - // newConnect creates a new Consul AgentServiceConnect struct based on a Nomad // Connect struct. If the nomad Connect struct is nil, nil will be returned to // disable Connect for this service. -func newConnect(serviceID string, info allocInfo, serviceName string, nc *structs.ConsulConnect, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceConnect, error) { +func newConnect(serviceID string, info structs.AllocInfo, serviceName string, nc *structs.ConsulConnect, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceConnect, error) { switch { case nc == nil: // no connect stanza means there is no connect service to register @@ -97,7 +90,7 @@ func newConnectGateway(connect *structs.ConsulConnect) *api.AgentServiceConnectP return &api.AgentServiceConnectProxyConfig{Config: envoyConfig} } -func connectSidecarRegistration(serviceID string, info allocInfo, css *structs.ConsulSidecarService, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceRegistration, error) { +func connectSidecarRegistration(serviceID string, info structs.AllocInfo, css *structs.ConsulSidecarService, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceRegistration, error) { if css == nil { // no sidecar stanza means there is no sidecar service to register return nil, nil @@ -137,7 +130,7 @@ func connectSidecarRegistration(serviceID string, info allocInfo, css *structs.C }, nil } -func connectSidecarProxy(info allocInfo, proxy *structs.ConsulProxy, cPort int, networks structs.Networks) (*api.AgentServiceConnectProxyConfig, error) { +func connectSidecarProxy(info structs.AllocInfo, proxy *structs.ConsulProxy, cPort int, networks structs.Networks) (*api.AgentServiceConnectProxyConfig, error) { if proxy == nil { proxy = new(structs.ConsulProxy) } @@ -233,7 +226,7 @@ func connectMeshGateway(in structs.ConsulMeshGateway) api.MeshGatewayConfig { return gw } -func connectProxyConfig(cfg map[string]interface{}, port int, info allocInfo) map[string]interface{} { +func connectProxyConfig(cfg map[string]interface{}, port int, info structs.AllocInfo) map[string]interface{} { if cfg == nil { cfg = make(map[string]interface{}) } diff --git a/command/agent/consul/connect_test.go b/command/agent/consul/connect_test.go index b38a3186f985..f08860b740ce 100644 --- a/command/agent/consul/connect_test.go +++ b/command/agent/consul/connect_test.go @@ -37,12 +37,12 @@ func TestConnect_newConnect(t *testing.T) { service := "redis" redisID := uuid.Generate() allocID := uuid.Generate() - info := allocInfo{ + info := structs.AllocInfo{ AllocID: allocID, } t.Run("nil", func(t *testing.T) { - asr, err := newConnect("", allocInfo{}, "", nil, nil, nil) + asr, err := newConnect("", structs.AllocInfo{}, "", nil, nil, nil) require.NoError(t, err) require.Nil(t, asr) }) @@ -126,7 +126,7 @@ func TestConnect_connectSidecarRegistration(t *testing.T) { redisID := uuid.Generate() allocID := uuid.Generate() - info := allocInfo{ + info := structs.AllocInfo{ AllocID: allocID, } @@ -193,7 +193,7 @@ func TestConnect_connectProxy(t *testing.T) { ci.Parallel(t) allocID := uuid.Generate() - info := allocInfo{ + info := structs.AllocInfo{ AllocID: allocID, } @@ -397,7 +397,7 @@ func TestConnect_connectProxyConfig(t *testing.T) { "bind_address": "0.0.0.0", "bind_port": 42, "envoy_stats_tags": []string{"nomad.alloc_id=test_alloc1"}, - }, connectProxyConfig(nil, 42, allocInfo{AllocID: "test_alloc1"})) + }, connectProxyConfig(nil, 42, structs.AllocInfo{AllocID: "test_alloc1"})) }) t.Run("pre-existing map", func(t *testing.T) { @@ -408,7 +408,7 @@ func TestConnect_connectProxyConfig(t *testing.T) { "envoy_stats_tags": []string{"nomad.alloc_id=test_alloc2"}, }, connectProxyConfig(map[string]interface{}{ "foo": "bar", - }, 42, allocInfo{AllocID: "test_alloc2"})) + }, 42, structs.AllocInfo{AllocID: "test_alloc2"})) }) } @@ -608,13 +608,13 @@ func Test_injectNomadInfo(t *testing.T) { info1 := func() map[string]string { return map[string]string{ - "nomad.alloc_id": "abc123", + "nomad.alloc_id=": "abc123", } } info2 := func() map[string]string { return map[string]string{ - "nomad.alloc_id": "abc123", - "nomad.namespace": "testns", + "nomad.alloc_id=": "abc123", + "nomad.namespace=": "testns", } } diff --git a/command/agent/consul/group_test.go b/command/agent/consul/group_test.go index 1af9a113dd4c..3f6620470130 100644 --- a/command/agent/consul/group_test.go +++ b/command/agent/consul/group_test.go @@ -124,7 +124,7 @@ func TestConsul_Connect(t *testing.T) { require.Equal(t, connectService.Proxy.Config, map[string]interface{}{ "bind_address": "0.0.0.0", "bind_port": float64(9998), - "envoy_stats_tags": []interface{}{"nomad.alloc_id=" + alloc.ID}, + "envoy_stats_tags": []interface{}{"nomad.alloc_id=" + alloc.ID, "nomad.group=" + alloc.TaskGroup}, }) require.Equal(t, alloc.ID, agentService.Meta["alloc_id"]) diff --git a/command/agent/consul/service_client.go b/command/agent/consul/service_client.go index ef3f5c99982d..bbfb241da6a2 100644 --- a/command/agent/consul/service_client.go +++ b/command/agent/consul/service_client.go @@ -935,7 +935,7 @@ func (c *ServiceClient) serviceRegs( *serviceregistration.ServiceRegistration, error) { // Get the services ID - id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service) + id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), service) sreg := &serviceregistration.ServiceRegistration{ ServiceID: id, CheckIDs: make(map[string]struct{}, len(service.Checks)), @@ -965,15 +965,8 @@ func (c *ServiceClient) serviceRegs( copy(tags, service.Tags) } - info := allocInfo{ - Group: workload.Group, - JobID: workload.JobID, - Namespace: workload.NomadNamespace, - AllocID: workload.AllocID, - } - // newConnect returns (nil, nil) if there's no Connect-enabled service. - connect, err := newConnect(id, info, service.Name, service.Connect, workload.Networks, workload.Ports) + connect, err := newConnect(id, workload.AllocInfo, service.Name, service.Connect, workload.Networks, workload.Ports) if err != nil { return nil, fmt.Errorf("invalid Consul Connect configuration for service %q: %v", service.Name, err) } @@ -1046,7 +1039,7 @@ func (c *ServiceClient) serviceRegs( Kind: kind, ID: id, Name: service.Name, - Namespace: workload.Namespace, + Namespace: workload.ProviderNamespace, Tags: tags, EnableTagOverride: service.EnableTagOverride, Address: ip, @@ -1107,7 +1100,7 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service, } checkID := MakeCheckID(serviceID, check) - registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.Namespace) + registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.ProviderNamespace) if err != nil { return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err) } @@ -1144,18 +1137,18 @@ func (c *ServiceClient) RegisterWorkload(workload *serviceregistration.WorkloadS } // Add the workload to the allocation's registration - c.addRegistrations(workload.AllocID, workload.Name(), t) + c.addRegistrations(workload.AllocInfo.AllocID, workload.Name(), t) c.commit(ops) // Start watching checks. Done after service registrations are built // since an error building them could leak watches. for _, service := range workload.Services { - serviceID := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service) + serviceID := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), service) for _, check := range service.Checks { if check.TriggersRestarts() { checkID := MakeCheckID(serviceID, check) - c.checkWatcher.Watch(workload.AllocID, workload.Name(), checkID, check, workload.Restarter) + c.checkWatcher.Watch(workload.AllocInfo.AllocID, workload.Name(), checkID, check, workload.Restarter) } } } @@ -1173,12 +1166,12 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor newIDs := make(map[string]*structs.Service, len(newWorkload.Services)) for _, s := range newWorkload.Services { - newIDs[serviceregistration.MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s + newIDs[serviceregistration.MakeAllocServiceID(newWorkload.AllocInfo.AllocID, newWorkload.Name(), s)] = s } // Loop over existing Services to see if they have been removed for _, existingSvc := range old.Services { - existingID := serviceregistration.MakeAllocServiceID(old.AllocID, old.Name(), existingSvc) + existingID := serviceregistration.MakeAllocServiceID(old.AllocInfo.AllocID, old.Name(), existingSvc) newSvc, ok := newIDs[existingID] if !ok { @@ -1196,8 +1189,8 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor continue } - oldHash := existingSvc.Hash(old.AllocID, old.Name(), old.Canary) - newHash := newSvc.Hash(newWorkload.AllocID, newWorkload.Name(), newWorkload.Canary) + oldHash := existingSvc.Hash(old.AllocInfo.AllocID, old.Name(), old.Canary) + newHash := newSvc.Hash(newWorkload.AllocInfo.AllocID, newWorkload.Name(), newWorkload.Canary) if oldHash == newHash { // Service exists and hasn't changed, don't re-add it later delete(newIDs, existingID) @@ -1242,7 +1235,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor // Update all watched checks as CheckRestart fields aren't part of ID if check.TriggersRestarts() { - c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter) + c.checkWatcher.Watch(newWorkload.AllocInfo.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter) } } @@ -1268,7 +1261,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor } // Add the task to the allocation's registration - c.addRegistrations(newWorkload.AllocID, newWorkload.Name(), regs) + c.addRegistrations(newWorkload.AllocInfo.AllocID, newWorkload.Name(), regs) c.commit(ops) @@ -1278,7 +1271,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor for _, check := range service.Checks { if check.TriggersRestarts() { checkID := MakeCheckID(serviceID, check) - c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter) + c.checkWatcher.Watch(newWorkload.AllocInfo.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter) } } } @@ -1293,7 +1286,7 @@ func (c *ServiceClient) RemoveWorkload(workload *serviceregistration.WorkloadSer ops := operations{} for _, service := range workload.Services { - id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service) + id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), service) ops.deregServices = append(ops.deregServices, id) for _, check := range service.Checks { @@ -1307,7 +1300,7 @@ func (c *ServiceClient) RemoveWorkload(workload *serviceregistration.WorkloadSer } // Remove the workload from the alloc's registrations - c.removeRegistration(workload.AllocID, workload.Name()) + c.removeRegistration(workload.AllocInfo.AllocID, workload.Name()) // Now add them to the deregistration fields; main Run loop will update c.commit(&ops) diff --git a/command/agent/consul/service_client_test.go b/command/agent/consul/service_client_test.go index e269a9679c0d..a89ed1b0b873 100644 --- a/command/agent/consul/service_client_test.go +++ b/command/agent/consul/service_client_test.go @@ -416,8 +416,10 @@ func TestServiceRegistration_CheckOnUpdate(t *testing.T) { allocID := uuid.Generate() ws := &serviceregistration.WorkloadServices{ - AllocID: allocID, - Task: "taskname", + AllocInfo: structs.AllocInfo{ + AllocID: allocID, + Task: "taskname", + }, Restarter: &restartRecorder{}, Services: []*structs.Service{ { diff --git a/command/agent/consul/structs.go b/command/agent/consul/structs.go index d8e9210ce13a..ddfdecae19dd 100644 --- a/command/agent/consul/structs.go +++ b/command/agent/consul/structs.go @@ -17,8 +17,10 @@ func BuildAllocServices( tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) ws := &serviceregistration.WorkloadServices{ - AllocID: alloc.ID, - Group: alloc.TaskGroup, + AllocInfo: structs.AllocInfo{ + AllocID: alloc.ID, + Group: alloc.TaskGroup, + }, Services: taskenv.InterpolateServices(taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build(), tg.Services), Networks: alloc.AllocatedResources.Shared.Networks, diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index f7328bfee2b9..481c350e8384 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -29,8 +29,10 @@ const ( func testWorkload() *serviceregistration.WorkloadServices { return &serviceregistration.WorkloadServices{ - AllocID: uuid.Generate(), - Task: "taskname", + AllocInfo: structs.AllocInfo{ + AllocID: uuid.Generate(), + Task: "taskname", + }, Restarter: &restartRecorder{}, Services: []*structs.Service{ { @@ -132,7 +134,7 @@ func TestConsul_ChangeTags(t *testing.T) { r.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") // Validate the alloc registration - reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID) + reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocInfo.AllocID) r.NoError(err) r.NotNil(reg1, "Unexpected nil alloc registration") r.Equal(1, reg1.NumServices()) @@ -172,7 +174,7 @@ func TestConsul_EnableTagOverride_Syncs(t *testing.T) { r.Equal(1, len(ctx.FakeConsul.services)) // Validate the alloc registration - reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID) + reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocInfo.AllocID) r.NoError(err) r.NotNil(reg1) r.Equal(1, reg1.NumServices()) @@ -369,7 +371,7 @@ func TestConsul_ChangeChecks(t *testing.T) { // Query the allocs registrations and then again when we update. The IDs // should change - reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID) + reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocInfo.AllocID) if err != nil { t.Fatalf("Looking up alloc registration failed: %v", err) } @@ -483,7 +485,7 @@ func TestConsul_ChangeChecks(t *testing.T) { } // Check again and ensure the IDs changed - reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID) + reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocInfo.AllocID) if err != nil { t.Fatalf("Looking up alloc registration failed: %v", err) } @@ -615,7 +617,7 @@ func TestConsul_RegServices(t *testing.T) { // Assert the check update is properly formed checkUpd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh - if checkUpd.checkRestart.allocID != ctx.Workload.AllocID { + if checkUpd.checkRestart.allocID != ctx.Workload.AllocInfo.AllocID { t.Fatalf("expected check's allocid to be %q but found %q", "allocid", checkUpd.checkRestart.allocID) } if expected := 200 * time.Millisecond; checkUpd.checkRestart.timeLimit != expected { @@ -1457,7 +1459,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { }, }, } - remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID, + remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocInfo.AllocID, remainingWorkload.Name(), remainingWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload)) @@ -1480,7 +1482,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { }, }, } - explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID, + explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocInfo.AllocID, explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload)) @@ -1505,7 +1507,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { }, }, } - outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID, + outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocInfo.AllocID, outofbandWorkload.Name(), outofbandWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload)) @@ -1567,7 +1569,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { }, }, } - remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID, + remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocInfo.AllocID, remainingWorkload.Name(), remainingWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload)) @@ -1590,7 +1592,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { }, }, } - explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID, + explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocInfo.AllocID, explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload)) @@ -1615,7 +1617,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { }, }, } - outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID, + outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocInfo.AllocID, outofbandWorkload.Name(), outofbandWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload)) diff --git a/nomad/structs/alloc.go b/nomad/structs/alloc.go index 2f5c0cfa9d17..57ef361be7fd 100644 --- a/nomad/structs/alloc.go +++ b/nomad/structs/alloc.go @@ -56,3 +56,23 @@ func (a *Allocation) ServiceProviderNamespace() string { return tg.Consul.GetNamespace() } + +type AllocInfo struct { + AllocID string + + // Group in which the service belongs for a group-level service, or the + // group in which task belongs for a task-level service. + Group string + + // Task in which the service belongs for task-level service. Will be empty + // for a group-level service. + Task string + + // JobID provides additional context for providers regarding which job + // caused this registration. + JobID string + + // NomadNamespace provides additional context for providers regarding which + // nomad namespace caused this registration. + Namespace string +}