handle FSM.Apply errors in raftApply (#16287) (#16302)
The signature of the `raftApply` function requires that the caller unwrap the
first returned value (the response from `FSM.Apply`) to see if it's an
error. This puts the burden on the caller to remember to check two different
places for errors, and we've done so inconsistently.

Update `raftApply` to do the unwrapping for us and return any `FSM.Apply` error
as the error value. Similar work was done in Consul in
hashicorp/consul#9991. This eliminates some boilerplate
and surfaces a few minor bugs in the process:

* job deregistrations of already-GC'd jobs were still emitting evals
* reconcile job summaries did not return scheduler errors
* node updates did not report errors associated with inconsistent service
  discovery or CSI plugin states

Note that although _most_ of the `FSM.Apply` functions return only errors (which
makes it tempting to remove the first return value entirely), there are a few that
return `bool` for some reason, and Variables relies on the response value for
proper CAS checking.
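
The unwrapping that `raftApply` now performs can be pictured with a small,
self-contained sketch. This is illustrative only: `applyResult` and
`unwrapApply` are made-up names, not Nomad identifiers, and the real logic
lives inside `raftApply` itself.

```go
package main

import (
	"errors"
	"fmt"
)

// applyResult models what a Raft apply hands back: the FSM.Apply response
// (which may itself be an error), the log index, and any Raft-level error.
// These names are illustrative, not Nomad's actual types.
type applyResult struct {
	resp  interface{}
	index uint64
	err   error
}

// unwrapApply folds an FSM.Apply error into the single returned error value,
// which is the behavior this commit gives raftApply. Non-error responses
// (e.g. the bool returns or the Variables CAS response mentioned above)
// still pass through as the first return value.
func unwrapApply(r applyResult) (interface{}, uint64, error) {
	if r.err != nil {
		// Raft-level failure (not leader, timeout, etc.).
		return nil, r.index, r.err
	}
	if err, ok := r.resp.(error); ok && err != nil {
		// FSM.Apply signalled an error via its response value.
		return nil, r.index, err
	}
	return r.resp, r.index, nil
}

func main() {
	_, _, err := unwrapApply(applyResult{resp: errors.New("fsm: bad request"), index: 7})
	fmt.Println(err) // fsm: bad request
}
```

With that in place, each RPC endpoint checks a single error value instead of
also type-asserting the FSM response, which is what the deleted
`if err, ok := out.(error); ok` blocks below were doing.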

Co-authored-by: Tim Gross <tgross@hashicorp.com>
hc-github-team-nomad-core and tgross committed Mar 2, 2023
1 parent 5eaab12 commit e9aba9c
Showing 14 changed files with 61 additions and 168 deletions.
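
Most of the hunks below are the same mechanical rewrite at each call site.
Here is a minimal sketch of the caller-side shape they converge on, using a
hypothetical stand-in `fakeRaftApply` (not a Nomad function):

```go
package main

import (
	"errors"
	"fmt"
)

// fakeRaftApply stands in for srv.raftApply after this change: a Raft-level
// failure and an FSM.Apply error both come back as the single error value.
func fakeRaftApply(args any) (any, uint64, error) {
	return nil, 10, errors.New("fsm: namespace not found")
}

// upsertSomething shows the pattern the endpoints converge on: one error
// check, no type assertion on the FSM response.
func upsertSomething() error {
	_, index, err := fakeRaftApply(nil)
	if err != nil {
		return err
	}
	fmt.Println("applied at index", index)
	return nil
}

func main() {
	fmt.Println(upsertSomething()) // fsm: namespace not found
}
```
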
11 changes: 11 additions & 0 deletions .changelog/16287.txt
@@ -0,0 +1,11 @@
```release-note:bug
server: Fixed a bug where deregistering a job that was already garbage collected would create a new evaluation
```

```release-note:bug
server: Fixed a bug where the `system reconcile summaries` command and API would not return any scheduler-related errors
```

```release-note:bug
server: Fixed a bug where node updates that produced errors from service discovery or CSI plugin updates were not logged
```
2 changes: 1 addition & 1 deletion api/jobs_test.go
@@ -1554,7 +1554,7 @@ func TestJobs_Deregister(t *testing.T) {
must.NoError(t, err)
assertWriteMeta(t, wm)

// Attempting delete on non-existing job returns an error
// Attempting delete on non-existing job does not return an error
_, _, err = jobs.Deregister("nope", false, nil)
must.NoError(t, err)

1 change: 1 addition & 0 deletions api/nodes_test.go
@@ -13,6 +13,7 @@ import (
)

func queryNodeList(t *testing.T, nodes *Nodes) ([]*NodeListStub, *QueryMeta) {
t.Helper()
var (
nodeListStub []*NodeListStub
queryMeta *QueryMeta
14 changes: 2 additions & 12 deletions nomad/acl_endpoint.go
@@ -1208,16 +1208,11 @@ func (a *ACL) UpsertRoles(
}

// Update via Raft.
out, index, err := a.srv.raftApply(structs.ACLRolesUpsertRequestType, args)
_, index, err := a.srv.raftApply(structs.ACLRolesUpsertRequestType, args)
if err != nil {
return err
}

// Check if the FSM response, which is an interface, contains an error.
if err, ok := out.(error); ok && err != nil {
return err
}

// Populate the response. We do a lookup against the state to pick up the
// proper create / modify times.
stateSnapshot, err = a.srv.State().Snapshot()
@@ -1273,16 +1268,11 @@ func (a *ACL) DeleteRolesByID(
}

// Update via Raft.
out, index, err := a.srv.raftApply(structs.ACLRolesDeleteByIDRequestType, args)
_, index, err := a.srv.raftApply(structs.ACLRolesDeleteByIDRequestType, args)
if err != nil {
return err
}

// Check if the FSM response, which is an interface, contains an error.
if err, ok := out.(error); ok && err != nil {
return err
}

// Update the index. There is no need to floor this as we are writing to
// state and therefore will get a non-zero index response.
reply.Index = index
38 changes: 8 additions & 30 deletions nomad/csi_endpoint.go
@@ -378,14 +378,11 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "register")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
@@ -415,14 +412,11 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *
return fmt.Errorf("missing volume IDs")
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "deregister")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
@@ -470,14 +464,11 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
args.NodeID = alloc.NodeID
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "claim")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

if isNewClaim {
// if this is a new claim, add a Volume and PublishContext from the
@@ -937,14 +928,11 @@ func (v *CSIVolume) checkpointClaim(vol *structs.CSIVolume, claim *structs.CSIVo
Namespace: vol.Namespace,
},
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, req)
_, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, req)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
vol.ModifyIndex = index
return nil
}
@@ -1025,13 +1013,10 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs)
_, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "register")
return err
}
if respErr, ok := resp.(error); ok {
multierror.Append(&mErr, respErr)
multierror.Append(&mErr, err)
}

err = mErr.ErrorOrNil()
@@ -1120,14 +1105,11 @@ func (v *CSIVolume) Delete(args *structs.CSIVolumeDeleteRequest, reply *structs.
VolumeIDs: args.VolumeIDs,
WriteRequest: args.WriteRequest,
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, deregArgs)
_, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, deregArgs)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "deregister")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
@@ -1570,16 +1552,12 @@ func (v *CSIPlugin) Delete(args *structs.CSIPluginDeleteRequest, reply *structs.
return fmt.Errorf("missing plugin ID")
}

resp, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "delete")
return err
}

if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
19 changes: 3 additions & 16 deletions nomad/drainer_shims.go
@@ -28,8 +28,8 @@ func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent
}
}

resp, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)
return d.convertApplyErrors(resp, index, err)
_, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)
return index, err
}

func (d drainerShim) AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error) {
@@ -38,19 +38,6 @@ func (d drainerShim) AllocUpdateDesiredTransition(allocs map[string]*structs.Des
Evals: evals,
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
}
resp, index, err := d.s.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args)
return d.convertApplyErrors(resp, index, err)
}

// convertApplyErrors parses the results of a raftApply and returns the index at
// which it was applied and any error that occurred. Raft Apply returns two
// separate errors, Raft library errors and user returned errors from the FSM.
// This helper, joins the errors by inspecting the applyResponse for an error.
func (d drainerShim) convertApplyErrors(applyResp interface{}, index uint64, err error) (uint64, error) {
if applyResp != nil {
if fsmErr, ok := applyResp.(error); ok && fsmErr != nil {
return index, fsmErr
}
}
_, index, err := d.s.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args)
return index, err
}
23 changes: 9 additions & 14 deletions nomad/job_endpoint.go
@@ -368,13 +368,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
args.Deployment = j.multiregionCreateDeployment(job, eval)

// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err, ok := fsmErr.(error); ok && err != nil {
j.logger.Error("registering job failed", "error", err, "fsm", true)
return err
}
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
j.logger.Error("registering job failed", "error", err, "raft", true)
j.logger.Error("registering job failed", "error", err)
return err
}

@@ -812,6 +808,9 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
if err != nil {
return err
}
if job == nil {
return nil
}

var eval *structs.Evaluation

@@ -820,7 +819,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
now := time.Now().UnixNano()

// If the job is periodic or parameterized, we don't create an eval.
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
if !(job.IsPeriodic() || job.IsParameterized()) {

// The evaluation priority is determined by several factors. It
// defaults to the job default priority and is overridden by the
@@ -829,7 +828,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// If the user supplied an eval priority override, we subsequently
// use this.
priority := structs.JobDefaultPriority
if job != nil {
if job.Priority > 0 {
priority = job.Priority
}
if args.EvalPriority > 0 {
@@ -1934,13 +1933,9 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
}

// Commit this update via Raft
fsmErr, jobCreateIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, regReq)
if err, ok := fsmErr.(error); ok && err != nil {
j.logger.Error("dispatched job register failed", "error", err, "fsm", true)
return err
}
_, jobCreateIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, regReq)
if err != nil {
j.logger.Error("dispatched job register failed", "error", err, "raft", true)
j.logger.Error("dispatched job register failed", "error")
return err
}

54 changes: 8 additions & 46 deletions nomad/job_endpoint_test.go
@@ -3577,13 +3577,11 @@ func TestJobEndpoint_Deregister_ACL(t *testing.T) {
require.NotZero(eval.CreateTime)
require.NotZero(eval.ModifyTime)

// Deregistration is not idempotent, produces a new eval after the job is
// deregistered. TODO(langmartin) make it idempotent.
// Deregistration is idempotent
var validResp2 structs.JobDeregisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", req, &validResp2)
require.NoError(err)
require.NotEqual("", validResp2.EvalID)
require.NotEqual(validResp.EvalID, validResp2.EvalID)
must.NoError(t, err)
must.Eq(t, "", validResp2.EvalID)
}

func TestJobEndpoint_Deregister_Nonexistent(t *testing.T) {
@@ -3606,51 +3604,15 @@
},
}
var resp2 structs.JobDeregisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.JobModifyIndex == 0 {
t.Fatalf("bad index: %d", resp2.Index)
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2))
must.Eq(t, 0, resp2.JobModifyIndex, must.Sprint("expected no modify index"))

// Lookup the evaluation
state := s1.fsm.State()
ws := memdb.NewWatchSet()
eval, err := state.EvalByID(ws, resp2.EvalID)
if err != nil {
t.Fatalf("err: %v", err)
}
if eval == nil {
t.Fatalf("expected eval")
}
if eval.CreateIndex != resp2.EvalCreateIndex {
t.Fatalf("index mis-match")
}

if eval.Priority != structs.JobDefaultPriority {
t.Fatalf("bad: %#v", eval)
}
if eval.Type != structs.JobTypeService {
t.Fatalf("bad: %#v", eval)
}
if eval.TriggeredBy != structs.EvalTriggerJobDeregister {
t.Fatalf("bad: %#v", eval)
}
if eval.JobID != jobID {
t.Fatalf("bad: %#v", eval)
}
if eval.JobModifyIndex != resp2.JobModifyIndex {
t.Fatalf("bad: %#v", eval)
}
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
if eval.CreateTime == 0 {
t.Fatalf("eval CreateTime is unset: %#v", eval)
}
if eval.ModifyTime == 0 {
t.Fatalf("eval ModifyTime is unset: %#v", eval)
}
eval, err := state.EvalsByJob(ws, structs.DefaultNamespace, jobID)
must.NoError(t, err)
must.Nil(t, eval)
}

func TestJobEndpoint_Deregister_EvalPriority(t *testing.T) {
15 changes: 3 additions & 12 deletions nomad/keyring_endpoint.go
@@ -59,13 +59,10 @@ func (k *Keyring) Rotate(args *structs.KeyringRotateRootKeyRequest, reply *struc
Rekey: args.Full,
WriteRequest: args.WriteRequest,
}
out, index, err := k.srv.raftApply(structs.RootKeyMetaUpsertRequestType, req)
_, index, err := k.srv.raftApply(structs.RootKeyMetaUpsertRequestType, req)
if err != nil {
return err
}
if err, ok := out.(error); ok && err != nil {
return err
}
reply.Key = rootKey.Meta
reply.Index = index

@@ -175,13 +172,10 @@ func (k *Keyring) Update(args *structs.KeyringUpdateRootKeyRequest, reply *struc
}

// update the metadata via Raft
out, index, err := k.srv.raftApply(structs.RootKeyMetaUpsertRequestType, metaReq)
_, index, err := k.srv.raftApply(structs.RootKeyMetaUpsertRequestType, metaReq)
if err != nil {
return err
}
if err, ok := out.(error); ok && err != nil {
return err
}

reply.Index = index
return nil
@@ -319,13 +313,10 @@ func (k *Keyring) Delete(args *structs.KeyringDeleteRootKeyRequest, reply *struc
}

// update via Raft
out, index, err := k.srv.raftApply(structs.RootKeyMetaDeleteRequestType, args)
_, index, err := k.srv.raftApply(structs.RootKeyMetaDeleteRequestType, args)
if err != nil {
return err
}
if err, ok := out.(error); ok && err != nil {
return err
}

// remove the key from the keyring too
k.encrypter.RemoveKey(args.KeyID)
