diff --git a/.changelog/15325.txt b/.changelog/15325.txt new file mode 100644 index 000000000000..1ecec04c308f --- /dev/null +++ b/.changelog/15325.txt @@ -0,0 +1,3 @@ +```release-note:bug +scheduler (Enterprise): Fixed a bug that prevented new allocations from multiregion jobs to be placed in situations where other regions are not involved, such as node updates. +``` diff --git a/nomad/fsm.go b/nomad/fsm.go index c9b9d9ec67c5..a66076e7a39a 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -608,10 +608,41 @@ func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index } } + if req.Deployment != nil { + // Cancel any preivous deployment. + lastDeployment, err := n.state.LatestDeploymentByJobID(ws, req.Job.Namespace, req.Job.ID) + if err != nil { + return fmt.Errorf("failed to retrieve latest deployment: %v", err) + } + if lastDeployment != nil && lastDeployment.Active() { + activeDeployment := lastDeployment.Copy() + activeDeployment.Status = structs.DeploymentStatusCancelled + activeDeployment.StatusDescription = structs.DeploymentStatusDescriptionNewerJob + if err := n.state.UpsertDeployment(index, activeDeployment); err != nil { + return err + } + } + + // Update the deployment with the latest job indexes. + req.Deployment.JobCreateIndex = req.Job.CreateIndex + req.Deployment.JobModifyIndex = req.Job.ModifyIndex + req.Deployment.JobSpecModifyIndex = req.Job.JobModifyIndex + req.Deployment.JobVersion = req.Job.Version + + if err := n.state.UpsertDeployment(index, req.Deployment); err != nil { + return err + } + } + // COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log, // so this may be nil during server upgrades. if req.Eval != nil { req.Eval.JobModifyIndex = index + + if req.Deployment != nil { + req.Eval.DeploymentID = req.Deployment.ID + } + if err := n.upsertEvals(msgType, index, []*structs.Evaluation{req.Eval}); err != nil { return err } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 155ff12228e8..c5a9e7cc3d2b 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -364,6 +364,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis submittedEval = true } + // Pre-register a deployment if necessary. + args.Deployment = j.multiregionCreateDeployment(job, eval) + // Commit this update via Raft fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args) if err, ok := fsmErr.(error); ok && err != nil { diff --git a/nomad/job_endpoint_oss.go b/nomad/job_endpoint_oss.go index a6836a554712..53da7f7aed42 100644 --- a/nomad/job_endpoint_oss.go +++ b/nomad/job_endpoint_oss.go @@ -12,6 +12,12 @@ func (j *Job) enforceSubmitJob(override bool, job *structs.Job, nomadACLToken *s return nil, nil } +// multiregionCreateDeployment is used to create a deployment to register along +// with the job, if required. +func (j *Job) multiregionCreateDeployment(job *structs.Job, eval *structs.Evaluation) *structs.Deployment { + return nil +} + // multiregionRegister is used to send a job across multiple regions func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse, newVersion uint64) (bool, error) { return false, nil diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 8101c7483d38..4e2f48c937b6 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -536,8 +536,7 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error return txn.Commit() } -// UpsertDeployment is used to insert a new deployment. If cancelPrior is set to -// true, all prior deployments for the same job will be cancelled. +// UpsertDeployment is used to insert or update a new deployment. func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment) error { txn := s.db.WriteTxn(index) defer txn.Abort() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5e3abb6625cd..00da163bb668 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -612,6 +612,10 @@ type JobRegisterRequest struct { // Eval is the evaluation that is associated with the job registration Eval *Evaluation + // Deployment is the deployment to be create when the job is registered. If + // there is an active deployment for the job it will be canceled. + Deployment *Deployment + WriteRequest } @@ -9219,14 +9223,15 @@ func (v *Vault) Validate() error { const ( // DeploymentStatuses are the various states a deployment can be be in - DeploymentStatusRunning = "running" - DeploymentStatusPaused = "paused" - DeploymentStatusFailed = "failed" - DeploymentStatusSuccessful = "successful" - DeploymentStatusCancelled = "cancelled" - DeploymentStatusPending = "pending" - DeploymentStatusBlocked = "blocked" - DeploymentStatusUnblocking = "unblocking" + DeploymentStatusRunning = "running" + DeploymentStatusPaused = "paused" + DeploymentStatusFailed = "failed" + DeploymentStatusSuccessful = "successful" + DeploymentStatusCancelled = "cancelled" + DeploymentStatusInitializing = "initializing" + DeploymentStatusPending = "pending" + DeploymentStatusBlocked = "blocked" + DeploymentStatusUnblocking = "unblocking" // TODO Statuses and Descriptions do not match 1:1 and we sometimes use the Description as a status flag @@ -9360,7 +9365,8 @@ func (d *Deployment) Copy() *Deployment { // Active returns whether the deployment is active or terminal. func (d *Deployment) Active() bool { switch d.Status { - case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked, DeploymentStatusUnblocking, DeploymentStatusPending: + case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked, + DeploymentStatusUnblocking, DeploymentStatusInitializing, DeploymentStatusPending: return true default: return false diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index da4928f81815..d309cb30f6ab 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -231,24 +231,35 @@ func (a *allocReconciler) computeDeploymentComplete(m allocMatrix) bool { } func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) { - // Mark the deployment as complete if possible - if a.deployment != nil && deploymentComplete { - if a.job.IsMultiregion() { - // the unblocking/successful states come after blocked, so we - // need to make sure we don't revert those states - if a.deployment.Status != structs.DeploymentStatusUnblocking && - a.deployment.Status != structs.DeploymentStatusSuccessful { + if a.deployment != nil { + // Mark the deployment as complete if possible + if deploymentComplete { + if a.job.IsMultiregion() { + // the unblocking/successful states come after blocked, so we + // need to make sure we don't revert those states + if a.deployment.Status != structs.DeploymentStatusUnblocking && + a.deployment.Status != structs.DeploymentStatusSuccessful { + a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: a.deployment.ID, + Status: structs.DeploymentStatusBlocked, + StatusDescription: structs.DeploymentStatusDescriptionBlocked, + }) + } + } else { a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ DeploymentID: a.deployment.ID, - Status: structs.DeploymentStatusBlocked, - StatusDescription: structs.DeploymentStatusDescriptionBlocked, + Status: structs.DeploymentStatusSuccessful, + StatusDescription: structs.DeploymentStatusDescriptionSuccessful, }) } - } else { + } + + // Mark the deployment as pending since its state is now computed. + if a.deployment.Status == structs.DeploymentStatusInitializing { a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ DeploymentID: a.deployment.ID, - Status: structs.DeploymentStatusSuccessful, - StatusDescription: structs.DeploymentStatusDescriptionSuccessful, + Status: structs.DeploymentStatusPending, + StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer, }) } } @@ -269,26 +280,18 @@ func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) { // allocReconciler that indicate the state of the deployment if one // is required. The flags that are managed are: // 1. deploymentFailed: Did the current deployment fail just as named. -// 2. deploymentPaused: Multiregion job types that use deployments run -// the deployments later during the fan-out stage. When the deployment -// is created it will be in a pending state. If an invariant violation -// is detected by the deploymentWatcher during it will enter a paused -// state. This flag tells Compute we're paused or pending, so we should -// not make placements on the deployment. +// 2. deploymentPaused: Set to true when the current deployment is paused, +// which is usually a manual user operation, or if the deployment is +// pending or initializing, which are the initial states for multi-region +// job deployments. This flag tells Compute that we should not make +// placements on the deployment. func (a *allocReconciler) computeDeploymentPaused() { if a.deployment != nil { a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused || - a.deployment.Status == structs.DeploymentStatusPending + a.deployment.Status == structs.DeploymentStatusPending || + a.deployment.Status == structs.DeploymentStatusInitializing a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed } - if a.deployment == nil { - if a.job.IsMultiregion() && - a.job.UsesDeployments() && - !(a.job.IsPeriodic() || a.job.IsParameterized()) { - - a.deploymentPaused = true - } - } } // cancelUnneededDeployments cancels any deployment that is not needed. If the @@ -512,6 +515,12 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { a.computeMigrations(desiredChanges, migrate, tg, isCanarying) a.createDeployment(tg.Name, tg.Update, existingDeployment, dstate, all, destructive) + // Deployments that are still initializing need to be sent in full in the + // plan so its internal state can be persisted by the plan applier. + if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusInitializing { + a.result.deployment = a.deployment + } + deploymentComplete := a.isDeploymentComplete(groupName, destructive, inplace, migrate, rescheduleNow, place, rescheduleLater, requiresCanaries) @@ -889,11 +898,6 @@ func (a *allocReconciler) createDeployment(groupName string, strategy *structs.U // A previous group may have made the deployment already. If not create one. if a.deployment == nil { a.deployment = structs.NewDeployment(a.job, a.evalPriority) - // in multiregion jobs, most deployments start in a pending state - if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) { - a.deployment.Status = structs.DeploymentStatusPending - a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer - } a.result.deployment = a.deployment } diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index fc88fb54727a..d30e65a5ca9c 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -6074,34 +6074,34 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) { expected: true, }, { - name: "multiregion periodic service is not paused", - jobType: structs.JobTypeService, - isMultiregion: true, - isPeriodic: true, + name: "single region batch job is not paused", + jobType: structs.JobTypeBatch, + isMultiregion: false, + isPeriodic: false, isParameterized: false, expected: false, }, { - name: "multiregion parameterized service is not paused", - jobType: structs.JobTypeService, - isMultiregion: true, + name: "multiregion batch job is not paused", + jobType: structs.JobTypeBatch, + isMultiregion: false, isPeriodic: false, - isParameterized: true, + isParameterized: false, expected: false, }, { - name: "single region batch job is not paused", + name: "multiregion parameterized batch is not paused", jobType: structs.JobTypeBatch, - isMultiregion: false, + isMultiregion: true, isPeriodic: false, - isParameterized: false, + isParameterized: true, expected: false, }, { - name: "multiregion batch job is not paused", + name: "multiregion periodic batch is not paused", jobType: structs.JobTypeBatch, - isMultiregion: false, - isPeriodic: false, + isMultiregion: true, + isPeriodic: true, isParameterized: false, expected: false, }, @@ -6119,8 +6119,18 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) { require.NotNil(t, job, "invalid job type", tc.jobType) + var deployment *structs.Deployment if tc.isMultiregion { job.Multiregion = multiregionCfg + + // This deployment is created by the Job.Register RPC and + // fetched by the scheduler before handing it to the + // reconciler. + if job.UsesDeployments() { + deployment = structs.NewDeployment(job, 100) + deployment.Status = structs.DeploymentStatusInitializing + deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer + } } if tc.isPeriodic { @@ -6132,8 +6142,8 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) { } reconciler := NewAllocReconciler( - testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, - nil, nil, nil, "", job.Priority, true) + testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, deployment, + nil, nil, "", job.Priority, true) _ = reconciler.Compute()