Skip to content

Commit

Permalink
Merge pull request #4527 from hashicorp/f-spread-backend
Browse files Browse the repository at this point in the history
Implement spreading allocations based on a target node attribute
  • Loading branch information
preetapan committed Aug 1, 2018
2 parents bb26ba3 + 383a4be commit a38546d
Show file tree
Hide file tree
Showing 8 changed files with 1,319 additions and 36 deletions.
26 changes: 26 additions & 0 deletions nomad/structs/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,32 @@ func CopySliceAffinities(s []*Affinity) []*Affinity {
return c
}

func CopySliceSpreads(s []*Spread) []*Spread {
l := len(s)
if l == 0 {
return nil
}

c := make([]*Spread, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}

func CopySliceSpreadTarget(s []*SpreadTarget) []*SpreadTarget {
l := len(s)
if l == 0 {
return nil
}

c := make([]*SpreadTarget, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}

// VaultPoliciesSet takes the structure returned by VaultPolicies and returns
// the set of required policies
func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {
Expand Down
130 changes: 130 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,10 @@ type Job struct {
// scheduling preferences that apply to all groups and tasks
Affinities []*Affinity

// Spread can be specified at the job level to express spreading
// allocations across a desired attribute, such as datacenter
Spreads []*Spread

// TaskGroups are the collections of task groups that this job needs
// to run. Each task group is an atomic unit of scheduling and placement.
TaskGroups []*TaskGroup
Expand Down Expand Up @@ -2185,6 +2189,19 @@ func (j *Job) Validate() error {
}
}

if j.Type == JobTypeSystem {
if j.Spreads != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza"))
}
} else {
for idx, spread := range j.Spreads {
if err := spread.Validate(); err != nil {
outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
}

// Check for duplicate task groups
taskGroups := make(map[string]int)
for idx, tg := range j.TaskGroups {
Expand Down Expand Up @@ -3336,6 +3353,10 @@ type TaskGroup struct {
// Affinities can be specified at the task group level to express
// scheduling preferences.
Affinities []*Affinity

// Spread can be specified at the task group level to express spreading
// allocations across a desired attribute, such as datacenter
Spreads []*Spread
}

func (tg *TaskGroup) Copy() *TaskGroup {
Expand All @@ -3349,6 +3370,7 @@ func (tg *TaskGroup) Copy() *TaskGroup {
ntg.RestartPolicy = ntg.RestartPolicy.Copy()
ntg.ReschedulePolicy = ntg.ReschedulePolicy.Copy()
ntg.Affinities = CopySliceAffinities(ntg.Affinities)
ntg.Spreads = CopySliceSpreads(ntg.Spreads)

if tg.Tasks != nil {
tasks := make([]*Task, len(ntg.Tasks))
Expand Down Expand Up @@ -3450,6 +3472,19 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name))
}

if j.Type == JobTypeSystem {
if tg.Spreads != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza"))
}
} else {
for idx, spread := range tg.Spreads {
if err := spread.Validate(); err != nil {
outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
}

if j.Type == JobTypeSystem {
if tg.ReschedulePolicy != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs should not have a reschedule policy"))
Expand Down Expand Up @@ -5384,6 +5419,101 @@ func (a *Affinity) Validate() error {
return mErr.ErrorOrNil()
}

// Spread is used to specify desired distribution of allocations according to weight
type Spread struct {
// Attribute is the node attribute used as the spread criteria
Attribute string

// Weight is the relative weight of this spread, useful when there are multiple
// spread and affinities
Weight int

// SpreadTarget is used to describe desired percentages for each attribute value
SpreadTarget []*SpreadTarget

// Memoized string representation
str string
}

func (s *Spread) Copy() *Spread {
if s == nil {
return nil
}
ns := new(Spread)
*ns = *s

ns.SpreadTarget = CopySliceSpreadTarget(s.SpreadTarget)
return ns
}

func (s *Spread) String() string {
if s.str != "" {
return s.str
}
s.str = fmt.Sprintf("%s %s %v", s.Attribute, s.SpreadTarget, s.Weight)
return s.str
}

func (s *Spread) Validate() error {
var mErr multierror.Error
if s.Attribute == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing spread attribute"))
}
if s.Weight <= 0 || s.Weight > 100 {
mErr.Errors = append(mErr.Errors, errors.New("Spread stanza must have a positive weight from 0 to 100"))
}
seen := make(map[string]struct{})
sumPercent := uint32(0)

for _, target := range s.SpreadTarget {
// Make sure there are no duplicates
_, ok := seen[target.Value]
if !ok {
seen[target.Value] = struct{}{}
} else {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target value %q already defined", target.Value)))
}
if target.Percent < 0 || target.Percent > 100 {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target percentage for value %q must be between 0 and 100", target.Value)))
}
sumPercent += target.Percent
}
if sumPercent > 100 {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Sum of spread target percentages must not be greater than 100%%; got %d%%", sumPercent)))
}
return mErr.ErrorOrNil()
}

// SpreadTarget is used to specify desired percentages for each attribute value
type SpreadTarget struct {
// Value is a single attribute value, like "dc1"
Value string

// Percent is the desired percentage of allocs
Percent uint32

// Memoized string representation
str string
}

func (s *SpreadTarget) Copy() *SpreadTarget {
if s == nil {
return nil
}

ns := new(SpreadTarget)
*ns = *s
return ns
}

func (s *SpreadTarget) String() string {
if s.str != "" {
return s.str
}
s.str = fmt.Sprintf("%q %v%%", s.Value, s.Percent)
return s.str
}

// EphemeralDisk is an ephemeral disk object
type EphemeralDisk struct {
// Sticky indicates whether the allocation is sticky to a node
Expand Down
131 changes: 131 additions & 0 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,21 @@ func TestJob_SystemJob_Validate(t *testing.T) {
err = j.Validate()
require.NotNil(t, err)
require.Contains(t, err.Error(), "System jobs may not have an affinity stanza")

// Add spread at job and task group level, that should fail validation
j.Spreads = []*Spread{{
Attribute: "${node.datacenter}",
Weight: 100,
}}
j.TaskGroups[0].Spreads = []*Spread{{
Attribute: "${node.datacenter}",
Weight: 100,
}}

err = j.Validate()
require.NotNil(t, err)
require.Contains(t, err.Error(), "System jobs may not have a spread stanza")

}

func TestJob_VaultPolicies(t *testing.T) {
Expand Down Expand Up @@ -3926,3 +3941,119 @@ func TestNode_Copy(t *testing.T) {
require.Equal(node.DrainStrategy, node2.DrainStrategy)
require.Equal(node.Drivers, node2.Drivers)
}

func TestSpread_Validate(t *testing.T) {
type tc struct {
spread *Spread
err error
name string
}

testCases := []tc{
{
spread: &Spread{},
err: fmt.Errorf("Missing spread attribute"),
name: "empty spread",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: -1,
},
err: fmt.Errorf("Spread stanza must have a positive weight from 0 to 100"),
name: "Invalid weight",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 200,
},
err: fmt.Errorf("Spread stanza must have a positive weight from 0 to 100"),
name: "Invalid weight",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 25,
},
{
Value: "dc2",
Percent: 150,
},
},
},
err: fmt.Errorf("Spread target percentage for value \"dc2\" must be between 0 and 100"),
name: "Invalid percentages",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 75,
},
{
Value: "dc2",
Percent: 75,
},
},
},
err: fmt.Errorf("Sum of spread target percentages must not be greater than 100%%; got %d%%", 150),
name: "Invalid percentages",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 25,
},
{
Value: "dc1",
Percent: 50,
},
},
},
err: fmt.Errorf("Spread target value \"dc1\" already defined"),
name: "No spread targets",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 25,
},
{
Value: "dc2",
Percent: 50,
},
},
},
err: nil,
name: "Valid spread",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.spread.Validate()
if tc.err != nil {
require.NotNil(t, err)
require.Contains(t, err.Error(), tc.err.Error())
} else {
require.Nil(t, err)
}
})
}
}
Loading

0 comments on commit a38546d

Please sign in to comment.