Skip to content

Commit

Permalink
[VRF-892] Remove noisy log poller warning for VRFv2(+) jobs (#12132)
Browse files Browse the repository at this point in the history
* [VRF-892] Remove noisy log poller warning for VRFv2(+) jobs

* Fixed shadowed variable

* Fix orphaned goroutine, address nit & add changeset
  • Loading branch information
vreff authored Mar 28, 2024
1 parent b3c70c6 commit 478f73b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 26 deletions.
5 changes: 5 additions & 0 deletions .changeset/tasty-bobcats-hammer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Remove noisy log poller warning in VRFv2 & VRFv2+ listener loops
50 changes: 29 additions & 21 deletions core/services/vrf/v2/listener_v2_log_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (lsn *listenerV2) runLogListener(
lastProcessedBlock int64
startingUp = true
)
filterName := lsn.getLogPollerFilterName()
ctx, cancel := lsn.chStop.NewCtx()
defer cancel()
for {
Expand All @@ -38,31 +39,30 @@ func (lsn *listenerV2) runLogListener(
case <-ticker.C:
start := time.Now()
lsn.l.Debugw("log listener loop")
// Filter registration is idempotent, so we can just call it every time
// and retry on errors using the ticker.
err := lsn.chain.LogPoller().RegisterFilter(ctx, logpoller.Filter{
Name: logpoller.FilterName(
"VRFListener",
"version", lsn.coordinator.Version(),
"keyhash", lsn.job.VRFSpec.PublicKey.MustHash(),
"coordinatorAddress", lsn.coordinator.Address()),
EventSigs: evmtypes.HashArray{
lsn.coordinator.RandomWordsFulfilledTopic(),
lsn.coordinator.RandomWordsRequestedTopic(),
},
Addresses: evmtypes.AddressArray{
lsn.coordinator.Address(),
},
})
if err != nil {
lsn.l.Errorw("error registering filter in log poller, retrying",
"err", err,
"elapsed", time.Since(start))
continue

// If filter has not already been successfully registered, register it.
if !lsn.chain.LogPoller().HasFilter(filterName) {
err := lsn.chain.LogPoller().RegisterFilter(ctx, logpoller.Filter{
Name: filterName,
EventSigs: evmtypes.HashArray{
lsn.coordinator.RandomWordsFulfilledTopic(),
lsn.coordinator.RandomWordsRequestedTopic(),
},
Addresses: evmtypes.AddressArray{
lsn.coordinator.Address(),
},
})
if err != nil {
lsn.l.Errorw("error registering filter in log poller, retrying",
"err", err,
"elapsed", time.Since(start))
continue
}
}

// on startup we want to initialize the last processed block
if startingUp {
var err error
lsn.l.Debugw("initializing last processed block on startup")
lastProcessedBlock, err = lsn.initializeLastProcessedBlock(ctx)
if err != nil {
Expand Down Expand Up @@ -97,6 +97,14 @@ func (lsn *listenerV2) runLogListener(
}
}

func (lsn *listenerV2) getLogPollerFilterName() string {
return logpoller.FilterName(
"VRFListener",
"version", lsn.coordinator.Version(),
"keyhash", lsn.job.VRFSpec.PublicKey.MustHash(),
"coordinatorAddress", lsn.coordinator.Address())
}

// initializeLastProcessedBlock returns the earliest block number that we need to
// process requests for. This is the block number of the earliest unfulfilled request
// or the latest finalized block, if there are no unfulfilled requests.
Expand Down
43 changes: 38 additions & 5 deletions core/services/vrf/v2/listener_v2_log_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
"github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand Down Expand Up @@ -124,11 +126,13 @@ func setupVRFLogPollerListenerTH(t *testing.T,

chain := evmmocks.NewChain(t)
listener := &listenerV2{
respCount: map[string]uint64{},
job: j,
chain: chain,
l: logger.Sugared(lggr),
coordinator: coordinator,
respCount: map[string]uint64{},
job: j,
chain: chain,
l: logger.Sugared(lggr),
coordinator: coordinator,
inflightCache: vrfcommon.NewInflightCache(10),
chStop: make(chan struct{}),
}
ctx := testutils.Context(t)

Expand Down Expand Up @@ -228,6 +232,35 @@ func TestInitProcessedBlock_NoVRFReqs(t *testing.T) {
require.Equal(t, int64(6), lastProcessedBlock)
}

func TestLogPollerFilterRegistered(t *testing.T) {
t.Parallel()
// Instantiate listener.
th := setupVRFLogPollerListenerTH(t, false, 3, 3, 2, 1000, func(mockChain *evmmocks.Chain, th *vrfLogPollerListenerTH) {
mockChain.On("LogPoller").Maybe().Return(th.LogPoller)
})

// Run the log listener. This should register the log poller filter.
go th.Listener.runLogListener(time.Second, 1)

// Wait for the log poller filter to be registered.
filterName := th.Listener.getLogPollerFilterName()
gomega.NewWithT(t).Eventually(func() bool {
return th.Listener.chain.LogPoller().HasFilter(filterName)
}, testutils.WaitTimeout(t), time.Second).Should(gomega.BeTrue())

// Once registered, expect the filter to stay registered.
gomega.NewWithT(t).Consistently(func() bool {
return th.Listener.chain.LogPoller().HasFilter(filterName)
}, 5*time.Second, 1*time.Second).Should(gomega.BeTrue())

// Close the listener to avoid an orphaned goroutine.
close(th.Listener.chStop)

// Assert channel is closed.
_, ok := (<-th.Listener.chStop)
assert.False(t, ok)
}

func TestInitProcessedBlock_NoUnfulfilledVRFReqs(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 478f73b

Please sign in to comment.