Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VRF-892] Remove noisy log poller warning for VRFv2(+) jobs #12132

Merged
merged 10 commits into from
Mar 28, 2024
5 changes: 5 additions & 0 deletions .changeset/tasty-bobcats-hammer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Remove noisy log poller warning in VRFv2 & VRFv2+ listener loops
50 changes: 29 additions & 21 deletions core/services/vrf/v2/listener_v2_log_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (lsn *listenerV2) runLogListener(
lastProcessedBlock int64
startingUp = true
)
filterName := lsn.getLogPollerFilterName()
ctx, cancel := lsn.chStop.NewCtx()
defer cancel()
for {
Expand All @@ -38,31 +39,30 @@ func (lsn *listenerV2) runLogListener(
case <-ticker.C:
start := time.Now()
lsn.l.Debugw("log listener loop")
// Filter registration is idempotent, so we can just call it every time
// and retry on errors using the ticker.
err := lsn.chain.LogPoller().RegisterFilter(ctx, logpoller.Filter{
Name: logpoller.FilterName(
"VRFListener",
"version", lsn.coordinator.Version(),
"keyhash", lsn.job.VRFSpec.PublicKey.MustHash(),
"coordinatorAddress", lsn.coordinator.Address()),
EventSigs: evmtypes.HashArray{
lsn.coordinator.RandomWordsFulfilledTopic(),
lsn.coordinator.RandomWordsRequestedTopic(),
},
Addresses: evmtypes.AddressArray{
lsn.coordinator.Address(),
},
})
if err != nil {
lsn.l.Errorw("error registering filter in log poller, retrying",
"err", err,
"elapsed", time.Since(start))
continue

// If filter has not already been successfully registered, register it.
if !lsn.chain.LogPoller().HasFilter(filterName) {
err := lsn.chain.LogPoller().RegisterFilter(ctx, logpoller.Filter{
Name: filterName,
EventSigs: evmtypes.HashArray{
lsn.coordinator.RandomWordsFulfilledTopic(),
lsn.coordinator.RandomWordsRequestedTopic(),
},
Addresses: evmtypes.AddressArray{
lsn.coordinator.Address(),
},
})
if err != nil {
lsn.l.Errorw("error registering filter in log poller, retrying",
"err", err,
"elapsed", time.Since(start))
continue
}
}

// on startup we want to initialize the last processed block
if startingUp {
var err error
lsn.l.Debugw("initializing last processed block on startup")
lastProcessedBlock, err = lsn.initializeLastProcessedBlock(ctx)
if err != nil {
Expand Down Expand Up @@ -97,6 +97,14 @@ func (lsn *listenerV2) runLogListener(
}
}

func (lsn *listenerV2) getLogPollerFilterName() string {
return logpoller.FilterName(
"VRFListener",
"version", lsn.coordinator.Version(),
"keyhash", lsn.job.VRFSpec.PublicKey.MustHash(),
"coordinatorAddress", lsn.coordinator.Address())
}

// initializeLastProcessedBlock returns the earliest block number that we need to
// process requests for. This is the block number of the earliest unfulfilled request
// or the latest finalized block, if there are no unfulfilled requests.
Expand Down
43 changes: 38 additions & 5 deletions core/services/vrf/v2/listener_v2_log_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
"github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand Down Expand Up @@ -124,11 +126,13 @@ func setupVRFLogPollerListenerTH(t *testing.T,

chain := evmmocks.NewChain(t)
listener := &listenerV2{
respCount: map[string]uint64{},
job: j,
chain: chain,
l: logger.Sugared(lggr),
coordinator: coordinator,
respCount: map[string]uint64{},
job: j,
chain: chain,
l: logger.Sugared(lggr),
coordinator: coordinator,
inflightCache: vrfcommon.NewInflightCache(10),
chStop: make(chan struct{}),
}
ctx := testutils.Context(t)

Expand Down Expand Up @@ -228,6 +232,35 @@ func TestInitProcessedBlock_NoVRFReqs(t *testing.T) {
require.Equal(t, int64(6), lastProcessedBlock)
}

func TestLogPollerFilterRegistered(t *testing.T) {
t.Parallel()
// Instantiate listener.
th := setupVRFLogPollerListenerTH(t, false, 3, 3, 2, 1000, func(mockChain *evmmocks.Chain, th *vrfLogPollerListenerTH) {
mockChain.On("LogPoller").Maybe().Return(th.LogPoller)
})

// Run the log listener. This should register the log poller filter.
go th.Listener.runLogListener(time.Second, 1)

// Wait for the log poller filter to be registered.
filterName := th.Listener.getLogPollerFilterName()
gomega.NewWithT(t).Eventually(func() bool {
return th.Listener.chain.LogPoller().HasFilter(filterName)
}, testutils.WaitTimeout(t), time.Second).Should(gomega.BeTrue())

// Once registered, expect the filter to stay registered.
gomega.NewWithT(t).Consistently(func() bool {
return th.Listener.chain.LogPoller().HasFilter(filterName)
}, 5*time.Second, 1*time.Second).Should(gomega.BeTrue())

// Close the listener to avoid an orphaned goroutine.
close(th.Listener.chStop)

// Assert channel is closed.
_, ok := (<-th.Listener.chStop)
assert.False(t, ok)
}

func TestInitProcessedBlock_NoUnfulfilledVRFReqs(t *testing.T) {
t.Parallel()

Expand Down
Loading