diff --git a/.changelog/11712.txt b/.changelog/11712.txt
new file mode 100644
index 000000000000..376da7c229ec
--- /dev/null
+++ b/.changelog/11712.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: Fixed a performance bug where `spread` and node affinity could cause a job to take longer than the nack timeout to be evaluated.
+```
diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go
index c21a58f6b990..04040acb6755
--- a/scheduler/spread_test.go
+++ b/scheduler/spread_test.go
@@ -2,7 +2,10 @@ package scheduler

 import (
 	"math"
+	"math/rand"
+	"sort"
 	"testing"
+	"time"

 	"fmt"
@@ -568,3 +571,243 @@ func Test_evenSpreadScoreBoost(t *testing.T) {
 	require.False(t, math.IsInf(boost, 1))
 	require.Equal(t, 1.0, boost)
 }
+
+// TestSpreadOnLargeCluster exercises the potentially quadratic
+// performance of spread scheduling when we have a large number of
+// eligible nodes, unless we limit the number of nodes that each
+// MaxScore attempt considers. Reducing that limit from MaxInt32
+// prevents the quadratic behavior, but then we need this test to
+// verify we still get satisfactory spread results.
+func TestSpreadOnLargeCluster(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		name      string
+		nodeCount int
+		racks     map[string]int
+		allocs    int
+	}{
+		{
+			name:      "nodes=10k even racks=100 allocs=500",
+			nodeCount: 10000,
+			racks:     generateEvenRacks(10000, 100),
+			allocs:    500,
+		},
+		{
+			name:      "nodes=10k even racks=100 allocs=50",
+			nodeCount: 10000,
+			racks:     generateEvenRacks(10000, 100),
+			allocs:    50,
+		},
+		{
+			name:      "nodes=10k even racks=10 allocs=500",
+			nodeCount: 10000,
+			racks:     generateEvenRacks(10000, 10),
+			allocs:    500,
+		},
+		{
+			name:      "nodes=10k even racks=10 allocs=50",
+			nodeCount: 10000,
+			racks:     generateEvenRacks(10000, 10),
+			allocs:    50,
+		},
+		{
+			name:      "nodes=10k small uneven racks allocs=500",
+			nodeCount: 10000,
+			racks:     generateUnevenRacks(t, 10000, 50),
+			allocs:    500,
+		},
+		{
+			name:      "nodes=10k small uneven racks allocs=50",
+			nodeCount: 10000,
+			racks:     generateUnevenRacks(t, 10000, 50),
+			allocs:    50,
+		},
+		{
+			name:      "nodes=10k many uneven racks allocs=500",
+			nodeCount: 10000,
+			racks:     generateUnevenRacks(t, 10000, 500),
+			allocs:    500,
+		},
+		{
+			name:      "nodes=10k many uneven racks allocs=50",
+			nodeCount: 10000,
+			racks:     generateUnevenRacks(t, 10000, 500),
+			allocs:    50,
+		},
+	}
+
+	for i := range cases {
+		tc := cases[i]
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			h := NewHarness(t)
+			err := upsertNodes(h, tc.nodeCount, tc.racks)
+			require.NoError(t, err)
+			job := generateJob(tc.allocs)
+			eval, err := upsertJob(h, job)
+			require.NoError(t, err)
+
+			start := time.Now()
+			err = h.Process(NewServiceScheduler, eval)
+			require.NoError(t, err)
+			require.LessOrEqual(t, time.Since(start), 60*time.Second,
+				"time to evaluate exceeded EvalNackTimeout")
+
+			require.Len(t, h.Plans, 1)
+			require.False(t, h.Plans[0].IsNoOp())
+			require.NoError(t, validateEqualSpread(h))
+		})
+	}
+}
+
+// generateEvenRacks creates a map of rack names to a count of nodes,
+// evenly distributed across those racks
+func generateEvenRacks(nodes int, rackCount int) map[string]int {
+	racks := map[string]int{}
+	for i := 0; i < nodes; i++ {
+		racks[fmt.Sprintf("r%d", i%rackCount)]++
+	}
+	return racks
+}
+
+// generateUnevenRacks creates a random map of rack names to a count
+// of nodes in that rack
+func generateUnevenRacks(t *testing.T, nodes int, rackCount int) map[string]int {
+	rackNames := []string{}
+	for i := 0; i < rackCount; i++ {
+		rackNames = append(rackNames,
+			fmt.Sprintf("r%d", i))
+	}
+
+	// print this so that any future test flakes can be more easily
+	// reproduced
+	seed := time.Now().UnixNano()
+	rand.Seed(seed)
+	t.Logf("nodes=%d racks=%d seed=%d\n", nodes, rackCount, seed)
+
+	racks := map[string]int{}
+	for i := 0; i < nodes; i++ {
+		idx := rand.Intn(len(rackNames))
+		racks[rackNames[idx]]++
+	}
+	return racks
+}
+
+// upsertNodes creates a collection of Nodes in the state store,
+// distributed among the racks
+func upsertNodes(h *Harness, count int, racks map[string]int) error {
+
+	datacenters := []string{"dc-1", "dc-2"}
+	rackAssignments := []string{}
+	for rack, rackCount := range racks {
+		for i := 0; i < rackCount; i++ {
+			rackAssignments = append(rackAssignments, rack)
+		}
+	}
+
+	for i := 0; i < count; i++ {
+		node := mock.Node()
+		node.Datacenter = datacenters[i%2]
+		node.Meta = map[string]string{}
+		node.Meta["rack"] = rackAssignments[i]
+		node.NodeResources.Cpu.CpuShares = 14000
+		node.NodeResources.Memory.MemoryMB = 32000
+		err := h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func generateJob(jobSize int) *structs.Job {
+	job := mock.Job()
+	job.Datacenters = []string{"dc-1", "dc-2"}
+	job.Spreads = []*structs.Spread{{Attribute: "${meta.rack}"}}
+	job.Constraints = []*structs.Constraint{}
+	job.TaskGroups[0].Count = jobSize
+	job.TaskGroups[0].Networks = nil
+	job.TaskGroups[0].Services = []*structs.Service{}
+	job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
+		CPU:      6000,
+		MemoryMB: 6000,
+	}
+	return job
+}
+
+func upsertJob(h *Harness, job *structs.Job) (*structs.Evaluation, error) {
+	err := h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)
+	if err != nil {
+		return nil, err
+	}
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	err = h.State.UpsertEvals(structs.MsgTypeTestSetup,
+		h.NextIndex(), []*structs.Evaluation{eval})
+	if err != nil {
+		return nil, err
+	}
+	return eval, nil
+}
+
+// validateEqualSpread compares the resulting plan to the node
+// metadata to verify that each group of spread targets has an equal
+// distribution.
+func validateEqualSpread(h *Harness) error {
+
+	iter, err := h.State.Nodes(nil)
+	if err != nil {
+		return err
+	}
+	nodesToRacks := map[string]string{}
+	racksToAllocCount := map[string]int{}
+	for {
+		raw := iter.Next()
+		if raw == nil {
+			break
+		}
+		node := raw.(*structs.Node)
+		rack, ok := node.Meta["rack"]
+		if ok {
+			nodesToRacks[node.ID] = rack
+			racksToAllocCount[rack] = 0
+		}
+	}
+
+	// Collapse the count of allocations per node into a count per
+	// rack, and then into a set of distinct counts. For an even
+	// spread, the distinct counts should be consecutive integers.
+	for nodeID, nodeAllocs := range h.Plans[0].NodeAllocation {
+		racksToAllocCount[nodesToRacks[nodeID]] += len(nodeAllocs)
+	}
+	countSet := map[int]int{}
+	for _, count := range racksToAllocCount {
+		countSet[count]++
+	}
+
+	countSlice := []int{}
+	for count := range countSet {
+		countSlice = append(countSlice, count)
+	}
+
+	switch len(countSlice) {
+	case 1:
+		return nil
+	case 2, 3:
+		sort.Ints(countSlice)
+		for i := 1; i < len(countSlice); i++ {
+			if countSlice[i] != countSlice[i-1]+1 {
+				return fmt.Errorf("expected even distribution of allocs to racks, but got:\n%+v", countSet)
+			}
+		}
+		return nil
+	}
+	return fmt.Errorf("expected even distribution of allocs to racks, but got:\n%+v", countSet)
+}
diff --git a/scheduler/stack.go b/scheduler/stack.go
index c9c8e3609319..d2b546107f73
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -163,7 +163,14 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
 	s.spread.SetTaskGroup(tg)

 	if s.nodeAffinity.hasAffinities() || s.spread.hasSpreads() {
-		s.limit.SetLimit(math.MaxInt32)
+		// Scoring spread across all nodes has quadratic behavior, so
+		// we need to consider a subset of nodes to keep evaluation
+		// times reasonable, but one large enough to ensure spread is
+		// still correct. This value was empirically determined.
+		s.limit.SetLimit(tg.Count)
+		if tg.Count < 100 {
+			s.limit.SetLimit(100)
+		}
 	}

 	if contextual, ok := s.quota.(ContextualIterator); ok {
diff --git a/website/content/docs/job-specification/spread.mdx b/website/content/docs/job-specification/spread.mdx
index 0fab0f58ec09..3889e5623774
--- a/website/content/docs/job-specification/spread.mdx
+++ b/website/content/docs/job-specification/spread.mdx
@@ -54,8 +54,12 @@ spread stanza. Spread scores are combined with other scoring factors such as bin
 A job or task group can have more than one spread criteria, with weights to express
 relative preference.

-Spread criteria are treated as a soft preference by the Nomad scheduler.
-If no nodes match a given spread criteria, placement is still successful.
+Spread criteria are treated as a soft preference by the Nomad
+scheduler. If no nodes match a given spread criterion, placement is
+still successful. Because the scheduler does not score every node
+for every placement, allocations may not be perfectly spread. Spread
+works best on attributes with a similar number of nodes per value:
+identically configured racks or similarly configured datacenters.

 Spread may be expressed on [attributes][interpolation] or [client metadata][client-meta].
 Additionally, spread may be specified at the [job][job] and [group][group] levels for ultimate flexibility. Job level spread criteria are inherited by all task groups in the job.
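
The `SetLimit` change in the stack.go hunk above is the heart of the fix: rather than scoring every eligible node (`math.MaxInt32`), the scheduler now considers only as many candidates as the task group needs, with a floor of 100. A minimal standalone sketch of that heuristic; `spreadScoreLimit` is a hypothetical helper name for illustration, as the real code sets the limit inline on the stack's limit iterator:

```go
package main

import "fmt"

// spreadScoreLimit returns how many candidate nodes the scoring
// iterators should consider: the task group count, with a floor of
// 100 so that small groups still sample enough nodes to spread well.
func spreadScoreLimit(tgCount int) int {
	if tgCount < 100 {
		return 100
	}
	return tgCount
}

func main() {
	fmt.Println(spreadScoreLimit(50))  // 100
	fmt.Println(spreadScoreLimit(500)) // 500
}
```

Because the limit scales with the task group count, large jobs still sample enough nodes for a good spread, while evaluation time no longer grows with the full cluster size.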
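The acceptance rule in `validateEqualSpread` is worth spelling out: it tolerates up to three distinct per-rack allocation counts, provided they form a run of consecutive integers. A small self-contained sketch of just that rule, with `isEvenSpread` as a hypothetical name:

```go
package main

import (
	"fmt"
	"sort"
)

// isEvenSpread mirrors the check in validateEqualSpread: the distinct
// per-rack allocation counts must number at most three and be
// consecutive integers (e.g. racks holding 4, 5, or 6 allocs).
func isEvenSpread(racksToAllocCount map[string]int) bool {
	distinct := map[int]bool{}
	for _, count := range racksToAllocCount {
		distinct[count] = true
	}
	counts := []int{}
	for count := range distinct {
		counts = append(counts, count)
	}
	if len(counts) > 3 {
		return false
	}
	sort.Ints(counts)
	for i := 1; i < len(counts); i++ {
		if counts[i] != counts[i-1]+1 {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isEvenSpread(map[string]int{"r0": 5, "r1": 5, "r2": 4}))  // true
	fmt.Println(isEvenSpread(map[string]int{"r0": 10, "r1": 5, "r2": 4})) // false
}
```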