Merge pull request #11111 from hashicorp/b-system-no-match
scheduler: warn when system jobs cannot place an alloc
schmichael committed Sep 13, 2021
2 parents b2b9013 + 2a142da commit 24b2770
Showing 4 changed files with 120 additions and 23 deletions.
3 changes: 3 additions & 0 deletions .changelog/11111.txt
@@ -0,0 +1,3 @@
```release-note:improvement
scheduler: Warn users when system and sysbatch evaluations fail to place an allocation
```
77 changes: 67 additions & 10 deletions scheduler/scheduler_sysbatch_test.go
@@ -10,6 +10,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)

@@ -758,23 +759,26 @@ func TestSysBatch_RetryLimit(t *testing.T) {
func TestSysBatch_Queued_With_Constraints(t *testing.T) {
h := NewHarness(t)

// Register a node
node := mock.Node()
node.Attributes["kernel.name"] = "darwin"
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
nodes := createNodes(t, h, 3)

// Generate a sysbatch job which can't be placed on the node
job := mock.SystemBatchJob()
job.Constraints = []*structs.Constraint{
{
LTarget: "${attr.kernel.name}",
RTarget: "not_existing_os",
Operand: "=",
},
}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))

// Create a mock evaluation to deal with the node update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
@@ -787,6 +791,57 @@ func TestSysBatch_Queued_With_Constraints(t *testing.T) {
val, ok := h.Evals[0].QueuedAllocations["pinger"]
require.True(t, ok)
require.Zero(t, val)

failedTGAllocs := h.Evals[0].FailedTGAllocs
pretty.Println(failedTGAllocs)
require.NotNil(t, failedTGAllocs)
require.Contains(t, failedTGAllocs, "pinger")
require.Equal(t, len(nodes), failedTGAllocs["pinger"].NodesEvaluated)
require.Equal(t, len(nodes), failedTGAllocs["pinger"].NodesFiltered)

}

func TestSysBatch_Queued_With_Constraints_PartialMatch(t *testing.T) {
h := NewHarness(t)

// linux machines
linux := createNodes(t, h, 3)
for i := 0; i < 3; i++ {
node := mock.Node()
node.Attributes["kernel.name"] = "darwin"
node.ComputeClass()
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

// Generate a sysbatch job which can't be placed on the node
job := mock.SystemBatchJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))

// Create a mock evaluation to deal with the node update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation
err := h.Process(NewSysBatchScheduler, eval)
require.NoError(t, err)

foundNodes := map[string]bool{}
for n := range h.Plans[0].NodeAllocation {
foundNodes[n] = true
}
expected := map[string]bool{}
for _, n := range linux {
expected[n.ID] = true
}

require.Equal(t, expected, foundNodes)
}

