Change the balance_datacenters constraint
  * to rely on the currently proposed allocations rather than on the
    round-robin nature of the iterator.

    The number of allocations per datacenter is now calculated
    deterministically from the number of allocations and the number of
    datacenters.
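As a rough illustration of that idea, here is a minimal standalone sketch (the numbers and the `proposed` map are invented; the split loop mirrors the `allocationsPerDatacenter` helper this commit adds to scheduler/util.go): each datacenter's share is fixed up front, and a node stays eligible only while its datacenter is below that share.

```go
package main

import "fmt"

func main() {
	// Illustrative values only: 5 allocations over dc1, dc2, dc3.
	allocations := 5
	datacenters := []string{"dc1", "dc2", "dc3"}

	// Deterministic share per datacenter: an even integer split, with the
	// remainder handed to the first datacenters in the list (2, 2, 1 here).
	share := make(map[string]int, len(datacenters))
	remainder := allocations % len(datacenters)
	for _, dc := range datacenters {
		share[dc] = allocations / len(datacenters)
		if remainder > 0 {
			share[dc]++
			remainder--
		}
	}

	// Pretend these are the allocations already proposed in each datacenter.
	proposed := map[string]int{"dc1": 2, "dc2": 1, "dc3": 0}

	// A node is eligible only while its datacenter is still below its share.
	for _, dc := range datacenters {
		fmt.Printf("%s: proposed=%d share=%d eligible=%v\n",
			dc, proposed[dc], share[dc], proposed[dc] < share[dc])
	}
}
```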
Crypto89 committed Feb 24, 2017
1 parent 8f10b43 commit ede929f
Showing 5 changed files with 117 additions and 84 deletions.
10 changes: 5 additions & 5 deletions nomad/structs/structs.go
@@ -3186,11 +3186,11 @@ func (ta *TaskArtifact) Validate() error {
}

const (
ConstraintBalance = "balance_datacenter"
ConstraintDistinctHosts = "distinct_hosts"
ConstraintRegex = "regexp"
ConstraintVersion = "version"
ConstraintSetContains = "set_contains"
ConstraintBalanceDatacenters = "balance_datacenters"
ConstraintDistinctHosts = "distinct_hosts"
ConstraintRegex = "regexp"
ConstraintVersion = "version"
ConstraintSetContains = "set_contains"
)

// Constraints are used to restrict placement options.
107 changes: 60 additions & 47 deletions scheduler/feasible.go
@@ -7,8 +7,7 @@ import (
"strconv"
"strings"

"math"

memdb "github.com/hashicorp/go-memdb"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -160,8 +159,10 @@ type ProposedAllocConstraintIterator struct {
tgDistinctHosts bool
jobDistinctHosts bool

tgBalance bool
jobBalance bool
// Store whether the Job or TaskGroup has a balance_datacenters constraint so
// it doesn't have to be recalculated every time Next() is called.
tgBalanceDatacenters bool
jobBalanceDatacenters bool
}

// NewProposedAllocConstraintIterator creates a ProposedAllocConstraintIterator
@@ -176,13 +177,13 @@ func NewProposedAllocConstraintIterator(ctx Context, source FeasibleIterator) *P
func (iter *ProposedAllocConstraintIterator) SetTaskGroup(tg *structs.TaskGroup) {
iter.tg = tg
iter.tgDistinctHosts = iter.hasDistinctHostsConstraint(tg.Constraints)
iter.tgBalance = iter.hasBalanceConstraint(tg.Constraints)
iter.tgBalanceDatacenters = iter.hasBalanceDatacentersConstraint(tg.Constraints)
}

func (iter *ProposedAllocConstraintIterator) SetJob(job *structs.Job) {
iter.job = job
iter.jobDistinctHosts = iter.hasDistinctHostsConstraint(job.Constraints)
iter.jobBalance = iter.hasBalanceConstraint(job.Constraints)
iter.jobBalanceDatacenters = iter.hasBalanceDatacentersConstraint(job.Constraints)
}

func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool {
@@ -194,9 +195,9 @@ func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constrai
return false
}

func (iter *ProposedAllocConstraintIterator) hasBalanceConstraint(constraints []*structs.Constraint) bool {
func (iter *ProposedAllocConstraintIterator) hasBalanceDatacentersConstraint(constraints []*structs.Constraint) bool {
for _, con := range constraints {
if con.Operand == structs.ConstraintBalance {
if con.Operand == structs.ConstraintBalanceDatacenters {
return true
}
}
@@ -209,8 +210,8 @@ func (iter *ProposedAllocConstraintIterator) Next() *structs.Node {
// Get the next option from the source
option := iter.source.Next()

// Hot-path if the option is nil or there are no distinct_hosts constraints.
if option == nil || !(iter.jobDistinctHosts || iter.tgDistinctHosts || iter.jobBalance || iter.tgBalance) {
// Hot-path if the option is nil or there are no distinct_hosts or balance_datacenters constraints.
if option == nil || !(iter.jobDistinctHosts || iter.tgDistinctHosts || iter.jobBalanceDatacenters || iter.tgBalanceDatacenters) {
return option
}

@@ -219,8 +220,8 @@ func (iter *ProposedAllocConstraintIterator) Next() *structs.Node {
continue
}

if !iter.satisfiesBalance(option) {
iter.ctx.Metrics().FilterNode(option, structs.ConstraintBalance)
if !iter.satisfiesBalanceDatacenters(option) {
iter.ctx.Metrics().FilterNode(option, structs.ConstraintBalanceDatacenters)
continue
}

@@ -262,31 +263,29 @@ func (iter *ProposedAllocConstraintIterator) satisfiesDistinctHosts(option *stru
// satisfiesBalanceDatacenters checks whether placing an allocation on this node
// would leave the datacenters unbalanced, i.e. whether the node's datacenter
// already holds its deterministic share of the job's or task group's allocations.
func (iter *ProposedAllocConstraintIterator) satisfiesBalance(option *structs.Node) bool {
func (iter *ProposedAllocConstraintIterator) satisfiesBalanceDatacenters(option *structs.Node) bool {
// Check if there is no constraint set.
if !(iter.jobBalance || iter.tgBalance) {
if !(iter.jobBalanceDatacenters || iter.tgBalanceDatacenters) {
return true
}

if len(iter.job.Datacenters) == 0 {
iter.ctx.Logger().Print("[ERR] Job needs at least 1 datacenter to use balance")
if len(iter.job.Datacenters) < 2 {
iter.ctx.Logger().Print("[ERR] Job needs at least 2 datacenters to use balance_datacenters")
return false
}

// fill the map with all the dc's in the selected datacenter
balanceMap := make(map[string]int)

for _, dc := range iter.job.Datacenters {
balanceMap[dc] = 0
}
var allocationsInCurrentDatacenter int

// get all the nodes
nodeIter, err := iter.ctx.State().Nodes()
ws := memdb.NewWatchSet()
nodeIter, err := iter.ctx.State().Nodes(ws)
if err != nil {
iter.ctx.Logger().Print("[ERR] Failed to get nodes")
return false
}

// Fetch all the proposed allocations for all the nodes in the current datacenter
var proposed []*structs.Allocation

for {
next := nodeIter.Next()
if next == nil {
@@ -295,45 +294,59 @@ func (iter *ProposedAllocConstraintIterator) satisfiesBalance(option *structs.No

node := next.(*structs.Node)

// we only care about nodes in the same datacenter as the current option
if node.Datacenter != option.Datacenter {
continue
}

// proposed allocations
proposed, err := iter.ctx.ProposedAllocs(node.ID)
nodeProposed, err := iter.ctx.ProposedAllocs(node.ID)
if err != nil {
iter.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: failed to get proposed allocations: %v", err)
return false
}

for _, alloc := range proposed {
jobCollision := alloc.JobID == iter.job.ID
taskCollision := alloc.TaskGroup == iter.tg.Name
proposed = append(proposed, nodeProposed...)
}

// skip jobs not in this job or taskgroup (for jobBalance/tgBalance)
if !(jobCollision && (iter.jobBalance || taskCollision)) {
continue
}
for _, alloc := range proposed {
// If the job has a balance_datacenters constraint we only need an alloc
// collision on the JobID but if the constraint is on the TaskGroup then
// we need both a job and TaskGroup collision.
jobCollision := alloc.JobID == iter.job.ID
tgCollision := alloc.TaskGroup == iter.tg.Name && jobCollision

// skip allocation with DesiredStatus other than running
if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
continue
}
if !((iter.jobBalanceDatacenters && jobCollision) || tgCollision) {
continue
}

balanceMap[node.Datacenter]++
// skip allocation with DesiredStatus other than running
if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
continue
}
}

min := math.MaxInt32
allocationsInCurrentDatacenter++
}

for _, n := range balanceMap {
if n < min {
min = n
// The number of allocations to spread per datacenter depends on where the
// constraint is set: at the job level it is the sum of the Counts of all
// TaskGroups, while at the TaskGroup level it is only the Count of the
// current TaskGroup.
var allocations int
if iter.tgBalanceDatacenters {
allocations = iter.tg.Count
} else {
for _, tg := range iter.job.TaskGroups {
allocations += tg.Count
}
}

// if the current DC is higher than the minium, the node is not eligible
if balanceMap[option.Datacenter] > min {
return false
allocationsPerDatacenter := allocationsPerDatacenter(allocations, iter.job.Datacenters)
if c, ok := allocationsPerDatacenter[option.Datacenter]; ok && allocationsInCurrentDatacenter < c {
return true
}

return true
return false
}

func (iter *ProposedAllocConstraintIterator) Reset() {
@@ -428,7 +441,7 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
switch operand {
case structs.ConstraintDistinctHosts:
return true
case structs.ConstraintBalance:
case structs.ConstraintBalanceDatacenters:
return true
default:
break
48 changes: 22 additions & 26 deletions scheduler/feasible_test.go
@@ -612,7 +612,7 @@ func TestProposedAllocConstraint_TaskGroupDistinctHosts(t *testing.T) {
}
}

func TestPropsedAllocConstraint_JobBalance(t *testing.T) {
func TestPropsedAllocConstraint_JobBalanceDatacenters(t *testing.T) {
store, ctx := testContext(t)

n3 := mock.Node()
@@ -631,12 +631,12 @@ func TestPropsedAllocConstraint_JobBalance(t *testing.T) {
static := NewStaticIterator(ctx, nodes)

// Create a job with balance constraint
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}
tg1 := &structs.TaskGroup{Name: "bar", Count: 1}
tg2 := &structs.TaskGroup{Name: "baz", Count: 1}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalance}},
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalanceDatacenters}},
Datacenters: []string{"dc1", "dc2"},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}
@@ -652,7 +652,7 @@ func TestPropsedAllocConstraint_JobBalance(t *testing.T) {
}
}

func TestPropsedAllocConstraint_JobBalance_WithRunningJobs(t *testing.T) {
func TestPropsedAllocConstraint_JobBalanceDatacenters_WithRunningJobs(t *testing.T) {
store, ctx := testContext(t)

n3 := mock.Node()
@@ -671,12 +671,12 @@ func TestPropsedAllocConstraint_JobBalance_WithRunningJobs(t *testing.T) {
static := NewStaticIterator(ctx, nodes)

// Create a job with balance constraint
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}
tg1 := &structs.TaskGroup{Name: "bar", Count: 1}
tg2 := &structs.TaskGroup{Name: "baz", Count: 1}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalance}},
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalanceDatacenters}},
Datacenters: []string{"dc1", "dc2"},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}
@@ -708,9 +708,14 @@ func TestPropsedAllocConstraint_JobBalance_WithRunningJobs(t *testing.T) {
if len(out) != 1 {
t.Fatalf("Bad: %#v", out)
}

// since there is already an allocation in dc1, the yielded node should be in dc2
if out[0].Datacenter != "dc2" {
t.Fatalf("Bad: proposed node is in the wrong datacenter, expected: dc2; got: %s", out[0].Datacenter)
}
}

func TestPropsedAllocConstraint_JobBalance_WithStoppedJobs(t *testing.T) {
func TestPropsedAllocConstraint_JobBalanceDatacenters_WithStoppedJobs(t *testing.T) {
store, ctx := testContext(t)

n3 := mock.Node()
@@ -729,26 +734,17 @@ func TestPropsedAllocConstraint_JobBalance_WithStoppedJobs(t *testing.T) {
static := NewStaticIterator(ctx, nodes)

// Create a job with balance constraint
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}
tg1 := &structs.TaskGroup{Name: "bar", Count: 1}
tg2 := &structs.TaskGroup{Name: "baz", Count: 1}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalance}},
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalanceDatacenters}},
Datacenters: []string{"dc1", "dc2"},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}

plan := ctx.Plan()
plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: tg1.Name,
JobID: job.ID,
ID: structs.GenerateUUID(),
DesiredStatus: structs.AllocDesiredStatusRun,
},
}

// this allocation should be ignored since it's no longer active
plan.NodeAllocation[n3.ID] = []*structs.Allocation{
&structs.Allocation{
@@ -765,12 +761,12 @@ func TestPropsedAllocConstraint_JobBalance_WithStoppedJobs(t *testing.T) {

out := collectFeasible(propsed)

if len(out) != 1 {
if len(out) != 3 {
t.Fatalf("Bad: %#v", out)
}
}

func TestPropsedAllocConstraint_JobBalance_InfeasibleDC(t *testing.T) {
func TestPropsedAllocConstraint_JobBalanceDatacenters_InfeasibleDC(t *testing.T) {
store, ctx := testContext(t)

nodes := []*structs.Node{
@@ -785,12 +781,12 @@ func TestPropsedAllocConstraint_JobBalance_InfeasibleDC(t *testing.T) {
static := NewStaticIterator(ctx, nodes)

// Create a job with balance constraint
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}
tg1 := &structs.TaskGroup{Name: "bar", Count: 1}
tg2 := &structs.TaskGroup{Name: "baz", Count: 1}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalance}},
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalanceDatacenters}},
Datacenters: []string{"dc1", "dc2"},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}
23 changes: 23 additions & 0 deletions scheduler/util.go
@@ -721,3 +721,26 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
}
}
}

// allocationsPerDatacenter returns a partition that spreads the given number of
// allocations as evenly as possible across the datacenters: every datacenter gets
// allocations/len(datacenters), and the remaining x allocations are placed on the
// first x datacenters.
func allocationsPerDatacenter(allocations int, datacenters []string) map[string]int {
dcCount := len(datacenters)
result := make(map[string]int, dcCount)
remainder := allocations % dcCount

for _, dc := range datacenters {
// integer division of the number of allocations by the number of datacenters
// this will leave a "remainder", which we will add as long as there is some left
result[dc] = allocations / dcCount

// divide the remainder across the first x datacenters until the remainder equals zero
if remainder > 0 {
result[dc]++
remainder--
}
}

return result
}
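For reference, a quick sanity check of the helper above — a hedged sketch rather than part of this commit; the test name and values are invented, and it assumes the test lives in the same `scheduler` package so it can call `allocationsPerDatacenter` with the signature shown above.

```go
package scheduler

import "testing"

// TestAllocationsPerDatacenter_RemainderSpread is an illustrative check of the
// remainder behaviour: 7 allocations over 3 datacenters split as 3, 2, 2, with
// the single leftover allocation landing on the first datacenter in the list.
func TestAllocationsPerDatacenter_RemainderSpread(t *testing.T) {
	got := allocationsPerDatacenter(7, []string{"dc1", "dc2", "dc3"})
	want := map[string]int{"dc1": 3, "dc2": 2, "dc3": 2}

	for dc, n := range want {
		if got[dc] != n {
			t.Fatalf("datacenter %s: got %d allocations, want %d", dc, got[dc], n)
		}
	}
}
```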
13 changes: 7 additions & 6 deletions website/source/docs/job-specification/constraint.html.md
@@ -114,15 +114,16 @@ constraint {
}
```

- `"balance_datacenter"` - Instructs the scheduler to force an equal spread across
all datacenters specified in the Job. When specified as a job constraint, it
applies to all groups in the job. When specified as a group constraint, the
effect is constrained to that group. Note that the `attribute` parameter should
be omitted when using this constraint.
- `"balance_datacenters"` - Instructs the scheduler to force an equal spread across
all datacenters specified in the job's [datacenter
list](https://www.nomadproject.io/docs/job-specification/job.html#datacenters).
When specified as a job constraint, it applies to all groups in the job. When
specified as a group constraint, the effect is constrained to that group. Note
that the `attribute` parameter should be omitted when using this constraint.

```hcl
constraint {
operator = "balance_datacenter"
operator = "balance_datacenters"
value = "true"
}
```