From 7c391d4326a38d77de361675ad4732cd06ff8ab1 Mon Sep 17 00:00:00 2001 From: Rodrigo Broggi Date: Fri, 21 Jun 2024 16:17:34 +0200 Subject: [PATCH] issue-742: bug in `NextRun` (#743) * issue-742: bug in `NextRun` * issue-742: bug in `NextRun` correction --- scheduler.go | 16 ++++---- scheduler_test.go | 96 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 96 insertions(+), 16 deletions(-) diff --git a/scheduler.go b/scheduler.go index 965f027d..ff816a8e 100644 --- a/scheduler.go +++ b/scheduler.go @@ -357,18 +357,16 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) { return } - // if the job has more than one nextScheduled time, + // if the job has nextScheduled time in the past, // we need to remove any that are in the past. - if len(j.nextScheduled) > 1 { - var newNextScheduled []time.Time - for _, t := range j.nextScheduled { - if t.Before(s.now()) { - continue - } - newNextScheduled = append(newNextScheduled, t) + var newNextScheduled []time.Time + for _, t := range j.nextScheduled { + if t.Before(s.now()) { + continue } - j.nextScheduled = newNextScheduled + newNextScheduled = append(newNextScheduled, t) } + j.nextScheduled = newNextScheduled // if the job has a limited number of runs set, we need to // check how many runs have occurred and stop running this diff --git a/scheduler_test.go b/scheduler_test.go index a59b59bd..b3b73357 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -6,10 +6,12 @@ import ( "fmt" "os" "sync" + "sync/atomic" "testing" "time" "github.com/google/uuid" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -1185,6 +1187,92 @@ func TestScheduler_LimitModeAndSingleton(t *testing.T) { } } +func TestScheduler_OneTimeJob_DoesNotCleanupNext(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + schedulerStartTime := time.Date(2024, time.April, 3, 4, 5, 0, 0, time.UTC) + + tests := []struct { + name string + runAt time.Time + fakeClock clockwork.FakeClock + assertErr require.ErrorAssertionFunc + // asserts things about schedules, advance time and perform new assertions + advanceAndAsserts []func( + t *testing.T, + j Job, + clock clockwork.FakeClock, + runs *atomic.Uint32, + ) + }{ + { + name: "exhausted run do does not cleanup next item", + runAt: time.Date(2024, time.April, 22, 4, 5, 0, 0, time.UTC), + fakeClock: clockwork.NewFakeClockAt(schedulerStartTime), + advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){ + func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) { + require.Equal(t, uint32(0), runs.Load()) + + // last not initialized + lastRunAt, err := j.LastRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, lastRunAt) + + // next is now + expected := time.Date(2024, time.April, 22, 4, 5, 0, 0, time.UTC) + nextRunAt, err := j.NextRun() + require.NoError(t, err) + require.Equal(t, expected, nextRunAt.UTC()) + + // advance and eventually run + oneSecondAfterNextRun := expected.Add(1 * time.Second) + + clock.Advance(oneSecondAfterNextRun.Sub(schedulerStartTime)) + require.Eventually(t, func() bool { + return assert.Equal(t, uint32(1), runs.Load()) + }, 3*time.Second, 100*time.Millisecond) + + // last was run + lastRunAt, err = j.LastRun() + require.NoError(t, err) + require.WithinDuration(t, expected, lastRunAt, 1*time.Second) + + nextRunAt, err = j.NextRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, nextRunAt) + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newTestScheduler(t, WithClock(tt.fakeClock), WithLocation(time.UTC)) + t.Cleanup(func() { + require.NoError(t, s.Shutdown()) + }) + + runs := atomic.Uint32{} + j, err := s.NewJob( + OneTimeJob(OneTimeJobStartDateTime(tt.runAt)), + NewTask(func() { + runs.Add(1) + }), + ) + if tt.assertErr != nil { + tt.assertErr(t, err) + } else { + require.NoError(t, err) + s.Start() + + for _, advanceAndAssert := range tt.advanceAndAsserts { + advanceAndAssert(t, j, tt.fakeClock, &runs) + } + } + }) + } +} + var _ Elector = (*testElector)(nil) type testElector struct { @@ -1980,7 +2068,7 @@ func TestScheduler_OneTimeJob(t *testing.T) { s := newTestScheduler(t) - j, err := s.NewJob( + _, err := s.NewJob( OneTimeJob(tt.startAt()), NewTask(func() { jobRan <- struct{}{} @@ -1996,12 +2084,6 @@ func TestScheduler_OneTimeJob(t *testing.T) { t.Fatal("timed out waiting for job to run") } - var nextRun time.Time - for ; nextRun.IsZero(); nextRun, err = j.NextRun() { //nolint:revive - } - assert.NoError(t, err) - assert.True(t, nextRun.Before(time.Now())) - assert.NoError(t, s.Shutdown()) }) }