Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VRF-892] Remove noisy log poller warning for VRFv2(+) jobs #12132

Merged
merged 10 commits into from
Mar 28, 2024
51 changes: 29 additions & 22 deletions core/services/vrf/v2/listener_v2_log_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
lastProcessedBlock int64
startingUp = true
)
filterName := lsn.getLogPollerFilterName()
ctx, cancel := lsn.chStop.NewCtx()
defer cancel()
for {
Expand All @@ -39,33 +40,31 @@
case <-ticker.C:
start := time.Now()
lsn.l.Debugw("log listener loop")
// Filter registration is idempotent, so we can just call it every time
// and retry on errors using the ticker.
err := lsn.chain.LogPoller().RegisterFilter(logpoller.Filter{
Name: logpoller.FilterName(
"VRFListener",
"version", lsn.coordinator.Version(),
"keyhash", lsn.job.VRFSpec.PublicKey.MustHash(),
"coordinatorAddress", lsn.coordinator.Address()),
EventSigs: evmtypes.HashArray{
lsn.coordinator.RandomWordsFulfilledTopic(),
lsn.coordinator.RandomWordsRequestedTopic(),
},
Addresses: evmtypes.AddressArray{
lsn.coordinator.Address(),
},
})
if err != nil {
lsn.l.Errorw("error registering filter in log poller, retrying",
"err", err,
"elapsed", time.Since(start))
continue

// If filter has not already been successfully registered, register it.
if !lsn.chain.LogPoller().HasFilter(filterName) {
err := lsn.chain.LogPoller().RegisterFilter(logpoller.Filter{
Name: filterName,
EventSigs: evmtypes.HashArray{
lsn.coordinator.RandomWordsFulfilledTopic(),
lsn.coordinator.RandomWordsRequestedTopic(),
},
Addresses: evmtypes.AddressArray{
lsn.coordinator.Address(),
},
})
if err != nil {
lsn.l.Errorw("error registering filter in log poller, retrying",
"err", err,
"elapsed", time.Since(start))
continue
}
}

// on startup we want to initialize the last processed block
if startingUp {
lsn.l.Debugw("initializing last processed block on startup")
lastProcessedBlock, err = lsn.initializeLastProcessedBlock(ctx)
lastProcessedBlock, err := lsn.initializeLastProcessedBlock(ctx)

Check failure on line 67 in core/services/vrf/v2/listener_v2_log_listener.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "lastProcessedBlock" shadows declaration at line 30 (govet)
if err != nil {
lsn.l.Errorw("error initializing last processed block, retrying",
"err", err,
Expand Down Expand Up @@ -98,6 +97,14 @@
}
}

func (lsn *listenerV2) getLogPollerFilterName() string {
return logpoller.FilterName(
"VRFListener",
"version", lsn.coordinator.Version(),
"keyhash", lsn.job.VRFSpec.PublicKey.MustHash(),
"coordinatorAddress", lsn.coordinator.Address())
}

// initializeLastProcessedBlock returns the earliest block number that we need to
// process requests for. This is the block number of the earliest unfulfilled request
// or the latest finalized block, if there are no unfulfilled requests.
Expand Down
34 changes: 29 additions & 5 deletions core/services/vrf/v2/listener_v2_log_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
"github.com/onsi/gomega"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand Down Expand Up @@ -116,11 +117,12 @@ func setupVRFLogPollerListenerTH(t *testing.T,

chain := evmmocks.NewChain(t)
listener := &listenerV2{
respCount: map[string]uint64{},
job: j,
chain: chain,
l: logger.Sugared(lggr),
coordinator: coordinator,
respCount: map[string]uint64{},
job: j,
chain: chain,
l: logger.Sugared(lggr),
coordinator: coordinator,
inflightCache: vrfcommon.NewInflightCache(10),
}
ctx := testutils.Context(t)

Expand Down Expand Up @@ -221,6 +223,28 @@ func TestInitProcessedBlock_NoVRFReqs(t *testing.T) {
require.Equal(t, int64(6), lastProcessedBlock)
}

func TestLogPollerFilterRegistered(t *testing.T) {
t.Parallel()
// Instantiate listener.
th := setupVRFLogPollerListenerTH(t, false, 3, 3, 2, 1000, func(mockChain *evmmocks.Chain, th *vrfLogPollerListenerTH) {
mockChain.On("LogPoller").Maybe().Return(th.LogPoller)
})

// Run the log listener. This should register the log poller filter.
go th.Listener.runLogListener(time.Second, 1)

// Wait for the log poller filter to be registered.
filterName := th.Listener.getLogPollerFilterName()
gomega.NewWithT(t).Eventually(func() bool {
return th.Listener.chain.LogPoller().HasFilter(filterName)
}, testutils.WaitTimeout(t), time.Second).Should(gomega.BeTrue())

// Once registered, expect the filter to stay registered.
gomega.NewWithT(t).Consistently(func() bool {
return th.Listener.chain.LogPoller().HasFilter(filterName)
}, 5*time.Second, 1*time.Second).Should(gomega.BeTrue())
}

func TestInitProcessedBlock_NoUnfulfilledVRFReqs(t *testing.T) {
t.Parallel()

Expand Down
Loading