diff --git a/api/jobs.go b/api/jobs.go
index 146f65cf7131..1f525b1708c4 100644
--- a/api/jobs.go
+++ b/api/jobs.go
@@ -299,6 +299,11 @@ type DeregisterOptions struct {
 	// is useful when an operator wishes to push through a job deregistration
 	// in busy clusters with a large evaluation backlog.
 	EvalPriority int
+
+	// NoShutdownDelay, if set to true, will override the group and
+	// task shutdown_delay configuration and set it to "0s" for any
+	// allocations stopped as a result of this Deregister call.
+	NoShutdownDelay bool
 }
 
 // DeregisterOpts is used to remove an existing job. See DeregisterOptions
diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go
index 9624e633c730..f6e95726fd27 100644
--- a/client/allocrunner/alloc_runner_hooks.go
+++ b/client/allocrunner/alloc_runner_hooks.go
@@ -338,7 +338,7 @@ func (ar *allocRunner) preKillHooks() {
 			ar.logger.Trace("running alloc pre shutdown hook", "name", name, "start", start)
 		}
 
-		pre.PreKill()
+		pre.PreKill(ar.alloc)
 
 		if ar.logger.IsTrace() {
 			end := time.Now()
diff --git a/client/allocrunner/groupservice_hook.go b/client/allocrunner/groupservice_hook.go
index 778109e65f49..8abee3d22bce 100644
--- a/client/allocrunner/groupservice_hook.go
+++ b/client/allocrunner/groupservice_hook.go
@@ -29,7 +29,6 @@ type groupServiceHook struct {
 	consulClient        consul.ConsulServiceAPI
 	consulNamespace     string
 	prerun              bool
-	delay               time.Duration
 	deregistered        bool
 	networkStatusGetter networkStatusGetter
 
@@ -41,6 +40,7 @@ type groupServiceHook struct {
 	networks       structs.Networks
 	ports          structs.AllocatedPorts
 	taskEnvBuilder *taskenv.Builder
+	delay          time.Duration
 
 	// Since Update() may be called concurrently with any other hook all
 	// hook methods must be fully serialized
@@ -164,24 +164,24 @@ func (h *groupServiceHook) PreTaskRestart() error {
 		h.mu.Unlock()
 	}()
 
-	h.preKillLocked()
+	h.preKillLocked(nil)
 	return h.prerunLocked()
 }
 
-func (h *groupServiceHook) PreKill() {
+func (h *groupServiceHook) PreKill(alloc *structs.Allocation) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
-	h.preKillLocked()
+	h.preKillLocked(alloc)
 }
 
 // implements the PreKill hook but requires the caller hold the lock
-func (h *groupServiceHook) preKillLocked() {
+func (h *groupServiceHook) preKillLocked(alloc *structs.Allocation) {
 	// If we have a shutdown delay deregister group services and then wait
 	// before continuing to kill tasks.
 	h.deregister()
 	h.deregistered = true
 
-	if h.delay == 0 {
+	if h.delay == 0 || (alloc != nil && alloc.DesiredTransition.ShouldIgnoreShutdownDelay()) {
 		return
 	}
diff --git a/client/allocrunner/interfaces/runner_lifecycle.go b/client/allocrunner/interfaces/runner_lifecycle.go
index 7855deaa3f4d..92ef52b83002 100644
--- a/client/allocrunner/interfaces/runner_lifecycle.go
+++ b/client/allocrunner/interfaces/runner_lifecycle.go
@@ -22,7 +22,7 @@ type RunnerPrerunHook interface {
 type RunnerPreKillHook interface {
 	RunnerHook
 
-	PreKill()
+	PreKill(*structs.Allocation)
 }
 
 // RunnerPostrunHooks are executed after calling TaskRunner.Run, even for
diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go
index 68df0827f60e..e50ad37ff130 100644
--- a/client/allocrunner/taskrunner/task_runner.go
+++ b/client/allocrunner/taskrunner/task_runner.go
@@ -889,7 +889,7 @@ func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.E
 	// Wait for task ShutdownDelay after running prekill hooks
 	// This allows for things like service de-registration to run
 	// before waiting to kill task
-	if delay := tr.Task().ShutdownDelay; delay != 0 {
+	if delay := tr.Task().ShutdownDelay; delay != 0 && !tr.alloc.DesiredTransition.ShouldIgnoreShutdownDelay() {
 		tr.logger.Debug("waiting before killing task", "shutdown_delay", delay)
 
 		select {
diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go
index d1a7e210c5d2..f6f724001bae 100644
--- a/command/agent/alloc_endpoint.go
+++ b/command/agent/alloc_endpoint.go
@@ -138,8 +138,18 @@ func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *ht
 		return nil, CodedError(405, ErrInvalidMethod)
 	}
 
+	noShutdownDelay := false
+	if noShutdownDelayQS := req.URL.Query().Get("no_shutdown_delay"); noShutdownDelayQS != "" {
+		var err error
+		noShutdownDelay, err = strconv.ParseBool(noShutdownDelayQS)
+		if err != nil {
+			return nil, fmt.Errorf("no_shutdown_delay value is not a boolean: %v", err)
+		}
+	}
+
 	sr := &structs.AllocStopRequest{
-		AllocID: allocID,
+		AllocID:         allocID,
+		NoShutdownDelay: noShutdownDelay,
 	}
 
 	s.parseWriteRequest(req, &sr.WriteRequest)
diff --git a/command/alloc_stop.go b/command/alloc_stop.go
index 3c8f9cbd93c9..971adbc365fb 100644
--- a/command/alloc_stop.go
+++ b/command/alloc_stop.go
@@ -38,6 +38,10 @@ Stop Specific Options:
     screen, which can be used to examine the rescheduling evaluation using the
     eval-status command.
 
+  -no-shutdown-delay
+    Ignore the group and task shutdown_delay configuration so there is no
+    delay between service deregistration and task shutdown.
+
   -verbose
     Show full information.
 `
@@ -47,12 +51,13 @@ Stop Specific Options:
 func (c *AllocStopCommand) Name() string { return "alloc stop" }
 
 func (c *AllocStopCommand) Run(args []string) int {
-	var detach, verbose bool
+	var detach, verbose, noShutdownDelay bool
 
 	flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
 	flags.Usage = func() { c.Ui.Output(c.Help()) }
 	flags.BoolVar(&detach, "detach", false, "")
 	flags.BoolVar(&verbose, "verbose", false, "")
+	flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "")
 
 	if err := flags.Parse(args); err != nil {
 		return 1
@@ -115,7 +120,12 @@ func (c *AllocStopCommand) Run(args []string) int {
 		return 1
 	}
 
-	resp, err := client.Allocations().Stop(alloc, nil)
+	var opts *api.QueryOptions
+	if noShutdownDelay {
+		opts = &api.QueryOptions{Params: map[string]string{"no_shutdown_delay": "true"}}
+	}
+
+	resp, err := client.Allocations().Stop(alloc, opts)
 	if err != nil {
 		c.Ui.Error(fmt.Sprintf("Error stopping allocation: %s", err))
 		return 1
diff --git a/command/job_stop.go b/command/job_stop.go
index 8dd5d8a11972..43a838967d93 100644
--- a/command/job_stop.go
+++ b/command/job_stop.go
@@ -43,14 +43,18 @@ Stop Options:
     Override the priority of the evaluations produced as a result of this job
     deregistration. By default, this is set to the priority of the job.
 
-  -purge
-    Purge is used to stop the job and purge it from the system. If not set, the
-    job will still be queryable and will be purged by the garbage collector.
-
   -global
     Stop a multi-region job in all its regions. By default job stop will stop
     only a single region at a time. Ignored for single-region jobs.
 
+  -no-shutdown-delay
+    Ignore the group and task shutdown_delay configuration so there is no
+    delay between service deregistration and task shutdown.
+
+  -purge
+    Purge is used to stop the job and purge it from the system. If not set, the
+    job will still be queryable and will be purged by the garbage collector.
+
   -yes
     Automatic yes to prompts.
@@ -67,12 +71,13 @@ func (c *JobStopCommand) Synopsis() string {
 
 func (c *JobStopCommand) AutocompleteFlags() complete.Flags {
 	return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
 		complete.Flags{
-			"-detach":        complete.PredictNothing,
-			"-eval-priority": complete.PredictNothing,
-			"-purge":         complete.PredictNothing,
-			"-global":        complete.PredictNothing,
-			"-yes":           complete.PredictNothing,
-			"-verbose":       complete.PredictNothing,
+			"-detach":            complete.PredictNothing,
+			"-eval-priority":     complete.PredictNothing,
+			"-purge":             complete.PredictNothing,
+			"-global":            complete.PredictNothing,
+			"-no-shutdown-delay": complete.PredictNothing,
+			"-yes":               complete.PredictNothing,
+			"-verbose":           complete.PredictNothing,
 		})
 }
@@ -94,7 +99,7 @@ func (c *JobStopCommand) AutocompleteArgs() complete.Predictor {
 func (c *JobStopCommand) Name() string { return "job stop" }
 
 func (c *JobStopCommand) Run(args []string) int {
-	var detach, purge, verbose, global, autoYes bool
+	var detach, purge, verbose, global, autoYes, noShutdownDelay bool
 	var evalPriority int
 
 	flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
@@ -102,6 +107,7 @@ func (c *JobStopCommand) Run(args []string) int {
 	flags.BoolVar(&detach, "detach", false, "")
 	flags.BoolVar(&verbose, "verbose", false, "")
 	flags.BoolVar(&global, "global", false, "")
+	flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "")
 	flags.BoolVar(&autoYes, "yes", false, "")
 	flags.BoolVar(&purge, "purge", false, "")
 	flags.IntVar(&evalPriority, "eval-priority", 0, "")
@@ -199,7 +205,7 @@ func (c *JobStopCommand) Run(args []string) int {
 	}
 
 	// Invoke the stop
-	opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority}
+	opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay}
 	wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace}
 	evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq)
 	if err != nil {
diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go
index 3a32b5f19646..0b44175adf62 100644
--- a/nomad/alloc_endpoint.go
+++ b/nomad/alloc_endpoint.go
@@ -320,7 +320,8 @@ func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopRes
 		Evals: []*structs.Evaluation{eval},
 		Allocs: map[string]*structs.DesiredTransition{
 			args.AllocID: {
-				Migrate: helper.BoolToPtr(true),
+				Migrate:         helper.BoolToPtr(true),
+				NoShutdownDelay: helper.BoolToPtr(args.NoShutdownDelay),
 			},
 		},
 	}
diff --git a/nomad/fsm.go b/nomad/fsm.go
index 84721014560f..45725858fd1b 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -11,6 +11,7 @@ import (
 	log "github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-msgpack/codec"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -605,7 +606,7 @@ func (n *nomadFSM) applyDeregisterJob(msgType structs.MessageType, buf []byte, i
 	}
 
 	err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error {
-		err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx)
+		err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, req.NoShutdownDelay, tx)
 
 		if err != nil {
 			n.logger.Error("deregistering job failed",
@@ -645,7 +646,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by
 	// evals for jobs whose deregistering didn't get committed yet.
 	err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error {
 		for jobNS, options := range req.Jobs {
-			if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil {
+			if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, false, tx); err != nil {
 				n.logger.Error("deregistering job failed", "job", jobNS.ID, "error", err)
 				return err
 			}
 		}
@@ -670,12 +671,27 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by
 
 // handleJobDeregister is used to deregister a job. Leaves error logging up to
 // caller.
-func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, tx state.Txn) error {
+func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, noShutdownDelay bool, tx state.Txn) error {
 	// If it is periodic remove it from the dispatcher
 	if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil {
 		return fmt.Errorf("periodicDispatcher.Remove failed: %w", err)
 	}
 
+	if noShutdownDelay {
+		ws := memdb.NewWatchSet()
+		allocs, err := n.state.AllocsByJob(ws, namespace, jobID, false)
+		if err != nil {
+			return err
+		}
+		transition := &structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)}
+		for _, alloc := range allocs {
+			err := n.state.UpdateAllocDesiredTransitionTxn(tx, index, alloc.ID, transition)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
 	if purge {
 		if err := n.state.DeleteJobTxn(index, namespace, jobID, tx); err != nil {
 			return fmt.Errorf("DeleteJob failed: %w", err)
diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go
index a3e12332f311..c1a0f503838c 100644
--- a/nomad/job_endpoint_test.go
+++ b/nomad/job_endpoint_test.go
@@ -3737,6 +3737,97 @@ func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) {
 	})
 }
 
+func TestJobEndpoint_Deregister_NoShutdownDelay(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	s1, cleanupS1 := TestServer(t, func(c *Config) {
+		c.NumSchedulers = 0 // Prevent automatic dequeue
+	})
+	defer cleanupS1()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create the register request
+	job := mock.Job()
+	reg := &structs.JobRegisterRequest{
+		Job: job,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+
+	// Fetch the response
+	var resp0 structs.JobRegisterResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp0))
+
+	// Deregister but don't purge
+	dereg1 := &structs.JobDeregisterRequest{
+		JobID: job.ID,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+	var resp1 structs.JobDeregisterResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg1, &resp1))
+	require.NotZero(resp1.Index)
+
+	// Check for the job in the FSM
+	state := s1.fsm.State()
+	out, err := state.JobByID(nil, job.Namespace, job.ID)
+	require.NoError(err)
+	require.NotNil(out)
+	require.True(out.Stop)
+
+	// Lookup the evaluation
+	eval, err := state.EvalByID(nil, resp1.EvalID)
+	require.NoError(err)
+	require.NotNil(eval)
+	require.EqualValues(resp1.EvalCreateIndex, eval.CreateIndex)
+	require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy)
+
+	// Lookup allocation transitions; no delay override should be set yet
+	var ws memdb.WatchSet
+	allocs, err := state.AllocsByJob(ws, job.Namespace, job.ID, true)
+	require.NoError(err)
+
+	for _, alloc := range allocs {
+		require.Nil(alloc.DesiredTransition.NoShutdownDelay)
+	}
+
+	// Deregister with no shutdown delay
+	dereg2 := &structs.JobDeregisterRequest{
+		JobID:           job.ID,
+		NoShutdownDelay: true,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+	var resp2 structs.JobDeregisterResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg2, &resp2))
+	require.NotZero(resp2.Index)
+
+	// Lookup the evaluation
+	eval, err = state.EvalByID(nil, resp2.EvalID)
+	require.NoError(err)
+	require.NotNil(eval)
+	require.EqualValues(resp2.EvalCreateIndex, eval.CreateIndex)
+	require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy)
+
+	// Lookup allocation transitions
+	allocs, err = state.AllocsByJob(ws, job.Namespace, job.ID, true)
+	require.NoError(err)
+
+	for _, alloc := range allocs {
+		require.NotNil(alloc.DesiredTransition.NoShutdownDelay)
+		require.True(*alloc.DesiredTransition.NoShutdownDelay)
+	}
+
+}
+
 func TestJobEndpoint_BatchDeregister(t *testing.T) {
 	t.Parallel()
 	require := require.New(t)
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 3d08fdca768a..4f2086ad4d15 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1600,7 +1600,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b
 	}
 
 	if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil {
-		return fmt.Errorf("unable to update job scaling policies: %v", err)
+		return fmt.Errorf("unable to update job csi plugins: %v", err)
 	}
 
 	// Insert the job
@@ -3371,7 +3371,7 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType,
 
 	// Handle each of the updated allocations
 	for id, transition := range allocs {
-		if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transition); err != nil {
+		if err := s.UpdateAllocDesiredTransitionTxn(txn, index, id, transition); err != nil {
 			return err
 		}
 	}
@@ -3392,7 +3392,7 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType,
 
-// nestedUpdateAllocDesiredTransition is used to nest an update of an
-// allocations desired transition
-func (s *StateStore) nestedUpdateAllocDesiredTransition(
+// UpdateAllocDesiredTransitionTxn is used to nest an update of an allocation's
+// desired transition within an existing write transaction.
+func (s *StateStore) UpdateAllocDesiredTransitionTxn(
 	txn *txn, index uint64, allocID string, transition *structs.DesiredTransition) error {
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 50f9f640b7cb..55be2cbe886b 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -621,6 +621,11 @@ type JobDeregisterRequest struct {
 	// in busy clusters with a large evaluation backlog.
 	EvalPriority int
 
+	// NoShutdownDelay, if set to true, will override the group and
+	// task shutdown_delay configuration and set it to "0s" for any
+	// allocations stopped as a result of this Deregister call.
+	NoShutdownDelay bool
+
 	// Eval is the evaluation to create that's associated with job deregister
 	Eval *Evaluation
@@ -934,7 +939,8 @@ type AllocUpdateDesiredTransitionRequest struct {
 
 // AllocStopRequest is used to stop and reschedule a running Allocation.
 type AllocStopRequest struct {
-	AllocID string
+	AllocID         string
+	NoShutdownDelay bool
 
 	WriteRequest
 }
@@ -9119,6 +9125,11 @@ type DesiredTransition struct {
 	// This field is only used when operators want to force a placement even if
 	// a failed allocation is not eligible to be rescheduled
 	ForceReschedule *bool
+
+	// NoShutdownDelay is used to indicate that whatever transition is
+	// desired should be applied immediately without waiting for
+	// shutdown delays
+	NoShutdownDelay *bool
 }
 
 // Merge merges the two desired transitions, preferring the values from the
@@ -9135,6 +9146,10 @@ func (d *DesiredTransition) Merge(o *DesiredTransition) {
 	if o.ForceReschedule != nil {
 		d.ForceReschedule = o.ForceReschedule
 	}
+
+	if o.NoShutdownDelay != nil {
+		d.NoShutdownDelay = o.NoShutdownDelay
+	}
 }
 
 // ShouldMigrate returns whether the transition object dictates a migration.
@@ -9157,6 +9172,12 @@ func (d *DesiredTransition) ShouldForceReschedule() bool {
 	return d.ForceReschedule != nil && *d.ForceReschedule
 }
 
+// ShouldIgnoreShutdownDelay returns whether the transition object dictates an
+// immediate transition, skipping shutdown delays.
+func (d *DesiredTransition) ShouldIgnoreShutdownDelay() bool {
+	return d.NoShutdownDelay != nil && *d.NoShutdownDelay
+}
+
 const (
 	AllocDesiredStatusRun  = "run"  // Allocation should run
 	AllocDesiredStatusStop = "stop" // Allocation should stop
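Usage note (not part of the patch): a minimal sketch of how a client program could exercise the new option through the Go `api` package, mirroring what `nomad alloc stop -no-shutdown-delay` and `nomad job stop -no-shutdown-delay` send in the diff above. The allocation ID and job name below are placeholders, and the agent address comes from `api.DefaultConfig()`.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Stop a single allocation, skipping shutdown_delay. This sets the
	// no_shutdown_delay query parameter parsed by allocStop above.
	alloc, _, err := client.Allocations().Info("8a31bf28-1111-2222-3333-444444444444", nil) // placeholder alloc ID
	if err != nil {
		log.Fatal(err)
	}
	stopResp, err := client.Allocations().Stop(alloc, &api.QueryOptions{
		Params: map[string]string{"no_shutdown_delay": "true"},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("reschedule eval:", stopResp.EvalID)

	// Stop an entire job the same way, via the new DeregisterOptions field.
	evalID, _, err := client.Jobs().DeregisterOpts("example", &api.DeregisterOptions{
		NoShutdownDelay: true,
	}, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("deregister eval:", evalID)
}
```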