diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cd9eac035db..d7bcee6f0f79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,8 @@ IMPROVEMENTS: BUG FIXES: + * core: Fix issue in which restoring periodic jobs could fail when a leader + election occurs [GH-3646] * core: Fixed an issue where the leader server could get into a state where it was no longer performing the periodic leader loop duties after a barrier timeout error [GH-3402] diff --git a/nomad/fsm.go b/nomad/fsm.go index 4aa3bb3890d2..7d0ef4fc41a7 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -351,8 +351,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { // We always add the job to the periodic dispatcher because there is the // possibility that the periodic spec was removed and then we should stop // tracking it. - added, err := n.periodicDispatcher.Add(req.Job) - if err != nil { + if err := n.periodicDispatcher.Add(req.Job); err != nil { n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err) return err } @@ -360,12 +359,12 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { // Create a watch set ws := memdb.NewWatchSet() - // If it is periodic, record the time it was inserted. This is necessary for - // recovering during leader election. It is possible that from the time it - // is added to when it was suppose to launch, leader election occurs and the - // job was not launched. In this case, we use the insertion time to - // determine if a launch was missed. - if added { + // If it is an active periodic job, record the time it was inserted. This is + // necessary for recovering during leader election. It is possible that from + // the time it is added to when it was suppose to launch, leader election + // occurs and the job was not launched. In this case, we use the insertion + // time to determine if a launch was missed. + if req.Job.IsPeriodicActive() { prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Namespace, req.Job.ID) if err != nil { n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index cc688a848f7e..4f9051addaf1 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -331,6 +331,65 @@ func TestFSM_RegisterJob(t *testing.T) { } } +func TestFSM_RegisterPeriodicJob_NonLeader(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + // Disable the dispatcher + fsm.periodicDispatcher.SetEnabled(false) + + job := mock.PeriodicJob() + req := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Namespace: job.Namespace, + }, + } + buf, err := structs.Encode(structs.JobRegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + ws := memdb.NewWatchSet() + jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if jobOut == nil { + t.Fatalf("not found!") + } + if jobOut.CreateIndex != 1 { + t.Fatalf("bad index: %d", jobOut.CreateIndex) + } + + // Verify it wasn't added to the periodic runner. + tuple := structs.NamespacedID{ + ID: job.ID, + Namespace: job.Namespace, + } + if _, ok := fsm.periodicDispatcher.tracked[tuple]; ok { + t.Fatal("job added to periodic runner") + } + + // Verify the launch time was tracked. + launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Namespace, req.Job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if launchOut == nil { + t.Fatalf("not found!") + } + if launchOut.Launch.IsZero() { + t.Fatalf("bad launch time: %v", launchOut.Launch) + } +} + func TestFSM_RegisterJob_BadNamespace(t *testing.T) { t.Parallel() fsm := testFSM(t) diff --git a/nomad/leader.go b/nomad/leader.go index 01787f30e0b2..1b13bf86d768 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -360,14 +360,12 @@ func (s *Server) restorePeriodicDispatcher() error { continue } - added, err := s.periodicDispatcher.Add(job) - if err != nil { + if err := s.periodicDispatcher.Add(job); err != nil { return err } - // We did not add the job to the tracker, this can be for a variety of - // reasons, but it means that we do not need to force run it. - if !added { + // We do not need to force run the job since it isn't active. + if !job.IsPeriodicActive() { continue } @@ -375,9 +373,13 @@ func (s *Server) restorePeriodicDispatcher() error { // the time the periodic job was added. Otherwise it has the last launch // time of the periodic job. launch, err := s.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID) - if err != nil || launch == nil { + if err != nil { return fmt.Errorf("failed to get periodic launch time: %v", err) } + if launch == nil { + return fmt.Errorf("no recorded periodic launch time for job %q in namespace %q", + job.ID, job.Namespace) + } // nextLaunch is the next launch that should occur. nextLaunch := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation())) diff --git a/nomad/periodic.go b/nomad/periodic.go index 23d1d1e4547f..0549f900ccf4 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -192,18 +192,18 @@ func (p *PeriodicDispatch) Tracked() []*structs.Job { // Add begins tracking of a periodic job. If it is already tracked, it acts as // an update to the jobs periodic spec. The method returns whether the job was // added and any error that may have occurred. -func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) { +func (p *PeriodicDispatch) Add(job *structs.Job) error { p.l.Lock() defer p.l.Unlock() // Do nothing if not enabled if !p.enabled { - return false, nil + return nil } // If we were tracking a job and it has been disabled, made non-periodic, // stopped or is parameterized, remove it - disabled := !job.IsPeriodic() || !job.Periodic.Enabled || job.Stopped() || job.IsParameterized() + disabled := !job.IsPeriodicActive() tuple := structs.NamespacedID{ ID: job.ID, @@ -216,7 +216,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) { } // If the job is disabled and we aren't tracking it, do nothing. - return false, nil + return nil } // Add or update the job. @@ -224,12 +224,12 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) { next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation())) if tracked { if err := p.heap.Update(job, next); err != nil { - return false, fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err) + return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err) } p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q (%s)", job.ID, job.Namespace) } else { if err := p.heap.Push(job, next); err != nil { - return false, fmt.Errorf("failed to add job %v: %v", job.ID, err) + return fmt.Errorf("failed to add job %v: %v", job.ID, err) } p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q (%s)", job.ID, job.Namespace) } @@ -240,7 +240,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) { default: } - return true, nil + return nil } // Remove stops tracking the passed job. If the job is not tracked, it is a diff --git a/nomad/periodic_endpoint_test.go b/nomad/periodic_endpoint_test.go index 7547de5acd1a..575b9dd1ab98 100644 --- a/nomad/periodic_endpoint_test.go +++ b/nomad/periodic_endpoint_test.go @@ -77,7 +77,7 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) { job := mock.PeriodicJob() job.Periodic.ProhibitOverlap = true // Shouldn't affect anything. assert.Nil(state.UpsertJob(100, job)) - _, err := s1.periodicDispatcher.Add(job) + err := s1.periodicDispatcher.Add(job) assert.Nil(err) // Force launch it. diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index 7e3f07658fcb..4bc3d20cc824 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -116,8 +116,8 @@ func TestPeriodicDispatch_SetEnabled(t *testing.T) { // Enable and track something p.SetEnabled(true) job := mock.PeriodicJob() - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } tracked := p.Tracked() @@ -130,10 +130,8 @@ func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) { t.Parallel() p, _ := testPeriodicDispatcher() job := mock.Job() - if added, err := p.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err) - } else if added { - t.Fatalf("Add of non-periodic job happened, expect no-op") } tracked := p.Tracked() @@ -147,8 +145,8 @@ func TestPeriodicDispatch_Add_Periodic_Parameterized(t *testing.T) { p, _ := testPeriodicDispatcher() job := mock.PeriodicJob() job.ParameterizedJob = &structs.ParameterizedJobConfig{} - if added, err := p.Add(job); err != nil || added { - t.Fatalf("Add of periodic parameterized job failed: %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add of periodic parameterized job failed: %v", err) } tracked := p.Tracked() @@ -162,8 +160,8 @@ func TestPeriodicDispatch_Add_Periodic_Stopped(t *testing.T) { p, _ := testPeriodicDispatcher() job := mock.PeriodicJob() job.Stop = true - if added, err := p.Add(job); err != nil || added { - t.Fatalf("Add of stopped periodic job failed: %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add of stopped periodic job failed: %v", err) } tracked := p.Tracked() @@ -176,8 +174,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) { t.Parallel() p, _ := testPeriodicDispatcher() job := mock.PeriodicJob() - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } tracked := p.Tracked() @@ -187,8 +185,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) { // Update the job and add it again. job.Periodic.Spec = "foo" - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed: %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } tracked = p.Tracked() @@ -208,13 +206,9 @@ func TestPeriodicDispatch_Add_Remove_Namespaced(t *testing.T) { job := mock.PeriodicJob() job2 := mock.PeriodicJob() job2.Namespace = "test" - added, err := p.Add(job) - assert.Nil(err) - assert.True(added) + assert.Nil(p.Add(job)) - added, err = p.Add(job2) - assert.Nil(err) - assert.True(added) + assert.Nil(p.Add(job2)) assert.Len(p.Tracked(), 2) @@ -227,8 +221,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) { t.Parallel() p, _ := testPeriodicDispatcher() job := mock.PeriodicJob() - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } tracked := p.Tracked() @@ -238,8 +232,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) { // Update the job to be non-periodic and add it again. job.Periodic = nil - if added, err := p.Add(job); err != nil || added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } tracked = p.Tracked() @@ -256,15 +250,15 @@ func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) { job := testPeriodicJob(time.Now().Add(10 * time.Second)) // Add it. - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } // Update it to be sooner and re-add. expected := time.Now().Round(1 * time.Second).Add(1 * time.Second) job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix()) - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } // Check that nothing is created. @@ -304,8 +298,8 @@ func TestPeriodicDispatch_Remove_Tracked(t *testing.T) { p, _ := testPeriodicDispatcher() job := mock.PeriodicJob() - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } tracked := p.Tracked() @@ -331,8 +325,8 @@ func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) { job := testPeriodicJob(time.Now().Add(1 * time.Second)) // Add it. - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } // Remove the job. @@ -370,8 +364,8 @@ func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) { job := testPeriodicJob(time.Now().Add(10 * time.Second)) // Add it. - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } // ForceRun the job @@ -402,8 +396,8 @@ func TestPeriodicDispatch_Run_DisallowOverlaps(t *testing.T) { job.Periodic.ProhibitOverlap = true // Add it. - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } time.Sleep(3 * time.Second) @@ -431,8 +425,8 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) { job := testPeriodicJob(launch1, launch2) // Add it. - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } time.Sleep(3 * time.Second) @@ -463,11 +457,11 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) { job2 := testPeriodicJob(launch) // Add them. - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } - if added, err := p.Add(job2); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job2); err != nil { + t.Fatalf("Add failed %v", err) } if l := len(p.Tracked()); l != 2 { @@ -503,11 +497,11 @@ func TestPeriodicDispatch_Run_SameID_Different_Namespace(t *testing.T) { job2.Namespace = "test" // Add them. - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } - if added, err := p.Add(job2); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job2); err != nil { + t.Fatalf("Add failed %v", err) } if l := len(p.Tracked()); l != 2 { @@ -587,8 +581,8 @@ func TestPeriodicDispatch_Complex(t *testing.T) { shuffle(toDelete) for _, job := range jobs { - if added, err := p.Add(job); err != nil || !added { - t.Fatalf("Add failed %v %v", added, err) + if err := p.Add(job); err != nil { + t.Fatalf("Add failed %v", err) } } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 52c0f1410651..b15f38811103 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1939,6 +1939,12 @@ func (j *Job) IsPeriodic() bool { return j.Periodic != nil } +// IsPeriodicActive returns whether the job is an active periodic job that will +// create child jobs +func (j *Job) IsPeriodicActive() bool { + return j.IsPeriodic() && j.Periodic.Enabled && !j.Stopped() && !j.IsParameterized() +} + // IsParameterized returns whether a job is parameterized job. func (j *Job) IsParameterized() bool { return j.ParameterizedJob != nil diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 07db00439ce6..26b2d2a02442 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -637,6 +637,58 @@ func TestJob_IsPeriodic(t *testing.T) { } } +func TestJob_IsPeriodicActive(t *testing.T) { + cases := []struct { + job *Job + active bool + }{ + { + job: &Job{ + Type: JobTypeService, + Periodic: &PeriodicConfig{ + Enabled: true, + }, + }, + active: true, + }, + { + job: &Job{ + Type: JobTypeService, + Periodic: &PeriodicConfig{ + Enabled: false, + }, + }, + active: false, + }, + { + job: &Job{ + Type: JobTypeService, + Periodic: &PeriodicConfig{ + Enabled: true, + }, + Stop: true, + }, + active: false, + }, + { + job: &Job{ + Type: JobTypeService, + Periodic: &PeriodicConfig{ + Enabled: false, + }, + ParameterizedJob: &ParameterizedJobConfig{}, + }, + active: false, + }, + } + + for i, c := range cases { + if act := c.job.IsPeriodicActive(); act != c.active { + t.Fatalf("case %d failed: got %v; want %v", i, act, c.active) + } + } +} + func TestJob_SystemJob_Validate(t *testing.T) { j := testJob() j.Type = JobTypeSystem