
scheduler: stop allocs in unrelated nodes #11391

Merged · 5 commits · Oct 27, 2021

Changes from 3 commits
2 changes: 1 addition & 1 deletion scheduler/generic_sched.go
@@ -471,7 +471,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string,
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
// Get the base nodes
- nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
+ nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
return err
}
9 changes: 5 additions & 4 deletions scheduler/scheduler_system.go
@@ -37,8 +37,9 @@ type SystemScheduler struct {
ctx *EvalContext
stack *SystemStack

- nodes     []*structs.Node
- nodesByDC map[string]int
+ nodes         []*structs.Node
+ notReadyNodes map[string]struct{}
+ nodesByDC     map[string]int

limitReached bool
nextEval *structs.Evaluation
@@ -122,7 +123,7 @@ func (s *SystemScheduler) process() (bool, error) {

// Get the ready nodes in the required datacenters
if !s.job.Stopped() {
- s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
+ s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
return false, fmt.Errorf("failed to get ready nodes: %v", err)
}
@@ -219,7 +220,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
live, term := structs.SplitTerminalAllocs(allocs)

// Diff the required and existing allocations
- diff := diffSystemAllocs(s.job, s.nodes, tainted, live, term)
+ diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term)
s.logger.Debug("reconciled current state with desired state",
"place", len(diff.place), "update", len(diff.update),
"migrate", len(diff.migrate), "stop", len(diff.stop),
85 changes: 85 additions & 0 deletions scheduler/scheduler_system_test.go
@@ -766,6 +766,91 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) {
}
}

func TestSystemSched_JobModify_RemoveDC(t *testing.T) {
h := NewHarness(t)

// Create some nodes
node1 := mock.Node()
node1.Datacenter = "dc1"
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node1))

node2 := mock.Node()
node2.Datacenter = "dc2"
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))

fmt.Println("DC1 node: ", node1.ID)
fmt.Println("DC2 node: ", node2.ID)
nodes := []*structs.Node{node1, node2}

// Generate a fake job with allocations
job := mock.SystemJob()
job.Datacenters = []string{"dc1", "dc2"}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))

var allocs []*structs.Allocation
for _, node := range nodes {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))

// Update the job
job2 := job.Copy()
job2.Datacenters = []string{"dc1"}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job2))

// Create a mock evaluation to deal with update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)

// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]

// Ensure the plan evicted only the alloc on the node in the removed datacenter
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
require.Len(t, update, 1)

// Ensure the plan updated the existing allocs
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Len(t, planned, 1)

for _, p := range planned {
require.Equal(t, job2, p.Job, "should update job")
}

// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)

// Ensure both allocations are still tracked in state
require.Len(t, out, 2)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestSystemSched_JobDeregister_Purged(t *testing.T) {
h := NewHarness(t)

34 changes: 24 additions & 10 deletions scheduler/util.go
@@ -65,7 +65,8 @@ func diffSystemAllocsForNode(
job *structs.Job, // job whose allocs are going to be diff-ed
nodeID string,
eligibleNodes map[string]*structs.Node,
- taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name)
+ notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining
+ taintedNodes map[string]*structs.Node, // nodes which are down (by node name)
Contributor (author) commented:
The taintedNodes logic is actually confusing, IMO. The taintedNodes function includes nodes that are down as well as nodes marked for draining via ShouldDrainNode:

nomad/scheduler/util.go, lines 351 to 377 in b0ce684:

// taintedNodes is used to scan the allocations and then check if the
// underlying nodes are tainted, and should force a migration of the allocation.
// All the nodes returned in the map are tainted.
func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*structs.Node, error) {
out := make(map[string]*structs.Node)
for _, alloc := range allocs {
if _, ok := out[alloc.NodeID]; ok {
continue
}
ws := memdb.NewWatchSet()
node, err := state.NodeByID(ws, alloc.NodeID)
if err != nil {
return nil, err
}
// If the node does not exist, we should migrate
if node == nil {
out[alloc.NodeID] = nil
continue
}
if structs.ShouldDrainNode(node.Status) || node.DrainStrategy != nil {
out[alloc.NodeID] = node
}
}
return out, nil
}

However, nodes that are up but marked for draining were already filtered out by readyNodesInDCs:

nomad/scheduler/util.go, lines 277 to 313 in b0ce684:

// readyNodesInDCs returns all the ready nodes in the given datacenters and a
// mapping of each data center to the count of ready nodes.
func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) {
// Index the DCs
dcMap := make(map[string]int, len(dcs))
for _, dc := range dcs {
dcMap[dc] = 0
}
// Scan the nodes
ws := memdb.NewWatchSet()
var out []*structs.Node
notReady := map[string]struct{}{}
iter, err := state.Nodes(ws)
if err != nil {
return nil, nil, nil, err
}
for {
raw := iter.Next()
if raw == nil {
break
}
// Filter on datacenter and status
node := raw.(*structs.Node)
if !node.Ready() {
notReady[node.ID] = struct{}{}
continue
}
if _, ok := dcMap[node.Datacenter]; !ok {
continue
}
out = append(out, node)
dcMap[node.Datacenter]++
}
return out, notReady, dcMap, nil
}

So in practice taintedNodes contains only the down nodes. Reasoning through the code is a bit complex, and I didn't feel confident restructuring that logic to be more explicit about node state.
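
To make the node-state bookkeeping concrete, here's a stand-alone toy sketch of the point above: the Ready() check in readyNodesInDCs already diverts draining nodes into notReady, so taintedNodes only carries new information about down nodes. The node type and Ready() rule below are simplified stand-ins, not Nomad's real structs (Nomad's Ready() also checks scheduling eligibility):

package main

import "fmt"

// Simplified stand-in for structs.Node; only the fields this thread
// touches are modeled.
type node struct {
	ID       string
	Status   string // "ready" or "down"
	Draining bool   // stand-in for DrainStrategy != nil
}

func (n *node) Ready() bool { return n.Status == "ready" && !n.Draining }

func main() {
	nodes := []*node{
		{ID: "n1", Status: "ready"},                 // schedulable
		{ID: "n2", Status: "ready", Draining: true}, // up, but draining
		{ID: "n3", Status: "down"},                  // down
	}

	// Mirrors the readyNodesInDCs pass: both the draining node and the
	// down node land in notReady, so a later taintedNodes lookup only
	// adds information for nodes that are actually down.
	notReady := map[string]struct{}{}
	for _, n := range nodes {
		if !n.Ready() {
			notReady[n.ID] = struct{}{}
		}
	}
	fmt.Println(notReady) // map[n2:{} n3:{}]
}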

Contributor commented:
Just to add a small nitpick on this: the comment says (by node name), but the code indexes them by node ID.

Contributor (author) replied:
It's interesting that the structs.TerminalByNodeByName struct is actually keyed by node ID too. I'll update the comment here, and rename the struct in a follow-up PR.
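
For reference, a minimal sketch of the shape under discussion — the outer key is a node ID even though the type name says ByName. The type definition here is an assumption based on this thread, not quoted from Nomad:

package main

import "fmt"

type Allocation struct{ ID string }

// Assumed shape of structs.TerminalByNodeByName, per the discussion above:
// the outer map is keyed by node ID (despite "ByName"), the inner map by
// alloc name such as "my-job.web[0]".
type TerminalByNodeByName map[string]map[string]*Allocation

func main() {
	terminal := TerminalByNodeByName{
		"node-id-123": {"my-job.web[0]": &Allocation{ID: "alloc-1"}},
	}
	fmt.Println(terminal["node-id-123"]["my-job.web[0]"].ID) // alloc-1
}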

required map[string]*structs.TaskGroup, // set of allocations that must exist
allocs []*structs.Allocation, // non-terminal allocations that exist
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name)
@@ -139,10 +140,21 @@ func diffSystemAllocsForNode(

// For an existing allocation, if the nodeID is no longer
// eligible, the diff should be ignored
- if _, ok := eligibleNodes[nodeID]; !ok {
+ if _, ok := notReadyNodes[nodeID]; ok {
goto IGNORE
Contributor (author) commented on lines -142 to 144:
This is the source of the bug. Previously, if the node was not in eligibleNodes we ignored it, on the assumption that the node was draining or had been marked ineligible for scheduling. Now we explicitly check whether the node is not ready while still being in the DCs the job targets.
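
To spell out the before/after behavior, here is a stand-alone toy sketch of the new check order (names mirror the diff, but this is a reduced illustration, not the PR's code):

package main

import "fmt"

// decide reduces diffSystemAllocsForNode's new ordering to the two sets
// that matter for this fix: notReadyNodes (in-DC but e.g. draining) and
// eligibleNodes (ready nodes in the job's datacenters).
func decide(nodeID string, notReadyNodes map[string]struct{}, eligibleNodes map[string]bool) string {
	if _, ok := notReadyNodes[nodeID]; ok {
		return "ignore" // node should come back; leave the alloc alone
	}
	if !eligibleNodes[nodeID] {
		return "stop" // node is no longer targeted, e.g. its DC was removed
	}
	return "keep/update"
}

func main() {
	notReady := map[string]struct{}{"draining-node": {}}
	eligible := map[string]bool{"dc1-node": true}

	fmt.Println(decide("draining-node", notReady, eligible)) // ignore
	fmt.Println(decide("dc2-node", notReady, eligible))      // stop (the fix)
	fmt.Println(decide("dc1-node", notReady, eligible))      // keep/update
}

Before this change, both the draining node and the dc2 node fell through the same eligibleNodes miss and were ignored, which is why allocs on nodes in removed datacenters were never stopped.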

}

+ // Existing allocations on nodes that are no longer targeted
+ // should be stopped
+ if _, ok := eligibleNodes[nodeID]; !ok {
+ 	result.stop = append(result.stop, allocTuple{
+ 		Name:      name,
+ 		TaskGroup: tg,
+ 		Alloc:     exist,
+ 	})
+ 	continue
+ }

// If the definition is updated we need to update
if job.JobModifyIndex != exist.Job.JobModifyIndex {
result.update = append(result.update, allocTuple{
@@ -229,7 +241,8 @@ func diffSystemAllocsForNode(
// diffResult contain the specific nodeID they should be allocated on.
func diffSystemAllocs(
job *structs.Job, // jobs whose allocations are going to be diff-ed
- nodes []*structs.Node, // list of nodes in the ready state
+ readyNodes []*structs.Node, // list of nodes in the ready state
+ notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining
taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name)
allocs []*structs.Allocation, // non-terminal allocations
terminal structs.TerminalByNodeByName, // latest terminal allocations (by name)
@@ -238,12 +251,11 @@
// Build a mapping of nodes to all their allocs.
nodeAllocs := make(map[string][]*structs.Allocation, len(allocs))
for _, alloc := range allocs {
- nallocs := append(nodeAllocs[alloc.NodeID], alloc) //nolint:gocritic
- nodeAllocs[alloc.NodeID] = nallocs
+ nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc)
}

eligibleNodes := make(map[string]*structs.Node)
- for _, node := range nodes {
+ for _, node := range readyNodes {
if _, ok := nodeAllocs[node.ID]; !ok {
nodeAllocs[node.ID] = nil
}
@@ -255,7 +267,7 @@

result := new(diffResult)
for nodeID, allocs := range nodeAllocs {
- diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminal)
+ diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal)
result.Append(diff)
}

@@ -264,7 +276,7 @@

// readyNodesInDCs returns all the ready nodes in the given datacenters and a
// mapping of each data center to the count of ready nodes.
- func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int, error) {
+ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) {
// Index the DCs
dcMap := make(map[string]int, len(dcs))
for _, dc := range dcs {
@@ -274,9 +286,10 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
// Scan the nodes
ws := memdb.NewWatchSet()
var out []*structs.Node
+ notReady := map[string]struct{}{}
iter, err := state.Nodes(ws)
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
for {
raw := iter.Next()
@@ -287,6 +300,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
// Filter on datacenter and status
node := raw.(*structs.Node)
if !node.Ready() {
+ notReady[node.ID] = struct{}{}
continue
}
if _, ok := dcMap[node.Datacenter]; !ok {
Expand All @@ -295,7 +309,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
out = append(out, node)
dcMap[node.Datacenter]++
}
- return out, dcMap, nil
+ return out, notReady, dcMap, nil
}
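
One subtlety of the updated loop above: the !node.Ready() check runs before the datacenter filter, so notReady collects not-ready nodes from every datacenter, not only the job's. A stand-alone toy sketch of that ordering (simplified types, not Nomad's):

package main

import "fmt"

type node struct {
	ID, Datacenter string
	Ready          bool
}

// filter mirrors readyNodesInDCs' loop ordering: the ready check happens
// before the DC check, so notReady is not restricted to the given DCs.
func filter(all []*node, dcs map[string]bool) (ready []*node, notReady map[string]struct{}) {
	notReady = map[string]struct{}{}
	for _, n := range all {
		if !n.Ready {
			notReady[n.ID] = struct{}{} // recorded even for an untargeted DC
			continue
		}
		if !dcs[n.Datacenter] {
			continue
		}
		ready = append(ready, n)
	}
	return ready, notReady
}

func main() {
	all := []*node{
		{ID: "a", Datacenter: "dc1", Ready: true},
		{ID: "b", Datacenter: "dc2", Ready: false}, // untargeted DC, still in notReady
	}
	ready, notReady := filter(all, map[string]bool{"dc1": true})
	fmt.Println(len(ready), notReady) // 1 map[b:{}]
}

That asymmetry should be harmless for this PR, since notReadyNodes is only consulted for nodes that already carry allocs of the job, but it is worth keeping in mind when reading the two sets.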

// retryMax is used to retry a callback until it returns success or