Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace gomega with chans #300

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 70 additions & 29 deletions pkg/utils/sleeper_task_test.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,80 @@
package utils_test

import (
"sync/atomic"
"testing"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

type countingWorker struct {
numJobsPerformed atomic.Int32
delay time.Duration
type chanWorker struct {
ch chan struct{}
delay time.Duration
}

func (t *countingWorker) Name() string {
return "CountingWorker"
func (t *chanWorker) Name() string {
return "ChanWorker"
}

func (t *countingWorker) Work() {
func (t *chanWorker) Work() {
if t.delay != 0 {
time.Sleep(t.delay)
}
// Without an atomic, the race detector fails
t.numJobsPerformed.Add(1)
}

func (t *countingWorker) getNumJobsPerformed() int {
return int(t.numJobsPerformed.Load())
t.ch <- struct{}{}
}

func TestSleeperTask_WakeupAfterStopPanics(t *testing.T) {
t.Parallel()

worker := &countingWorker{}
worker := &chanWorker{ch: make(chan struct{}, 1)}
sleeper := utils.NewSleeperTask(worker)

require.NoError(t, sleeper.Stop())

require.Panics(t, func() {
sleeper.WakeUp()
})
gomega.NewWithT(t).Eventually(worker.getNumJobsPerformed).Should(gomega.Equal(0))

select {
case <-worker.ch:
t.Fatal("work was performed when none was expected")
default:
}
}

func TestSleeperTask_CallingStopTwiceFails(t *testing.T) {
t.Parallel()

worker := &countingWorker{}
worker := &chanWorker{}
sleeper := utils.NewSleeperTask(worker)
require.NoError(t, sleeper.Stop())
require.Error(t, sleeper.Stop())
}

func TestSleeperTask_WakeupPerformsWork(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)

worker := &countingWorker{}
worker := &chanWorker{ch: make(chan struct{}, 1)}
sleeper := utils.NewSleeperTask(worker)

sleeper.WakeUp()
gomega.NewWithT(t).Eventually(worker.getNumJobsPerformed).Should(gomega.Equal(1))

select {
case <-worker.ch:
case <-ctx.Done():
t.Error("timed out waiting for work to be performed")
}

require.NoError(t, sleeper.Stop())
}

type controllableWorker struct {
countingWorker
chanWorker
awaitWorkStarted chan struct{}
allowResumeWork chan struct{}
ignoreSignals bool
Expand All @@ -78,13 +85,14 @@ func (w *controllableWorker) Work() {
w.awaitWorkStarted <- struct{}{}
<-w.allowResumeWork
}
w.countingWorker.Work()
w.chanWorker.Work()
}

func TestSleeperTask_WakeupEnqueuesMaxTwice(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)

worker := &controllableWorker{awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})}
worker := &controllableWorker{chanWorker: chanWorker{ch: make(chan struct{}, 1)}, awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})}
sleeper := utils.NewSleeperTask(worker)

sleeper.WakeUp()
Expand All @@ -97,22 +105,55 @@ func TestSleeperTask_WakeupEnqueuesMaxTwice(t *testing.T) {
worker.ignoreSignals = true
worker.allowResumeWork <- struct{}{}

gomega.NewWithT(t).Eventually(worker.getNumJobsPerformed).Should(gomega.Equal(2))
gomega.NewWithT(t).Consistently(worker.getNumJobsPerformed).Should(gomega.BeNumerically("<", 3))
for i := 0; i < 2; i++ {
select {
case <-worker.ch:
case <-ctx.Done():
t.Error("timed out waiting for work to be performed")
}
}

if !t.Failed() {
select {
case <-worker.ch:
t.Errorf("unexpected work performed")
case <-time.After(time.Second):
}
}

require.NoError(t, sleeper.Stop())
}

func TestSleeperTask_StopWaitsUntilWorkFinishes(t *testing.T) {
t.Parallel()

worker := &controllableWorker{awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})}
worker := &controllableWorker{chanWorker: chanWorker{ch: make(chan struct{}, 1)}, awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})}
sleeper := utils.NewSleeperTask(worker)

sleeper.WakeUp()
<-worker.awaitWorkStarted
require.Equal(t, 0, worker.getNumJobsPerformed())

select {
case <-worker.ch:
t.Error("work was performed when none was expected")
assert.NoError(t, sleeper.Stop())
return
default:
}

worker.allowResumeWork <- struct{}{}

require.NoError(t, sleeper.Stop())
require.Equal(t, worker.getNumJobsPerformed(), 1)

select {
case <-worker.ch:
default:
t.Fatal("work should have been performed")
}

select {
case <-worker.ch:
t.Fatal("extra work was performed")
default:
}
}
Loading