Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "distinctHost" constraint #321

Merged
merged 11 commits into from
Oct 26, 2015
15 changes: 15 additions & 0 deletions jobspec/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -256,6 +257,20 @@ func parseConstraints(result *[]*structs.Constraint, obj *hclobj.Object) error {
m["RTarget"] = constraint
}

if value, ok := m["unique"]; ok {
enabled, err := strconv.ParseBool(value.(string))
if err != nil {
return err
}

// If it is not enabled, skip the constraint.
if !enabled {
continue
}

m["Operand"] = "unique"
}

// Build the constraint
var c structs.Constraint
if err := mapstructure.WeakDecode(m, &c); err != nil {
Expand Down
18 changes: 18 additions & 0 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,24 @@ func TestParse(t *testing.T) {
false,
},

{
"unique-constraint.hcl",
&structs.Job{
ID: "foo",
Name: "foo",
Priority: 50,
Region: "global",
Type: "service",
Constraints: []*structs.Constraint{
&structs.Constraint{
Hard: true,
Operand: "unique",
},
},
},
false,
},

{
"specify-job.hcl",
&structs.Job{
Expand Down
5 changes: 5 additions & 0 deletions jobspec/test-fixtures/unique-constraint.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
job "foo" {
constraint {
unique = "true"
}
}
119 changes: 119 additions & 0 deletions scheduler/feasible.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,117 @@ func (iter *DriverIterator) hasDrivers(option *structs.Node) bool {
return true
}

// DynamicConstraintIterator is a FeasibleIterator which returns nodes that
// match constraints that are not static such as Node attributes but are
// effected by alloc placements. Examples are unique and tenancy constraints.
// This is used to filter on job and task group constraints.
type DynamicConstraintIterator struct {
ctx Context
source FeasibleIterator
tg *structs.TaskGroup
job *structs.Job

// Store whether the Job or TaskGroup has unique constraints so they don't
// have to be calculated every time Next() is called.
tgUnique bool
jobUnique bool
}

// NewDynamicConstraintIterator creates a DynamicConstraintIterator from a
// source.
func NewDynamicConstraintIterator(ctx Context, source FeasibleIterator) *DynamicConstraintIterator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to ProposedAllocConstraintIterator

iter := &DynamicConstraintIterator{
ctx: ctx,
source: source,
}
return iter
}

func (iter *DynamicConstraintIterator) SetTaskGroup(tg *structs.TaskGroup) {
iter.tg = tg
iter.tgUnique = iter.hasUniqueConstraint(tg.Constraints)
}

func (iter *DynamicConstraintIterator) SetJob(job *structs.Job) {
iter.job = job
iter.jobUnique = iter.hasUniqueConstraint(job.Constraints)
}

func (iter *DynamicConstraintIterator) hasUniqueConstraint(constraints []*structs.Constraint) bool {
if constraints == nil {
return false
}

for _, con := range constraints {
if con.Operand == "unique" {
return true
}
}
return false
}

func (iter *DynamicConstraintIterator) Next() *structs.Node {
if iter.job == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to redo these checks as the entire stack is broken if no job or TG

iter.ctx.Logger().Printf("[ERR] sched.dynamic-constraint: job not set")
return nil
}

if iter.tg == nil {
iter.ctx.Logger().Printf("[ERR] sched.dynamic-constraint: task group not set")
return nil
}

for {
// Get the next option from the source
option := iter.source.Next()

// Hot-path if the option is nil or there are no unique constraints.
if option == nil || (!iter.jobUnique && !iter.tgUnique) {
return option
}

if !iter.satisfiesUnique(option, iter.jobUnique) {
iter.ctx.Metrics().FilterNode(option, "unique")
continue
}

return option
}
}

// satisfiesUnique checks if the node satisfies a unique constraint either
// specified at the job level or the TaskGroup level.
func (iter *DynamicConstraintIterator) satisfiesUnique(option *structs.Node, job bool) bool {
// Get the proposed allocations
proposed, err := iter.ctx.ProposedAllocs(option.ID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Investigate caching this

if err != nil {
iter.ctx.Logger().Printf(
"[ERR] sched.dynamic-constraint: failed to get proposed allocations: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/sched/scheduler/g

The other log statement in this file says scheduler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

return false
}

// Skip the node if the task group has already been allocated on it.
for _, alloc := range proposed {
jobCollision := alloc.JobID == iter.job.ID
taskCollision := alloc.TaskGroup == iter.tg.Name

// If the job has a unique constraint we only need an alloc collision on
// the JobID but if the constraint is on the TaskGroup then we need both
// a job and TaskGroup collision.
jobInvalid := job && jobCollision
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function should not depend on the callers behavior to implicit rule out the !job and !tg mode.

tgInvalid := !job && jobCollision && taskCollision
if jobInvalid || tgInvalid {
return false
}
}

return true
}

func (iter *DynamicConstraintIterator) Reset() {
iter.source.Reset()
}

// ConstraintIterator is a FeasibleIterator which returns nodes
// that match a given set of constraints. This is used to filter
// on job, task group, and task constraints.
Expand Down Expand Up @@ -257,6 +368,14 @@ func resolveConstraintTarget(target string, node *structs.Node) (interface{}, bo

// checkConstraint checks if a constraint is satisfied
func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
// Check for constraints not handled by this iterator.
switch operand {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not merge this with the next one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we add more constraints that are not handled here I want it to be easily distinguishable which iterator is handling which constraints.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

case "unique":
return true
default:
break
}

switch operand {
case "=", "==", "is":
return reflect.DeepEqual(lVal, rVal)
Expand Down
174 changes: 174 additions & 0 deletions scheduler/feasible_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,180 @@ func TestCheckRegexpConstraint(t *testing.T) {
}
}

func TestDynamicConstraint_JobUnique_Feasible(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)

// Create a job with a unique constraint and two task groups.
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: "unique"}},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}

dynamic := NewDynamicConstraintIterator(ctx, static)
dynamic.SetTaskGroup(tg1)
dynamic.SetJob(job)

out := collectFeasible(dynamic)
if len(out) != 4 {
t.Fatalf("Bad: %#v", out)
}

selected := make(map[string]struct{}, 4)
for _, option := range out {
if _, ok := selected[option.ID]; ok {
t.Fatalf("selected node %v for more than one alloc", option)
}
selected[option.ID] = struct{}{}
}
}

