Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add util methods from core #295

Merged
merged 13 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/linkedin/goavro/v2 v2.12.0
github.com/mitchellh/mapstructure v1.5.0
github.com/mwitkow/grpc-proxy v0.0.0-20230212185441-f345521cb9c9
github.com/pelletier/go-toml/v2 v2.1.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/riferrei/srclient v0.5.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w=
github.com/opencontainers/runc v1.1.3/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg=
github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
22 changes: 22 additions & 0 deletions pkg/config/toml.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package config

import (
"errors"
"io"

"github.com/pelletier/go-toml/v2"
)

// DecodeTOML decodes toml from r in to v.
// Requires strict field matches and returns full toml.StrictMissingError details.
func DecodeTOML(r io.Reader, v any) error {
d := toml.NewDecoder(r).DisallowUnknownFields()
if err := d.Decode(v); err != nil {
var strict *toml.StrictMissingError
if errors.As(err, &strict) {
return errors.New(strict.String())
}
return err
}
return nil
}
10 changes: 10 additions & 0 deletions pkg/utils/bytes/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,13 @@ func TrimQuotes(input []byte) []byte {
}
return input
}

// IsEmpty returns true if bytes contains only zero values, or has len 0.
func IsEmpty(bytes []byte) bool {
for _, b := range bytes {
if b != 0 {
return false
}
}
return true
}
16 changes: 16 additions & 0 deletions pkg/utils/bytes/bytes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package bytes_test

import (
"testing"

"github.com/smartcontractkit/chainlink-common/pkg/utils/bytes"
"github.com/stretchr/testify/require"
)

func TestIsEmpty(t *testing.T) {
t.Parallel()

require.True(t, bytes.IsEmpty([]byte{0, 0, 0}))
require.True(t, bytes.IsEmpty([]byte{}))
require.False(t, bytes.IsEmpty([]byte{1, 2, 3, 5}))
}
123 changes: 123 additions & 0 deletions pkg/utils/sleeper_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package utils

import (
"fmt"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/services"
)

// Worker is a simple interface that represents some work to do repeatedly
type Worker interface {
Work()
Name() string
}

// SleeperTask represents a task that waits in the background to process some work.
type SleeperTask struct {
services.StateMachine
worker Worker
chQueue chan struct{}
chStop chan struct{}
chDone chan struct{}
chWorkDone chan struct{}
}

// NewSleeperTask takes a worker and returns a SleeperTask.
//
// SleeperTask is guaranteed to call Work on the worker at least once for every
// WakeUp call.
// If the Worker is busy when WakeUp is called, the Worker will be called again
// immediately after it is finished. For this reason you should take care to
// make sure that Worker is idempotent.
// WakeUp does not block.
func NewSleeperTask(worker Worker) *SleeperTask {
s := &SleeperTask{
worker: worker,
chQueue: make(chan struct{}, 1),
chStop: make(chan struct{}),
chDone: make(chan struct{}),
chWorkDone: make(chan struct{}, 10),
}

_ = s.StartOnce("SleeperTask-"+worker.Name(), func() error {
go s.workerLoop()
return nil
})

return s
}

// Stop stops the SleeperTask
func (s *SleeperTask) Stop() error {
return s.StopOnce("SleeperTask-"+s.worker.Name(), func() error {
close(s.chStop)
select {
case <-s.chDone:
case <-time.After(15 * time.Second):
return fmt.Errorf("SleeperTask-%s took too long to stop", s.worker.Name())
}
return nil
})
}

func (s *SleeperTask) WakeUpIfStarted() {
s.IfStarted(func() {
select {
case s.chQueue <- struct{}{}:
default:
}
})
}

// WakeUp wakes up the sleeper task, asking it to execute its Worker.
func (s *SleeperTask) WakeUp() {
if !s.IfStarted(func() {
select {
case s.chQueue <- struct{}{}:
default:
}
}) {
panic("cannot wake up stopped sleeper task")
}
}

func (s *SleeperTask) workDone() {
select {
case s.chWorkDone <- struct{}{}:
default:
}
}

// WorkDone isn't part of the SleeperTask interface, but can be
// useful in tests to assert that the work has been done.
func (s *SleeperTask) WorkDone() <-chan struct{} {
return s.chWorkDone
}

