Skip to content

Commit

Permalink
Merge pull request #8435 from hashicorp/b-atomic-job-register
Browse files Browse the repository at this point in the history
Atomic eval insertion with job (de-)registration
  • Loading branch information
Mahmood Ali committed Jul 15, 2020
2 parents 7346774 + 37ad947 commit 24a7506
Show file tree
Hide file tree
Showing 7 changed files with 664 additions and 104 deletions.
2 changes: 1 addition & 1 deletion Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# vi: set ft=ruby :
#

LINUX_BASE_BOX = "bento/ubuntu-16.04"
LINUX_BASE_BOX = "bento/ubuntu-18.04"
FREEBSD_BASE_BOX = "freebsd/FreeBSD-11.3-STABLE"

LINUX_IP_ADDRESS = "10.199.0.200"
Expand Down
31 changes: 29 additions & 2 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,15 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
}
}

// COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log,
// so this may be nil during server upgrades.
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}

return nil
}

Expand All @@ -565,14 +574,32 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

return n.state.WithWriteTransaction(func(tx state.Txn) error {
if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx); err != nil {
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx)

if err != nil {
n.logger.Error("deregistering job failed", "error", err)
return err
}

return nil
})

// COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log,
// so this may be nil during server upgrades.
// always attempt upsert eval even if job deregister fail
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}

if err != nil {
return err
}

return nil
}

func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} {
Expand Down
180 changes: 107 additions & 73 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,39 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}

// Create a new evaluation
now := time.Now().UnixNano()
submittedEval := false
var eval *structs.Evaluation

// Set the submit time
args.Job.SubmitTime = now

// If the job is periodic or parameterized, we don't create an eval.
if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) {
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
reply.EvalID = eval.ID
}

// Check if the job has changed at all
if existingJob == nil || existingJob.SpecChanged(args.Job) {
// Set the submit time
args.Job.SetSubmitTime()

// COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check to always set args.Eval
// 0.12.1 introduced atomic eval job registration
if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
submittedEval = true
}

// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
Expand All @@ -330,50 +359,42 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis

// Populate the reply with job information
reply.JobModifyIndex = index
reply.Index = index

if submittedEval {
reply.EvalCreateIndex = index
}

} else {
reply.JobModifyIndex = existingJob.JobModifyIndex
}

// used for multiregion start
args.Job.JobModifyIndex = reply.JobModifyIndex

// If the job is periodic or parameterized, we don't create an eval.
if args.Job.IsPeriodic() || args.Job.IsParameterized() {
if eval == nil {
return nil
}

// Create a new evaluation
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
JobModifyIndex: reply.JobModifyIndex,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
if eval != nil && !submittedEval {
eval.JobModifyIndex = reply.JobModifyIndex
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
// XXX: There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not.
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "register")
return err
}
// Commit this evaluation via Raft
// There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not, before 0.12.1
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "register")
return err
}

// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}

// Kick off a multiregion deployment (enterprise only).
if isRunner {
Expand Down Expand Up @@ -689,7 +710,7 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
}

// Create a new evaluation
now := time.Now().UTC().UnixNano()
now := time.Now().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Expand Down Expand Up @@ -766,6 +787,33 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
}
}

var eval *structs.Evaluation

// The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UnixNano()

// If the job is periodic or parameterized, we don't create an eval.
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
reply.EvalID = eval.ID
}

// COMPAT(1.1.0): remove conditional and always set args.Eval
if ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
}

// Commit the job update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
Expand All @@ -775,6 +823,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD

// Populate the reply with job information
reply.JobModifyIndex = index
reply.EvalCreateIndex = index
reply.Index = index

// Make a raft apply to release the CSI volume claims of terminal allocs.
var result *multierror.Error
Expand All @@ -783,44 +833,28 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
result = multierror.Append(result, err)
}

// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
return nil
}
// COMPAT(1.1.0) - Remove entire conditional block
// 0.12.1 introduced atomic job deregistration eval
if eval != nil && args.Eval == nil {
// Create a new evaluation
eval.JobModifyIndex = index
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
}

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}

// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return result.ErrorOrNil()
}

Expand Down Expand Up @@ -883,7 +917,7 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st
}

// Create a new evaluation
now := time.Now().UTC().UnixNano()
now := time.Now().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: jobNS.Namespace,
Expand Down Expand Up @@ -976,7 +1010,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
fmt.Sprintf("task group %q specified for scaling does not exist in job", groupName))
}

now := time.Now().UTC().UnixNano()
now := time.Now().UnixNano()

// If the count is present, commit the job update via Raft
// for now, we'll do this even if count didn't change
Expand Down Expand Up @@ -1651,7 +1685,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
}

// Create an eval and mark it as requiring annotations and insert that as well
now := time.Now().UTC().UnixNano()
now := time.Now().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Expand Down Expand Up @@ -1849,7 +1883,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
// If the job is periodic, we don't create an eval.
if !dispatchJob.IsPeriodic() {
// Create a new evaluation
now := time.Now().UTC().UnixNano()
now := time.Now().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Expand Down
Loading

0 comments on commit 24a7506

Please sign in to comment.