func TestDynamicConstraint_JobUnique_Infeasible(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)

// Create a job with a unique constraint and two task groups.
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: "unique"}},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}

// Add allocs placing tg1 on node1 and tg2 on node2. This should make the
// job unsatisfiable.
plan := ctx.Plan()
plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: tg1.Name,
JobID: job.ID,
},

// Should be ignored as it is a different job.
&structs.Allocation{
TaskGroup: tg2.Name,
JobID: "ignore 2",
},
}
plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: tg2.Name,
JobID: job.ID,
},

// Should be ignored as it is a different job.
&structs.Allocation{
TaskGroup: tg1.Name,
JobID: "ignore 2",
},
}

dynamic := NewDynamicConstraintIterator(ctx, static)
dynamic.SetTaskGroup(tg1)
dynamic.SetJob(job)

out := collectFeasible(dynamic)
if len(out) != 0 {
t.Fatalf("Bad: %#v", out)
}
}

func TestDynamicConstraint_JobUnique_InfeasibleCount(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)

// Create a job with a unique constraint and three task groups.
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}
tg3 := &structs.TaskGroup{Name: "bam"}

job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: "unique"}},
TaskGroups: []*structs.TaskGroup{tg1, tg2, tg3},
}

dynamic := NewDynamicConstraintIterator(ctx, static)
dynamic.SetTaskGroup(tg1)
dynamic.SetJob(job)

// It should not be able to place 3 tasks with only two nodes.
out := collectFeasible(dynamic)
if len(out) != 2 {
t.Fatalf("Bad: %#v", out)
}
}

func TestDynamicConstraint_TaskGroupUnique(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)

// Create a task group with a unique constraint.
taskGroup := &structs.TaskGroup{
Name: "example",
Constraints: []*structs.Constraint{
{Operand: "unique"},
},
}

// Add a planned alloc to node1.
plan := ctx.Plan()
plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: taskGroup.Name,
JobID: "foo",
},
}

// Add a planned alloc to node2 with the same task group name but a
// different job.
plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: taskGroup.Name,
JobID: "bar",
},
}

dynamic := NewDynamicConstraintIterator(ctx, static)
dynamic.SetTaskGroup(taskGroup)
dynamic.SetJob(&structs.Job{ID: "foo"})

out := collectFeasible(dynamic)
if len(out) != 1 {
t.Fatalf("Bad: %#v", out)
}

// Expect it to skip the first node as there is a previous alloc on it for
// the same task group.
if out[0] != nodes[1] {
t.Fatalf("Bad: %v", out)
}
}

func collectFeasible(iter FeasibleIterator) (out []*structs.Node) {
for {
next := iter.Next()
Expand Down
Loading