core: implement system batch scheduler
This PR implements a new "System Batch" scheduler type. Jobs can
make use of this new scheduler by setting their type to 'sysbatch'.

As the name implies, sysbatch can be thought of as a hybrid between
system and batch jobs: it is for running short-lived jobs intended to
run on every compatible node in the cluster.

As with batch jobs, sysbatch jobs can also be periodic and/or parameterized
dispatch jobs. A sysbatch job is considered complete once it has reached a
terminal state (success, or failure after exhausting retries) on every
compatible node.

Feasibility and preemption are governed the same way as for system jobs. The
update stanza is not yet supported in this PR: its functionality is still
limited for the underlying system scheduler, so it is not yet useful for
sysbatch jobs. Further work in #4740 will improve support for the update
stanza and deployments.
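To make the usage concrete, here is a minimal sysbatch job expressed with the
Go structs this PR touches. This is an illustrative sketch only (field values
are arbitrary); mock.SystemBatchJob below is the authoritative test fixture.

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// Illustrative sysbatch job, mirroring mock.SystemBatchJob in this PR.
	job := &structs.Job{
		Region:      "global",
		ID:          "ping-fleet",
		Name:        "ping-fleet",
		Namespace:   structs.DefaultNamespace,
		Type:        structs.JobTypeSysBatch, // the new scheduler type
		Priority:    10,
		Datacenters: []string{"dc1"},
		TaskGroups: []*structs.TaskGroup{{
			Name:  "pings",
			Count: 1,
			Tasks: []*structs.Task{{
				Name:   "ping-example",
				Driver: "exec",
				Config: map[string]interface{}{
					"command": "/usr/bin/ping",
					"args":    []string{"-c", "5", "example.com"},
				},
				LogConfig: structs.DefaultLogConfig(),
			}},
		}},
	}
	job.Canonicalize()

	// Job.Validate accepts the sysbatch type after this PR; expect <nil>.
	fmt.Println(job.Validate())
}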

Closes #2527
shoenig committed Oct 28, 2020
1 parent 595b450 commit 9c7624c
Showing 19 changed files with 304 additions and 93 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@ FEATURES:
* **Event Stream**: Subscribe to change events as they occur in real time. [[GH-9013](https://github.com/hashicorp/nomad/issues/9013)]
* **Namespaces OSS**: Namespaces are now available in open source Nomad. [[GH-9135](https://github.com/hashicorp/nomad/issues/9135)]
* **Topology Visualization**: See all of the clients and allocations in a cluster at once. [[GH-9077](https://github.com/hashicorp/nomad/issues/9077)]
* **System Batch Scheduling**: New `sysbatch` scheduler type for running short lived jobs across all nodes. [[GH-9160](https://github.com/hashicorp/nomad/pull/9160)]

IMPROVEMENTS:
* core: Improved job deregistration error logging. [[GH-8745](https://github.com/hashicorp/nomad/issues/8745)]
18 changes: 11 additions & 7 deletions client/allocrunner/taskrunner/restarts/restarts.go
@@ -14,15 +14,19 @@ const (
// jitter is the percent of jitter added to restart delays.
jitter = 0.25

-ReasonNoRestartsAllowed = "Policy allows no restarts"
-ReasonUnrecoverableErrror = "Error was unrecoverable"
-ReasonWithinPolicy = "Restart within policy"
-ReasonDelay = "Exceeded allowed attempts, applying a delay"
+ReasonNoRestartsAllowed = "Policy allows no restarts"
+ReasonUnrecoverableError = "Error was unrecoverable"
+ReasonWithinPolicy = "Restart within policy"
+ReasonDelay = "Exceeded allowed attempts, applying a delay"
)

func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
-// Batch jobs should not restart if they exit successfully
-onSuccess := jobType != structs.JobTypeBatch
+onSuccess := true
+
+// Batch & SysBatch jobs should not restart if they exit successfully
+if jobType == structs.JobTypeBatch || jobType == structs.JobTypeSysBatch {
+onSuccess = false
+}

// Prestart sidecars should get restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
@@ -196,7 +200,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
if r.startErr != nil {
// If the error is not recoverable, do not restart.
if !structs.IsRecoverable(r.startErr) {
-r.reason = ReasonUnrecoverableErrror
+r.reason = ReasonUnrecoverableError
return structs.TaskNotRestarting, 0
}
} else if r.exitRes != nil {
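Distilled from the change above, restart-on-success now hinges on the job
type. A standalone sketch of just that rule (simplified: the real tracker
also restarts prestart sidecar tasks on success):

package main

import "fmt"

// restartOnSuccess mirrors the NewRestartTracker logic above: batch-like
// jobs (batch, sysbatch) stay stopped after a successful exit, while all
// other job types keep their tasks running.
func restartOnSuccess(jobType string) bool {
	switch jobType {
	case "batch", "sysbatch":
		return false
	default:
		return true
	}
}

func main() {
	for _, t := range []string{"service", "system", "batch", "sysbatch"} {
		fmt.Printf("%-8s restart on success: %v\n", t, restartOnSuccess(t))
	}
}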
6 changes: 6 additions & 0 deletions helper/uuid/uuid.go
@@ -19,3 +19,9 @@ func Generate() string {
buf[8:10],
buf[10:16])
}

// Short is used to generate a random shortened UUID.
func Short() string {
id := Generate()
return id[len(id)-8:]
}
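A quick usage sketch for the new helper; the printed values below are
invented, only the lengths are guaranteed by the implementation:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper/uuid"
)

func main() {
	fmt.Println(uuid.Generate()) // full 36-char UUID, e.g. "0b305bf8-..."
	fmt.Println(uuid.Short())    // last 8 chars of a fresh UUID, e.g. "1b2c3d4e"
}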
5 changes: 2 additions & 3 deletions nomad/core_sched.go
@@ -136,9 +136,7 @@ OUTER:
gc, allocs, err := c.gcEval(eval, oldThreshold, true)
if err != nil {
continue OUTER
-}
-
-if gc {
+} else if gc {
jobEval = append(jobEval, eval.ID)
jobAlloc = append(jobAlloc, allocs...)
} else {
@@ -160,6 +158,7 @@
if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
return nil
}

c.logger.Debug("job GC found eligible objects",
"jobs", len(gcJob), "evals", len(gcEval), "allocs", len(gcAlloc))

40 changes: 40 additions & 0 deletions nomad/mock/mock.go
@@ -172,6 +172,46 @@ func HCL() string {
`
}

func SystemBatchJob() *structs.Job {
job := &structs.Job{
Region: "global",
ID: fmt.Sprintf("mock-sysbatch-%s", uuid.Short()),
Name: "my-sysbatch",
Namespace: structs.DefaultNamespace,
Type: structs.JobTypeSysBatch,
Priority: 10,
Datacenters: []string{"dc1"},
Constraints: []*structs.Constraint{
{
LTarget: "${attr.kernel.name}",
RTarget: "linux",
Operand: "=",
},
},
TaskGroups: []*structs.TaskGroup{{
Count: 1,
Name: "pings",
Tasks: []*structs.Task{{
Name: "ping-example",
Driver: "exec",
Config: map[string]interface{}{
"command": "/usr/bin/ping",
"args": []string{"-c", "5", "example.com"},
},
LogConfig: structs.DefaultLogConfig(),
}},
}},

Status: structs.JobStatusPending,
Version: 0,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,
}
job.Canonicalize()
return job
}

func Job() *structs.Job {
job := &structs.Job{
Region: "global",
13 changes: 8 additions & 5 deletions nomad/state/schema.go
@@ -271,13 +271,16 @@ func jobIsGCable(obj interface{}) (bool, error) {
return true, nil
}

-// Otherwise, only batch jobs are eligible because they complete on their
-// own without a user stopping them.
-if j.Type != structs.JobTypeBatch {
+switch j.Type {
+// Otherwise, batch and sysbatch jobs are eligible because they complete on
+// their own without a user stopping them.
+case structs.JobTypeBatch, structs.JobTypeSysBatch:
+return true, nil
+
+default:
+// other job types may not be GC until stopped
return false, nil
}

-return true, nil
}

// jobIsPeriodic satisfies the ConditionalIndexFunc interface and creates an index
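A standalone sketch of the type-based tail of jobIsGCable; per the
surrounding code (only partially shown here), earlier checks handle stopped,
periodic, and parameterized jobs before control reaches this switch:

package main

import "fmt"

// gcEligibleByType distills the switch above: batch-like jobs complete on
// their own and so become garbage collectable; other types are not GC'd
// until they are stopped.
func gcEligibleByType(jobType string) bool {
	switch jobType {
	case "batch", "sysbatch":
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(gcEligibleByType("sysbatch")) // true
	fmt.Println(gcEligibleByType("system"))   // false
}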
2 changes: 1 addition & 1 deletion nomad/state/state_store.go
@@ -1963,7 +1963,7 @@ func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (m
return iter, nil
}

-// JobsByGC returns an iterator over all jobs eligible or uneligible for garbage
+// JobsByGC returns an iterator over all jobs eligible or ineligible for garbage
// collection.
func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
27 changes: 15 additions & 12 deletions nomad/structs/structs.go
@@ -3759,10 +3759,11 @@ func (c *ComparableResources) NetIndex(n *NetworkResource) int {
const (
// JobTypeNomad is reserved for internal system tasks and is
// always handled by the CoreScheduler.
-JobTypeCore = "_core"
-JobTypeService = "service"
-JobTypeBatch = "batch"
-JobTypeSystem = "system"
+JobTypeCore = "_core"
+JobTypeService = "service"
+JobTypeBatch = "batch"
+JobTypeSystem = "system"
+JobTypeSysBatch = "sysbatch"
)

const (
@@ -4025,7 +4026,7 @@ func (j *Job) Validate() error {
mErr.Errors = append(mErr.Errors, errors.New("Job must be in a namespace"))
}
switch j.Type {
-case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem:
+case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem, JobTypeSysBatch:
case "":
mErr.Errors = append(mErr.Errors, errors.New("Missing job type"))
default:
@@ -4117,11 +4118,12 @@ func (j *Job) Validate() error {
}
}

-// Validate periodic is only used with batch jobs.
+// Validate periodic is only used with batch or sysbatch jobs.
if j.IsPeriodic() && j.Periodic.Enabled {
-if j.Type != JobTypeBatch {
-mErr.Errors = append(mErr.Errors,
-fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
+if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch {
+mErr.Errors = append(mErr.Errors, fmt.Errorf(
+"Periodic can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch,
+))
}

if err := j.Periodic.Validate(); err != nil {
@@ -4130,9 +4132,10 @@
}

if j.IsParameterized() {
-if j.Type != JobTypeBatch {
-mErr.Errors = append(mErr.Errors,
-fmt.Errorf("Parameterized job can only be used with %q scheduler", JobTypeBatch))
+if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch {
+mErr.Errors = append(mErr.Errors, fmt.Errorf(
+"Parameterized job can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch,
+))
}

if err := j.ParameterizedJob.Validate(); err != nil {
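A sketch of what the relaxed validation now permits, reusing the mock added
in this PR; the PeriodicConfig field names are assumed from the existing
structs package:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	job := mock.SystemBatchJob()

	// A periodic sysbatch job: rejected before this PR, accepted now.
	job.Periodic = &structs.PeriodicConfig{
		Enabled:  true,
		SpecType: structs.PeriodicSpecCron,
		Spec:     "*/15 * * * * *",
	}

	// Expect no "Periodic can only be used with ... scheduler" error.
	fmt.Println(job.Validate())
}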
2 changes: 1 addition & 1 deletion scheduler/generic_sched.go
@@ -36,7 +36,7 @@ const (
// allocInPlace is the status used when speculating on an in-place update
allocInPlace = "alloc updating in-place"

-// allocNodeTainted is the status used when stopping an alloc because it's
+// allocNodeTainted is the status used when stopping an alloc because its
// node is tainted.
allocNodeTainted = "alloc not needed as node is tainted"

4 changes: 2 additions & 2 deletions scheduler/rank.go
@@ -24,7 +24,7 @@ type RankedNode struct {
TaskLifecycles map[string]*structs.TaskLifecycleConfig
AllocResources *structs.AllocatedSharedResources

-// Allocs is used to cache the proposed allocations on the
+// Proposed is used to cache the proposed allocations on the
// node. This can be shared between iterators that require it.
Proposed []*structs.Allocation

@@ -60,7 +60,7 @@ func (r *RankedNode) SetTaskResources(task *structs.Task,
r.TaskLifecycles[task.Name] = task.Lifecycle
}

-// RankFeasibleIterator is used to iteratively yield nodes along
+// RankIterator is used to iteratively yield nodes along
// with ranking metadata. The iterators may manage some state for
// performance optimizations.
type RankIterator interface {
7 changes: 4 additions & 3 deletions scheduler/scheduler.go
@@ -21,9 +21,10 @@ const (
// BuiltinSchedulers contains the built in registered schedulers
// which are available
var BuiltinSchedulers = map[string]Factory{
"service": NewServiceScheduler,
"batch": NewBatchScheduler,
"system": NewSystemScheduler,
"service": NewServiceScheduler,
"batch": NewBatchScheduler,
"system": NewSystemScheduler,
"sysbatch": NewSysBatchScheduler,
}

// NewScheduler is used to instantiate and return a new scheduler
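For reference, a sketch of how a factory is resolved from this map; the
Factory signature is assumed from how the map is used in this package, and
the helper name here is hypothetical:

package scheduler

import (
	"fmt"

	log "github.com/hashicorp/go-hclog"
)

// newSchedulerSketch resolves a scheduler by type name. For "sysbatch" the
// factory returns a SystemScheduler configured with sysbatch=true.
func newSchedulerSketch(name string, logger log.Logger, state State, planner Planner) (Scheduler, error) {
	factory, ok := BuiltinSchedulers[name]
	if !ok {
		return nil, fmt.Errorf("unknown scheduler %q", name)
	}
	return factory(logger, state, planner), nil
}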
13 changes: 9 additions & 4 deletions scheduler/stack.go
@@ -237,10 +237,13 @@ func NewSystemStack(ctx Context) *SystemStack {
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
-tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint,
+tgs := []FeasibilityChecker{
+s.taskGroupDrivers,
+s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices,
-s.taskGroupNetwork}
+s.taskGroupNetwork,
+}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)

@@ -360,11 +363,13 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
-tgs := []FeasibilityChecker{s.taskGroupDrivers,
+tgs := []FeasibilityChecker{
+s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices,
-s.taskGroupNetwork}
+s.taskGroupNetwork,
+}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)

58 changes: 39 additions & 19 deletions scheduler/system_sched.go
@@ -14,24 +14,31 @@ const (
// we will attempt to schedule if we continue to hit conflicts for system
// jobs.
maxSystemScheduleAttempts = 5

+// maxSysBatchScheduleAttempts is used to limit the number of times we will
+// attempt to schedule if we continue to hit conflicts for sysbatch jobs.
+maxSysBatchScheduleAttempts = 2
)

-// SystemScheduler is used for 'system' jobs. This scheduler is
-// designed for services that should be run on every client.
-// One for each job, containing an allocation for each node
+// SystemScheduler is used for 'system' and 'sysbatch' jobs. This scheduler is
+// designed for jobs that should be run on every client. The 'system' mode
+// will ensure those jobs continuously run regardless of successful task exits,
+// whereas 'sysbatch' marks the task complete on success.
type SystemScheduler struct {
-logger log.Logger
-state State
-planner Planner
+logger log.Logger
+state State
+planner Planner
+sysbatch bool

eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
planResult *structs.PlanResult
ctx *EvalContext
stack *SystemStack
-nodes []*structs.Node
-nodesByDC map[string]int
+
+nodes []*structs.Node
+nodesByDC map[string]int

limitReached bool
nextEval *structs.Evaluation
Expand All @@ -44,9 +51,19 @@ type SystemScheduler struct {
// scheduler.
func NewSystemScheduler(logger log.Logger, state State, planner Planner) Scheduler {
return &SystemScheduler{
logger: logger.Named("system_sched"),
state: state,
planner: planner,
logger: logger.Named("system_sched"),
state: state,
planner: planner,
sysbatch: false,
}
}

+func NewSysBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler {
+return &SystemScheduler{
+logger: logger.Named("sysbatch_sched"),
+state: state,
+planner: planner,
+sysbatch: true,
+}
+}

@@ -71,9 +88,14 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
s.queuedAllocs, "")
}

+limit := maxSystemScheduleAttempts
+if s.sysbatch {
+limit = maxSysBatchScheduleAttempts
+}

// Retry up to the maxSystemScheduleAttempts and reset if progress is made.
progress := func() bool { return progressMade(s.planResult) }
-if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil {
+if err := retryMax(limit, s.process, progress); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, statusErr.EvalStatus, err.Error(),
s.queuedAllocs, "")
@@ -94,9 +116,9 @@ func (s *SystemScheduler) process() (bool, error) {
ws := memdb.NewWatchSet()
s.job, err = s.state.JobByID(ws, s.eval.Namespace, s.eval.JobID)
if err != nil {
return false, fmt.Errorf("failed to get job '%s': %v",
s.eval.JobID, err)
return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err)
}

numTaskGroups := 0
if !s.job.Stopped() {
numTaskGroups = len(s.job.TaskGroups)
@@ -185,19 +207,17 @@ func (s *SystemScheduler) computeJobAllocs() error {
ws := memdb.NewWatchSet()
allocs, err := s.state.AllocsByJob(ws, s.eval.Namespace, s.eval.JobID, true)
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v",
s.eval.JobID, err)
return fmt.Errorf("failed to get allocs for job '%s': %v", s.eval.JobID, err)
}

// Determine the tainted nodes containing job allocs
tainted, err := taintedNodes(s.state, allocs)
if err != nil {
return fmt.Errorf("failed to get tainted nodes for job '%s': %v",
s.eval.JobID, err)
return fmt.Errorf("failed to get tainted nodes for job '%s': %v", s.eval.JobID, err)
}

// Update the allocations which are in pending/running state on tainted
// nodes to lost
-// nodes to lost
+// nodes to lost.
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)

// Filter out the allocations in a terminal state
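The retry budget above (5 attempts for system, 2 for sysbatch) is consumed by
retryMax. A sketch of its assumed contract: run the process function until it
reports done, resetting the budget whenever the previous plan made progress:

package main

import "fmt"

// retrySketch models the assumed contract of retryMax/progressMade used in
// Process above: call fn up to limit times, resetting the attempt budget
// whenever the reset callback reports that progress was made.
func retrySketch(limit int, fn func() (bool, error), reset func() bool) error {
	attempts := 0
	for attempts < limit {
		done, err := fn()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		attempts++
		if reset() {
			attempts = 0
		}
	}
	return fmt.Errorf("maximum attempts reached (%d)", limit)
}

func main() {
	calls := 0
	err := retrySketch(2, func() (bool, error) {
		calls++
		return calls >= 2, nil // succeeds on the second attempt
	}, func() bool { return false })
	fmt.Println(calls, err) // 2 <nil>
}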