Skip to content

Commit

Permalink
Multiple instances of a periodic job are run simultaneously, when pro…
Browse files Browse the repository at this point in the history
…hibit_overlap is true (#16583) (#16681)

Co-authored-by: James Rasell <jrasell@hashicorp.com>
Co-authored-by: James Rasell <jrasell@users.noreply.github.com>
  • Loading branch information
3 people committed Mar 28, 2023
1 parent df1109d commit 90ba690
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .changelog/16583.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
server: Added verification of cron jobs already running before forcing new evals right after leader change
```
31 changes: 30 additions & 1 deletion nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,16 +774,45 @@ func (s *Server) restorePeriodicDispatcher() error {
continue
}

// We skip if the job doesn't allow overlap and there are already
// instances running
allowed, err := s.cronJobOverlapAllowed(job)
if err != nil {
return fmt.Errorf("failed to get job status: %v", err)
}
if !allowed {
continue
}

if _, err := s.periodicDispatcher.ForceEval(job.Namespace, job.ID); err != nil {
logger.Error("force run of periodic job failed", "job", job.NamespacedID(), "error", err)
return fmt.Errorf("force run of periodic job %q failed: %v", job.NamespacedID(), err)
}
logger.Debug("periodic job force runned during leadership establishment", "job", job.NamespacedID())

logger.Debug("periodic job force run during leadership establishment", "job", job.NamespacedID())
}

return nil
}

// cronJobOverlapAllowed checks if the job allows for overlap and if there are already
// instances of the job running in order to determine if a new evaluation needs to
// be created upon periodic dispatcher restore
func (s *Server) cronJobOverlapAllowed(job *structs.Job) (bool, error) {
if job.Periodic.ProhibitOverlap {
running, err := s.periodicDispatcher.dispatcher.RunningChildren(job)
if err != nil {
return false, fmt.Errorf("failed to determine if periodic job has running children %q error %q", job.NamespacedID(), err)
}

if running {
return false, nil
}
}

return true, nil
}

// schedulePeriodic is used to do periodic job dispatch while we are leader
func (s *Server) schedulePeriodic(stopCh chan struct{}) {
evalGC := time.NewTicker(s.config.EvalGCInterval)
Expand Down
158 changes: 156 additions & 2 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
now := time.Now()

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)
time.Sleep(5 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
Expand All @@ -438,13 +438,35 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
}
}

type mockJobEvalDispatcher struct {
forceEvalCalled, children bool
evalToReturn *structs.Evaluation
JobEvalDispatcher
}

func (mjed *mockJobEvalDispatcher) DispatchJob(_ *structs.Job) (*structs.Evaluation, error) {
mjed.forceEvalCalled = true
return mjed.evalToReturn, nil
}

func (mjed *mockJobEvalDispatcher) RunningChildren(_ *structs.Job) (bool, error) {
return mjed.children, nil
}

func testPeriodicJob_OverlapEnabled(times ...time.Time) *structs.Job {
job := testPeriodicJob(times...)
job.Periodic.ProhibitOverlap = true
return job
}

func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()

testutil.WaitForLeader(t, s1.RPC)

// Inject a periodic job that triggered once in the past, should trigger now
Expand All @@ -465,7 +487,16 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
}

// Create an eval for the past launch.
s1.periodicDispatcher.createEval(job, past)
eval, err := s1.periodicDispatcher.createEval(job, past)
must.NoError(t, err)

md := &mockJobEvalDispatcher{
children: false,
evalToReturn: eval,
JobEvalDispatcher: s1,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)
Expand All @@ -475,6 +506,7 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)

s1.restorePeriodicDispatcher()

// Ensure the job is tracked.
Expand All @@ -495,6 +527,128 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
if last.Launch == past {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}

must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
}

func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
now := time.Now()
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)

job := testPeriodicJob_OverlapEnabled(past, now, future)
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
must.NoError(t, err)

// Create an eval for the past launch.
eval, err := s1.periodicDispatcher.createEval(job, past)
must.NoError(t, err)

md := &mockJobEvalDispatcher{
children: false,
evalToReturn: eval,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
must.NoError(t, s1.restorePeriodicDispatcher())

// Ensure the job is tracked.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))

// Check that an eval was made.
ws := memdb.NewWatchSet()
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
must.NoError(t, err)
must.NotNil(t, last)

must.NotEq(t, last.Launch, past, must.Sprint("restorePeriodicDispatcher did not force launch"))

must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
}

func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
now := time.Now()
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)

job := testPeriodicJob_OverlapEnabled(past, now, future)
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
must.NoError(t, err)

// Create an eval for the past launch.
eval, err := s1.periodicDispatcher.createEval(job, past)
must.NoError(t, err)

md := &mockJobEvalDispatcher{
children: true,
evalToReturn: eval,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
must.NoError(t, s1.restorePeriodicDispatcher())

// Ensure the job is tracked.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))

must.False(t, md.forceEvalCalled, must.Sprint("evaluation forced with job already running"))
}

func TestLeader_PeriodicDispatch(t *testing.T) {
Expand Down

0 comments on commit 90ba690

Please sign in to comment.