Skip to content

Commit

Permalink
make diffSystemAllocsForNode aware of eligibility
Browse files Browse the repository at this point in the history
diffSystemAllocs -> diffSystemAllocsForNode, this function is only used
for diffing system allocations, but lacked awareness of eligible
nodes and the node ID that the allocation was going to be placed.

This change now ignores a change if its existing allocation is on an
ineligible node. For a new allocation, it also checks tainted and
ineligible nodes in the same function instead of nil-ing out the diff
after computation in diffSystemAllocs
  • Loading branch information
drewbailey committed Jan 30, 2020
1 parent 6f6836d commit f78a18b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 59 deletions.
23 changes: 14 additions & 9 deletions scheduler/system_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,11 @@ func TestSystemSched_Queued_With_Constraints(t *testing.T) {

}

// This test ensures that the scheduler correctly ignores ineligible
// nodes when scheduling due to a new node being added. The job has two
// task groups contrained to a particular node class. The desired behavior
// should be that the TaskGroup constrained to the newly added node class is
// added and that the TaskGroup constrained to the ineligible node is ignored.
func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
h := NewHarness(t)

Expand Down Expand Up @@ -1376,8 +1381,8 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
require.Equal(t, 0, val)

// Single plan with two NodeAllocations
require.Equal(t, 1, len(h.Plans))
require.Equal(t, 2, len(h.Plans[0].NodeAllocation))
require.Len(t, h.Plans, 1)
require.Len(t, h.Plans[0].NodeAllocation, 2)

// Mark the node as ineligible
node.SchedulingEligibility = structs.NodeSchedulingIneligible
Expand All @@ -1402,7 +1407,7 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {

// Ensure all NodeAllocations are from first Eval
for _, allocs := range h.Plans[0].NodeAllocation {
require.Equal(t, 1, len(allocs))
require.Len(t, allocs, 1)
require.Equal(t, eval.ID, allocs[0].EvalID)
}

Expand Down Expand Up @@ -1432,27 +1437,27 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
// Ensure no failed TG allocs
require.Equal(t, 0, len(h.Evals[2].FailedTGAllocs))

require.Equal(t, 2, len(h.Plans))
require.Equal(t, 1, len(h.Plans[1].NodeAllocation))
require.Len(t, h.Plans, 2)
require.Len(t, h.Plans[1].NodeAllocation, 1)
// Ensure all NodeAllocations are from first Eval
for _, allocs := range h.Plans[1].NodeAllocation {
require.Equal(t, 1, len(allocs))
require.Len(t, allocs, 1)
require.Equal(t, eval3.ID, allocs[0].EvalID)
}

ws := memdb.NewWatchSet()

allocsNodeOne, err := h.State.AllocsByNode(ws, node.ID)
require.NoError(t, err)
require.Equal(t, 1, len(allocsNodeOne))
require.Len(t, allocsNodeOne, 1)

allocsNodeTwo, err := h.State.AllocsByNode(ws, nodeB.ID)
require.NoError(t, err)
require.Equal(t, 1, len(allocsNodeTwo))
require.Len(t, allocsNodeTwo, 1)

allocsNodeThree, err := h.State.AllocsByNode(ws, nodeBTwo.ID)
require.NoError(t, err)
require.Equal(t, 1, len(allocsNodeThree))
require.Len(t, allocsNodeThree, 1)
}

// No errors reported when no available nodes prevent placement
Expand Down
85 changes: 36 additions & 49 deletions scheduler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func (d *diffResult) Append(other *diffResult) {
d.lost = append(d.lost, other.lost...)
}

