Add System Scheduler that runs tasks on every node #287

Merged
10 commits, merged on Oct 17, 2015
53 changes: 53 additions & 0 deletions nomad/mock/mock.go
@@ -109,6 +109,59 @@ func Job() *structs.Job {
return job
}

func SystemJob() *structs.Job {
job := &structs.Job{
Region: "global",
ID: structs.GenerateUUID(),
Name: "my-job",
Type: structs.JobTypeSystem,
Priority: 100,
AllAtOnce: false,
Datacenters: []string{"dc1"},
Constraints: []*structs.Constraint{
&structs.Constraint{
Hard: true,
LTarget: "$attr.kernel.name",
RTarget: "linux",
Operand: "=",
},
},
TaskGroups: []*structs.TaskGroup{
&structs.TaskGroup{
Name: "web",
Count: 1,
Tasks: []*structs.Task{
&structs.Task{
Name: "web",
Driver: "exec",
Config: map[string]string{
"command": "/bin/date",
"args": "+%s",
},
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
MBits: 50,
DynamicPorts: []string{"http"},
},
},
},
},
},
},
},
Meta: map[string]string{
"owner": "armon",
},
Status: structs.JobStatusPending,
CreateIndex: 42,
ModifyIndex: 99,
}
return job
}

func Eval() *structs.Evaluation {
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
7 changes: 7 additions & 0 deletions nomad/structs/structs.go
@@ -699,6 +699,7 @@ const (
JobTypeCore = "_core"
JobTypeService = "service"
JobTypeBatch = "batch"
JobTypeSystem = "system"
)

const (
@@ -828,6 +829,12 @@ func (j *Job) Validate() error {
} else {
taskGroups[tg.Name] = idx
}

if j.Type == "system" && tg.Count != 1 {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Job task group %d has count %d. Only count of 1 is supported with system scheduler",
idx+1, tg.Count))
}
}

// Validate the task group
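The new restriction is easy to exercise on its own. Below is a minimal sketch of a unit test for it; the test name and the partially filled job are illustrative, and it assumes Validate keeps collecting errors into the multierror as the hunk above shows (imports: "strings", "testing").

func TestJob_Validate_SystemCount(t *testing.T) {
	job := &structs.Job{
		Region:      "global",
		ID:          structs.GenerateUUID(),
		Name:        "system-job",
		Type:        structs.JobTypeSystem,
		Priority:    50,
		Datacenters: []string{"dc1"},
		TaskGroups: []*structs.TaskGroup{
			&structs.TaskGroup{Name: "web", Count: 3},
		},
	}

	// A count other than 1 should trip the system-scheduler restriction.
	err := job.Validate()
	if err == nil || !strings.Contains(err.Error(), "system scheduler") {
		t.Fatalf("expected system scheduler count error, got: %v", err)
	}
}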
15 changes: 14 additions & 1 deletion scheduler/feasible.go
@@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"regexp"
"strconv"
"strings"

"github.com/hashicorp/go-version"
@@ -129,10 +130,22 @@ func (iter *DriverIterator) Reset() {
func (iter *DriverIterator) hasDrivers(option *structs.Node) bool {
for driver := range iter.drivers {
driverStr := fmt.Sprintf("driver.%s", driver)
_, ok := option.Attributes[driverStr]
value, ok := option.Attributes[driverStr]
if !ok {
return false
}

enabled, err := strconv.ParseBool(value)
if err != nil {
iter.ctx.Logger().
Printf("[WARN] scheduler.DriverIterator: node %v has invalid driver setting %v: %v",
option.ID, driverStr, value)
return false
}

if !enabled {
return false
}
}
return true
}
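With this change a driver attribute must both exist and parse as a boolean, and only a true value keeps the node feasible. The following standalone snippet illustrates how strconv.ParseBool treats the attribute values used in the updated test below; "2", the value the old test relied on, is now rejected outright.

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Values mirroring the node attributes set in the test: the first four
	// parse cleanly, while "2" returns an error and fails feasibility.
	for _, v := range []string{"1", "0", "true", "False", "2"} {
		enabled, err := strconv.ParseBool(v)
		fmt.Printf("driver.foo=%q -> enabled=%v, err=%v\n", v, enabled, err)
	}
}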
7 changes: 5 additions & 2 deletions scheduler/feasible_test.go
@@ -82,11 +82,14 @@ func TestDriverIterator(t *testing.T) {
mock.Node(),
mock.Node(),
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)

nodes[0].Attributes["driver.foo"] = "2"
nodes[2].Attributes["driver.foo"] = "2"
nodes[0].Attributes["driver.foo"] = "1"
nodes[1].Attributes["driver.foo"] = "0"
nodes[2].Attributes["driver.foo"] = "true"
nodes[3].Attributes["driver.foo"] = "False"

drivers := map[string]struct{}{
Review comment (Member): You should add a negative test as well to check the ParseBool logic.

"exec": struct{}{},
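A negative case along the lines the reviewer asks for could look roughly like the sketch below. The helper names (testContext, collectFeasible, NewDriverIterator) are assumed to match the ones already used in feasible_test.go and are not shown in this diff.

// Sketch: a driver value that ParseBool rejects should make the node infeasible.
func TestDriverIterator_InvalidValue(t *testing.T) {
	_, ctx := testContext(t)
	nodes := []*structs.Node{mock.Node()}
	static := NewStaticIterator(ctx, nodes)

	// "2" is present but is not a parsable boolean.
	nodes[0].Attributes["driver.exec"] = "2"

	drivers := map[string]struct{}{
		"exec": struct{}{},
	}
	driver := NewDriverIterator(ctx, static, drivers)

	out := collectFeasible(driver)
	if len(out) != 0 {
		t.Fatalf("expected no feasible nodes, got %d", len(out))
	}
}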
121 changes: 7 additions & 114 deletions scheduler/generic_sched.go
@@ -82,18 +82,6 @@ func NewBatchScheduler(logger *log.Logger, state State, planner Planner) Schedul
return s
}

// setStatus is used to update the status of the evaluation
func (s *GenericScheduler) setStatus(status, desc string) error {
s.logger.Printf("[DEBUG] sched: %#v: setting status to %s", s.eval, status)
newEval := s.eval.Copy()
newEval.Status = status
newEval.StatusDescription = desc
if s.nextEval != nil {
newEval.NextEval = s.nextEval.ID
}
return s.planner.UpdateEval(newEval)
}

// Process is used to handle a single evaluation
func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
// Store the evaluation
@@ -106,7 +94,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
return s.setStatus(structs.EvalStatusFailed, desc)
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc)
}

// Retry up to the maxScheduleAttempts
@@ -116,13 +104,13 @@
}
if err := retryMax(limit, s.process); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
return s.setStatus(statusErr.EvalStatus, err.Error())
return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error())
}
return err
}

// Update the status to complete
return s.setStatus(structs.EvalStatusComplete, "")
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusComplete, "")
}
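The method form of setStatus is deleted above and the call sites now go through a package-level helper that the system scheduler can share. Its body is not part of the hunks shown here; a sketch consistent with the removed method and the new call signature would be:

// Sketch of the shared helper implied by the call sites above; the real
// implementation lives outside the hunks shown in this diff.
func setStatus(logger *log.Logger, planner Planner, eval, nextEval *structs.Evaluation,
	status, desc string) error {
	logger.Printf("[DEBUG] sched: %#v: setting status to %s", eval, status)
	newEval := eval.Copy()
	newEval.Status = status
	newEval.StatusDescription = desc
	if nextEval != nil {
		newEval.NextEval = nextEval.ID
	}
	return planner.UpdateEval(newEval)
}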

// process is wrapped in retryMax to iteratively run the handler until we have no
@@ -143,7 +131,7 @@ func (s *GenericScheduler) process() (bool, error) {
s.ctx = NewEvalContext(s.state, s.plan, s.logger)

// Construct the placement stack
s.stack = NewGenericStack(s.batch, s.ctx, nil)
s.stack = NewGenericStack(s.batch, s.ctx)
if s.job != nil {
s.stack.SetJob(s.job)
}
@@ -231,7 +219,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
}

// Attempt to do the upgrades in place
diff.update = s.inplaceUpdate(diff.update)
diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update)

// Check if a rolling upgrade strategy is being used
limit := len(diff.update) + len(diff.migrate)
@@ -240,10 +228,10 @@
}

// Treat migrations as an eviction and a new placement.
s.evictAndPlace(diff, diff.migrate, allocMigrating, &limit)
s.limitReached = evictAndPlace(s.ctx, diff, diff.migrate, allocMigrating, &limit)

// Treat non in-place updates as an eviction and new placement.
s.evictAndPlace(diff, diff.update, allocUpdating, &limit)
s.limitReached = evictAndPlace(s.ctx, diff, diff.update, allocUpdating, &limit)

// Nothing remaining to do if placement is not required
if len(diff.place) == 0 {
@@ -254,101 +242,6 @@
return s.computePlacements(diff.place)
}

// evictAndPlace is used to mark allocations for evicts and add them to the placement queue
func (s *GenericScheduler) evictAndPlace(diff *diffResult, allocs []allocTuple, desc string, limit *int) {
n := len(allocs)
for i := 0; i < n && i < *limit; i++ {
a := allocs[i]
s.plan.AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc)
diff.place = append(diff.place, a)
}
if n <= *limit {
*limit -= n
} else {
*limit = 0
s.limitReached = true
}
}

// inplaceUpdate attempts to update allocations in-place where possible.
func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple {
n := len(updates)
inplace := 0
for i := 0; i < n; i++ {
// Get the update
update := updates[i]

// Check if the task drivers or config has changed, requires
// a rolling upgrade since that cannot be done in-place.
existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name)
if tasksUpdated(update.TaskGroup, existing) {
continue
}

// Get the existing node
node, err := s.state.NodeByID(update.Alloc.NodeID)
if err != nil {
s.logger.Printf("[ERR] sched: %#v failed to get node '%s': %v",
s.eval, update.Alloc.NodeID, err)
continue
}
if node == nil {
continue
}

// Set the existing node as the base set
s.stack.SetNodes([]*structs.Node{node})

// Stage an eviction of the current allocation
s.plan.AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop,
allocInPlace)

// Attempt to match the task group
option, size := s.stack.Select(update.TaskGroup)

// Pop the allocation
s.plan.PopUpdate(update.Alloc)

// Skip if we could not do an in-place update
if option == nil {
continue
}

// Restore the network offers from the existing allocation.
// We do not allow network resources (reserved/dynamic ports)
// to be updated. This is guarded in taskUpdated, so we can
// safely restore those here.
for task, resources := range option.TaskResources {
existing := update.Alloc.TaskResources[task]
resources.Networks = existing.Networks
}

// Create a shallow copy
newAlloc := new(structs.Allocation)
*newAlloc = *update.Alloc

// Update the allocation
newAlloc.EvalID = s.eval.ID
newAlloc.Job = s.job
newAlloc.Resources = size
newAlloc.TaskResources = option.TaskResources
newAlloc.Metrics = s.ctx.Metrics()
newAlloc.DesiredStatus = structs.AllocDesiredStatusRun
newAlloc.ClientStatus = structs.AllocClientStatusPending
s.plan.AppendAlloc(newAlloc)

// Remove this allocation from the slice
updates[i] = updates[n-1]
i--
n--
inplace++
}
if len(updates) > 0 {
s.logger.Printf("[DEBUG] sched: %#v: %d in-place updates of %d", s.eval, inplace, len(updates))
}
return updates[:n]
}

// computePlacements computes placements for allocations
func (s *GenericScheduler) computePlacements(place []allocTuple) error {
// Get the base nodes
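evictAndPlace and inplaceUpdate move out of GenericScheduler in the same way so the system scheduler can reuse them. Their new bodies are not shown in this diff; a sketch of evictAndPlace, reconstructed from the removed method and the new call sites (which assign the returned bool to s.limitReached) and assuming the scheduling Context exposes the plan via Plan(), might be:

// Sketch: evictAndPlace as a shared helper. Instead of setting
// s.limitReached directly, it reports whether the limit was exhausted.
func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc string, limit *int) bool {
	n := len(allocs)
	for i := 0; i < n && i < *limit; i++ {
		a := allocs[i]
		ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc)
		diff.place = append(diff.place, a)
	}
	if n <= *limit {
		*limit -= n
		return false
	}
	*limit = 0
	return true
}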
6 changes: 3 additions & 3 deletions scheduler/generic_sched_test.go
@@ -22,7 +22,7 @@ func TestServiceSched_JobRegister(t *testing.T) {
job := mock.Job()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))

// Create a mock evaluation to deregister the job
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
@@ -71,7 +71,7 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
job := mock.Job()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))

// Create a mock evaluation to deregister the job
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
@@ -550,7 +550,7 @@ func TestServiceSched_RetryLimit(t *testing.T) {
job := mock.Job()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))

// Create a mock evaluation to deregister the job
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
1 change: 1 addition & 0 deletions scheduler/scheduler.go
@@ -13,6 +13,7 @@ import (
var BuiltinSchedulers = map[string]Factory{
"service": NewServiceScheduler,
"batch": NewBatchScheduler,
"system": NewSystemScheduler,
}

// NewScheduler is used to instantiate and return a new scheduler
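With the factory registered, the new scheduler can be driven end to end through the existing test harness. A rough sketch, modelled on TestServiceSched_JobRegister; it assumes the harness helpers (NewHarness, Process, noErr) and state methods behave as in the service scheduler tests, and the plan-count assertion is illustrative.

func TestSystemSched_JobRegister(t *testing.T) {
	h := NewHarness(t)

	// Create a few nodes for the system job to land on.
	for i := 0; i < 3; i++ {
		node := mock.Node()
		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
	}

	// Register a system job that should run on every eligible node.
	job := mock.SystemJob()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	// Create a mock evaluation to register the job.
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation with the system scheduler.
	if err := h.Process(NewSystemScheduler, eval); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Expect a single plan covering the registered nodes.
	if len(h.Plans) != 1 {
		t.Fatalf("expected 1 plan, got %d", len(h.Plans))
	}
}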