diff --git a/pkg/utils/sleeper_task_test.go b/pkg/utils/sleeper_task_test.go index ab91a8e0d..3cc233fbc 100644 --- a/pkg/utils/sleeper_task_test.go +++ b/pkg/utils/sleeper_task_test.go @@ -1,41 +1,36 @@ package utils_test import ( - "sync/atomic" "testing" "time" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - - "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/utils" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" ) -type countingWorker struct { - numJobsPerformed atomic.Int32 - delay time.Duration +type chanWorker struct { + ch chan struct{} + delay time.Duration } -func (t *countingWorker) Name() string { - return "CountingWorker" +func (t *chanWorker) Name() string { + return "ChanWorker" } -func (t *countingWorker) Work() { +func (t *chanWorker) Work() { if t.delay != 0 { time.Sleep(t.delay) } - // Without an atomic, the race detector fails - t.numJobsPerformed.Add(1) -} - -func (t *countingWorker) getNumJobsPerformed() int { - return int(t.numJobsPerformed.Load()) + t.ch <- struct{}{} } func TestSleeperTask_WakeupAfterStopPanics(t *testing.T) { t.Parallel() - worker := &countingWorker{} + worker := &chanWorker{ch: make(chan struct{}, 1)} sleeper := utils.NewSleeperTask(worker) require.NoError(t, sleeper.Stop()) @@ -43,13 +38,18 @@ func TestSleeperTask_WakeupAfterStopPanics(t *testing.T) { require.Panics(t, func() { sleeper.WakeUp() }) - gomega.NewWithT(t).Eventually(worker.getNumJobsPerformed).Should(gomega.Equal(0)) + + select { + case <-worker.ch: + t.Fatal("work was performed when none was expected") + default: + } } func TestSleeperTask_CallingStopTwiceFails(t *testing.T) { t.Parallel() - worker := &countingWorker{} + worker := &chanWorker{} sleeper := utils.NewSleeperTask(worker) require.NoError(t, sleeper.Stop()) require.Error(t, sleeper.Stop()) @@ -57,17 +57,24 @@ func TestSleeperTask_CallingStopTwiceFails(t *testing.T) { func TestSleeperTask_WakeupPerformsWork(t *testing.T) { t.Parallel() + ctx := tests.Context(t) - worker := &countingWorker{} + worker := &chanWorker{ch: make(chan struct{}, 1)} sleeper := utils.NewSleeperTask(worker) sleeper.WakeUp() - gomega.NewWithT(t).Eventually(worker.getNumJobsPerformed).Should(gomega.Equal(1)) + + select { + case <-worker.ch: + case <-ctx.Done(): + t.Error("timed out waiting for work to be performed") + } + require.NoError(t, sleeper.Stop()) } type controllableWorker struct { - countingWorker + chanWorker awaitWorkStarted chan struct{} allowResumeWork chan struct{} ignoreSignals bool @@ -78,13 +85,14 @@ func (w *controllableWorker) Work() { w.awaitWorkStarted <- struct{}{} <-w.allowResumeWork } - w.countingWorker.Work() + w.chanWorker.Work() } func TestSleeperTask_WakeupEnqueuesMaxTwice(t *testing.T) { t.Parallel() + ctx := tests.Context(t) - worker := &controllableWorker{awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})} + worker := &controllableWorker{chanWorker: chanWorker{ch: make(chan struct{}, 1)}, awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})} sleeper := utils.NewSleeperTask(worker) sleeper.WakeUp() @@ -97,22 +105,55 @@ func TestSleeperTask_WakeupEnqueuesMaxTwice(t *testing.T) { worker.ignoreSignals = true worker.allowResumeWork <- struct{}{} - gomega.NewWithT(t).Eventually(worker.getNumJobsPerformed).Should(gomega.Equal(2)) - gomega.NewWithT(t).Consistently(worker.getNumJobsPerformed).Should(gomega.BeNumerically("<", 3)) + for i := 0; i < 2; i++ { + select { + case <-worker.ch: + case <-ctx.Done(): + t.Error("timed out waiting for work to be performed") + } + } + + if !t.Failed() { + select { + case <-worker.ch: + t.Errorf("unexpected work performed") + case <-time.After(time.Second): + } + } + require.NoError(t, sleeper.Stop()) } func TestSleeperTask_StopWaitsUntilWorkFinishes(t *testing.T) { t.Parallel() - worker := &controllableWorker{awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})} + worker := &controllableWorker{chanWorker: chanWorker{ch: make(chan struct{}, 1)}, awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})} sleeper := utils.NewSleeperTask(worker) sleeper.WakeUp() <-worker.awaitWorkStarted - require.Equal(t, 0, worker.getNumJobsPerformed()) + + select { + case <-worker.ch: + t.Error("work was performed when none was expected") + assert.NoError(t, sleeper.Stop()) + return + default: + } + worker.allowResumeWork <- struct{}{} require.NoError(t, sleeper.Stop()) - require.Equal(t, worker.getNumJobsPerformed(), 1) + + select { + case <-worker.ch: + default: + t.Fatal("work should have been performed") + } + + select { + case <-worker.ch: + t.Fatal("extra work was performed") + default: + } }