Skip to content

Commit

Permalink
Atomic eval insertion with job (de-)registration
Browse files Browse the repository at this point in the history
This fixes a bug where jobs may get "stuck" unprocessed; the bug
disproportionately affects periodic jobs around leadership transitions.
When registering a job, the job registration and the eval to process it
get applied to raft as two separate transactions; if the job
registration succeeds but eval application fails, the job may remain
unprocessed. Operators may detect such a failure when they submit a job
update and get a 500 error code, and they can retry; periodic job
failures are more likely to go unnoticed, and no further periodic
invocations will be processed until an operator forces an evaluation.

This fixes the issue by ensuring that the job registration and eval
application get persisted and processed atomically in the same raft log
entry.

Also, apply the same change to ensure atomicity in job deregistration.

Backward Compatibility

We must maintain compatibility in two scenarios: mixed clusters where a
leader can handle atomic updates but followers cannot, and a recent
cluster that processes old log entries from legacy or mixed cluster mode.

To handle these constraints, ensure that the leader continues to emit the
Evaluation log entry until all servers have upgraded; also, when
processing raft logs, the servers honor evaluations found in both spots:
the Eval in job (de-)registration and the eval update entries.

When an updated server sees mixed-mode behavior where an eval is inserted
into the raft log twice, it ignores the second instance.

I made one compromise in consistency in the mixed-mode scenario: servers
may disagree on the eval.CreateIndex value: the leader and updated
servers will report the job registration index while old servers will
report the index of the eval update log entry. This discrepancy doesn't
seem to be material - it's the eval.JobModifyIndex that matters.
  • Loading branch information
Mahmood Ali committed Jul 14, 2020
1 parent 3703489 commit 97c69ee
Show file tree
Hide file tree
Showing 7 changed files with 676 additions and 100 deletions.
32 changes: 29 additions & 3 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,13 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
}
}

if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}

return nil
}

Expand All @@ -565,14 +572,30 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

return n.state.WithWriteTransaction(func(tx state.Txn) error {
if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx); err != nil {
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx)

if err != nil {
n.logger.Error("deregistering job failed", "error", err)
return err
}

return nil
})

// always attempt upsert eval even if job deregister fail
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}

if err != nil {
return err
}

return nil
}

func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} {
Expand Down Expand Up @@ -663,7 +686,10 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
}

func (n *nomadFSM) upsertEvals(index uint64, evals []*structs.Evaluation) error {
if err := n.state.UpsertEvals(index, evals); err != nil {
if err := n.state.UpsertEvals(index, evals); len(evals) == 1 && err == state.ErrDuplicateEval {
// the request is a duplicate, ignore processing it
return nil
} else if err != nil {
n.logger.Error("UpsertEvals failed", "error", err)
return err
}
Expand Down
160 changes: 91 additions & 69 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,31 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}

// Create a new evaluation
now := time.Now().UTC().UnixNano()
submittedEval := false

// Set the submit time
args.Job.SubmitTime = time.Now().UTC().UnixNano()

// If the job is periodic or parameterized, we don't create an eval.
if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) {
args.Eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
reply.EvalID = args.Eval.ID
}

// Check if the job has changed at all
if existingJob == nil || existingJob.SpecChanged(args.Job) {
// Set the submit time
args.Job.SetSubmitTime()

// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
Expand All @@ -328,52 +349,51 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}

submittedEval = true

// Populate the reply with job information
reply.JobModifyIndex = index
reply.Index = index

if args.Eval != nil {
reply.EvalCreateIndex = index
}

} else {
reply.JobModifyIndex = existingJob.JobModifyIndex
}

// used for multiregion start
args.Job.JobModifyIndex = reply.JobModifyIndex

// If the job is periodic or parameterized, we don't create an eval.
if args.Job.IsPeriodic() || args.Job.IsParameterized() {
if args.Eval == nil {
return nil
}

// Create a new evaluation
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
JobModifyIndex: reply.JobModifyIndex,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check.
// 0.12.1 introduced atomic eval job registration
if args.Eval != nil &&
!(submittedEval && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false)) {
args.Eval.JobModifyIndex = reply.JobModifyIndex
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{args.Eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
// XXX: There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not.
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "register")
return err
}
// Commit this evaluation via Raft
// There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not, before 0.12.1
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "register")
return err
}

// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
if !submittedEval {
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}
}

// Kick off a multiregion deployment (enterprise only).
if isRunner {
Expand Down Expand Up @@ -766,6 +786,25 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
}
}

// The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UTC().UnixNano()
// If the job is periodic or parameterized, we don't create an eval.
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
args.Eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
reply.EvalID = args.Eval.ID
}

// Commit the job update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
Expand All @@ -775,6 +814,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD

// Populate the reply with job information
reply.JobModifyIndex = index
reply.EvalCreateIndex = index
reply.Index = index

// Make a raft apply to release the CSI volume claims of terminal allocs.
var result *multierror.Error
Expand All @@ -783,44 +824,25 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
result = multierror.Append(result, err)
}

// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
return nil
}

// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// COMPAT(1.1.0) - 0.12.1 introduced atomic job deregistration eval
if args.Eval != nil &&
!ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
// Create a new evaluation
args.Eval.JobModifyIndex = index
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{args.Eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
// Commit this evaluation via Raft
_, _, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
}
}

// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return result.ErrorOrNil()
}

Expand Down
Loading

0 comments on commit 97c69ee

Please sign in to comment.