From 8458da8f92877e3d3efcf8cbdb05eba5866a38bc Mon Sep 17 00:00:00 2001 From: Chris <104409744+vreff@users.noreply.github.com> Date: Wed, 21 Feb 2024 19:53:28 -0500 Subject: [PATCH 1/3] [VRF-892] Remove noisy log poller warning for VRFv2(+) jobs --- .../vrf/v2/listener_v2_log_listener.go | 51 +++++++++++-------- .../vrf/v2/listener_v2_log_listener_test.go | 34 +++++++++++-- 2 files changed, 58 insertions(+), 27 deletions(-) diff --git a/core/services/vrf/v2/listener_v2_log_listener.go b/core/services/vrf/v2/listener_v2_log_listener.go index 07b4c2c3800..583bf25bfea 100644 --- a/core/services/vrf/v2/listener_v2_log_listener.go +++ b/core/services/vrf/v2/listener_v2_log_listener.go @@ -30,6 +30,7 @@ func (lsn *listenerV2) runLogListener( lastProcessedBlock int64 startingUp = true ) + filterName := lsn.getLogPollerFilterName() ctx, cancel := lsn.chStop.NewCtx() defer cancel() for { @@ -39,33 +40,31 @@ func (lsn *listenerV2) runLogListener( case <-ticker.C: start := time.Now() lsn.l.Debugw("log listener loop") - // Filter registration is idempotent, so we can just call it every time - // and retry on errors using the ticker. - err := lsn.chain.LogPoller().RegisterFilter(logpoller.Filter{ - Name: logpoller.FilterName( - "VRFListener", - "version", lsn.coordinator.Version(), - "keyhash", lsn.job.VRFSpec.PublicKey.MustHash(), - "coordinatorAddress", lsn.coordinator.Address()), - EventSigs: evmtypes.HashArray{ - lsn.coordinator.RandomWordsFulfilledTopic(), - lsn.coordinator.RandomWordsRequestedTopic(), - }, - Addresses: evmtypes.AddressArray{ - lsn.coordinator.Address(), - }, - }) - if err != nil { - lsn.l.Errorw("error registering filter in log poller, retrying", - "err", err, - "elapsed", time.Since(start)) - continue + + // If filter has not already been successfully registered, register it. + if !lsn.chain.LogPoller().HasFilter(filterName) { + err := lsn.chain.LogPoller().RegisterFilter(logpoller.Filter{ + Name: filterName, + EventSigs: evmtypes.HashArray{ + lsn.coordinator.RandomWordsFulfilledTopic(), + lsn.coordinator.RandomWordsRequestedTopic(), + }, + Addresses: evmtypes.AddressArray{ + lsn.coordinator.Address(), + }, + }) + if err != nil { + lsn.l.Errorw("error registering filter in log poller, retrying", + "err", err, + "elapsed", time.Since(start)) + continue + } } // on startup we want to initialize the last processed block if startingUp { lsn.l.Debugw("initializing last processed block on startup") - lastProcessedBlock, err = lsn.initializeLastProcessedBlock(ctx) + lastProcessedBlock, err := lsn.initializeLastProcessedBlock(ctx) if err != nil { lsn.l.Errorw("error initializing last processed block, retrying", "err", err, @@ -98,6 +97,14 @@ func (lsn *listenerV2) runLogListener( } } +func (lsn *listenerV2) getLogPollerFilterName() string { + return logpoller.FilterName( + "VRFListener", + "version", lsn.coordinator.Version(), + "keyhash", lsn.job.VRFSpec.PublicKey.MustHash(), + "coordinatorAddress", lsn.coordinator.Address()) +} + // initializeLastProcessedBlock returns the earliest block number that we need to // process requests for. This is the block number of the earliest unfulfilled request // or the latest finalized block, if there are no unfulfilled requests. diff --git a/core/services/vrf/v2/listener_v2_log_listener_test.go b/core/services/vrf/v2/listener_v2_log_listener_test.go index 6f5177c230a..a590a2f4fb7 100644 --- a/core/services/vrf/v2/listener_v2_log_listener_test.go +++ b/core/services/vrf/v2/listener_v2_log_listener_test.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" "github.com/jmoiron/sqlx" + "github.com/onsi/gomega" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -116,11 +117,12 @@ func setupVRFLogPollerListenerTH(t *testing.T, chain := evmmocks.NewChain(t) listener := &listenerV2{ - respCount: map[string]uint64{}, - job: j, - chain: chain, - l: logger.Sugared(lggr), - coordinator: coordinator, + respCount: map[string]uint64{}, + job: j, + chain: chain, + l: logger.Sugared(lggr), + coordinator: coordinator, + inflightCache: vrfcommon.NewInflightCache(10), } ctx := testutils.Context(t) @@ -221,6 +223,28 @@ func TestInitProcessedBlock_NoVRFReqs(t *testing.T) { require.Equal(t, int64(6), lastProcessedBlock) } +func TestLogPollerFilterRegistered(t *testing.T) { + t.Parallel() + // Instantiate listener. + th := setupVRFLogPollerListenerTH(t, false, 3, 3, 2, 1000, func(mockChain *evmmocks.Chain, th *vrfLogPollerListenerTH) { + mockChain.On("LogPoller").Maybe().Return(th.LogPoller) + }) + + // Run the log listener. This should register the log poller filter. + go th.Listener.runLogListener(time.Second, 1) + + // Wait for the log poller filter to be registered. + filterName := th.Listener.getLogPollerFilterName() + gomega.NewWithT(t).Eventually(func() bool { + return th.Listener.chain.LogPoller().HasFilter(filterName) + }, testutils.WaitTimeout(t), time.Second).Should(gomega.BeTrue()) + + // Once registered, expect the filter to stay registered. + gomega.NewWithT(t).Consistently(func() bool { + return th.Listener.chain.LogPoller().HasFilter(filterName) + }, 5*time.Second, 1*time.Second).Should(gomega.BeTrue()) +} + func TestInitProcessedBlock_NoUnfulfilledVRFReqs(t *testing.T) { t.Parallel() From c803b85833668cc0ccd089489954ca49abb8abc4 Mon Sep 17 00:00:00 2001 From: Chris <104409744+vreff@users.noreply.github.com> Date: Thu, 22 Feb 2024 16:28:50 -0500 Subject: [PATCH 2/3] Fixed shadowed variable --- core/services/vrf/v2/listener_v2_log_listener.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/services/vrf/v2/listener_v2_log_listener.go b/core/services/vrf/v2/listener_v2_log_listener.go index 583bf25bfea..caba242bf54 100644 --- a/core/services/vrf/v2/listener_v2_log_listener.go +++ b/core/services/vrf/v2/listener_v2_log_listener.go @@ -63,11 +63,12 @@ func (lsn *listenerV2) runLogListener( // on startup we want to initialize the last processed block if startingUp { + var err2 error lsn.l.Debugw("initializing last processed block on startup") - lastProcessedBlock, err := lsn.initializeLastProcessedBlock(ctx) - if err != nil { + lastProcessedBlock, err2 = lsn.initializeLastProcessedBlock(ctx) + if err2 != nil { lsn.l.Errorw("error initializing last processed block, retrying", - "err", err, + "err", err2, "elapsed", time.Since(start)) continue } From 4836247127ae5078d9abcdf9f7695d870f3e4433 Mon Sep 17 00:00:00 2001 From: Chris <104409744+vreff@users.noreply.github.com> Date: Mon, 11 Mar 2024 16:02:56 -0400 Subject: [PATCH 3/3] Fix orphaned goroutine, address nit & add changeset --- .changeset/tasty-bobcats-hammer.md | 5 +++++ core/services/vrf/v2/listener_v2_log_listener.go | 8 ++++---- core/services/vrf/v2/listener_v2_log_listener_test.go | 9 +++++++++ 3 files changed, 18 insertions(+), 4 deletions(-) create mode 100644 .changeset/tasty-bobcats-hammer.md diff --git a/.changeset/tasty-bobcats-hammer.md b/.changeset/tasty-bobcats-hammer.md new file mode 100644 index 00000000000..69ffb6c1bcb --- /dev/null +++ b/.changeset/tasty-bobcats-hammer.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Remove noisy log poller warning in VRFv2 & VRFv2+ listener loops diff --git a/core/services/vrf/v2/listener_v2_log_listener.go b/core/services/vrf/v2/listener_v2_log_listener.go index caba242bf54..f6933abcb34 100644 --- a/core/services/vrf/v2/listener_v2_log_listener.go +++ b/core/services/vrf/v2/listener_v2_log_listener.go @@ -63,12 +63,12 @@ func (lsn *listenerV2) runLogListener( // on startup we want to initialize the last processed block if startingUp { - var err2 error + var err error lsn.l.Debugw("initializing last processed block on startup") - lastProcessedBlock, err2 = lsn.initializeLastProcessedBlock(ctx) - if err2 != nil { + lastProcessedBlock, err = lsn.initializeLastProcessedBlock(ctx) + if err != nil { lsn.l.Errorw("error initializing last processed block, retrying", - "err", err2, + "err", err, "elapsed", time.Since(start)) continue } diff --git a/core/services/vrf/v2/listener_v2_log_listener_test.go b/core/services/vrf/v2/listener_v2_log_listener_test.go index 0710d044241..4e0ae704cfc 100644 --- a/core/services/vrf/v2/listener_v2_log_listener_test.go +++ b/core/services/vrf/v2/listener_v2_log_listener_test.go @@ -17,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/jmoiron/sqlx" "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -132,6 +133,7 @@ func setupVRFLogPollerListenerTH(t *testing.T, l: logger.Sugared(lggr), coordinator: coordinator, inflightCache: vrfcommon.NewInflightCache(10), + chStop: make(chan struct{}), } ctx := testutils.Context(t) @@ -252,6 +254,13 @@ func TestLogPollerFilterRegistered(t *testing.T) { gomega.NewWithT(t).Consistently(func() bool { return th.Listener.chain.LogPoller().HasFilter(filterName) }, 5*time.Second, 1*time.Second).Should(gomega.BeTrue()) + + // Close the listener to avoid an orphaned goroutine. + close(th.Listener.chStop) + + // Assert channel is closed. + _, ok := (<-th.Listener.chStop) + assert.False(t, ok) } func TestInitProcessedBlock_NoUnfulfilledVRFReqs(t *testing.T) {