// This test ensures that the scheduler correctly ignores ineligible
@@ -879,7 +934,7 @@ func TestSysBatch_JobConstraint_AddNode(t *testing.T) {
require.Equal(t, "complete", h.Evals[1].Status)

// Ensure no new plans
require.Equal(t, 1, len(h.Plans))
require.Len(t, h.Plans, 1)

// Ensure all NodeAllocations are from first Eval
for _, allocs := range h.Plans[0].NodeAllocation {
@@ -890,8 +945,8 @@ func TestSysBatch_JobConstraint_AddNode(t *testing.T) {
// Add a new node Class-B
var nodeBTwo *structs.Node
nodeBTwo = mock.Node()
require.NoError(t, nodeBTwo.ComputeClass())
nodeBTwo.NodeClass = "Class-B"
require.NoError(t, nodeBTwo.ComputeClass())
require.Nil(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), nodeBTwo))

// Evaluate the new node
@@ -910,8 +965,10 @@ func TestSysBatch_JobConstraint_AddNode(t *testing.T) {
require.Nil(t, h.Process(NewSysBatchScheduler, eval3))
require.Equal(t, "complete", h.Evals[2].Status)

// Ensure no failed TG allocs
require.Equal(t, 0, len(h.Evals[2].FailedTGAllocs))
// Ensure `groupA` fails to be placed due to its constraint, but `groupB` doesn't
require.Len(t, h.Evals[2].FailedTGAllocs, 1)
require.Contains(t, h.Evals[2].FailedTGAllocs, "groupA")
require.NotContains(t, h.Evals[2].FailedTGAllocs, "groupB")

require.Len(t, h.Plans, 2)
require.Len(t, h.Plans[1].NodeAllocation, 1)
53 changes: 44 additions & 9 deletions scheduler/scheduler_system.go
@@ -279,15 +279,37 @@ func (s *SystemScheduler) computeJobAllocs() error {
return s.computePlacements(diff.place)
}

func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric {
if acc == nil {
return curr.Copy()
}

acc.NodesEvaluated += curr.NodesEvaluated
acc.NodesFiltered += curr.NodesFiltered
for k, v := range curr.ClassFiltered {
acc.ClassFiltered[k] += v
}
for k, v := range curr.ConstraintFiltered {
acc.ConstraintFiltered[k] += v
}
acc.AllocationTime += curr.AllocationTime
return acc
}

// computePlacements computes placements for allocations
func (s *SystemScheduler) computePlacements(place []allocTuple) error {
nodeByID := make(map[string]*structs.Node, len(s.nodes))
for _, node := range s.nodes {
nodeByID[node.ID] = node
}

// track node filtering, to only report an error if all nodes have been filtered
var filteredMetrics map[string]*structs.AllocMetric

nodes := make([]*structs.Node, 1)
for _, missing := range place {
tgName := missing.TaskGroup.Name

node, ok := nodeByID[missing.Alloc.NodeID]
if !ok {
s.logger.Debug("could not find node %q", missing.Alloc.NodeID)
@@ -305,17 +327,30 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
// If the task can't be placed on this node, update reporting data
// and continue to short circuit the loop

// If this node was filtered because of constraint mismatches and we
// couldn't create an allocation then decrementing queued for that
// task group
// If this node was filtered because of constraint
// mismatches and we couldn't create an allocation then
// decrement queuedAllocs for that task group.
if s.ctx.metrics.NodesFiltered > 0 {
s.queuedAllocs[missing.TaskGroup.Name] -= 1
queued := s.queuedAllocs[tgName] - 1
s.queuedAllocs[tgName] = queued

if filteredMetrics == nil {
filteredMetrics = map[string]*structs.AllocMetric{}
}
filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics())

if queued <= 0 {
if s.failedTGAllocs == nil {
s.failedTGAllocs = make(map[string]*structs.AllocMetric)
}
s.failedTGAllocs[tgName] = filteredMetrics[tgName]
}

// If we are annotating the plan, then decrement the desired
// placements based on whether the node meets the constraints
if s.eval.AnnotatePlan && s.plan.Annotations != nil &&
s.plan.Annotations.DesiredTGUpdates != nil {
desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name]
desired := s.plan.Annotations.DesiredTGUpdates[tgName]
desired.Place -= 1
}

@@ -324,7 +359,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
}

// Check if this task group has already failed, reported to the user as a count
if metric, ok := s.failedTGAllocs[missing.TaskGroup.Name]; ok {
if metric, ok := s.failedTGAllocs[tgName]; ok {
metric.CoalescedFailures += 1
metric.ExhaustResources(missing.TaskGroup)
continue
@@ -345,7 +380,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
s.ctx.Metrics().ExhaustResources(missing.TaskGroup)

// Actual failure to start this task on this candidate node, report it individually
s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
s.failedTGAllocs[tgName] = s.ctx.Metrics()
s.addBlocked(node)

continue
@@ -378,7 +413,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
EvalID: s.eval.ID,
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
TaskGroup: tgName,
Metrics: s.ctx.Metrics(),
NodeID: option.Node.ID,
NodeName: option.Node.Name,
@@ -410,7 +445,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
if s.eval.AnnotatePlan && s.plan.Annotations != nil {
s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub(nil))
if s.plan.Annotations.DesiredTGUpdates != nil {
desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name]
desired := s.plan.Annotations.DesiredTGUpdates[tgName]
desired.Preemptions += 1
}
}
10 changes: 6 additions & 4 deletions scheduler/scheduler_system_test.go
@@ -1264,7 +1264,7 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
require.Equal(t, "complete", h.Evals[1].Status)

// Ensure no new plans
require.Equal(t, 1, len(h.Plans))
require.Len(t, h.Plans, 1)

// Ensure all NodeAllocations are from first Eval
for _, allocs := range h.Plans[0].NodeAllocation {
@@ -1275,8 +1275,8 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
// Add a new node Class-B
var nodeBTwo *structs.Node
nodeBTwo = mock.Node()
require.NoError(t, nodeBTwo.ComputeClass())
nodeBTwo.NodeClass = "Class-B"
require.NoError(t, nodeBTwo.ComputeClass())
require.Nil(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), nodeBTwo))

// Evaluate the new node
@@ -1295,8 +1295,10 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
require.Nil(t, h.Process(NewSystemScheduler, eval3))
require.Equal(t, "complete", h.Evals[2].Status)

// Ensure no failed TG allocs
require.Equal(t, 0, len(h.Evals[2].FailedTGAllocs))
// Ensure `groupA` fails to be placed due to its constraint, but `groupB` doesn't
require.Len(t, h.Evals[2].FailedTGAllocs, 1)
require.Contains(t, h.Evals[2].FailedTGAllocs, "groupA")
require.NotContains(t, h.Evals[2].FailedTGAllocs, "groupB")

require.Len(t, h.Plans, 2)
require.Len(t, h.Plans[1].NodeAllocation, 1)
