Skip to content

Commit

Permalink
Implement affinity support in generic scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
preetapan committed Jul 16, 2018
1 parent d53a9b9 commit 2c61158
Show file tree
Hide file tree
Showing 10 changed files with 574 additions and 220 deletions.
7 changes: 4 additions & 3 deletions scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type Context interface {
// RegexpCache is a cache of regular expressions
RegexpCache() map[string]*regexp.Regexp

// ConstraintCache is a cache of version constraints
ConstraintCache() map[string]version.Constraints
// VersionConstraintCache is a cache of version constraints
VersionConstraintCache() map[string]version.Constraints

// Eligibility returns a tracker for node eligibility in the context of the
// eval.
Expand All @@ -54,7 +54,8 @@ func (e *EvalCache) RegexpCache() map[string]*regexp.Regexp {
}
return e.reCache
}
func (e *EvalCache) ConstraintCache() map[string]version.Constraints {

func (e *EvalCache) VersionConstraintCache() map[string]version.Constraints {
if e.constraintCache == nil {
e.constraintCache = make(map[string]version.Constraints)
}
Expand Down
72 changes: 58 additions & 14 deletions scheduler/feasible.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,11 @@ func (c *ConstraintChecker) Feasible(option *structs.Node) bool {

func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, option *structs.Node) bool {
// Resolve the targets
lVal, ok := resolveConstraintTarget(constraint.LTarget, option)
lVal, ok := resolveTarget(constraint.LTarget, option)
if !ok {
return false
}
rVal, ok := resolveConstraintTarget(constraint.RTarget, option)
rVal, ok := resolveTarget(constraint.RTarget, option)
if !ok {
return false
}
Expand All @@ -416,8 +416,8 @@ func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, opti
return checkConstraint(c.ctx, constraint.Operand, lVal, rVal)
}

// resolveConstraintTarget is used to resolve the LTarget and RTarget of a Constraint
func resolveConstraintTarget(target string, node *structs.Node) (interface{}, bool) {
// resolveTarget is used to resolve the LTarget and RTarget of a Constraint
func resolveTarget(target string, node *structs.Node) (interface{}, bool) {
// If no prefix, this must be a literal value
if !strings.HasPrefix(target, "${") {
return target, true
Expand Down Expand Up @@ -470,16 +470,28 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
case "<", "<=", ">", ">=":
return checkLexicalOrder(operand, lVal, rVal)
case structs.ConstraintVersion:
return checkVersionConstraint(ctx, lVal, rVal)
return checkVersionMatch(ctx, lVal, rVal)
case structs.ConstraintRegex:
return checkRegexpConstraint(ctx, lVal, rVal)
return checkRegexpMatch(ctx, lVal, rVal)
case structs.ConstraintSetContains:
return checkSetContainsConstraint(ctx, lVal, rVal)
return checkSetContainsAll(ctx, lVal, rVal)
default:
return false
}
}

// checkAffinity checks if a specific affinity is satisfied
func checkAffinity(ctx Context, operand string, lVal, rVal interface{}) bool {
switch operand {
case structs.AffinitySetContainsAny:
return checkSetContainsAny(ctx, lVal, rVal)
case structs.AffinitySetContainsAll:
return checkSetContainsAll(ctx, lVal, rVal)
default:
return checkConstraint(ctx, operand, lVal, rVal)
}
}

// checkLexicalOrder is used to check for lexical ordering
func checkLexicalOrder(op string, lVal, rVal interface{}) bool {
// Ensure the values are strings
Expand All @@ -506,9 +518,9 @@ func checkLexicalOrder(op string, lVal, rVal interface{}) bool {
}
}

// checkVersionConstraint is used to compare a version on the
// checkVersionMatch is used to compare a version on the
// left hand side with a set of constraints on the right hand side
func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool {
func checkVersionMatch(ctx Context, lVal, rVal interface{}) bool {
// Parse the version
var versionStr string
switch v := lVal.(type) {
Expand All @@ -533,7 +545,7 @@ func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool {
}

// Check the cache for a match
cache := ctx.ConstraintCache()
cache := ctx.VersionConstraintCache()
constraints := cache[constraintStr]

// Parse the constraints
Expand All @@ -549,9 +561,9 @@ func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool {
return constraints.Check(vers)
}

// checkRegexpConstraint is used to compare a value on the
// checkRegexpMatch is used to compare a value on the
// left hand side with a regexp on the right hand side
func checkRegexpConstraint(ctx Context, lVal, rVal interface{}) bool {
func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool {
// Ensure left-hand is string
lStr, ok := lVal.(string)
if !ok {
Expand Down Expand Up @@ -582,9 +594,9 @@ func checkRegexpConstraint(ctx Context, lVal, rVal interface{}) bool {
return re.MatchString(lStr)
}

// checkSetContainsConstraint is used to see if the left hand side contains the
// checkSetContainsAll is used to see if the left hand side contains the
// string on the right hand side
func checkSetContainsConstraint(ctx Context, lVal, rVal interface{}) bool {
func checkSetContainsAll(ctx Context, lVal, rVal interface{}) bool {
// Ensure left-hand is string
lStr, ok := lVal.(string)
if !ok {
Expand Down Expand Up @@ -614,6 +626,38 @@ func checkSetContainsConstraint(ctx Context, lVal, rVal interface{}) bool {
return true
}

// checkSetContainsAny is used to see if the left hand side contains any
// values on the right hand side
func checkSetContainsAny(ctx Context, lVal, rVal interface{}) bool {
// Ensure left-hand is string
lStr, ok := lVal.(string)
if !ok {
return false
}

// RHS must be a string
rStr, ok := rVal.(string)
if !ok {
return false
}

input := strings.Split(lStr, ",")
lookup := make(map[string]struct{}, len(input))
for _, in := range input {
cleaned := strings.TrimSpace(in)
lookup[cleaned] = struct{}{}
}

for _, r := range strings.Split(rStr, ",") {
cleaned := strings.TrimSpace(r)
if _, ok := lookup[cleaned]; ok {
return true
}
}

return false
}

// FeasibilityWrapper is a FeasibleIterator which wraps both job and task group
// FeasibilityCheckers in which feasibility checking can be skipped if the
// computed node class has previously been marked as eligible or ineligible.
Expand Down
6 changes: 3 additions & 3 deletions scheduler/feasible_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func TestResolveConstraintTarget(t *testing.T) {
}

for _, tc := range cases {
res, ok := resolveConstraintTarget(tc.target, tc.node)
res, ok := resolveTarget(tc.target, tc.node)
if ok != tc.result {
t.Fatalf("TC: %#v, Result: %v %v", tc, res, ok)
}
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestCheckVersionConstraint(t *testing.T) {
}
for _, tc := range cases {
_, ctx := testContext(t)
if res := checkVersionConstraint(ctx, tc.lVal, tc.rVal); res != tc.result {
if res := checkVersionMatch(ctx, tc.lVal, tc.rVal); res != tc.result {
t.Fatalf("TC: %#v, Result: %v", tc, res)
}
}
Expand Down Expand Up @@ -495,7 +495,7 @@ func TestCheckRegexpConstraint(t *testing.T) {
}
for _, tc := range cases {
_, ctx := testContext(t)
if res := checkRegexpConstraint(ctx, tc.lVal, tc.rVal); res != tc.result {
if res := checkRegexpMatch(ctx, tc.lVal, tc.rVal); res != tc.result {
t.Fatalf("TC: %#v, Result: %v", tc, res)
}
}
Expand Down
32 changes: 16 additions & 16 deletions scheduler/propertyset.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,18 @@ func (p *propertySet) populateExisting(constraint *structs.Constraint) {
}

// Filter to the correct set of allocs
allocs = p.filterAllocs(allocs, true)
allocs = filterAllocs(allocs, true, p.taskGroup)

// Get all the nodes that have been used by the allocs
nodes, err := p.buildNodeMap(allocs)
nodes, err := buildNodeMap(p.ctx.State(), allocs)
if err != nil {
p.errorBuilding = err
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err)
return
}

// Build existing properties map
p.populateProperties(allocs, nodes, p.existingValues)
populateProperties(p.constraint.LTarget, allocs, nodes, p.existingValues)
}

// PopulateProposed populates the proposed values and recomputes any cleared
Expand All @@ -149,31 +149,31 @@ func (p *propertySet) PopulateProposed() {
for _, updates := range p.ctx.Plan().NodeUpdate {
stopping = append(stopping, updates...)
}
stopping = p.filterAllocs(stopping, false)
stopping = filterAllocs(stopping, false, p.taskGroup)

// Gather the proposed allocations
var proposed []*structs.Allocation
for _, pallocs := range p.ctx.Plan().NodeAllocation {
proposed = append(proposed, pallocs...)
}
proposed = p.filterAllocs(proposed, true)
proposed = filterAllocs(proposed, true, p.taskGroup)

// Get the used nodes
both := make([]*structs.Allocation, 0, len(stopping)+len(proposed))
both = append(both, stopping...)
both = append(both, proposed...)
nodes, err := p.buildNodeMap(both)
nodes, err := buildNodeMap(p.ctx.State(), both)
if err != nil {
p.errorBuilding = err
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err)
return
}

// Populate the cleared values
p.populateProperties(stopping, nodes, p.clearedValues)
populateProperties(p.constraint.LTarget, stopping, nodes, p.clearedValues)

// Populate the proposed values
p.populateProperties(proposed, nodes, p.proposedValues)
populateProperties(p.constraint.LTarget, proposed, nodes, p.proposedValues)

// Remove any cleared value that is now being used by the proposed allocs
for value := range p.proposedValues {
Expand Down Expand Up @@ -247,7 +247,7 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin
// filterAllocs filters a set of allocations to just be those that are running
// and if the property set is operation at a task group level, for allocations
// for that task group
func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal bool) []*structs.Allocation {
func filterAllocs(allocs []*structs.Allocation, filterTerminal bool, taskGroup string) []*structs.Allocation {
n := len(allocs)
for i := 0; i < n; i++ {
remove := false
Expand All @@ -257,8 +257,8 @@ func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal

// If the constraint is on the task group filter the allocations to just
// those on the task group
if p.taskGroup != "" {
remove = remove || allocs[i].TaskGroup != p.taskGroup
if taskGroup != "" {
remove = remove || allocs[i].TaskGroup != taskGroup
}

if remove {
Expand All @@ -272,7 +272,7 @@ func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal

// buildNodeMap takes a list of allocations and returns a map of the nodes used
// by those allocations
func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*structs.Node, error) {
func buildNodeMap(state State, allocs []*structs.Allocation) (map[string]*structs.Node, error) {
// Get all the nodes that have been used by the allocs
nodes := make(map[string]*structs.Node)
ws := memdb.NewWatchSet()
Expand All @@ -281,7 +281,7 @@ func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*st
continue
}

node, err := p.ctx.State().NodeByID(ws, alloc.NodeID)
node, err := state.NodeByID(ws, alloc.NodeID)
if err != nil {
return nil, fmt.Errorf("failed to lookup node ID %q: %v", alloc.NodeID, err)
}
Expand All @@ -294,11 +294,11 @@ func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*st

// populateProperties goes through all allocations and builds up the used
// properties from the nodes storing the results in the passed properties map.
func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map[string]*structs.Node,
func populateProperties(lTarget string, allocs []*structs.Allocation, nodes map[string]*structs.Node,
properties map[string]uint64) {

for _, alloc := range allocs {
nProperty, ok := getProperty(nodes[alloc.NodeID], p.constraint.LTarget)
nProperty, ok := getProperty(nodes[alloc.NodeID], lTarget)
if !ok {
continue
}
Expand All @@ -313,7 +313,7 @@ func getProperty(n *structs.Node, property string) (string, bool) {
return "", false
}

val, ok := resolveConstraintTarget(property, n)
val, ok := resolveTarget(property, n)
if !ok {
return "", false
}
Expand Down
Loading

0 comments on commit 2c61158

Please sign in to comment.