func (s *SleeperTask) workerLoop() {
defer close(s.chDone)

for {
select {
case <-s.chQueue:
s.worker.Work()
s.workDone()
case <-s.chStop:
return
}
}
}

type sleeperTaskWorker struct {
name string
work func()
}

// SleeperFuncTask returns a Worker to execute the given work function.
func SleeperFuncTask(work func(), name string) Worker {
return &sleeperTaskWorker{name: name, work: work}
}

func (w *sleeperTaskWorker) Name() string { return w.name }
func (w *sleeperTaskWorker) Work() { w.work() }
159 changes: 159 additions & 0 deletions pkg/utils/sleeper_task_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package utils_test

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

type chanWorker struct {
ch chan struct{}
delay time.Duration
}

func (t *chanWorker) Name() string {
return "ChanWorker"
}

func (t *chanWorker) Work() {
if t.delay != 0 {
time.Sleep(t.delay)
}
t.ch <- struct{}{}
}

func TestSleeperTask_WakeupAfterStopPanics(t *testing.T) {
t.Parallel()

worker := &chanWorker{ch: make(chan struct{}, 1)}
sleeper := utils.NewSleeperTask(worker)

require.NoError(t, sleeper.Stop())

require.Panics(t, func() {
sleeper.WakeUp()
})

select {
case <-worker.ch:
t.Fatal("work was performed when none was expected")
default:
}
}

func TestSleeperTask_CallingStopTwiceFails(t *testing.T) {
t.Parallel()

worker := &chanWorker{}
sleeper := utils.NewSleeperTask(worker)
require.NoError(t, sleeper.Stop())
require.Error(t, sleeper.Stop())
}

func TestSleeperTask_WakeupPerformsWork(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)

worker := &chanWorker{ch: make(chan struct{}, 1)}
sleeper := utils.NewSleeperTask(worker)

sleeper.WakeUp()

select {
case <-worker.ch:
case <-ctx.Done():
t.Error("timed out waiting for work to be performed")
}

require.NoError(t, sleeper.Stop())
}

type controllableWorker struct {
chanWorker
awaitWorkStarted chan struct{}
allowResumeWork chan struct{}
ignoreSignals bool
}

func (w *controllableWorker) Work() {
if !w.ignoreSignals {
w.awaitWorkStarted <- struct{}{}
<-w.allowResumeWork
}
w.chanWorker.Work()
}

func TestSleeperTask_WakeupEnqueuesMaxTwice(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)

worker := &controllableWorker{chanWorker: chanWorker{ch: make(chan struct{}, 1)}, awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})}
sleeper := utils.NewSleeperTask(worker)

sleeper.WakeUp()
<-worker.awaitWorkStarted
sleeper.WakeUp()
sleeper.WakeUp()
sleeper.WakeUp()
sleeper.WakeUp()
sleeper.WakeUp()
worker.ignoreSignals = true
worker.allowResumeWork <- struct{}{}

for i := 0; i < 2; i++ {
select {
case <-worker.ch:
case <-ctx.Done():
t.Error("timed out waiting for work to be performed")
}
}

if !t.Failed() {
select {
case <-worker.ch:
t.Errorf("unexpected work performed")
case <-time.After(time.Second):
}
}

require.NoError(t, sleeper.Stop())
}

func TestSleeperTask_StopWaitsUntilWorkFinishes(t *testing.T) {
t.Parallel()

worker := &controllableWorker{chanWorker: chanWorker{ch: make(chan struct{}, 1)}, awaitWorkStarted: make(chan struct{}), allowResumeWork: make(chan struct{})}
sleeper := utils.NewSleeperTask(worker)

sleeper.WakeUp()
<-worker.awaitWorkStarted

select {
case <-worker.ch:
t.Error("work was performed when none was expected")
assert.NoError(t, sleeper.Stop())
return
default:
}

worker.allowResumeWork <- struct{}{}

require.NoError(t, sleeper.Stop())

select {
case <-worker.ch:
default:
t.Fatal("work should have been performed")
}

select {
case <-worker.ch:
t.Fatal("extra work was performed")
default:
}
}
Loading
Loading