Skip to content

Commit

Permalink
replace gomega with chans
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Dec 21, 2023
1 parent 32aed04 commit a5d977a
Showing 1 changed file with 70 additions and 29 deletions.
99 changes: 70 additions & 29 deletions pkg/utils/sleeper_task_test.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,80 @@
package utils_test

import (
"sync/atomic"
"testing"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

type countingWorker struct {
numJobsPerformed atomic.Int32
delay time.Duration
type chanWorker struct {
ch chan struct{}
delay time.Duration
}

func (t *countingWorker) Name() string {
return "CountingWorker"
func (t *chanWorker) Name() string {
return "ChanWorker"
}

func (t *countingWorker) Work() {
func (t *chanWorker) Work() {
if t.delay != 0 {
time.Sleep(t.delay)
}
// Without an atomic, the race detector fails
t.numJobsPerformed.Add(1)
}

func (t *countingWorker) getNumJobsPerformed() int {
return int(t.numJobsPerformed.Load())
t.ch <- struct{}{}
}

func TestSleeperTask_WakeupAfterStopPanics(t *testing.T) {
t.Parallel()

worker := &countingWorker{}
worker := &chanWorker{ch: make(chan struct{}, 1)}
sleeper := utils.NewSleeperTask(worker)

require.NoError(t, sleeper.Stop())

require.Panics(t, func() {
sleeper.WakeUp()
})
gomega.NewWithT(t).Eventually(worker.getNumJobsPerformed).Should(gomega.Equal(0))

select {
case <-worker.ch:
t.Fatal("work was performed when none was expected")
default:
}
}

func TestSleeperTask_CallingStopTwiceFails(t *testing.T) {
t.Parallel()

worker := &countingWorker{}
worker := &chanWorker{}
sleeper := utils.NewSleeperTask(worker)
require.NoError(t, sleeper.Stop())
require.Error(t, sleeper.Stop())
}

func TestSleeperTask_WakeupPerformsWork(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)

worker := &countingWorker{}
worker := &chanWorker{ch: make(chan struct{}, 1)}
sleeper := utils.NewSleeperTask(worker)

sleeper.WakeUp()
gomega.NewWithT(t).Eventually(worker.getNumJobsPerformed).Should(gomega.Equal(1))

select {
case <-worker.ch:
case <-ctx.Done():
t.Error("timed out waiting for work to be performed")
}

require.NoError(t, sleeper.Stop())
}

type controllableWorker struct {
countingWorker
chanWorker
awaitWorkStarted chan struct{}
allowResumeWork chan struct{}
ignoreSignals bool
Expand All @@ -78,13 +85,14 @@ func (w *controllableWorker) Work() {
w.awaitWorkStarted <- struct{}{}
<-w.allowResumeWork
}
w.countingWorker.Work()
w.chanWorker.Work()
}

func TestSleeperTask_WakeupEnqueuesMaxTwice(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)

worker := &controllableWorker{awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})}
worker := &controllableWorker{chanWorker: chanWorker{ch: make(chan struct{}, 1)}, awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})}
sleeper := utils.NewSleeperTask(worker)

sleeper.WakeUp()
Expand All @@ -97,22 +105,55 @@ func TestSleeperTask_WakeupEnqueuesMaxTwice(t *testing.T) {
worker.ignoreSignals = true
worker.allowResumeWork <- struct{}{}

gomega.NewWithT(t).Eventually(worker.getNumJobsPerformed).Should(gomega.Equal(2))
gomega.NewWithT(t).Consistently(worker.getNumJobsPerformed).Should(gomega.BeNumerically("<", 3))
for i := 0; i < 2; i++ {
select {
case <-worker.ch:
case <-ctx.Done():
t.Error("timed out waiting for work to be performed")
}
}

if !t.Failed() {
select {
case <-worker.ch:
t.Errorf("unexpected work performed")
case <-time.After(time.Second):
}
}

require.NoError(t, sleeper.Stop())
}

func TestSleeperTask_StopWaitsUntilWorkFinishes(t *testing.T) {
t.Parallel()

worker := &controllableWorker{awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})}
worker := &controllableWorker{chanWorker: chanWorker{ch: make(chan struct{}, 1)}, awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})}
sleeper := utils.NewSleeperTask(worker)

sleeper.WakeUp()
<-worker.awaitWorkStarted
require.Equal(t, 0, worker.getNumJobsPerformed())

select {
case <-worker.ch:
t.Error("work was performed when none was expected")
assert.NoError(t, sleeper.Stop())
return
default:
}

worker.allowResumeWork <- struct{}{}

require.NoError(t, sleeper.Stop())
require.Equal(t, worker.getNumJobsPerformed(), 1)

select {
case <-worker.ch:
default:
t.Fatal("work should have been performed")
}

select {
case <-worker.ch:
t.Fatal("extra work was performed")
default:
}
}

0 comments on commit a5d977a

Please sign in to comment.