Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of Multiple instances of a periodic job are run simultaneously, when prohibit_overlap is true into release/1.4.x #16683

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/16583.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
server: Added verification of cron jobs already running before forcing new evals right after leader change
```
31 changes: 30 additions & 1 deletion nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,16 +764,45 @@ func (s *Server) restorePeriodicDispatcher() error {
continue
}

// We skip if the job doesn't allow overlap and there are already
// instances running
allowed, err := s.cronJobOverlapAllowed(job)
if err != nil {
return fmt.Errorf("failed to get job status: %v", err)
}
if !allowed {
continue
}

if _, err := s.periodicDispatcher.ForceEval(job.Namespace, job.ID); err != nil {
logger.Error("force run of periodic job failed", "job", job.NamespacedID(), "error", err)
return fmt.Errorf("force run of periodic job %q failed: %v", job.NamespacedID(), err)
}
logger.Debug("periodic job force runned during leadership establishment", "job", job.NamespacedID())

logger.Debug("periodic job force run during leadership establishment", "job", job.NamespacedID())
}

return nil
}

// cronJobOverlapAllowed checks if the job allows for overlap and if there are already
// instances of the job running in order to determine if a new evaluation needs to
// be created upon periodic dispatcher restore
func (s *Server) cronJobOverlapAllowed(job *structs.Job) (bool, error) {
if job.Periodic.ProhibitOverlap {
running, err := s.periodicDispatcher.dispatcher.RunningChildren(job)
if err != nil {
return false, fmt.Errorf("failed to determine if periodic job has running children %q error %q", job.NamespacedID(), err)
}

if running {
return false, nil
}
}

return true, nil
}

// schedulePeriodic is used to do periodic job dispatch while we are leader
func (s *Server) schedulePeriodic(stopCh chan struct{}) {
evalGC := time.NewTicker(s.config.EvalGCInterval)
Expand Down
158 changes: 156 additions & 2 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
now := time.Now()

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)
time.Sleep(5 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
Expand All @@ -438,13 +438,35 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
}
}

type mockJobEvalDispatcher struct {
forceEvalCalled, children bool
evalToReturn *structs.Evaluation
JobEvalDispatcher
}

func (mjed *mockJobEvalDispatcher) DispatchJob(_ *structs.Job) (*structs.Evaluation, error) {
mjed.forceEvalCalled = true
return mjed.evalToReturn, nil
}

func (mjed *mockJobEvalDispatcher) RunningChildren(_ *structs.Job) (bool, error) {
return mjed.children, nil
}

func testPeriodicJob_OverlapEnabled(times ...time.Time) *structs.Job {
job := testPeriodicJob(times...)
job.Periodic.ProhibitOverlap = true
return job
}

func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()

testutil.WaitForLeader(t, s1.RPC)

// Inject a periodic job that triggered once in the past, should trigger now
Expand All @@ -465,7 +487,16 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
}

// Create an eval for the past launch.
s1.periodicDispatcher.createEval(job, past)
eval, err := s1.periodicDispatcher.createEval(job, past)
must.NoError(t, err)

md := &mockJobEvalDispatcher{
children: false,
evalToReturn: eval,
JobEvalDispatcher: s1,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)
Expand All @@ -475,6 +506,7 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)

s1.restorePeriodicDispatcher()

// Ensure the job is tracked.
Expand All @@ -495,6 +527,128 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
if last.Launch == past {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}

must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
}

func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
now := time.Now()
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)

job := testPeriodicJob_OverlapEnabled(past, now, future)
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
must.NoError(t, err)

// Create an eval for the past launch.
eval, err := s1.periodicDispatcher.createEval(job, past)
must.NoError(t, err)

md := &mockJobEvalDispatcher{
children: false,
evalToReturn: eval,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
must.NoError(t, s1.restorePeriodicDispatcher())

// Ensure the job is tracked.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))

// Check that an eval was made.
ws := memdb.NewWatchSet()
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
must.NoError(t, err)
must.NotNil(t, last)

must.NotEq(t, last.Launch, past, must.Sprint("restorePeriodicDispatcher did not force launch"))

must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
}

func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
now := time.Now()
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)

job := testPeriodicJob_OverlapEnabled(past, now, future)
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
must.NoError(t, err)

// Create an eval for the past launch.
eval, err := s1.periodicDispatcher.createEval(job, past)
must.NoError(t, err)

md := &mockJobEvalDispatcher{
children: true,
evalToReturn: eval,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
must.NoError(t, s1.restorePeriodicDispatcher())

// Ensure the job is tracked.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))

must.False(t, md.forceEvalCalled, must.Sprint("evaluation forced with job already running"))
}

func TestLeader_PeriodicDispatch(t *testing.T) {
Expand Down