Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Backport of handle FSM.Apply errors in raftApply into release/1.3.x #16301

Merged
merged 1 commit into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changelog/16287.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
```release-note:bug
server: Fixed a bug where deregistering a job that was already garbage collected would create a new evaluation
```

```release-note:bug
server: Fixed a bug where the `system reconcile summaries` command and API would not return any scheduler-related errors
```

```release-note:bug
server: Fixed a bug where node updates that produced errors from service discovery or CSI plugin updates were not logged
```
2 changes: 1 addition & 1 deletion api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ func TestJobs_Deregister(t *testing.T) {
must.NoError(t, err)
assertWriteMeta(t, wm)

// Attempting delete on non-existing job returns an error
// Attempting delete on non-existing job does not return an error
_, _, err = jobs.Deregister("nope", false, nil)
must.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions api/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

func queryNodeList(t *testing.T, nodes *Nodes) ([]*NodeListStub, *QueryMeta) {
t.Helper()
var (
nodeListStub []*NodeListStub
queryMeta *QueryMeta
Expand Down
38 changes: 8 additions & 30 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,11 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "register")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
Expand Down Expand Up @@ -415,14 +412,11 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *
return fmt.Errorf("missing volume IDs")
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "deregister")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
Expand Down Expand Up @@ -470,14 +464,11 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
args.NodeID = alloc.NodeID
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "claim")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

if isNewClaim {
// if this is a new claim, add a Volume and PublishContext from the
Expand Down Expand Up @@ -937,14 +928,11 @@ func (v *CSIVolume) checkpointClaim(vol *structs.CSIVolume, claim *structs.CSIVo
Namespace: vol.Namespace,
},
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, req)
_, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, req)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
vol.ModifyIndex = index
return nil
}
Expand Down Expand Up @@ -1025,13 +1013,10 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs)
_, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "register")
return err
}
if respErr, ok := resp.(error); ok {
multierror.Append(&mErr, respErr)
multierror.Append(&mErr, err)
}

err = mErr.ErrorOrNil()
Expand Down Expand Up @@ -1120,14 +1105,11 @@ func (v *CSIVolume) Delete(args *structs.CSIVolumeDeleteRequest, reply *structs.
VolumeIDs: args.VolumeIDs,
WriteRequest: args.WriteRequest,
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, deregArgs)
_, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, deregArgs)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "deregister")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
Expand Down Expand Up @@ -1570,16 +1552,12 @@ func (v *CSIPlugin) Delete(args *structs.CSIPluginDeleteRequest, reply *structs.
return fmt.Errorf("missing plugin ID")
}

resp, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args)
_, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "delete")
return err
}

if respErr, ok := resp.(error); ok {
return respErr
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
Expand Down
19 changes: 3 additions & 16 deletions nomad/drainer_shims.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent
}
}

resp, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)
return d.convertApplyErrors(resp, index, err)
_, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)
return index, err
}

func (d drainerShim) AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error) {
Expand All @@ -38,19 +38,6 @@ func (d drainerShim) AllocUpdateDesiredTransition(allocs map[string]*structs.Des
Evals: evals,
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
}
resp, index, err := d.s.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args)
return d.convertApplyErrors(resp, index, err)
}

// convertApplyErrors parses the results of a raftApply and returns the index at
// which it was applied and any error that occurred. Raft Apply returns two
// separate errors: Raft library errors and user-returned errors from the FSM.
// This helper joins the errors by inspecting the applyResponse for an error.
func (d drainerShim) convertApplyErrors(applyResp interface{}, index uint64, err error) (uint64, error) {
if applyResp != nil {
if fsmErr, ok := applyResp.(error); ok && fsmErr != nil {
return index, fsmErr
}
}
_, index, err := d.s.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args)
return index, err
}
23 changes: 9 additions & 14 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,13 +368,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
args.Deployment = j.multiregionCreateDeployment(job, eval)

// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err, ok := fsmErr.(error); ok && err != nil {
j.logger.Error("registering job failed", "error", err, "fsm", true)
return err
}
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
j.logger.Error("registering job failed", "error", err, "raft", true)
j.logger.Error("registering job failed", "error", err)
return err
}

Expand Down Expand Up @@ -812,6 +808,9 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
if err != nil {
return err
}
if job == nil {
return nil
}

var eval *structs.Evaluation

Expand All @@ -820,7 +819,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
now := time.Now().UnixNano()

// If the job is periodic or parameterized, we don't create an eval.
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
if !(job.IsPeriodic() || job.IsParameterized()) {

// The evaluation priority is determined by several factors. It
// defaults to the job default priority and is overridden by the
Expand All @@ -829,7 +828,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// If the user supplied an eval priority override, we subsequently
// use this.
priority := structs.JobDefaultPriority
if job != nil {
if job.Priority > 0 {
priority = job.Priority
}
if args.EvalPriority > 0 {
Expand Down Expand Up @@ -1930,13 +1929,9 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
}

// Commit this update via Raft
fsmErr, jobCreateIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, regReq)
if err, ok := fsmErr.(error); ok && err != nil {
j.logger.Error("dispatched job register failed", "error", err, "fsm", true)
return err
}
_, jobCreateIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, regReq)
if err != nil {
j.logger.Error("dispatched job register failed", "error", err, "raft", true)
j.logger.Error("dispatched job register failed", "error")
return err
}

