Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[15090] Ensure no leakage of evaluations for batch jobs. #15097

Merged
merged 12 commits into from
Jan 31, 2023
10 changes: 9 additions & 1 deletion command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,14 @@ type ServerConfig struct {

// EvalGCThreshold controls how "old" an eval must be to be collected by GC.
// Age is not the only requirement for an eval to be GCed but the threshold
// can be used to filter by age.
// can be used to filter by age. Please note that batch job evaluations are
// controlled by 'BatchEvalGCThreshold' instead.
EvalGCThreshold string `hcl:"eval_gc_threshold"`

// BatchEvalGCThreshold controls how "old" an evaluation must be to be eligible
// for GC if the eval belongs to a batch job.
BatchEvalGCThreshold string `hcl:"batch_eval_gc_threshold"`
stswidwinski marked this conversation as resolved.
Show resolved Hide resolved

// DeploymentGCThreshold controls how "old" a deployment must be to be
// collected by GC. Age is not the only requirement for a deployment to be
// GCed but the threshold can be used to filter by age.
Expand Down Expand Up @@ -1827,6 +1832,9 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.EvalGCThreshold != "" {
result.EvalGCThreshold = b.EvalGCThreshold
}
if b.BatchEvalGCThreshold != "" {
result.BatchEvalGCThreshold = b.BatchEvalGCThreshold
}
if b.DeploymentGCThreshold != "" {
result.DeploymentGCThreshold = b.DeploymentGCThreshold
}
Expand Down
2 changes: 2 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func TestConfig_Merge(t *testing.T) {
RaftMultiplier: pointer.Of(5),
NumSchedulers: pointer.Of(1),
NodeGCThreshold: "1h",
BatchEvalGCThreshold: "4h",
HeartbeatGrace: 30 * time.Second,
MinHeartbeatTTL: 30 * time.Second,
MaxHeartbeatsPerSecond: 30.0,
Expand Down Expand Up @@ -339,6 +340,7 @@ func TestConfig_Merge(t *testing.T) {
NumSchedulers: pointer.Of(2),
EnabledSchedulers: []string{structs.JobTypeBatch},
NodeGCThreshold: "12h",
BatchEvalGCThreshold: "4h",
HeartbeatGrace: 2 * time.Minute,
MinHeartbeatTTL: 2 * time.Minute,
MaxHeartbeatsPerSecond: 200.0,
Expand Down
8 changes: 8 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,15 @@ type Config struct {

// EvalGCThreshold is how "old" an evaluation must be to be eligible
// for GC. This gives users some time to debug a failed evaluation.
//
// Please note that the rules for GC of evaluations which belong to a batch
// job are separate and controlled by `BatchEvalGCThreshold`.
EvalGCThreshold time.Duration

// BatchEvalGCThreshold is how "old" an evaluation must be to be eligible
// for GC if the eval belongs to a batch job.
BatchEvalGCThreshold time.Duration

// JobGCInterval is how often we dispatch a job to GC jobs that are
// available for garbage collection.
JobGCInterval time.Duration
Expand Down Expand Up @@ -444,6 +451,7 @@ func DefaultConfig() *Config {
ReconcileInterval: 60 * time.Second,
EvalGCInterval: 5 * time.Minute,
EvalGCThreshold: 1 * time.Hour,
BatchEvalGCThreshold: 168 * time.Hour,
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
JobGCInterval: 5 * time.Minute,
JobGCThreshold: 4 * time.Hour,
NodeGCInterval: 5 * time.Minute,
Expand Down
47 changes: 27 additions & 20 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ OUTER:
allEvalsGC := true
var jobAlloc, jobEval []string
for _, eval := range evals {
gc, allocs, err := c.gcEval(eval, oldThreshold, true)
gc, allocs, err := c.gcEval(eval, oldThreshold, oldThreshold, true)
stswidwinski marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
continue OUTER
} else if gc {
Expand Down Expand Up @@ -238,6 +238,8 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {

oldThreshold := c.getThreshold(eval, "eval",
"eval_gc_threshold", c.srv.config.EvalGCThreshold)
batchOldThreshold := c.getThreshold(eval, "eval",
"batch_eval_gc_threshold", c.srv.config.BatchEvalGCThreshold)

// Collect the allocations and evaluations to GC
var gcAlloc, gcEval []string
Expand All @@ -246,7 +248,7 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {

// The Evaluation GC should not handle batch jobs since those need to be
// garbage collected in one shot
gc, allocs, err := c.gcEval(eval, oldThreshold, false)
gc, allocs, err := c.gcEval(eval, oldThreshold, batchOldThreshold, false)
stswidwinski marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
Expand All @@ -272,7 +274,7 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// allocs are not older than the threshold. If the eval should be garbage
// collected, the associated alloc ids that should also be removed are also
// returned
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, allowBatch bool) (
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, batchThresholdIndex uint64, allowBatch bool) (
bool, []string, error) {
// Ignore non-terminal and new evaluations
if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex {
Expand All @@ -297,16 +299,20 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
}

// If the eval is from a running "batch" job we don't want to garbage
// collect its allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the
// allocations.
// collect its most current allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the allocations. However,
// we do want to GC old Evals and Allocs if there are newer ones due to update. The age
// of the evaluation must also reach the threshold configured to be GCed so that one may
// debug old evaluations and referenced allocations.
if eval.Type == structs.JobTypeBatch {
// Check if the job is running

// Can collect if:
// Job doesn't exist
// Job is Stopped and dead
// allowBatch and the job is dead
// Can collect if any of the following holds:
// - Job doesn't exist
// - Job is Stopped and dead
// - allowBatch and the job is dead
//
// If we cannot collect outright, check if a partial GC may occur
collect := false
if job == nil {
collect = true
stswidwinski marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -318,12 +324,9 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
collect = true
}

// We don't want to gc anything related to a job which is not dead
// If the batch job doesn't exist we can GC it regardless of allowBatch
if !collect {
// Find allocs associated with older (based on createindex) and GC them if terminal
oldAllocs := olderVersionTerminalAllocs(allocs, job)
return false, oldAllocs, nil
oldAllocs, gcEval := olderVersionTerminalAllocs(allocs, job, batchThresholdIndex)
return gcEval, oldAllocs, nil
}
stswidwinski marked this conversation as resolved.
Show resolved Hide resolved
}

Expand All @@ -344,16 +347,20 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return gcEval, gcAllocIDs, nil
}

// olderVersionTerminalAllocs returns terminal allocations whose job create index
// is older than the job's create index
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job) []string {
// olderVersionTerminalAllocs returns a tuple ([]string, bool). The first element is the list of
// terminal allocations which may be garbage collected for batch jobs. The second element indicates
// whether or not the evaluation itself may be garbage collected; it is false as soon as any of the
// given allocations is not eligible for collection.
stswidwinski marked this conversation as resolved.
Show resolved Hide resolved
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job, thresholdIndex uint64) ([]string, bool) {
var ret []string
var mayGCEval = true
for _, alloc := range allocs {
if alloc.Job != nil && alloc.Job.CreateIndex < job.CreateIndex && alloc.TerminalStatus() {
if alloc.CreateIndex < job.JobModifyIndex && alloc.ModifyIndex < thresholdIndex && alloc.TerminalStatus() {
stswidwinski marked this conversation as resolved.
Show resolved Hide resolved
ret = append(ret, alloc.ID)
} else {
mayGCEval = false
}
}
return ret
return ret, mayGCEval
}

// evalReap contacts the leader and issues a reap on the passed evals and
Expand Down
Loading