From 2173424020bad673a4d8d97c19f6d89c61b405fb Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 30 Nov 2021 17:03:16 -0500 Subject: [PATCH] provide `-no-shutdown-delay` flag for job/alloc stop Some operators use very long group/task `shutdown_delay` settings to safely drain network connections to their workloads after service deregistration. But during incident response, they may want to cause that drain to be skipped so they can quickly shed load. Provide a `-no-shutdown-delay` flag on the `nomad alloc stop` and `nomad job stop` commands that bypasses the delay. This sets a new desired transition state on the affected allocations that the allocation/task runner will identify during pre-kill on the client. Note (as documented here) that using this flag will almost always result in failed inbound network connections for workloads as the tasks will exit before clients receive updated service discovery information and won't be gracefully drained. --- .changelog/11596.txt | 3 + api/jobs.go | 9 +- client/allocrunner/alloc_runner.go | 12 ++ client/allocrunner/alloc_runner_hooks.go | 1 + client/allocrunner/groupservice_hook.go | 13 +- client/allocrunner/taskrunner/task_runner.go | 16 +++ .../taskrunner/task_runner_test.go | 116 +++++++++++++++--- command/agent/alloc_endpoint.go | 12 +- command/agent/job_endpoint.go | 12 ++ command/alloc_stop.go | 16 ++- command/job_stop.go | 32 +++-- nomad/alloc_endpoint.go | 3 +- nomad/fsm.go | 26 +++- nomad/job_endpoint_test.go | 91 ++++++++++++++ nomad/state/state_store.go | 11 +- nomad/structs/structs.go | 26 +++- website/content/docs/commands/alloc/stop.mdx | 7 ++ website/content/docs/commands/job/stop.mdx | 7 ++ 18 files changed, 366 insertions(+), 47 deletions(-) create mode 100644 .changelog/11596.txt diff --git a/.changelog/11596.txt b/.changelog/11596.txt new file mode 100644 index 000000000000..74b451c02787 --- /dev/null +++ b/.changelog/11596.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: provide `-no-shutdown-delay` option to `job stop` and `alloc stop` commands to ignore `shutdown_delay` +``` diff --git a/api/jobs.go b/api/jobs.go index 146f65cf7131..b3c3944bc8c7 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -299,6 +299,11 @@ type DeregisterOptions struct { // is useful when an operator wishes to push through a job deregistration // in busy clusters with a large evaluation backlog. EvalPriority int + + // NoShutdownDelay, if set to true, will override the group and + // task shutdown_delay configuration and set it to "0s" for any + // allocations stopped as a result of this Deregister call. + NoShutdownDelay bool } // DeregisterOpts is used to remove an existing job. See DeregisterOptions @@ -312,8 +317,8 @@ func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOpt // Protect against nil opts. url.Values expects a string, and so using // fmt.Sprintf is the best way to do this. if opts != nil { - endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v", - opts.Purge, opts.Global, opts.EvalPriority) + endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v&no_shutdown_delay=%t", + opts.Purge, opts.Global, opts.EvalPriority, opts.NoShutdownDelay) } wm, err := j.client.delete(endpoint, &resp, q) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 7dcf072a948f..c846b0916931 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -171,6 +171,9 @@ type allocRunner struct { taskHookCoordinator *taskHookCoordinator + shutdownDelayCtx context.Context + shutdownDelayCancelFn context.CancelFunc + // rpcClient is the RPC Client that should be used by the allocrunner and its // hooks to communicate with Nomad Servers. rpcClient RPCer @@ -230,6 +233,10 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { ar.taskHookCoordinator = newTaskHookCoordinator(ar.logger, tg.Tasks) + shutdownDelayCtx, shutdownDelayCancel := context.WithCancel(context.Background()) + ar.shutdownDelayCtx = shutdownDelayCtx + ar.shutdownDelayCancelFn = shutdownDelayCancel + // Initialize the runners hooks. if err := ar.initRunnerHooks(config.ClientConfig); err != nil { return nil, err @@ -265,6 +272,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { DriverManager: ar.driverManager, ServersContactedCh: ar.serversContactedCh, StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task), + ShutdownDelayCtx: ar.shutdownDelayCtx, } if ar.cpusetManager != nil { @@ -824,6 +832,10 @@ func (ar *allocRunner) Update(update *structs.Allocation) { default: } + if update.DesiredTransition.ShouldIgnoreShutdownDelay() { + ar.shutdownDelayCancelFn() + } + // Queue the new update ar.allocUpdatedCh <- update } diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 9624e633c730..8b79da4d5cf4 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -159,6 +159,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { taskEnvBuilder: envBuilder, networkStatusGetter: ar, logger: hookLogger, + shutdownDelayCtx: ar.shutdownDelayCtx, }), newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), diff --git a/client/allocrunner/groupservice_hook.go b/client/allocrunner/groupservice_hook.go index 778109e65f49..69eae41e8bf7 100644 --- a/client/allocrunner/groupservice_hook.go +++ b/client/allocrunner/groupservice_hook.go @@ -1,6 +1,7 @@ package allocrunner import ( + "context" "sync" "time" @@ -29,9 +30,9 @@ type groupServiceHook struct { consulClient consul.ConsulServiceAPI consulNamespace string prerun bool - delay time.Duration deregistered bool networkStatusGetter networkStatusGetter + shutdownDelayCtx context.Context logger log.Logger @@ -41,6 +42,7 @@ type groupServiceHook struct { networks structs.Networks ports structs.AllocatedPorts taskEnvBuilder *taskenv.Builder + delay time.Duration // Since Update() may be called concurrently with any other hook all // hook methods must be fully serialized @@ -54,6 +56,7 @@ type groupServiceHookConfig struct { restarter agentconsul.WorkloadRestarter taskEnvBuilder *taskenv.Builder networkStatusGetter networkStatusGetter + shutdownDelayCtx context.Context logger log.Logger } @@ -76,6 +79,7 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook { networkStatusGetter: cfg.networkStatusGetter, logger: cfg.logger.Named(groupServiceHookName), services: cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup).Services, + shutdownDelayCtx: cfg.shutdownDelayCtx, } if cfg.alloc.AllocatedResources != nil { @@ -187,9 +191,12 @@ func (h *groupServiceHook) preKillLocked() { h.logger.Debug("delay before killing tasks", "group", h.group, "shutdown_delay", h.delay) - // Wait for specified shutdown_delay + select { + // Wait for specified shutdown_delay unless ignored // This will block an agent from shutting down. - <-time.After(h.delay) + case <-time.After(h.delay): + case <-h.shutdownDelayCtx.Done(): + } } func (h *groupServiceHook) Postrun() error { diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 68df0827f60e..b6c8ebe3dd76 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -112,6 +112,10 @@ type TaskRunner struct { killErr error killErrLock sync.Mutex + // shutdownDelayCtx is a context from the alloc runner which will + // tell us to exit early from shutdown_delay + shutdownDelayCtx context.Context + // Logger is the logger for the task runner. logger log.Logger @@ -287,6 +291,15 @@ type Config struct { // startConditionMetCtx is done when TR should start the task StartConditionMetCtx <-chan struct{} + + // ShutdownDelayCtx is a context from the alloc runner which will + // tell us to exit early from shutdown_delay + ShutdownDelayCtx context.Context + + // shutdownDelayCancelFn is the matching CancelFunc for + // ShutdownDelayCtx, for use in testing. It is not embedded in the + // TaskRunner + shutdownDelayCancelFn context.CancelFunc } func NewTaskRunner(config *Config) (*TaskRunner, error) { @@ -342,6 +355,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { maxEvents: defaultMaxEvents, serversContactedCh: config.ServersContactedCh, startConditionMetCtx: config.StartConditionMetCtx, + shutdownDelayCtx: config.ShutdownDelayCtx, } // Create the logger based on the allocation ID @@ -895,6 +909,8 @@ func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.E select { case result := <-resultCh: return result + case <-tr.shutdownDelayCtx.Done(): + break case <-time.After(delay): } } diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index f3ef206cfd68..1706870c79ce 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -14,6 +14,10 @@ import ( "time" "github.com/golang/snappy" + "github.com/kr/pretty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/config" @@ -26,6 +30,7 @@ import ( agentconsul "github.com/hashicorp/nomad/command/agent/consul" mockdriver "github.com/hashicorp/nomad/drivers/mock" "github.com/hashicorp/nomad/drivers/rawexec" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -33,9 +38,6 @@ import ( "github.com/hashicorp/nomad/plugins/device" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/testutil" - "github.com/kr/pretty" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) type MockTaskStateUpdater struct { @@ -94,26 +96,30 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri cleanup() } + shutdownDelayCtx, shutdownDelayCancelFn := context.WithCancel(context.Background()) + // Create a closed channel to mock TaskHookCoordinator.startConditionForTask. // Closed channel indicates this task is not blocked on prestart hooks. closedCh := make(chan struct{}) close(closedCh) conf := &Config{ - Alloc: alloc, - ClientConfig: clientConf, - Task: thisTask, - TaskDir: taskDir, - Logger: clientConf.Logger, - Consul: consulapi.NewMockConsulServiceClient(t, logger), - ConsulSI: consulapi.NewMockServiceIdentitiesClient(), - Vault: vaultclient.NewMockVaultClient(), - StateDB: cstate.NoopDB{}, - StateUpdater: NewMockTaskStateUpdater(), - DeviceManager: devicemanager.NoopMockManager(), - DriverManager: drivermanager.TestDriverManager(t), - ServersContactedCh: make(chan struct{}), - StartConditionMetCtx: closedCh, + Alloc: alloc, + ClientConfig: clientConf, + Task: thisTask, + TaskDir: taskDir, + Logger: clientConf.Logger, + Consul: consulapi.NewMockConsulServiceClient(t, logger), + ConsulSI: consulapi.NewMockServiceIdentitiesClient(), + Vault: vaultclient.NewMockVaultClient(), + StateDB: cstate.NoopDB{}, + StateUpdater: NewMockTaskStateUpdater(), + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), + ServersContactedCh: make(chan struct{}), + StartConditionMetCtx: closedCh, + ShutdownDelayCtx: shutdownDelayCtx, + shutdownDelayCancelFn: shutdownDelayCancelFn, } return conf, trCleanup } @@ -996,6 +1002,82 @@ WAIT: } } +// TestTaskRunner_NoShutdownDelay asserts services are removed from +// Consul and tasks are killed without waiting for ${shutdown_delay} +// when the alloc has the NoShutdownDelay transition flag set. +func TestTaskRunner_NoShutdownDelay(t *testing.T) { + t.Parallel() + + // don't set this too high so that we don't block the test runner + // on shutting down the agent if the test fails + maxTestDuration := time.Duration(testutil.TestMultiplier()*10) * time.Second + maxTimeToFailDuration := time.Duration(testutil.TestMultiplier()) * time.Second + + alloc := mock.Alloc() + alloc.DesiredTransition = structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)} + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Services[0].Tags = []string{"tag1"} + task.Services = task.Services[:1] // only need 1 for this test + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": "1000s", + } + task.ShutdownDelay = maxTestDuration + + tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) + defer cleanup() + + mockConsul := conf.Consul.(*consulapi.MockConsulServiceClient) + + testWaitForTaskToStart(t, tr) + + testutil.WaitForResult(func() (bool, error) { + ops := mockConsul.GetOps() + if n := len(ops); n != 1 { + return false, fmt.Errorf("expected 1 consul operation. Found %d", n) + } + return ops[0].Op == "add", fmt.Errorf("consul operation was not a registration: %#v", ops[0]) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + testCtx, cancel := context.WithTimeout(context.Background(), maxTimeToFailDuration) + defer cancel() + + killed := make(chan error) + go func() { + conf.shutdownDelayCancelFn() + err := tr.Kill(testCtx, structs.NewTaskEvent("test")) + killed <- err + }() + + // Wait for first de-registration call. Note that unlike + // TestTaskRunner_ShutdownDelay, we're racing with task exit + // and can't assert that we only get the first deregistration op + // (from serviceHook.PreKill). + testutil.WaitForResult(func() (bool, error) { + ops := mockConsul.GetOps() + if n := len(ops); n < 2 { + return false, fmt.Errorf("expected at least 2 consul operations.") + } + return ops[1].Op == "remove", fmt.Errorf( + "consul operation was not a deregistration: %#v", ops[1]) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Wait for the task to exit + select { + case <-tr.WaitCh(): + case <-time.After(maxTimeToFailDuration): + t.Fatalf("task kill did not ignore shutdown delay") + return + } + + err := <-killed + require.NoError(t, err, "killing task returned unexpected error") +} + // TestTaskRunner_Dispatch_Payload asserts that a dispatch job runs and the // payload was written to disk. func TestTaskRunner_Dispatch_Payload(t *testing.T) { diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index d1a7e210c5d2..f6f724001bae 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -138,8 +138,18 @@ func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *ht return nil, CodedError(405, ErrInvalidMethod) } + noShutdownDelay := false + if noShutdownDelayQS := req.URL.Query().Get("no_shutdown_delay"); noShutdownDelayQS != "" { + var err error + noShutdownDelay, err = strconv.ParseBool(noShutdownDelayQS) + if err != nil { + return nil, fmt.Errorf("no_shutdown_delay value is not a boolean: %v", err) + } + } + sr := &structs.AllocStopRequest{ - AllocID: allocID, + AllocID: allocID, + NoShutdownDelay: noShutdownDelay, } s.parseWriteRequest(req, &sr.WriteRequest) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 8a9da76febae..1ff8a7bde07c 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -453,6 +453,18 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, return nil, err } + // Identify the no_shutdown_delay query param and parse. + noShutdownDelayStr := req.URL.Query().Get("no_shutdown_delay") + var noShutdownDelay bool + if noShutdownDelayStr != "" { + var err error + noShutdownDelay, err = strconv.ParseBool(noShutdownDelayStr) + if err != nil { + return nil, fmt.Errorf("Failed to parse value of %qq (%v) as a bool: %v", "no_shutdown_delay", noShutdownDelayStr, err) + } + } + args.NoShutdownDelay = noShutdownDelay + // Validate the evaluation priority if the user supplied a non-default // value. It's more efficient to do it here, within the agent rather than // sending a bad request for the server to reject. diff --git a/command/alloc_stop.go b/command/alloc_stop.go index 3c8f9cbd93c9..3b1d10218ab2 100644 --- a/command/alloc_stop.go +++ b/command/alloc_stop.go @@ -38,6 +38,12 @@ Stop Specific Options: screen, which can be used to examine the rescheduling evaluation using the eval-status command. + -no-shutdown-delay + Ignore the the group and task shutdown_delay configuration so there is no + delay between service deregistration and task shutdown. Note that using + this flag will result in failed network connections to the allocation + being stopped. + -verbose Show full information. ` @@ -47,12 +53,13 @@ Stop Specific Options: func (c *AllocStopCommand) Name() string { return "alloc stop" } func (c *AllocStopCommand) Run(args []string) int { - var detach, verbose bool + var detach, verbose, noShutdownDelay bool flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") + flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "") if err := flags.Parse(args); err != nil { return 1 @@ -115,7 +122,12 @@ func (c *AllocStopCommand) Run(args []string) int { return 1 } - resp, err := client.Allocations().Stop(alloc, nil) + var opts *api.QueryOptions + if noShutdownDelay { + opts = &api.QueryOptions{Params: map[string]string{"no_shutdown_delay": "true"}} + } + + resp, err := client.Allocations().Stop(alloc, opts) if err != nil { c.Ui.Error(fmt.Sprintf("Error stopping allocation: %s", err)) return 1 diff --git a/command/job_stop.go b/command/job_stop.go index 8dd5d8a11972..f6a51d31d786 100644 --- a/command/job_stop.go +++ b/command/job_stop.go @@ -43,14 +43,20 @@ Stop Options: Override the priority of the evaluations produced as a result of this job deregistration. By default, this is set to the priority of the job. - -purge - Purge is used to stop the job and purge it from the system. If not set, the - job will still be queryable and will be purged by the garbage collector. - -global Stop a multi-region job in all its regions. By default job stop will stop only a single region at a time. Ignored for single-region jobs. + -no-shutdown-delay + Ignore the the group and task shutdown_delay configuration so there is no + delay between service deregistration and task shutdown. Note that using + this flag will result in failed network connections to the allocations + being stopped. + + -purge + Purge is used to stop the job and purge it from the system. If not set, the + job will still be queryable and will be purged by the garbage collector. + -yes Automatic yes to prompts. @@ -67,12 +73,13 @@ func (c *JobStopCommand) Synopsis() string { func (c *JobStopCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ - "-detach": complete.PredictNothing, - "-eval-priority": complete.PredictNothing, - "-purge": complete.PredictNothing, - "-global": complete.PredictNothing, - "-yes": complete.PredictNothing, - "-verbose": complete.PredictNothing, + "-detach": complete.PredictNothing, + "-eval-priority": complete.PredictNothing, + "-purge": complete.PredictNothing, + "-global": complete.PredictNothing, + "-no-shutdown-delay": complete.PredictNothing, + "-yes": complete.PredictNothing, + "-verbose": complete.PredictNothing, }) } @@ -94,7 +101,7 @@ func (c *JobStopCommand) AutocompleteArgs() complete.Predictor { func (c *JobStopCommand) Name() string { return "job stop" } func (c *JobStopCommand) Run(args []string) int { - var detach, purge, verbose, global, autoYes bool + var detach, purge, verbose, global, autoYes, noShutdownDelay bool var evalPriority int flags := c.Meta.FlagSet(c.Name(), FlagSetClient) @@ -102,6 +109,7 @@ func (c *JobStopCommand) Run(args []string) int { flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&global, "global", false, "") + flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "") flags.BoolVar(&autoYes, "yes", false, "") flags.BoolVar(&purge, "purge", false, "") flags.IntVar(&evalPriority, "eval-priority", 0, "") @@ -199,7 +207,7 @@ func (c *JobStopCommand) Run(args []string) int { } // Invoke the stop - opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority} + opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay} wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace} evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq) if err != nil { diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 3a32b5f19646..0b44175adf62 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -320,7 +320,8 @@ func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopRes Evals: []*structs.Evaluation{eval}, Allocs: map[string]*structs.DesiredTransition{ args.AllocID: { - Migrate: helper.BoolToPtr(true), + Migrate: helper.BoolToPtr(true), + NoShutdownDelay: helper.BoolToPtr(args.NoShutdownDelay), }, }, } diff --git a/nomad/fsm.go b/nomad/fsm.go index 84721014560f..727574312110 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -11,6 +11,7 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -605,7 +606,7 @@ func (n *nomadFSM) applyDeregisterJob(msgType structs.MessageType, buf []byte, i } err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error { - err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx) + err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, req.NoShutdownDelay, tx) if err != nil { n.logger.Error("deregistering job failed", @@ -645,7 +646,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by // evals for jobs whose deregistering didn't get committed yet. err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error { for jobNS, options := range req.Jobs { - if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil { + if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, false, tx); err != nil { n.logger.Error("deregistering job failed", "job", jobNS.ID, "error", err) return err } @@ -670,12 +671,31 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by // handleJobDeregister is used to deregister a job. Leaves error logging up to // caller. -func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, tx state.Txn) error { +func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, noShutdownDelay bool, tx state.Txn) error { // If it is periodic remove it from the dispatcher if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil { return fmt.Errorf("periodicDispatcher.Remove failed: %w", err) } + if noShutdownDelay { + ws := memdb.NewWatchSet() + allocs, err := n.state.AllocsByJob(ws, namespace, jobID, false) + if err != nil { + return err + } + transition := &structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)} + for _, alloc := range allocs { + err := n.state.UpdateAllocDesiredTransitionTxn(tx, index, alloc.ID, transition) + if err != nil { + return err + } + err = tx.Insert("index", &state.IndexEntry{Key: "allocs", Value: index}) + if err != nil { + return fmt.Errorf("index update failed: %v", err) + } + } + } + if purge { if err := n.state.DeleteJobTxn(index, namespace, jobID, tx); err != nil { return fmt.Errorf("DeleteJob failed: %w", err) diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index a3e12332f311..92351c86ceec 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -3737,6 +3737,97 @@ func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) { }) } +func TestJobEndpoint_Deregister_NoShutdownDelay(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register requests + job := mock.Job() + reg := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + var resp0 structs.JobRegisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp0)) + + // Deregister but don't purge + dereg1 := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp1 structs.JobDeregisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg1, &resp1)) + require.NotZero(resp1.Index) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(err) + require.NotNil(out) + require.True(out.Stop) + + // Lookup the evaluation + eval, err := state.EvalByID(nil, resp1.EvalID) + require.NoError(err) + require.NotNil(eval) + require.EqualValues(resp1.EvalCreateIndex, eval.CreateIndex) + require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy) + + // Lookup allocation transitions + var ws memdb.WatchSet + allocs, err := state.AllocsByJob(ws, job.Namespace, job.ID, true) + require.NoError(err) + + for _, alloc := range allocs { + require.Nil(alloc.DesiredTransition) + } + + // Deregister with no shutdown delay + dereg2 := &structs.JobDeregisterRequest{ + JobID: job.ID, + NoShutdownDelay: true, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg2, &resp2)) + require.NotZero(resp2.Index) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp2.EvalID) + require.NoError(err) + require.NotNil(eval) + require.EqualValues(resp2.EvalCreateIndex, eval.CreateIndex) + require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy) + + // Lookup allocation transitions + allocs, err = state.AllocsByJob(ws, job.Namespace, job.ID, true) + require.NoError(err) + + for _, alloc := range allocs { + require.NotNil(alloc.DesiredTransition) + require.True(*(alloc.DesiredTransition.NoShutdownDelay)) + } + +} + func TestJobEndpoint_BatchDeregister(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 3d08fdca768a..81cbd7c633c8 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1600,7 +1600,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b } if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil { - return fmt.Errorf("unable to update job scaling policies: %v", err) + return fmt.Errorf("unable to update job csi plugins: %v", err) } // Insert the job @@ -3371,7 +3371,7 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, // Handle each of the updated allocations for id, transition := range allocs { - if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transition); err != nil { + if err := s.UpdateAllocDesiredTransitionTxn(txn, index, id, transition); err != nil { return err } } @@ -3390,9 +3390,9 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, return txn.Commit() } -// nestedUpdateAllocDesiredTransition is used to nest an update of an +// UpdateAllocDesiredTransitionTxn is used to nest an update of an // allocations desired transition -func (s *StateStore) nestedUpdateAllocDesiredTransition( +func (s *StateStore) UpdateAllocDesiredTransitionTxn( txn *txn, index uint64, allocID string, transition *structs.DesiredTransition) error { @@ -3414,8 +3414,9 @@ func (s *StateStore) nestedUpdateAllocDesiredTransition( // Merge the desired transitions copyAlloc.DesiredTransition.Merge(transition) - // Update the modify index + // Update the modify indexes copyAlloc.ModifyIndex = index + copyAlloc.AllocModifyIndex = index // Update the allocation if err := txn.Insert("allocs", copyAlloc); err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 50f9f640b7cb..1fb2088e5104 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -621,6 +621,11 @@ type JobDeregisterRequest struct { // in busy clusters with a large evaluation backlog. EvalPriority int + // NoShutdownDelay, if set to true, will override the group and + // task shutdown_delay configuration and set it to "0s" for any + // allocations stopped as a result of this Deregister call. + NoShutdownDelay bool + // Eval is the evaluation to create that's associated with job deregister Eval *Evaluation @@ -934,7 +939,8 @@ type AllocUpdateDesiredTransitionRequest struct { // AllocStopRequest is used to stop and reschedule a running Allocation. type AllocStopRequest struct { - AllocID string + AllocID string + NoShutdownDelay bool WriteRequest } @@ -9119,6 +9125,11 @@ type DesiredTransition struct { // This field is only used when operators want to force a placement even if // a failed allocation is not eligible to be rescheduled ForceReschedule *bool + + // NoShutdownDelay is used to indicate that whatever transition is + // desired should be applied immediately without waiting for + // shutdown delays + NoShutdownDelay *bool } // Merge merges the two desired transitions, preferring the values from the @@ -9135,6 +9146,10 @@ func (d *DesiredTransition) Merge(o *DesiredTransition) { if o.ForceReschedule != nil { d.ForceReschedule = o.ForceReschedule } + + if o.NoShutdownDelay != nil { + d.NoShutdownDelay = o.NoShutdownDelay + } } // ShouldMigrate returns whether the transition object dictates a migration. @@ -9157,6 +9172,15 @@ func (d *DesiredTransition) ShouldForceReschedule() bool { return d.ForceReschedule != nil && *d.ForceReschedule } +// ShouldIgnoreShutdownDelay returns whether the transition object dictates an +// immediate transition, skipping shutdown delays. +func (d *DesiredTransition) ShouldIgnoreShutdownDelay() bool { + if d == nil { + return false + } + return d.NoShutdownDelay != nil && *d.NoShutdownDelay +} + const ( AllocDesiredStatusRun = "run" // Allocation should run AllocDesiredStatusStop = "stop" // Allocation should stop diff --git a/website/content/docs/commands/alloc/stop.mdx b/website/content/docs/commands/alloc/stop.mdx index b82195172a44..03e78e04d58f 100644 --- a/website/content/docs/commands/alloc/stop.mdx +++ b/website/content/docs/commands/alloc/stop.mdx @@ -42,6 +42,12 @@ allocation's namespace. - `-verbose`: Display verbose output. +- `-no-shutdown-delay` + Ignore the the group and task [`shutdown_delay`] configuration so + there is no delay between service deregistration and task + shutdown. Note that using this flag will result in failed network + connections to the allocation being stopped. + ## Examples ```shell-session @@ -58,3 +64,4 @@ $ nomad alloc stop -detach eb17e557 ``` [eval status]: /docs/commands/eval-status +[`shutdown_delay`]: /docs/job-specification/group#shutdown_delay diff --git a/website/content/docs/commands/job/stop.mdx b/website/content/docs/commands/job/stop.mdx index 004520b978e2..e231935d03b7 100644 --- a/website/content/docs/commands/job/stop.mdx +++ b/website/content/docs/commands/job/stop.mdx @@ -55,6 +55,12 @@ When ACLs are enabled, this command requires a token with the `submit-job`, Stop a [multi-region] job in all its regions. By default, `job stop` will stop only a single region at a time. Ignored for single-region jobs. +- `-no-shutdown-delay` + Ignore the the group and task [`shutdown_delay`] configuration so + there is no delay between service deregistration and task + shutdown. Note that using this flag will result in failed network + connections to the allocations being stopped. + ## Examples Stop the job with ID "job1": @@ -75,3 +81,4 @@ $ nomad job stop -detach job1 [eval status]: /docs/commands/eval-status [multi-region]: /docs/job-specification/multiregion +[`shutdown_delay`]: /docs/job-specification/group#shutdown_delay