Skip to content

Commit

Permalink
provide -no-shutdown-delay flag for job/alloc stop
Browse files Browse the repository at this point in the history
Some operators use very long group/task `shutdown_delay` settings to
safely drain network connections to their workloads after service
deregistration. But during incident response, they may want to cause
that drain to be skipped so they can quickly shed load.

Provide a `-no-shutdown-delay` flag on the `nomad alloc stop` and
`nomad job stop` commands that bypasses the delay. This sets a new
desired transition state on the affected allocations that the
allocation/task runner will identify during pre-kill on the client.
  • Loading branch information
tgross committed Dec 2, 2021
1 parent 16b2428 commit 1647bde
Show file tree
Hide file tree
Showing 14 changed files with 266 additions and 35 deletions.
5 changes: 5 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@ type DeregisterOptions struct {
// is useful when an operator wishes to push through a job deregistration
// in busy clusters with a large evaluation backlog.
EvalPriority int

// NoShutdownDelay, if set to true, will override the group and
// task shutdown_delay configuration and set it to "0s" for any
// allocations stopped as a result of this Deregister call.
NoShutdownDelay bool
}

// DeregisterOpts is used to remove an existing job. See DeregisterOptions
Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (ar *allocRunner) preKillHooks() {
ar.logger.Trace("running alloc pre shutdown hook", "name", name, "start", start)
}

pre.PreKill()
pre.PreKill(ar.alloc)

