From 9c7624c6df1411210ac2595fbf7deb7cc71c7ccd Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Fri, 9 Oct 2020 16:31:38 -0500 Subject: [PATCH] core: implement system batch scheduler This PR implements a new "System Batch" scheduler type. Jobs can make use of this new scheduler by setting their type to 'sysbatch'. Like the name implies, sysbatch can be thought of as a hybrid between system and batch jobs - it is for running short lived jobs intended to run on every compatible node in the cluster. As with batch jobs, sysbatch jobs can also be periodic and/or parameterized dispatch jobs. A sysbatch job is considered complete when it has been run on all compatible nodes until reaching a terminal state (success or failed on retries). Feasibility and preemption are governed the same as with system jobs. In this PR, the update stanza is not yet supported. The update stanza is sill limited in functionality for the underlying system scheduler, and is not useful yet for sysbatch jobs. Further work in #4740 will improve support for the update stanza and deployments. Closes #2527 --- CHANGELOG.md | 1 + .../taskrunner/restarts/restarts.go | 18 ++-- helper/uuid/uuid.go | 6 ++ nomad/core_sched.go | 5 +- nomad/mock/mock.go | 40 +++++++++ nomad/state/schema.go | 13 +-- nomad/state/state_store.go | 2 +- nomad/structs/structs.go | 27 +++--- scheduler/generic_sched.go | 2 +- scheduler/rank.go | 4 +- scheduler/scheduler.go | 7 +- scheduler/stack.go | 13 ++- scheduler/system_sched.go | 58 ++++++++---- scheduler/util.go | 88 ++++++++++++++----- scheduler/util_test.go | 65 ++++++++++++++ website/pages/docs/job-specification/job.mdx | 2 +- .../docs/job-specification/reschedule.mdx | 4 +- .../pages/docs/job-specification/restart.mdx | 12 +-- website/pages/docs/schedulers.mdx | 30 ++++++- 19 files changed, 304 insertions(+), 93 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 318f2f9f3f66..283afda5e87b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ FEATURES: * **Event Stream**: Subscribe to change events as they occur in real time. [[GH-9013](https://github.com/hashicorp/nomad/issues/9013)] * **Namespaces OSS**: Namespaces are now available in open source Nomad. [[GH-9135](https://github.com/hashicorp/nomad/issues/9135)] * **Topology Visualization**: See all of the clients and allocations in a cluster at once. [[GH-9077](https://github.com/hashicorp/nomad/issues/9077)] +* **System Batch Scheduling**: New `sysbatch` scheduler type for running short lived jobs across all nodes. [[GH-9160](https://github.com/hashicorp/nomad/pull/9160)] IMPROVEMENTS: * core: Improved job deregistration error logging. [[GH-8745](https://github.com/hashicorp/nomad/issues/8745)] diff --git a/client/allocrunner/taskrunner/restarts/restarts.go b/client/allocrunner/taskrunner/restarts/restarts.go index 6ee0056ccd8b..429ee07a0384 100644 --- a/client/allocrunner/taskrunner/restarts/restarts.go +++ b/client/allocrunner/taskrunner/restarts/restarts.go @@ -14,15 +14,19 @@ const ( // jitter is the percent of jitter added to restart delays. jitter = 0.25 - ReasonNoRestartsAllowed = "Policy allows no restarts" - ReasonUnrecoverableErrror = "Error was unrecoverable" - ReasonWithinPolicy = "Restart within policy" - ReasonDelay = "Exceeded allowed attempts, applying a delay" + ReasonNoRestartsAllowed = "Policy allows no restarts" + ReasonUnrecoverableError = "Error was unrecoverable" + ReasonWithinPolicy = "Restart within policy" + ReasonDelay = "Exceeded allowed attempts, applying a delay" ) func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker { - // Batch jobs should not restart if they exit successfully - onSuccess := jobType != structs.JobTypeBatch + onSuccess := true + + // Batch & SysBatch jobs should not restart if they exit successfully + if jobType == structs.JobTypeBatch || jobType == structs.JobTypeSysBatch { + onSuccess = false + } // Prestart sidecars should get restarted on success if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart { @@ -196,7 +200,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) { if r.startErr != nil { // If the error is not recoverable, do not restart. if !structs.IsRecoverable(r.startErr) { - r.reason = ReasonUnrecoverableErrror + r.reason = ReasonUnrecoverableError return structs.TaskNotRestarting, 0 } } else if r.exitRes != nil { diff --git a/helper/uuid/uuid.go b/helper/uuid/uuid.go index 145c817803d0..c0eec178ea9d 100644 --- a/helper/uuid/uuid.go +++ b/helper/uuid/uuid.go @@ -19,3 +19,9 @@ func Generate() string { buf[8:10], buf[10:16]) } + +// Short is used to generate a random shortened UUID. +func Short() string { + id := Generate() + return id[len(id)-8:] +} diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 1ac135d0aaea..eb796f66bcaa 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -136,9 +136,7 @@ OUTER: gc, allocs, err := c.gcEval(eval, oldThreshold, true) if err != nil { continue OUTER - } - - if gc { + } else if gc { jobEval = append(jobEval, eval.ID) jobAlloc = append(jobAlloc, allocs...) } else { @@ -160,6 +158,7 @@ OUTER: if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 { return nil } + c.logger.Debug("job GC found eligible objects", "jobs", len(gcJob), "evals", len(gcEval), "allocs", len(gcAlloc)) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 6e412b165812..2390e82013c5 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -172,6 +172,46 @@ func HCL() string { ` } +func SystemBatchJob() *structs.Job { + job := &structs.Job{ + Region: "global", + ID: fmt.Sprintf("mock-sysbatch-%s", uuid.Short()), + Name: "my-sysbatch", + Namespace: structs.DefaultNamespace, + Type: structs.JobTypeSysBatch, + Priority: 10, + Datacenters: []string{"dc1"}, + Constraints: []*structs.Constraint{ + { + LTarget: "${attr.kernel.name}", + RTarget: "linux", + Operand: "=", + }, + }, + TaskGroups: []*structs.TaskGroup{{ + Count: 1, + Name: "pings", + Tasks: []*structs.Task{{ + Name: "ping-example", + Driver: "exec", + Config: map[string]interface{}{ + "command": "/usr/bin/ping", + "args": []string{"-c", "5", "example.com"}, + }, + LogConfig: structs.DefaultLogConfig(), + }}, + }}, + + Status: structs.JobStatusPending, + Version: 0, + CreateIndex: 42, + ModifyIndex: 99, + JobModifyIndex: 99, + } + job.Canonicalize() + return job +} + func Job() *structs.Job { job := &structs.Job{ Region: "global", diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 923b44617139..8178ec515f34 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -271,13 +271,16 @@ func jobIsGCable(obj interface{}) (bool, error) { return true, nil } - // Otherwise, only batch jobs are eligible because they complete on their - // own without a user stopping them. - if j.Type != structs.JobTypeBatch { + switch j.Type { + // Otherwise, batch and sysbatch jobs are eligible because they complete on + // their own without a user stopping them. + case structs.JobTypeBatch, structs.JobTypeSysBatch: + return true, nil + + default: + // other job types may not be GC until stopped return false, nil } - - return true, nil } // jobIsPeriodic satisfies the ConditionalIndexFunc interface and creates an index diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 3fbc59fb144b..acc4370b8c38 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1963,7 +1963,7 @@ func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (m return iter, nil } -// JobsByGC returns an iterator over all jobs eligible or uneligible for garbage +// JobsByGC returns an iterator over all jobs eligible or ineligible for garbage // collection. func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 501b4a4bc768..b086d8acbce4 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3759,10 +3759,11 @@ func (c *ComparableResources) NetIndex(n *NetworkResource) int { const ( // JobTypeNomad is reserved for internal system tasks and is // always handled by the CoreScheduler. - JobTypeCore = "_core" - JobTypeService = "service" - JobTypeBatch = "batch" - JobTypeSystem = "system" + JobTypeCore = "_core" + JobTypeService = "service" + JobTypeBatch = "batch" + JobTypeSystem = "system" + JobTypeSysBatch = "sysbatch" ) const ( @@ -4025,7 +4026,7 @@ func (j *Job) Validate() error { mErr.Errors = append(mErr.Errors, errors.New("Job must be in a namespace")) } switch j.Type { - case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem: + case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem, JobTypeSysBatch: case "": mErr.Errors = append(mErr.Errors, errors.New("Missing job type")) default: @@ -4117,11 +4118,12 @@ func (j *Job) Validate() error { } } - // Validate periodic is only used with batch jobs. + // Validate periodic is only used with batch or sysbatch jobs. if j.IsPeriodic() && j.Periodic.Enabled { - if j.Type != JobTypeBatch { - mErr.Errors = append(mErr.Errors, - fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch)) + if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch { + mErr.Errors = append(mErr.Errors, fmt.Errorf( + "Periodic can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch, + )) } if err := j.Periodic.Validate(); err != nil { @@ -4130,9 +4132,10 @@ func (j *Job) Validate() error { } if j.IsParameterized() { - if j.Type != JobTypeBatch { - mErr.Errors = append(mErr.Errors, - fmt.Errorf("Parameterized job can only be used with %q scheduler", JobTypeBatch)) + if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch { + mErr.Errors = append(mErr.Errors, fmt.Errorf( + "Parameterized job can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch, + )) } if err := j.ParameterizedJob.Validate(); err != nil { diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index c67eafad870a..b933deb1eb21 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -36,7 +36,7 @@ const ( // allocInPlace is the status used when speculating on an in-place update allocInPlace = "alloc updating in-place" - // allocNodeTainted is the status used when stopping an alloc because it's + // allocNodeTainted is the status used when stopping an alloc because its // node is tainted. allocNodeTainted = "alloc not needed as node is tainted" diff --git a/scheduler/rank.go b/scheduler/rank.go index 1653d9cf9067..ec4b2635d423 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -24,7 +24,7 @@ type RankedNode struct { TaskLifecycles map[string]*structs.TaskLifecycleConfig AllocResources *structs.AllocatedSharedResources - // Allocs is used to cache the proposed allocations on the + // Proposed is used to cache the proposed allocations on the // node. This can be shared between iterators that require it. Proposed []*structs.Allocation @@ -60,7 +60,7 @@ func (r *RankedNode) SetTaskResources(task *structs.Task, r.TaskLifecycles[task.Name] = task.Lifecycle } -// RankFeasibleIterator is used to iteratively yield nodes along +// RankIterator is used to iteratively yield nodes along // with ranking metadata. The iterators may manage some state for // performance optimizations. type RankIterator interface { diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index a950690db44f..d1bbfa4c3e41 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -21,9 +21,10 @@ const ( // BuiltinSchedulers contains the built in registered schedulers // which are available var BuiltinSchedulers = map[string]Factory{ - "service": NewServiceScheduler, - "batch": NewBatchScheduler, - "system": NewSystemScheduler, + "service": NewServiceScheduler, + "batch": NewBatchScheduler, + "system": NewSystemScheduler, + "sysbatch": NewSysBatchScheduler, } // NewScheduler is used to instantiate and return a new scheduler diff --git a/scheduler/stack.go b/scheduler/stack.go index bccabc7899ab..96ce92713bfd 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -237,10 +237,13 @@ func NewSystemStack(ctx Context) *SystemStack { // previously been marked as eligible or ineligible. Generally this will be // checks that only needs to examine the single node to determine feasibility. jobs := []FeasibilityChecker{s.jobConstraint} - tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, + tgs := []FeasibilityChecker{ + s.taskGroupDrivers, + s.taskGroupConstraint, s.taskGroupHostVolumes, s.taskGroupDevices, - s.taskGroupNetwork} + s.taskGroupNetwork, + } avail := []FeasibilityChecker{s.taskGroupCSIVolumes} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail) @@ -360,11 +363,13 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { // previously been marked as eligible or ineligible. Generally this will be // checks that only needs to examine the single node to determine feasibility. jobs := []FeasibilityChecker{s.jobConstraint} - tgs := []FeasibilityChecker{s.taskGroupDrivers, + tgs := []FeasibilityChecker{ + s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupHostVolumes, s.taskGroupDevices, - s.taskGroupNetwork} + s.taskGroupNetwork, + } avail := []FeasibilityChecker{s.taskGroupCSIVolumes} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 4b1e5c8cbfaa..c4c3826481cf 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -14,15 +14,21 @@ const ( // we will attempt to schedule if we continue to hit conflicts for system // jobs. maxSystemScheduleAttempts = 5 + + // maxSysBatchScheduleAttempts is used to limit the number of times we will + // attempt to schedule if we continue to hit conflicts for sysbatch jobs. + maxSysBatchScheduleAttempts = 2 ) -// SystemScheduler is used for 'system' jobs. This scheduler is -// designed for services that should be run on every client. -// One for each job, containing an allocation for each node +// SystemScheduler is used for 'system' and 'sysbatch' jobs. This scheduler is +// designed for jobs that should be run on every client. The 'system' mode +// will ensure those jobs continuously run regardless of successful task exits, +// whereas 'sysbatch' marks the task complete on success. type SystemScheduler struct { - logger log.Logger - state State - planner Planner + logger log.Logger + state State + planner Planner + sysbatch bool eval *structs.Evaluation job *structs.Job @@ -30,8 +36,9 @@ type SystemScheduler struct { planResult *structs.PlanResult ctx *EvalContext stack *SystemStack - nodes []*structs.Node - nodesByDC map[string]int + + nodes []*structs.Node + nodesByDC map[string]int limitReached bool nextEval *structs.Evaluation @@ -44,9 +51,19 @@ type SystemScheduler struct { // scheduler. func NewSystemScheduler(logger log.Logger, state State, planner Planner) Scheduler { return &SystemScheduler{ - logger: logger.Named("system_sched"), - state: state, - planner: planner, + logger: logger.Named("system_sched"), + state: state, + planner: planner, + sysbatch: false, + } +} + +func NewSysBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler { + return &SystemScheduler{ + logger: logger.Named("sysbatch_sched"), + state: state, + planner: planner, + sysbatch: true, } } @@ -71,9 +88,14 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { s.queuedAllocs, "") } + limit := maxSystemScheduleAttempts + if s.sysbatch { + limit = maxSysBatchScheduleAttempts + } + // Retry up to the maxSystemScheduleAttempts and reset if progress is made. progress := func() bool { return progressMade(s.planResult) } - if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil { + if err := retryMax(limit, s.process, progress); err != nil { if statusErr, ok := err.(*SetStatusError); ok { return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, statusErr.EvalStatus, err.Error(), s.queuedAllocs, "") @@ -94,9 +116,9 @@ func (s *SystemScheduler) process() (bool, error) { ws := memdb.NewWatchSet() s.job, err = s.state.JobByID(ws, s.eval.Namespace, s.eval.JobID) if err != nil { - return false, fmt.Errorf("failed to get job '%s': %v", - s.eval.JobID, err) + return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err) } + numTaskGroups := 0 if !s.job.Stopped() { numTaskGroups = len(s.job.TaskGroups) @@ -185,19 +207,17 @@ func (s *SystemScheduler) computeJobAllocs() error { ws := memdb.NewWatchSet() allocs, err := s.state.AllocsByJob(ws, s.eval.Namespace, s.eval.JobID, true) if err != nil { - return fmt.Errorf("failed to get allocs for job '%s': %v", - s.eval.JobID, err) + return fmt.Errorf("failed to get allocs for job '%s': %v", s.eval.JobID, err) } // Determine the tainted nodes containing job allocs tainted, err := taintedNodes(s.state, allocs) if err != nil { - return fmt.Errorf("failed to get tainted nodes for job '%s': %v", - s.eval.JobID, err) + return fmt.Errorf("failed to get tainted nodes for job '%s': %v", s.eval.JobID, err) } // Update the allocations which are in pending/running state on tainted - // nodes to lost + // nodes to lost. updateNonTerminalAllocsToLost(s.plan, tainted, allocs) // Filter out the allocations in a terminal state diff --git a/scheduler/util.go b/scheduler/util.go index 7261f67deb8f..016541bdafef 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -39,6 +39,14 @@ type diffResult struct { place, update, migrate, stop, ignore, lost []allocTuple } +// todo remove +func (d *diffResult) String() string { + return fmt.Sprintf( + "diff[place: %v, update: %v, migrate: %v, stop: %v, ignore: %v, lost: %v]", + d.place, d.update, d.migrate, d.stop, d.ignore, d.lost, + ) +} + func (d *diffResult) GoString() string { return fmt.Sprintf("allocs: (place %d) (update %d) (migrate %d) (stop %d) (ignore %d) (lost %d)", len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore), len(d.lost)) @@ -60,21 +68,19 @@ func (d *diffResult) Append(other *diffResult) { // need to be migrated (node is draining), the allocs that need to be evicted // (no longer required), those that should be ignored and those that are lost // that need to be replaced (running on a lost node). -// -// job is the job whose allocs is going to be diff-ed. -// taintedNodes is an index of the nodes which are either down or in drain mode -// by name. -// required is a set of allocations that must exist. -// allocs is a list of non terminal allocations. -// terminalAllocs is an index of the latest terminal allocations by name. -func diffSystemAllocsForNode(job *structs.Job, nodeID string, - eligibleNodes, taintedNodes map[string]*structs.Node, - required map[string]*structs.TaskGroup, allocs []*structs.Allocation, - terminalAllocs map[string]*structs.Allocation) *diffResult { - result := &diffResult{} +func diffSystemAllocsForNode( + job *structs.Job, // job whose allocs are going to be diff-ed + nodeID string, + eligibleNodes map[string]*structs.Node, + taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name) + required map[string]*structs.TaskGroup, // set of allocations that must exist + allocs []*structs.Allocation, // non-terminal allocations that exist + terminalAllocs map[string]*structs.Allocation, // latest terminal allocations (by name) +) *diffResult { + result := new(diffResult) // Scan the existing updates - existing := make(map[string]struct{}) + existing := make(map[string]struct{}) // set of alloc names for _, exist := range allocs { // Index the existing node name := exist.Name @@ -102,6 +108,17 @@ func diffSystemAllocsForNode(job *structs.Job, nodeID string, }) continue } + + // If we are a sysbatch job and terminal, ignore (or stop?) the alloc + if job.Type == structs.JobTypeSysBatch && exist.TerminalStatus() { + result.ignore = append(result.ignore, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: exist, + }) + continue + } + // If we are on a tainted node, we must migrate if we are a service or // if the batch allocation did not finish if node, ok := taintedNodes[exist.NodeID]; ok { @@ -154,6 +171,33 @@ func diffSystemAllocsForNode(job *structs.Job, nodeID string, // Scan the required groups for name, tg := range required { + + // todo bug: job updates are ignored + // Check for a terminal sysbatch allocation, which should be not placed + // again. + if job.Type == structs.JobTypeSysBatch { + if alloc, ok := terminalAllocs[name]; ok { + // the alloc is terminal, but now the job has been updated + if job.JobModifyIndex != alloc.Job.JobModifyIndex { + replaceable := alloc.Copy() // we do not have the original + replaceable.NodeID = nodeID + result.update = append(result.update, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: replaceable, + }) + } else { + // alloc is terminal and job unchanged, leave it alone + result.ignore = append(result.ignore, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: alloc, + }) + } + continue + } + } + // Check for an existing allocation _, ok := existing[name] @@ -191,15 +235,13 @@ func diffSystemAllocsForNode(job *structs.Job, nodeID string, // diffSystemAllocs is like diffSystemAllocsForNode however, the allocations in the // diffResult contain the specific nodeID they should be allocated on. -// -// job is the job whose allocs is going to be diff-ed. -// nodes is a list of nodes in ready state. -// taintedNodes is an index of the nodes which are either down or in drain mode -// by name. -// allocs is a list of non terminal allocations. -// terminalAllocs is an index of the latest terminal allocations by name. -func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]*structs.Node, - allocs []*structs.Allocation, terminalAllocs map[string]*structs.Allocation) *diffResult { +func diffSystemAllocs( + job *structs.Job, // jobs whose allocations are going to be diff-ed + nodes []*structs.Node, // list of nodes in the ready state + taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name) + allocs []*structs.Allocation, // non-terminal allocations + terminalAllocs map[string]*structs.Allocation, // latest terminal allocations (by name) +) *diffResult { // Build a mapping of nodes to all their allocs. nodeAllocs := make(map[string][]*structs.Allocation, len(allocs)) @@ -219,7 +261,7 @@ func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[ // Create the required task groups. required := materializeTaskGroups(job) - result := &diffResult{} + result := new(diffResult) for nodeID, allocs := range nodeAllocs { diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminalAllocs) result.Append(diff) diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 5c783d7e59db..a2af7ff0b06a 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -27,6 +27,71 @@ func TestMaterializeTaskGroups(t *testing.T) { } } +func newNode(name string) *structs.Node { + n := mock.Node() + n.Name = name + return n +} + +func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { + // For a sysbatch job, the scheduler should not re-place an allocation + // that has become terminal, unless the job has been updated. + + job := mock.SystemBatchJob() + required := materializeTaskGroups(job) + + eligible := map[string]*structs.Node{ + "node1": newNode("node1"), + } + + var live []*structs.Allocation // empty + + tainted := map[string]*structs.Node(nil) + + t.Run("current job", func(t *testing.T) { + terminal := map[string]*structs.Allocation{ + "my-sysbatch.pings[0]": &structs.Allocation{ + ID: uuid.Generate(), + NodeID: "node1", + Name: "my-sysbatch.pings[0]", + Job: job, + }, + } + + diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + require.Empty(t, diff.place) + require.Empty(t, diff.update) + require.Empty(t, diff.stop) + require.Empty(t, diff.migrate) + require.Empty(t, diff.lost) + require.True(t, len(diff.ignore) == 1 && diff.ignore[0].Alloc == terminal["my-sysbatch.pings[0]"]) + }) + + t.Run("outdated job", func(t *testing.T) { + previousJob := job.Copy() + previousJob.JobModifyIndex -= 1 + terminal := map[string]*structs.Allocation{ + "my-sysbatch.pings[0]": &structs.Allocation{ + ID: uuid.Generate(), + NodeID: "node1", + Name: "my-sysbatch.pings[0]", + Job: previousJob, + }, + } + + expAlloc := terminal["my-sysbatch.pings[0]"] + expAlloc.NodeID = "node1" + + diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + require.Empty(t, diff.place) + require.Equal(t, 1, len(diff.update)) + require.Empty(t, diff.stop) + require.Empty(t, diff.migrate) + require.Empty(t, diff.lost) + require.Empty(t, diff.ignore) + }) +} + func TestDiffSystemAllocsForNode(t *testing.T) { job := mock.Job() required := materializeTaskGroups(job) diff --git a/website/pages/docs/job-specification/job.mdx b/website/pages/docs/job-specification/job.mdx index c12b83320dbc..b73a6f2f8dfe 100644 --- a/website/pages/docs/job-specification/job.mdx +++ b/website/pages/docs/job-specification/job.mdx @@ -114,7 +114,7 @@ job "docs" { node if any of its allocation statuses become "failed". - `type` `(string: "service")` - Specifies the [Nomad scheduler][scheduler] to - use. Nomad provides the `service`, `system` and `batch` schedulers. + use. Nomad provides the `service`, `system`, `batch`, and `sysbatch` schedulers. - `update` ([Update][update]: nil) - Specifies the task's update strategy. When omitted, rolling updates are disabled. diff --git a/website/pages/docs/job-specification/reschedule.mdx b/website/pages/docs/job-specification/reschedule.mdx index 9234ca725eb3..96d340f473ea 100644 --- a/website/pages/docs/job-specification/reschedule.mdx +++ b/website/pages/docs/job-specification/reschedule.mdx @@ -47,8 +47,8 @@ job "docs" { } ``` -~> The reschedule stanza does not apply to `system` jobs because they run on -every node. +~> The reschedule stanza does not apply to `system` or `sysbatch` jobs because +they run on every node. ## `reschedule` Parameters diff --git a/website/pages/docs/job-specification/restart.mdx b/website/pages/docs/job-specification/restart.mdx index 6e9e771db7e6..84b53ce9fa66 100644 --- a/website/pages/docs/job-specification/restart.mdx +++ b/website/pages/docs/job-specification/restart.mdx @@ -14,7 +14,7 @@ description: The "restart" stanza configures a group's behavior on task failure. ]} /> -The `restart` stanza configures a tasks's behavior on task failure. Restarts +The `restart` stanza configures a task's behavior on task failure. Restarts happen on the client that is running the task. ```hcl @@ -36,9 +36,9 @@ For example, assuming that the task group restart policy is: ```hcl restart { - interval = "30m" attempts = 2 delay = "15s" + interval = "30m" mode = "fail" } ``` @@ -55,9 +55,9 @@ then the effective restart policy for the task will be: ```hcl restart { - interval = "30m" attempts = 5 delay = "15s" + interval = "30m" mode = "fail" } ``` @@ -87,7 +87,7 @@ restart { The values for many of the `restart` parameters vary by job type. Here are the defaults by job type: -- The default batch restart policy is: +- The default restart policy for `batch` jobs is: ```hcl restart { @@ -98,13 +98,13 @@ defaults by job type: } ``` -- The default service and system job restart policy is: +- The default restart policy for `service`, `system`, and `sysbatch` jobs is: ```hcl restart { - interval = "30m" attempts = 2 delay = "15s" + interval = "30m" mode = "fail" } ``` diff --git a/website/pages/docs/schedulers.mdx b/website/pages/docs/schedulers.mdx index 304f6d60c241..120530e2f198 100644 --- a/website/pages/docs/schedulers.mdx +++ b/website/pages/docs/schedulers.mdx @@ -7,9 +7,9 @@ description: Learn about Nomad's various schedulers. # Schedulers -Nomad has three scheduler types that can be used when creating your job: -`service`, `batch` and `system`. Here we will describe the differences between -each of these schedulers. +Nomad has four scheduler types that can be used when creating your job: +`service`, `batch`, `system` and `sysbatch`. Here we will describe the differences +between each of these schedulers. ## Service @@ -61,8 +61,30 @@ Systems jobs are intended to run until explicitly stopped either by an operator or [preemption]. If a system task exits it is considered a failure and handled according to the job's [restart] stanza; system jobs do not have rescheduling. +## System Batch + +The `sysbatch` scheduler is used to register jobs that should be run to completion +on all clients that meet the job's constraints. The `sysbatch` scheduler will +schedule jobs similarly to the `system` scheduler, but like a `batch` job once a +task exists successfully it is not restarted on that client. + +This scheduler type is useful for issuing "one off" commands to be run on every +node in the cluster. Sysbatch jobs can also be created as [periodic] and [parameterized] +jobs. Since these tasks are managed by Nomad, they can take advantage of job +updating, service discovery, monitoring, and more. + +The `sysbatch` scheduler will preempt lower priority tasks running on a node if there +is not enough capacity to place the job. See preemption details on how tasks that +get preempted are chosen. + +Sysbatch jobs are intended to run until successful completion, explicitly stopped +by an operator, or evicted through [preemption]. Sysbatch tasks that exit with an +error are handled according to the job's [restart] stanza. + [borg]: https://research.google.com/pubs/pub43438.html -[sparrow]: https://cs.stanford.edu/~matei/papers/2013/sosp_sparrow.pdf +[parameterized]: /docs/job-specification/parameterized +[periodic]: /docs/job-specification/periodic [preemption]: /docs/internals/scheduling/preemption [restart]: /docs/job-specification/restart [reschedule]: /docs/job-specification/reschedule +[sparrow]: https://cs.stanford.edu/~matei/papers/2013/sosp_sparrow.pdf