Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomic eval insertion with job (de-)registration #8435

Merged
merged 5 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,13 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
}
}

if req.Eval != nil {
notnoop marked this conversation as resolved.
Show resolved Hide resolved
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}

return nil
}

Expand All @@ -565,14 +572,30 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

return n.state.WithWriteTransaction(func(tx state.Txn) error {
if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx); err != nil {
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx)

if err != nil {
n.logger.Error("deregistering job failed", "error", err)
return err
}

return nil
})

// always attempt upsert eval even if job deregister fail
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}
Comment on lines +590 to +596
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that here we attempt to insert the eval even if the job deregistration fails. I find this behavior very odd but it's explicitly tested in

func TestJobEndpoint_Deregister_Nonexistent(t *testing.T) {
- with it insuring that the resulting eval is populated and have JobModifyIndex set ?! I kept the behavior the same

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this was intended to protect against the previous non-atomic behavior? Ex. if you deregister but the eval wasn't persisted, but then tried to deregister again, the job would be potentially gone but you'd still want an eval to clean it up?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually seems like a bug to me, but I think one that does not impact correctness (only waste some work doing nothing).

From peeking around generic_sched.go and reconcile.go, I'm not seeing us check if the evaluation is a Deregister. We always seem to checked if Job.Stopped is true! This means if we fail to update the statestore with the stopped Job, the Deregister evaluation will be the same as any other evaluation and end up being a noop for the job as it is not stopped and presumably already scheduled/allocated.

I suspect that @tgross is correct and that the test is merely asserting the non-atomic behavior existed: #981

Since we're making Job+Eval submissions atomic, I think we should at least trying to make this section atomic as well. I can't figure out where a Deregister eval for a non-Stopped job would have a desirable affect, but perhaps somebody else can find a case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm inclined to let the current behavior stand as-is, as stopping making an eval is a user visible change. Will follow up in another PR and we can discuss this issue further there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm inclined to let the current behavior stand as-is, as stopping making an eval is a user visible change.

I don't find this reason sufficient to keep it since the eval in question is a dereg but the effect would be a noop. If that's the behavior I feel like we're emitting a useless and actively misleading eval which should be treated like a bug and removed. There's no benefit to leaving it in place as anyone observing it would only be confused by its lack of affect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it should be changed - but I believe such a user-visible behavior (albeit a small one) change is outside the scope of this PR, so I will follow up in another PR.


if err != nil {
return err
}

return nil
}

func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} {
Expand Down Expand Up @@ -663,7 +686,10 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
}

func (n *nomadFSM) upsertEvals(index uint64, evals []*structs.Evaluation) error {
if err := n.state.UpsertEvals(index, evals); err != nil {
if err := n.state.UpsertEvals(index, evals); len(evals) == 1 && err == state.ErrDuplicateEval {
// the request is a duplicate, ignore processing it
notnoop marked this conversation as resolved.
Show resolved Hide resolved
return nil
} else if err != nil {
n.logger.Error("UpsertEvals failed", "error", err)
return err
}
Expand Down
160 changes: 91 additions & 69 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,31 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}

// Create a new evaluation
now := time.Now().UTC().UnixNano()
notnoop marked this conversation as resolved.
Show resolved Hide resolved
submittedEval := false

// Set the submit time
args.Job.SubmitTime = time.Now().UTC().UnixNano()
notnoop marked this conversation as resolved.
Show resolved Hide resolved

// If the job is periodic or parameterized, we don't create an eval.
if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) {
args.Eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
reply.EvalID = args.Eval.ID
}

// Check if the job has changed at all
if existingJob == nil || existingJob.SpecChanged(args.Job) {
// Set the submit time
args.Job.SetSubmitTime()

// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
Expand All @@ -328,52 +349,51 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}

submittedEval = true

// Populate the reply with job information
reply.JobModifyIndex = index
reply.Index = index

if args.Eval != nil {
reply.EvalCreateIndex = index
}

} else {
reply.JobModifyIndex = existingJob.JobModifyIndex
}

// used for multiregion start
args.Job.JobModifyIndex = reply.JobModifyIndex

// If the job is periodic or parameterized, we don't create an eval.
if args.Job.IsPeriodic() || args.Job.IsParameterized() {
if args.Eval == nil {
return nil
}

// Create a new evaluation
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
JobModifyIndex: reply.JobModifyIndex,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check.
notnoop marked this conversation as resolved.
Show resolved Hide resolved
// 0.12.1 introduced atomic eval job registration
if args.Eval != nil &&
Copy link
Member

@tgross tgross Jul 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this args.Eval != nil is always true; we're returning early from checking args.Eval == nil right above it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, do we need to remove the returning early above for multi region deployment? Returning early means that periodic/dispatch jobs will not be handled by multiregionStart? This PR doesn't change the behavior, but just noticed the MRD call.

Copy link
Member

@tgross tgross Jul 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Periodic and dispatch jobs get kicked off with their normal dispatch mechanisms in MRD. We special case them in schedule/reconcile.go, rather than running them through the MRD in deploymentwatcher. (There might be future work there but that's a later phase of work.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, I'm tempted to remove the early return and somehow make it more multiregionStart that doesn't apply to paramterized/periodic jobs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a bad idea. There's a big blog of return nil at the top of the ENT functionality where we return early when we don't need it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'll follow up in another PR - I'm a bit confused about the interaction and would love to do more testing.

!(submittedEval && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false)) {
notnoop marked this conversation as resolved.
Show resolved Hide resolved
args.Eval.JobModifyIndex = reply.JobModifyIndex
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{args.Eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
// XXX: There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not.
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "register")
return err
}
// Commit this evaluation via Raft
// There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not, before 0.12.1
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "register")
return err
}

// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
if !submittedEval {
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}
}

// Kick off a multiregion deployment (enterprise only).
if isRunner {
Expand Down Expand Up @@ -766,6 +786,25 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
}
}

// The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UTC().UnixNano()
notnoop marked this conversation as resolved.
Show resolved Hide resolved
// If the job is periodic or parameterized, we don't create an eval.
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
args.Eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
reply.EvalID = args.Eval.ID
}

// Commit the job update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
Expand All @@ -775,6 +814,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD

// Populate the reply with job information
reply.JobModifyIndex = index
reply.EvalCreateIndex = index
reply.Index = index

// Make a raft apply to release the CSI volume claims of terminal allocs.
var result *multierror.Error
Expand All @@ -783,44 +824,25 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
result = multierror.Append(result, err)
}

// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
return nil
}

// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// COMPAT(1.1.0) - 0.12.1 introduced atomic job deregistration eval
if args.Eval != nil &&
!ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
// Create a new evaluation
args.Eval.JobModifyIndex = index
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{args.Eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
// Commit this evaluation via Raft
_, _, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
notnoop marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
}
}

// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return result.ErrorOrNil()
}

Expand Down
Loading