Skip to content

Commit

Permalink
Fix double close and cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Aug 3, 2017
1 parent d51db63 commit 09e90da
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 47 deletions.
1 change: 0 additions & 1 deletion nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {

// Enable the periodic dispatcher, since we are now the leader.
s.periodicDispatcher.SetEnabled(true)
s.periodicDispatcher.Start()

// Restore the periodic dispatcher state
if err := s.restorePeriodicDispatcher(); err != nil {
Expand Down
2 changes: 0 additions & 2 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.periodicDispatcher.Start()
s1.restorePeriodicDispatcher()

// Ensure the job is tracked.
Expand Down Expand Up @@ -460,7 +459,6 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {

// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.periodicDispatcher.Start()
s1.restorePeriodicDispatcher()

// Ensure the job is tracked.
Expand Down
70 changes: 27 additions & 43 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"container/heap"
"context"
"fmt"
"log"
"strconv"
Expand All @@ -19,14 +20,12 @@ import (
type PeriodicDispatch struct {
dispatcher JobEvalDispatcher
enabled bool
running bool

tracked map[string]*structs.Job
heap *periodicHeap

updateCh chan struct{}
stopCh chan struct{}
waitCh chan struct{}
stopFn context.CancelFunc
logger *log.Logger
l sync.RWMutex
}
Expand Down Expand Up @@ -141,8 +140,6 @@ func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *Peri
tracked: make(map[string]*structs.Job),
heap: NewPeriodicHeap(),
updateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
waitCh: make(chan struct{}),
logger: logger,
}
}
Expand All @@ -152,26 +149,21 @@ func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *Peri
// will stop any launched go routine and flush the dispatcher.
func (p *PeriodicDispatch) SetEnabled(enabled bool) {
p.l.Lock()
defer p.l.Unlock()
wasRunning := p.enabled
p.enabled = enabled
p.l.Unlock()
if !enabled {
if p.running {
close(p.stopCh)
<-p.waitCh
p.running = false
}
p.Flush()
}
}

// TODO Why have a seperate start method
// Start begins the goroutine that creates derived jobs and evals.
func (p *PeriodicDispatch) Start() {
p.l.Lock()
p.running = true
p.l.Unlock()
// XXX There must be two insances of the run routine
go p.run()
// If we are transistioning from enabled to disabled, stop the daemon and
// flush.
if !enabled && wasRunning {
p.stopFn()
p.flush()
} else if enabled && !wasRunning {
// If we are transitioning from disabled to enabled, run the daemon.
ctx, cancel := context.WithCancel(context.Background())
p.stopFn = cancel
go p.run(ctx)
}
}

// Tracked returns the set of tracked job IDs.
Expand Down Expand Up @@ -232,11 +224,9 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
}

// Signal an update.
if p.running {
select {
case p.updateCh <- struct{}{}:
default:
}
select {
case p.updateCh <- struct{}{}:
default:
}

return nil
Expand Down Expand Up @@ -269,11 +259,9 @@ func (p *PeriodicDispatch) removeLocked(jobID string) error {
}

// Signal an update.
if p.running {
select {
case p.updateCh <- struct{}{}:
default:
}
select {
case p.updateCh <- struct{}{}:
default:
}

p.logger.Printf("[DEBUG] nomad.periodic: deregistered periodic job %q", jobID)
Expand Down Expand Up @@ -305,13 +293,12 @@ func (p *PeriodicDispatch) ForceRun(jobID string) (*structs.Evaluation, error) {
func (p *PeriodicDispatch) shouldRun() bool {
p.l.RLock()
defer p.l.RUnlock()
return p.enabled && p.running
return p.enabled
}

// run is a long-lived function that waits till a job's periodic spec is met and
// then creates an evaluation to run the job.
func (p *PeriodicDispatch) run() {
defer close(p.waitCh)
func (p *PeriodicDispatch) run(ctx context.Context) {
var launchCh <-chan time.Time
for p.shouldRun() {
job, launch := p.nextLaunch()
Expand All @@ -324,7 +311,7 @@ func (p *PeriodicDispatch) run() {
}

select {
case <-p.stopCh:
case <-ctx.Done():
return
case <-p.updateCh:
continue
Expand Down Expand Up @@ -455,15 +442,12 @@ func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
return time.Unix(int64(launch), 0), nil
}

// Flush clears the state of the PeriodicDispatcher
func (p *PeriodicDispatch) Flush() {
p.l.Lock()
defer p.l.Unlock()
p.stopCh = make(chan struct{})
// flush clears the state of the PeriodicDispatcher
func (p *PeriodicDispatch) flush() {
p.updateCh = make(chan struct{}, 1)
p.waitCh = make(chan struct{})
p.tracked = make(map[string]*structs.Job)
p.heap = NewPeriodicHeap()
p.stopFn = nil
}

// periodicHeap wraps a heap and gives operations other than Push/Pop.
Expand Down
26 changes: 25 additions & 1 deletion nomad/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func testPeriodicDispatcher() (*PeriodicDispatch, *MockJobEvalDispatcher) {
m := NewMockJobEvalDispatcher()
d := NewPeriodicDispatch(logger, m)
d.SetEnabled(true)
d.Start()
return d, m
}

Expand All @@ -97,6 +96,31 @@ func testPeriodicJob(times ...time.Time) *structs.Job {
return job
}

// TestPeriodicDispatch_SetEnabled test that setting enabled twice is a no-op.
// This tests the reported issue: https://github.com/hashicorp/nomad/issues/2829
func TestPeriodicDispatch_SetEnabled(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()

// SetEnabled has been called once but do it again.
p.SetEnabled(true)

// Now disable and make sure everything is fine.
p.SetEnabled(false)

// Enable and track something
p.SetEnabled(true)
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}

tracked := p.Tracked()
if len(tracked) != 1 {
t.Fatalf("Add didn't track the job: %v", tracked)
}
}

func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
Expand Down

0 comments on commit 09e90da

Please sign in to comment.