From da9e87175050a64dcf4750fb80a35e773b16c1e5 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Tue, 28 Mar 2023 14:16:11 +0100 Subject: [PATCH] Multiple instances of a periodic job are run simultaneously, when prohibit_overlap is true (#16583) (#16682) Co-authored-by: Juana De La Cuesta --- .changelog/16583.txt | 3 + nomad/leader.go | 31 ++++++++- nomad/leader_test.go | 158 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 189 insertions(+), 3 deletions(-) create mode 100644 .changelog/16583.txt diff --git a/.changelog/16583.txt b/.changelog/16583.txt new file mode 100644 index 000000000000..3ca0bd086df3 --- /dev/null +++ b/.changelog/16583.txt @@ -0,0 +1,3 @@ +```release-note:bug +server: Added verification of cron jobs already running before forcing new evals right after leader change +``` diff --git a/nomad/leader.go b/nomad/leader.go index 6ef97b378d7f..399a2a7d761d 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -740,16 +740,45 @@ func (s *Server) restorePeriodicDispatcher() error { continue } + // We skip if the job doesn't allow overlap and there are already + // instances running + allowed, err := s.cronJobOverlapAllowed(job) + if err != nil { + return fmt.Errorf("failed to get job status: %v", err) + } + if !allowed { + continue + } + if _, err := s.periodicDispatcher.ForceEval(job.Namespace, job.ID); err != nil { logger.Error("force run of periodic job failed", "job", job.NamespacedID(), "error", err) return fmt.Errorf("force run of periodic job %q failed: %v", job.NamespacedID(), err) } - logger.Debug("periodic job force runned during leadership establishment", "job", job.NamespacedID()) + + logger.Debug("periodic job force run during leadership establishment", "job", job.NamespacedID()) } return nil } +// cronJobOverlapAllowed checks if the job allows for overlap and if there are already +// instances of the job running in order to determine if a new evaluation needs to +// be created upon periodic dispatcher restore +func (s *Server) cronJobOverlapAllowed(job *structs.Job) (bool, error) { + if job.Periodic.ProhibitOverlap { + running, err := s.periodicDispatcher.dispatcher.RunningChildren(job) + if err != nil { + return false, fmt.Errorf("failed to determine if periodic job has running children %q error %q", job.NamespacedID(), err) + } + + if running { + return false, nil + } + } + + return true, nil +} + // schedulePeriodic is used to do periodic job dispatch while we are leader func (s *Server) schedulePeriodic(stopCh chan struct{}) { evalGC := time.NewTicker(s.config.EvalGCInterval) diff --git a/nomad/leader_test.go b/nomad/leader_test.go index b3e9c8a66c3b..3d4c821b950b 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -410,7 +410,7 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) { now := time.Now() // Sleep till after the job should have been launched. - time.Sleep(3 * time.Second) + time.Sleep(5 * time.Second) // Restore the periodic dispatcher. s1.periodicDispatcher.SetEnabled(true) @@ -437,6 +437,27 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) { } } +type mockJobEvalDispatcher struct { + forceEvalCalled, children bool + evalToReturn *structs.Evaluation + JobEvalDispatcher +} + +func (mjed *mockJobEvalDispatcher) DispatchJob(_ *structs.Job) (*structs.Evaluation, error) { + mjed.forceEvalCalled = true + return mjed.evalToReturn, nil +} + +func (mjed *mockJobEvalDispatcher) RunningChildren(_ *structs.Job) (bool, error) { + return mjed.children, nil +} + +func testPeriodicJob_OverlapEnabled(times ...time.Time) *structs.Job { + job := testPeriodicJob(times...) + job.Periodic.ProhibitOverlap = true + return job +} + func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) { ci.Parallel(t) @@ -444,6 +465,7 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) { c.NumSchedulers = 0 }) defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) // Inject a periodic job that triggered once in the past, should trigger now @@ -464,7 +486,16 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) { } // Create an eval for the past launch. - s1.periodicDispatcher.createEval(job, past) + eval, err := s1.periodicDispatcher.createEval(job, past) + must.NoError(t, err) + + md := &mockJobEvalDispatcher{ + children: false, + evalToReturn: eval, + JobEvalDispatcher: s1, + } + + s1.periodicDispatcher.dispatcher = md // Flush the periodic dispatcher, ensuring that no evals will be created. s1.periodicDispatcher.SetEnabled(false) @@ -474,6 +505,7 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) { // Restore the periodic dispatcher. s1.periodicDispatcher.SetEnabled(true) + s1.restorePeriodicDispatcher() // Ensure the job is tracked. @@ -494,6 +526,128 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) { if last.Launch == past { t.Fatalf("restorePeriodicDispatcher did not force launch") } + + must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation")) +} + +func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + // Inject a periodic job that triggered once in the past, should trigger now + // and once in the future. + now := time.Now() + past := now.Add(-1 * time.Second) + future := now.Add(10 * time.Second) + + job := testPeriodicJob_OverlapEnabled(past, now, future) + req := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Namespace: job.Namespace, + }, + } + _, _, err := s1.raftApply(structs.JobRegisterRequestType, req) + must.NoError(t, err) + + // Create an eval for the past launch. + eval, err := s1.periodicDispatcher.createEval(job, past) + must.NoError(t, err) + + md := &mockJobEvalDispatcher{ + children: false, + evalToReturn: eval, + } + + s1.periodicDispatcher.dispatcher = md + + // Flush the periodic dispatcher, ensuring that no evals will be created. + s1.periodicDispatcher.SetEnabled(false) + + // Sleep till after the job should have been launched. + time.Sleep(3 * time.Second) + + // Restore the periodic dispatcher. + s1.periodicDispatcher.SetEnabled(true) + must.NoError(t, s1.restorePeriodicDispatcher()) + + // Ensure the job is tracked. + tuple := structs.NamespacedID{ + ID: job.ID, + Namespace: job.Namespace, + } + must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored")) + + // Check that an eval was made. + ws := memdb.NewWatchSet() + last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID) + must.NoError(t, err) + must.NotNil(t, last) + + must.NotEq(t, last.Launch, past, must.Sprint("restorePeriodicDispatcher did not force launch")) + + must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation")) +} + +func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + // Inject a periodic job that triggered once in the past, should trigger now + // and once in the future. + now := time.Now() + past := now.Add(-1 * time.Second) + future := now.Add(10 * time.Second) + + job := testPeriodicJob_OverlapEnabled(past, now, future) + req := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Namespace: job.Namespace, + }, + } + _, _, err := s1.raftApply(structs.JobRegisterRequestType, req) + must.NoError(t, err) + + // Create an eval for the past launch. + eval, err := s1.periodicDispatcher.createEval(job, past) + must.NoError(t, err) + + md := &mockJobEvalDispatcher{ + children: true, + evalToReturn: eval, + } + + s1.periodicDispatcher.dispatcher = md + + // Flush the periodic dispatcher, ensuring that no evals will be created. + s1.periodicDispatcher.SetEnabled(false) + + // Sleep till after the job should have been launched. + time.Sleep(3 * time.Second) + + // Restore the periodic dispatcher. + s1.periodicDispatcher.SetEnabled(true) + must.NoError(t, s1.restorePeriodicDispatcher()) + + // Ensure the job is tracked. + tuple := structs.NamespacedID{ + ID: job.ID, + Namespace: job.Namespace, + } + must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored")) + + must.False(t, md.forceEvalCalled, must.Sprint("evaluation forced with job already running")) } func TestLeader_PeriodicDispatch(t *testing.T) {