Skip to content

Commit

Permalink
scheduler: create placements for non-register MRD (#15325)
Browse files Browse the repository at this point in the history
* scheduler: create placements for non-register MRD

For multiregion jobs, the scheduler does not create placements on
registration because the deployment must wait for the other regions.
Once of these regions will then trigger the deployment to run.

Currently, this is done in the scheduler by considering any eval for a
multiregion job as "paused" since it's expected that another region will
eventually unpause it.

This becomes a problem where evals not triggered by a job registration
happen, such as on a node update. These types of regional changes do not
have other regions waiting to progress the deployment, and so they were
never resulting in placements.

The fix is to create a deployment at job registration time. This
additional piece of state allows the scheduler to differentiate between
a multiregion change, where there are other regions engaged in the
deployment so no placements are required, from a regional change, where
the scheduler does need to create placements.

This deployment starts in the new "initializing" status to signal to the
scheduler that it needs to compute the initial deployment state. The
multiregion deployment will wait until this deployment state is
persisted and its starts is set to "pending". Without this state
transition it's possible to hit a race condition where the plan applier
and the deployment watcher may step of each other and overwrite their
changes.

* changelog: add entry for #15325
  • Loading branch information
lgfa29 committed Nov 25, 2022
1 parent a861c83 commit 41a6c2e
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 32 deletions.
3 changes: 3 additions & 0 deletions .changelog/15325.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scheduler (Enterprise): Fixed a bug that prevented new allocations from multiregion jobs to be placed in situations where other regions are not involved, such as node updates.
```
31 changes: 31 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,10 +586,41 @@ func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index
}
}

if req.Deployment != nil {
// Cancel any preivous deployment.
lastDeployment, err := n.state.LatestDeploymentByJobID(ws, req.Job.Namespace, req.Job.ID)
if err != nil {
return fmt.Errorf("failed to retrieve latest deployment: %v", err)
}
if lastDeployment != nil && lastDeployment.Active() {
activeDeployment := lastDeployment.Copy()
activeDeployment.Status = structs.DeploymentStatusCancelled
activeDeployment.StatusDescription = structs.DeploymentStatusDescriptionNewerJob
if err := n.state.UpsertDeployment(index, activeDeployment); err != nil {
return err
}
}

// Update the deployment with the latest job indexes.
req.Deployment.JobCreateIndex = req.Job.CreateIndex
req.Deployment.JobModifyIndex = req.Job.ModifyIndex
req.Deployment.JobSpecModifyIndex = req.Job.JobModifyIndex
req.Deployment.JobVersion = req.Job.Version

if err := n.state.UpsertDeployment(index, req.Deployment); err != nil {
return err
}
}

// COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log,
// so this may be nil during server upgrades.
if req.Eval != nil {
req.Eval.JobModifyIndex = index

if req.Deployment != nil {
req.Eval.DeploymentID = req.Deployment.ID
}

if err := n.upsertEvals(msgType, index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
submittedEval = true
}

// Pre-register a deployment if necessary.
args.Deployment = j.multiregionCreateDeployment(job, eval)

// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err, ok := fsmErr.(error); ok && err != nil {
Expand Down
6 changes: 6 additions & 0 deletions nomad/job_endpoint_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ func (j *Job) enforceSubmitJob(override bool, job *structs.Job) (error, error) {
return nil, nil
}

// multiregionCreateDeployment is used to create a deployment to register along
// with the job, if required.
func (j *Job) multiregionCreateDeployment(job *structs.Job, eval *structs.Evaluation) *structs.Deployment {
return nil
}

// multiregionRegister is used to send a job across multiple regions
func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse, newVersion uint64) (bool, error) {
return false, nil
Expand Down
3 changes: 1 addition & 2 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,7 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error
return txn.Commit()
}

// UpsertDeployment is used to insert a new deployment. If cancelPrior is set to
// true, all prior deployments for the same job will be cancelled.
// UpsertDeployment is used to insert or update a new deployment.
func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()
Expand Down
24 changes: 15 additions & 9 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,10 @@ type JobRegisterRequest struct {
// Eval is the evaluation that is associated with the job registration
Eval *Evaluation

// Deployment is the deployment to be create when the job is registered. If
// there is an active deployment for the job it will be canceled.
Deployment *Deployment

WriteRequest
}

Expand Down Expand Up @@ -8831,14 +8835,15 @@ func (v *Vault) Validate() error {

const (
// DeploymentStatuses are the various states a deployment can be be in
DeploymentStatusRunning = "running"
DeploymentStatusPaused = "paused"
DeploymentStatusFailed = "failed"
DeploymentStatusSuccessful = "successful"
DeploymentStatusCancelled = "cancelled"
DeploymentStatusPending = "pending"
DeploymentStatusBlocked = "blocked"
DeploymentStatusUnblocking = "unblocking"
DeploymentStatusRunning = "running"
DeploymentStatusPaused = "paused"
DeploymentStatusFailed = "failed"
DeploymentStatusSuccessful = "successful"
DeploymentStatusCancelled = "cancelled"
DeploymentStatusInitializing = "initializing"
DeploymentStatusPending = "pending"
DeploymentStatusBlocked = "blocked"
DeploymentStatusUnblocking = "unblocking"

// TODO Statuses and Descriptions do not match 1:1 and we sometimes use the Description as a status flag

Expand Down Expand Up @@ -8972,7 +8977,8 @@ func (d *Deployment) Copy() *Deployment {
// Active returns whether the deployment is active or terminal.
func (d *Deployment) Active() bool {
switch d.Status {
case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked, DeploymentStatusUnblocking, DeploymentStatusPending:
case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked,
DeploymentStatusUnblocking, DeploymentStatusInitializing, DeploymentStatusPending:
return true
default:
return false
Expand Down
52 changes: 31 additions & 21 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,17 +203,10 @@ func (a *allocReconciler) Compute() *reconcileResults {
// Detect if the deployment is paused
if a.deployment != nil {
a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused ||
a.deployment.Status == structs.DeploymentStatusPending
a.deployment.Status == structs.DeploymentStatusPending ||
a.deployment.Status == structs.DeploymentStatusInitializing
a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
}
if a.deployment == nil {
// When we create the deployment later, it will be in a pending
// state. But we also need to tell Compute we're paused, otherwise we
// make placements on the paused deployment.
if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
a.deploymentPaused = true
}
}

// Reconcile each group
complete := true
Expand All @@ -222,24 +215,35 @@ func (a *allocReconciler) Compute() *reconcileResults {
complete = complete && groupComplete
}

// Mark the deployment as complete if possible
if a.deployment != nil && complete {
if a.job.IsMultiregion() {
// the unblocking/successful states come after blocked, so we
// need to make sure we don't revert those states
if a.deployment.Status != structs.DeploymentStatusUnblocking &&
a.deployment.Status != structs.DeploymentStatusSuccessful {
if a.deployment != nil {
// Mark the deployment as complete if possible
if complete {
if a.job.IsMultiregion() {
// the unblocking/successful states come after blocked, so we
// need to make sure we don't revert those states
if a.deployment.Status != structs.DeploymentStatusUnblocking &&
a.deployment.Status != structs.DeploymentStatusSuccessful {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusBlocked,
StatusDescription: structs.DeploymentStatusDescriptionBlocked,
})
}
} else {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusBlocked,
StatusDescription: structs.DeploymentStatusDescriptionBlocked,
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
})
}
} else {
}

// Mark the deployment as pending since its state is now computed.
if a.deployment.Status == structs.DeploymentStatusInitializing {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
Status: structs.DeploymentStatusPending,
StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer,
})
}
}
Expand Down Expand Up @@ -573,6 +577,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
a.deployment.TaskGroups[group] = dstate
}

// Deployments that are still initializing need to be sent in full in the
// plan so its internal state can be persisted by the plan applier.
if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusInitializing {
a.result.deployment = a.deployment
}

// deploymentComplete is whether the deployment is complete which largely
// means that no placements were made or desired to be made
deploymentComplete := len(destructive)+len(inplace)+len(place)+len(migrate)+len(rescheduleNow)+len(rescheduleLater) == 0 && !requireCanary
Expand Down
116 changes: 116 additions & 0 deletions scheduler/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5169,3 +5169,119 @@ func TestReconciler_RescheduleNot_Batch(t *testing.T) {
},
})
}

// Tests the reconciler properly handles the logic for computeDeploymentPaused
// for various job types.
func TestReconciler_ComputeDeploymentPaused(t *testing.T) {
ci.Parallel(t)

type testCase struct {
name string
jobType string
isMultiregion bool
isPeriodic bool
isParameterized bool
expected bool
}

multiregionCfg := mock.MultiregionJob().Multiregion
periodicCfg := mock.PeriodicJob().Periodic
parameterizedCfg := &structs.ParameterizedJobConfig{
Payload: structs.DispatchPayloadRequired,
}

testCases := []testCase{
{
name: "single region service is not paused",
jobType: structs.JobTypeService,
isMultiregion: false,
isPeriodic: false,
isParameterized: false,
expected: false,
},
{
name: "multiregion service is paused",
jobType: structs.JobTypeService,
isMultiregion: true,
isPeriodic: false,
isParameterized: false,
expected: true,
},
{
name: "single region batch job is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: false,
isPeriodic: false,
isParameterized: false,
expected: false,
},
{
name: "multiregion batch job is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: false,
isPeriodic: false,
isParameterized: false,
expected: false,
},
{
name: "multiregion parameterized batch is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: true,
isPeriodic: false,
isParameterized: true,
expected: false,
},
{
name: "multiregion periodic batch is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: true,
isPeriodic: true,
isParameterized: false,
expected: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var job *structs.Job

if tc.jobType == structs.JobTypeService {
job = mock.Job()
} else if tc.jobType == structs.JobTypeBatch {
job = mock.BatchJob()
}

require.NotNil(t, job, "invalid job type", tc.jobType)

var deployment *structs.Deployment
if tc.isMultiregion {
job.Multiregion = multiregionCfg

// This deployment is created by the Job.Register RPC and
// fetched by the scheduler before handing it to the
// reconciler.
if job.Type == structs.JobTypeService {
deployment = structs.NewDeployment(job, 100)
deployment.Status = structs.DeploymentStatusInitializing
deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
}
}

if tc.isPeriodic {
job.Periodic = periodicCfg
}

if tc.isParameterized {
job.ParameterizedJob = parameterizedCfg
}

reconciler := NewAllocReconciler(
testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, deployment,
nil, nil, "", job.Priority)

_ = reconciler.Compute()

require.Equal(t, tc.expected, reconciler.deploymentPaused)
})
}
}

0 comments on commit 41a6c2e

Please sign in to comment.