
Commit

Merge pull request #1516 from hashicorp/f-lost-state-sched
Make scheduler mark allocations as lost
dadgar committed Aug 4, 2016
2 parents ca857d9 + 6b2e9ea commit eaf083a
Showing 10 changed files with 328 additions and 198 deletions.
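Taken together, the hunks below move "node down, allocations lost" handling out of the state store's UpdateNodeStatus and into the scheduler: lost allocations are stopped through the plan with client status lost, replacements are placed, and a lost client status is now treated as terminal. A minimal illustration of that last point, assuming the mock and structs packages as used by the tests below (import paths inferred from the repository layout):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// A "lost" allocation: still desired to run, but its node went down.
	alloc := mock.Alloc()
	alloc.DesiredStatus = structs.AllocDesiredStatusRun
	alloc.ClientStatus = structs.AllocClientStatusLost

	// With this commit a lost client status counts as terminal, which is why
	// the first EvalGC test below expects the lost alloc to be reaped.
	fmt.Println(alloc.TerminalStatus()) // true
}
```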
63 changes: 55 additions & 8 deletions nomad/core_sched_test.go
@@ -29,7 +29,14 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.JobID = eval.JobID
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})

// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost
alloc2.JobID = eval.JobID
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -68,6 +75,14 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
if outA != nil {
t.Fatalf("bad: %v", outA)
}

outA2, err := state.AllocByID(alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 != nil {
t.Fatalf("bad: %v", outA2)
}
}

// An EvalGC should never reap a batch job
@@ -101,7 +116,15 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})

// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost

err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -141,6 +164,14 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
t.Fatalf("bad: %v", outA)
}

outA2, err := state.AllocByID(alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
}

outB, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
@@ -170,16 +201,24 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
state.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})

// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.JobID = alloc.JobID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost

err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert "running" alloc
alloc2 := mock.Alloc()
alloc2.EvalID = eval.ID
state.UpsertJobSummary(1003, mock.JobSummary(alloc2.JobID))
err = state.UpsertAllocs(1004, []*structs.Allocation{alloc2})
alloc3 := mock.Alloc()
alloc3.EvalID = eval.ID
state.UpsertJobSummary(1003, mock.JobSummary(alloc3.JobID))
err = state.UpsertAllocs(1004, []*structs.Allocation{alloc3})
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -211,7 +250,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
t.Fatalf("bad: %v", out)
}

outA, err := state.AllocByID(alloc2.ID)
outA, err := state.AllocByID(alloc3.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -227,6 +266,14 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T)
if outB != nil {
t.Fatalf("bad: %v", outB)
}

outC, err := state.AllocByID(alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outC != nil {
t.Fatalf("bad: %v", outC)
}
}

func TestCoreScheduler_EvalGC_Force(t *testing.T) {
33 changes: 7 additions & 26 deletions nomad/state/state_store.go
@@ -228,30 +228,6 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error
return fmt.Errorf("index update failed: %v", err)
}

// Update the state of the allocations which are in running state to lost
if status == structs.NodeStatusDown {
allocs, err := s.AllocsByNode(nodeID)
if err != nil {
return fmt.Errorf("error retrieving any allocations for the node: %v", nodeID)
}
for _, alloc := range allocs {
copyAlloc := alloc.Copy()
if alloc.ClientStatus == structs.AllocClientStatusPending ||
alloc.ClientStatus == structs.AllocClientStatusRunning {
copyAlloc.ClientStatus = structs.AllocClientStatusLost

// Updating the summary since we are changing the state of the
// allocation to lost
if err := s.updateSummaryWithAlloc(index, copyAlloc, alloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
if err := txn.Insert("allocs", copyAlloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
}
}
}

txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
@@ -963,8 +939,13 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
alloc.CreateIndex = exist.CreateIndex
alloc.ModifyIndex = index
alloc.AllocModifyIndex = index
alloc.ClientStatus = exist.ClientStatus
alloc.ClientDescription = exist.ClientDescription

// If the scheduler is marking this allocation as lost, we do not
// want to reuse the status of the existing allocation.
if alloc.ClientStatus != structs.AllocClientStatusLost {
alloc.ClientStatus = exist.ClientStatus
alloc.ClientDescription = exist.ClientDescription
}

// The job has been denormalized so re-attach the original job
if alloc.Job == nil {
141 changes: 33 additions & 108 deletions nomad/state/state_store_test.go
@@ -136,114 +136,6 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
t.Fatalf("bad: %d", index)
}

alloc := mock.Alloc()
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc.NodeID = node.ID
alloc1.NodeID = node.ID
alloc2.NodeID = node.ID
alloc.ClientStatus = structs.AllocClientStatusPending
alloc1.ClientStatus = structs.AllocClientStatusPending
alloc2.ClientStatus = structs.AllocClientStatusPending

if err := state.UpsertJob(850, alloc.Job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJob(851, alloc1.Job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJob(852, alloc2.Job); err != nil {
t.Fatal(err)
}
if err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc1, alloc2}); err != nil {
t.Fatalf("err: %v", err)
}

// Change the state of the allocs to running and failed
newAlloc := alloc.Copy()
newAlloc.ClientStatus = structs.AllocClientStatusRunning

newAlloc1 := alloc1.Copy()
newAlloc1.ClientStatus = structs.AllocClientStatusFailed

if err = state.UpdateAllocsFromClient(1003, []*structs.Allocation{newAlloc, newAlloc1}); err != nil {
t.Fatalf("err: %v", err)
}

// Change the state of the node to down
if err = state.UpdateNodeStatus(1004, node.ID, structs.NodeStatusDown); err != nil {
t.Fatalf("err: %v", err)
}

allocOut, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if allocOut.ClientStatus != structs.AllocClientStatusLost {
t.Fatalf("expected alloc status: %v, actual: %v", structs.AllocClientStatusLost, allocOut.ClientStatus)
}

alloc1Out, err := state.AllocByID(alloc1.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if alloc1Out.ClientStatus != structs.AllocClientStatusFailed {
t.Fatalf("expected alloc status: %v, actual: %v", structs.AllocClientStatusFailed, alloc1Out.ClientStatus)
}

alloc2Out, err := state.AllocByID(alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if alloc2Out.ClientStatus != structs.AllocClientStatusLost {
t.Fatalf("expected alloc status: %v, actual: %v", structs.AllocClientStatusLost, alloc2Out.ClientStatus)
}

js1, _ := state.JobSummaryByID(alloc.JobID)
js2, _ := state.JobSummaryByID(alloc1.JobID)
js3, _ := state.JobSummaryByID(alloc2.JobID)

expectedSummary1 := structs.JobSummary{
JobID: alloc.JobID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Lost: 1,
},
},
CreateIndex: 850,
ModifyIndex: 1004,
}
expectedSummary2 := structs.JobSummary{
JobID: alloc1.JobID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Failed: 1,
},
},
CreateIndex: 851,
ModifyIndex: 1003,
}
expectedSummary3 := structs.JobSummary{
JobID: alloc2.JobID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Lost: 1,
},
},
CreateIndex: 852,
ModifyIndex: 1004,
}

if !reflect.DeepEqual(js1, &expectedSummary1) {
t.Fatalf("expected: %v, got: %v", expectedSummary1, js1)
}
if !reflect.DeepEqual(js2, &expectedSummary2) {
t.Fatalf("expected: %v, got: %#v", expectedSummary2, js2)
}
if !reflect.DeepEqual(js3, &expectedSummary3) {
t.Fatalf("expected: %v, got: %v", expectedSummary3, js3)
}

notify.verify(t)
}

@@ -1898,6 +1790,39 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
notify.verify(t)
}

// This test ensures that the state store will mark the client status as lost
// when set, rather than preferring the existing status.
func TestStateStore_UpdateAlloc_Lost(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
alloc.ClientStatus = "foo"

if err := state.UpsertJob(999, alloc.Job); err != nil {
t.Fatalf("err: %v", err)
}

err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}

alloc2 := new(structs.Allocation)
*alloc2 = *alloc
alloc2.ClientStatus = structs.AllocClientStatusLost
if err := state.UpsertAllocs(1001, []*structs.Allocation{alloc2}); err != nil {
t.Fatalf("err: %v", err)
}

out, err := state.AllocByID(alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}

if out.ClientStatus != structs.AllocClientStatusLost {
t.Fatalf("bad: %#v", out)
}
}

// This test ensures an allocation can be updated when there is no job
// associated with it. This will happen when a job is stopped by a user while it
// still has non-terminal allocations on clients
15 changes: 11 additions & 4 deletions nomad/structs/structs.go
@@ -2537,7 +2537,7 @@ func (a *Allocation) TerminalStatus() bool {
}

switch a.ClientStatus {
case AllocClientStatusComplete, AllocClientStatusFailed:
case AllocClientStatusComplete, AllocClientStatusFailed, AllocClientStatusLost:
return true
default:
return false
@@ -3022,7 +3022,9 @@ type Plan struct {
Annotations *PlanAnnotations
}

func (p *Plan) AppendUpdate(alloc *Allocation, status, desc string) {
// AppendUpdate marks the allocation for eviction. The clientStatus of the
// allocation may be optionally set by passing in a non-empty value.
func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clientStatus string) {
newAlloc := new(Allocation)
*newAlloc = *alloc

@@ -3038,8 +3040,13 @@ func (p *Plan) AppendUpdate(alloc *Allocation, status, desc string) {
// Strip the resources as it can be rebuilt.
newAlloc.Resources = nil

newAlloc.DesiredStatus = status
newAlloc.DesiredDescription = desc
newAlloc.DesiredStatus = desiredStatus
newAlloc.DesiredDescription = desiredDesc

if clientStatus != "" {
newAlloc.ClientStatus = clientStatus
}

node := alloc.NodeID
existing := p.NodeUpdate[node]
p.NodeUpdate[node] = append(existing, newAlloc)
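As a quick illustration of the widened AppendUpdate signature, here is a hedged sketch of the two call forms the scheduler is expected to use. The plain-stop call appears verbatim in scheduler/generic_sched.go below; the "lost" call reflects the intent of markLostAndPlace, whose body is not expanded in this view, and the helper name appendStopsAndLost is illustrative only. It is written in the scheduler package so that the allocNotNeeded and allocLost constants resolve:

```go
package scheduler

import "github.com/hashicorp/nomad/nomad/structs"

// appendStopsAndLost is a hypothetical helper, shown only to illustrate the
// new AppendUpdate contract: an empty clientStatus leaves the copied
// allocation's client status alone, while a non-empty value overrides it.
func appendStopsAndLost(plan *structs.Plan, stopped, lost []*structs.Allocation) {
	for _, alloc := range stopped {
		// Plain stop: pass "" so the existing client status is preserved.
		plan.AppendUpdate(alloc, structs.AllocDesiredStatusStop, allocNotNeeded, "")
	}
	for _, alloc := range lost {
		// Lost: force the client status so UpsertAllocs keeps "lost" rather
		// than copying back the old client status.
		plan.AppendUpdate(alloc, structs.AllocDesiredStatusStop, allocLost,
			structs.AllocClientStatusLost)
	}
}
```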
11 changes: 9 additions & 2 deletions scheduler/generic_sched.go
@@ -26,6 +26,9 @@ const (
// allocUpdating is the status used when a job requires an update
allocUpdating = "alloc is being updated due to job update"

// allocLost is the status used when an allocation is lost
allocLost = "alloc is lost since its node is down"

// allocInPlace is the status used when speculating on an in-place update
allocInPlace = "alloc updating in-place"

@@ -362,7 +365,7 @@ func (s *GenericScheduler) computeJobAllocs() error {

// Add all the allocs to stop
for _, e := range diff.stop {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded)
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded, "")
}

// Attempt to do the upgrades in place
@@ -376,7 +379,7 @@
}

// Check if a rolling upgrade strategy is being used
limit := len(diff.update) + len(diff.migrate)
limit := len(diff.update) + len(diff.migrate) + len(diff.lost)
if s.job != nil && s.job.Update.Rolling() {
limit = s.job.Update.MaxParallel
}
@@ -387,6 +390,10 @@
// Treat non in-place updates as an eviction and new placement.
s.limitReached = s.limitReached || evictAndPlace(s.ctx, diff, diff.update, allocUpdating, &limit)

// Lost allocations should be transitioned to desired status stop, client
// status lost, and a new placement should be made
s.limitReached = s.limitReached || markLostAndPlace(s.ctx, diff, diff.lost, allocLost, &limit)

// Nothing remaining to do if placement is not required
if len(diff.place) == 0 {
if s.job != nil {
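markLostAndPlace itself lives in scheduler/util.go, which is not expanded in this view. A plausible shape, modeled on the existing evictAndPlace helper and the types already referenced above (Context, diffResult, allocTuple), might look like the sketch below; treat it as an assumption, not the committed implementation:

```go
package scheduler

import "github.com/hashicorp/nomad/nomad/structs"

// markLostAndPlace (hypothetical sketch, NOT the committed code) stops each
// lost allocation through the plan with client status "lost" and queues a
// replacement placement, honoring the remaining update limit.
func markLostAndPlace(ctx Context, diff *diffResult, allocs []allocTuple,
	desc string, limit *int) bool {
	n := len(allocs)
	for i := 0; i < n && i < *limit; i++ {
		a := allocs[i]
		ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc,
			structs.AllocClientStatusLost)
		diff.place = append(diff.place, a)
	}
	if n <= *limit {
		*limit -= n
		return false
	}
	*limit = 0
	return true // the update limit was reached
}
```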
(The remaining changed files are not expanded in this view.)

