From 20b69f29938c446a9f4ef670e8a4c023c0126a6a Mon Sep 17 00:00:00 2001 From: tycho garen Date: Wed, 7 Jun 2023 16:39:43 -0400 Subject: [PATCH] fix tests --- quartz/function_job_test.go | 5 +++-- quartz/scheduler.go | 34 ++++++++++++++++++++-------------- quartz/scheduler_test.go | 26 ++++++++++++++++++-------- quartz/util_test.go | 2 ++ 4 files changed, 43 insertions(+), 24 deletions(-) diff --git a/quartz/function_job_test.go b/quartz/function_job_test.go index 0a561d4..09870f8 100644 --- a/quartz/function_job_test.go +++ b/quartz/function_job_test.go @@ -26,9 +26,10 @@ func TestFunctionJob(t *testing.T) { sched := quartz.NewStdScheduler() sched.Start(ctx) - sched.ScheduleJob(ctx, funcJob1, quartz.NewRunOnceTrigger(time.Millisecond*300)) - sched.ScheduleJob(ctx, funcJob2, quartz.NewRunOnceTrigger(time.Millisecond*800)) + sched.ScheduleJob(ctx, funcJob1, quartz.NewRunOnceTrigger(100*time.Millisecond)) + sched.ScheduleJob(ctx, funcJob2, quartz.NewRunOnceTrigger(200*time.Millisecond)) time.Sleep(time.Second) + sched.Clear() sched.Stop() diff --git a/quartz/scheduler.go b/quartz/scheduler.go index fce7676..d746259 100644 --- a/quartz/scheduler.go +++ b/quartz/scheduler.go @@ -246,17 +246,17 @@ func (sched *StdScheduler) startExecutionLoop(ctx context.Context) { log.Printf("Exit the empty execution loop.") return } - } else { - select { - case <-t.C: - sched.executeAndReschedule(ctx) - safeSetTimer(t, sched.calculateNextTick()) - case nextJobAt := <-sched.interrupt: - safeSetTimer(t, nextJobAt) - case <-ctx.Done(): - log.Printf("Exit the execution loop.") - return - } + continue + } + select { + case <-t.C: + sched.executeAndReschedule(ctx) + safeSetTimer(t, sched.calculateNextTick()) + case nextJobAt := <-sched.interrupt: + safeSetTimer(t, nextJobAt) + case <-ctx.Done(): + log.Printf("Exit the execution loop.") + return } } } @@ -265,7 +265,11 @@ func safeSetTimer(timer *time.Timer, next time.Time) { // reset/stop the timer if !timer.Stop() { // drain if needed - <-timer.C + select { + case <-timer.C: + default: + } + } // if the "next" time is in the future, we reset the timer to @@ -307,6 +311,7 @@ func (sched *StdScheduler) queueLen() int { func (sched *StdScheduler) calculateNextTick() time.Time { sched.mtx.Lock() defer sched.mtx.Unlock() + if sched.queue.Len() > 0 { return time.Unix(0, sched.queue.Head().priority) } @@ -325,9 +330,9 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) { return } - next := time.Unix(0, sched.queue.Head().priority) - if time.Until(next) > 0 { + if next := time.Unix(0, sched.queue.Head().priority); time.Until(next) > 0 { // return early + sched.reset(ctx, next) return } it = heap.Pop(sched.queue).(*item) @@ -363,6 +368,7 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) { nextRunTime, err := it.Trigger.NextFireTime(it.priority) if err != nil { log.Printf("The Job '%s' got out the execution loop: %q", it.Job.Description(), err.Error()) + sched.reset(ctx, time.Now().Add(-time.Millisecond)) return } it.priority = nextRunTime diff --git a/quartz/scheduler_test.go b/quartz/scheduler_test.go index 00251db..2c6feab 100644 --- a/quartz/scheduler_test.go +++ b/quartz/scheduler_test.go @@ -96,23 +96,27 @@ func TestSchedulerBlockingSemantics(t *testing.T) { defer timer.Stop() select { case <-timer.C: + t.Error("should never reach this") return false, nil case <-ctx.Done(): return true, nil } }), - quartz.NewSimpleTrigger(time.Millisecond)) + quartz.NewSimpleTrigger(time.Millisecond), + ) ticker := time.NewTicker(4 * time.Millisecond) + defer ticker.Stop() <-ticker.C if atomic.LoadInt64(&n) == 0 { t.Error("job should have run at least once") } + const attempts = 100 switch tt { case "Blocking": BLOCKING: - for iters := 0; iters < 100; iters++ { + for iters := 0; iters < attempts; iters++ { iters++ select { case <-ctx.Done(): @@ -127,23 +131,29 @@ func TestSchedulerBlockingSemantics(t *testing.T) { case "NonBlocking": var lastN int64 NONBLOCKING: - for iters := 0; iters < 100; iters++ { + for iters := 0; iters < attempts; iters++ { select { case <-ctx.Done(): break NONBLOCKING case <-ticker.C: num := atomic.LoadInt64(&n) - if num <= lastN { - t.Errorf("on iter %d n did not increase %d", - iters, num, - ) + if num > lastN { + break NONBLOCKING } + lastN = num } } + num := atomic.LoadInt64(&n) + if num <= lastN { + t.Errorf("on iter %d n did not increase %d", + attempts, num, + ) + } + case "WorkerSmall", "WorkerLarge": WORKERS: - for iters := 0; iters < 100; iters++ { + for iters := 0; iters < attempts; iters++ { select { case <-ctx.Done(): break WORKERS diff --git a/quartz/util_test.go b/quartz/util_test.go index 0a17983..ab0c944 100644 --- a/quartz/util_test.go +++ b/quartz/util_test.go @@ -8,12 +8,14 @@ import ( ) func assertEqual[T any](t *testing.T, a T, b T) { + t.Helper() if !reflect.DeepEqual(a, b) { t.Fatalf("%v != %v", a, b) } } func assertNotEqual[T any](t *testing.T, a T, b T) { + t.Helper() if reflect.DeepEqual(a, b) { t.Fatalf("%v == %v", a, b) }