Skip to content

Commit

Permalink
Merge pull request #3646 from hashicorp/b-periodic-non-leader
Browse files Browse the repository at this point in the history
Fix followers not creating periodic launch
  • Loading branch information
dadgar committed Dec 11, 2017
2 parents 82cbb70 + 8d0f34a commit 888acf9
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 69 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ IMPROVEMENTS:

BUG FIXES:

* core: Fix issue in which restoring periodic jobs could fail when a leader
election occurs [GH-3646]
* core: Fixed an issue where the leader server could get into a state where it
was no longer performing the periodic leader loop duties after a barrier
timeout error [GH-3402]
Expand Down
15 changes: 7 additions & 8 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,21 +351,20 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// We always add the job to the periodic dispatcher because there is the
// possibility that the periodic spec was removed and then we should stop
// tracking it.
added, err := n.periodicDispatcher.Add(req.Job)
if err != nil {
if err := n.periodicDispatcher.Add(req.Job); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
return err
}

// Create a watch set
ws := memdb.NewWatchSet()

// If it is periodic, record the time it was inserted. This is necessary for
// recovering during leader election. It is possible that from the time it
// is added to when it was suppose to launch, leader election occurs and the
// job was not launched. In this case, we use the insertion time to
// determine if a launch was missed.
if added {
// If it is an active periodic job, record the time it was inserted. This is
// necessary for recovering during leader election. It is possible that from
// the time it is added to when it was suppose to launch, leader election
// occurs and the job was not launched. In this case, we use the insertion
// time to determine if a launch was missed.
if req.Job.IsPeriodicActive() {
prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err)
Expand Down
59 changes: 59 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,65 @@ func TestFSM_RegisterJob(t *testing.T) {
}
}

func TestFSM_RegisterPeriodicJob_NonLeader(t *testing.T) {
t.Parallel()
fsm := testFSM(t)

// Disable the dispatcher
fsm.periodicDispatcher.SetEnabled(false)

job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}

// Verify we are registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if jobOut == nil {
t.Fatalf("not found!")
}
if jobOut.CreateIndex != 1 {
t.Fatalf("bad index: %d", jobOut.CreateIndex)
}

// Verify it wasn't added to the periodic runner.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, ok := fsm.periodicDispatcher.tracked[tuple]; ok {
t.Fatal("job added to periodic runner")
}

// Verify the launch time was tracked.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut == nil {
t.Fatalf("not found!")
}
if launchOut.Launch.IsZero() {
t.Fatalf("bad launch time: %v", launchOut.Launch)
}
}

func TestFSM_RegisterJob_BadNamespace(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
Expand Down
14 changes: 8 additions & 6 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,24 +360,26 @@ func (s *Server) restorePeriodicDispatcher() error {
continue
}

added, err := s.periodicDispatcher.Add(job)
if err != nil {
if err := s.periodicDispatcher.Add(job); err != nil {
return err
}

// We did not add the job to the tracker, this can be for a variety of
// reasons, but it means that we do not need to force run it.
if !added {
// We do not need to force run the job since it isn't active.
if !job.IsPeriodicActive() {
continue
}

// If the periodic job has never been launched before, launch will hold
// the time the periodic job was added. Otherwise it has the last launch
// time of the periodic job.
launch, err := s.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
if err != nil || launch == nil {
if err != nil {
return fmt.Errorf("failed to get periodic launch time: %v", err)
}
if launch == nil {
return fmt.Errorf("no recorded periodic launch time for job %q in namespace %q",
job.ID, job.Namespace)
}

// nextLaunch is the next launch that should occur.
nextLaunch := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))
Expand Down
14 changes: 7 additions & 7 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,18 @@ func (p *PeriodicDispatch) Tracked() []*structs.Job {
// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the jobs periodic spec. The method returns whether the job was
// added and any error that may have occurred.
func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
func (p *PeriodicDispatch) Add(job *structs.Job) error {
p.l.Lock()
defer p.l.Unlock()

// Do nothing if not enabled
if !p.enabled {
return false, nil
return nil
}

// If we were tracking a job and it has been disabled, made non-periodic,
// stopped or is parameterized, remove it
disabled := !job.IsPeriodic() || !job.Periodic.Enabled || job.Stopped() || job.IsParameterized()
disabled := !job.IsPeriodicActive()

tuple := structs.NamespacedID{
ID: job.ID,
Expand All @@ -216,20 +216,20 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
}

// If the job is disabled and we aren't tracking it, do nothing.
return false, nil
return nil
}

// Add or update the job.
p.tracked[tuple] = job
next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
if tracked {
if err := p.heap.Update(job, next); err != nil {
return false, fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q (%s)", job.ID, job.Namespace)
} else {
if err := p.heap.Push(job, next); err != nil {
return false, fmt.Errorf("failed to add job %v: %v", job.ID, err)
return fmt.Errorf("failed to add job %v: %v", job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q (%s)", job.ID, job.Namespace)
}
Expand All @@ -240,7 +240,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
default:
}

return true, nil
return nil
}

// Remove stops tracking the passed job. If the job is not tracked, it is a
Expand Down
2 changes: 1 addition & 1 deletion nomad/periodic_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) {
job := mock.PeriodicJob()
job.Periodic.ProhibitOverlap = true // Shouldn't affect anything.
assert.Nil(state.UpsertJob(100, job))
_, err := s1.periodicDispatcher.Add(job)
err := s1.periodicDispatcher.Add(job)
assert.Nil(err)

// Force launch it.
Expand Down
Loading

0 comments on commit 888acf9

Please sign in to comment.