Skip to content

Commit

Permalink
Move Workload alloc info to struct. Rename namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgemarey committed Aug 26, 2022
1 parent 53551eb commit fa5aba3
Show file tree
Hide file tree
Showing 17 changed files with 225 additions and 199 deletions.
2 changes: 1 addition & 1 deletion client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
namespace: alloc.ServiceProviderNamespace(),
providerNamespace: alloc.ServiceProviderNamespace(),
serviceRegWrapper: ar.serviceRegWrapper,
restarter: ar,
taskEnvBuilder: envBuilder,
Expand Down
42 changes: 23 additions & 19 deletions client/allocrunner/group_service_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ type groupServiceHook struct {
allocID string
jobID string
group string
nomadNamespace string
namespace string
restarter agentconsul.WorkloadRestarter
prerun bool
deregistered bool
networkStatus structs.NetworkStatus
shutdownDelayCtx context.Context

// namespace is the Nomad or Consul namespace in which service
// providerNamespace is the Nomad or Consul namespace in which service
// registrations will be made. This field may be updated.
namespace string
providerNamespace string

// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
Expand Down Expand Up @@ -63,9 +63,9 @@ type groupServiceHookConfig struct {
shutdownDelayCtx context.Context
logger log.Logger

// namespace is the Nomad or Consul namespace in which service
// providerNamespace is the Nomad or Consul namespace in which service
// registrations will be made.
namespace string
providerNamespace string

// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
Expand All @@ -84,9 +84,9 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
allocID: cfg.alloc.ID,
jobID: cfg.alloc.JobID,
group: cfg.alloc.TaskGroup,
nomadNamespace: cfg.alloc.Namespace,
namespace: cfg.alloc.Namespace,
restarter: cfg.restarter,
namespace: cfg.namespace,
providerNamespace: cfg.providerNamespace,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
networkStatus: cfg.networkStatus,
Expand Down Expand Up @@ -164,7 +164,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {

// An update may change the service provider, therefore we need to account
// for how namespaces work across providers also.
h.namespace = req.Alloc.ServiceProviderNamespace()
h.providerNamespace = req.Alloc.ServiceProviderNamespace()

// Create new task services struct with those new values
newWorkloadServices := h.getWorkloadServices()
Expand Down Expand Up @@ -247,18 +247,22 @@ func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadSe
netStatus = h.networkStatus.NetworkStatus()
}

info := structs.AllocInfo{
AllocID: h.allocID,
JobID: h.jobID,
Group: h.group,
Namespace: h.namespace,
}

// Create task services struct with request's driver metadata
return &serviceregistration.WorkloadServices{
AllocID: h.allocID,
JobID: h.jobID,
Group: h.group,
NomadNamespace: h.nomadNamespace,
Namespace: h.namespace,
Restarter: h.restarter,
Services: interpolatedServices,
Networks: h.networks,
NetworkStatus: netStatus,
Ports: h.ports,
Canary: h.canary,
AllocInfo: info,
ProviderNamespace: h.providerNamespace,
Restarter: h.restarter,
Services: interpolatedServices,
Networks: h.networks,
NetworkStatus: netStatus,
Ports: h.ports,
Canary: h.canary,
}
}
39 changes: 23 additions & 16 deletions client/allocrunner/taskrunner/service_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type serviceHookConfig struct {

// namespace is the Nomad or Consul namespace in which service
// registrations will be made.
namespace string
providerNamespace string

// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
Expand All @@ -48,6 +48,7 @@ type serviceHook struct {
allocID string
jobID string
taskName string
namespace string
restarter agentconsul.WorkloadRestarter
logger log.Logger

Expand All @@ -60,9 +61,9 @@ type serviceHook struct {
ports structs.AllocatedPorts
taskEnv *taskenv.TaskEnv

// namespace is the Nomad or Consul namespace in which service
// providerNamespace is the Nomad or Consul namespace in which service
// registrations will be made. This field may be updated.
namespace string
providerNamespace string

// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
Expand All @@ -86,7 +87,8 @@ func newServiceHook(c serviceHookConfig) *serviceHook {
allocID: c.alloc.ID,
jobID: c.alloc.JobID,
taskName: c.task.Name,
namespace: c.namespace,
namespace: c.alloc.Namespace,
providerNamespace: c.providerNamespace,
serviceRegWrapper: c.serviceRegWrapper,
services: c.task.Services,
restarter: c.restarter,
Expand Down Expand Up @@ -175,7 +177,7 @@ func (h *serviceHook) updateHookFields(req *interfaces.TaskUpdateRequest) error

// An update may change the service provider, therefore we need to account
// for how namespaces work across providers also.
h.namespace = req.Alloc.ServiceProviderNamespace()
h.providerNamespace = req.Alloc.ServiceProviderNamespace()

return nil
}
Expand Down Expand Up @@ -218,18 +220,23 @@ func (h *serviceHook) getWorkloadServices() *serviceregistration.WorkloadService
// Interpolate with the task's environment
interpolatedServices := taskenv.InterpolateServices(h.taskEnv, h.services)

info := structs.AllocInfo{
AllocID: h.allocID,
JobID: h.jobID,
Task: h.taskName,
Namespace: h.namespace,
}

// Create task services struct with request's driver metadata
return &serviceregistration.WorkloadServices{
AllocID: h.allocID,
JobID: h.jobID,
Task: h.taskName,
Namespace: h.namespace,
Restarter: h.restarter,
Services: interpolatedServices,
DriverExec: h.driverExec,
DriverNetwork: h.driverNet,
Networks: h.networks,
Canary: h.canary,
Ports: h.ports,
AllocInfo: info,
ProviderNamespace: h.providerNamespace,
Restarter: h.restarter,
Services: interpolatedServices,
DriverExec: h.driverExec,
DriverNetwork: h.driverNet,
Networks: h.networks,
Canary: h.canary,
Ports: h.ports,
}
}
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/service_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func Test_serviceHook_Nomad(t *testing.T) {
h := newServiceHook(serviceHookConfig{
alloc: alloc,
task: alloc.LookupTask("web"),
namespace: "default",
providerNamespace: "default",
serviceRegWrapper: regWrapper,
restarter: agentconsul.NoopRestarter(),
logger: logger,
Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (tr *TaskRunner) initHooks() {
tr.runnerHooks = append(tr.runnerHooks, newServiceHook(serviceHookConfig{
alloc: tr.Alloc(),
task: tr.Task(),
namespace: serviceProviderNamespace,
providerNamespace: serviceProviderNamespace,
serviceRegWrapper: tr.serviceRegWrapper,
restarter: tr,
logger: hookLogger,
Expand Down
12 changes: 6 additions & 6 deletions client/serviceregistration/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,31 +42,31 @@ func (h *ServiceRegistrationHandler) RegisterWorkload(services *serviceregistrat
h.mu.Lock()
defer h.mu.Unlock()

h.log.Trace("RegisterWorkload", "alloc_id", services.AllocID,
h.log.Trace("RegisterWorkload", "alloc_id", services.AllocInfo.AllocID,
"name", services.Name(), "services", len(services.Services))

h.ops = append(h.ops, newOperation("add", services.AllocID, services.Name()))
h.ops = append(h.ops, newOperation("add", services.AllocInfo.AllocID, services.Name()))
return nil
}

func (h *ServiceRegistrationHandler) RemoveWorkload(services *serviceregistration.WorkloadServices) {
h.mu.Lock()
defer h.mu.Unlock()

h.log.Trace("RemoveWorkload", "alloc_id", services.AllocID,
h.log.Trace("RemoveWorkload", "alloc_id", services.AllocInfo.AllocID,
"name", services.Name(), "services", len(services.Services))

h.ops = append(h.ops, newOperation("remove", services.AllocID, services.Name()))
h.ops = append(h.ops, newOperation("remove", services.AllocInfo.AllocID, services.Name()))
}

func (h *ServiceRegistrationHandler) UpdateWorkload(old, newServices *serviceregistration.WorkloadServices) error {
h.mu.Lock()
defer h.mu.Unlock()

h.log.Trace("UpdateWorkload", "alloc_id", newServices.AllocID, "name", newServices.Name(),
h.log.Trace("UpdateWorkload", "alloc_id", newServices.AllocInfo.AllocID, "name", newServices.Name(),
"old_services", len(old.Services), "new_services", len(newServices.Services))

h.ops = append(h.ops, newOperation("update", newServices.AllocID, newServices.Name()))
h.ops = append(h.ops, newOperation("update", newServices.AllocInfo.AllocID, newServices.Name()))
return nil
}

Expand Down
20 changes: 10 additions & 10 deletions client/serviceregistration/nsd/nsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ func (s *ServiceRegistrationHandler) removeWorkload(
workload *serviceregistration.WorkloadServices, serviceSpec *structs.Service) {

// Generate the consistent ID for this service, so we know what to remove.
id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), serviceSpec)
id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), serviceSpec)

deleteArgs := structs.ServiceRegistrationDeleteByIDRequest{
ID: id,
WriteRequest: structs.WriteRequest{
Region: s.cfg.Region,
Namespace: workload.Namespace,
Namespace: workload.ProviderNamespace,
AuthToken: s.cfg.NodeSecret,
},
}
Expand All @@ -149,14 +149,14 @@ func (s *ServiceRegistrationHandler) removeWorkload(
// while ensuring the operator can see.
if strings.Contains(err.Error(), "service registration not found") {
s.log.Info("attempted to delete non-existent service registration",
"service_id", id, "namespace", workload.Namespace)
"service_id", id, "namespace", workload.ProviderNamespace)
return
}

// Log the error as there is nothing left to do, so the operator can see it
// and identify any problems.
s.log.Error("failed to delete service registration",
"error", err, "service_id", id, "namespace", workload.Namespace)
"error", err, "service_id", id, "namespace", workload.ProviderNamespace)
}

func (s *ServiceRegistrationHandler) UpdateWorkload(old, new *serviceregistration.WorkloadServices) error {
Expand Down Expand Up @@ -202,7 +202,7 @@ func (s *ServiceRegistrationHandler) dedupUpdatedWorkload(
newIDs := make(map[string]*structs.Service, len(newWork.Services))

for _, s := range newWork.Services {
newIDs[serviceregistration.MakeAllocServiceID(newWork.AllocID, newWork.Name(), s)] = s
newIDs[serviceregistration.MakeAllocServiceID(newWork.AllocInfo.AllocID, newWork.Name(), s)] = s
}

// Iterate through the old services in order to identify whether they can
Expand All @@ -211,7 +211,7 @@ func (s *ServiceRegistrationHandler) dedupUpdatedWorkload(

// Generate the service ID of the old service. If this is not found
// within the new mapping then we need to remove it.
oldID := serviceregistration.MakeAllocServiceID(oldWork.AllocID, oldWork.Name(), oldService)
oldID := serviceregistration.MakeAllocServiceID(oldWork.AllocInfo.AllocID, oldWork.Name(), oldService)
newSvc, ok := newIDs[oldID]
if !ok {
oldCopy.Services = append(oldCopy.Services, oldService)
Expand Down Expand Up @@ -290,12 +290,12 @@ func (s *ServiceRegistrationHandler) generateNomadServiceRegistration(
}

return &structs.ServiceRegistration{
ID: serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), serviceSpec),
ID: serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), serviceSpec),
ServiceName: serviceSpec.Name,
NodeID: s.cfg.NodeID,
JobID: workload.JobID,
AllocID: workload.AllocID,
Namespace: workload.Namespace,
JobID: workload.AllocInfo.JobID,
AllocID: workload.AllocInfo.AllocID,
Namespace: workload.ProviderNamespace,
Datacenter: s.cfg.Datacenter,
Tags: tags,
Address: ip,
Expand Down
Loading

0 comments on commit fa5aba3

Please sign in to comment.