// diffAllocs is used to do a set difference between the target allocations
// and the existing allocations. This returns 6 sets of results, the list of
// named task groups that need to be placed (no existing allocation), the
// diffSystemAllocsForNode is used to do a set difference between the target allocations
// and the existing allocations for a particular node. This returns 6 sets of results,
// the list of named task groups that need to be placed (no existing allocation), the
// allocations that need to be updated (job definition is newer), allocs that
// need to be migrated (node is draining), the allocs that need to be evicted
// (no longer required), those that should be ignored and those that are lost
Expand All @@ -67,7 +67,8 @@ func (d *diffResult) Append(other *diffResult) {
// required is a set of allocations that must exist.
// allocs is a list of non terminal allocations.
// terminalAllocs is an index of the latest terminal allocations by name.
func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
func diffSystemAllocsForNode(job *structs.Job, nodeID string,
eligibleNodes, taintedNodes map[string]*structs.Node,
required map[string]*structs.TaskGroup, allocs []*structs.Allocation,
terminalAllocs map[string]*structs.Allocation) *diffResult {
result := &diffResult{}
Expand Down Expand Up @@ -126,6 +127,12 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
continue
}

// For an existing allocation, if the nodeID is no longer
// eligible, the diff should be ignored
if _, ok := eligibleNodes[nodeID]; !ok {
goto IGNORE
}

// If the definition is updated we need to update
if job.JobModifyIndex != exist.Job.JobModifyIndex {
result.update = append(result.update, allocTuple{
Expand All @@ -152,19 +159,37 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,

// Require a placement if no existing allocation. If there
// is an existing allocation, we would have checked for a potential
// update or ignore above.
// update or ignore above. Ignore placements for tainted or
// ineligible nodes
if !ok {
result.place = append(result.place, allocTuple{
// Check if the placement would be for a tainted node
// and ignore if it is.
if _, tainted := taintedNodes[nodeID]; tainted {
continue
}
if _, eligible := eligibleNodes[nodeID]; !eligible {
continue
}

allocTuple := allocTuple{
Name: name,
TaskGroup: tg,
Alloc: terminalAllocs[name],
})
}

// If the new allocation isn't annotated with a previous allocation
// or if the previous allocation isn't from the same node then we
// annotate the allocTuple with a new Allocation
if allocTuple.Alloc == nil || allocTuple.Alloc.NodeID != nodeID {
allocTuple.Alloc = &structs.Allocation{NodeID: nodeID}
}
result.place = append(result.place, allocTuple)
}
}
return result
}

// diffSystemAllocs is like diffAllocs however, the allocations in the
// diffSystemAllocs is like diffSystemAllocsForNode however, the allocations in the
// diffResult contain the specific nodeID they should be allocated on.
//
// job is the job whose allocs is going to be diff-ed.
Expand All @@ -183,58 +208,20 @@ func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[
nodeAllocs[alloc.NodeID] = nallocs
}

knownNodes := make(map[string]struct{})
eligibleNodes := make(map[string]*structs.Node)
for _, node := range nodes {
if _, ok := nodeAllocs[node.ID]; !ok {
nodeAllocs[node.ID] = nil
}
knownNodes[node.ID] = struct{}{}
eligibleNodes[node.ID] = node
}

// Create the required task groups.
required := materializeTaskGroups(job)

result := &diffResult{}
for nodeID, allocs := range nodeAllocs {
diff := diffAllocs(job, taintedNodes, required, allocs, terminalAllocs)

// If the current allocation nodeID is not in the list
// of known nodes to the scheduler, and diffAllocs is
// attempting to update or place an system alloc on an ineligible
// node the diff should be ignored.
if _, ok := knownNodes[nodeID]; !ok && ((len(diff.update) > 0) || (len(diff.place) > 0)) {
for _, existing := range allocs {
tg, ok := required[existing.Name]
if !ok {
continue
}
diff.place = nil
diff.update = nil
diff.ignore = append(diff.ignore, allocTuple{
Name: existing.Name,
TaskGroup: tg,
Alloc: existing,
})
}
}

// If the node is tainted there should be no placements made
if _, ok := taintedNodes[nodeID]; ok {
diff.place = nil
} else {
// Mark the alloc as being for a specific node.
for i := range diff.place {
alloc := &diff.place[i]

// If the new allocation isn't annotated with a previous allocation
// or if the previous allocation isn't from the same node then we
// annotate the allocTuple with a new Allocation
if alloc.Alloc == nil || alloc.Alloc.NodeID != nodeID {
alloc.Alloc = &structs.Allocation{NodeID: nodeID}
}
}
}

diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminalAllocs)
result.Append(diff)
}

Expand Down
9 changes: 8 additions & 1 deletion scheduler/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ func TestDiffAllocs(t *testing.T) {
*oldJob = *job
oldJob.JobModifyIndex -= 1

eligibleNode := mock.Node()
eligibleNode.ID = "zip"

drainNode := mock.Node()
drainNode.Drain = true

Expand All @@ -47,6 +50,10 @@ func TestDiffAllocs(t *testing.T) {
"drainNode": drainNode,
}

eligible := map[string]*structs.Node{
eligibleNode.ID: eligibleNode,
}

allocs := []*structs.Allocation{
// Update the 1st
{
Expand Down Expand Up @@ -113,7 +120,7 @@ func TestDiffAllocs(t *testing.T) {
},
}

diff := diffAllocs(job, tainted, required, allocs, terminalAllocs)
diff := diffSystemAllocsForNode(job, "zip", eligible, tainted, required, allocs, terminalAllocs)
place := diff.place
update := diff.update
migrate := diff.migrate
Expand Down

0 comments on commit f78a18b

Please sign in to comment.