
Commit

wip lksfdj
shoenig committed Oct 28, 2020
1 parent 868594c commit 06f9f2c
Showing 6 changed files with 53 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@ FEATURES:
* **Event Stream**: Subscribe to change events as they occur in real time. [[GH-9013](https://github.com/hashicorp/nomad/issues/9013)]
* **Namespaces OSS**: Namespaces are now available in open source Nomad. [[GH-9135](https://github.com/hashicorp/nomad/issues/9135)]
* **Topology Visualization**: See all of the clients and allocations in a cluster at once. [[GH-9077](https://github.com/hashicorp/nomad/issues/9077)]
* **System Batch Scheduling**: New `sysbatch` scheduler type for running short-lived jobs across all nodes. [[GH-9160](https://github.com/hashicorp/nomad/pull/9160)]

IMPROVEMENTS:
* core: Improved job deregistration error logging. [[GH-8745](https://github.com/hashicorp/nomad/issues/8745)]
19 changes: 2 additions & 17 deletions scheduler/system_sched.go
@@ -59,7 +59,6 @@ func NewSystemScheduler(logger log.Logger, state State, planner Planner) Schedul
}

func NewSysBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler {
fmt.Println("NewSysBatchScheduler")
return &SystemScheduler{
logger: logger.Named("sysbatch_sched"),
state: state,
@@ -70,7 +69,6 @@ func NewSysBatchScheduler(logger log.Logger, state State, planner Planner) Sched

// Process is used to handle a single evaluation.
func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
fmt.Println("SystemScheduler.Process, evalID:", eval.ID, "jobID:", eval.JobID)
// Store the evaluation
s.eval = eval

@@ -83,7 +81,6 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPreemption,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain, structs.EvalTriggerAllocStop,
structs.EvalTriggerQueuedAllocs, structs.EvalTriggerScaling:
fmt.Println("SystemScheduler.Process, triggerBy:", eval.TriggeredBy)
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
@@ -95,7 +92,6 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
if s.sysbatch {
limit = maxSysBatchScheduleAttempts
}
fmt.Println("SystemScheduler.Process, limit:", limit)

// Retry up to the maxSystemScheduleAttempts and reset if progress is made.
progress := func() bool { return progressMade(s.planResult) }
@@ -123,12 +119,6 @@ func (s *SystemScheduler) process() (bool, error) {
return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err)
}

if s.job == nil {
fmt.Println("SystemScheduler.process, job is nil")
} else {
fmt.Println("SystemScheduler.process, job:", s.job.Name)
}

numTaskGroups := 0
if !s.job.Stopped() {
numTaskGroups = len(s.job.TaskGroups)
@@ -138,13 +128,10 @@
// Get the ready nodes in the required datacenters
if !s.job.Stopped() {
s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
fmt.Println("SystemScheduler.process - not stopped, got", len(s.nodes), "nodes")
if err != nil {
return false, fmt.Errorf("failed to get ready nodes: %v", err)
}
} else {
fmt.Println("SystemScheduler.process job is stopped, prev nodes:", len(s.nodes))
} // for as long as the job is not stopped, continue on everything (?) todo not correct, right?
}

// Create a plan
s.plan = s.eval.MakePlan(s.job)
@@ -170,7 +157,6 @@
// If the plan is a no-op, we can bail. If AnnotatePlan is set submit the plan
// anyways to get the annotations.
if s.plan.IsNoOp() && !s.eval.AnnotatePlan {
fmt.Println("SystemScheduler.process, isNoOp, bail")
return true, nil
}

@@ -239,7 +225,7 @@ func (s *SystemScheduler) computeJobAllocs() error {

// Diff the required and existing allocations
diff := diffSystemAllocs(s.job, s.nodes, tainted, allocs, terminalAllocs)
/*s.logger.Debug*/ fmt.Println("SS.caj: reconciled current state with desired state",
s.logger.Debug("reconciled current state with desired state",
"place", len(diff.place), "update", len(diff.update),
"migrate", len(diff.migrate), "stop", len(diff.stop),
"ignore", len(diff.ignore), "lost", len(diff.lost))
@@ -412,7 +398,6 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
// If the new allocation is replacing an older allocation then we record the
// older allocation id so that they are chained
if missing.Alloc != nil {
fmt.Println("SS.computePlacement, replacement alloc, prev:", missing.Alloc.ID, "new:", alloc.ID)
alloc.PreviousAllocation = missing.Alloc.ID
}

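As context for the `sysbatch` flag used in the `Process` hunk above, here is a minimal, self-contained sketch of the retry-budget selection; the constant values below are assumptions chosen for illustration and are not taken from the scheduler package.

```go
package main

import "fmt"

// Assumed values for illustration only; the real constants are defined in the
// scheduler package and may differ.
const (
	maxSystemScheduleAttempts   = 5
	maxSysBatchScheduleAttempts = 2
)

// scheduleLimit mirrors the branch in Process: a sysbatch evaluation gets its
// own retry budget, while every other evaluation keeps the system default.
func scheduleLimit(sysbatch bool) int {
	limit := maxSystemScheduleAttempts
	if sysbatch {
		limit = maxSysBatchScheduleAttempts
	}
	return limit
}

func main() {
	fmt.Println("system limit:  ", scheduleLimit(false))
	fmt.Println("sysbatch limit:", scheduleLimit(true))
}
```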
23 changes: 0 additions & 23 deletions scheduler/util.go
@@ -29,7 +29,6 @@ func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
for i := 0; i < tg.Count; i++ {
name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i)
out[name] = tg
fmt.Println("materialize task group, name:", name)
}
}
return out
@@ -80,18 +79,9 @@ func diffSystemAllocsForNode(
) *diffResult {
result := new(diffResult)

// todo: just fix this

fmt.Println("SH.dsafn, node:", nodeID, "job:", job.Name, "type:", job.Type)

for tAllocName, tAlloc := range terminalAllocs {
fmt.Println(" terminal alloc:", tAllocName, "status:", tAlloc.ClientStatus, "terminal:", tAlloc.TerminalStatus())
}

// Scan the existing updates
existing := make(map[string]struct{}) // set of alloc names
for _, exist := range allocs {
fmt.Println("SH.dsafn existing alloc:", exist.Name)
// Index the existing node
name := exist.Name
existing[name] = struct{}{}
@@ -106,7 +96,6 @@ func diffSystemAllocsForNode(
TaskGroup: tg,
Alloc: exist,
})
fmt.Println("SH.dsafn, stop:", name, "alloc:", exist.Name)
continue
}

@@ -117,20 +106,16 @@ func diffSystemAllocsForNode(
TaskGroup: tg,
Alloc: exist,
})
fmt.Println("SH.dsafn, migrate:", name, "alloc:", exist.Name)
continue
}

fmt.Println("SH jobType:", job.Type, "client_status", exist.ClientStatus, "desired_status", exist.DesiredStatus)

// If we are a sysbatch job and terminal, ignore (or stop?) the alloc
if job.Type == structs.JobTypeSysBatch && exist.TerminalStatus() {
result.ignore = append(result.ignore, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
fmt.Println("SH.dsafn, ignore:", name, "alloc:", exist.Name)
continue
}

@@ -184,8 +169,6 @@ func diffSystemAllocsForNode(
})
}

fmt.Println("SH.dsafn before scan required groups, node:", nodeID, "result:", result)

// Scan the required groups
for name, tg := range required {

@@ -196,8 +179,6 @@ func diffSystemAllocsForNode(
if alloc, ok := terminalAllocs[name]; ok {
// the alloc is terminal, but now the job has been updated
if job.JobModifyIndex != alloc.Job.JobModifyIndex {
fmt.Println("SH.dsafn update because job, name:", name)
fmt.Println(" !! nodeID:", nodeID, "alloc.nodeID:", alloc.NodeID)
replaceable := alloc.Copy() // we do not have the original
replaceable.NodeID = nodeID
result.update = append(result.update, allocTuple{
Expand All @@ -206,7 +187,6 @@ func diffSystemAllocsForNode(
Alloc: replaceable,
})
} else {
fmt.Println("SH.dsafn ignore because terminal:", name)
// alloc is terminal and job unchanged, leave it alone
result.ignore = append(result.ignore, allocTuple{
Name: name,
Expand All @@ -220,7 +200,6 @@ func diffSystemAllocsForNode(

// Check for an existing allocation
_, ok := existing[name]
fmt.Println("SH.dsafn scan required, name:", name, "tg:", tg.Name, "exists:", ok)

// Require a placement if no existing allocation. If there
// is an existing allocation, we would have checked for a potential
@@ -285,8 +264,6 @@ func diffSystemAllocs(
result := new(diffResult)
for nodeID, allocs := range nodeAllocs {
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminalAllocs)
fmt.Println("diff for node:", nodeID)
fmt.Println(" ", diff)
result.Append(diff)
}

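The sysbatch handling of terminal allocations in `diffSystemAllocsForNode` boils down to: leave a terminal alloc alone if it already ran the current version of the job, and re-place it on the same node if the job has been modified since. Below is a minimal sketch of that decision using simplified stand-in types, not Nomad's real structs; the field names are assumptions chosen to mirror the diff above.

```go
package main

import "fmt"

// Simplified stand-ins for the scheduler's structs; illustrative only.
type Job struct {
	Type           string
	JobModifyIndex uint64
}

type Allocation struct {
	Name   string
	NodeID string
	Job    *Job
}

// decideTerminal sketches the sysbatch branch of diffSystemAllocsForNode: a
// terminal alloc is ignored if it ran the current version of the job, and
// updated (re-placed on its node) if the job has changed since.
func decideTerminal(job *Job, nodeID string, terminal *Allocation) (string, *Allocation) {
	if job.JobModifyIndex != terminal.Job.JobModifyIndex {
		replacement := *terminal // we no longer have the original alloc
		replacement.NodeID = nodeID
		return "update", &replacement
	}
	return "ignore", terminal
}

func main() {
	current := &Job{Type: "sysbatch", JobModifyIndex: 10}
	outdated := &Job{Type: "sysbatch", JobModifyIndex: 9}

	a1 := &Allocation{Name: "pings[0]", NodeID: "node1", Job: current}
	a2 := &Allocation{Name: "pings[0]", NodeID: "node1", Job: outdated}

	action, _ := decideTerminal(current, "node1", a1)
	fmt.Println(action) // ignore: alloc already ran the current job version
	action, alloc := decideTerminal(current, "node1", a2)
	fmt.Println(action, alloc.NodeID) // update node1: job changed since the alloc ran
}
```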
63 changes: 42 additions & 21 deletions scheduler/util_test.go
@@ -35,15 +35,10 @@ func newNode(name string) *structs.Node {

func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
// For a sysbatch job, the scheduler should not re-place an allocation
// that has become terminal.
// that has become terminal, unless the job has been updated.

job := mock.SystemBatchJob()
required := materializeTaskGroups(job)
fmt.Println("required:", required)

oldJob := new(structs.Job)
*oldJob = *job
oldJob.JobModifyIndex -= 1

eligible := map[string]*structs.Node{
"node1": newNode("node1"),
@@ -53,22 +48,48 @@

tainted := map[string]*structs.Node(nil)

terminal := map[string]*structs.Allocation{
"my-sysbatch.pings[0]": &structs.Allocation{
ID: uuid.Generate(),
NodeID: "node1",
Name: "my-sysbatch.pings[0]",
Job: job,
},
}
t.Run("current job", func(t *testing.T) {
terminal := map[string]*structs.Allocation{
"my-sysbatch.pings[0]": &structs.Allocation{
ID: uuid.Generate(),
NodeID: "node1",
Name: "my-sysbatch.pings[0]",
Job: job,
},
}

diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal)
require.Empty(t, diff.place)
require.Empty(t, diff.update)
require.Empty(t, diff.stop)
require.Empty(t, diff.migrate)
require.Empty(t, diff.lost)
require.True(t, len(diff.ignore) == 1 && diff.ignore[0].Alloc == terminal["my-sysbatch.pings[0]"])
})

t.Run("outdated job", func(t *testing.T) {
previousJob := job.Copy()
previousJob.JobModifyIndex -= 1
terminal := map[string]*structs.Allocation{
"my-sysbatch.pings[0]": &structs.Allocation{
ID: uuid.Generate(),
NodeID: "node1",
Name: "my-sysbatch.pings[0]",
Job: previousJob,
},
}

diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal)
require.Empty(t, diff.place)
require.Empty(t, diff.update)
require.Empty(t, diff.stop)
require.Empty(t, diff.migrate)
require.Empty(t, diff.lost)
require.True(t, len(diff.ignore) == 1 && diff.ignore[0].Alloc == terminal["my-sysbatch.pings[0]"])
expAlloc := terminal["my-sysbatch.pings[0]"]
expAlloc.NodeID = "node1"

diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal)
require.Empty(t, diff.place)
require.Equal(t, 1, len(diff.update))
require.Empty(t, diff.stop)
require.Empty(t, diff.migrate)
require.Empty(t, diff.lost)
require.Empty(t, diff.ignore)
})
}

func TestDiffSystemAllocsForNode(t *testing.T) {
4 changes: 2 additions & 2 deletions website/pages/docs/job-specification/reschedule.mdx
@@ -47,8 +47,8 @@ job "docs" {
}
```

~> The reschedule stanza does not apply to `system` jobs because they run on
every node.
~> The reschedule stanza does not apply to `system` or `sysbatch` jobs because
they run on every node.

## `reschedule` Parameters

12 changes: 6 additions & 6 deletions website/pages/docs/job-specification/restart.mdx
@@ -14,7 +14,7 @@ description: The "restart" stanza configures a group's behavior on task failure.
]}
/>

The `restart` stanza configures a tasks's behavior on task failure. Restarts
The `restart` stanza configures a task's behavior on task failure. Restarts
happen on the client that is running the task.

```hcl
@@ -36,9 +36,9 @@ For example, assuming that the task group restart policy is:

```hcl
restart {
interval = "30m"
attempts = 2
delay = "15s"
interval = "30m"
mode = "fail"
}
```
@@ -55,9 +55,9 @@ then the effective restart policy for the task will be:

```hcl
restart {
interval = "30m"
attempts = 5
delay = "15s"
interval = "30m"
mode = "fail"
}
```
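To make the override behavior concrete, here is an illustrative sketch of how a task-level `restart` block layers onto the group policy; this is a simplified model of the merge rule the two examples above demonstrate, not Nomad's implementation.

```go
package main

import "fmt"

// restartPolicy mirrors the fields shown in the restart stanza examples above.
type restartPolicy struct {
	Attempts int
	Delay    string
	Interval string
	Mode     string
}

// merge overlays the task-level settings onto the group policy: any field the
// task leaves unset keeps the group's value. Illustrative only.
func merge(group, task restartPolicy) restartPolicy {
	out := group
	if task.Attempts != 0 {
		out.Attempts = task.Attempts
	}
	if task.Delay != "" {
		out.Delay = task.Delay
	}
	if task.Interval != "" {
		out.Interval = task.Interval
	}
	if task.Mode != "" {
		out.Mode = task.Mode
	}
	return out
}

func main() {
	group := restartPolicy{Attempts: 2, Delay: "15s", Interval: "30m", Mode: "fail"}
	task := restartPolicy{Attempts: 5} // the task only overrides attempts

	fmt.Printf("%+v\n", merge(group, task))
	// {Attempts:5 Delay:15s Interval:30m Mode:fail}
}
```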
@@ -87,7 +87,7 @@
The values for many of the `restart` parameters vary by job type. Here are the
defaults by job type:

- The default batch restart policy is:
- The default restart policy for `batch` jobs is:

```hcl
restart {
@@ -98,13 +98,13 @@ defaults by job type:
}
```

- The default service and system job restart policy is:
- The default restart policy for `service`, `system`, and `sysbatch` jobs is:

```hcl
restart {
interval = "30m"
attempts = 2
delay = "15s"
interval = "30m"
mode = "fail"
}
```