Expand Down
55 changes: 9 additions & 46 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -3413,13 +3414,11 @@ func TestJobEndpoint_Deregister_ACL(t *testing.T) {
require.NotZero(eval.CreateTime)
require.NotZero(eval.ModifyTime)

// Deregistration is not idempotent, produces a new eval after the job is
// deregistered. TODO(langmartin) make it idempotent.
// Deregistration is idempotent
var validResp2 structs.JobDeregisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", req, &validResp2)
require.NoError(err)
require.NotEqual("", validResp2.EvalID)
require.NotEqual(validResp.EvalID, validResp2.EvalID)
must.NoError(t, err)
must.Eq(t, "", validResp2.EvalID)
}

func TestJobEndpoint_Deregister_Nonexistent(t *testing.T) {
Expand All @@ -3442,51 +3441,15 @@ func TestJobEndpoint_Deregister_Nonexistent(t *testing.T) {
},
}
var resp2 structs.JobDeregisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.JobModifyIndex == 0 {
t.Fatalf("bad index: %d", resp2.Index)
}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2))
must.Eq(t, 0, resp2.JobModifyIndex, must.Sprint("expected no modify index"))

// Lookup the evaluation
state := s1.fsm.State()
ws := memdb.NewWatchSet()
eval, err := state.EvalByID(ws, resp2.EvalID)
if err != nil {
t.Fatalf("err: %v", err)
}
if eval == nil {
t.Fatalf("expected eval")
}
if eval.CreateIndex != resp2.EvalCreateIndex {
t.Fatalf("index mis-match")
}

if eval.Priority != structs.JobDefaultPriority {
t.Fatalf("bad: %#v", eval)
}
if eval.Type != structs.JobTypeService {
t.Fatalf("bad: %#v", eval)
}
if eval.TriggeredBy != structs.EvalTriggerJobDeregister {
t.Fatalf("bad: %#v", eval)
}
if eval.JobID != jobID {
t.Fatalf("bad: %#v", eval)
}
if eval.JobModifyIndex != resp2.JobModifyIndex {
t.Fatalf("bad: %#v", eval)
}
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
if eval.CreateTime == 0 {
t.Fatalf("eval CreateTime is unset: %#v", eval)
}
if eval.ModifyTime == 0 {
t.Fatalf("eval ModifyTime is unset: %#v", eval)
}
eval, err := state.EvalsByJob(ws, structs.DefaultNamespace, jobID)
must.NoError(t, err)
must.Nil(t, eval)
}

func TestJobEndpoint_Deregister_EvalPriority(t *testing.T) {
Expand Down
14 changes: 2 additions & 12 deletions nomad/namespace_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,11 @@ func (n *Namespace) UpsertNamespaces(args *structs.NamespaceUpsertRequest,
}

// Update via Raft
out, index, err := n.srv.raftApply(structs.NamespaceUpsertRequestType, args)
_, index, err := n.srv.raftApply(structs.NamespaceUpsertRequestType, args)
if err != nil {
return err
}

// Check if there was an error when applying.
if err, ok := out.(error); ok && err != nil {
return err
}

// Update the index
reply.Index = index
return nil
Expand Down Expand Up @@ -105,16 +100,11 @@ func (n *Namespace) DeleteNamespaces(args *structs.NamespaceDeleteRequest, reply
}

// Update via Raft
out, index, err := n.srv.raftApply(structs.NamespaceDeleteRequestType, args)
_, index, err := n.srv.raftApply(structs.NamespaceDeleteRequestType, args)
if err != nil {
return err
}

// Check if there was an error when applying.
if err, ok := out.(error); ok && err != nil {
return err
}

// Update the index
reply.Index = index
return nil
Expand Down
6 changes: 1 addition & 5 deletions nomad/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,6 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
op.logger.Error("failed applying AutoPilot configuration", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}

// Check if the return type is a bool.
if respBool, ok := resp.(bool); ok {
Expand Down Expand Up @@ -325,9 +322,8 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
if err != nil {
op.logger.Error("failed applying Scheduler configuration", "error", err)
return err
} else if respErr, ok := resp.(error); ok {
return respErr
}

// If CAS request, raft returns a boolean indicating if the update was applied.
// Otherwise, assume success
reply.Updated = true
Expand Down
5 changes: 1 addition & 4 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,7 @@ func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
Namespace: job.Namespace,
},
}
fsmErr, index, err := s.raftApply(structs.JobRegisterRequestType, req)
if err, ok := fsmErr.(error); ok && err != nil {
return nil, err
}
_, index, err := s.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
return nil, err
}
Expand Down
Loading