Skip to content

Commit

Permalink
Merge pull request #3669 from hashicorp/b-planapply-eval-modindex
Browse files Browse the repository at this point in the history
Update eval modify index as part of plan apply.
  • Loading branch information
dadgar committed Dec 19, 2017
2 parents e2d4f1c + 039942f commit 259ee6b
Show file tree
Hide file tree
Showing 13 changed files with 445 additions and 194 deletions.
67 changes: 67 additions & 0 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,73 @@ func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
}
}

func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) {
// test enqueueing an eval, updating a plan result for the same eval and de-queueing the eval
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

alloc := mock.Alloc()
job := alloc.Job
alloc.Job = nil

state := s1.fsm.State()

if err := state.UpsertJob(999, job); err != nil {
t.Fatalf("err: %v", err)
}

eval := mock.Eval()
eval.JobID = job.ID

// Create an eval
if err := state.UpsertEvals(1, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("err: %v", err)
}

s1.evalBroker.Enqueue(eval)

// Create a plan result and apply it with a later index
res := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
Job: job,
},
EvalID: eval.ID,
}
assert := assert.New(t)
err := state.UpsertPlanResults(1000, &res)
assert.Nil(err)

// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}

// Ensure outstanding
token, ok := s1.evalBroker.Outstanding(eval.ID)
if !ok {
t.Fatalf("should be outstanding")
}
if token != resp.Token {
t.Fatalf("bad token: %#v %#v", token, resp.Token)
}

if resp.WaitIndex != 1000 {
t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, 1000)
}
}

func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
Expand Down
4 changes: 2 additions & 2 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return nil
}

// reconcileSummaries re-calculates the queued allocations for every job that we
// reconcileQueuedAllocations re-calculates the queued allocations for every job that we
// created a Job Summary during the snap shot restore
func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
// Get all the jobs
Expand Down Expand Up @@ -1142,7 +1142,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}

snap.UpsertEvals(100, []*structs.Evaluation{eval})
// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner)
if err != nil {
Expand Down
61 changes: 33 additions & 28 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,13 +1213,18 @@ func TestFSM_ApplyPlanResults(t *testing.T) {

alloc.DeploymentID = d.ID

eval := mock.Eval()
eval.JobID = job.ID
fsm.State().UpsertEvals(1, []*structs.Evaluation{eval})

fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{alloc},
},
Deployment: d,
EvalID: eval.ID,
}
buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req)
if err != nil {
Expand All @@ -1233,32 +1238,32 @@ func TestFSM_ApplyPlanResults(t *testing.T) {

// Verify the allocation is registered
ws := memdb.NewWatchSet()
assert := assert.New(t)
out, err := fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
assert.Nil(err)
alloc.CreateIndex = out.CreateIndex
alloc.ModifyIndex = out.ModifyIndex
alloc.AllocModifyIndex = out.AllocModifyIndex

// Job should be re-attached
alloc.Job = job
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
}
assert.Equal(alloc, out)

dout, err := fsm.State().DeploymentByID(ws, d.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if tg, ok := dout.TaskGroups[alloc.TaskGroup]; !ok || tg.PlacedAllocs != 1 {
t.Fatalf("err: %v %v", tg, err)
}
assert.Nil(err)
tg, ok := dout.TaskGroups[alloc.TaskGroup]
assert.True(ok)
assert.NotNil(tg)
assert.Equal(1, tg.PlacedAllocs)

// Ensure that the original job is used
evictAlloc := alloc.Copy()
job = mock.Job()
job.Priority = 123
eval = mock.Eval()
eval.JobID = job.ID

fsm.State().UpsertEvals(2, []*structs.Evaluation{eval})

evictAlloc.Job = nil
evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
Expand All @@ -1267,28 +1272,28 @@ func TestFSM_ApplyPlanResults(t *testing.T) {
Job: job,
Alloc: []*structs.Allocation{evictAlloc},
},
EvalID: eval.ID,
}
buf, err = structs.Encode(structs.ApplyPlanResultsRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
assert.Nil(err)

resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
log := makeLog(buf)
//set the index to something other than 1
log.Index = 25
resp = fsm.Apply(log)
assert.Nil(resp)

// Verify we are evicted
out, err = fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.DesiredStatus != structs.AllocDesiredStatusEvict {
t.Fatalf("alloc found!")
}
if out.Job == nil || out.Job.Priority == 123 {
t.Fatalf("bad job")
}
assert.Nil(err)
assert.Equal(structs.AllocDesiredStatusEvict, out.DesiredStatus)
assert.NotNil(out.Job)
assert.NotEqual(123, out.Job.Priority)

evalOut, err := fsm.State().EvalByID(ws, eval.ID)
assert.Nil(err)
assert.Equal(log.Index, evalOut.ModifyIndex)

}

func TestFSM_DeploymentStatusUpdate(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,8 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
AnnotatePlan: true,
}

snap.UpsertEvals(100, []*structs.Evaluation{eval})

// Create an in-memory Planner that returns no errors and stores the
// submitted plan and created evals.
planner := &scheduler.Harness{
Expand Down
1 change: 1 addition & 0 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (s *Server) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
},
Deployment: result.Deployment,
DeploymentUpdates: result.DeploymentUpdates,
EvalID: plan.EvalID,
}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
Expand Down
Loading

0 comments on commit 259ee6b

Please sign in to comment.