Skip to content

Commit

Permalink
Backport of scheduler: create placements for non-register MRD into re…
Browse files Browse the repository at this point in the history
…lease/1.4.x (#15390)

This pull request was automerged via backport-assistant
  • Loading branch information
hc-github-team-nomad-core committed Nov 25, 2022
1 parent c203a84 commit f13d0a9
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 59 deletions.
3 changes: 3 additions & 0 deletions .changelog/15325.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scheduler (Enterprise): Fixed a bug that prevented new allocations from multiregion jobs to be placed in situations where other regions are not involved, such as node updates.
```
31 changes: 31 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,10 +608,41 @@ func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index
}
}

if req.Deployment != nil {
// Cancel any preivous deployment.
lastDeployment, err := n.state.LatestDeploymentByJobID(ws, req.Job.Namespace, req.Job.ID)
if err != nil {
return fmt.Errorf("failed to retrieve latest deployment: %v", err)
}
if lastDeployment != nil && lastDeployment.Active() {
activeDeployment := lastDeployment.Copy()
activeDeployment.Status = structs.DeploymentStatusCancelled
activeDeployment.StatusDescription = structs.DeploymentStatusDescriptionNewerJob
if err := n.state.UpsertDeployment(index, activeDeployment); err != nil {
return err
}
}

// Update the deployment with the latest job indexes.
req.Deployment.JobCreateIndex = req.Job.CreateIndex
req.Deployment.JobModifyIndex = req.Job.ModifyIndex
req.Deployment.JobSpecModifyIndex = req.Job.JobModifyIndex
req.Deployment.JobVersion = req.Job.Version

if err := n.state.UpsertDeployment(index, req.Deployment); err != nil {
return err
}
}

// COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log,
// so this may be nil during server upgrades.
if req.Eval != nil {
req.Eval.JobModifyIndex = index

if req.Deployment != nil {
req.Eval.DeploymentID = req.Deployment.ID
}

if err := n.upsertEvals(msgType, index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
submittedEval = true
}

// Pre-register a deployment if necessary.
args.Deployment = j.multiregionCreateDeployment(job, eval)

// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err, ok := fsmErr.(error); ok && err != nil {
Expand Down
6 changes: 6 additions & 0 deletions nomad/job_endpoint_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ func (j *Job) enforceSubmitJob(override bool, job *structs.Job, nomadACLToken *s
return nil, nil
}

// multiregionCreateDeployment is used to create a deployment to register along
// with the job, if required.
func (j *Job) multiregionCreateDeployment(job *structs.Job, eval *structs.Evaluation) *structs.Deployment {
return nil
}

// multiregionRegister is used to send a job across multiple regions
func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse, newVersion uint64) (bool, error) {
return false, nil
Expand Down
3 changes: 1 addition & 2 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,7 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error
return txn.Commit()
}

// UpsertDeployment is used to insert a new deployment. If cancelPrior is set to
// true, all prior deployments for the same job will be cancelled.
// UpsertDeployment is used to insert or update a new deployment.
func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()
Expand Down
24 changes: 15 additions & 9 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,10 @@ type JobRegisterRequest struct {
// Eval is the evaluation that is associated with the job registration
Eval *Evaluation

// Deployment is the deployment to be create when the job is registered. If
// there is an active deployment for the job it will be canceled.
Deployment *Deployment

WriteRequest
}

Expand Down Expand Up @@ -9219,14 +9223,15 @@ func (v *Vault) Validate() error {

const (
// DeploymentStatuses are the various states a deployment can be be in
DeploymentStatusRunning = "running"
DeploymentStatusPaused = "paused"
DeploymentStatusFailed = "failed"
DeploymentStatusSuccessful = "successful"
DeploymentStatusCancelled = "cancelled"
DeploymentStatusPending = "pending"
DeploymentStatusBlocked = "blocked"
DeploymentStatusUnblocking = "unblocking"
DeploymentStatusRunning = "running"
DeploymentStatusPaused = "paused"
DeploymentStatusFailed = "failed"
DeploymentStatusSuccessful = "successful"
DeploymentStatusCancelled = "cancelled"
DeploymentStatusInitializing = "initializing"
DeploymentStatusPending = "pending"
DeploymentStatusBlocked = "blocked"
DeploymentStatusUnblocking = "unblocking"

// TODO Statuses and Descriptions do not match 1:1 and we sometimes use the Description as a status flag

Expand Down Expand Up @@ -9360,7 +9365,8 @@ func (d *Deployment) Copy() *Deployment {
// Active returns whether the deployment is active or terminal.
func (d *Deployment) Active() bool {
switch d.Status {
case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked, DeploymentStatusUnblocking, DeploymentStatusPending:
case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked,
DeploymentStatusUnblocking, DeploymentStatusInitializing, DeploymentStatusPending:
return true
default:
return false
Expand Down
68 changes: 36 additions & 32 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,24 +231,35 @@ func (a *allocReconciler) computeDeploymentComplete(m allocMatrix) bool {
}

func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) {
// Mark the deployment as complete if possible
if a.deployment != nil && deploymentComplete {
if a.job.IsMultiregion() {
// the unblocking/successful states come after blocked, so we
// need to make sure we don't revert those states
if a.deployment.Status != structs.DeploymentStatusUnblocking &&
a.deployment.Status != structs.DeploymentStatusSuccessful {
if a.deployment != nil {
// Mark the deployment as complete if possible
if deploymentComplete {
if a.job.IsMultiregion() {
// the unblocking/successful states come after blocked, so we
// need to make sure we don't revert those states
if a.deployment.Status != structs.DeploymentStatusUnblocking &&
a.deployment.Status != structs.DeploymentStatusSuccessful {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusBlocked,
StatusDescription: structs.DeploymentStatusDescriptionBlocked,
})
}
} else {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusBlocked,
StatusDescription: structs.DeploymentStatusDescriptionBlocked,
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
})
}
} else {
}

// Mark the deployment as pending since its state is now computed.
if a.deployment.Status == structs.DeploymentStatusInitializing {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
Status: structs.DeploymentStatusPending,
StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer,
})
}
}
Expand All @@ -269,26 +280,18 @@ func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) {
// allocReconciler that indicate the state of the deployment if one
// is required. The flags that are managed are:
// 1. deploymentFailed: Did the current deployment fail just as named.
// 2. deploymentPaused: Multiregion job types that use deployments run
// the deployments later during the fan-out stage. When the deployment
// is created it will be in a pending state. If an invariant violation
// is detected by the deploymentWatcher during it will enter a paused
// state. This flag tells Compute we're paused or pending, so we should
// not make placements on the deployment.
// 2. deploymentPaused: Set to true when the current deployment is paused,
// which is usually a manual user operation, or if the deployment is
// pending or initializing, which are the initial states for multi-region
// job deployments. This flag tells Compute that we should not make
// placements on the deployment.
func (a *allocReconciler) computeDeploymentPaused() {
if a.deployment != nil {
a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused ||
a.deployment.Status == structs.DeploymentStatusPending
a.deployment.Status == structs.DeploymentStatusPending ||
a.deployment.Status == structs.DeploymentStatusInitializing
a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
}
if a.deployment == nil {
if a.job.IsMultiregion() &&
a.job.UsesDeployments() &&
!(a.job.IsPeriodic() || a.job.IsParameterized()) {

a.deploymentPaused = true
}
}
}

// cancelUnneededDeployments cancels any deployment that is not needed. If the
Expand Down Expand Up @@ -512,6 +515,12 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
a.computeMigrations(desiredChanges, migrate, tg, isCanarying)
a.createDeployment(tg.Name, tg.Update, existingDeployment, dstate, all, destructive)

// Deployments that are still initializing need to be sent in full in the
// plan so its internal state can be persisted by the plan applier.
if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusInitializing {
a.result.deployment = a.deployment
}

deploymentComplete := a.isDeploymentComplete(groupName, destructive, inplace,
migrate, rescheduleNow, place, rescheduleLater, requiresCanaries)

Expand Down Expand Up @@ -889,11 +898,6 @@ func (a *allocReconciler) createDeployment(groupName string, strategy *structs.U
// A previous group may have made the deployment already. If not create one.
if a.deployment == nil {
a.deployment = structs.NewDeployment(a.job, a.evalPriority)
// in multiregion jobs, most deployments start in a pending state
if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) {
a.deployment.Status = structs.DeploymentStatusPending
a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
}
a.result.deployment = a.deployment
}

Expand Down
42 changes: 26 additions & 16 deletions scheduler/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6074,34 +6074,34 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) {
expected: true,
},
{
name: "multiregion periodic service is not paused",
jobType: structs.JobTypeService,
isMultiregion: true,
isPeriodic: true,
name: "single region batch job is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: false,
isPeriodic: false,
isParameterized: false,
expected: false,
},
{
name: "multiregion parameterized service is not paused",
jobType: structs.JobTypeService,
isMultiregion: true,
name: "multiregion batch job is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: false,
isPeriodic: false,
isParameterized: true,
isParameterized: false,
expected: false,
},
{
name: "single region batch job is not paused",
name: "multiregion parameterized batch is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: false,
isMultiregion: true,
isPeriodic: false,
isParameterized: false,
isParameterized: true,
expected: false,
},
{
name: "multiregion batch job is not paused",
name: "multiregion periodic batch is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: false,
isPeriodic: false,
isMultiregion: true,
isPeriodic: true,
isParameterized: false,
expected: false,
},
Expand All @@ -6119,8 +6119,18 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) {

require.NotNil(t, job, "invalid job type", tc.jobType)

var deployment *structs.Deployment
if tc.isMultiregion {
job.Multiregion = multiregionCfg

// This deployment is created by the Job.Register RPC and
// fetched by the scheduler before handing it to the
// reconciler.
if job.UsesDeployments() {
deployment = structs.NewDeployment(job, 100)
deployment.Status = structs.DeploymentStatusInitializing
deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
}
}

if tc.isPeriodic {
Expand All @@ -6132,8 +6142,8 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) {
}

reconciler := NewAllocReconciler(
testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, nil, nil, "", job.Priority, true)
testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, deployment,
nil, nil, "", job.Priority, true)

_ = reconciler.Compute()

Expand Down

0 comments on commit f13d0a9

Please sign in to comment.