scheduler: fix quadratic performance with spread blocks
When the scheduler picks a node for each evaluation, the
`LimitIterator` provides at most 2 eligible nodes for the
`MaxScoreIterator` to choose from. This keeps scheduling fast while
producing acceptable results because the results are bin-packed.

Jobs with a `spread` block (or node affinity) remove this limit in
order to produce correct spread scoring. This means that every
allocation within a job with a `spread` block is evaluated against
_all_ eligible nodes. Operators of large clusters have reported that
jobs with `spread` blocks that are eligible on a large number of nodes
can take longer than the nack timeout (60s) to evaluate. Typical
evaluations are processed in milliseconds.

In practice, it's not necessary to evaluate every eligible node for
every allocation on large clusters, because the `RandomIterator` at
the base of the scheduler stack produces enough variation in each pass
that the likelihood of an uneven spread is negligible. Note that only
feasibility is checked before the limit, so this only impacts the
number of _eligible_ nodes available for scoring, not the total number
of nodes.

This changeset sets the iterator limit for "large" `spread` block and
node affinity jobs to be equal to the number of desired
allocations. The included tests ensure that we have acceptable spread
results across a variety of large cluster topologies.
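
As a rough sketch of what this amounts to (none of the names here are real
scheduler APIs; the actual change is the `scheduler/stack.go` diff below,
which also applies a floor of 100 to the limit): shuffle the nodes, filter
for feasibility, then score at most `max(count, 100)` eligible nodes per
allocation.

```go
package main

import (
	"fmt"
	"math/rand"
)

// node is a stand-in for a cluster node; this sketch does not use real
// scheduler types.
type node struct {
	id       string
	feasible bool
	score    float64
}

// scoringLimit mirrors the policy in the scheduler/stack.go change below:
// consider one eligible node per desired allocation, with a floor of 100 so
// that small task groups still sample enough of the cluster.
func scoringLimit(desiredAllocs int) int {
	if desiredAllocs < 100 {
		return 100
	}
	return desiredAllocs
}

// pickNode illustrates why the limit is safe: nodes are shuffled first, and
// feasibility is checked before the limit, so the limit only caps how many
// *eligible* nodes are handed to scoring.
func pickNode(nodes []node, limit int) *node {
	rand.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] })

	var best *node
	eligible := 0
	for i := range nodes {
		n := &nodes[i]
		if !n.feasible {
			continue // infeasible nodes never count against the limit
		}
		if best == nil || n.score > best.score {
			best = n
		}
		eligible++
		if eligible >= limit {
			break
		}
	}
	return best
}

func main() {
	fmt.Println(scoringLimit(50))  // 100: the floor applies
	fmt.Println(scoringLimit(500)) // 500: one eligible node per desired alloc

	nodes := []node{
		{id: "n1", feasible: false},
		{id: "n2", feasible: true, score: 0.4},
		{id: "n3", feasible: true, score: 0.9},
	}
	fmt.Println(pickNode(nodes, 2).id) // n3: both eligible nodes fit within the limit
}
```

With a limit in place, each allocation scores a bounded sample of eligible
nodes instead of the whole cluster, and the shuffle provides enough
variation between passes to keep the spread even.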
tgross committed Dec 20, 2021
1 parent 0ec5db4 commit 7daaf75
Showing 3 changed files with 249 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .changelog/11712.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a performance bug where `spread` and node affinity could cause a job to take longer than the nack timeout to be evaluated.
```
239 changes: 239 additions & 0 deletions scheduler/spread_test.go
@@ -2,7 +2,10 @@ package scheduler

import (
"math"
"math/rand"
"sort"
"testing"
"time"

"fmt"

@@ -568,3 +571,239 @@ func Test_evenSpreadScoreBoost(t *testing.T) {
require.False(t, math.IsInf(boost, 1))
require.Equal(t, 1.0, boost)
}

// TestSpreadOnLargeCluster exercises the potentially quadratic
// performance of spread scheduling when there is a large number of
// eligible nodes, unless we limit the number that each MaxScore
// attempt considers. Reducing that limit from MaxInt32 prevents the
// quadratic behavior, but then we need this test to verify that we
// still get satisfactory spread results.
func TestSpreadOnLargeCluster(t *testing.T) {
t.Parallel()
cases := []struct {
name string
nodeCount int
racks map[string]int
allocs int
}{
{
name: "nodes=10k even racks=100 allocs=500",
nodeCount: 10000,
racks: generateEvenRacks(10000, 100),
allocs: 500,
},
{
name: "nodes=10k even racks=100 allocs=50",
nodeCount: 10000,
racks: generateEvenRacks(10000, 100),
allocs: 50,
},
{
name: "nodes=10k even racks=10 allocs=500",
nodeCount: 10000,
racks: generateEvenRacks(10000, 10),
allocs: 500,
},
{
name: "nodes=10k even racks=10 allocs=50",
nodeCount: 10000,
racks: generateEvenRacks(10000, 10),
allocs: 50,
},
{
name: "nodes=10k small uneven racks allocs=500",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 50),
allocs: 500,
},
{
name: "nodes=10k small uneven racks allocs=50",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 50),
allocs: 50,
},
{
name: "nodes=10k many uneven racks allocs=500",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 500),
allocs: 500,
},
{
name: "nodes=10k many uneven racks allocs=50",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 500),
allocs: 50,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
h := NewHarness(t)
err := upsertNodes(h, tc.nodeCount, tc.racks)
require.NoError(t, err)
job := generateJob(tc.allocs)
eval, err := upsertJob(h, job)
require.NoError(t, err)

start := time.Now()
err = h.Process(NewServiceScheduler, eval)
require.NoError(t, err)
require.LessOrEqual(t, time.Since(start), time.Duration(60*time.Second),
"time to evaluate exceeded EvalNackTimeout")

require.Len(t, h.Plans, 1)
require.False(t, h.Plans[0].IsNoOp())
require.NoError(t, validateEqualSpread(h))
})
}
}

// generateEvenRacks creates a map of rack names to a count of nodes
// evenly distributed among those racks
func generateEvenRacks(nodes int, rackCount int) map[string]int {
racks := map[string]int{}
for i := 0; i < nodes; i++ {
racks[fmt.Sprintf("r%d", i%rackCount)]++
}
return racks
}

// generateUnevenRacks creates a random map of rack names to a count
// of nodes in that rack
func generateUnevenRacks(t *testing.T, nodes int, rackCount int) map[string]int {
rackNames := []string{}
for i := 0; i < rackCount; i++ {
rackNames = append(rackNames, fmt.Sprintf("r%d", i))
}

// print this so that any future test flakes can be more easily
// reproduced
seed := time.Now().UnixNano()
rand.Seed(seed)
t.Logf("nodes=%d racks=%d seed=%d\n", nodes, rackCount, seed)

racks := map[string]int{}
for i := 0; i < nodes; i++ {
idx := rand.Intn(len(rackNames))
racks[rackNames[idx]]++
}
return racks
}

// upsertNodes creates a collection of Nodes in the state store,
// distributed among the racks
func upsertNodes(h *Harness, count int, racks map[string]int) error {

datacenters := []string{"dc-1", "dc-2"}
rackAssignments := []string{}
for rack, count := range racks {
for i := 0; i < count; i++ {
rackAssignments = append(rackAssignments, rack)
}
}

for i := 0; i < count; i++ {
node := mock.Node()
node.Datacenter = datacenters[i%2]
node.Meta = map[string]string{}
node.Meta["rack"] = fmt.Sprintf("r%s", rackAssignments[i])
node.NodeResources.Cpu.CpuShares = 14000
node.NodeResources.Memory.MemoryMB = 32000
err := h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)
if err != nil {
return err
}
}
return nil
}

func generateJob(jobSize int) *structs.Job {
job := mock.Job()
job.Datacenters = []string{"dc-1", "dc-2"}
job.Spreads = []*structs.Spread{{Attribute: "${meta.rack}"}}
job.Constraints = []*structs.Constraint{}
job.TaskGroups[0].Count = jobSize
job.TaskGroups[0].Networks = nil
job.TaskGroups[0].Services = []*structs.Service{}
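// each node created by upsertNodes has 14000 CPU shares and 32000 MB of
// memory, so these resources fit at most two of these allocations per node,
// forcing placements to be distributed across many nodes and racks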
job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 6000,
MemoryMB: 6000,
}
return job
}

func upsertJob(h *Harness, job *structs.Job) (*structs.Evaluation, error) {
err := h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)
if err != nil {
return nil, err
}
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
err = h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval})
if err != nil {
return nil, err
}
return eval, nil
}

// validateEqualSpread compares the resulting plan to the node
// metadata to verify that each group of spread targets has an equal
// distribution.
func validateEqualSpread(h *Harness) error {

iter, err := h.State.Nodes(nil)
if err != nil {
return err
}
i := 0
nodesToRacks := map[string]string{}
for {
raw := iter.Next()
if raw == nil {
break
}
node := raw.(*structs.Node)
rack, ok := node.Meta["rack"]
if ok {
nodesToRacks[node.ID] = rack
}
i++
}

// Collapse the per-node allocation counts into a count per rack, and
// then into a set of distinct counts. There should be a single
// distinct count or, if there are two, they should be no more than
// one apart.
racksToAllocCount := map[string]int{}
for nodeID, nodeAllocs := range h.Plans[0].NodeAllocation {
racksToAllocCount[nodesToRacks[nodeID]] += len(nodeAllocs)
}
countSet := map[int]struct{}{}
for _, count := range racksToAllocCount {
countSet[count] = struct{}{}
}
countSlice := []int{}
for count := range countSet {
countSlice = append(countSlice, count)
}

switch len(countSlice) {
case 1:
return nil
case 2:
sort.Ints(countSlice)
if countSlice[0]+1 == countSlice[1] {
return nil
}
}
return fmt.Errorf(
"expected even distributon of allocs to racks, but got:\n%+v",
racksToAllocCount)
}
8 changes: 7 additions & 1 deletion scheduler/stack.go
@@ -163,7 +163,13 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
s.spread.SetTaskGroup(tg)

if s.nodeAffinity.hasAffinities() || s.spread.hasSpreads() {
s.limit.SetLimit(math.MaxInt32)
// scoring spread across all nodes has quadratic behavior, so
// we need to consider a subset of nodes to keep evaluation times
// reasonable, but one large enough to ensure the spread is correct
s.limit.SetLimit(tg.Count)
if tg.Count < 100 {
s.limit.SetLimit(100)
}
}

if contextual, ok := s.quota.(ContextualIterator); ok {
