Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle FSM.Apply errors in raftApply #16287

Merged
merged 4 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changelog/16287.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
```release-note:bug
server: Fixed a bug where deregistering a job that was already garbage collected would create a new evaluation
```

```release-note:bug
server: Fixed a bug where the `system reconcile summaries` command and API would not return any scheduler-related errors
```

```release-note:bug
server: Fixed a bug where node updates that produced errors from service discovery or CSI plugin updates were not logged
```
2 changes: 1 addition & 1 deletion api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1554,7 +1554,7 @@ func TestJobs_Deregister(t *testing.T) {
must.NoError(t, err)
assertWriteMeta(t, wm)

// Attempting delete on non-existing job returns an error
// Attempting delete on non-existing job does not return an error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I can imagine that has provoked some unnecessary 🤔

_, _, err = jobs.Deregister("nope", false, nil)
must.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions api/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

func queryNodeList(t *testing.T, nodes *Nodes) ([]*NodeListStub, *QueryMeta) {
t.Helper()
var (
nodeListStub []*NodeListStub
queryMeta *QueryMeta
Expand Down
42 changes: 6 additions & 36 deletions nomad/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,16 +1342,11 @@ func (a *ACL) UpsertRoles(
}

// Update via Raft.
out, index, err := a.srv.raftApply(structs.ACLRolesUpsertRequestType, args)
_, index, err := a.srv.raftApply(structs.ACLRolesUpsertRequestType, args)
if err != nil {
return err
}

// Check if the FSM response, which is an interface, contains an error.
if err, ok := out.(error); ok && err != nil {
return err
}

// Populate the response. We do a lookup against the state to pick up the
// proper create / modify times.
stateSnapshot, err = a.srv.State().Snapshot()
Expand Down Expand Up @@ -1413,16 +1408,11 @@ func (a *ACL) DeleteRolesByID(
}

// Update via Raft.
out, index, err := a.srv.raftApply(structs.ACLRolesDeleteByIDRequestType, args)
_, index, err := a.srv.raftApply(structs.ACLRolesDeleteByIDRequestType, args)
if err != nil {
return err
}

// Check if the FSM response, which is an interface, contains an error.
if err, ok := out.(error); ok && err != nil {
return err
}

// Update the index. There is no need to floor this as we are writing to
// state and therefore will get a non-zero index response.
reply.Index = index
Expand Down Expand Up @@ -1899,16 +1889,11 @@ func (a *ACL) UpsertAuthMethods(
}

// Update via Raft
out, index, err := a.srv.raftApply(structs.ACLAuthMethodsUpsertRequestType, args)
_, index, err := a.srv.raftApply(structs.ACLAuthMethodsUpsertRequestType, args)
if err != nil {
return err
}

// Check if the FSM response, which is an interface, contains an error.
if err, ok := out.(error); ok && err != nil {
return err
}

// Populate the response. We do a lookup against the state to pick up the
// proper create / modify times.
stateSnapshot, err = a.srv.State().Snapshot()
Expand Down Expand Up @@ -1972,16 +1957,11 @@ func (a *ACL) DeleteAuthMethods(
}

// Update via Raft
out, index, err := a.srv.raftApply(structs.ACLAuthMethodsDeleteRequestType, args)
_, index, err := a.srv.raftApply(structs.ACLAuthMethodsDeleteRequestType, args)
if err != nil {
return err
}

// Check if the FSM response, which is an interface, contains an error.
if err, ok := out.(error); ok && err != nil {
return err
}

// Update the index
reply.Index = index
return nil
Expand Down Expand Up @@ -2278,16 +2258,11 @@ func (a *ACL) UpsertBindingRules(
}

// Update via Raft.
out, index, err := a.srv.raftApply(structs.ACLBindingRulesUpsertRequestType, args)
_, index, err := a.srv.raftApply(structs.ACLBindingRulesUpsertRequestType, args)
if err != nil {
return err
}

// Check if the FSM response, which is an interface, contains an error.
if err, ok := out.(error); ok && err != nil {
return err
}

// Populate the response. We do a lookup against the state to pick up the
// proper create / modify indexes.
stateSnapshot, err = a.srv.State().Snapshot()
Expand Down Expand Up @@ -2353,16 +2328,11 @@ func (a *ACL) DeleteBindingRules(
}

// Update via Raft.
out, index, err := a.srv.raftApply(structs.ACLBindingRulesDeleteRequestType, args)
_, index, err := a.srv.raftApply(structs.ACLBindingRulesDeleteRequestType, args)
if err != nil {
return err
}

// Check if the FSM response, which is an interface, contains an error.
if err, ok := out.(error); ok && err != nil {
return err
}

// Update the index
reply.Index = index
return nil
Expand Down
38 changes: 8 additions & 30 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,11 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "register")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
Expand Down Expand Up @@ -397,14 +394,11 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *
return fmt.Errorf("missing volume IDs")
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "deregister")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
Expand Down Expand Up @@ -458,14 +452,11 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
args.NodeID = alloc.NodeID
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "claim")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