if ar.logger.IsTrace() {
end := time.Now()
Expand Down
12 changes: 6 additions & 6 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type groupServiceHook struct {
consulClient consul.ConsulServiceAPI
consulNamespace string
prerun bool
delay time.Duration
deregistered bool
networkStatusGetter networkStatusGetter

Expand All @@ -41,6 +40,7 @@ type groupServiceHook struct {
networks structs.Networks
ports structs.AllocatedPorts
taskEnvBuilder *taskenv.Builder
delay time.Duration

// Since Update() may be called concurrently with any other hook all
// hook methods must be fully serialized
Expand Down Expand Up @@ -164,24 +164,24 @@ func (h *groupServiceHook) PreTaskRestart() error {
h.mu.Unlock()
}()

h.preKillLocked()
h.preKillLocked(nil)
return h.prerunLocked()
}

func (h *groupServiceHook) PreKill() {
func (h *groupServiceHook) PreKill(alloc *structs.Allocation) {
h.mu.Lock()
defer h.mu.Unlock()
h.preKillLocked()
h.preKillLocked(alloc)
}

// implements the PreKill hook but requires the caller hold the lock
func (h *groupServiceHook) preKillLocked() {
func (h *groupServiceHook) preKillLocked(alloc *structs.Allocation) {
// If we have a shutdown delay deregister group services and then wait
// before continuing to kill tasks.
h.deregister()
h.deregistered = true

if h.delay == 0 {
if h.delay == 0 || (alloc != nil && alloc.DesiredTransition.ShouldIgnoreShutdownDelay()) {
return
}

Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/interfaces/runner_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type RunnerPrerunHook interface {
type RunnerPreKillHook interface {
RunnerHook

PreKill()
PreKill(*structs.Allocation)
}

// RunnerPostrunHooks are executed after calling TaskRunner.Run, even for
Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.E
// Wait for task ShutdownDelay after running prekill hooks
// This allows for things like service de-registration to run
// before waiting to kill task
if delay := tr.Task().ShutdownDelay; delay != 0 {
if delay := tr.Task().ShutdownDelay; delay != 0 && !tr.alloc.DesiredTransition.ShouldIgnoreShutdownDelay() {
tr.logger.Debug("waiting before killing task", "shutdown_delay", delay)

select {
Expand Down
76 changes: 73 additions & 3 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"time"

"github.com/golang/snappy"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/config"
Expand All @@ -26,16 +30,14 @@ import (
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
mockdriver "github.com/hashicorp/nomad/drivers/mock"
"github.com/hashicorp/nomad/drivers/rawexec"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type MockTaskStateUpdater struct {
Expand Down Expand Up @@ -996,6 +998,74 @@ WAIT:
}
}

// TestTaskRunner_NoShutdownDelay asserts services are removed from
// Consul and tasks are killed without waiting for ${shutdown_delay}
// when the alloc has the NoShutdownDelay transition flag set.
func TestTaskRunner_NoShutdownDelay(t *testing.T) {
t.Parallel()

alloc := mock.Alloc()
alloc.DesiredTransition = structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)}
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Services[0].Tags = []string{"tag1"}
task.Services = task.Services[:1] // only need 1 for this test
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "1000s",
}

// set the shutdown delay high; we'll timeout well before this
task.ShutdownDelay = 1 * time.Minute

tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()

mockConsul := conf.Consul.(*consulapi.MockConsulServiceClient)

testWaitForTaskToStart(t, tr)

testutil.WaitForResult(func() (bool, error) {
ops := mockConsul.GetOps()
if n := len(ops); n != 1 {
return false, fmt.Errorf("expected 1 consul operation. Found %d", n)
}
return ops[0].Op == "add", fmt.Errorf("consul operation was not a registration: %#v", ops[0])
}, func(err error) {
t.Fatalf("err: %v", err)
})

killed := make(chan error)
go func() {
err := tr.Kill(context.Background(), structs.NewTaskEvent("test"))
killed <- err
}()

// Wait for first de-registration call. Note that unlike
// TestTaskRunner_ShutdownDelay, we're racing with task exit
// and can't assert that we only get the first deregistration op
// (from serviceHook.PreKill).
testutil.WaitForResult(func() (bool, error) {
ops := mockConsul.GetOps()
if n := len(ops); n < 2 {
return false, fmt.Errorf("expected at least 2 consul operations.")
}
return ops[1].Op == "remove", fmt.Errorf(
"consul operation was not a deregistration: %#v", ops[1])
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Wait for the task to exit
select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()) * time.Second):
t.Fatalf("task kill did not ignore shutdown delay")
}

err := <-killed
require.NoError(t, err, "killing task returned unexpected error")
}

// TestTaskRunner_Dispatch_Payload asserts that a dispatch job runs and the
// payload was written to disk.
func TestTaskRunner_Dispatch_Payload(t *testing.T) {
Expand Down
12 changes: 11 additions & 1 deletion command/agent/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,18 @@ func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *ht
return nil, CodedError(405, ErrInvalidMethod)
}

noShutdownDelay := false
if noShutdownDelayQS := req.URL.Query().Get("no_shutdown_delay"); noShutdownDelayQS != "" {
var err error
noShutdownDelay, err = strconv.ParseBool(noShutdownDelayQS)
if err != nil {
return nil, fmt.Errorf("no_shutdown_delay value is not a boolean: %v", err)
}
}

sr := &structs.AllocStopRequest{
AllocID: allocID,
AllocID: allocID,
NoShutdownDelay: noShutdownDelay,
}
s.parseWriteRequest(req, &sr.WriteRequest)

Expand Down
14 changes: 12 additions & 2 deletions command/alloc_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ Stop Specific Options:
screen, which can be used to examine the rescheduling evaluation using the
eval-status command.
-no-shutdown-delay
Ignore the the group and task shutdown_delay configuration so there
is no delay between service deregistration and task shutdown.
-verbose
Show full information.
`
Expand All @@ -47,12 +51,13 @@ Stop Specific Options:
func (c *AllocStopCommand) Name() string { return "alloc stop" }

func (c *AllocStopCommand) Run(args []string) int {
var detach, verbose bool
var detach, verbose, noShutdownDelay bool

flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "")

if err := flags.Parse(args); err != nil {
return 1
Expand Down Expand Up @@ -115,7 +120,12 @@ func (c *AllocStopCommand) Run(args []string) int {
return 1
}

resp, err := client.Allocations().Stop(alloc, nil)
var opts *api.QueryOptions
if noShutdownDelay {
opts = &api.QueryOptions{Params: map[string]string{"no_shutdown_delay": "true"}}
}

resp, err := client.Allocations().Stop(alloc, opts)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error stopping allocation: %s", err))
return 1
Expand Down
30 changes: 18 additions & 12 deletions command/job_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,18 @@ Stop Options:
Override the priority of the evaluations produced as a result of this job
deregistration. By default, this is set to the priority of the job.
-purge
Purge is used to stop the job and purge it from the system. If not set, the
job will still be queryable and will be purged by the garbage collector.
-global
Stop a multi-region job in all its regions. By default job stop will stop
only a single region at a time. Ignored for single-region jobs.
-no-shutdown-delay
Ignore the the group and task shutdown_delay configuration so there
is no delay between service deregistration and task shutdown.
-purge
Purge is used to stop the job and purge it from the system. If not set, the
job will still be queryable and will be purged by the garbage collector.
-yes
Automatic yes to prompts.
Expand All @@ -67,12 +71,13 @@ func (c *JobStopCommand) Synopsis() string {
func (c *JobStopCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-detach": complete.PredictNothing,
"-eval-priority": complete.PredictNothing,
"-purge": complete.PredictNothing,
"-global": complete.PredictNothing,
"-yes": complete.PredictNothing,
"-verbose": complete.PredictNothing,
"-detach": complete.PredictNothing,
"-eval-priority": complete.PredictNothing,
"-purge": complete.PredictNothing,
"-global": complete.PredictNothing,
"-no-shutdown-delay": complete.PredictNothing,
"-yes": complete.PredictNothing,
"-verbose": complete.PredictNothing,
})
}

Expand All @@ -94,14 +99,15 @@ func (c *JobStopCommand) AutocompleteArgs() complete.Predictor {
func (c *JobStopCommand) Name() string { return "job stop" }

func (c *JobStopCommand) Run(args []string) int {
var detach, purge, verbose, global, autoYes bool
var detach, purge, verbose, global, autoYes, noShutdownDelay bool
var evalPriority int

flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&global, "global", false, "")
flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "")
flags.BoolVar(&autoYes, "yes", false, "")
flags.BoolVar(&purge, "purge", false, "")
flags.IntVar(&evalPriority, "eval-priority", 0, "")
Expand Down Expand Up @@ -199,7 +205,7 @@ func (c *JobStopCommand) Run(args []string) int {
}

// Invoke the stop
opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority}
opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay}
wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace}
evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion nomad/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopRes
Evals: []*structs.Evaluation{eval},
Allocs: map[string]*structs.DesiredTransition{
args.AllocID: {
Migrate: helper.BoolToPtr(true),
Migrate: helper.BoolToPtr(true),
NoShutdownDelay: helper.BoolToPtr(args.NoShutdownDelay),
},
},
}
Expand Down
22 changes: 19 additions & 3 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -605,7 +606,7 @@ func (n *nomadFSM) applyDeregisterJob(msgType structs.MessageType, buf []byte, i
}

err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error {
err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx)
err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, req.NoShutdownDelay, tx)

if err != nil {
n.logger.Error("deregistering job failed",
Expand Down Expand Up @@ -645,7 +646,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by
// evals for jobs whose deregistering didn't get committed yet.
err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error {
for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, false, tx); err != nil {
n.logger.Error("deregistering job failed", "job", jobNS.ID, "error", err)
return err
}
Expand All @@ -670,12 +671,27 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by

// handleJobDeregister is used to deregister a job. Leaves error logging up to
// caller.
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, tx state.Txn) error {
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, noShutdownDelay bool, tx state.Txn) error {
// If it is periodic remove it from the dispatcher
if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil {
return fmt.Errorf("periodicDispatcher.Remove failed: %w", err)
}

if noShutdownDelay {
ws := memdb.NewWatchSet()
allocs, err := n.state.AllocsByJob(ws, namespace, jobID, false)
if err != nil {
return err
}
transition := &structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)}
for _, alloc := range allocs {
err := n.state.UpdateAllocDesiredTransitionTxn(tx, index, alloc.ID, transition)
if err != nil {
return err
}
}
}

if purge {
if err := n.state.DeleteJobTxn(index, namespace, jobID, tx); err != nil {
return fmt.Errorf("DeleteJob failed: %w", err)
Expand Down
Loading

0 comments on commit 1647bde

Please sign in to comment.