Skip to content

Commit

Permalink
Multiple instances of a periodic job are run simultaneously, when pro…
Browse files Browse the repository at this point in the history
…hibit_overlap is true

Fixes #11052
When restoring the periodic dispatcher, all periodic jobs are force-run without checking for previous children.
  • Loading branch information
jrasell authored and Juanadelacuesta committed Mar 21, 2023
1 parent a633b79 commit e4cb7db
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
8 changes: 6 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,11 +774,15 @@ func (s *Server) restorePeriodicDispatcher() error {
continue
}

if _, err := s.periodicDispatcher.ForceRun(job.Namespace, job.ID); err != nil {
eval, err := s.periodicDispatcher.ForceRunIfNotRunning(job)
if err != nil {
logger.Error("force run of periodic job failed", "job", job.NamespacedID(), "error", err)
return fmt.Errorf("force run of periodic job %q failed: %v", job.NamespacedID(), err)
}
logger.Debug("periodic job force runned during leadership establishment", "job", job.NamespacedID())

if eval != nil {
logger.Debug("periodic job force ran during leadership establishment", "job", job.NamespacedID())
}
}

return nil
Expand Down
26 changes: 20 additions & 6 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,32 @@ func (p *PeriodicDispatch) removeLocked(jobID structs.NamespacedID) error {
return nil
}

// ForceRunIfNotRunning force-runs the given periodic job via ForceRun, except
// when the job sets Periodic.ProhibitOverlap and the dispatcher reports that it
// already has running children.
//
// It returns (nil, nil) when the launch is skipped. That covers two cases: the
// job prohibits overlap and has running children, or the running-children
// lookup itself failed. In the latter case the error is logged but not
// returned to the caller. Otherwise the result of ForceRun is returned as-is.
func (p *PeriodicDispatch) ForceRunIfNotRunning(job *structs.Job) (*structs.Evaluation, error) {
	// Jobs that allow overlap never need the running-children check.
	if !job.Periodic.ProhibitOverlap {
		return p.ForceRun(job.Namespace, job.ID)
	}

	hasRunning, lookupErr := p.dispatcher.RunningChildren(job)
	switch {
	case lookupErr != nil:
		// The lookup failure is only logged; no error is surfaced.
		p.logger.Error("failed to determine if periodic job has running children", "job", job.NamespacedID(), "error", lookupErr)
		return nil, nil
	case hasRunning:
		p.logger.Debug("skipping launch of periodic job because job prohibits overlap", "job", job.NamespacedID())
		return nil, nil
	}

	return p.ForceRun(job.Namespace, job.ID)
}

// ForceRun causes the periodic job to be evaluated immediately and returns the
// subsequent eval.
func (p *PeriodicDispatch) ForceRun(namespace, jobID string) (*structs.Evaluation, error) {
p.l.Lock()
defer p.l.Unlock()

// Do nothing if not enabled
if !p.enabled {
p.l.Unlock()
return nil, fmt.Errorf("periodic dispatch disabled")
}

Expand All @@ -291,11 +309,9 @@ func (p *PeriodicDispatch) ForceRun(namespace, jobID string) (*structs.Evaluatio
}
job, tracked := p.tracked[tuple]
if !tracked {
p.l.Unlock()
return nil, fmt.Errorf("can't force run non-tracked job %q (%s)", jobID, namespace)
}

p.l.Unlock()
return p.createEval(job, time.Now().In(job.Periodic.GetLocation()))
}

Expand Down Expand Up @@ -335,6 +351,7 @@ func (p *PeriodicDispatch) run(ctx context.Context, updateCh <-chan struct{}) {
// based on the passed launch time.
func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
p.l.Lock()
defer p.l.Unlock()

nextLaunch, err := job.Periodic.Next(launchTime)
if err != nil {
Expand All @@ -349,19 +366,16 @@ func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
running, err := p.dispatcher.RunningChildren(job)
if err != nil {
p.logger.Error("failed to determine if periodic job has running children", "job", job.NamespacedID(), "error", err)
p.l.Unlock()
return
}

if running {
p.logger.Debug("skipping launch of periodic job because job prohibits overlap", "job", job.NamespacedID())
p.l.Unlock()
return
}
}

p.logger.Debug(" launching job", "job", job.NamespacedID(), "launch_time", launchTime)
p.l.Unlock()
p.createEval(job, launchTime)
}

Expand Down

0 comments on commit e4cb7db

Please sign in to comment.