From 4234546ca1d65b5ffd4338942633de24f025f1a6 Mon Sep 17 00:00:00 2001
From: Seth Hoenig
Date: Fri, 9 Oct 2020 16:31:38 -0500
Subject: [PATCH] core: implement system batch scheduler

This PR implements a new "System Batch" scheduler type. Jobs can make use
of this new scheduler by setting their type to 'sysbatch'.

As the name implies, sysbatch can be thought of as a hybrid between system
and batch jobs - it is intended for short-lived jobs that should run on
every compatible node in the cluster. As with batch jobs, sysbatch jobs can
also be periodic and/or parameterized dispatch jobs. A sysbatch job is
considered complete once it has reached a terminal state (success, or
failure after exhausting restarts) on every compatible node.

Feasibility, rolling updates, and preemption are governed the same way as
for system jobs.

Closes #2527
---
 .../taskrunner/restarts/restarts.go | 18 ++--
 nomad/core_sched.go                 |  5 +-
 nomad/state/schema.go               | 13 ++-
 nomad/state/state_store.go          |  2 +-
 nomad/structs/structs.go            | 27 +++---
 scheduler/generic_sched.go          |  2 +-
 scheduler/rank.go                   |  4 +-
 scheduler/scheduler.go              |  7 +-
 scheduler/stack.go                  | 13 ++-
 scheduler/system_sched.go           | 77 +++++++++++----
 scheduler/util.go                   | 96 ++++++++++++++-----
 11 files changed, 182 insertions(+), 82 deletions(-)
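Note (illustration only, not part of the patch): once 'sysbatch' is accepted end to end, submitting such a job from Go could look roughly like the sketch below. The job ID, driver, and task configuration are made-up example values, and the sketch assumes the public api package simply passes the new job type through unchanged.

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

// strPtr is a small local helper; the api.Job fields are pointers.
func strPtr(s string) *string { return &s }

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// A minimal sysbatch job: it runs to completion once on every eligible node.
	job := &api.Job{
		ID:          strPtr("node-cleanup"),
		Name:        strPtr("node-cleanup"),
		Type:        strPtr("sysbatch"),
		Datacenters: []string{"dc1"},
		TaskGroups: []*api.TaskGroup{{
			Name: strPtr("cleanup"),
			Tasks: []*api.Task{{
				Name:   "prune",
				Driver: "docker",
				Config: map[string]interface{}{
					"image":   "alpine:3.12",
					"command": "sh",
					"args":    []string{"-c", "rm -rf /alloc/tmp/*"},
				},
			}},
		}},
	}

	// Register the job; the server routes the resulting evaluation to the
	// sysbatch scheduler added by this patch.
	resp, _, err := client.Jobs().Register(job, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("evaluation:", resp.EvalID)
}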
diff --git a/client/allocrunner/taskrunner/restarts/restarts.go b/client/allocrunner/taskrunner/restarts/restarts.go
index 6ee0056ccd8b..429ee07a0384 100644
--- a/client/allocrunner/taskrunner/restarts/restarts.go
+++ b/client/allocrunner/taskrunner/restarts/restarts.go
@@ -14,15 +14,19 @@ const (
 	// jitter is the percent of jitter added to restart delays.
 	jitter = 0.25
 
-	ReasonNoRestartsAllowed   = "Policy allows no restarts"
-	ReasonUnrecoverableErrror = "Error was unrecoverable"
-	ReasonWithinPolicy        = "Restart within policy"
-	ReasonDelay               = "Exceeded allowed attempts, applying a delay"
+	ReasonNoRestartsAllowed  = "Policy allows no restarts"
+	ReasonUnrecoverableError = "Error was unrecoverable"
+	ReasonWithinPolicy       = "Restart within policy"
+	ReasonDelay              = "Exceeded allowed attempts, applying a delay"
 )
 
 func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
-	// Batch jobs should not restart if they exit successfully
-	onSuccess := jobType != structs.JobTypeBatch
+	onSuccess := true
+
+	// Batch & SysBatch jobs should not restart if they exit successfully
+	if jobType == structs.JobTypeBatch || jobType == structs.JobTypeSysBatch {
+		onSuccess = false
+	}
 
 	// Prestart sidecars should get restarted on success
 	if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
@@ -196,7 +200,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
 	if r.startErr != nil {
 		// If the error is not recoverable, do not restart.
 		if !structs.IsRecoverable(r.startErr) {
-			r.reason = ReasonUnrecoverableErrror
+			r.reason = ReasonUnrecoverableError
 			return structs.TaskNotRestarting, 0
 		}
 	} else if r.exitRes != nil {
diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index 1ac135d0aaea..eb796f66bcaa 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -136,9 +136,7 @@ OUTER:
 		gc, allocs, err := c.gcEval(eval, oldThreshold, true)
 		if err != nil {
 			continue OUTER
-		}
-
-		if gc {
+		} else if gc {
 			jobEval = append(jobEval, eval.ID)
 			jobAlloc = append(jobAlloc, allocs...)
 		} else {
@@ -160,6 +158,7 @@ OUTER:
 	if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
 		return nil
 	}
+
 	c.logger.Debug("job GC found eligible objects",
 		"jobs", len(gcJob), "evals", len(gcEval), "allocs", len(gcAlloc))
diff --git a/nomad/state/schema.go b/nomad/state/schema.go
index 923b44617139..8178ec515f34 100644
--- a/nomad/state/schema.go
+++ b/nomad/state/schema.go
@@ -271,13 +271,16 @@ func jobIsGCable(obj interface{}) (bool, error) {
 		return true, nil
 	}
 
-	// Otherwise, only batch jobs are eligible because they complete on their
-	// own without a user stopping them.
-	if j.Type != structs.JobTypeBatch {
+	switch j.Type {
+	// Otherwise, batch and sysbatch jobs are eligible because they complete on
+	// their own without a user stopping them.
+	case structs.JobTypeBatch, structs.JobTypeSysBatch:
+		return true, nil
+
+	default:
+		// Other job types are not garbage collected until the job is stopped.
 		return false, nil
 	}
-
-	return true, nil
 }
 
 // jobIsPeriodic satisfies the ConditionalIndexFunc interface and creates an index
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index be6516d56bb2..6a82b614f6c6 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1954,7 +1954,7 @@ func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (m
 	return iter, nil
 }
 
-// JobsByGC returns an iterator over all jobs eligible or uneligible for garbage
+// JobsByGC returns an iterator over all jobs eligible or ineligible for garbage
 // collection.
 func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) {
 	txn := s.db.ReadTxn()
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index a9a4af3c71f7..b835e8fc5be0 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -3758,10 +3758,11 @@ func (c *ComparableResources) NetIndex(n *NetworkResource) int {
 const (
 	// JobTypeNomad is reserved for internal system tasks and is
 	// always handled by the CoreScheduler.
-	JobTypeCore    = "_core"
-	JobTypeService = "service"
-	JobTypeBatch   = "batch"
-	JobTypeSystem  = "system"
+	JobTypeCore     = "_core"
+	JobTypeService  = "service"
+	JobTypeBatch    = "batch"
+	JobTypeSystem   = "system"
+	JobTypeSysBatch = "sysbatch"
 )
 
 const (
@@ -4024,7 +4025,7 @@ func (j *Job) Validate() error {
 		mErr.Errors = append(mErr.Errors, errors.New("Job must be in a namespace"))
 	}
 	switch j.Type {
-	case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem:
+	case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem, JobTypeSysBatch:
 	case "":
 		mErr.Errors = append(mErr.Errors, errors.New("Missing job type"))
 	default:
@@ -4116,11 +4117,12 @@ func (j *Job) Validate() error {
 		}
 	}
 
-	// Validate periodic is only used with batch jobs.
+	// Validate periodic is only used with batch or sysbatch jobs.
 	if j.IsPeriodic() && j.Periodic.Enabled {
-		if j.Type != JobTypeBatch {
-			mErr.Errors = append(mErr.Errors,
-				fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
+		if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch {
+			mErr.Errors = append(mErr.Errors, fmt.Errorf(
+				"Periodic can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch,
+			))
 		}
 
 		if err := j.Periodic.Validate(); err != nil {
@@ -4129,9 +4131,10 @@ func (j *Job) Validate() error {
 	}
 
 	if j.IsParameterized() {
-		if j.Type != JobTypeBatch {
-			mErr.Errors = append(mErr.Errors,
-				fmt.Errorf("Parameterized job can only be used with %q scheduler", JobTypeBatch))
+		if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch {
+			mErr.Errors = append(mErr.Errors, fmt.Errorf(
+				"Parameterized job can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch,
+			))
 		}
 
 		if err := j.ParameterizedJob.Validate(); err != nil {
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index c67eafad870a..b933deb1eb21 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -36,7 +36,7 @@ const (
 	// allocInPlace is the status used when speculating on an in-place update
 	allocInPlace = "alloc updating in-place"
 
-	// allocNodeTainted is the status used when stopping an alloc because it's
+	// allocNodeTainted is the status used when stopping an alloc because its
 	// node is tainted.
 	allocNodeTainted = "alloc not needed as node is tainted"
 
diff --git a/scheduler/rank.go b/scheduler/rank.go
index 1653d9cf9067..ec4b2635d423 100644
--- a/scheduler/rank.go
+++ b/scheduler/rank.go
@@ -24,7 +24,7 @@ type RankedNode struct {
 	TaskLifecycles map[string]*structs.TaskLifecycleConfig
 	AllocResources *structs.AllocatedSharedResources
 
-	// Allocs is used to cache the proposed allocations on the
+	// Proposed is used to cache the proposed allocations on the
 	// node. This can be shared between iterators that require it.
 	Proposed []*structs.Allocation
 
@@ -60,7 +60,7 @@ func (r *RankedNode) SetTaskResources(task *structs.Task,
 	r.TaskLifecycles[task.Name] = task.Lifecycle
 }
 
-// RankFeasibleIterator is used to iteratively yield nodes along
+// RankIterator is used to iteratively yield nodes along
 // with ranking metadata. The iterators may manage some state for
 // performance optimizations.
 type RankIterator interface {
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index a950690db44f..d1bbfa4c3e41 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -21,9 +21,10 @@ const (
 // BuiltinSchedulers contains the built in registered schedulers
 // which are available
 var BuiltinSchedulers = map[string]Factory{
-	"service": NewServiceScheduler,
-	"batch":   NewBatchScheduler,
-	"system":  NewSystemScheduler,
+	"service":  NewServiceScheduler,
+	"batch":    NewBatchScheduler,
+	"system":   NewSystemScheduler,
+	"sysbatch": NewSysBatchScheduler,
 }
 
 // NewScheduler is used to instantiate and return a new scheduler
diff --git a/scheduler/stack.go b/scheduler/stack.go
index bccabc7899ab..96ce92713bfd 100644
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -237,10 +237,13 @@ func NewSystemStack(ctx Context) *SystemStack {
 	// previously been marked as eligible or ineligible. Generally this will be
 	// checks that only needs to examine the single node to determine feasibility.
 	jobs := []FeasibilityChecker{s.jobConstraint}
-	tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint,
+	tgs := []FeasibilityChecker{
+		s.taskGroupDrivers,
+		s.taskGroupConstraint,
 		s.taskGroupHostVolumes,
 		s.taskGroupDevices,
-		s.taskGroupNetwork}
+		s.taskGroupNetwork,
+	}
 	avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
 	s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)
@@ -360,11 +363,13 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
 	// previously been marked as eligible or ineligible. Generally this will be
 	// checks that only needs to examine the single node to determine feasibility.
 	jobs := []FeasibilityChecker{s.jobConstraint}
-	tgs := []FeasibilityChecker{s.taskGroupDrivers,
+	tgs := []FeasibilityChecker{
+		s.taskGroupDrivers,
 		s.taskGroupConstraint,
 		s.taskGroupHostVolumes,
 		s.taskGroupDevices,
-		s.taskGroupNetwork}
+		s.taskGroupNetwork,
+	}
 	avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
 	s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)
diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go
index 4b1e5c8cbfaa..648192645920 100644
--- a/scheduler/system_sched.go
+++ b/scheduler/system_sched.go
@@ -14,15 +14,21 @@ const (
 	// we will attempt to schedule if we continue to hit conflicts for system
 	// jobs.
 	maxSystemScheduleAttempts = 5
+
+	// maxSysBatchScheduleAttempts is used to limit the number of times we will
+	// attempt to schedule if we continue to hit conflicts for sysbatch jobs.
+	maxSysBatchScheduleAttempts = 2
 )
 
-// SystemScheduler is used for 'system' jobs. This scheduler is
-// designed for services that should be run on every client.
-// One for each job, containing an allocation for each node
+// SystemScheduler is used for 'system' and 'sysbatch' jobs. This scheduler is
+// designed for jobs that should be run on every client. The 'system' mode
+// will ensure those jobs continuously run regardless of successful task exits,
+// whereas 'sysbatch' marks the task complete on success.
 type SystemScheduler struct {
-	logger  log.Logger
-	state   State
-	planner Planner
+	logger   log.Logger
+	state    State
+	planner  Planner
+	sysbatch bool
 
 	eval *structs.Evaluation
 	job  *structs.Job
@@ -30,8 +36,9 @@ type SystemScheduler struct {
 	planResult *structs.PlanResult
 	ctx        *EvalContext
 	stack      *SystemStack
-	nodes      []*structs.Node
-	nodesByDC  map[string]int
+
+	nodes     []*structs.Node
+	nodesByDC map[string]int
 
 	limitReached bool
 	nextEval     *structs.Evaluation
@@ -44,14 +51,26 @@ type SystemScheduler struct {
 // scheduler.
 func NewSystemScheduler(logger log.Logger, state State, planner Planner) Scheduler {
 	return &SystemScheduler{
-		logger:  logger.Named("system_sched"),
-		state:   state,
-		planner: planner,
+		logger:   logger.Named("system_sched"),
+		state:    state,
+		planner:  planner,
+		sysbatch: false,
+	}
+}
+
+func NewSysBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler {
+	return &SystemScheduler{
+		logger:   logger.Named("sysbatch_sched"),
+		state:    state,
+		planner:  planner,
+		sysbatch: true,
 	}
 }
 
 // Process is used to handle a single evaluation.
 func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
 	// Store the evaluation
 	s.eval = eval
@@ -71,9 +91,15 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
 			s.queuedAllocs, "")
 	}
 
+	limit := maxSystemScheduleAttempts
+	if s.sysbatch {
+		limit = maxSysBatchScheduleAttempts
+	}
+
 	// Retry up to the maxSystemScheduleAttempts and reset if progress is made.
 	progress := func() bool { return progressMade(s.planResult) }
-	if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil {
+	if err := retryMax(limit, s.process, progress); err != nil {
 		if statusErr, ok := err.(*SetStatusError); ok {
 			return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, statusErr.EvalStatus, err.Error(),
 				s.queuedAllocs, "")
@@ -94,9 +120,15 @@ func (s *SystemScheduler) process() (bool, error) {
 	ws := memdb.NewWatchSet()
 	s.job, err = s.state.JobByID(ws, s.eval.Namespace, s.eval.JobID)
 	if err != nil {
-		return false, fmt.Errorf("failed to get job '%s': %v",
-			s.eval.JobID, err)
+		return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err)
 	}
+
 	numTaskGroups := 0
 	if !s.job.Stopped() {
 		numTaskGroups = len(s.job.TaskGroups)
@@ -185,19 +221,17 @@ func (s *SystemScheduler) computeJobAllocs() error {
 	ws := memdb.NewWatchSet()
 	allocs, err := s.state.AllocsByJob(ws, s.eval.Namespace, s.eval.JobID, true)
 	if err != nil {
-		return fmt.Errorf("failed to get allocs for job '%s': %v",
-			s.eval.JobID, err)
+		return fmt.Errorf("failed to get allocs for job '%s': %v", s.eval.JobID, err)
 	}
 
 	// Determine the tainted nodes containing job allocs
 	tainted, err := taintedNodes(s.state, allocs)
 	if err != nil {
-		return fmt.Errorf("failed to get tainted nodes for job '%s': %v",
-			s.eval.JobID, err)
+		return fmt.Errorf("failed to get tainted nodes for job '%s': %v", s.eval.JobID, err)
 	}
 
 	// Update the allocations which are in pending/running state on tainted
-	// nodes to lost
+	// nodes to lost.
 	updateNonTerminalAllocsToLost(s.plan, tainted, allocs)
 
 	// Filter out the allocations in a terminal state
diff --git a/scheduler/util.go b/scheduler/util.go
index 7261f67deb8f..bf28dc13ca92 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -60,22 +69,29 @@ func (d *diffResult) Append(other *diffResult) {
 // need to be migrated (node is draining), the allocs that need to be evicted
 // (no longer required), those that should be ignored and those that are lost
 // that need to be replaced (running on a lost node).
-//
-// job is the job whose allocs is going to be diff-ed.
-// taintedNodes is an index of the nodes which are either down or in drain mode
-// by name.
-// required is a set of allocations that must exist.
-// allocs is a list of non terminal allocations.
-// terminalAllocs is an index of the latest terminal allocations by name.
-func diffSystemAllocsForNode(job *structs.Job, nodeID string,
-	eligibleNodes, taintedNodes map[string]*structs.Node,
-	required map[string]*structs.TaskGroup, allocs []*structs.Allocation,
-	terminalAllocs map[string]*structs.Allocation) *diffResult {
-	result := &diffResult{}
+func diffSystemAllocsForNode(
+	job *structs.Job, // job whose allocs are going to be diff-ed
+	nodeID string,
+	eligibleNodes map[string]*structs.Node,
+	taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name)
+	required map[string]*structs.TaskGroup, // set of allocations that must exist
+	allocs []*structs.Allocation, // non-terminal allocations that exist
+	terminalAllocs map[string]*structs.Allocation, // latest terminal allocations (by name)
+) *diffResult {
+	result := new(diffResult)
 
 	// Scan the existing updates
-	existing := make(map[string]struct{})
+	existing := make(map[string]struct{}) // set of alloc names
 	for _, exist := range allocs {
 		// Index the existing node
 		name := exist.Name
 		existing[name] = struct{}{}
@@ -100,8 +117,23 @@ func diffSystemAllocsForNode(job *structs.Job, nodeID string,
 				TaskGroup: tg,
 				Alloc:     exist,
 			})
 			continue
 		}
+
+		// A terminal sysbatch allocation has already run to completion on
+		// this node, so ignore it rather than replace it.
+		if job.Type == structs.JobTypeSysBatch && exist.TerminalStatus() {
+			result.ignore = append(result.ignore, allocTuple{
+				Name:      name,
+				TaskGroup: tg,
+				Alloc:     exist,
+			})
+			continue
+		}
+
 		// If we are on a tainted node, we must migrate if we are a service or
 		// if the batch allocation did not finish
 		if node, ok := taintedNodes[exist.NodeID]; ok {
@@ -152,10 +184,28 @@ func diffSystemAllocsForNode(job *structs.Job, nodeID string,
 		})
 	}
 
 	// Scan the required groups
 	for name, tg := range required {
+
+		// Check for a terminal sysbatch allocation, which should not be
+		// placed again.
+		if job.Type == structs.JobTypeSysBatch {
+			if _, ok := terminalAllocs[name]; ok {
+				result.ignore = append(result.ignore, allocTuple{
+					Name:      name,
+					TaskGroup: tg,
+					Alloc:     terminalAllocs[name],
+				})
+				continue
+			}
+		}
+
 		// Check for an existing allocation
 		_, ok := existing[name]
 
 		// Require a placement if no existing allocation. If there
 		// is an existing allocation, we would have checked for a potential
@@ -191,15 +241,13 @@ func diffSystemAllocsForNode(job *structs.Job, nodeID string,
 // diffSystemAllocs is like diffSystemAllocsForNode however, the allocations in the
 // diffResult contain the specific nodeID they should be allocated on.
-//
-// job is the job whose allocs is going to be diff-ed.
-// nodes is a list of nodes in ready state.
-// taintedNodes is an index of the nodes which are either down or in drain mode
-// by name.
-// allocs is a list of non terminal allocations.
-// terminalAllocs is an index of the latest terminal allocations by name.
-func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]*structs.Node,
-	allocs []*structs.Allocation, terminalAllocs map[string]*structs.Allocation) *diffResult {
+func diffSystemAllocs(
+	job *structs.Job, // job whose allocations are going to be diff-ed
+	nodes []*structs.Node, // list of nodes in the ready state
+	taintedNodes map[string]*structs.Node, // nodes which are down or in drain mode (by name)
+	allocs []*structs.Allocation, // non-terminal allocations
+	terminalAllocs map[string]*structs.Allocation, // latest terminal allocations (by name)
+) *diffResult {
 
 	// Build a mapping of nodes to all their allocs.
 	nodeAllocs := make(map[string][]*structs.Allocation, len(allocs))
@@ -219,9 +267,11 @@ func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[
 	// Create the required task groups.
 	required := materializeTaskGroups(job)
 
-	result := &diffResult{}
+	result := new(diffResult)
 	for nodeID, allocs := range nodeAllocs {
 		diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminalAllocs)
 		result.Append(diff)
 	}
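Note (illustration only, not part of the patch): the intended sysbatch behavior in diffSystemAllocsForNode - a terminal allocation is ignored rather than placed again - could be exercised by a test along the lines below, which would live in scheduler/util_test.go. This is a sketch: it assumes the repository's existing mock helpers (mock.Job, mock.Node, mock.Alloc) and testify's require package, and the concrete names are made up.

package scheduler

import (
	"fmt"
	"testing"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/require"
)

func TestDiffSystemAllocsForNode_SysBatch_IgnoresTerminal(t *testing.T) {
	// A sysbatch job with a single task group instance per node.
	job := mock.Job()
	job.Type = structs.JobTypeSysBatch
	job.TaskGroups[0].Count = 1

	node := mock.Node()
	eligible := map[string]*structs.Node{node.ID: node}
	tainted := map[string]*structs.Node{}

	// The set of allocations the job requires on this node.
	required := materializeTaskGroups(job)

	// A previously completed allocation for this job on this node.
	complete := mock.Alloc()
	complete.Job = job
	complete.JobID = job.ID
	complete.NodeID = node.ID
	complete.Name = fmt.Sprintf("%s.%s[0]", job.Name, job.TaskGroups[0].Name)
	complete.ClientStatus = structs.AllocClientStatusComplete
	terminal := map[string]*structs.Allocation{complete.Name: complete}

	diff := diffSystemAllocsForNode(job, node.ID, eligible, tainted, required, nil, terminal)

	// The completed sysbatch allocation is ignored, not placed again.
	require.Empty(t, diff.place)
	require.Len(t, diff.ignore, 1)
}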