From d51db63c6afe66ff6de2d7996932fe25bd056222 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 3 Aug 2017 12:37:58 -0700 Subject: [PATCH 1/2] Don't restore parameterized periodic jobs --- nomad/leader.go | 10 ++++++++-- nomad/leader_test.go | 22 ++++++++++++++++------ nomad/periodic.go | 2 ++ 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index 3c308a7863e4..2f2c25ec325d 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -71,8 +71,7 @@ RECONCILE: // Check if we need to handle initial leadership actions if !establishedLeader { if err := s.establishLeadership(stopCh); err != nil { - s.logger.Printf("[ERR] nomad: failed to establish leadership: %v", - err) + s.logger.Printf("[ERR] nomad: failed to establish leadership: %v", err) goto WAIT } establishedLeader = true @@ -288,6 +287,13 @@ func (s *Server) restorePeriodicDispatcher() error { now := time.Now() for i := iter.Next(); i != nil; i = iter.Next() { job := i.(*structs.Job) + + // We skip adding parameterized jobs because they themselves aren't + // tracked, only the dispatched children are. 
+ if job.IsParameterized() { + continue + } + s.periodicDispatcher.Add(job) // If the periodic job has never been launched before, launch will hold diff --git a/nomad/leader_test.go b/nomad/leader_test.go index eda24c2b6319..dbb1de19d311 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -328,10 +328,12 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) { t.Fatalf("Should have a leader") } - // Inject a periodic job and non-periodic job + // Inject a periodic job, a parameterized periodic job and a non-periodic job periodic := mock.PeriodicJob() nonPeriodic := mock.Job() - for _, job := range []*structs.Job{nonPeriodic, periodic} { + parameterizedPeriodic := mock.PeriodicJob() + parameterizedPeriodic.ParameterizedJob = &structs.ParameterizedJobConfig{} + for _, job := range []*structs.Job{nonPeriodic, periodic, parameterizedPeriodic} { req := structs.JobRegisterRequest{ Job: job, } @@ -359,12 +361,20 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) { t.Fatalf("should have leader") }) - // Check that the new leader is tracking the periodic job. 
+ // Check that the new leader is tracking the periodic job only testutil.WaitForResult(func() (bool, error) { - _, tracked := leader.periodicDispatcher.tracked[periodic.ID] - return tracked, nil + if _, tracked := leader.periodicDispatcher.tracked[periodic.ID]; !tracked { + return false, fmt.Errorf("periodic job not tracked") + } + if _, tracked := leader.periodicDispatcher.tracked[nonPeriodic.ID]; tracked { + return false, fmt.Errorf("non periodic job tracked") + } + if _, tracked := leader.periodicDispatcher.tracked[parameterizedPeriodic.ID]; tracked { + return false, fmt.Errorf("parameterized periodic job tracked") + } + return true, nil }, func(err error) { - t.Fatalf("periodic job not tracked") + t.Fatalf(err.Error()) }) } diff --git a/nomad/periodic.go b/nomad/periodic.go index d01f26acecac..4d3ae9875a7e 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -164,11 +164,13 @@ func (p *PeriodicDispatch) SetEnabled(enabled bool) { } } +// TODO Why have a seperate start method // Start begins the goroutine that creates derived jobs and evals. func (p *PeriodicDispatch) Start() { p.l.Lock() p.running = true p.l.Unlock() + // XXX There must be two insances of the run routine go p.run() } From 09e90da82d6d8532eaf6eea373cc16a252700293 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 3 Aug 2017 13:40:34 -0700 Subject: [PATCH 2/2] Fix double close and cleanup code --- nomad/leader.go | 1 - nomad/leader_test.go | 2 -- nomad/periodic.go | 70 ++++++++++++++++-------------------------- nomad/periodic_test.go | 26 +++++++++++++++- 4 files changed, 52 insertions(+), 47 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index 2f2c25ec325d..d54dfbbb2865 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -148,7 +148,6 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Enable the periodic dispatcher, since we are now the leader. 
s.periodicDispatcher.SetEnabled(true) - s.periodicDispatcher.Start() // Restore the periodic dispatcher state if err := s.restorePeriodicDispatcher(); err != nil { diff --git a/nomad/leader_test.go b/nomad/leader_test.go index dbb1de19d311..4fc8cb06dae9 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -408,7 +408,6 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) { // Restore the periodic dispatcher. s1.periodicDispatcher.SetEnabled(true) - s1.periodicDispatcher.Start() s1.restorePeriodicDispatcher() // Ensure the job is tracked. @@ -460,7 +459,6 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) { // Restore the periodic dispatcher. s1.periodicDispatcher.SetEnabled(true) - s1.periodicDispatcher.Start() s1.restorePeriodicDispatcher() // Ensure the job is tracked. diff --git a/nomad/periodic.go b/nomad/periodic.go index 4d3ae9875a7e..11145e62ac75 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -2,6 +2,7 @@ package nomad import ( "container/heap" + "context" "fmt" "log" "strconv" @@ -19,14 +20,12 @@ import ( type PeriodicDispatch struct { dispatcher JobEvalDispatcher enabled bool - running bool tracked map[string]*structs.Job heap *periodicHeap updateCh chan struct{} - stopCh chan struct{} - waitCh chan struct{} + stopFn context.CancelFunc logger *log.Logger l sync.RWMutex } @@ -141,8 +140,6 @@ func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *Peri tracked: make(map[string]*structs.Job), heap: NewPeriodicHeap(), updateCh: make(chan struct{}, 1), - stopCh: make(chan struct{}), - waitCh: make(chan struct{}), logger: logger, } } @@ -152,26 +149,21 @@ func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *Peri // will stop any launched go routine and flush the dispatcher. 
func (p *PeriodicDispatch) SetEnabled(enabled bool) { p.l.Lock() + defer p.l.Unlock() + wasRunning := p.enabled p.enabled = enabled - p.l.Unlock() - if !enabled { - if p.running { - close(p.stopCh) - <-p.waitCh - p.running = false - } - p.Flush() - } -} -// TODO Why have a seperate start method -// Start begins the goroutine that creates derived jobs and evals. -func (p *PeriodicDispatch) Start() { - p.l.Lock() - p.running = true - p.l.Unlock() - // XXX There must be two insances of the run routine - go p.run() + // If we are transitioning from enabled to disabled, stop the daemon and + // flush. + if !enabled && wasRunning { + p.stopFn() + p.flush() + } else if enabled && !wasRunning { + // If we are transitioning from disabled to enabled, run the daemon. + ctx, cancel := context.WithCancel(context.Background()) + p.stopFn = cancel + go p.run(ctx) + } } // Tracked returns the set of tracked job IDs. @@ -232,11 +224,9 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error { } // Signal an update. - if p.running { - select { - case p.updateCh <- struct{}{}: - default: - } + select { + case p.updateCh <- struct{}{}: + default: } return nil @@ -269,11 +259,9 @@ func (p *PeriodicDispatch) removeLocked(jobID string) error { } // Signal an update. - if p.running { - select { - case p.updateCh <- struct{}{}: - default: - } + select { + case p.updateCh <- struct{}{}: + default: } p.logger.Printf("[DEBUG] nomad.periodic: deregistered periodic job %q", jobID) @@ -305,13 +293,12 @@ func (p *PeriodicDispatch) ForceRun(jobID string) (*structs.Evaluation, error) { func (p *PeriodicDispatch) shouldRun() bool { p.l.RLock() defer p.l.RUnlock() - return p.enabled && p.running + return p.enabled } // run is a long-lived function that waits till a job's periodic spec is met and // then creates an evaluation to run the job. 
-func (p *PeriodicDispatch) run() { - defer close(p.waitCh) +func (p *PeriodicDispatch) run(ctx context.Context) { var launchCh <-chan time.Time for p.shouldRun() { job, launch := p.nextLaunch() @@ -324,7 +311,7 @@ func (p *PeriodicDispatch) run() { } select { - case <-p.stopCh: + case <-ctx.Done(): return case <-p.updateCh: continue @@ -455,15 +442,12 @@ func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) { return time.Unix(int64(launch), 0), nil } -// Flush clears the state of the PeriodicDispatcher -func (p *PeriodicDispatch) Flush() { - p.l.Lock() - defer p.l.Unlock() - p.stopCh = make(chan struct{}) +// flush clears the state of the PeriodicDispatcher +func (p *PeriodicDispatch) flush() { p.updateCh = make(chan struct{}, 1) - p.waitCh = make(chan struct{}) p.tracked = make(map[string]*structs.Job) p.heap = NewPeriodicHeap() + p.stopFn = nil } // periodicHeap wraps a heap and gives operations other than Push/Pop. diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index 9d63148adc48..3aacabf5002d 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -78,7 +78,6 @@ func testPeriodicDispatcher() (*PeriodicDispatch, *MockJobEvalDispatcher) { m := NewMockJobEvalDispatcher() d := NewPeriodicDispatch(logger, m) d.SetEnabled(true) - d.Start() return d, m } @@ -97,6 +96,31 @@ func testPeriodicJob(times ...time.Time) *structs.Job { return job } +// TestPeriodicDispatch_SetEnabled tests that setting enabled twice is a no-op. +// This tests the reported issue: https://github.com/hashicorp/nomad/issues/2829 +func TestPeriodicDispatch_SetEnabled(t *testing.T) { + t.Parallel() + p, _ := testPeriodicDispatcher() + + // SetEnabled has been called once but do it again. + p.SetEnabled(true) + + // Now disable and make sure everything is fine. 
+ p.SetEnabled(false) + + // Enable and track something + p.SetEnabled(true) + job := mock.PeriodicJob() + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + + tracked := p.Tracked() + if len(tracked) != 1 { + t.Fatalf("Add didn't track the job: %v", tracked) + } +} + func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) { t.Parallel() p, _ := testPeriodicDispatcher()