Skip to content

Commit

Permalink
Multiple instances of a periodic job are run simultaneously, when pro…
Browse files Browse the repository at this point in the history
…hibit_overlap is true

Fixes #11052
When restoring periodic dispatcher, all periodic jobs are forced without checking for previous children.
  • Loading branch information
Juanadelacuesta committed Mar 21, 2023
1 parent e4cb7db commit 9a9faf7
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 1 deletion.
178 changes: 178 additions & 0 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,13 +438,34 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
}
}

type mockJobEvalDispatcher struct {
called, children bool
evalToReturn *structs.Evaluation
}

func (mjed *mockJobEvalDispatcher) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
mjed.called = true
return mjed.evalToReturn, nil
}

func (mjed *mockJobEvalDispatcher) RunningChildren(job *structs.Job) (bool, error) {
return mjed.children, nil
}

func testPeriodicJobWithOverlapEnabled(times ...time.Time) *structs.Job {
job := testPeriodicJob(times...)
job.Periodic.ProhibitOverlap = true
return job
}

func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()

testutil.WaitForLeader(t, s1.RPC)

// Inject a periodic job that triggered once in the past, should trigger now
Expand All @@ -464,6 +485,158 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
t.Fatalf("err: %v", err)
}

// Create an eval for the past launch.
eval, _ := s1.periodicDispatcher.createEval(job, past)

md := &mockJobEvalDispatcher{
children: false,
evalToReturn: eval,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)

s1.restorePeriodicDispatcher()

// Ensure the job is tracked.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, tracked := s1.periodicDispatcher.tracked[tuple]; !tracked {
t.Fatalf("periodic job not restored")
}

// Check that an eval was made.
ws := memdb.NewWatchSet()
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
if err != nil || last == nil {
t.Fatalf("failed to get periodic launch time: %v", err)
}
if last.Launch == past {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}

if md.called != true {
t.Fatalf("failed to force job evaluation")
}
}

func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
now := time.Now()
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)

job := testPeriodicJobWithOverlapEnabled(past, now, future)
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}

// Create an eval for the past launch.
eval, _ := s1.periodicDispatcher.createEval(job, past)

md := &mockJobEvalDispatcher{
children: false,
evalToReturn: eval,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.restorePeriodicDispatcher()

// Ensure the job is tracked.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, tracked := s1.periodicDispatcher.tracked[tuple]; !tracked {
t.Fatalf("periodic job not restored")
}

// Check that an eval was made.
ws := memdb.NewWatchSet()
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
if err != nil || last == nil {
t.Fatalf("failed to get periodic launch time: %v", err)
}

if last.Launch == past {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}

if md.called != true {
t.Fatalf("failed to force job evaluation")
}
}

func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

md := &mockJobEvalDispatcher{
children: true,
evalToReturn: nil,
}

s1.periodicDispatcher.dispatcher = md

// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
now := time.Now()
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)
job := testPeriodicJob(past, now, future)

req := structs.JobRegisterRequest{
Job: job,
PolicyOverride: true,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}

// Create an eval for the past launch.
s1.periodicDispatcher.createEval(job, past)

Expand Down Expand Up @@ -492,9 +665,14 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
if err != nil || last == nil {
t.Fatalf("failed to get periodic launch time: %v", err)
}

if last.Launch == past {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}

if md.called != false {
t.Fatalf("job evaluation forced when job is already running")
}
}

func TestLeader_PeriodicDispatch(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,9 @@ func (p *PeriodicDispatch) removeLocked(jobID structs.NamespacedID) error {
return nil
}

// ForceRunIfNotRunning causes the periodic job to be evaluated only if there are
// no overlap constrains in the definition and there are no jobs running already
func (p *PeriodicDispatch) ForceRunIfNotRunning(job *structs.Job) (*structs.Evaluation, error) {

if job.Periodic.ProhibitOverlap {
running, err := p.dispatcher.RunningChildren(job)
if err != nil {
Expand Down

0 comments on commit 9a9faf7

Please sign in to comment.