Skip to content

Commit

Permalink
ignore computed diffs if node is ineligible
Browse files Browse the repository at this point in the history
test flakey, add temp sleeps for debugging

fix computed class
  • Loading branch information
drewbailey committed Jan 28, 2020
1 parent 771c8ff commit 3627179
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 19 deletions.
5 changes: 0 additions & 5 deletions scheduler/system_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,6 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
node, ok := nodeByID[missing.Alloc.NodeID]
if !ok {
s.logger.Debug("could not find node %q", missing.Alloc.NodeID)
if s.failedTGAllocs == nil {
s.failedTGAllocs = make(map[string]*structs.AllocMetric)
}

s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
continue
}

Expand Down
189 changes: 175 additions & 14 deletions scheduler/system_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,8 +1310,153 @@ func TestSystemSched_Queued_With_Constraints(t *testing.T) {

}

// TestSystemSched_JobConstraint_AddNode runs a system job whose two task
// groups are each constrained to a different node class through three
// evaluations:
//
//  1. Job registration with one Class-A and one Class-B node: a single plan
//     with one allocation per node is expected.
//  2. A node-update evaluation after the Class-A node is marked ineligible:
//     no new plan is expected.
//  3. A node-update evaluation after a second Class-B node is added: one new
//     plan with a single allocation and no failed task group allocations is
//     expected.
func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
	h := NewHarness(t)

	// Create two nodes, one per node class. The class is assigned before
	// ComputeClass is called so that the computed class reflects it.
	node := mock.Node()
	node.NodeClass = "Class-A"
	node.ComputeClass()
	require.NoError(t, h.State.UpsertNode(h.NextIndex(), node))

	nodeB := mock.Node()
	nodeB.NodeClass = "Class-B"
	nodeB.ComputeClass()
	require.NoError(t, h.State.UpsertNode(h.NextIndex(), nodeB))

	// Make a job with two task groups, each constrained to one node class.
	job := mock.SystemJob()
	tgA := job.TaskGroups[0]
	tgA.Name = "groupA"
	tgA.Constraints = []*structs.Constraint{
		{
			LTarget: "${node.class}",
			RTarget: node.NodeClass,
			Operand: "=",
		},
	}
	tgB := job.TaskGroups[0].Copy()
	tgB.Name = "groupB"
	tgB.Constraints = []*structs.Constraint{
		{
			LTarget: "${node.class}",
			RTarget: nodeB.NodeClass,
			Operand: "=",
		},
	}

	// Upsert the job.
	job.TaskGroups = []*structs.TaskGroup{tgA, tgB}
	require.NoError(t, h.State.UpsertJob(h.NextIndex(), job))

	// Evaluate the job registration.
	eval := &structs.Evaluation{
		Namespace:   structs.DefaultNamespace,
		ID:          uuid.Generate(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
		Status:      structs.EvalStatusPending,
	}

	require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))

	require.NoError(t, h.Process(NewSystemScheduler, eval))
	require.Equal(t, "complete", h.Evals[0].Status)

	// QueuedAllocations is drained for both task groups.
	val, ok := h.Evals[0].QueuedAllocations["groupA"]
	require.True(t, ok)
	require.Equal(t, 0, val)

	val, ok = h.Evals[0].QueuedAllocations["groupB"]
	require.True(t, ok)
	require.Equal(t, 0, val)

	// Single plan with two NodeAllocations.
	require.Len(t, h.Plans, 1)
	require.Len(t, h.Plans[0].NodeAllocation, 2)

	// Mark the Class-A node as ineligible.
	// NOTE(review): the node is mutated in place and not upserted again;
	// presumably the state store holds this same pointer, confirm.
	node.SchedulingEligibility = structs.NodeSchedulingIneligible

	// Evaluate the node update.
	eval2 := &structs.Evaluation{
		Namespace:   structs.DefaultNamespace,
		ID:          uuid.Generate(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerNodeUpdate,
		NodeID:      node.ID,
		JobID:       job.ID,
		Status:      structs.EvalStatusPending,
	}

	require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval2}))
	require.NoError(t, h.Process(NewSystemScheduler, eval2))
	require.Equal(t, "complete", h.Evals[1].Status)

	// Ensure no new plans were submitted.
	require.Len(t, h.Plans, 1)

	// Ensure all NodeAllocations in the first plan are from the first eval.
	for _, allocs := range h.Plans[0].NodeAllocation {
		require.Len(t, allocs, 1)
		require.Equal(t, eval.ID, allocs[0].EvalID)
	}

	// Add a second Class-B node. As with the nodes above, the class must be
	// assigned before ComputeClass so the computed class is not stale.
	nodeBTwo := mock.Node()
	nodeBTwo.NodeClass = "Class-B"
	nodeBTwo.ComputeClass()
	require.NoError(t, h.State.UpsertNode(h.NextIndex(), nodeBTwo))

	// Evaluate the new node.
	eval3 := &structs.Evaluation{
		Namespace:   structs.DefaultNamespace,
		ID:          uuid.Generate(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerNodeUpdate,
		NodeID:      nodeBTwo.ID,
		JobID:       job.ID,
		Status:      structs.EvalStatusPending,
	}

	// Ensure the new eval is complete.
	require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval3}))
	require.NoError(t, h.Process(NewSystemScheduler, eval3))
	require.Equal(t, "complete", h.Evals[2].Status)

	// Ensure no failed TG allocs.
	require.Len(t, h.Evals[2].FailedTGAllocs, 0)

	require.Len(t, h.Plans, 2)
	require.Len(t, h.Plans[1].NodeAllocation, 1)

	// Ensure all NodeAllocations in the second plan are from the third eval.
	for _, allocs := range h.Plans[1].NodeAllocation {
		require.Len(t, allocs, 1)
		require.Equal(t, eval3.ID, allocs[0].EvalID)
	}

	// Each of the three nodes should end up with exactly one allocation in
	// the state store.
	ws := memdb.NewWatchSet()

	allocsNodeOne, err := h.State.AllocsByNode(ws, node.ID)
	require.NoError(t, err)
	require.Len(t, allocsNodeOne, 1)

	allocsNodeTwo, err := h.State.AllocsByNode(ws, nodeB.ID)
	require.NoError(t, err)
	require.Len(t, allocsNodeTwo, 1)

	allocsNodeThree, err := h.State.AllocsByNode(ws, nodeBTwo.ID)
	require.NoError(t, err)
	require.Len(t, allocsNodeThree, 1)
}

