node pools: apply node pool scheduler configuration (#17598)
lgfa29 authored Jun 22, 2023
1 parent fe49f22 commit f4c7182
Showing 18 changed files with 1,070 additions and 176 deletions.
23 changes: 23 additions & 0 deletions helper/iterator/iterator.go
@@ -0,0 +1,23 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package iterator

// Iterator represents an object that can iterate over a set of values one at a
// time.
type Iterator interface {
// Next returns the next element or nil if there are none left.
Next() any
}

// Len consumes the iterator and returns the number of elements found.
//
// IMPORTANT: the iterator is consumed, so it must not be reused after Len()
// returns.
func Len(iter Iterator) int {
count := 0
for raw := iter.Next(); raw != nil; raw = iter.Next() {
count++
}
return count
}
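
As a rough usage sketch of the new helper (not part of this commit), the snippet below drives iterator.Len with a hypothetical sliceIterator type defined only for illustration:

package main

import (
    "fmt"

    "github.com/hashicorp/nomad/helper/iterator"
)

// sliceIterator is a hypothetical Iterator implementation backed by a slice.
type sliceIterator struct {
    items []any
    next  int
}

// Next returns the next element, or nil once the slice is exhausted.
func (s *sliceIterator) Next() any {
    if s.next >= len(s.items) {
        return nil
    }
    item := s.items[s.next]
    s.next++
    return item
}

func main() {
    iter := &sliceIterator{items: []any{"a", "b", "c"}}

    // Len drains the iterator and reports how many elements it produced.
    fmt.Println(iterator.Len(iter)) // 3

    // The iterator is now consumed; further calls to Next return nil.
    fmt.Println(iter.Next()) // <nil>
}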
7 changes: 6 additions & 1 deletion nomad/job_endpoint_hooks.go
@@ -322,7 +322,12 @@ func (v *memoryOversubscriptionValidate) Validate(job *structs.Job) (warnings []
return nil, err
}

- if c != nil && c.MemoryOversubscriptionEnabled {
+ pool, err := v.srv.State().NodePoolByName(nil, job.NodePool)
+ if err != nil {
+ return nil, err
+ }
+
+ if pool.MemoryOversubscriptionEnabled(c) {
return nil, nil
}

107 changes: 107 additions & 0 deletions nomad/job_endpoint_test.go
@@ -2164,6 +2164,113 @@ func TestJobEndpoint_Register_ValidateMemoryMax(t *testing.T) {
require.Empty(t, resp.Warnings)
}

func TestJobEndpoint_Register_ValidateMemoryMax_NodePool(t *testing.T) {
ci.Parallel(t)

s, cleanupS := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)

// Store default scheduler configuration to reset between test cases.
_, defaultSchedConfig, err := s.State().SchedulerConfig()
must.NoError(t, err)

// Create test node pools.
noSchedConfig := mock.NodePool()
noSchedConfig.SchedulerConfiguration = nil

withMemOversub := mock.NodePool()
withMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(true),
}

noMemOversub := mock.NodePool()
noMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(false),
}

err = s.State().UpsertNodePools(structs.MsgTypeTestSetup, 100, []*structs.NodePool{
noSchedConfig,
withMemOversub,
noMemOversub,
})
must.NoError(t, err)

testCases := []struct {
name string
pool string
globalConfig *structs.SchedulerConfiguration
expectedWarning string
}{
{
name: "no scheduler config uses global config",
pool: noSchedConfig.Name,
globalConfig: &structs.SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expectedWarning: "",
},
{
name: "enabled via node pool",
pool: withMemOversub.Name,
globalConfig: &structs.SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
},
expectedWarning: "",
},
{
name: "disabled via node pool",
pool: noMemOversub.Name,
globalConfig: &structs.SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expectedWarning: "Memory oversubscription is not enabled",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Set global scheduler config if provided.
if tc.globalConfig != nil {
idx, err := s.State().LatestIndex()
must.NoError(t, err)

err = s.State().SchedulerSetConfig(idx, tc.globalConfig)
must.NoError(t, err)
}

// Create job with node_pool and memory_max.
job := mock.Job()
job.TaskGroups[0].Tasks[0].Resources.MemoryMaxMB = 2000
job.NodePool = tc.pool

req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)

// Validate response.
must.NoError(t, err)
if tc.expectedWarning != "" {
must.StrContains(t, resp.Warnings, tc.expectedWarning)
} else {
must.Eq(t, "", resp.Warnings)
}

// Reset to default global scheduler config.
err = s.State().SchedulerSetConfig(resp.Index+1, defaultSchedConfig)
must.NoError(t, err)
})
}
}

// evalUpdateFromRaft searches the raft logs for the eval update pertaining to the eval
func evalUpdateFromRaft(t *testing.T, s *Server, evalID string) *structs.Evaluation {
var store raft.LogStore = s.raftInmem
21 changes: 21 additions & 0 deletions nomad/structs/node_pool.go
@@ -117,6 +117,24 @@ func (n *NodePool) IsBuiltIn() bool {
}
}

// MemoryOversubscriptionEnabled returns true if memory oversubscription is
// enabled in the node pool or in the global cluster configuration.
func (n *NodePool) MemoryOversubscriptionEnabled(global *SchedulerConfiguration) bool {

// Default to the global scheduler config.
memOversubEnabled := global != nil && global.MemoryOversubscriptionEnabled

// But overwrite it if the node pool also has it configured.
poolHasMemOversub := n != nil &&
n.SchedulerConfiguration != nil &&
n.SchedulerConfiguration.MemoryOversubscriptionEnabled != nil
if poolHasMemOversub {
memOversubEnabled = *n.SchedulerConfiguration.MemoryOversubscriptionEnabled
}

return memOversubEnabled
}

// SetHash is used to compute and set the hash of node pool
func (n *NodePool) SetHash() []byte {
// Initialize a 256bit Blake2 hash (32 bytes)
@@ -163,6 +181,9 @@ func (n *NodePool) SetHash() []byte {

// NodePoolSchedulerConfiguration is the scheduler configuration applied to a
// node pool.
//
// When adding new values that should override global scheduler configuration,
// verify the scheduler handles the node pool configuration as well.
type NodePoolSchedulerConfiguration struct {

// SchedulerAlgorithm is the scheduling algorithm to use for the pool.
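
The precedence rule implemented by NodePool.MemoryOversubscriptionEnabled can be illustrated with a small sketch (not part of the commit; the pool name and values are made up):

package main

import (
    "fmt"

    "github.com/hashicorp/nomad/helper/pointer"
    "github.com/hashicorp/nomad/nomad/structs"
)

func main() {
    // Global scheduler configuration with memory oversubscription disabled.
    global := &structs.SchedulerConfiguration{
        MemoryOversubscriptionEnabled: false,
    }

    // A node pool that explicitly enables memory oversubscription.
    pool := &structs.NodePool{
        Name: "example-pool",
        SchedulerConfiguration: &structs.NodePoolSchedulerConfiguration{
            MemoryOversubscriptionEnabled: pointer.Of(true),
        },
    }

    // The pool-level value overrides the global default.
    fmt.Println(pool.MemoryOversubscriptionEnabled(global)) // true

    // A nil pool, or one without a scheduler config, falls back to the global value.
    var unset *structs.NodePool
    fmt.Println(unset.MemoryOversubscriptionEnabled(global)) // false
}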
67 changes: 67 additions & 0 deletions nomad/structs/node_pool_test.go
@@ -127,3 +127,70 @@ func TestNodePool_IsBuiltIn(t *testing.T) {
})
}
}

func TestNodePool_MemoryOversubscriptionEnabled(t *testing.T) {
ci.Parallel(t)

testCases := []struct {
name string
pool *NodePool
global *SchedulerConfiguration
expected bool
}{
{
name: "global used if pool is nil",
pool: nil,
global: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expected: true,
},
{
name: "global used if pool doesn't have scheduler config",
pool: &NodePool{},
global: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expected: true,
},
{
name: "global used if pool doesn't specify memory oversub",
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{},
},
global: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expected: true,
},
{
name: "pool overrides global if it defines memory oversub",
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(false),
},
},
global: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expected: false,
},
{
name: "pool used if global is nil",
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(true),
},
},
global: nil,
expected: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got := tc.pool.MemoryOversubscriptionEnabled(tc.global)
must.Eq(t, got, tc.expected)
})
}
}
20 changes: 20 additions & 0 deletions nomad/structs/operator.go
@@ -196,6 +196,26 @@ func (s *SchedulerConfiguration) EffectiveSchedulerAlgorithm() SchedulerAlgorith
return s.SchedulerAlgorithm
}

// WithNodePool returns a new SchedulerConfiguration with the node pool
// scheduler configuration applied.
func (s *SchedulerConfiguration) WithNodePool(pool *NodePool) *SchedulerConfiguration {
schedConfig := s.Copy()

if pool == nil || pool.SchedulerConfiguration == nil {
return schedConfig
}

poolConfig := pool.SchedulerConfiguration
if poolConfig.SchedulerAlgorithm != "" {
schedConfig.SchedulerAlgorithm = poolConfig.SchedulerAlgorithm
}
if poolConfig.MemoryOversubscriptionEnabled != nil {
schedConfig.MemoryOversubscriptionEnabled = *poolConfig.MemoryOversubscriptionEnabled
}

return schedConfig
}

func (s *SchedulerConfiguration) Canonicalize() {
if s != nil && s.SchedulerAlgorithm == "" {
s.SchedulerAlgorithm = SchedulerAlgorithmBinpack
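
A minimal sketch of how WithNodePool derives an effective scheduler configuration, assuming a pool that overrides both fields (illustrative only, not code from this commit):

package main

import (
    "fmt"

    "github.com/hashicorp/nomad/helper/pointer"
    "github.com/hashicorp/nomad/nomad/structs"
)

func main() {
    // Cluster-wide defaults.
    global := &structs.SchedulerConfiguration{
        SchedulerAlgorithm:            structs.SchedulerAlgorithmBinpack,
        MemoryOversubscriptionEnabled: false,
    }

    // A node pool that overrides both values.
    pool := &structs.NodePool{
        SchedulerConfiguration: &structs.NodePoolSchedulerConfiguration{
            SchedulerAlgorithm:            structs.SchedulerAlgorithmSpread,
            MemoryOversubscriptionEnabled: pointer.Of(true),
        },
    }

    effective := global.WithNodePool(pool)
    fmt.Println(effective.SchedulerAlgorithm)            // spread
    fmt.Println(effective.MemoryOversubscriptionEnabled) // true

    // The receiver is copied first, so the global configuration is unchanged.
    fmt.Println(global.SchedulerAlgorithm) // binpack
}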
108 changes: 108 additions & 0 deletions nomad/structs/operator_test.go
@@ -0,0 +1,108 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package structs

import (
"testing"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/shoenig/test/must"
)

func TestSchedulerConfiguration_WithNodePool(t *testing.T) {
ci.Parallel(t)

testCases := []struct {
name string
schedConfig *SchedulerConfiguration
pool *NodePool
expected *SchedulerConfiguration
}{
{
name: "nil pool returns same config",
schedConfig: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
pool: nil,
expected: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
},
{
name: "nil pool scheduler config returns same config",
schedConfig: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
pool: &NodePool{},
expected: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
},
{
name: "pool with memory oversubscription overwrites config",
schedConfig: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
},
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(true),
},
},
expected: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
},
{
name: "pool with scheduler algorithm overwrites config",
schedConfig: &SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmBinpack,
},
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
},
expected: &SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
},
{
name: "pool without memory oversubscription does not modify config",
schedConfig: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
},
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{},
},
expected: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
},
},
{
name: "pool without scheduler algorithm does not modify config",
schedConfig: &SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{},
},
expected: &SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got := tc.schedConfig.WithNodePool(tc.pool)
must.Eq(t, tc.expected, got)
must.NotEqOp(t, tc.schedConfig, got)
})
}
}