Skip to content

Commit

Permalink
style: refactor force run function
Browse files Browse the repository at this point in the history
  • Loading branch information
Juanadelacuesta committed Mar 22, 2023
1 parent 9a9faf7 commit 5e20afb
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 26 deletions.
34 changes: 30 additions & 4 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,20 +774,46 @@ func (s *Server) restorePeriodicDispatcher() error {
continue
}

eval, err := s.periodicDispatcher.ForceRunIfNotRunning(job)
// We skip if the job doesn't allow overlap and there are already
// instances running
needed, err := s.isNewEvalNeeded(job)
if err != nil {
return fmt.Errorf("failed to get job status: %v", err)
}

if !needed {
continue
}

if _, err := s.periodicDispatcher.ForceRun(job.Namespace, job.ID); err != nil {
logger.Error("force run of periodic job failed", "job", job.NamespacedID(), "error", err)
return fmt.Errorf("force run of periodic job %q failed: %v", job.NamespacedID(), err)
}

if eval != nil {
logger.Debug("periodic job force ran during leadership establishment", "job", job.NamespacedID())
}
logger.Debug("periodic job force runned during leadership establishment", "job", job.NamespacedID())
}

return nil
}

// isNewEvalNeeded checks if the job allows for overlap and if there are already
// instances of the job running in order to determine if a new evaluation needs to
// be created upon periodic dispatcher restore
func (s *Server) isNewEvalNeeded(job *structs.Job) (bool, error) {
if job.Periodic.ProhibitOverlap {
running, err := s.RunningChildren(job)
if err != nil {
return false, fmt.Errorf("failed to determine if periodic job has running children %q error %q", job.NamespacedID(), err)
}

if running {
return false, nil
}
}

return true, nil
}

// schedulePeriodic is used to do periodic job dispatch while we are leader
func (s *Server) schedulePeriodic(stopCh chan struct{}) {
evalGC := time.NewTicker(s.config.EvalGCInterval)
Expand Down
6 changes: 3 additions & 3 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
now := time.Now()

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)
time.Sleep(5 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
Expand Down Expand Up @@ -571,7 +571,7 @@ func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) {
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)
time.Sleep(5 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
Expand Down Expand Up @@ -644,7 +644,7 @@ func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)
time.Sleep(5 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
Expand Down
19 changes: 0 additions & 19 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,25 +274,6 @@ func (p *PeriodicDispatch) removeLocked(jobID structs.NamespacedID) error {
return nil
}

// ForceRunIfNotRunning causes the periodic job to be evaluated only if there are
// no overlap constrains in the definition and there are no jobs running already
func (p *PeriodicDispatch) ForceRunIfNotRunning(job *structs.Job) (*structs.Evaluation, error) {
if job.Periodic.ProhibitOverlap {
running, err := p.dispatcher.RunningChildren(job)
if err != nil {
p.logger.Error("failed to determine if periodic job has running children", "job", job.NamespacedID(), "error", err)
return nil, nil
}

if running {
p.logger.Debug("skipping launch of periodic job because job prohibits overlap", "job", job.NamespacedID())
return nil, nil
}
}

return p.ForceRun(job.Namespace, job.ID)
}

// ForceRun causes the periodic job to be evaluated immediately and returns the
// subsequent eval.
func (p *PeriodicDispatch) ForceRun(namespace, jobID string) (*structs.Evaluation, error) {
Expand Down

0 comments on commit 5e20afb

Please sign in to comment.