Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple instances of a periodic job are run simultaneously, when prohibit_overlap is true #16583

Merged
merged 21 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
51249fc
Multiple instances of a periodic job are run simultaneously, when pro…
jrasell Mar 21, 2023
e9850f3
Multiple instances of a periodic job are run simultaneously, when pro…
Juanadelacuesta Mar 21, 2023
3c858a9
style: refactor force run function
Juanadelacuesta Mar 22, 2023
4c59344
fix: remove defer and inline unlock for speed optimization
Juanadelacuesta Mar 22, 2023
8ac3e0e
Update nomad/leader.go
Juanadelacuesta Mar 22, 2023
90db021
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
23807bd
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
eb6cd35
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
f4c24bc
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
62125b1
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
b3eacaa
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
c762dc8
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
d182bd5
style: refactor tests to use must
Juanadelacuesta Mar 22, 2023
dd05f71
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
f4352f0
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
2a9a785
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
f841f4f
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
186f982
Update nomad/leader_test.go
Juanadelacuesta Mar 22, 2023
816e27c
fix: move back from defer to calling unlock before returning.
Juanadelacuesta Mar 22, 2023
3cbb6f4
style: refactor test to use must
Juanadelacuesta Mar 22, 2023
75ccde6
added new entry to changelog and update comments
Juanadelacuesta Mar 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/16583.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
server: Added verification of cron jobs already running before forcing new evals right after leader change
```
31 changes: 30 additions & 1 deletion nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,16 +774,45 @@ func (s *Server) restorePeriodicDispatcher() error {
continue
}

// We skip if the job doesn't allow overlap and there are already
// instances running
allowed, err := s.cronJobOverlapAllowed(job)
if err != nil {
return fmt.Errorf("failed to get job status: %v", err)
}
if !allowed {
continue
}

if _, err := s.periodicDispatcher.ForceEval(job.Namespace, job.ID); err != nil {
logger.Error("force run of periodic job failed", "job", job.NamespacedID(), "error", err)
return fmt.Errorf("force run of periodic job %q failed: %v", job.NamespacedID(), err)
}
logger.Debug("periodic job force runned during leadership establishment", "job", job.NamespacedID())

logger.Debug("periodic job force run during leadership establishment", "job", job.NamespacedID())
}

return nil
}

// cronJobOverlapAllowed checks if the job allows for overlap and if there are already
// instances of the job running in order to determine if a new evaluation needs to
// be created upon periodic dispatcher restore
func (s *Server) cronJobOverlapAllowed(job *structs.Job) (bool, error) {
if job.Periodic.ProhibitOverlap {
running, err := s.periodicDispatcher.dispatcher.RunningChildren(job)
if err != nil {
return false, fmt.Errorf("failed to determine if periodic job has running children %q error %q", job.NamespacedID(), err)
}

if running {
return false, nil
}
}

return true, nil
}

// schedulePeriodic is used to do periodic job dispatch while we are leader
func (s *Server) schedulePeriodic(stopCh chan struct{}) {
evalGC := time.NewTicker(s.config.EvalGCInterval)
Expand Down
158 changes: 156 additions & 2 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
now := time.Now()

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)
time.Sleep(5 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
Expand All @@ -438,13 +438,35 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
}
}

type mockJobEvalDispatcher struct {
forceEvalCalled, children bool
evalToReturn *structs.Evaluation
JobEvalDispatcher
}

func (mjed *mockJobEvalDispatcher) DispatchJob(_ *structs.Job) (*structs.Evaluation, error) {
mjed.forceEvalCalled = true
return mjed.evalToReturn, nil
}

func (mjed *mockJobEvalDispatcher) RunningChildren(_ *structs.Job) (bool, error) {
return mjed.children, nil
}

func testPeriodicJob_OverlapEnabled(times ...time.Time) *structs.Job {
job := testPeriodicJob(times...)
job.Periodic.ProhibitOverlap = true
return job
}

func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()

testutil.WaitForLeader(t, s1.RPC)

// Inject a periodic job that triggered once in the past, should trigger now
Expand All @@ -465,7 +487,16 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
}

// Create an eval for the past launch.
s1.periodicDispatcher.createEval(job, past)
eval, err := s1.periodicDispatcher.createEval(job, past)
must.NoError(t, err)

md := &mockJobEvalDispatcher{
children: false,
evalToReturn: eval,
JobEvalDispatcher: s1,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)
Expand All @@ -475,6 +506,7 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)

s1.restorePeriodicDispatcher()

// Ensure the job is tracked.
Expand All @@ -495,6 +527,128 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
if last.Launch == past {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}

must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
}

func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
now := time.Now()
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)

job := testPeriodicJob_OverlapEnabled(past, now, future)
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
must.NoError(t, err)

// Create an eval for the past launch.
eval, err := s1.periodicDispatcher.createEval(job, past)
must.NoError(t, err)

md := &mockJobEvalDispatcher{
children: false,
evalToReturn: eval,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
must.NoError(t, s1.restorePeriodicDispatcher())

// Ensure the job is tracked.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))

// Check that an eval was made.
ws := memdb.NewWatchSet()
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
must.NoError(t, err)
must.NotNil(t, last)

must.NotEq(t, last.Launch, past, must.Sprint("restorePeriodicDispatcher did not force launch"))

must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
}

func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
now := time.Now()
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)

job := testPeriodicJob_OverlapEnabled(past, now, future)
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
must.NoError(t, err)

// Create an eval for the past launch.
eval, err := s1.periodicDispatcher.createEval(job, past)
must.NoError(t, err)

md := &mockJobEvalDispatcher{
children: true,
evalToReturn: eval,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
must.NoError(t, s1.restorePeriodicDispatcher())

// Ensure the job is tracked.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))

must.False(t, md.forceEvalCalled, must.Sprint("evaluation forced with job already running"))
}

func TestLeader_PeriodicDispatch(t *testing.T) {
Expand Down