From 8205b89d7c90425b1c43efa81f42ef015f791368 Mon Sep 17 00:00:00 2001
From: Seth Hoenig
Date: Wed, 1 Mar 2023 10:42:18 -0600
Subject: [PATCH 1/2] services: always set deregister flag after deregistration of group

This PR fixes a bug where the group service hook's deregister flag was not set
in some cases, causing the hook to attempt deregistration twice during job
updates (alloc replacement).

In the tests ... we used to assert on the wrong behavior (remove twice), which
has now been corrected to assert that we remove only once.

This bug was "silent" in the Consul provider world because the error logs for
double deregistration only show up in Consul logs; with the Nomad provider the
error logs are in the Nomad agent logs.
---
 .changelog/16289.txt | 3 +
 client/allocrunner/group_service_hook.go | 59 +++++++++++--------
 client/allocrunner/group_service_hook_test.go | 22 +++----
 client/serviceregistration/nsd/nsd.go | 2 +
 4 files changed, 48 insertions(+), 38 deletions(-)
 create mode 100644 .changelog/16289.txt

diff --git a/.changelog/16289.txt b/.changelog/16289.txt
new file mode 100644
index 000000000000..f11e0dd46882
--- /dev/null
+++ b/.changelog/16289.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+services: Fixed a bug where a service would be deregistered twice
+```

diff --git a/client/allocrunner/group_service_hook.go b/client/allocrunner/group_service_hook.go
index e350f7773724..0d37b6301b7f 100644
--- a/client/allocrunner/group_service_hook.go
+++ b/client/allocrunner/group_service_hook.go
@@ -5,7 +5,7 @@ import (
 	"sync"
 	"time"
 
-	log "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/client/serviceregistration"
 	"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
@@ -39,7 +39,7 @@ type groupServiceHook struct {
 	// and check registration and deregistration.
 	serviceRegWrapper *wrapper.HandlerWrapper
 
-	logger log.Logger
+	logger hclog.Logger
 
 	// The following fields may be updated
 	canary bool
@@ -60,7 +60,7 @@ type groupServiceHookConfig struct {
 	taskEnvBuilder *taskenv.Builder
 	networkStatus structs.NetworkStatus
 	shutdownDelayCtx context.Context
-	logger log.Logger
+	logger hclog.Logger
 
 	// providerNamespace is the Nomad or Consul namespace in which service
 	// registrations will be made.
@@ -118,23 +118,26 @@ func (h *groupServiceHook) Prerun() error {
 		h.prerun = true
 		h.mu.Unlock()
 	}()
-	return h.prerunLocked()
+	return h.preRunLocked()
 }
 
-func (h *groupServiceHook) prerunLocked() error {
+// caller must hold h.lock
+func (h *groupServiceHook) preRunLocked() error {
 	if len(h.services) == 0 {
 		return nil
 	}
 
-	services := h.getWorkloadServices()
+	services := h.getWorkloadServicesLocked()
 	return h.serviceRegWrapper.RegisterWorkload(services)
 }
 
+// Update is run when a job submitter modifies service(s) (but not much else -
+// otherwise a full alloc replacement would occur).
 func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
-	oldWorkloadServices := h.getWorkloadServices()
+	oldWorkloadServices := h.getWorkloadServicesLocked()
 
 	// Store new updated values out of request
 	canary := false
@@ -166,7 +169,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
 	h.providerNamespace = req.Alloc.ServiceProviderNamespace()
 
 	// Create new task services struct with those new values
-	newWorkloadServices := h.getWorkloadServices()
+	newWorkloadServices := h.getWorkloadServicesLocked()
 
 	if !h.prerun {
 		// Update called before Prerun. Update alloc and exit to allow
@@ -186,21 +189,20 @@ func (h *groupServiceHook) PreTaskRestart() error {
 	}()
 
 	h.preKillLocked()
-	return h.prerunLocked()
+	return h.preRunLocked()
 }
 
 func (h *groupServiceHook) PreKill() {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.preKillLocked()
+	helper.WithLock(&h.mu, h.preKillLocked)
 }
 
-// implements the PreKill hook but requires the caller hold the lock
+// implements the PreKill hook
+//
+// caller must hold h.lock
 func (h *groupServiceHook) preKillLocked() {
 	// If we have a shutdown delay deregister group services and then wait
 	// before continuing to kill tasks.
-	h.deregister()
-	h.deregistered = true
+	h.deregisterLocked()
 
 	if h.delay == 0 {
 		return
@@ -220,24 +222,31 @@ func (h *groupServiceHook) preKillLocked() {
 }
 
 func (h *groupServiceHook) Postrun() error {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-
-	if !h.deregistered {
-		h.deregister()
-	}
+	helper.WithLock(&h.mu, h.deregisterLocked)
 	return nil
 }
 
-// deregister services from Consul/Nomad service provider.
-func (h *groupServiceHook) deregister() {
+// deregisterLocked will deregister services from Consul/Nomad service provider.
+//
+// caller must hold h.lock
+func (h *groupServiceHook) deregisterLocked() {
+	if h.deregistered {
+		return
+	}
+
 	if len(h.services) > 0 {
-		workloadServices := h.getWorkloadServices()
+		workloadServices := h.getWorkloadServicesLocked()
 		h.serviceRegWrapper.RemoveWorkload(workloadServices)
 	}
+
+	h.deregistered = true
 }
 
-func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadServices {
+// getWorkloadServicesLocked returns the set of workload services currently
+// on the hook.
+//
+// caller must hold h.lock
+func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.WorkloadServices {
 	// Interpolate with the task's environment
 	interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)
 
diff --git a/client/allocrunner/group_service_hook_test.go b/client/allocrunner/group_service_hook_test.go
index e05df8cbc19e..f369fb3cc0a9 100644
--- a/client/allocrunner/group_service_hook_test.go
+++ b/client/allocrunner/group_service_hook_test.go
@@ -60,12 +60,11 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
 	require.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
+	require.Len(t, ops, 4)
 	require.Equal(t, "add", ops[0].Op) // Prerun
 	require.Equal(t, "update", ops[1].Op) // Update
 	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op) // Restart -> preRun
+	require.Equal(t, "add", ops[3].Op) // Restart -> preRun
 }
 
 // TestGroupServiceHook_ShutdownDelayUpdate asserts calling group service hooks
@@ -143,12 +142,11 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
 	require.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
+	require.Len(t, ops, 4)
 	require.Equal(t, "add", ops[0].Op) // Prerun
 	require.Equal(t, "update", ops[1].Op) // Update
 	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op) // Restart -> preRun
+	require.Equal(t, "add", ops[3].Op) // Restart -> preRun
 }
 
 // TestGroupServiceHook_GroupServices_Nomad asserts group service hooks with
@@ -189,12 +187,11 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) {
 
 	// Ensure the Nomad mock provider has the expected operations.
 	ops := nomadMockClient.GetOps()
-	require.Len(t, ops, 5)
+	require.Len(t, ops, 4)
 	require.Equal(t, "add", ops[0].Op) // Prerun
 	require.Equal(t, "update", ops[1].Op) // Update
 	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op) // Restart -> preRun
+	require.Equal(t, "add", ops[3].Op) // Restart -> preRun
 
 	// Ensure the Consul mock provider has zero operations.
 	require.Len(t, consulMockClient.GetOps(), 0)
 }
 
@@ -244,12 +241,11 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
 	require.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
+	require.Len(t, ops, 4)
 	require.Equal(t, "add", ops[0].Op) // Prerun
 	require.Equal(t, "update", ops[1].Op) // Update
 	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op) // Restart -> preRun
+	require.Equal(t, "add", ops[3].Op) // Restart -> preRun
 }
 
 func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
@@ -284,6 +280,6 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
 		logger: logger,
 	})
 
-	services := h.getWorkloadServices()
+	services := h.getWorkloadServicesLocked()
 	require.Len(t, services.Services, 1)
 }
diff --git a/client/serviceregistration/nsd/nsd.go b/client/serviceregistration/nsd/nsd.go
index 807f4d492b2d..adbea8a0bc11 100644
--- a/client/serviceregistration/nsd/nsd.go
+++ b/client/serviceregistration/nsd/nsd.go
@@ -157,6 +157,8 @@ func (s *ServiceRegistrationHandler) removeWorkload(
 	defer wg.Done()
 
 	// Stop check watcher
+	//
+	// todo(shoenig) - shouldn't we only unwatch checks for the given serviceSpec ?
 	for _, service := range workload.Services {
 		for _, check := range service.Checks {
 			checkID := string(structs.NomadCheckID(workload.AllocInfo.AllocID, workload.AllocInfo.Group, check))

From 26d7e41182e0e3dd1f90a67f0894299c35ea5c57 Mon Sep 17 00:00:00 2001
From: Seth Hoenig
Date: Fri, 17 Mar 2023 08:58:26 -0500
Subject: [PATCH 2/2] services: cleanup group service hook tests

---
 client/allocrunner/group_service_hook_test.go | 88 +++++++++----------
 1 file changed, 44 insertions(+), 44 deletions(-)

diff --git a/client/allocrunner/group_service_hook_test.go b/client/allocrunner/group_service_hook_test.go
index f369fb3cc0a9..606dab8f37a3 100644
--- a/client/allocrunner/group_service_hook_test.go
+++ b/client/allocrunner/group_service_hook_test.go
@@ -14,7 +14,7 @@ import (
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/stretchr/testify/require"
+	"github.com/shoenig/test/must"
 )
 
 var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
@@ -50,21 +50,21 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger: logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 4)
-	require.Equal(t, "add", ops[0].Op) // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "add", ops[3].Op) // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op) // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op) // Restart -> preRun
 }
 
 // TestGroupServiceHook_ShutdownDelayUpdate asserts calling group service hooks
@@ -91,23 +91,23 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger: logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	// Incease shutdown Delay
 	alloc.Job.TaskGroups[0].ShutdownDelay = pointer.Of(15 * time.Second)
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
 	// Assert that update updated the delay value
-	require.Equal(t, h.delay, 15*time.Second)
+	must.Eq(t, h.delay, 15*time.Second)
 
 	// Remove shutdown delay
 	alloc.Job.TaskGroups[0].ShutdownDelay = nil
 	req = &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
 	// Assert that update updated the delay value
-	require.Equal(t, h.delay, 0*time.Second)
+	must.Eq(t, h.delay, 0*time.Second)
 }
 
 // TestGroupServiceHook_GroupServices asserts group service hooks with group
@@ -132,21 +132,21 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger: logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 4)
-	require.Equal(t, "add", ops[0].Op) // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "add", ops[3].Op) // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op) // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op) // Restart -> preRun
 }
 
 // TestGroupServiceHook_GroupServices_Nomad asserts group service hooks with
@@ -177,24 +177,24 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger: logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	// Trigger our hook requests.
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
-	require.NoError(t, h.Postrun())
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.Update(req))
+	must.NoError(t, h.Postrun())
+	must.NoError(t, h.PreTaskRestart())
 
 	// Ensure the Nomad mock provider has the expected operations.
 	ops := nomadMockClient.GetOps()
-	require.Len(t, ops, 4)
-	require.Equal(t, "add", ops[0].Op) // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "add", ops[3].Op) // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op) // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op) // Restart -> preRun
 
 	// Ensure the Consul mock provider has zero operations.
-	require.Len(t, consulMockClient.GetOps(), 0)
+	must.SliceEmpty(t, consulMockClient.GetOps())
 }
 
 // TestGroupServiceHook_Error asserts group service hooks with group
@@ -231,21 +231,21 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger: logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 4)
-	require.Equal(t, "add", ops[0].Op) // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "add", ops[3].Op) // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op) // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op) // Restart -> preRun
 }
 
 func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
@@ -281,5 +281,5 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
 	})
 
 	services := h.getWorkloadServicesLocked()
-	require.Len(t, services.Services, 1)
+	must.Len(t, 1, services.Services)
 }
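
For readers outside the diff context, below is a minimal, standalone sketch of the deregister-once pattern this patch applies: every deregistration path funnels through one guarded helper, so whichever of PreKill or Postrun runs first removes the services and the other becomes a no-op. The names here (serviceHook, registry, printRegistry) are simplified stand-ins for illustration, not the actual Nomad types.

```go
package main

import (
	"fmt"
	"sync"
)

// registry stands in for the Consul/Nomad service registration wrapper.
type registry interface {
	RemoveWorkload(name string)
}

type printRegistry struct{}

func (printRegistry) RemoveWorkload(name string) { fmt.Println("remove", name) }

type serviceHook struct {
	mu           sync.Mutex
	deregistered bool
	services     []string
	reg          registry
}

// deregisterLocked removes group services exactly once; the caller must hold mu.
func (h *serviceHook) deregisterLocked() {
	if h.deregistered {
		return // already removed (e.g. by PreKill), so skip the duplicate call
	}
	for _, s := range h.services {
		h.reg.RemoveWorkload(s)
	}
	h.deregistered = true
}

// PreKill deregisters before tasks are killed (shutdown_delay handling omitted).
func (h *serviceHook) PreKill() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.deregisterLocked()
}

// Postrun becomes a no-op when PreKill has already deregistered.
func (h *serviceHook) Postrun() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.deregisterLocked()
}

func main() {
	h := &serviceHook{services: []string{"group-web"}, reg: printRegistry{}}
	h.PreKill() // prints: remove group-web
	h.Postrun() // prints nothing more: the flag suppresses the second removal
}
```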