Skip to content

Commit

Permalink
fix: remove defer and inline unlock for speed optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
Juanadelacuesta committed Mar 22, 2023
1 parent 6a48053 commit eec8f23
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 38 deletions.
5 changes: 3 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ func (s *Server) restorePeriodicDispatcher() error {
return fmt.Errorf("force run of periodic job %q failed: %v", job.NamespacedID(), err)
}

logger.Debug("periodic job force runned during leadership establishment", "job", job.NamespacedID())
logger.Debug("periodic job force run during leadership establishment", "job", job.NamespacedID())
}

return nil
Expand All @@ -800,8 +800,9 @@ func (s *Server) restorePeriodicDispatcher() error {
// instances of the job running in order to determine if a new evaluation needs to
// be created upon periodic dispatcher restore
func (s *Server) isNewEvalNeeded(job *structs.Job) (bool, error) {

if job.Periodic.ProhibitOverlap {
running, err := s.RunningChildren(job)
running, err := s.periodicDispatcher.dispatcher.RunningChildren(job)
if err != nil {
return false, fmt.Errorf("failed to determine if periodic job has running children %q error %q", job.NamespacedID(), err)
}
Expand Down
61 changes: 26 additions & 35 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,20 +439,21 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
}

type mockJobEvalDispatcher struct {
called, children bool
evalToReturn *structs.Evaluation
forceEvalCalled, children bool
evalToReturn *structs.Evaluation
JobEvalDispatcher
}

func (mjed *mockJobEvalDispatcher) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
mjed.called = true
mjed.forceEvalCalled = true
return mjed.evalToReturn, nil
}

func (mjed *mockJobEvalDispatcher) RunningChildren(job *structs.Job) (bool, error) {
return mjed.children, nil
}

func testPeriodicJobWithOverlapEnabled(times ...time.Time) *structs.Job {
func testPeriodicJob_OverlapEnabled(times ...time.Time) *structs.Job {
job := testPeriodicJob(times...)
job.Periodic.ProhibitOverlap = true
return job
Expand Down Expand Up @@ -489,8 +490,9 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
eval, _ := s1.periodicDispatcher.createEval(job, past)

md := &mockJobEvalDispatcher{
children: false,
evalToReturn: eval,
children: false,
evalToReturn: eval,
JobEvalDispatcher: s1,
}

s1.periodicDispatcher.dispatcher = md
Expand Down Expand Up @@ -525,7 +527,7 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}

if md.called != true {
if md.forceEvalCalled != true {
t.Fatalf("failed to force job evaluation")
}
}
Expand All @@ -545,7 +547,7 @@ func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) {
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)

job := testPeriodicJobWithOverlapEnabled(past, now, future)
job := testPeriodicJob_OverlapEnabled(past, now, future)
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Expand All @@ -571,7 +573,7 @@ func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) {
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(5 * time.Second)
time.Sleep(3 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
Expand All @@ -597,9 +599,10 @@ func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}

if md.called != true {
if md.forceEvalCalled != true {
t.Fatalf("failed to force job evaluation")
}
t.Fail()
}

func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
Expand All @@ -611,23 +614,15 @@ func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

md := &mockJobEvalDispatcher{
children: true,
evalToReturn: nil,
}

s1.periodicDispatcher.dispatcher = md

// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
now := time.Now()
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)
job := testPeriodicJob(past, now, future)

job := testPeriodicJob_OverlapEnabled(past, now, future)
req := structs.JobRegisterRequest{
Job: job,
PolicyOverride: true,
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
Expand All @@ -638,13 +633,20 @@ func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
}

// Create an eval for the past launch.
s1.periodicDispatcher.createEval(job, past)
eval, _ := s1.periodicDispatcher.createEval(job, past)

md := &mockJobEvalDispatcher{
children: true,
evalToReturn: eval,
}

s1.periodicDispatcher.dispatcher = md

// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)

// Sleep till after the job should have been launched.
time.Sleep(5 * time.Second)
time.Sleep(3 * time.Second)

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
Expand All @@ -659,19 +661,8 @@ func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
t.Fatalf("periodic job not restored")
}

// Check that an eval was made.
ws := memdb.NewWatchSet()
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
if err != nil || last == nil {
t.Fatalf("failed to get periodic launch time: %v", err)
}

if last.Launch == past {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}

if md.called != false {
t.Fatalf("job evaluation forced when job is already running")
if md.forceEvalCalled != false {
t.Fatalf("evaluation forced with job already running")
}
}

Expand Down
4 changes: 3 additions & 1 deletion nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,6 @@ func (p *PeriodicDispatch) run(ctx context.Context, updateCh <-chan struct{}) {
// based on the passed launch time.
func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
p.l.Lock()
defer p.l.Unlock()

nextLaunch, err := job.Periodic.Next(launchTime)
if err != nil {
Expand All @@ -348,16 +347,19 @@ func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
running, err := p.dispatcher.RunningChildren(job)
if err != nil {
p.logger.Error("failed to determine if periodic job has running children", "job", job.NamespacedID(), "error", err)
p.l.Unlock()
return
}

if running {
p.logger.Debug("skipping launch of periodic job because job prohibits overlap", "job", job.NamespacedID())
p.l.Unlock()
return
}
}

p.logger.Debug(" launching job", "job", job.NamespacedID(), "launch_time", launchTime)
p.l.Unlock()
p.createEval(job, launchTime)
}

Expand Down

0 comments on commit eec8f23

Please sign in to comment.