Add "distinctHost" constraint #321

Merged: 11 commits, Oct 26, 2015
15 changes: 15 additions & 0 deletions jobspec/parse.go
@@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"

@@ -256,6 +257,20 @@ func parseConstraints(result *[]*structs.Constraint, obj *hclobj.Object) error {
m["RTarget"] = constraint
}

if value, ok := m["distinctHosts"]; ok {
Member: Nothing in HCL uses camel case.

enabled, err := strconv.ParseBool(value.(string))
if err != nil {
return err
}

// If it is not enabled, skip the constraint.
if !enabled {
continue
}

m["Operand"] = "distinctHosts"
}

// Build the constraint
var c structs.Constraint
if err := mapstructure.WeakDecode(m, &c); err != nil {
18 changes: 18 additions & 0 deletions jobspec/parse_test.go
@@ -192,6 +192,24 @@ func TestParse(t *testing.T) {
false,
},

{
"distinctHosts-constraint.hcl",
&structs.Job{
ID: "foo",
Name: "foo",
Priority: 50,
Region: "global",
Type: "service",
Constraints: []*structs.Constraint{
&structs.Constraint{
Hard: true,
Operand: "distinctHosts",
},
},
},
false,
},

{
"specify-job.hcl",
&structs.Job{
5 changes: 5 additions & 0 deletions jobspec/test-fixtures/distinctHosts-constraint.hcl
@@ -0,0 +1,5 @@
job "foo" {
constraint {
distinctHosts = "true"
}
}
119 changes: 119 additions & 0 deletions scheduler/feasible.go
@@ -150,6 +150,117 @@ func (iter *DriverIterator) hasDrivers(option *structs.Node) bool {
return true
}

// DynamicConstraintIterator is a FeasibleIterator which returns nodes that
// match constraints that are not static Node attributes but are instead
// affected by alloc placements. Examples are distinctHosts and tenancy
// constraints. This is used to filter on job and task group constraints.
type DynamicConstraintIterator struct {
ctx Context
source FeasibleIterator
tg *structs.TaskGroup
job *structs.Job

// Store whether the Job or TaskGroup has a distinctHosts constraint so
// it doesn't have to be recalculated every time Next() is called.
tgDistinctHosts bool
jobDistinctHosts bool
}

// NewDynamicConstraintIterator creates a DynamicConstraintIterator from a
// source.
func NewDynamicConstraintIterator(ctx Context, source FeasibleIterator) *DynamicConstraintIterator {
Member: Rename to ProposedAllocConstraintIterator

iter := &DynamicConstraintIterator{
ctx: ctx,
source: source,
}
return iter
}
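
For orientation, a minimal sketch of how this iterator is meant to be wired and driven, modeled on the tests further down; the ctx, job, and tg values here are placeholders, and the real scheduler builds a larger iterator stack than this.

nodes := []*structs.Node{mock.Node(), mock.Node()}
static := NewStaticIterator(ctx, nodes)

dynamic := NewDynamicConstraintIterator(ctx, static)
dynamic.SetJob(job)      // records whether the job has a distinctHosts constraint
dynamic.SetTaskGroup(tg) // records whether the task group has one

// Next() only yields nodes that do not collide with the proposed
// allocations for this job/task group.
for option := dynamic.Next(); option != nil; option = dynamic.Next() {
	// place an alloc for tg on option ...
}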

func (iter *DynamicConstraintIterator) SetTaskGroup(tg *structs.TaskGroup) {
iter.tg = tg
iter.tgDistinctHosts = iter.hasDistinctHostsConstraint(tg.Constraints)
}

func (iter *DynamicConstraintIterator) SetJob(job *structs.Job) {
iter.job = job
iter.jobDistinctHosts = iter.hasDistinctHostsConstraint(job.Constraints)
}

func (iter *DynamicConstraintIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool {
if constraints == nil {
return false
}

for _, con := range constraints {
if con.Operand == "distinctHosts" {
Member: Make this a const in structs

return true
}
}
return false
}
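
A quick sketch of the constant suggested above; the name and its placement in the structs package are assumptions for illustration, not something this diff adds.

// In nomad/structs (hypothetical):
const ConstraintDistinctHosts = "distinctHosts"

// The comparison above would then read:
//     if con.Operand == structs.ConstraintDistinctHosts {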

func (iter *DynamicConstraintIterator) Next() *structs.Node {
if iter.job == nil {
Member: No need to redo these checks, as the entire stack is broken if no job or TG is set.

iter.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: job not set")
return nil
}

if iter.tg == nil {
iter.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: task group not set")
return nil
}

for {
// Get the next option from the source
option := iter.source.Next()

// Hot-path if the option is nil or there are no distinctHosts constraints.
if option == nil || (!iter.jobDistinctHosts && !iter.tgDistinctHosts) {
return option
}

if !iter.satisfiesDistinctHosts(option, iter.jobDistinctHosts) {
iter.ctx.Metrics().FilterNode(option, "distinctHosts")
continue
}

return option
}
}

// satisfiesDistinctHosts checks if the node satisfies a distinctHosts
// constraint either specified at the job level or the TaskGroup level.
func (iter *DynamicConstraintIterator) satisfiesDistinctHosts(option *structs.Node, job bool) bool {
Member: job argument is not necessary

// Get the proposed allocations
proposed, err := iter.ctx.ProposedAllocs(option.ID)
Member: Investigate caching this

if err != nil {
iter.ctx.Logger().Printf(
"[ERR] scheduler.dynamic-constraint: failed to get proposed allocations: %v", err)
return false
}

// Skip the node if the task group has already been allocated on it.
for _, alloc := range proposed {
jobCollision := alloc.JobID == iter.job.ID
taskCollision := alloc.TaskGroup == iter.tg.Name

// If the job has a distinctHosts constraint we only need an alloc
// collision on the JobID, but if the constraint is on the TaskGroup then
// we need both a job and a TaskGroup collision.
jobInvalid := job && jobCollision
Member: Function should not depend on the caller's behavior to implicitly rule out the !job and !tg mode.

tgInvalid := !job && jobCollision && taskCollision
if jobInvalid || tgInvalid {
return false
}
}

return true
}
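
A rough sketch of what the two comments above point at: dropping the job parameter and reading the iterator's own flags, so the function no longer relies on the caller to rule out the case where neither constraint is set. This is one possible shape, not the change that was committed in this PR.

func (iter *DynamicConstraintIterator) satisfiesDistinctHosts(option *structs.Node) bool {
	// Nothing to enforce if neither level has the constraint.
	if !iter.jobDistinctHosts && !iter.tgDistinctHosts {
		return true
	}

	proposed, err := iter.ctx.ProposedAllocs(option.ID)
	if err != nil {
		iter.ctx.Logger().Printf(
			"[ERR] scheduler.dynamic-constraint: failed to get proposed allocations: %v", err)
		return false
	}

	for _, alloc := range proposed {
		jobCollision := alloc.JobID == iter.job.ID
		taskCollision := alloc.TaskGroup == iter.tg.Name

		// A job-level constraint needs only a JobID collision; a task-group
		// level constraint needs both a job and a task group collision.
		if (iter.jobDistinctHosts && jobCollision) ||
			(iter.tgDistinctHosts && jobCollision && taskCollision) {
			return false
		}
	}
	return true
}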

func (iter *DynamicConstraintIterator) Reset() {
iter.source.Reset()
}
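
On the "Investigate caching this" note: a minimal sketch of memoizing ProposedAllocs per node, with the cache cleared on Reset. The proposedCache field and proposedAllocs helper are hypothetical additions, not part of this diff.

// Hypothetical extra field on the iterator:
//     proposedCache map[string][]*structs.Allocation

func (iter *DynamicConstraintIterator) proposedAllocs(nodeID string) ([]*structs.Allocation, error) {
	if allocs, ok := iter.proposedCache[nodeID]; ok {
		return allocs, nil
	}
	allocs, err := iter.ctx.ProposedAllocs(nodeID)
	if err != nil {
		return nil, err
	}
	if iter.proposedCache == nil {
		iter.proposedCache = make(map[string][]*structs.Allocation)
	}
	iter.proposedCache[nodeID] = allocs
	return allocs, nil
}

// Reset would also set iter.proposedCache = nil so stale results are not
// reused across scheduling passes.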

// ConstraintIterator is a FeasibleIterator which returns nodes
// that match a given set of constraints. This is used to filter
// on job, task group, and task constraints.
@@ -257,6 +368,14 @@ func resolveConstraintTarget(target string, node *structs.Node) (interface{}, bool) {

// checkConstraint checks if a constraint is satisfied
func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
// Check for constraints not handled by this iterator.
switch operand {
Contributor: Why not merge this with the next one?

Contributor (author): As we add more constraints that are not handled here, I want it to be easily distinguishable which iterator is handling which constraints.

Contributor: 👍

case "distinctHosts":
return true
default:
break
}

switch operand {
case "=", "==", "is":
return reflect.DeepEqual(lVal, rVal)
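For reference, the merge the first commenter asked about would look roughly like the sketch below, folding the distinctHosts case into the existing switch; the author deliberately keeps the switches separate so it stays obvious which iterator handles which constraints. Operands beyond those visible in this hunk are elided.

func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
	switch operand {
	case "distinctHosts":
		// Not evaluated here; the proposed-alloc iterator enforces it.
		return true
	case "=", "==", "is":
		return reflect.DeepEqual(lVal, rVal)
	// ... remaining operands from the existing switch ...
	default:
		return false
	}
}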
174 changes: 174 additions & 0 deletions scheduler/feasible_test.go
@@ -382,6 +382,180 @@ func TestCheckRegexpConstraint(t *testing.T) {
}
}

func TestDynamicConstraint_JobDistinctHosts(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)

// Create a job with a distinctHosts constraint and two task groups.
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: "distinctHosts"}},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}

dynamic := NewDynamicConstraintIterator(ctx, static)
dynamic.SetTaskGroup(tg1)
dynamic.SetJob(job)

out := collectFeasible(dynamic)
if len(out) != 4 {
t.Fatalf("Bad: %#v", out)
}

selected := make(map[string]struct{}, 4)
for _, option := range out {
if _, ok := selected[option.ID]; ok {
t.Fatalf("selected node %v for more than one alloc", option)
}
selected[option.ID] = struct{}{}
}
}

func TestDynamicConstraint_JobDistinctHosts_Infeasible(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)

// Create a job with a distinctHosts constraint and two task groups.
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: "distinctHosts"}},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}

// Add allocs placing tg1 on node1 and tg2 on node2. This should make the
// job unsatisfiable.
plan := ctx.Plan()
plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: tg1.Name,
JobID: job.ID,
},

// Should be ignored as it is a different job.
&structs.Allocation{
TaskGroup: tg2.Name,
JobID: "ignore 2",
},
}
plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: tg2.Name,
JobID: job.ID,
},

// Should be ignored as it is a different job.
&structs.Allocation{
TaskGroup: tg1.Name,
JobID: "ignore 2",
},
}

dynamic := NewDynamicConstraintIterator(ctx, static)
dynamic.SetTaskGroup(tg1)
dynamic.SetJob(job)

out := collectFeasible(dynamic)
if len(out) != 0 {
t.Fatalf("Bad: %#v", out)
}
}

func TestDynamicConstraint_JobDistinctHosts_InfeasibleCount(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)

// Create a job with a distinctHosts constraint and three task groups.
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}
tg3 := &structs.TaskGroup{Name: "bam"}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: "distinctHosts"}},
TaskGroups: []*structs.TaskGroup{tg1, tg2, tg3},
}

dynamic := NewDynamicConstraintIterator(ctx, static)
dynamic.SetTaskGroup(tg1)
dynamic.SetJob(job)

// It should not be able to place 3 task groups with only two nodes.
out := collectFeasible(dynamic)
if len(out) != 2 {
t.Fatalf("Bad: %#v", out)
}
}

func TestDynamicConstraint_TaskGroupDistinctHosts(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)

// Create a task group with a distinctHosts constraint.
taskGroup := &structs.TaskGroup{
Name: "example",
Constraints: []*structs.Constraint{
{Operand: "distinctHosts"},
},
}

// Add a planned alloc to node1.
plan := ctx.Plan()
plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: taskGroup.Name,
JobID: "foo",
},
}

// Add a planned alloc to node2 with the same task group name but a
// different job.
plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: taskGroup.Name,
JobID: "bar",
},
}

dynamic := NewDynamicConstraintIterator(ctx, static)
dynamic.SetTaskGroup(taskGroup)
dynamic.SetJob(&structs.Job{ID: "foo"})

out := collectFeasible(dynamic)
if len(out) != 1 {
t.Fatalf("Bad: %#v", out)
}

// Expect it to skip the first node as there is a previous alloc on it for
// the same task group.
if out[0] != nodes[1] {
t.Fatalf("Bad: %v", out)
}
}

func collectFeasible(iter FeasibleIterator) (out []*structs.Node) {
for {
next := iter.Next()