Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish committed Jun 7, 2023
1 parent fa3af50 commit 20b69f2
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 24 deletions.
5 changes: 3 additions & 2 deletions quartz/function_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ func TestFunctionJob(t *testing.T) {

sched := quartz.NewStdScheduler()
sched.Start(ctx)
sched.ScheduleJob(ctx, funcJob1, quartz.NewRunOnceTrigger(time.Millisecond*300))
sched.ScheduleJob(ctx, funcJob2, quartz.NewRunOnceTrigger(time.Millisecond*800))
sched.ScheduleJob(ctx, funcJob1, quartz.NewRunOnceTrigger(100*time.Millisecond))
sched.ScheduleJob(ctx, funcJob2, quartz.NewRunOnceTrigger(200*time.Millisecond))
time.Sleep(time.Second)

sched.Clear()
sched.Stop()

Expand Down
34 changes: 20 additions & 14 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,17 @@ func (sched *StdScheduler) startExecutionLoop(ctx context.Context) {
log.Printf("Exit the empty execution loop.")
return
}
} else {
select {
case <-t.C:
sched.executeAndReschedule(ctx)
safeSetTimer(t, sched.calculateNextTick())
case nextJobAt := <-sched.interrupt:
safeSetTimer(t, nextJobAt)
case <-ctx.Done():
log.Printf("Exit the execution loop.")
return
}
continue
}
select {
case <-t.C:
sched.executeAndReschedule(ctx)
safeSetTimer(t, sched.calculateNextTick())
case nextJobAt := <-sched.interrupt:
safeSetTimer(t, nextJobAt)
case <-ctx.Done():
log.Printf("Exit the execution loop.")
return
}
}
}
Expand All @@ -265,7 +265,11 @@ func safeSetTimer(timer *time.Timer, next time.Time) {
// reset/stop the timer
if !timer.Stop() {
// drain if needed
<-timer.C
select {
case <-timer.C:
default:
}

}

// if the "next" time is in the future, we reset the timer to
Expand Down Expand Up @@ -307,6 +311,7 @@ func (sched *StdScheduler) queueLen() int {
func (sched *StdScheduler) calculateNextTick() time.Time {
sched.mtx.Lock()
defer sched.mtx.Unlock()

if sched.queue.Len() > 0 {
return time.Unix(0, sched.queue.Head().priority)
}
Expand All @@ -325,9 +330,9 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) {
return
}

next := time.Unix(0, sched.queue.Head().priority)
if time.Until(next) > 0 {
if next := time.Unix(0, sched.queue.Head().priority); time.Until(next) > 0 {
// return early
sched.reset(ctx, next)
return
}
it = heap.Pop(sched.queue).(*item)
Expand Down Expand Up @@ -363,6 +368,7 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) {
nextRunTime, err := it.Trigger.NextFireTime(it.priority)
if err != nil {
log.Printf("The Job '%s' got out the execution loop: %q", it.Job.Description(), err.Error())
sched.reset(ctx, time.Now().Add(-time.Millisecond))
return
}
it.priority = nextRunTime
Expand Down
26 changes: 18 additions & 8 deletions quartz/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,27 @@ func TestSchedulerBlockingSemantics(t *testing.T) {
defer timer.Stop()
select {
case <-timer.C:
t.Error("should never reach this")
return false, nil
case <-ctx.Done():
return true, nil
}
}),
quartz.NewSimpleTrigger(time.Millisecond))
quartz.NewSimpleTrigger(time.Millisecond),
)

ticker := time.NewTicker(4 * time.Millisecond)
defer ticker.Stop()
<-ticker.C
if atomic.LoadInt64(&n) == 0 {
t.Error("job should have run at least once")
}

const attempts = 100
switch tt {
case "Blocking":
BLOCKING:
for iters := 0; iters < 100; iters++ {
for iters := 0; iters < attempts; iters++ {
iters++
select {
case <-ctx.Done():
Expand All @@ -127,23 +131,29 @@ func TestSchedulerBlockingSemantics(t *testing.T) {
case "NonBlocking":
var lastN int64
NONBLOCKING:
for iters := 0; iters < 100; iters++ {
for iters := 0; iters < attempts; iters++ {
select {
case <-ctx.Done():
break NONBLOCKING
case <-ticker.C:
num := atomic.LoadInt64(&n)
if num <= lastN {
t.Errorf("on iter %d n did not increase %d",
iters, num,
)
if num > lastN {
break NONBLOCKING
}

lastN = num
}
}
num := atomic.LoadInt64(&n)
if num <= lastN {
t.Errorf("on iter %d n did not increase %d",
attempts, num,
)
}

case "WorkerSmall", "WorkerLarge":
WORKERS:
for iters := 0; iters < 100; iters++ {
for iters := 0; iters < attempts; iters++ {
select {
case <-ctx.Done():
break WORKERS
Expand Down
2 changes: 2 additions & 0 deletions quartz/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
)

func assertEqual[T any](t *testing.T, a T, b T) {
t.Helper()
if !reflect.DeepEqual(a, b) {
t.Fatalf("%v != %v", a, b)
}
}

func assertNotEqual[T any](t *testing.T, a T, b T) {
t.Helper()
if reflect.DeepEqual(a, b) {
t.Fatalf("%v == %v", a, b)
}
Expand Down

0 comments on commit 20b69f2

Please sign in to comment.