core: implement system batch scheduler
This PR implements a new "System Batch" scheduler type. Jobs can
make use of this new scheduler by setting their type to 'sysbatch'.

As the name implies, sysbatch can be thought of as a hybrid between
system and batch jobs: it is for running short-lived jobs intended to
run on every compatible node in the cluster.

As with batch jobs, sysbatch jobs can also be periodic and/or parameterized
dispatch jobs. A sysbatch job is considered complete once it has run to a
terminal state (success, or failure after exhausting retries) on every
compatible node.

Feasibility, rolling updates, and preemption are governed the same way as
for system jobs.
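
For illustration only (not part of this commit), here is a minimal Go sketch
of what a sysbatch job might look like when built against the structs package
touched below. The job ID, name, and datacenter are hypothetical placeholders
and the job is intentionally skeletal.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// A skeletal job using the new scheduler type. Real jobs also need a
	// region, namespace, and at least one task group to pass validation.
	job := &structs.Job{
		ID:          "node-cleanup", // hypothetical
		Name:        "node-cleanup", // hypothetical
		Type:        structs.JobTypeSysBatch,
		Priority:    50,
		Datacenters: []string{"dc1"},
	}

	// Validate now accepts periodic and parameterized configuration for
	// sysbatch jobs as well as batch jobs (see nomad/structs/structs.go below).
	if err := job.Validate(); err != nil {
		fmt.Println("incomplete job spec:", err)
	}
}
```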

Closes #2527
shoenig committed Oct 22, 2020
1 parent 8a90b7e commit fe3fb7a
Showing 10 changed files with 91 additions and 57 deletions.
18 changes: 11 additions & 7 deletions client/allocrunner/taskrunner/restarts/restarts.go
@@ -14,15 +14,19 @@ const (
// jitter is the percent of jitter added to restart delays.
jitter = 0.25

ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableErrror = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableError = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
)

func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
// Batch jobs should not restart if they exit successfully
onSuccess := jobType != structs.JobTypeBatch
onSuccess := true

// Batch & SysBatch jobs should not restart if they exit successfully
if jobType == structs.JobTypeBatch || jobType == structs.JobTypeSysBatch {
onSuccess = false
}

// Prestart sidecars should get restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
@@ -196,7 +200,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
if r.startErr != nil {
// If the error is not recoverable, do not restart.
if !structs.IsRecoverable(r.startErr) {
r.reason = ReasonUnrecoverableErrror
r.reason = ReasonUnrecoverableError
return structs.TaskNotRestarting, 0
}
} else if r.exitRes != nil {
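
For context, a hedged sketch (not part of the diff) of how the updated
constructor behaves for a sysbatch task: a successful exit is treated as
terminal rather than restarted. The restart policy values are illustrative.

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// Illustrative policy values; any RestartPolicy works for this sketch.
	policy := &structs.RestartPolicy{
		Attempts: 2,
		Interval: 24 * time.Hour,
		Delay:    15 * time.Second,
		Mode:     structs.RestartPolicyModeFail,
	}

	// With this change, passing JobTypeSysBatch (like JobTypeBatch) means a
	// task that exits successfully is not restarted.
	tracker := restarts.NewRestartTracker(policy, structs.JobTypeSysBatch, nil)
	_ = tracker
	fmt.Println("sysbatch tasks are treated like batch tasks on successful exit")
}
```
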
5 changes: 2 additions & 3 deletions nomad/core_sched.go
@@ -136,9 +136,7 @@ OUTER:
gc, allocs, err := c.gcEval(eval, oldThreshold, true)
if err != nil {
continue OUTER
}

if gc {
} else if gc {
jobEval = append(jobEval, eval.ID)
jobAlloc = append(jobAlloc, allocs...)
} else {
@@ -160,6 +158,7 @@ OUTER:
if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
return nil
}

c.logger.Debug("job GC found eligible objects",
"jobs", len(gcJob), "evals", len(gcEval), "allocs", len(gcAlloc))

13 changes: 8 additions & 5 deletions nomad/state/schema.go
@@ -265,13 +265,16 @@ func jobIsGCable(obj interface{}) (bool, error) {
return true, nil
}

// Otherwise, only batch jobs are eligible because they complete on their
// own without a user stopping them.
if j.Type != structs.JobTypeBatch {
switch j.Type {
// Otherwise, batch and sysbatch jobs are eligible because they complete on
// their own without a user stopping them.
case structs.JobTypeBatch, structs.JobTypeSysBatch:
return true, nil

default:
// other job types may not be GC'd until stopped
return false, nil
}

return true, nil
}

// jobIsPeriodic satisfies the ConditionalIndexFunc interface and creates an index
2 changes: 1 addition & 1 deletion nomad/state/state_store.go
@@ -1935,7 +1935,7 @@ func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (m
return iter, nil
}

// JobsByGC returns an iterator over all jobs eligible or uneligible for garbage
// JobsByGC returns an iterator over all jobs eligible or ineligible for garbage
// collection.
func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
27 changes: 15 additions & 12 deletions nomad/structs/structs.go
@@ -3744,10 +3744,11 @@ func (c *ComparableResources) NetIndex(n *NetworkResource) int {
const (
// JobTypeNomad is reserved for internal system tasks and is
// always handled by the CoreScheduler.
JobTypeCore = "_core"
JobTypeService = "service"
JobTypeBatch = "batch"
JobTypeSystem = "system"
JobTypeCore = "_core"
JobTypeService = "service"
JobTypeBatch = "batch"
JobTypeSystem = "system"
JobTypeSysBatch = "sysbatch"
)

const (
@@ -4010,7 +4011,7 @@ func (j *Job) Validate() error {
mErr.Errors = append(mErr.Errors, errors.New("Job must be in a namespace"))
}
switch j.Type {
case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem:
case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem, JobTypeSysBatch:
case "":
mErr.Errors = append(mErr.Errors, errors.New("Missing job type"))
default:
@@ -4102,11 +4103,12 @@ func (j *Job) Validate() error {
}
}

// Validate periodic is only used with batch jobs.
// Validate periodic is only used with batch or sysbatch jobs.
if j.IsPeriodic() && j.Periodic.Enabled {
if j.Type != JobTypeBatch {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch {
mErr.Errors = append(mErr.Errors, fmt.Errorf(
"Periodic can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch,
))
}

if err := j.Periodic.Validate(); err != nil {
@@ -4115,9 +4117,10 @@
}

if j.IsParameterized() {
if j.Type != JobTypeBatch {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Parameterized job can only be used with %q scheduler", JobTypeBatch))
if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch {
mErr.Errors = append(mErr.Errors, fmt.Errorf(
"Parameterized job can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch,
))
}

if err := j.ParameterizedJob.Validate(); err != nil {
2 changes: 1 addition & 1 deletion scheduler/generic_sched.go
@@ -36,7 +36,7 @@ const (
// allocInPlace is the status used when speculating on an in-place update
allocInPlace = "alloc updating in-place"

// allocNodeTainted is the status used when stopping an alloc because it's
// allocNodeTainted is the status used when stopping an alloc because its
// node is tainted.
allocNodeTainted = "alloc not needed as node is tainted"

4 changes: 2 additions & 2 deletions scheduler/rank.go
@@ -24,7 +24,7 @@ type RankedNode struct {
TaskLifecycles map[string]*structs.TaskLifecycleConfig
AllocResources *structs.AllocatedSharedResources

// Allocs is used to cache the proposed allocations on the
// Proposed is used to cache the proposed allocations on the
// node. This can be shared between iterators that require it.
Proposed []*structs.Allocation

@@ -60,7 +60,7 @@ func (r *RankedNode) SetTaskResources(task *structs.Task,
r.TaskLifecycles[task.Name] = task.Lifecycle
}

// RankFeasibleIterator is used to iteratively yield nodes along
// RankIterator is used to iteratively yield nodes along
// with ranking metadata. The iterators may manage some state for
// performance optimizations.
type RankIterator interface {
7 changes: 4 additions & 3 deletions scheduler/scheduler.go
@@ -21,9 +21,10 @@ const (
// BuiltinSchedulers contains the built in registered schedulers
// which are available
var BuiltinSchedulers = map[string]Factory{
"service": NewServiceScheduler,
"batch": NewBatchScheduler,
"system": NewSystemScheduler,
"service": NewServiceScheduler,
"batch": NewBatchScheduler,
"system": NewSystemScheduler,
"sysbatch": NewSysBatchScheduler,
}

// NewScheduler is used to instantiate and return a new scheduler
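
A brief, hedged sketch of how a caller might resolve the new registry entry.
The Factory signature (logger, state, planner) is taken from the constructors
in this commit; the helper itself is hypothetical and not part of the change.

```go
package scheduler

import (
	"fmt"

	log "github.com/hashicorp/go-hclog"
)

// newSysBatchFromRegistry is a hypothetical helper that resolves the
// "sysbatch" entry registered above and constructs a scheduler from it.
func newSysBatchFromRegistry(logger log.Logger, state State, planner Planner) (Scheduler, error) {
	factory, ok := BuiltinSchedulers["sysbatch"]
	if !ok {
		return nil, fmt.Errorf("no scheduler factory registered for %q", "sysbatch")
	}
	return factory(logger, state, planner), nil
}
```
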
13 changes: 9 additions & 4 deletions scheduler/stack.go
@@ -237,10 +237,13 @@ func NewSystemStack(ctx Context) *SystemStack {
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint,
tgs := []FeasibilityChecker{
s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices,
s.taskGroupNetwork}
s.taskGroupNetwork,
}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)

@@ -360,11 +363,13 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers,
tgs := []FeasibilityChecker{
s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices,
s.taskGroupNetwork}
s.taskGroupNetwork,
}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)

57 changes: 38 additions & 19 deletions scheduler/system_sched.go
@@ -14,24 +14,31 @@ const (
// we will attempt to schedule if we continue to hit conflicts for system
// jobs.
maxSystemScheduleAttempts = 5

// maxSysBatchScheduleAttempts is used to limit the number of times we will
// attempt to schedule if we continue to hit conflicts for sysbatch jobs.
maxSysBatchScheduleAttempts = 2
)

// SystemScheduler is used for 'system' jobs. This scheduler is
// designed for services that should be run on every client.
// One for each job, containing an allocation for each node
// SystemScheduler is used for 'system' and 'sysbatch' jobs. This scheduler is
// designed for jobs that should be run on every client. The 'system' mode
// will ensure those jobs continuously run regardless of successful task exits,
// whereas 'sysbatch' marks the task complete on success.
type SystemScheduler struct {
logger log.Logger
state State
planner Planner
logger log.Logger
state State
planner Planner
sysbatch bool

eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
planResult *structs.PlanResult
ctx *EvalContext
stack *SystemStack
nodes []*structs.Node
nodesByDC map[string]int

nodes []*structs.Node
nodesByDC map[string]int

limitReached bool
nextEval *structs.Evaluation
@@ -44,9 +51,19 @@ type SystemScheduler struct {
// scheduler.
func NewSystemScheduler(logger log.Logger, state State, planner Planner) Scheduler {
return &SystemScheduler{
logger: logger.Named("system_sched"),
state: state,
planner: planner,
logger: logger.Named("system_sched"),
state: state,
planner: planner,
sysbatch: false,
}
}

func NewSysBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler {
return &SystemScheduler{
logger: logger.Named("sysbatch_sched"),
state: state,
planner: planner,
sysbatch: true,
}
}

@@ -71,9 +88,14 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
s.queuedAllocs, "")
}

limit := maxSystemScheduleAttempts
if s.sysbatch {
limit = maxSysBatchScheduleAttempts
}

// Retry up to the maxSystemScheduleAttempts and reset if progress is made.
progress := func() bool { return progressMade(s.planResult) }
if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil {
if err := retryMax(limit, s.process, progress); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, statusErr.EvalStatus, err.Error(),
s.queuedAllocs, "")
@@ -94,8 +116,7 @@ func (s *SystemScheduler) process() (bool, error) {
ws := memdb.NewWatchSet()
s.job, err = s.state.JobByID(ws, s.eval.Namespace, s.eval.JobID)
if err != nil {
return false, fmt.Errorf("failed to get job '%s': %v",
s.eval.JobID, err)
return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err)
}
numTaskGroups := 0
if !s.job.Stopped() {
@@ -185,19 +206,17 @@ func (s *SystemScheduler) computeJobAllocs() error {
ws := memdb.NewWatchSet()
allocs, err := s.state.AllocsByJob(ws, s.eval.Namespace, s.eval.JobID, true)
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v",
s.eval.JobID, err)
return fmt.Errorf("failed to get allocs for job '%s': %v", s.eval.JobID, err)
}

// Determine the tainted nodes containing job allocs
tainted, err := taintedNodes(s.state, allocs)
if err != nil {
return fmt.Errorf("failed to get tainted nodes for job '%s': %v",
s.eval.JobID, err)
return fmt.Errorf("failed to get tainted nodes for job '%s': %v", s.eval.JobID, err)
}

// Update the allocations which are in pending/running state on tainted
// nodes to lost
// nodes to lost.
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)

// Filter out the allocations in a terminal state
