diff --git a/Vagrantfile b/Vagrantfile index 96d70fab9c33..6f744f6849f0 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -2,7 +2,7 @@ # vi: set ft=ruby : # -LINUX_BASE_BOX = "bento/ubuntu-16.04" +LINUX_BASE_BOX = "bento/ubuntu-18.04" FREEBSD_BASE_BOX = "freebsd/FreeBSD-11.3-STABLE" LINUX_IP_ADDRESS = "10.199.0.200" diff --git a/nomad/fsm.go b/nomad/fsm.go index 10ca959fb76a..98a65590baf5 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -555,6 +555,15 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { } } + // COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log, + // so this may be nil during server upgrades. + if req.Eval != nil { + req.Eval.JobModifyIndex = index + if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil { + return err + } + } + return nil } @@ -565,14 +574,32 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - return n.state.WithWriteTransaction(func(tx state.Txn) error { - if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx); err != nil { + err := n.state.WithWriteTransaction(func(tx state.Txn) error { + err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx) + + if err != nil { n.logger.Error("deregistering job failed", "error", err) return err } return nil }) + + // COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log, + // so this may be nil during server upgrades. + // always attempt upsert eval even if job deregister fail + if req.Eval != nil { + req.Eval.JobModifyIndex = index + if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil { + return err + } + } + + if err != nil { + return err + } + + return nil } func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} { diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index a1b5df3c8f97..1980dbd01dae 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -312,10 +312,39 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return err } + // Create a new evaluation + now := time.Now().UnixNano() + submittedEval := false + var eval *structs.Evaluation + + // Set the submit time + args.Job.SubmitTime = now + + // If the job is periodic or parameterized, we don't create an eval. + if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) { + eval = &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: args.RequestNamespace(), + Priority: args.Job.Priority, + Type: args.Job.Type, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: args.Job.ID, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + reply.EvalID = eval.ID + } + // Check if the job has changed at all if existingJob == nil || existingJob.SpecChanged(args.Job) { - // Set the submit time - args.Job.SetSubmitTime() + + // COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check to always set args.Eval + // 0.12.1 introduced atomic eval job registration + if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) { + args.Eval = eval + submittedEval = true + } // Commit this update via Raft fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args) @@ -330,6 +359,12 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // Populate the reply with job information reply.JobModifyIndex = index + reply.Index = index + + if submittedEval { + reply.EvalCreateIndex = index + } + } else { reply.JobModifyIndex = existingJob.JobModifyIndex } @@ -337,43 +372,29 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // used for multiregion start args.Job.JobModifyIndex = reply.JobModifyIndex - // If the job is periodic or parameterized, we don't create an eval. - if args.Job.IsPeriodic() || args.Job.IsParameterized() { + if eval == nil { return nil } - // Create a new evaluation - now := time.Now().UTC().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: args.Job.Priority, - Type: args.Job.Type, - TriggeredBy: structs.EvalTriggerJobRegister, - JobID: args.Job.ID, - JobModifyIndex: reply.JobModifyIndex, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - WriteRequest: structs.WriteRequest{Region: args.Region}, - } + if eval != nil && !submittedEval { + eval.JobModifyIndex = reply.JobModifyIndex + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } - // Commit this evaluation via Raft - // XXX: There is a risk of partial failure where the JobRegister succeeds - // but that the EvalUpdate does not. - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - j.logger.Error("eval create failed", "error", err, "method", "register") - return err - } + // Commit this evaluation via Raft + // There is a risk of partial failure where the JobRegister succeeds + // but that the EvalUpdate does not, before 0.12.1 + _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + j.logger.Error("eval create failed", "error", err, "method", "register") + return err + } - // Populate the reply with eval information - reply.EvalID = eval.ID - reply.EvalCreateIndex = evalIndex - reply.Index = evalIndex + reply.EvalCreateIndex = evalIndex + reply.Index = evalIndex + } // Kick off a multiregion deployment (enterprise only). if isRunner { @@ -689,7 +710,7 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis } // Create a new evaluation - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), @@ -766,6 +787,33 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD } } + var eval *structs.Evaluation + + // The job priority / type is strange for this, since it's not a high + // priority even if the job was. + now := time.Now().UnixNano() + + // If the job is periodic or parameterized, we don't create an eval. + if job == nil || !(job.IsPeriodic() || job.IsParameterized()) { + eval = &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: args.RequestNamespace(), + Priority: structs.JobDefaultPriority, + Type: structs.JobTypeService, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: args.JobID, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + reply.EvalID = eval.ID + } + + // COMPAT(1.1.0): remove conditional and always set args.Eval + if ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) { + args.Eval = eval + } + // Commit the job update via Raft _, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args) if err != nil { @@ -775,6 +823,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD // Populate the reply with job information reply.JobModifyIndex = index + reply.EvalCreateIndex = index + reply.Index = index // Make a raft apply to release the CSI volume claims of terminal allocs. var result *multierror.Error @@ -783,44 +833,28 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD result = multierror.Append(result, err) } - // If the job is periodic or parameterized, we don't create an eval. - if job != nil && (job.IsPeriodic() || job.IsParameterized()) { - return nil - } + // COMPAT(1.1.0) - Remove entire conditional block + // 0.12.1 introduced atomic job deregistration eval + if eval != nil && args.Eval == nil { + // Create a new evaluation + eval.JobModifyIndex = index + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } - // Create a new evaluation - // XXX: The job priority / type is strange for this, since it's not a high - // priority even if the job was. - now := time.Now().UTC().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: structs.JobDefaultPriority, - Type: structs.JobTypeService, - TriggeredBy: structs.EvalTriggerJobDeregister, - JobID: args.JobID, - JobModifyIndex: index, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - WriteRequest: structs.WriteRequest{Region: args.Region}, - } + // Commit this evaluation via Raft + _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + result = multierror.Append(result, err) + j.logger.Error("eval create failed", "error", err, "method", "deregister") + return result.ErrorOrNil() + } - // Commit this evaluation via Raft - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - result = multierror.Append(result, err) - j.logger.Error("eval create failed", "error", err, "method", "deregister") - return result.ErrorOrNil() + reply.EvalCreateIndex = evalIndex + reply.Index = evalIndex } - // Populate the reply with eval information - reply.EvalID = eval.ID - reply.EvalCreateIndex = evalIndex - reply.Index = evalIndex return result.ErrorOrNil() } @@ -883,7 +917,7 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st } // Create a new evaluation - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: jobNS.Namespace, @@ -976,7 +1010,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes fmt.Sprintf("task group %q specified for scaling does not exist in job", groupName)) } - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() // If the count is present, commit the job update via Raft // for now, we'll do this even if count didn't change @@ -1651,7 +1685,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) } // Create an eval and mark it as requiring annotations and insert that as well - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), @@ -1849,7 +1883,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa // If the job is periodic, we don't create an eval. if !dispatchJob.IsPeriodic() { // Create a new evaluation - now := time.Now().UTC().UnixNano() + now := time.Now().UnixNano() eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index e1dc0727ab33..9edd97c279c5 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -10,6 +10,7 @@ import ( memdb "github.com/hashicorp/go-memdb" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/raft" "github.com/kr/pretty" "github.com/stretchr/testify/require" @@ -1571,6 +1572,309 @@ func TestJobEndpoint_Register_SemverConstraint(t *testing.T) { }) } +// TestJobEndpoint_Register_EvalCreation_Modern asserts that job register creates an eval +// atomically with the registration +func TestJobEndpoint_Register_EvalCreation_Modern(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + t.Run("job registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + //// initial registration should create the job and a new eval + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.NotEmpty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + + // Lookup the evaluation + eval, err := state.EvalByID(nil, resp.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp.EvalCreateIndex, eval.CreateIndex) + require.Nil(t, evalUpdateFromRaft(t, s1, eval.ID)) + + //// re-registration should create a new eval, but leave the job untouched + var resp2 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp2) + require.NoError(t, err) + require.NotZero(t, resp2.Index) + require.NotEmpty(t, resp2.EvalID) + require.NotEqual(t, resp.EvalID, resp2.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp2.JobModifyIndex, out.CreateIndex) + require.Equal(t, out.CreateIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp2.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp2.EvalCreateIndex, eval.CreateIndex) + + raftEval := evalUpdateFromRaft(t, s1, eval.ID) + require.Equal(t, raftEval, eval) + + //// an update should update the job and create a new eval + req.Job.TaskGroups[0].Name += "a" + var resp3 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp3) + require.NoError(t, err) + require.NotZero(t, resp3.Index) + require.NotEmpty(t, resp3.EvalID) + require.NotEqual(t, resp.EvalID, resp3.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp3.JobModifyIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp3.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp3.EvalCreateIndex, eval.CreateIndex) + + require.Nil(t, evalUpdateFromRaft(t, s1, eval.ID)) + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.Empty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + }) +} + +// TestJobEndpoint_Register_EvalCreation_Legacy asserts that job register creates an eval +// atomically with the registration, but handle legacy clients by adding a new eval update +func TestJobEndpoint_Register_EvalCreation_Legacy(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + s2, cleanupS2 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + + // simulate presense of a server that doesn't handle + // new registration eval + c.Build = "0.12.0" + }) + defer cleanupS2() + + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // keep s1 as the leader + if leader, _ := s1.getLeader(); !leader { + s1, s2 = s2, s1 + } + + codec := rpcClient(t, s1) + + // Create the register request + t.Run("job registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + //// initial registration should create the job and a new eval + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.NotEmpty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + + // Lookup the evaluation + eval, err := state.EvalByID(nil, resp.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp.EvalCreateIndex, eval.CreateIndex) + + raftEval := evalUpdateFromRaft(t, s1, eval.ID) + require.Equal(t, eval, raftEval) + + //// re-registration should create a new eval, but leave the job untouched + var resp2 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp2) + require.NoError(t, err) + require.NotZero(t, resp2.Index) + require.NotEmpty(t, resp2.EvalID) + require.NotEqual(t, resp.EvalID, resp2.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp2.JobModifyIndex, out.CreateIndex) + require.Equal(t, out.CreateIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp2.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp2.EvalCreateIndex, eval.CreateIndex) + + // this raft eval is the one found above + raftEval = evalUpdateFromRaft(t, s1, eval.ID) + require.Equal(t, eval, raftEval) + + //// an update should update the job and create a new eval + req.Job.TaskGroups[0].Name += "a" + var resp3 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp3) + require.NoError(t, err) + require.NotZero(t, resp3.Index) + require.NotEmpty(t, resp3.EvalID) + require.NotEqual(t, resp.EvalID, resp3.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp3.JobModifyIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp3.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp3.EvalCreateIndex, eval.CreateIndex) + + raftEval = evalUpdateFromRaft(t, s1, eval.ID) + require.Equal(t, eval, raftEval) + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.Empty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + }) +} + +// evalUpdateFromRaft searches the raft logs for the eval update pertaining to the eval +func evalUpdateFromRaft(t *testing.T, s *Server, evalID string) *structs.Evaluation { + var store raft.LogStore = s.raftInmem + if store == nil { + store = s.raftStore + } + require.NotNil(t, store) + + li, _ := store.LastIndex() + for i, _ := store.FirstIndex(); i <= li; i++ { + var log raft.Log + err := store.GetLog(i, &log) + require.NoError(t, err) + + if log.Type != raft.LogCommand { + continue + } + + if structs.MessageType(log.Data[0]) != structs.EvalUpdateRequestType { + continue + } + + var req structs.EvalUpdateRequest + structs.Decode(log.Data[1:], &req) + require.NoError(t, err) + + for _, eval := range req.Evals { + if eval.ID == evalID { + eval.CreateIndex = i + eval.ModifyIndex = i + return eval + } + } + } + + return nil +} + func TestJobEndpoint_Revert(t *testing.T) { t.Parallel() @@ -2839,6 +3143,184 @@ func TestJobEndpoint_Deregister_ParameterizedJob(t *testing.T) { } } +// TestJobEndpoint_Deregister_EvalCreation_Modern asserts that job deregister creates an eval +// atomically with the registration +func TestJobEndpoint_Deregister_EvalCreation_Modern(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + t.Run("job de-registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.NotEmpty(t, resp2.EvalID) + + state := s1.fsm.State() + eval, err := state.EvalByID(nil, resp2.EvalID) + require.Nil(t, err) + require.NotNil(t, eval) + require.EqualValues(t, resp2.EvalCreateIndex, eval.CreateIndex) + + require.Nil(t, evalUpdateFromRaft(t, s1, eval.ID)) + + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.Empty(t, resp2.EvalID) + }) +} + +// TestJobEndpoint_Register_EvalCreation_Legacy asserts that job deregister creates an eval +// atomically with the registration, but handle legacy clients by adding a new eval update +func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + s2, cleanupS2 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + + // simulate presense of a server that doesn't handle + // new registration eval + c.Build = "0.12.0" + }) + defer cleanupS2() + + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // keep s1 as the leader + if leader, _ := s1.getLeader(); !leader { + s1, s2 = s2, s1 + } + + codec := rpcClient(t, s1) + + // Create the register request + t.Run("job registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.NotEmpty(t, resp2.EvalID) + + state := s1.fsm.State() + eval, err := state.EvalByID(nil, resp2.EvalID) + require.Nil(t, err) + require.NotNil(t, eval) + require.EqualValues(t, resp2.EvalCreateIndex, eval.CreateIndex) + + raftEval := evalUpdateFromRaft(t, s1, eval.ID) + require.Equal(t, eval, raftEval) + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.Empty(t, resp2.EvalID) + }) +} + func TestJobEndpoint_BatchDeregister(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/leader.go b/nomad/leader.go index ca071d2c8325..c19b2159b261 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -46,6 +46,8 @@ var minSchedulerConfigVersion = version.Must(version.NewVersion("0.9.0")) var minClusterIDVersion = version.Must(version.NewVersion("0.10.4")) +var minJobRegisterAtomicEvalVersion = version.Must(version.NewVersion("0.12.1")) + // monitorLeadership is used to monitor if we acquire or lose our role // as the leader in the Raft cluster. There is some work the leader is // expected to do, so we must react to changes diff --git a/nomad/periodic.go b/nomad/periodic.go index 0a64fac7dfac..a2ea2219f3ef 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -46,10 +46,24 @@ type JobEvalDispatcher interface { // DispatchJob creates an evaluation for the passed job and commits both the // evaluation and the job to the raft log. It returns the eval. func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) { + now := time.Now().UTC().UnixNano() + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: job.Namespace, + Priority: job.Priority, + Type: job.Type, + TriggeredBy: structs.EvalTriggerPeriodicJob, + JobID: job.ID, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + // Commit this update via Raft job.SetSubmitTime() req := structs.JobRegisterRequest{ - Job: job, + Job: job, + Eval: eval, WriteRequest: structs.WriteRequest{ Namespace: job.Namespace, }, @@ -62,35 +76,30 @@ func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) { return nil, err } - // Create a new evaluation - now := time.Now().UTC().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: job.Namespace, - Priority: job.Priority, - Type: job.Type, - TriggeredBy: structs.EvalTriggerPeriodicJob, - JobID: job.ID, - JobModifyIndex: index, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - } - - // Commit this evaluation via Raft - // XXX: There is a risk of partial failure where the JobRegister succeeds - // but that the EvalUpdate does not. - _, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - return nil, err + eval.CreateIndex = index + eval.ModifyIndex = index + + // COMPAT(1.1): Remove in 1.1.0 - 0.12.1 introduced atomic eval job registration + if !ServersMeetMinimumVersion(s.Members(), minJobRegisterAtomicEvalVersion, false) { + // Create a new evaluation + eval.JobModifyIndex = index + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + + // Commit this evaluation via Raft + // There is a risk of partial failure where the JobRegister succeeds + // but that the EvalUpdate does not, before Nomad 0.12.1 + _, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + return nil, err + } + + // Update its indexes. + eval.CreateIndex = evalIndex + eval.ModifyIndex = evalIndex } - // Update its indexes. - eval.CreateIndex = evalIndex - eval.ModifyIndex = evalIndex return eval, nil } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index cc0550aba564..a5ad2eea259f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -563,6 +563,9 @@ type JobRegisterRequest struct { // PolicyOverride is set when the user is attempting to override any policies PolicyOverride bool + // Eval is the evaluation that is associated with the job registration + Eval *Evaluation + WriteRequest } @@ -576,6 +579,9 @@ type JobDeregisterRequest struct { // garbage collector Purge bool + // Eval is the evaluation to create that's associated with job deregister + Eval *Evaluation + WriteRequest }