scheduler: system scheduler should reconcile overlapping live allocs
The system scheduler uses a separate code path for reconciliation. During the
investigation into the "plan for node rejected" bug that was fixed in #16401,
we discovered that this code path doesn't maintain the invariant that no more
than one allocation per system job task group (or `count` allocations for
sysbatch jobs) should be left running on a given client. While this condition
should be impossible to encounter, the scheduler should still reconcile it.

Add a new `ensureMaxSystemAllocCount` function that enforces the invariant that
there can be no more than the expected number of desired-running allocations
for a given system or sysbatch job task group on a given node.
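
For context, the core idea is sketched below in isolation. This is only an
illustration: the `keepNewestAllocs` helper is hypothetical and not part of
this commit, and the real change operates on the scheduler's internal
`diffResult` and `allocTuple` types inside `ensureMaxSystemAllocCount`. Given
the live allocations for one task group on one node, keep at most `count` of
them, preferring the newest job version, and stop the surplus.

```go
package scheduler

import (
	"sort"

	"github.com/hashicorp/nomad/nomad/structs"
)

// keepNewestAllocs is an illustrative helper (not part of this commit). Given
// the live allocations for a single task group on a single node, it keeps at
// most count of them, preferring the newest job version, and returns the
// surplus that the scheduler would need to stop.
func keepNewestAllocs(allocs []*structs.Allocation, count int) (keep, stop []*structs.Allocation) {
	// Sort descending by the job's JobModifyIndex, breaking ties with the
	// allocation's CreateIndex, so allocations for newer job versions sort first.
	sort.Slice(allocs, func(i, j int) bool {
		a, b := allocs[i], allocs[j]
		if a.Job.JobModifyIndex == b.Job.JobModifyIndex {
			return a.CreateIndex > b.CreateIndex
		}
		return a.Job.JobModifyIndex > b.Job.JobModifyIndex
	})
	if len(allocs) <= count {
		return allocs, nil
	}
	return allocs[:count], allocs[count:]
}
```

For a system job `count` is 1, so on a node holding both an old-version and a
new-version allocation only the newer one survives and the older one is
stopped, which is what the new test below asserts.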
tgross committed Mar 15, 2023
1 parent 5f37b2f commit 1172f1a
Showing 5 changed files with 345 additions and 66 deletions.
3 changes: 3 additions & 0 deletions .changelog/16097.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug where overlapping system allocations would not be stopped
```
7 changes: 4 additions & 3 deletions nomad/mock/job.go
@@ -437,9 +437,10 @@ func SystemJob() *structs.Job {
Meta: map[string]string{
"owner": "armon",
},
Status: structs.JobStatusPending,
CreateIndex: 42,
ModifyIndex: 99,
Status: structs.JobStatusPending,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,
}
job.Canonicalize()
return job
69 changes: 67 additions & 2 deletions scheduler/scheduler_system_test.go
@@ -8,14 +8,16 @@ import (
"time"

memdb "github.com/hashicorp/go-memdb"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

func TestSystemSched_JobRegister(t *testing.T) {
@@ -3084,3 +3086,66 @@ func TestSystemSched_CSITopology(t *testing.T) {
must.Eq(t, structs.EvalStatusComplete, h.Evals[0].Status)

}

func TestSystemSched_OverlappingAllocations(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)

node := mock.Node()
node.Name = "good-node-1"
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

// Create 2 allocs for 2 versions of the same system job on the same node.
// This should not happen but has been observed, so the scheduler must "fix"
// this by stopping the old version.
oldJob := mock.SystemJob()
oldJob.ID = "my-job"
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), oldJob))

job := oldJob.Copy()
job.Version++
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))

allocOld := mock.Alloc()
allocOld.Job = oldJob
allocOld.JobID = oldJob.ID
allocOld.NodeID = node.ID
allocOld.Name = "my-job.web[0]"

allocCurrent := allocOld.Copy()
allocCurrent.ID = uuid.Generate()
allocCurrent.Job = job

must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(),
[]*structs.Allocation{
allocOld, allocCurrent}))

// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
lastIndex := h.NextIndex()
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
lastIndex, []*structs.Evaluation{eval}))

// Process the evaluation
must.NoError(t, h.Process(NewSystemScheduler, eval))

// Ensure a single plan
must.Len(t, 1, h.Plans)
plan := h.Plans[0]
must.Nil(t, plan.Annotations, must.Sprint("expected no annotations"))

test.Len(t, 1, plan.NodeUpdate[node.ID], test.Sprint("expected 1 evict/stop"))
if len(plan.NodeUpdate[node.ID]) > 0 {
test.Eq(t, allocOld.ID, plan.NodeUpdate[node.ID][0].ID,
test.Sprintf("expected alloc=%s to be evicted/stopped", allocOld.ID))
}

test.Len(t, 0, plan.NodeAllocation[node.ID], test.Sprint("expected no updates"))
}
180 changes: 134 additions & 46 deletions scheduler/system_util.go
@@ -8,13 +8,15 @@ package scheduler

import (
"fmt"
"sort"
"time"

"github.com/hashicorp/nomad/nomad/structs"
)

// materializeSystemTaskGroups is used to materialize all the task groups
// a system or sysbatch job requires.
// materializeSystemTaskGroups is used to materialize all the task groups a
// system or sysbatch job requires. Returns a map of allocation names to task
// groups.
func materializeSystemTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
out := make(map[string]*structs.TaskGroup)
if job.Stopped() {
@@ -45,24 +47,34 @@ func diffSystemAllocsForNode(
eligibleNodes map[string]*structs.Node,
notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining
taintedNodes map[string]*structs.Node, // nodes which are down (by node id)
required map[string]*structs.TaskGroup, // set of allocations that must exist
required map[string]*structs.TaskGroup, // set of task groups (by alloc name) that must exist
allocs []*structs.Allocation, // non-terminal allocations that exist
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id)
serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
) *diffResult {
result := new(diffResult)

// Scan the existing updates
existing := make(map[string]struct{}) // set of alloc names
// Track a map of task group names (both those required and not) to
// diffResult for allocations of that name. This lets us enforce task-group
// global invariants before we merge all the results together for the node.
results := map[string]*diffResult{}

// Track the set of allocation names we've
// seen, so we can determine if new placements are needed.
existing := make(map[string]struct{})

for _, exist := range allocs {
// Index the existing node
name := exist.Name
existing[name] = struct{}{}

// Check for the definition in the required set
tg, ok := required[name]
result := results[name]
if result == nil {
result = new(diffResult)
results[name] = result
}

// If not required, we stop the alloc
// Check if the allocation's task group is in the required set (it might
// have been dropped from the jobspec). If not, we stop the alloc
tg, ok := required[name]
if !ok {
result.stop = append(result.stop, allocTuple{
Name: name,
@@ -88,8 +100,8 @@
}
}

// If we have been marked for migration and aren't terminal, migrate
if !exist.TerminalStatus() && exist.DesiredTransition.ShouldMigrate() {
// If we have been marked for migration, migrate
if exist.DesiredTransition.ShouldMigrate() {
result.migrate = append(result.migrate, allocTuple{
Name: name,
TaskGroup: tg,
@@ -98,16 +110,6 @@
continue
}

// If we are a sysbatch job and terminal, ignore (or stop?) the alloc
if job.Type == structs.JobTypeSysBatch && exist.TerminalStatus() {
result.ignore = append(result.ignore, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
continue
}

// Expired unknown allocs are lost. Expired checks that status is unknown.
if supportsDisconnectedClients && expired {
result.lost = append(result.lost, allocTuple{
@@ -118,20 +120,15 @@
continue
}

taintedNode, nodeIsTainted := taintedNodes[exist.NodeID]

// Ignore unknown allocs that we want to reconnect eventually.
if supportsDisconnectedClients &&
exist.ClientStatus == structs.AllocClientStatusUnknown &&
exist.DesiredStatus == structs.AllocDesiredStatusRun {
result.ignore = append(result.ignore, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
continue
goto IGNORE
}

node, nodeIsTainted := taintedNodes[exist.NodeID]

// Filter allocs on a node that is now re-connected to reconnecting.
if supportsDisconnectedClients &&
!nodeIsTainted &&
@@ -152,19 +149,20 @@
// If we are on a tainted node, we must migrate if we are a service or
// if the batch allocation did not finish
if nodeIsTainted {
// If the job is batch and finished successfully, the fact that the
// node is tainted does not mean it should be migrated or marked as
// lost as the work was already successfully finished. However for
// service/system jobs, tasks should never complete. The check of
// batch type, defends against client bugs.
// If the job is batch and finished successfully (but not yet marked
// terminal), the fact that the node is tainted does not mean it
// should be migrated or marked as lost as the work was already
// successfully finished. However for service/system jobs, tasks
// should never complete. The check of batch type, defends against
// client bugs.
if exist.Job.Type == structs.JobTypeSysBatch && exist.RanSuccessfully() {
goto IGNORE
}

// Filter running allocs on a node that is disconnected to be marked as unknown.
if node != nil &&
if taintedNode != nil &&
supportsDisconnectedClients &&
node.Status == structs.NodeStatusDisconnected &&
taintedNode.Status == structs.NodeStatusDisconnected &&
exist.ClientStatus == structs.AllocClientStatusRunning {

disconnect := exist.Copy()
@@ -179,7 +177,7 @@
continue
}

if !exist.TerminalStatus() && (node == nil || node.TerminalStatus()) {
if taintedNode == nil || taintedNode.TerminalStatus() {
result.lost = append(result.lost, allocTuple{
Name: name,
TaskGroup: tg,
@@ -226,13 +224,32 @@
TaskGroup: tg,
Alloc: exist,
})

}

// Scan the required groups
for name, tg := range required {

result := results[name]
if result == nil {
result = new(diffResult)
results[name] = result
}

// Check for an existing allocation
if _, ok := existing[name]; !ok {
if _, ok := existing[name]; ok {

// Assert that we don't have any extraneous allocations for this
// task group
count := tg.Count
if count == 0 {
count = 1
}
if len(result.ignore)+len(result.update)+len(result.reconnecting) > count {
ensureMaxSystemAllocCount(result, count)
}

} else {

// Check for a terminal sysbatch allocation, which should be not placed
// again unless the job has been updated.
@@ -288,7 +305,78 @@
result.place = append(result.place, allocTuple)
}
}
return result

finalResult := new(diffResult)
for _, result := range results {
finalResult.Append(result)
}

return finalResult
}

// ensureMaxSystemAllocCount enforces the invariant that the per-node diffResult
// we have for a system or sysbatch job should never have more than "count"
// allocations in a desired-running state.
func ensureMaxSystemAllocCount(result *diffResult, count int) {

// sort descending by JobModifyIndex, then CreateIndex. The inputs are the
// ignore/update/reconnecting allocations for a single task group that were
// assigned to a single node, so that constrains the size of these slices
// and sorting won't be too expensive
sortTuples := func(tuples []allocTuple) {
sort.Slice(tuples, func(i, j int) bool {
I, J := tuples[i].Alloc, tuples[j].Alloc
if I.Job.JobModifyIndex == J.Job.JobModifyIndex {
return I.CreateIndex > J.CreateIndex
}
return I.Job.JobModifyIndex > J.Job.JobModifyIndex
})
}

// ignored allocs are current, so pick the most recent one and stop all the
// rest of the running allocs on the node
if len(result.ignore) > 0 {
if len(result.ignore) > count {
sortTuples(result.ignore)
result.stop = append(result.stop, result.ignore[count:]...)
result.ignore = result.ignore[:count]
}
count = count - len(result.ignore) // reduce the remaining count
if count < 1 {
result.stop = append(result.stop, result.update...)
result.stop = append(result.stop, result.reconnecting...)
result.update = []allocTuple{}
result.reconnecting = []allocTuple{}
return
}
}

// updated allocs are for updates of the job (in-place or destructive), so
// we can pick the most recent one and stop all the rest of the running
// allocs on the node
if len(result.update) > 0 {
if len(result.update) > count {
sortTuples(result.update)
result.stop = append(result.stop, result.update[count:]...)
result.update = result.update[:count]
}
count = count - len(result.update) // reduce the remaining count
if count < 1 {
result.stop = append(result.stop, result.reconnecting...)
result.reconnecting = []allocTuple{}
return
}
}

// reconnecting allocs are for when a node reconnects after being lost with
// running allocs. we should only see this case if we got out of sync but
// never got a chance to eval before the node disconnected. clean up the
// remaining mess.
if len(result.reconnecting) > count {
sortTuples(result.reconnecting)
result.stop = append(result.stop, result.reconnecting[count:]...)
result.reconnecting = result.reconnecting[:count]
}
}

// diffSystemAllocs is like diffSystemAllocsForNode however, the allocations in the
@@ -304,15 +392,15 @@ func diffSystemAllocs(
) *diffResult {

// Build a mapping of nodes to all their allocs.
nodeAllocs := make(map[string][]*structs.Allocation, len(allocs))
allocsByNode := make(map[string][]*structs.Allocation, len(allocs))
for _, alloc := range allocs {
nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc)
allocsByNode[alloc.NodeID] = append(allocsByNode[alloc.NodeID], alloc)
}

eligibleNodes := make(map[string]*structs.Node)
for _, node := range readyNodes {
if _, ok := nodeAllocs[node.ID]; !ok {
nodeAllocs[node.ID] = nil
if _, ok := allocsByNode[node.ID]; !ok {
allocsByNode[node.ID] = nil
}
eligibleNodes[node.ID] = node
}
@@ -321,8 +409,8 @@
required := materializeSystemTaskGroups(job)

result := new(diffResult)
for nodeID, allocs := range nodeAllocs {
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal, serverSupportsDisconnectedClients)
for nodeID, allocsForNode := range allocsByNode {
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocsForNode, terminal, serverSupportsDisconnectedClients)
result.Append(diff)
}
