diff --git a/.changeset/tasty-bobcats-hammer.md b/.changeset/tasty-bobcats-hammer.md new file mode 100644 index 00000000000..69ffb6c1bcb --- /dev/null +++ b/.changeset/tasty-bobcats-hammer.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Remove noisy log poller warning in VRFv2 & VRFv2+ listener loops diff --git a/core/services/vrf/v2/listener_v2_log_listener.go b/core/services/vrf/v2/listener_v2_log_listener.go index e495eac5d8b..6fbe518411d 100644 --- a/core/services/vrf/v2/listener_v2_log_listener.go +++ b/core/services/vrf/v2/listener_v2_log_listener.go @@ -29,6 +29,7 @@ func (lsn *listenerV2) runLogListener( lastProcessedBlock int64 startingUp = true ) + filterName := lsn.getLogPollerFilterName() ctx, cancel := lsn.chStop.NewCtx() defer cancel() for { @@ -38,31 +39,30 @@ func (lsn *listenerV2) runLogListener( case <-ticker.C: start := time.Now() lsn.l.Debugw("log listener loop") - // Filter registration is idempotent, so we can just call it every time - // and retry on errors using the ticker. - err := lsn.chain.LogPoller().RegisterFilter(ctx, logpoller.Filter{ - Name: logpoller.FilterName( - "VRFListener", - "version", lsn.coordinator.Version(), - "keyhash", lsn.job.VRFSpec.PublicKey.MustHash(), - "coordinatorAddress", lsn.coordinator.Address()), - EventSigs: evmtypes.HashArray{ - lsn.coordinator.RandomWordsFulfilledTopic(), - lsn.coordinator.RandomWordsRequestedTopic(), - }, - Addresses: evmtypes.AddressArray{ - lsn.coordinator.Address(), - }, - }) - if err != nil { - lsn.l.Errorw("error registering filter in log poller, retrying", - "err", err, - "elapsed", time.Since(start)) - continue + + // If filter has not already been successfully registered, register it. + if !lsn.chain.LogPoller().HasFilter(filterName) { + err := lsn.chain.LogPoller().RegisterFilter(ctx, logpoller.Filter{ + Name: filterName, + EventSigs: evmtypes.HashArray{ + lsn.coordinator.RandomWordsFulfilledTopic(), + lsn.coordinator.RandomWordsRequestedTopic(), + }, + Addresses: evmtypes.AddressArray{ + lsn.coordinator.Address(), + }, + }) + if err != nil { + lsn.l.Errorw("error registering filter in log poller, retrying", + "err", err, + "elapsed", time.Since(start)) + continue + } } // on startup we want to initialize the last processed block if startingUp { + var err error lsn.l.Debugw("initializing last processed block on startup") lastProcessedBlock, err = lsn.initializeLastProcessedBlock(ctx) if err != nil { @@ -97,6 +97,14 @@ func (lsn *listenerV2) runLogListener( } } +func (lsn *listenerV2) getLogPollerFilterName() string { + return logpoller.FilterName( + "VRFListener", + "version", lsn.coordinator.Version(), + "keyhash", lsn.job.VRFSpec.PublicKey.MustHash(), + "coordinatorAddress", lsn.coordinator.Address()) +} + // initializeLastProcessedBlock returns the earliest block number that we need to // process requests for. This is the block number of the earliest unfulfilled request // or the latest finalized block, if there are no unfulfilled requests. diff --git a/core/services/vrf/v2/listener_v2_log_listener_test.go b/core/services/vrf/v2/listener_v2_log_listener_test.go index 81ec6473a92..15b0a5ecbe8 100644 --- a/core/services/vrf/v2/listener_v2_log_listener_test.go +++ b/core/services/vrf/v2/listener_v2_log_listener_test.go @@ -16,6 +16,8 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" "github.com/jmoiron/sqlx" + "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -124,11 +126,13 @@ func setupVRFLogPollerListenerTH(t *testing.T, chain := evmmocks.NewChain(t) listener := &listenerV2{ - respCount: map[string]uint64{}, - job: j, - chain: chain, - l: logger.Sugared(lggr), - coordinator: coordinator, + respCount: map[string]uint64{}, + job: j, + chain: chain, + l: logger.Sugared(lggr), + coordinator: coordinator, + inflightCache: vrfcommon.NewInflightCache(10), + chStop: make(chan struct{}), } ctx := testutils.Context(t) @@ -228,6 +232,35 @@ func TestInitProcessedBlock_NoVRFReqs(t *testing.T) { require.Equal(t, int64(6), lastProcessedBlock) } +func TestLogPollerFilterRegistered(t *testing.T) { + t.Parallel() + // Instantiate listener. + th := setupVRFLogPollerListenerTH(t, false, 3, 3, 2, 1000, func(mockChain *evmmocks.Chain, th *vrfLogPollerListenerTH) { + mockChain.On("LogPoller").Maybe().Return(th.LogPoller) + }) + + // Run the log listener. This should register the log poller filter. + go th.Listener.runLogListener(time.Second, 1) + + // Wait for the log poller filter to be registered. + filterName := th.Listener.getLogPollerFilterName() + gomega.NewWithT(t).Eventually(func() bool { + return th.Listener.chain.LogPoller().HasFilter(filterName) + }, testutils.WaitTimeout(t), time.Second).Should(gomega.BeTrue()) + + // Once registered, expect the filter to stay registered. + gomega.NewWithT(t).Consistently(func() bool { + return th.Listener.chain.LogPoller().HasFilter(filterName) + }, 5*time.Second, 1*time.Second).Should(gomega.BeTrue()) + + // Close the listener to avoid an orphaned goroutine. + close(th.Listener.chStop) + + // Assert channel is closed. + _, ok := (<-th.Listener.chStop) + assert.False(t, ok) +} + func TestInitProcessedBlock_NoUnfulfilledVRFReqs(t *testing.T) { t.Parallel()