nsd: always set deregister flag after deregistration of group #16289

Merged 2 commits on Mar 17, 2023
3 changes: 3 additions & 0 deletions .changelog/16289.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+services: Fixed a bug where a service would be deregistered twice
+```
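
The changelog entry points at the hook changes below: deregistration could run once from the PreKill/restart path and again from Postrun, because the `deregistered` flag was set only on some of those paths. The PR centralizes the flag inside the deregister method itself. A minimal sketch of that guard pattern, using simplified stand-in types rather than the real Nomad ones:

```go
package main

import (
	"fmt"
	"sync"
)

// hook is a simplified stand-in for groupServiceHook. The deregistered
// flag is owned by the method that performs the deregistration, so every
// shutdown path gets idempotent behavior for free.
type hook struct {
	mu           sync.Mutex
	deregistered bool
}

// deregisterLocked removes services at most once; the caller must hold mu.
func (h *hook) deregisterLocked() {
	if h.deregistered {
		return // a previous path (e.g. PreKill) already deregistered
	}
	fmt.Println("deregistering group services")
	h.deregistered = true
}

func main() {
	h := &hook{}
	h.mu.Lock()
	h.deregisterLocked() // PreKill path: performs the removal
	h.deregisterLocked() // Postrun path: now a harmless no-op
	h.mu.Unlock()
}
```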
59 changes: 34 additions & 25 deletions client/allocrunner/group_service_hook.go
@@ -5,7 +5,7 @@ import (
 	"sync"
 	"time"
 
-	log "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/client/serviceregistration"
 	"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
@@ -39,7 +39,7 @@ type groupServiceHook struct {
 	// and check registration and deregistration.
 	serviceRegWrapper *wrapper.HandlerWrapper
 
-	logger log.Logger
+	logger hclog.Logger
 
 	// The following fields may be updated
 	canary bool
@@ -60,7 +60,7 @@ type groupServiceHookConfig struct {
 	taskEnvBuilder   *taskenv.Builder
 	networkStatus    structs.NetworkStatus
 	shutdownDelayCtx context.Context
-	logger           log.Logger
+	logger           hclog.Logger
 
 	// providerNamespace is the Nomad or Consul namespace in which service
 	// registrations will be made.
@@ -118,23 +118,26 @@ func (h *groupServiceHook) Prerun() error {
 		h.prerun = true
 		h.mu.Unlock()
 	}()
-	return h.prerunLocked()
+	return h.preRunLocked()
 }
 
-func (h *groupServiceHook) prerunLocked() error {
+// caller must hold h.lock
+func (h *groupServiceHook) preRunLocked() error {
 	if len(h.services) == 0 {
 		return nil
 	}
 
-	services := h.getWorkloadServices()
+	services := h.getWorkloadServicesLocked()
 	return h.serviceRegWrapper.RegisterWorkload(services)
 }
 
+// Update is run when a job submitter modifies service(s) (but not much else -
+// otherwise a full alloc replacement would occur).
 func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
-	oldWorkloadServices := h.getWorkloadServices()
+	oldWorkloadServices := h.getWorkloadServicesLocked()
 
 	// Store new updated values out of request
 	canary := false
@@ -166,7 +169,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
 	h.providerNamespace = req.Alloc.ServiceProviderNamespace()
 
 	// Create new task services struct with those new values
-	newWorkloadServices := h.getWorkloadServices()
+	newWorkloadServices := h.getWorkloadServicesLocked()
 
 	if !h.prerun {
 		// Update called before Prerun. Update alloc and exit to allow
@@ -186,21 +189,20 @@ func (h *groupServiceHook) PreTaskRestart() error {
 	}()
 
 	h.preKillLocked()
-	return h.prerunLocked()
+	return h.preRunLocked()
 }
 
 func (h *groupServiceHook) PreKill() {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.preKillLocked()
+	helper.WithLock(&h.mu, h.preKillLocked)
 }
 
-// implements the PreKill hook but requires the caller hold the lock
+// implements the PreKill hook
+//
+// caller must hold h.lock
 func (h *groupServiceHook) preKillLocked() {
 	// If we have a shutdown delay deregister group services and then wait
 	// before continuing to kill tasks.
-	h.deregister()
-	h.deregistered = true
+	h.deregisterLocked()
 
 	if h.delay == 0 {
 		return
@@ -220,24 +222,31 @@
 }
 
 func (h *groupServiceHook) Postrun() error {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-
-	if !h.deregistered {
-		h.deregister()
-	}
+	helper.WithLock(&h.mu, h.deregisterLocked)
 	return nil
 }
 
-// deregister services from Consul/Nomad service provider.
-func (h *groupServiceHook) deregister() {
+// deregisterLocked will deregister services from Consul/Nomad service provider.
+//
+// caller must hold h.lock
+func (h *groupServiceHook) deregisterLocked() {
+	if h.deregistered {
+		return
+	}
+
 	if len(h.services) > 0 {
-		workloadServices := h.getWorkloadServices()
+		workloadServices := h.getWorkloadServicesLocked()
 		h.serviceRegWrapper.RemoveWorkload(workloadServices)
 	}
+
+	h.deregistered = true
 }
 
-func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadServices {
+// getWorkloadServicesLocked returns the set of workload services currently
+// on the hook.
+//
+// caller must hold h.lock
+func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.WorkloadServices {
 	// Interpolate with the task's environment
 	interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)
 
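
Postrun and PreKill now funnel through `helper.WithLock` instead of open-coded Lock/Unlock pairs. The helper itself is not shown in this diff; assuming it behaves like the usual run-while-locked utility, it would amount to something like:

```go
package helper

import "sync"

// WithLock is a sketch of the run-while-holding-the-lock utility used
// above (assumed behavior; the vendored implementation may differ):
// acquire the lock, run f, and release the lock even if f panics.
func WithLock(lock sync.Locker, f func()) {
	lock.Lock()
	defer lock.Unlock()
	f()
}
```

Funneling through the helper keeps the locking in one place and pairs well with the `*Locked` naming and "caller must hold h.lock" comments introduced throughout the hook.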
94 changes: 45 additions & 49 deletions client/allocrunner/group_service_hook_test.go
@@ -14,7 +14,7 @@ import (
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/stretchr/testify/require"
+	"github.com/shoenig/test/must"
 )
 
 var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
@@ -50,22 +50,21 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }
 
 // TestGroupServiceHook_ShutdownDelayUpdate asserts calling group service hooks
@@ -92,23 +91,23 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	// Incease shutdown Delay
 	alloc.Job.TaskGroups[0].ShutdownDelay = pointer.Of(15 * time.Second)
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
 	// Assert that update updated the delay value
-	require.Equal(t, h.delay, 15*time.Second)
+	must.Eq(t, h.delay, 15*time.Second)
 
 	// Remove shutdown delay
 	alloc.Job.TaskGroups[0].ShutdownDelay = nil
 	req = &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
 	// Assert that update updated the delay value
-	require.Equal(t, h.delay, 0*time.Second)
+	must.Eq(t, h.delay, 0*time.Second)
 }
 
 // TestGroupServiceHook_GroupServices asserts group service hooks with group
@@ -133,22 +132,21 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }
 
 // TestGroupServiceHook_GroupServices_Nomad asserts group service hooks with
@@ -179,25 +177,24 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	// Trigger our hook requests.
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
-	require.NoError(t, h.Postrun())
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.Update(req))
+	must.NoError(t, h.Postrun())
+	must.NoError(t, h.PreTaskRestart())
 
 	// Ensure the Nomad mock provider has the expected operations.
 	ops := nomadMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 
 	// Ensure the Consul mock provider has zero operations.
-	require.Len(t, consulMockClient.GetOps(), 0)
+	must.SliceEmpty(t, consulMockClient.GetOps())
 }
 
 // TestGroupServiceHook_Error asserts group service hooks with group
@@ -234,22 +231,21 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }
 
 func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
@@ -284,6 +280,6 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
 		logger: logger,
 	})
 
-	services := h.getWorkloadServices()
-	require.Len(t, services.Services, 1)
+	services := h.getWorkloadServicesLocked()
+	must.Len(t, 1, services.Services)
 }
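
The test migration from stretchr/testify's `require` to shoenig/test's `must` is mostly mechanical, and the expected op counts drop from 5 to 4 because the restart path no longer produces a second `remove`. One detail worth flagging in review is that the two libraries order the arguments of the length assertion differently. A small sketch of the difference (argument orders as I understand these libraries; verify against the vendored versions):

```go
package allocrunner

import (
	"testing"

	"github.com/shoenig/test/must"
	"github.com/stretchr/testify/require"
)

func TestAssertionArgumentOrder(t *testing.T) {
	ops := []string{"add", "update", "remove", "add"}

	// testify/require takes the collection first, then the expected length.
	require.Len(t, ops, 4)

	// shoenig/test/must takes the expected length first, then the collection.
	must.Len(t, 4, ops)

	// Equality assertions keep (expected, actual) order in both libraries.
	require.Equal(t, "add", ops[0])
	must.Eq(t, "add", ops[0])
}
```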
2 changes: 2 additions & 0 deletions client/serviceregistration/nsd/nsd.go
@@ -157,6 +157,8 @@ func (s *ServiceRegistrationHandler) removeWorkload(
 		defer wg.Done()
 
 		// Stop check watcher
+		//
+		// todo(shoenig) - shouldn't we only unwatch checks for the given serviceSpec ?
 		for _, service := range workload.Services {
 			for _, check := range service.Checks {
 				checkID := string(structs.NomadCheckID(workload.AllocInfo.AllocID, workload.AllocInfo.Group, check))
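
The added todo raises a scoping question rather than changing behavior: this loop unwatches the checks of every service on the workload. If it were narrowed to only the service being removed, the filtering might look roughly like the sketch below. This is hypothetical, with simplified stand-in types rather than the Nomad implementation, and the real answer to the todo may differ.

```go
package main

import "fmt"

// Simplified stand-ins for the real service/check types.
type check struct{ ID string }
type service struct {
	Name   string
	Checks []check
}

// unwatchChecksFor unwatches only the checks belonging to the service
// being removed, leaving checks of unrelated services watched.
func unwatchChecksFor(services []service, removing string, unwatch func(string)) {
	for _, svc := range services {
		if svc.Name != removing {
			continue // unrelated service: keep its checks watched
		}
		for _, c := range svc.Checks {
			unwatch(c.ID)
		}
	}
}

func main() {
	svcs := []service{
		{Name: "web", Checks: []check{{ID: "web-alive"}}},
		{Name: "db", Checks: []check{{ID: "db-alive"}}},
	}
	unwatchChecksFor(svcs, "web", func(id string) { fmt.Println("unwatch", id) })
}
```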