Skip to content

Commit

Permalink
scheduler: stop allocs in unrelated nodes
Browse files Browse the repository at this point in the history
he system scheduler should leave allocs on draining nodes as-is, but
stop node stop allocs on nodes that are no longer part of the job
datacenters.

Previously, the scheduler did not make the distinction and left system
job allocs intact if they are already running.
  • Loading branch information
Mahmood Ali committed Oct 26, 2021
1 parent 1df3f2e commit b0ce684
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 21 deletions.
2 changes: 1 addition & 1 deletion scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string,
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
// Get the base nodes
nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions scheduler/scheduler_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type SystemScheduler struct {
ctx *EvalContext
stack *SystemStack

nodes []*structs.Node
nodesByDC map[string]int
nodes []*structs.Node
notReadyNodes map[string]struct{}
nodesByDC map[string]int

limitReached bool
nextEval *structs.Evaluation
Expand Down Expand Up @@ -122,7 +123,7 @@ func (s *SystemScheduler) process() (bool, error) {

// Get the ready nodes in the required datacenters
if !s.job.Stopped() {
s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
return false, fmt.Errorf("failed to get ready nodes: %v", err)
}
Expand Down Expand Up @@ -219,7 +220,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
live, term := structs.SplitTerminalAllocs(allocs)

// Diff the required and existing allocations
diff := diffSystemAllocs(s.job, s.nodes, tainted, live, term)
diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term)
s.logger.Debug("reconciled current state with desired state",
"place", len(diff.place), "update", len(diff.update),
"migrate", len(diff.migrate), "stop", len(diff.stop),
Expand Down
34 changes: 24 additions & 10 deletions scheduler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func diffSystemAllocsForNode(
job *structs.Job, // job whose allocs are going to be diff-ed
nodeID string,
eligibleNodes map[string]*structs.Node,
taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name)
notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining
taintedNodes map[string]*structs.Node, // nodes which are down (by node name)
required map[string]*structs.TaskGroup, // set of allocations that must exist
allocs []*structs.Allocation, // non-terminal allocations that exist
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name)
Expand Down Expand Up @@ -139,10 +140,21 @@ func diffSystemAllocsForNode(

// For an existing allocation, if the nodeID is no longer
// eligible, the diff should be ignored
if _, ok := eligibleNodes[nodeID]; !ok {
if _, ok := notReadyNodes[nodeID]; ok {
goto IGNORE
}

// Existing allocations on nodes that are no longer targeted
// should be stopped
if _, ok := eligibleNodes[nodeID]; !ok {
result.stop = append(result.stop, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
continue
}

// If the definition is updated we need to update
if job.JobModifyIndex != exist.Job.JobModifyIndex {
result.update = append(result.update, allocTuple{
Expand Down Expand Up @@ -229,7 +241,8 @@ func diffSystemAllocsForNode(
// diffResult contain the specific nodeID they should be allocated on.
func diffSystemAllocs(
job *structs.Job, // jobs whose allocations are going to be diff-ed
nodes []*structs.Node, // list of nodes in the ready state
readyNodes []*structs.Node, // list of nodes in the ready state
notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining
taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name)
allocs []*structs.Allocation, // non-terminal allocations
terminal structs.TerminalByNodeByName, // latest terminal allocations (by name)
Expand All @@ -238,12 +251,11 @@ func diffSystemAllocs(
// Build a mapping of nodes to all their allocs.
nodeAllocs := make(map[string][]*structs.Allocation, len(allocs))
for _, alloc := range allocs {
nallocs := append(nodeAllocs[alloc.NodeID], alloc) //nolint:gocritic
nodeAllocs[alloc.NodeID] = nallocs
nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc)
}

eligibleNodes := make(map[string]*structs.Node)
for _, node := range nodes {
for _, node := range readyNodes {
if _, ok := nodeAllocs[node.ID]; !ok {
nodeAllocs[node.ID] = nil
}
Expand All @@ -255,7 +267,7 @@ func diffSystemAllocs(

result := new(diffResult)
for nodeID, allocs := range nodeAllocs {
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminal)
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal)
result.Append(diff)
}

Expand All @@ -264,7 +276,7 @@ func diffSystemAllocs(

// readyNodesInDCs returns all the ready nodes in the given datacenters and a
// mapping of each data center to the count of ready nodes.
func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int, error) {
func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) {
// Index the DCs
dcMap := make(map[string]int, len(dcs))
for _, dc := range dcs {
Expand All @@ -274,9 +286,10 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
// Scan the nodes
ws := memdb.NewWatchSet()
var out []*structs.Node
notReady := map[string]struct{}{}
iter, err := state.Nodes(ws)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
for {
raw := iter.Next()
Expand All @@ -287,6 +300,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
// Filter on datacenter and status
node := raw.(*structs.Node)
if !node.Ready() {
notReady[node.ID] = struct{}{}
continue
}
if _, ok := dcMap[node.Datacenter]; !ok {
Expand All @@ -295,7 +309,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
out = append(out, node)
dcMap[node.Datacenter]++
}
return out, dcMap, nil
return out, notReady, dcMap, nil
}

// retryMax is used to retry a callback until it returns success or
Expand Down
15 changes: 9 additions & 6 deletions scheduler/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
},
}

diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal)
diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal)
require.Empty(t, diff.place)
require.Empty(t, diff.update)
require.Empty(t, diff.stop)
Expand All @@ -87,7 +87,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"]
expAlloc.NodeID = "node1"

diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal)
diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal)
require.Empty(t, diff.place)
require.Equal(t, 1, len(diff.update))
require.Empty(t, diff.stop)
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) {
},
}

diff := diffSystemAllocsForNode(job, "zip", eligible, tainted, required, allocs, terminal)
diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal)
place := diff.place
update := diff.update
migrate := diff.migrate
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) {
// No terminal allocs
terminal := make(structs.TerminalByNodeByName)

diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, tainted, required, allocs, terminal)
diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal)
place := diff.place
update := diff.update
migrate := diff.migrate
Expand Down Expand Up @@ -360,7 +360,7 @@ func TestDiffSystemAllocs(t *testing.T) {
},
}

diff := diffSystemAllocs(job, nodes, tainted, allocs, terminal)
diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal)
place := diff.place
update := diff.update
migrate := diff.migrate
Expand Down Expand Up @@ -415,7 +415,7 @@ func TestReadyNodesInDCs(t *testing.T) {
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3))
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1003, node4))

nodes, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"})
nodes, notReady, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"})
require.NoError(t, err)
require.Equal(t, 2, len(nodes))
require.True(t, nodes[0].ID != node3.ID && nodes[1].ID != node3.ID)
Expand All @@ -424,6 +424,9 @@ func TestReadyNodesInDCs(t *testing.T) {
require.Equal(t, 1, dc["dc1"])
require.Contains(t, dc, "dc2")
require.Equal(t, 1, dc["dc2"])

require.Contains(t, notReady, node3.ID)
require.Contains(t, notReady, node4.ID)
}

func TestRetryMax(t *testing.T) {
Expand Down

0 comments on commit b0ce684

Please sign in to comment.