Fix followers not creating periodic launch #3646

Merged · 2 commits · Dec 11, 2017
CHANGELOG.md (2 additions, 0 deletions)
@@ -50,6 +50,8 @@ IMPROVEMENTS:

BUG FIXES:

+* core: Fix issue in which restoring periodic jobs could fail when a leader
+  election occurs [GH-3646]
* core: Fixed an issue where the leader server could get into a state where it
was no longer performing the periodic leader loop duties after a barrier
timeout error [GH-3402]
nomad/fsm.go (7 additions, 8 deletions)
@@ -351,21 +351,20 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// We always add the job to the periodic dispatcher because there is the
// possibility that the periodic spec was removed and then we should stop
// tracking it.
- added, err := n.periodicDispatcher.Add(req.Job)
- if err != nil {
+ if err := n.periodicDispatcher.Add(req.Job); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
return err
}

// Create a watch set
ws := memdb.NewWatchSet()

- // If it is periodic, record the time it was inserted. This is necessary for
- // recovering during leader election. It is possible that from the time it
- // is added to when it was suppose to launch, leader election occurs and the
- // job was not launched. In this case, we use the insertion time to
- // determine if a launch was missed.
- if added {
+ // If it is an active periodic job, record the time it was inserted. This is
+ // necessary for recovering during leader election. It is possible that from
+ // the time it is added to when it was suppose to launch, leader election
+ // occurs and the job was not launched. In this case, we use the insertion
+ // time to determine if a launch was missed.
+ if req.Job.IsPeriodicActive() {
prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err)
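The important shift in this hunk is that the FSM now keys the insertion-time bookkeeping off the job itself (req.Job.IsPeriodicActive()) rather than off whether the local periodic dispatcher accepted the job, since the dispatcher is disabled on non-leader servers and previously reported added == false there. The rest of the hunk is collapsed; as a rough sketch of what presumably follows (the PeriodicLaunch struct and UpsertPeriodicLaunch call are inferred from the new test below, not quoted from this diff):

	// Sketch, not the PR's verbatim code: if no launch has ever been recorded,
	// store the insertion time so a future leader can detect a missed launch.
	if prevLaunch == nil || prevLaunch.Launch.IsZero() {
		launch := &structs.PeriodicLaunch{
			ID:        req.Job.ID,
			Namespace: req.Namespace,
			Launch:    time.Now(),
		}
		if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil {
			n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err)
			return err
		}
	}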
nomad/fsm_test.go (59 additions, 0 deletions)
@@ -331,6 +331,65 @@ func TestFSM_RegisterJob(t *testing.T) {
}
}

func TestFSM_RegisterPeriodicJob_NonLeader(t *testing.T) {
t.Parallel()
fsm := testFSM(t)

// Disable the dispatcher
fsm.periodicDispatcher.SetEnabled(false)

job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}

// Verify we are registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if jobOut == nil {
t.Fatalf("not found!")
}
if jobOut.CreateIndex != 1 {
t.Fatalf("bad index: %d", jobOut.CreateIndex)
}

// Verify it wasn't added to the periodic runner.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, ok := fsm.periodicDispatcher.tracked[tuple]; ok {
t.Fatal("job added to periodic runner")
}

// Verify the launch time was tracked.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut == nil {
t.Fatalf("not found!")
}
if launchOut.Launch.IsZero() {
t.Fatalf("bad launch time: %v", launchOut.Launch)
}
}

func TestFSM_RegisterJob_BadNamespace(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
nomad/leader.go (8 additions, 6 deletions)
@@ -360,24 +360,26 @@ func (s *Server) restorePeriodicDispatcher() error {
continue
}

- added, err := s.periodicDispatcher.Add(job)
- if err != nil {
+ if err := s.periodicDispatcher.Add(job); err != nil {
return err
}

- // We did not add the job to the tracker, this can be for a variety of
- // reasons, but it means that we do not need to force run it.
- if !added {
+ // We do not need to force run the job since it isn't active.
+ if !job.IsPeriodicActive() {
continue
}

// If the periodic job has never been launched before, launch will hold
// the time the periodic job was added. Otherwise it has the last launch
// time of the periodic job.
launch, err := s.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
- if err != nil || launch == nil {
+ if err != nil {
return fmt.Errorf("failed to get periodic launch time: %v", err)
}
+ if launch == nil {
+ return fmt.Errorf("no recorded periodic launch time for job %q in namespace %q",
+ job.ID, job.Namespace)
+ }

// nextLaunch is the next launch that should occur.
nextLaunch := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))
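The tail of restorePeriodicDispatcher is collapsed above. For context, the recovery step it leads into is: if the next launch computed from the recorded (or insertion) time is already in the past, the freshly elected leader force-runs the job. A hedged sketch of that check follows; the ForceRun call and its signature are assumptions about the dispatcher API, not part of this diff:

	// Sketch: force-run a periodic job whose launch was missed while no
	// leader was running the periodic dispatcher.
	now := time.Now()
	if !nextLaunch.IsZero() && now.After(nextLaunch) {
		if _, err := s.periodicDispatcher.ForceRun(job.Namespace, job.ID); err != nil {
			return fmt.Errorf("force run of periodic job %q failed: %v", job.ID, err)
		}
		s.logger.Printf("[DEBUG] nomad.periodic: periodic job %q (%s) force run during leadership establishment",
			job.ID, job.Namespace)
	}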
nomad/periodic.go (7 additions, 7 deletions)
@@ -192,18 +192,18 @@ func (p *PeriodicDispatch) Tracked() []*structs.Job {
// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the jobs periodic spec. The method returns whether the job was
// added and any error that may have occurred.
- func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
+ func (p *PeriodicDispatch) Add(job *structs.Job) error {
p.l.Lock()
defer p.l.Unlock()

// Do nothing if not enabled
if !p.enabled {
- return false, nil
+ return nil
}

// If we were tracking a job and it has been disabled, made non-periodic,
// stopped or is parameterized, remove it
- disabled := !job.IsPeriodic() || !job.Periodic.Enabled || job.Stopped() || job.IsParameterized()
+ disabled := !job.IsPeriodicActive()

tuple := structs.NamespacedID{
ID: job.ID,
@@ -216,20 +216,20 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
}

// If the job is disabled and we aren't tracking it, do nothing.
- return false, nil
+ return nil
}

// Add or update the job.
p.tracked[tuple] = job
next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
if tracked {
if err := p.heap.Update(job, next); err != nil {
return false, fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q (%s)", job.ID, job.Namespace)
} else {
if err := p.heap.Push(job, next); err != nil {
return false, fmt.Errorf("failed to add job %v: %v", job.ID, err)
return fmt.Errorf("failed to add job %v: %v", job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q (%s)", job.ID, job.Namespace)
}
@@ -240,7 +240,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
default:
}

- return true, nil
+ return nil
}

// Remove stops tracking the passed job. If the job is not tracked, it is a
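The job.IsPeriodicActive() helper that these call sites now rely on is not shown in this diff. Judging from the inline expression it replaces (disabled := !job.IsPeriodic() || !job.Periodic.Enabled || job.Stopped() || job.IsParameterized()), it is presumably a small method on structs.Job along these lines:

// Sketch inferred from the removed inline expression, not quoted from the PR:
// IsPeriodicActive reports whether the job is a periodic job that should
// currently be tracked and launched.
func (j *Job) IsPeriodicActive() bool {
	return j.IsPeriodic() && j.Periodic.Enabled && !j.Stopped() && !j.IsParameterized()
}

Centralizing the check this way lets the FSM on followers (where the dispatcher is disabled) and the dispatcher itself agree on which jobs count as active, which is what allows the launch time to be recorded consistently across the cluster.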
nomad/periodic_endpoint_test.go (1 addition, 1 deletion)
@@ -77,7 +77,7 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) {
job := mock.PeriodicJob()
job.Periodic.ProhibitOverlap = true // Shouldn't affect anything.
assert.Nil(state.UpsertJob(100, job))
- _, err := s1.periodicDispatcher.Add(job)
+ err := s1.periodicDispatcher.Add(job)
assert.Nil(err)

// Force launch it.