Datacenter balance constraint #2168

Closed · wants to merge 4 commits
9 changes: 5 additions & 4 deletions nomad/structs/structs.go
@@ -3186,10 +3186,11 @@ func (ta *TaskArtifact) Validate() error {
}

const (
ConstraintDistinctHosts = "distinct_hosts"
ConstraintRegex = "regexp"
ConstraintVersion = "version"
ConstraintSetContains = "set_contains"
ConstraintBalanceDatacenters = "balance_datacenters"
ConstraintDistinctHosts = "distinct_hosts"
ConstraintRegex = "regexp"
ConstraintVersion = "version"
ConstraintSetContains = "set_contains"
)

// Constraints are used to restrict placement options.
120 changes: 117 additions & 3 deletions scheduler/feasible.go
@@ -7,7 +7,8 @@ import (
"strconv"
"strings"

"github.com/hashicorp/go-version"
memdb "github.com/hashicorp/go-memdb"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/nomad/structs"
)

@@ -157,6 +158,11 @@ type ProposedAllocConstraintIterator struct {
// they don't have to be calculated every time Next() is called.
tgDistinctHosts bool
jobDistinctHosts bool

// Store whether the Job or TaskGroup has a balance_datacenters constraint so
// they don't have to be calculated every time Next() is called.
tgBalanceDatacenters bool
jobBalanceDatacenters bool
}

// NewProposedAllocConstraintIterator creates a ProposedAllocConstraintIterator
@@ -171,11 +177,13 @@ func NewProposedAllocConstraintIterator(ctx Context, source FeasibleIterator) *P
func (iter *ProposedAllocConstraintIterator) SetTaskGroup(tg *structs.TaskGroup) {
iter.tg = tg
iter.tgDistinctHosts = iter.hasDistinctHostsConstraint(tg.Constraints)
iter.tgBalanceDatacenters = iter.hasBalanceDatacentersConstraint(tg.Constraints)
}

func (iter *ProposedAllocConstraintIterator) SetJob(job *structs.Job) {
iter.job = job
iter.jobDistinctHosts = iter.hasDistinctHostsConstraint(job.Constraints)
iter.jobBalanceDatacenters = iter.hasBalanceDatacentersConstraint(job.Constraints)
}

func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool {
@@ -187,13 +195,23 @@ func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constrai
return false
}

func (iter *ProposedAllocConstraintIterator) hasBalanceDatacentersConstraint(constraints []*structs.Constraint) bool {
for _, con := range constraints {
if con.Operand == structs.ConstraintBalanceDatacenters {
return true
}
}

return false
}

func (iter *ProposedAllocConstraintIterator) Next() *structs.Node {
for {
// Get the next option from the source
option := iter.source.Next()

// Hot-path if the option is nil or there are no distinct_hosts constraints.
if option == nil || !(iter.jobDistinctHosts || iter.tgDistinctHosts) {
// Hot-path if the option is nil or there are no distinct_hosts or balance_datacenters constraints.
if option == nil || !(iter.jobDistinctHosts || iter.tgDistinctHosts || iter.jobBalanceDatacenters || iter.tgBalanceDatacenters) {
return option
}

@@ -202,6 +220,11 @@ func (iter *ProposedAllocConstraintIterator) Next() *structs.Node {
continue
}

if !iter.satisfiesBalanceDatacenters(option) {
iter.ctx.Metrics().FilterNode(option, structs.ConstraintBalanceDatacenters)
continue
}

return option
}
}
@@ -237,6 +260,95 @@ func (iter *ProposedAllocConstraintIterator) satisfiesDistinctHosts(option *stru
return true
}

// satisfiesBalanceDatacenters checks whether placing the allocation on this
// node would leave the datacenters unbalanced, i.e. with a difference greater
// than one between the least and the most loaded datacenter.
func (iter *ProposedAllocConstraintIterator) satisfiesBalanceDatacenters(option *structs.Node) bool {
// Check if there is no constraint set.
if !(iter.jobBalanceDatacenters || iter.tgBalanceDatacenters) {
return true
}

if len(iter.job.Datacenters) < 2 {
iter.ctx.Logger().Print("[ERR] Job needs at least 2 datacenters to use balance_datacenters")
return false
}
var allocationsInCurrentDatacenter int

// get all the nodes
ws := memdb.NewWatchSet()
nodeIter, err := iter.ctx.State().Nodes(ws)
if err != nil {
iter.ctx.Logger().Print("[ERR] Failed to get nodes")
return false
}

// Fetch all the proposed allocations for all the nodes in the current datacenter
var proposed []*structs.Allocation

for {
next := nodeIter.Next()
if next == nil {
break
}

node := next.(*structs.Node)

// We only care about nodes in the same datacenter as the current option.
if node.Datacenter != option.Datacenter {
continue
}

// Collect the proposed allocations for this node.
nodeProposed, err := iter.ctx.ProposedAllocs(node.ID)
if err != nil {
iter.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: failed to get proposed allocations: %v", err)
return false
}

proposed = append(proposed, nodeProposed...)
}

for _, alloc := range proposed {
// If the job has a balance_datacenters constraint we only need an alloc
// collision on the JobID but if the constraint is on the TaskGroup then
// we need both a job and TaskGroup collision.
jobCollision := alloc.JobID == iter.job.ID
tgCollision := alloc.TaskGroup == iter.tg.Name && jobCollision

if !((iter.jobBalanceDatacenters && jobCollision) || tgCollision) {
continue
}

// Skip allocations that are not desired to be running.
if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
continue
}

allocationsInCurrentDatacenter++
}

// The number of allocations per datacenter depends on where the constraint is
// set. On the job level it is the sum of the Counts of all the TaskGroups; on
// the TaskGroup level it is only the Count of the current task group.
var allocations int
if iter.tgBalanceDatacenters {
allocations = iter.tg.Count
} else {
for _, tg := range iter.job.TaskGroups {
allocations += tg.Count
}
}

perDC := allocationsPerDatacenter(allocations, iter.job.Datacenters)
if c, ok := perDC[option.Datacenter]; ok && allocationsInCurrentDatacenter < c {
return true
}

return false
}

func (iter *ProposedAllocConstraintIterator) Reset() {
iter.source.Reset()
}
@@ -329,6 +441,8 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
switch operand {
case structs.ConstraintDistinctHosts:
return true
case structs.ConstraintBalanceDatacenters:
return true
default:
break
}
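satisfiesBalanceDatacenters above calls an allocationsPerDatacenter helper whose definition is not part of the hunks shown here. A minimal sketch of what such a helper could look like, assuming it splits the target allocation count evenly across the job's datacenters and hands the remainder to the first entries:

// allocationsPerDatacenter is assumed to return the target number of
// allocations for each of the job's datacenters. This sketch divides the
// total evenly and gives the remainder, one each, to the datacenters that
// are listed first.
func allocationsPerDatacenter(count int, datacenters []string) map[string]int {
	out := make(map[string]int, len(datacenters))
	if len(datacenters) == 0 {
		return out
	}
	base := count / len(datacenters)
	rest := count % len(datacenters)
	for i, dc := range datacenters {
		out[dc] = base
		if i < rest {
			out[dc]++
		}
	}
	return out
}

With that shape, the check in satisfiesBalanceDatacenters admits a node only while its datacenter still has fewer proposed allocations than its even share.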
191 changes: 191 additions & 0 deletions scheduler/feasible_test.go
@@ -612,6 +612,197 @@ func TestProposedAllocConstraint_TaskGroupDistinctHosts(t *testing.T) {
}
}

func TestProposedAllocConstraint_JobBalanceDatacenters(t *testing.T) {
store, ctx := testContext(t)

n3 := mock.Node()
n3.Datacenter = "dc2"

nodes := []*structs.Node{
mock.Node(),
mock.Node(),
n3,
}

for i, node := range nodes {
store.UpsertNode(uint64(i), node)
}

static := NewStaticIterator(ctx, nodes)

// Create a job with balance constraint
tg1 := &structs.TaskGroup{Name: "bar", Count: 1}
tg2 := &structs.TaskGroup{Name: "baz", Count: 1}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalanceDatacenters}},
Datacenters: []string{"dc1", "dc2"},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}

proposed := NewProposedAllocConstraintIterator(ctx, static)
proposed.SetTaskGroup(tg1)
proposed.SetJob(job)

out := collectFeasible(proposed)

if len(out) != 3 {
t.Fatalf("Bad: %#v", out)
}
}

func TestProposedAllocConstraint_JobBalanceDatacenters_WithRunningJobs(t *testing.T) {
store, ctx := testContext(t)

n3 := mock.Node()
n3.Datacenter = "dc2"

nodes := []*structs.Node{
mock.Node(),
mock.Node(),
n3,
}

for i, node := range nodes {
store.UpsertNode(uint64(i), node)
}

static := NewStaticIterator(ctx, nodes)

// Create a job with balance constraint
tg1 := &structs.TaskGroup{Name: "bar", Count: 1}
tg2 := &structs.TaskGroup{Name: "baz", Count: 1}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalanceDatacenters}},
Datacenters: []string{"dc1", "dc2"},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}

plan := ctx.Plan()
plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: tg1.Name,
JobID: job.ID,
ID: structs.GenerateUUID(),
DesiredStatus: structs.AllocDesiredStatusRun,
},

// This should be ignored since it's another Job
&structs.Allocation{
TaskGroup: tg2.Name,
JobID: "ignore",
ID: structs.GenerateUUID(),
DesiredStatus: structs.AllocDesiredStatusRun,
},
}

proposed := NewProposedAllocConstraintIterator(ctx, static)
proposed.SetTaskGroup(tg1)
proposed.SetJob(job)

out := collectFeasible(proposed)

if len(out) != 1 {
t.Fatalf("Bad: %#v", out)
}

// Since there is already an allocation in dc1, the yielded node should be in dc2.
if out[0].Datacenter != "dc2" {
t.Fatalf("Bad: proposed node is in the wrong datacenter, expected: dc2; got: %s", out[0].Datacenter)
}
}

func TestProposedAllocConstraint_JobBalanceDatacenters_WithStoppedJobs(t *testing.T) {
store, ctx := testContext(t)

n3 := mock.Node()
n3.Datacenter = "dc2"

nodes := []*structs.Node{
mock.Node(),
mock.Node(),
n3,
}

for i, node := range nodes {
store.UpsertNode(uint64(i), node)
}

static := NewStaticIterator(ctx, nodes)

// Create a job with balance constraint
tg1 := &structs.TaskGroup{Name: "bar", Count: 1}
tg2 := &structs.TaskGroup{Name: "baz", Count: 1}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalanceDatacenters}},
Datacenters: []string{"dc1", "dc2"},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}

plan := ctx.Plan()
// this allocation should be ignored since it's no longer active
plan.NodeAllocation[n3.ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: tg2.Name,
JobID: job.ID,
ID: structs.GenerateUUID(),
DesiredStatus: structs.AllocDesiredStatusStop,
},
}

proposed := NewProposedAllocConstraintIterator(ctx, static)
proposed.SetTaskGroup(tg1)
proposed.SetJob(job)

out := collectFeasible(proposed)

if len(out) != 3 {
t.Fatalf("Bad: %#v", out)
}
}

func TestProposedAllocConstraint_JobBalanceDatacenters_InfeasibleDC(t *testing.T) {
store, ctx := testContext(t)

nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}

for i, node := range nodes {
store.UpsertNode(uint64(i), node)
}

static := NewStaticIterator(ctx, nodes)

// Create a job with balance constraint
tg1 := &structs.TaskGroup{Name: "bar", Count: 1}
tg2 := &structs.TaskGroup{Name: "baz", Count: 1}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: structs.ConstraintBalanceDatacenters}},
Datacenters: []string{"dc1", "dc2"},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}

proposed := NewProposedAllocConstraintIterator(ctx, static)
proposed.SetTaskGroup(tg1)
proposed.SetJob(job)

out := collectFeasible(proposed)

// Only dc1 has nodes, so only its share of the job (one allocation) can be placed, but both dc1 nodes remain feasible candidates.
if len(out) != 2 {
t.Fatalf("Bad: %#v", out)
}
}
Contributor
Can you add a test to stack_test.go?

That is where you can really test that the behavior is working correctly.
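A stack-level test along those lines might look like the sketch below. It is only an outline: the test name is chosen here, the GenericStack constructor and the two-value Select signature are assumed to match the other tests in stack_test.go at the time of this PR, and a full test would also seed the plan with an existing dc1 allocation to assert the spread across datacenters.

// Hypothetical stack-level test for the balance constraint.
func TestServiceStack_Select_BalanceDatacenters(t *testing.T) {
	store, ctx := testContext(t)

	// Two nodes in dc1 and one in dc2, registered in the state store so the
	// balance check can iterate over them.
	n3 := mock.Node()
	n3.Datacenter = "dc2"
	nodes := []*structs.Node{mock.Node(), mock.Node(), n3}
	for i, node := range nodes {
		store.UpsertNode(uint64(i), node)
	}

	stack := NewGenericStack(false, ctx)
	stack.SetNodes(nodes)

	job := mock.Job()
	job.Datacenters = []string{"dc1", "dc2"}
	job.Constraints = []*structs.Constraint{{Operand: structs.ConstraintBalanceDatacenters}}
	stack.SetJob(job)

	// A first selection should still yield a feasible node.
	node, _ := stack.Select(job.TaskGroups[0])
	if node == nil {
		t.Fatalf("missing node: %#v", ctx.Metrics())
	}
}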


func collectFeasible(iter FeasibleIterator) (out []*structs.Node) {
for {
next := iter.Next()