Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: fix quadratic performance with spread blocks #11712

Merged
merged 1 commit into from
Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/11712.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a performance bug where `spread` and node affinity can cause a job to take longer than the nack timeout to be evaluated.
```
243 changes: 243 additions & 0 deletions scheduler/spread_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package scheduler

import (
"math"
"math/rand"
"sort"
"testing"
"time"

"fmt"

Expand Down Expand Up @@ -568,3 +571,243 @@ func Test_evenSpreadScoreBoost(t *testing.T) {
require.False(t, math.IsInf(boost, 1))
require.Equal(t, 1.0, boost)
}

// TestSpreadOnLargeCluster exercises potentially quadratic
// performance cases with spread scheduling when we have a large
// number of eligible nodes unless we limit the number that each
// MaxScore attempt considers. By reducing the total from MaxInt, we
// can prevent quadratic performance but then we need this test to
// verify we have satisfactory spread results.
func TestSpreadOnLargeCluster(t *testing.T) {
t.Parallel()
cases := []struct {
name string
nodeCount int
racks map[string]int
allocs int
}{
{
name: "nodes=10k even racks=100 allocs=500",
nodeCount: 10000,
racks: generateEvenRacks(10000, 100),
allocs: 500,
},
{
name: "nodes=10k even racks=100 allocs=50",
nodeCount: 10000,
racks: generateEvenRacks(10000, 100),
allocs: 50,
},
{
name: "nodes=10k even racks=10 allocs=500",
nodeCount: 10000,
racks: generateEvenRacks(10000, 10),
allocs: 500,
},
{
name: "nodes=10k even racks=10 allocs=50",
nodeCount: 10000,
racks: generateEvenRacks(10000, 10),
allocs: 500,
},
{
name: "nodes=10k small uneven racks allocs=500",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 50),
allocs: 500,
},
{
name: "nodes=10k small uneven racks allocs=50",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 50),
allocs: 500,
},
{
name: "nodes=10k many uneven racks allocs=500",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 500),
allocs: 500,
},
{
name: "nodes=10k many uneven racks allocs=50",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 500),
allocs: 50,
},
}

for i := range cases {
tc := cases[i]
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
h := NewHarness(t)
err := upsertNodes(h, tc.nodeCount, tc.racks)
require.NoError(t, err)
job := generateJob(tc.allocs)
eval, err := upsertJob(h, job)
require.NoError(t, err)

start := time.Now()
err = h.Process(NewServiceScheduler, eval)
require.NoError(t, err)
require.LessOrEqual(t, time.Since(start), time.Duration(60*time.Second),
"time to evaluate exceeded EvalNackTimeout")

require.Len(t, h.Plans, 1)
require.False(t, h.Plans[0].IsNoOp())
require.NoError(t, validateEqualSpread(h))
})
}
}

// generateUnevenRacks creates a map of rack names to a count of nodes
// evenly distributed in those racks
func generateEvenRacks(nodes int, rackCount int) map[string]int {
racks := map[string]int{}
for i := 0; i < nodes; i++ {
racks[fmt.Sprintf("r%d", i%rackCount)]++
}
return racks
}

// generateUnevenRacks creates a random map of rack names to a count
// of nodes in that rack
func generateUnevenRacks(t *testing.T, nodes int, rackCount int) map[string]int {
rackNames := []string{}
for i := 0; i < rackCount; i++ {
rackNames = append(rackNames, fmt.Sprintf("r%d", i))
}

// print this so that any future test flakes can be more easily
// reproduced
seed := time.Now().UnixNano()
rand.Seed(seed)
t.Logf("nodes=%d racks=%d seed=%d\n", nodes, rackCount, seed)

racks := map[string]int{}
for i := 0; i < nodes; i++ {
idx := rand.Intn(len(rackNames))
racks[rackNames[idx]]++
}
return racks
}

// upsertNodes creates a collection of Nodes in the state store,
// distributed among the racks
func upsertNodes(h *Harness, count int, racks map[string]int) error {

datacenters := []string{"dc-1", "dc-2"}
rackAssignments := []string{}
for rack, count := range racks {
for i := 0; i < count; i++ {
rackAssignments = append(rackAssignments, rack)
}
}

for i := 0; i < count; i++ {
node := mock.Node()
node.Datacenter = datacenters[i%2]
node.Meta = map[string]string{}
node.Meta["rack"] = fmt.Sprintf("r%s", rackAssignments[i])
node.NodeResources.Cpu.CpuShares = 14000
node.NodeResources.Memory.MemoryMB = 32000
err := h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)
if err != nil {
return err
}
}
return nil
}

func generateJob(jobSize int) *structs.Job {
job := mock.Job()
job.Datacenters = []string{"dc-1", "dc-2"}
job.Spreads = []*structs.Spread{{Attribute: "${meta.rack}"}}
job.Constraints = []*structs.Constraint{}
job.TaskGroups[0].Count = jobSize
job.TaskGroups[0].Networks = nil
job.TaskGroups[0].Services = []*structs.Service{}
job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 6000,
MemoryMB: 6000,
}
return job
}

func upsertJob(h *Harness, job *structs.Job) (*structs.Evaluation, error) {
err := h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)
if err != nil {
return nil, err
}
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
err = h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval})
if err != nil {
return nil, err
}
return eval, nil
}

// validateEqualSpread compares the resulting plan to the node
// metadata to verify that each group of spread targets has an equal
// distribution.
func validateEqualSpread(h *Harness) error {

iter, err := h.State.Nodes(nil)
if err != nil {
return err
}
i := 0
nodesToRacks := map[string]string{}
racksToAllocCount := map[string]int{}
for {
raw := iter.Next()
if raw == nil {
break
}
node := raw.(*structs.Node)
rack, ok := node.Meta["rack"]
if ok {
nodesToRacks[node.ID] = rack
racksToAllocCount[rack] = 0
}
i++
}

// Collapse the count of allocations per node into a list of
// counts. The results should be clustered within one of each
// other.
for nodeID, nodeAllocs := range h.Plans[0].NodeAllocation {
racksToAllocCount[nodesToRacks[nodeID]] += len(nodeAllocs)
}
countSet := map[int]int{}
for _, count := range racksToAllocCount {
countSet[count]++
}

countSlice := []int{}
for count := range countSet {
countSlice = append(countSlice, count)
}

switch len(countSlice) {
case 1:
return nil
case 2, 3:
sort.Ints(countSlice)
for i := 1; i < len(countSlice); i++ {
if countSlice[i] != countSlice[i-1]+1 {
return fmt.Errorf("expected even distributon of allocs to racks, but got:\n%+v", countSet)
}
}
return nil
}
return fmt.Errorf("expected even distributon of allocs to racks, but got:\n%+v", countSet)
}
9 changes: 8 additions & 1 deletion scheduler/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,14 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
s.spread.SetTaskGroup(tg)

if s.nodeAffinity.hasAffinities() || s.spread.hasSpreads() {
s.limit.SetLimit(math.MaxInt32)
// scoring spread across all nodes has quadratic behavior, so
// we need to consider a subset of nodes to keep evaluaton times
// reasonable but enough to ensure spread is correct. this
// value was empirically determined.
s.limit.SetLimit(tg.Count)
if tg.Count < 100 {
s.limit.SetLimit(100)
}
}

if contextual, ok := s.quota.(ContextualIterator); ok {
Expand Down
8 changes: 6 additions & 2 deletions website/content/docs/job-specification/spread.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ spread stanza. Spread scores are combined with other scoring factors such as bin

A job or task group can have more than one spread criteria, with weights to express relative preference.

Spread criteria are treated as a soft preference by the Nomad scheduler.
If no nodes match a given spread criteria, placement is still successful.
Spread criteria are treated as a soft preference by the Nomad
scheduler. If no nodes match a given spread criteria, placement is
still successful. To avoid scoring every node for every placement,
allocations may not be perfectly spread. Spread works best on
attributes with similar number of nodes: identically configured racks
or similarly configured datacenters.

Spread may be expressed on [attributes][interpolation] or [client metadata][client-meta].
Additionally, spread may be specified at the [job][job] and [group][group] levels for ultimate flexibility. Job level spread criteria are inherited by all task groups in the job.
Expand Down