// No errors reported when no available nodes prevent placement
func TestSystemSched_NoNodes(t *testing.T) {
func TestSystemSched_ExistingAllocNoNodes(t *testing.T) {
h := NewHarness(t)

var node *structs.Node
Expand Down Expand Up @@ -1348,28 +1493,44 @@ func TestSystemSched_NoNodes(t *testing.T) {

// Mark the node as ineligible
node.SchedulingEligibility = structs.NodeSchedulingIneligible
// Evaluate the job
eval2 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.Nil(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval2}))
require.Nil(t, h.Process(NewSystemScheduler, eval2))
require.Equal(t, "complete", h.Evals[1].Status)

// Create a new job version, deploy
job2 := job.Copy()
job2.Meta["version"] = "2"
require.Nil(t, h.State.UpsertJob(h.NextIndex(), job2))

eval2 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job2.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job2.ID,
Status: structs.EvalStatusPending,
// Run evaluation as a plan
eval3 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job2.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job2.ID,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}

// Ensure New eval is complete
require.Nil(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval2}))
require.Nil(t, h.Process(NewSystemScheduler, eval2))
require.Equal(t, "complete", h.Evals[1].Status)
require.Nil(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval3}))
require.Nil(t, h.Process(NewSystemScheduler, eval3))
require.Equal(t, "complete", h.Evals[2].Status)

// Ensure there is a FailedTGAlloc metric
require.Equal(t, 1, len(h.Evals[1].FailedTGAllocs))
// Ensure there are no FailedTGAllocs
require.Equal(t, 0, len(h.Evals[2].FailedTGAllocs))
require.Equal(t, 0, h.Evals[2].QueuedAllocations[job2.Name])
}

// No errors reported when constraints prevent placement
Expand Down Expand Up @@ -1697,7 +1858,7 @@ func TestSystemSched_Preemption(t *testing.T) {
var nodes []*structs.Node
for i := 0; i < 2; i++ {
node := mock.Node()
//TODO(preetha): remove in 0.11
// TODO(preetha): remove in 0.11
node.Resources = &structs.Resources{
CPU: 3072,
MemoryMB: 5034,
Expand Down
22 changes: 22 additions & 0 deletions scheduler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,12 @@ func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[
nodeAllocs[alloc.NodeID] = nallocs
}

knownNodes := make(map[string]struct{})
for _, node := range nodes {
if _, ok := nodeAllocs[node.ID]; !ok {
nodeAllocs[node.ID] = nil
}
knownNodes[node.ID] = struct{}{}
}

// Create the required task groups.
Expand All @@ -196,6 +198,26 @@ func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[
for nodeID, allocs := range nodeAllocs {
diff := diffAllocs(job, taintedNodes, required, allocs, terminalAllocs)

// If the current allocation's nodeID is not in the list of nodes
// known to the scheduler, and diffAllocs is attempting to update
// or place a system alloc on that (ineligible) node, the diff
// should be ignored.
if _, ok := knownNodes[nodeID]; !ok && ((len(diff.update) > 0) || (len(diff.place) > 0)) {
for _, existing := range allocs {
tg, ok := required[existing.Name]
if !ok {
continue
}
diff.place = nil
diff.update = nil
diff.ignore = append(diff.ignore, allocTuple{
Name: existing.Name,
TaskGroup: tg,
Alloc: existing,
})
}
}

// If the node is tainted there should be no placements made
if _, ok := taintedNodes[nodeID]; ok {
diff.place = nil
Expand Down

0 comments on commit 3627179

Please sign in to comment.