if isNewClaim {
// if this is a new claim, add a Volume and PublishContext from the
Expand Down Expand Up @@ -931,14 +922,11 @@ func (v *CSIVolume) checkpointClaim(vol *structs.CSIVolume, claim *structs.CSIVo
Namespace: vol.Namespace,
},
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, req)
_, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, req)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
vol.ModifyIndex = index
return nil
}
Expand Down Expand Up @@ -1023,13 +1011,10 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs)
_, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "register")
return err
}
if respErr, ok := resp.(error); ok {
multierror.Append(&mErr, respErr)
multierror.Append(&mErr, err)
}

err = mErr.ErrorOrNil()
Expand Down Expand Up @@ -1123,14 +1108,11 @@ func (v *CSIVolume) Delete(args *structs.CSIVolumeDeleteRequest, reply *structs.
VolumeIDs: args.VolumeIDs,
WriteRequest: args.WriteRequest,
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, deregArgs)
_, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, deregArgs)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "deregister")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
Expand Down Expand Up @@ -1611,16 +1593,12 @@ func (v *CSIPlugin) Delete(args *structs.CSIPluginDeleteRequest, reply *structs.
return fmt.Errorf("missing plugin ID")
}

resp, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "delete")
return err
}

if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
Expand Down
19 changes: 3 additions & 16 deletions nomad/drainer_shims.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent
}
}

resp, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)
return d.convertApplyErrors(resp, index, err)
_, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)
return index, err
}

func (d drainerShim) AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error) {
Expand All @@ -38,19 +38,6 @@ func (d drainerShim) AllocUpdateDesiredTransition(allocs map[string]*structs.Des
Evals: evals,
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
}
resp, index, err := d.s.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args)
return d.convertApplyErrors(resp, index, err)
}

// convertApplyErrors parses the results of a raftApply and returns the index at
// which it was applied and any error that occurred. Raft Apply returns two
// separate errors, Raft library errors and user returned errors from the FSM.
// This helper, joins the errors by inspecting the applyResponse for an error.
func (d drainerShim) convertApplyErrors(applyResp interface{}, index uint64, err error) (uint64, error) {
if applyResp != nil {
if fsmErr, ok := applyResp.(error); ok && fsmErr != nil {
return index, fsmErr
}
}
_, index, err := d.s.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args)
return index, err
}
23 changes: 9 additions & 14 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
args.Deployment = j.multiregionCreateDeployment(job, eval)

// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err, ok := fsmErr.(error); ok && err != nil {
j.logger.Error("registering job failed", "error", err, "fsm", true)
return err
}
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
j.logger.Error("registering job failed", "error", err, "raft", true)
j.logger.Error("registering job failed", "error", err)
return err
}

Expand Down Expand Up @@ -850,6 +846,9 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
if err != nil {
return err
}
if job == nil {
return nil
}

var eval *structs.Evaluation

Expand All @@ -858,7 +857,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
now := time.Now().UnixNano()

// If the job is periodic or parameterized, we don't create an eval.
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
if !(job.IsPeriodic() || job.IsParameterized()) {

// The evaluation priority is determined by several factors. It
// defaults to the job default priority and is overridden by the
Expand All @@ -867,7 +866,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// If the user supplied an eval priority override, we subsequently
// use this.
priority := structs.JobDefaultPriority
if job != nil {
if job.Priority > 0 {
priority = job.Priority
}
if args.EvalPriority > 0 {
Expand Down Expand Up @@ -2027,13 +2026,9 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
}

// Commit this update via Raft
fsmErr, jobCreateIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, regReq)
if err, ok := fsmErr.(error); ok && err != nil {
j.logger.Error("dispatched job register failed", "error", err, "fsm", true)
return err
}
_, jobCreateIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, regReq)
if err != nil {
j.logger.Error("dispatched job register failed", "error", err, "raft", true)
j.logger.Error("dispatched job register failed", "error")
return err
}

Expand Down
Loading