From 06f9f2c785c7913975ae2621a933f024b8024a88 Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Wed, 28 Oct 2020 10:25:03 -0500 Subject: [PATCH] wip lksfdj --- CHANGELOG.md | 1 + scheduler/system_sched.go | 19 +----- scheduler/util.go | 23 ------- scheduler/util_test.go | 63 ++++++++++++------- .../docs/job-specification/reschedule.mdx | 4 +- .../pages/docs/job-specification/restart.mdx | 12 ++-- 6 files changed, 53 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 318f2f9f3f66..283afda5e87b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ FEATURES: * **Event Stream**: Subscribe to change events as they occur in real time. [[GH-9013](https://github.com/hashicorp/nomad/issues/9013)] * **Namespaces OSS**: Namespaces are now available in open source Nomad. [[GH-9135](https://github.com/hashicorp/nomad/issues/9135)] * **Topology Visualization**: See all of the clients and allocations in a cluster at once. [[GH-9077](https://github.com/hashicorp/nomad/issues/9077)] +* **System Batch Scheduling**: New `sysbatch` scheduler type for running short lived jobs across all nodes. [[GH-9160](https://github.com/hashicorp/nomad/pull/9160)] IMPROVEMENTS: * core: Improved job deregistration error logging. [[GH-8745](https://github.com/hashicorp/nomad/issues/8745)] diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 648192645920..c4c3826481cf 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -59,7 +59,6 @@ func NewSystemScheduler(logger log.Logger, state State, planner Planner) Schedul } func NewSysBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler { - fmt.Println("NewSysBatchScheduler") return &SystemScheduler{ logger: logger.Named("sysbatch_sched"), state: state, @@ -70,7 +69,6 @@ func NewSysBatchScheduler(logger log.Logger, state State, planner Planner) Sched // Process is used to handle a single evaluation. func (s *SystemScheduler) Process(eval *structs.Evaluation) error { - fmt.Println("SystemScheduler.Process, evalID:", eval.ID, "jobID:", eval.JobID) // Store the evaluation s.eval = eval @@ -83,7 +81,6 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPreemption, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain, structs.EvalTriggerAllocStop, structs.EvalTriggerQueuedAllocs, structs.EvalTriggerScaling: - fmt.Println("SystemScheduler.Process, triggerBy:", eval.TriggeredBy) default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) @@ -95,7 +92,6 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { if s.sysbatch { limit = maxSysBatchScheduleAttempts } - fmt.Println("SystemScheduler.Process, limit:", limit) // Retry up to the maxSystemScheduleAttempts and reset if progress is made. 
progress := func() bool { return progressMade(s.planResult) } @@ -123,12 +119,6 @@ func (s *SystemScheduler) process() (bool, error) { return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err) } - if s.job == nil { - fmt.Println("SystemScheduler.process, job is nil") - } else { - fmt.Println("SystemScheduler.process, job:", s.job.Name) - } - numTaskGroups := 0 if !s.job.Stopped() { numTaskGroups = len(s.job.TaskGroups) @@ -138,13 +128,10 @@ func (s *SystemScheduler) process() (bool, error) { // Get the ready nodes in the required datacenters if !s.job.Stopped() { s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) - fmt.Println("SystemScheduler.process - not stopped, got", len(s.nodes), "nodes") if err != nil { return false, fmt.Errorf("failed to get ready nodes: %v", err) } - } else { - fmt.Println("SystemScheduler.process job is stopped, prev nodes:", len(s.nodes)) - } // for as long as the job is not stopped, continue on everything (?) todo not correct, right? + } // Create a plan s.plan = s.eval.MakePlan(s.job) @@ -170,7 +157,6 @@ func (s *SystemScheduler) process() (bool, error) { // If the plan is a no-op, we can bail. If AnnotatePlan is set submit the plan // anyways to get the annotations. if s.plan.IsNoOp() && !s.eval.AnnotatePlan { - fmt.Println("SystemScheduler.process, isNoOp, bail") return true, nil } @@ -239,7 +225,7 @@ func (s *SystemScheduler) computeJobAllocs() error { // Diff the required and existing allocations diff := diffSystemAllocs(s.job, s.nodes, tainted, allocs, terminalAllocs) - /*s.logger.Debug*/ fmt.Println("SS.caj: reconciled current state with desired state", + s.logger.Debug("reconciled current state with desired state", "place", len(diff.place), "update", len(diff.update), "migrate", len(diff.migrate), "stop", len(diff.stop), "ignore", len(diff.ignore), "lost", len(diff.lost)) @@ -412,7 +398,6 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { // If the new allocation is replacing an older allocation then we record the // older allocation id so that they are chained if missing.Alloc != nil { - fmt.Println("SS.computePlacement, replacement alloc, prev:", missing.Alloc.ID, "new:", alloc.ID) alloc.PreviousAllocation = missing.Alloc.ID } diff --git a/scheduler/util.go b/scheduler/util.go index d6ccf5b7a09d..016541bdafef 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -29,7 +29,6 @@ func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { for i := 0; i < tg.Count; i++ { name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i) out[name] = tg - fmt.Println("materialize task group, name:", name) } } return out @@ -80,18 +79,9 @@ func diffSystemAllocsForNode( ) *diffResult { result := new(diffResult) - // todo: just fix this - - fmt.Println("SH.dsafn, node:", nodeID, "job:", job.Name, "type:", job.Type) - - for tAllocName, tAlloc := range terminalAllocs { - fmt.Println(" terminal alloc:", tAllocName, "status:", tAlloc.ClientStatus, "terminal:", tAlloc.TerminalStatus()) - } - // Scan the existing updates existing := make(map[string]struct{}) // set of alloc names for _, exist := range allocs { - fmt.Println("SH.dsafn existing alloc:", exist.Name) // Index the existing node name := exist.Name existing[name] = struct{}{} @@ -106,7 +96,6 @@ func diffSystemAllocsForNode( TaskGroup: tg, Alloc: exist, }) - fmt.Println("SH.dsafn, stop:", name, "alloc:", exist.Name) continue } @@ -117,12 +106,9 @@ func diffSystemAllocsForNode( TaskGroup: tg, Alloc: exist, }) - fmt.Println("SH.dsafn, 
migrate:", name, "alloc:", exist.Name) continue } - fmt.Println("SH jobType:", job.Type, "client_status", exist.ClientStatus, "desired_status", exist.DesiredStatus) - // If we are a sysbatch job and terminal, ignore (or stop?) the alloc if job.Type == structs.JobTypeSysBatch && exist.TerminalStatus() { result.ignore = append(result.ignore, allocTuple{ @@ -130,7 +116,6 @@ func diffSystemAllocsForNode( TaskGroup: tg, Alloc: exist, }) - fmt.Println("SH.dsafn, ignore:", name, "alloc:", exist.Name) continue } @@ -184,8 +169,6 @@ func diffSystemAllocsForNode( }) } - fmt.Println("SH.dsafn before scan required groups, node:", nodeID, "result:", result) - // Scan the required groups for name, tg := range required { @@ -196,8 +179,6 @@ func diffSystemAllocsForNode( if alloc, ok := terminalAllocs[name]; ok { // the alloc is terminal, but now the job has been updated if job.JobModifyIndex != alloc.Job.JobModifyIndex { - fmt.Println("SH.dsafn update because job, name:", name) - fmt.Println(" !! nodeID:", nodeID, "alloc.nodeID:", alloc.NodeID) replaceable := alloc.Copy() // we do not have the original replaceable.NodeID = nodeID result.update = append(result.update, allocTuple{ @@ -206,7 +187,6 @@ func diffSystemAllocsForNode( Alloc: replaceable, }) } else { - fmt.Println("SH.dsafn ignore because terminal:", name) // alloc is terminal and job unchanged, leave it alone result.ignore = append(result.ignore, allocTuple{ Name: name, @@ -220,7 +200,6 @@ func diffSystemAllocsForNode( // Check for an existing allocation _, ok := existing[name] - fmt.Println("SH.dsafn scan required, name:", name, "tg:", tg.Name, "exists:", ok) // Require a placement if no existing allocation. If there // is an existing allocation, we would have checked for a potential @@ -285,8 +264,6 @@ func diffSystemAllocs( result := new(diffResult) for nodeID, allocs := range nodeAllocs { diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminalAllocs) - fmt.Println("diff for node:", nodeID) - fmt.Println(" ", diff) result.Append(diff) } diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 624766278701..a2af7ff0b06a 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -35,15 +35,10 @@ func newNode(name string) *structs.Node { func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { // For a sysbatch job, the scheduler should not re-place an allocation - // that has become terminal. + // that has become terminal, unless the job has been updated. 
job := mock.SystemBatchJob() required := materializeTaskGroups(job) - fmt.Println("required:", required) - - oldJob := new(structs.Job) - *oldJob = *job - oldJob.JobModifyIndex -= 1 eligible := map[string]*structs.Node{ "node1": newNode("node1"), @@ -53,22 +48,48 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { tainted := map[string]*structs.Node(nil) - terminal := map[string]*structs.Allocation{ - "my-sysbatch.pings[0]": &structs.Allocation{ - ID: uuid.Generate(), - NodeID: "node1", - Name: "my-sysbatch.pings[0]", - Job: job, - }, - } + t.Run("current job", func(t *testing.T) { + terminal := map[string]*structs.Allocation{ + "my-sysbatch.pings[0]": &structs.Allocation{ + ID: uuid.Generate(), + NodeID: "node1", + Name: "my-sysbatch.pings[0]", + Job: job, + }, + } + + diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + require.Empty(t, diff.place) + require.Empty(t, diff.update) + require.Empty(t, diff.stop) + require.Empty(t, diff.migrate) + require.Empty(t, diff.lost) + require.True(t, len(diff.ignore) == 1 && diff.ignore[0].Alloc == terminal["my-sysbatch.pings[0]"]) + }) + + t.Run("outdated job", func(t *testing.T) { + previousJob := job.Copy() + previousJob.JobModifyIndex -= 1 + terminal := map[string]*structs.Allocation{ + "my-sysbatch.pings[0]": &structs.Allocation{ + ID: uuid.Generate(), + NodeID: "node1", + Name: "my-sysbatch.pings[0]", + Job: previousJob, + }, + } - diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) - require.Empty(t, diff.place) - require.Empty(t, diff.update) - require.Empty(t, diff.stop) - require.Empty(t, diff.migrate) - require.Empty(t, diff.lost) - require.True(t, len(diff.ignore) == 1 && diff.ignore[0].Alloc == terminal["my-sysbatch.pings[0]"]) + expAlloc := terminal["my-sysbatch.pings[0]"] + expAlloc.NodeID = "node1" + + diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + require.Empty(t, diff.place) + require.Equal(t, 1, len(diff.update)) + require.Empty(t, diff.stop) + require.Empty(t, diff.migrate) + require.Empty(t, diff.lost) + require.Empty(t, diff.ignore) + }) } func TestDiffSystemAllocsForNode(t *testing.T) { diff --git a/website/pages/docs/job-specification/reschedule.mdx b/website/pages/docs/job-specification/reschedule.mdx index 9234ca725eb3..96d340f473ea 100644 --- a/website/pages/docs/job-specification/reschedule.mdx +++ b/website/pages/docs/job-specification/reschedule.mdx @@ -47,8 +47,8 @@ job "docs" { } ``` -~> The reschedule stanza does not apply to `system` jobs because they run on -every node. +~> The reschedule stanza does not apply to `system` or `sysbatch` jobs because +they run on every node. ## `reschedule` Parameters diff --git a/website/pages/docs/job-specification/restart.mdx b/website/pages/docs/job-specification/restart.mdx index 6e9e771db7e6..84b53ce9fa66 100644 --- a/website/pages/docs/job-specification/restart.mdx +++ b/website/pages/docs/job-specification/restart.mdx @@ -14,7 +14,7 @@ description: The "restart" stanza configures a group's behavior on task failure. ]} /> -The `restart` stanza configures a tasks's behavior on task failure. Restarts +The `restart` stanza configures a task's behavior on task failure. Restarts happen on the client that is running the task. 
```hcl @@ -36,9 +36,9 @@ For example, assuming that the task group restart policy is: ```hcl restart { - interval = "30m" attempts = 2 delay = "15s" + interval = "30m" mode = "fail" } ``` @@ -55,9 +55,9 @@ then the effective restart policy for the task will be: ```hcl restart { - interval = "30m" attempts = 5 delay = "15s" + interval = "30m" mode = "fail" } ``` @@ -87,7 +87,7 @@ restart { The values for many of the `restart` parameters vary by job type. Here are the defaults by job type: -- The default batch restart policy is: +- The default restart policy for `batch` jobs is: ```hcl restart { @@ -98,13 +98,13 @@ defaults by job type: } ``` -- The default service and system job restart policy is: +- The default restart policy for `service`, `system`, and `sysbatch` jobs is: ```hcl restart { - interval = "30m" attempts = 2 delay = "15s" + interval = "30m" mode = "fail" } ```
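
A note for reviewers on the behavioral core of this change: for `sysbatch` jobs, `diffSystemAllocsForNode` leaves a terminal allocation alone unless the job has been modified since the allocation ran, in which case the allocation is queued as an update pinned to the same node. The sketch below restates that decision with simplified, hypothetical types (`sysbatchJob`, `terminalAlloc`, and `terminalDecision` are illustrative stand-ins, not Nomad's actual structs or functions); it is a reading aid under those assumptions, not the scheduler implementation.

```go
package main

import "fmt"

// sysbatchJob and terminalAlloc are simplified, illustrative stand-ins for
// Nomad's *structs.Job and *structs.Allocation; only the fields this decision
// needs are modeled here.
type sysbatchJob struct {
	JobModifyIndex uint64
}

type terminalAlloc struct {
	Name           string
	NodeID         string
	JobModifyIndex uint64 // modify index of the job version the alloc ran under
}

type decision string

const (
	place  decision = "place"  // nothing has run on this node yet
	update decision = "update" // the job changed after the alloc finished: re-run on the same node
	ignore decision = "ignore" // the alloc finished under the current job: leave it alone
)

// terminalDecision mirrors the branch in diffSystemAllocsForNode that this
// patch exercises for sysbatch jobs: a terminal allocation is ignored unless
// the job has been modified since it ran, in which case it becomes an update
// on the same node.
func terminalDecision(job sysbatchJob, terminal *terminalAlloc) decision {
	if terminal == nil {
		return place
	}
	if job.JobModifyIndex != terminal.JobModifyIndex {
		// In the real code the allocation is copied and its NodeID is pointed
		// at the node being evaluated before it is appended to diff.update.
		return update
	}
	return ignore
}

func main() {
	job := sysbatchJob{JobModifyIndex: 10}

	current := &terminalAlloc{Name: "my-sysbatch.pings[0]", NodeID: "node1", JobModifyIndex: 10}
	outdated := &terminalAlloc{Name: "my-sysbatch.pings[0]", NodeID: "node1", JobModifyIndex: 9}

	fmt.Println(terminalDecision(job, nil))      // place
	fmt.Println(terminalDecision(job, current))  // ignore
	fmt.Println(terminalDecision(job, outdated)) // update
}
```

This is the behavior the two sub-tests in `TestDiffSystemAllocsForNode_Sysbatch_terminal` pin down: the "current job" case expects the terminal allocation to land in `diff.ignore`, and the "outdated job" case expects exactly one entry in `diff.update`.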