From 569ab1a8b6fae4688c9c380c0cddcece97697fd1 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 18 Nov 2022 19:16:42 -0500 Subject: [PATCH 1/2] scheduler: create placements for non-register MRD For multiregion jobs, the scheduler does not create placements on registration because the deployment must wait for the other regions. Once of these regions will then trigger the deployment to run. Currently, this is done in the scheduler by considering any eval for a multiregion job as "paused" since it's expected that another region will eventually unpause it. This becomes a problem where evals not triggered by a job registration happen, such as on a node update. These types of regional changes do not have other regions waiting to progress the deployment, and so they were never resulting in placements. The fix is to create a deployment at job registration time. This additional piece of state allows the scheduler to differentiate between a multiregion change, where there are other regions engaged in the deployment so no placements are required, from a regional change, where the scheduler does need to create placements. This deployment starts in the new "initializing" status to signal to the scheduler that it needs to compute the initial deployment state. The multiregion deployment will wait until this deployment state is persisted and its starts is set to "pending". Without this state transition it's possible to hit a race condition where the plan applier and the deployment watcher may step of each other and overwrite their changes. --- nomad/fsm.go | 31 +++++++++++++++++ nomad/job_endpoint.go | 3 ++ nomad/job_endpoint_oss.go | 6 ++++ nomad/state/state_store.go | 3 +- nomad/structs/structs.go | 24 ++++++++----- scheduler/reconcile.go | 68 ++++++++++++++++++++----------------- scheduler/reconcile_test.go | 42 ++++++++++++++--------- 7 files changed, 118 insertions(+), 59 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index b168f384fd18..16be7fdf921f 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -609,10 +609,41 @@ func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index } } + if req.Deployment != nil { + // Cancel any preivous deployment. + lastDeployment, err := n.state.LatestDeploymentByJobID(ws, req.Job.Namespace, req.Job.ID) + if err != nil { + return fmt.Errorf("failed to retrieve latest deployment: %v", err) + } + if lastDeployment != nil && lastDeployment.Active() { + activeDeployment := lastDeployment.Copy() + activeDeployment.Status = structs.DeploymentStatusCancelled + activeDeployment.StatusDescription = structs.DeploymentStatusDescriptionNewerJob + if err := n.state.UpsertDeployment(index, activeDeployment); err != nil { + return err + } + } + + // Update the deployment with the latest job indexes. + req.Deployment.JobCreateIndex = req.Job.CreateIndex + req.Deployment.JobModifyIndex = req.Job.ModifyIndex + req.Deployment.JobSpecModifyIndex = req.Job.JobModifyIndex + req.Deployment.JobVersion = req.Job.Version + + if err := n.state.UpsertDeployment(index, req.Deployment); err != nil { + return err + } + } + // COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log, // so this may be nil during server upgrades. if req.Eval != nil { req.Eval.JobModifyIndex = index + + if req.Deployment != nil { + req.Eval.DeploymentID = req.Deployment.ID + } + if err := n.upsertEvals(msgType, index, []*structs.Evaluation{req.Eval}); err != nil { return err } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 155ff12228e8..c5a9e7cc3d2b 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -364,6 +364,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis submittedEval = true } + // Pre-register a deployment if necessary. + args.Deployment = j.multiregionCreateDeployment(job, eval) + // Commit this update via Raft fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args) if err, ok := fsmErr.(error); ok && err != nil { diff --git a/nomad/job_endpoint_oss.go b/nomad/job_endpoint_oss.go index a6836a554712..53da7f7aed42 100644 --- a/nomad/job_endpoint_oss.go +++ b/nomad/job_endpoint_oss.go @@ -12,6 +12,12 @@ func (j *Job) enforceSubmitJob(override bool, job *structs.Job, nomadACLToken *s return nil, nil } +// multiregionCreateDeployment is used to create a deployment to register along +// with the job, if required. +func (j *Job) multiregionCreateDeployment(job *structs.Job, eval *structs.Evaluation) *structs.Deployment { + return nil +} + // multiregionRegister is used to send a job across multiple regions func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse, newVersion uint64) (bool, error) { return false, nil diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 8101c7483d38..4e2f48c937b6 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -536,8 +536,7 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error return txn.Commit() } -// UpsertDeployment is used to insert a new deployment. If cancelPrior is set to -// true, all prior deployments for the same job will be cancelled. +// UpsertDeployment is used to insert or update a new deployment. func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment) error { txn := s.db.WriteTxn(index) defer txn.Abort() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index dc3d83b1e90c..e598e0af1a79 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -614,6 +614,10 @@ type JobRegisterRequest struct { // Eval is the evaluation that is associated with the job registration Eval *Evaluation + // Deployment is the deployment to be create when the job is registered. If + // there is an active deployment for the job it will be canceled. + Deployment *Deployment + WriteRequest } @@ -9216,14 +9220,15 @@ func (v *Vault) Validate() error { const ( // DeploymentStatuses are the various states a deployment can be be in - DeploymentStatusRunning = "running" - DeploymentStatusPaused = "paused" - DeploymentStatusFailed = "failed" - DeploymentStatusSuccessful = "successful" - DeploymentStatusCancelled = "cancelled" - DeploymentStatusPending = "pending" - DeploymentStatusBlocked = "blocked" - DeploymentStatusUnblocking = "unblocking" + DeploymentStatusRunning = "running" + DeploymentStatusPaused = "paused" + DeploymentStatusFailed = "failed" + DeploymentStatusSuccessful = "successful" + DeploymentStatusCancelled = "cancelled" + DeploymentStatusInitializing = "initializing" + DeploymentStatusPending = "pending" + DeploymentStatusBlocked = "blocked" + DeploymentStatusUnblocking = "unblocking" // TODO Statuses and Descriptions do not match 1:1 and we sometimes use the Description as a status flag @@ -9357,7 +9362,8 @@ func (d *Deployment) Copy() *Deployment { // Active returns whether the deployment is active or terminal. func (d *Deployment) Active() bool { switch d.Status { - case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked, DeploymentStatusUnblocking, DeploymentStatusPending: + case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked, + DeploymentStatusUnblocking, DeploymentStatusInitializing, DeploymentStatusPending: return true default: return false diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index da4928f81815..d309cb30f6ab 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -231,24 +231,35 @@ func (a *allocReconciler) computeDeploymentComplete(m allocMatrix) bool { } func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) { - // Mark the deployment as complete if possible - if a.deployment != nil && deploymentComplete { - if a.job.IsMultiregion() { - // the unblocking/successful states come after blocked, so we - // need to make sure we don't revert those states - if a.deployment.Status != structs.DeploymentStatusUnblocking && - a.deployment.Status != structs.DeploymentStatusSuccessful { + if a.deployment != nil { + // Mark the deployment as complete if possible + if deploymentComplete { + if a.job.IsMultiregion() { + // the unblocking/successful states come after blocked, so we + // need to make sure we don't revert those states + if a.deployment.Status != structs.DeploymentStatusUnblocking && + a.deployment.Status != structs.DeploymentStatusSuccessful { + a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: a.deployment.ID, + Status: structs.DeploymentStatusBlocked, + StatusDescription: structs.DeploymentStatusDescriptionBlocked, + }) + } + } else { a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ DeploymentID: a.deployment.ID, - Status: structs.DeploymentStatusBlocked, - StatusDescription: structs.DeploymentStatusDescriptionBlocked, + Status: structs.DeploymentStatusSuccessful, + StatusDescription: structs.DeploymentStatusDescriptionSuccessful, }) } - } else { + } + + // Mark the deployment as pending since its state is now computed. + if a.deployment.Status == structs.DeploymentStatusInitializing { a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ DeploymentID: a.deployment.ID, - Status: structs.DeploymentStatusSuccessful, - StatusDescription: structs.DeploymentStatusDescriptionSuccessful, + Status: structs.DeploymentStatusPending, + StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer, }) } } @@ -269,26 +280,18 @@ func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) { // allocReconciler that indicate the state of the deployment if one // is required. The flags that are managed are: // 1. deploymentFailed: Did the current deployment fail just as named. -// 2. deploymentPaused: Multiregion job types that use deployments run -// the deployments later during the fan-out stage. When the deployment -// is created it will be in a pending state. If an invariant violation -// is detected by the deploymentWatcher during it will enter a paused -// state. This flag tells Compute we're paused or pending, so we should -// not make placements on the deployment. +// 2. deploymentPaused: Set to true when the current deployment is paused, +// which is usually a manual user operation, or if the deployment is +// pending or initializing, which are the initial states for multi-region +// job deployments. This flag tells Compute that we should not make +// placements on the deployment. func (a *allocReconciler) computeDeploymentPaused() { if a.deployment != nil { a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused || - a.deployment.Status == structs.DeploymentStatusPending + a.deployment.Status == structs.DeploymentStatusPending || + a.deployment.Status == structs.DeploymentStatusInitializing a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed } - if a.deployment == nil { - if a.job.IsMultiregion() && - a.job.UsesDeployments() && - !(a.job.IsPeriodic() || a.job.IsParameterized()) { - - a.deploymentPaused = true - } - } } // cancelUnneededDeployments cancels any deployment that is not needed. If the @@ -512,6 +515,12 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { a.computeMigrations(desiredChanges, migrate, tg, isCanarying) a.createDeployment(tg.Name, tg.Update, existingDeployment, dstate, all, destructive) + // Deployments that are still initializing need to be sent in full in the + // plan so its internal state can be persisted by the plan applier. + if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusInitializing { + a.result.deployment = a.deployment + } + deploymentComplete := a.isDeploymentComplete(groupName, destructive, inplace, migrate, rescheduleNow, place, rescheduleLater, requiresCanaries) @@ -889,11 +898,6 @@ func (a *allocReconciler) createDeployment(groupName string, strategy *structs.U // A previous group may have made the deployment already. If not create one. if a.deployment == nil { a.deployment = structs.NewDeployment(a.job, a.evalPriority) - // in multiregion jobs, most deployments start in a pending state - if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) { - a.deployment.Status = structs.DeploymentStatusPending - a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer - } a.result.deployment = a.deployment } diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index fc88fb54727a..d30e65a5ca9c 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -6074,34 +6074,34 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) { expected: true, }, { - name: "multiregion periodic service is not paused", - jobType: structs.JobTypeService, - isMultiregion: true, - isPeriodic: true, + name: "single region batch job is not paused", + jobType: structs.JobTypeBatch, + isMultiregion: false, + isPeriodic: false, isParameterized: false, expected: false, }, { - name: "multiregion parameterized service is not paused", - jobType: structs.JobTypeService, - isMultiregion: true, + name: "multiregion batch job is not paused", + jobType: structs.JobTypeBatch, + isMultiregion: false, isPeriodic: false, - isParameterized: true, + isParameterized: false, expected: false, }, { - name: "single region batch job is not paused", + name: "multiregion parameterized batch is not paused", jobType: structs.JobTypeBatch, - isMultiregion: false, + isMultiregion: true, isPeriodic: false, - isParameterized: false, + isParameterized: true, expected: false, }, { - name: "multiregion batch job is not paused", + name: "multiregion periodic batch is not paused", jobType: structs.JobTypeBatch, - isMultiregion: false, - isPeriodic: false, + isMultiregion: true, + isPeriodic: true, isParameterized: false, expected: false, }, @@ -6119,8 +6119,18 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) { require.NotNil(t, job, "invalid job type", tc.jobType) + var deployment *structs.Deployment if tc.isMultiregion { job.Multiregion = multiregionCfg + + // This deployment is created by the Job.Register RPC and + // fetched by the scheduler before handing it to the + // reconciler. + if job.UsesDeployments() { + deployment = structs.NewDeployment(job, 100) + deployment.Status = structs.DeploymentStatusInitializing + deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer + } } if tc.isPeriodic { @@ -6132,8 +6142,8 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) { } reconciler := NewAllocReconciler( - testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, - nil, nil, nil, "", job.Priority, true) + testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, deployment, + nil, nil, "", job.Priority, true) _ = reconciler.Compute() From 7d21ad8bb7c03c395b58512c4f7cb61cc919ad89 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 18 Nov 2022 20:09:39 -0500 Subject: [PATCH 2/2] changelog: add entry for #15325 --- .changelog/15325.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/15325.txt diff --git a/.changelog/15325.txt b/.changelog/15325.txt new file mode 100644 index 000000000000..1ecec04c308f --- /dev/null +++ b/.changelog/15325.txt @@ -0,0 +1,3 @@ +```release-note:bug +scheduler (Enterprise): Fixed a bug that prevented new allocations from multiregion jobs to be placed in situations where other regions are not involved, such as node updates. +```