From 5fa2f37bd0fb203333d62676e66b8c8ce9b3adaf Mon Sep 17 00:00:00 2001 From: george-dorin <120329946+george-dorin@users.noreply.github.com> Date: Wed, 21 Jun 2023 13:50:46 +0300 Subject: [PATCH] Move EVM node pool config (#9656) * Move EVM node pool config * Update after merge * Add test * Update after merge --- core/chains/evm/chain.go | 13 +- core/chains/evm/client/client.go | 4 +- core/chains/evm/client/client_test.go | 6 +- core/chains/evm/client/helpers_test.go | 27 ++--- core/chains/evm/client/node.go | 30 ++--- core/chains/evm/client/node_fsm_test.go | 3 +- core/chains/evm/client/node_lifecycle.go | 18 +-- core/chains/evm/client/node_lifecycle_test.go | 113 +++++++++--------- core/chains/evm/client/pool.go | 40 ++++--- core/chains/evm/client/pool_test.go | 8 +- core/chains/evm/config/config.go | 9 +- core/chains/evm/config/config_node_pool.go | 10 ++ .../evm/config/config_node_pool_test.go | 32 +++++ .../evm/config/mocks/chain_scoped_config.go | 70 ----------- core/chains/evm/config/v2/chain_scoped.go | 40 +++---- core/chains/evm/config/v2/config_node_pool.go | 25 ++++ 16 files changed, 215 insertions(+), 233 deletions(-) create mode 100644 core/chains/evm/config/config_node_pool.go create mode 100644 core/chains/evm/config/config_node_pool_test.go create mode 100644 core/chains/evm/config/v2/config_node_pool.go diff --git a/core/chains/evm/chain.go b/core/chains/evm/chain.go index d98c7e8f5b8..d972aa3e316 100644 --- a/core/chains/evm/chain.go +++ b/core/chains/evm/chain.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "net/url" + "time" "github.com/pkg/errors" "github.com/smartcontractkit/sqlx" @@ -93,7 +94,7 @@ func newChain(ctx context.Context, cfg evmconfig.ChainScopedConfig, nodes []*v2. client = evmclient.NewNullClient(chainID, l) } else if opts.GenEthClient == nil { var err2 error - client, err2 = newEthClientFromChain(cfg, l, chainID, chainType, nodes) + client, err2 = newEthClientFromChain(cfg.EVM().NodePool(), cfg.EVM().NodeNoNewHeadsThreshold(), l, chainID, chainType, nodes) if err2 != nil { return nil, errors.Wrapf(err2, "failed to instantiate eth client for chain with ID %s", cfg.EVM().ChainID().String()) } @@ -278,7 +279,7 @@ func (c *chain) Logger() logger.Logger { return c.logger } func (c *chain) BalanceMonitor() monitor.BalanceMonitor { return c.balanceMonitor } func (c *chain) GasEstimator() gas.EvmFeeEstimator { return c.gasEstimator } -func newEthClientFromChain(cfg evmclient.NodeConfig, lggr logger.Logger, chainID *big.Int, chainType config.ChainType, nodes []*v2.Node) (evmclient.Client, error) { +func newEthClientFromChain(cfg evmconfig.NodePool, noNewHeadsThreshold time.Duration, lggr logger.Logger, chainID *big.Int, chainType config.ChainType, nodes []*v2.Node) (evmclient.Client, error) { var primaries []evmclient.Node var sendonlys []evmclient.SendOnlyNode for i, node := range nodes { @@ -286,22 +287,22 @@ func newEthClientFromChain(cfg evmclient.NodeConfig, lggr logger.Logger, chainID sendonly := evmclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL), *node.Name, chainID) sendonlys = append(sendonlys, sendonly) } else { - primary, err := newPrimary(cfg, lggr, node, int32(i), chainID) + primary, err := newPrimary(cfg, noNewHeadsThreshold, lggr, node, int32(i), chainID) if err != nil { return nil, err } primaries = append(primaries, primary) } } - return evmclient.NewClientWithNodes(lggr, cfg, primaries, sendonlys, chainID, chainType) + return evmclient.NewClientWithNodes(lggr, cfg.SelectionMode(), noNewHeadsThreshold, primaries, sendonlys, chainID, chainType) } -func newPrimary(cfg evmclient.NodeConfig, lggr logger.Logger, n *v2.Node, id int32, chainID *big.Int) (evmclient.Node, error) { +func newPrimary(cfg evmconfig.NodePool, noNewHeadsThreshold time.Duration, lggr logger.Logger, n *v2.Node, id int32, chainID *big.Int) (evmclient.Node, error) { if n.SendOnly != nil && *n.SendOnly { return nil, errors.New("cannot cast send-only node to primary") } - return evmclient.NewNode(cfg, lggr, (url.URL)(*n.WSURL), (*url.URL)(n.HTTPURL), *n.Name, id, chainID, *n.Order), nil + return evmclient.NewNode(cfg, noNewHeadsThreshold, lggr, (url.URL)(*n.WSURL), (*url.URL)(n.HTTPURL), *n.Name, id, chainID, *n.Order), nil } func EnsureChains(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig, ids []utils.Big) error { diff --git a/core/chains/evm/client/client.go b/core/chains/evm/client/client.go index b2bdad31b87..516bc68232c 100644 --- a/core/chains/evm/client/client.go +++ b/core/chains/evm/client/client.go @@ -94,8 +94,8 @@ var _ htrktypes.Client[*evmtypes.Head, ethereum.Subscription, *big.Int, common.H // NewClientWithNodes instantiates a client from a list of nodes // Currently only supports one primary -func NewClientWithNodes(logger logger.Logger, cfg PoolConfig, primaryNodes []Node, sendOnlyNodes []SendOnlyNode, chainID *big.Int, chainType config.ChainType) (*client, error) { - pool := NewPool(logger, cfg, primaryNodes, sendOnlyNodes, chainID, chainType) +func NewClientWithNodes(logger logger.Logger, selectionMode string, noNewHeadsThreshold time.Duration, primaryNodes []Node, sendOnlyNodes []SendOnlyNode, chainID *big.Int, chainType config.ChainType) (*client, error) { + pool := NewPool(logger, selectionMode, noNewHeadsThreshold, primaryNodes, sendOnlyNodes, chainID, chainType) return &client{ logger: logger, pool: pool, diff --git a/core/chains/evm/client/client_test.go b/core/chains/evm/client/client_test.go index 3ac1c688965..897928310ff 100644 --- a/core/chains/evm/client/client_test.go +++ b/core/chains/evm/client/client_test.go @@ -35,10 +35,10 @@ func mustNewClient(t *testing.T, wsURL string, sendonlys ...url.URL) evmclient.C } func mustNewClientWithChainID(t *testing.T, wsURL string, chainID *big.Int, sendonlys ...url.URL) evmclient.Client { - cfg := evmclient.TestNodeConfig{ - SelectionMode: evmclient.NodeSelectionMode_RoundRobin, + cfg := evmclient.TestNodePoolConfig{ + NodeSelectionMode: evmclient.NodeSelectionMode_RoundRobin, } - c, err := evmclient.NewClientWithTestNode(t, cfg, wsURL, nil, sendonlys, 42, chainID) + c, err := evmclient.NewClientWithTestNode(t, cfg, time.Second*0, wsURL, nil, sendonlys, 42, chainID) require.NoError(t, err) return c } diff --git a/core/chains/evm/client/helpers_test.go b/core/chains/evm/client/helpers_test.go index aa93fdff0e4..8a660eb38db 100644 --- a/core/chains/evm/client/helpers_test.go +++ b/core/chains/evm/client/helpers_test.go @@ -9,25 +9,24 @@ import ( "github.com/pkg/errors" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/utils" ) -type TestNodeConfig struct { - NoNewHeadsThreshold time.Duration - PollFailureThreshold uint32 - PollInterval time.Duration - SelectionMode string - SyncThreshold uint32 +type TestNodePoolConfig struct { + NodePollFailureThreshold uint32 + NodePollInterval time.Duration + NodeSelectionMode string + NodeSyncThreshold uint32 } -func (tc TestNodeConfig) NodeNoNewHeadsThreshold() time.Duration { return tc.NoNewHeadsThreshold } -func (tc TestNodeConfig) NodePollFailureThreshold() uint32 { return tc.PollFailureThreshold } -func (tc TestNodeConfig) NodePollInterval() time.Duration { return tc.PollInterval } -func (tc TestNodeConfig) NodeSelectionMode() string { return tc.SelectionMode } -func (tc TestNodeConfig) NodeSyncThreshold() uint32 { return tc.SyncThreshold } +func (tc TestNodePoolConfig) PollFailureThreshold() uint32 { return tc.NodePollFailureThreshold } +func (tc TestNodePoolConfig) PollInterval() time.Duration { return tc.NodePollInterval } +func (tc TestNodePoolConfig) SelectionMode() string { return tc.NodeSelectionMode } +func (tc TestNodePoolConfig) SyncThreshold() uint32 { return tc.NodeSyncThreshold } -func NewClientWithTestNode(t *testing.T, cfg NodeConfig, rpcUrl string, rpcHTTPURL *url.URL, sendonlyRPCURLs []url.URL, id int32, chainID *big.Int) (*client, error) { +func NewClientWithTestNode(t *testing.T, nodePoolCfg config.NodePool, noNewHeadsThreshold time.Duration, rpcUrl string, rpcHTTPURL *url.URL, sendonlyRPCURLs []url.URL, id int32, chainID *big.Int) (*client, error) { parsed, err := url.ParseRequestURI(rpcUrl) if err != nil { return nil, err @@ -38,7 +37,7 @@ func NewClientWithTestNode(t *testing.T, cfg NodeConfig, rpcUrl string, rpcHTTPU } lggr := logger.TestLogger(t) - n := NewNode(cfg, lggr, *parsed, rpcHTTPURL, "eth-primary-0", id, chainID, 1) + n := NewNode(nodePoolCfg, noNewHeadsThreshold, lggr, *parsed, rpcHTTPURL, "eth-primary-0", id, chainID, 1) n.(*node).setLatestReceived(0, utils.NewBigI(0)) primaries := []Node{n} @@ -51,7 +50,7 @@ func NewClientWithTestNode(t *testing.T, cfg NodeConfig, rpcUrl string, rpcHTTPU sendonlys = append(sendonlys, s) } - pool := NewPool(lggr, cfg, primaries, sendonlys, chainID, "") + pool := NewPool(lggr, nodePoolCfg.SelectionMode(), noNewHeadsThreshold, primaries, sendonlys, chainID, "") c := &client{logger: lggr, pool: pool} t.Cleanup(c.Close) return c, nil diff --git a/core/chains/evm/client/node.go b/core/chains/evm/client/node.go index dcdce756a25..a876aae78d3 100644 --- a/core/chains/evm/client/node.go +++ b/core/chains/evm/client/node.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -130,13 +131,14 @@ type rawclient struct { // It must have a ws url and may have a http url type node struct { utils.StartStopOnce - lfcLog logger.Logger - rpcLog logger.Logger - name string - id int32 - chainID *big.Int - cfg NodeConfig - order int32 + lfcLog logger.Logger + rpcLog logger.Logger + name string + id int32 + chainID *big.Int + nodePoolCfg config.NodePool + noNewHeadsThreshold time.Duration + order int32 ws rawclient http *rawclient @@ -169,22 +171,14 @@ type node struct { nLiveNodes func() (count int, blockNumber int64, totalDifficulty *utils.Big) } -// NodeConfig allows configuration of the node -type NodeConfig interface { - NodeNoNewHeadsThreshold() time.Duration - NodePollFailureThreshold() uint32 - NodePollInterval() time.Duration - NodeSelectionMode() string - NodeSyncThreshold() uint32 -} - // NewNode returns a new *node as Node -func NewNode(nodeCfg NodeConfig, lggr logger.Logger, wsuri url.URL, httpuri *url.URL, name string, id int32, chainID *big.Int, nodeOrder int32) Node { +func NewNode(nodeCfg config.NodePool, noNewHeadsThreshold time.Duration, lggr logger.Logger, wsuri url.URL, httpuri *url.URL, name string, id int32, chainID *big.Int, nodeOrder int32) Node { n := new(node) n.name = name n.id = id n.chainID = chainID - n.cfg = nodeCfg + n.nodePoolCfg = nodeCfg + n.noNewHeadsThreshold = noNewHeadsThreshold n.ws.uri = wsuri n.order = nodeOrder if httpuri != nil { diff --git a/core/chains/evm/client/node_fsm_test.go b/core/chains/evm/client/node_fsm_test.go index 25b5b983e9a..ce63a62a8bd 100644 --- a/core/chains/evm/client/node_fsm_test.go +++ b/core/chains/evm/client/node_fsm_test.go @@ -2,6 +2,7 @@ package client import ( "testing" + "time" "github.com/ethereum/go-ethereum" "github.com/stretchr/testify/assert" @@ -42,7 +43,7 @@ func TestUnit_Node_StateTransitions(t *testing.T) { t.Parallel() s := testutils.NewWSServer(t, testutils.FixtureChainID, nil) - iN := NewNode(TestNodeConfig{}, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, nil, 1) + iN := NewNode(TestNodePoolConfig{}, time.Second*0, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, nil, 1) n := iN.(*node) assert.Equal(t, NodeStateUndialed, n.State()) diff --git a/core/chains/evm/client/node_lifecycle.go b/core/chains/evm/client/node_lifecycle.go index 486ceaa7285..d55df58d6ef 100644 --- a/core/chains/evm/client/node_lifecycle.go +++ b/core/chains/evm/client/node_lifecycle.go @@ -41,8 +41,8 @@ var ( // state change in case we have to force a state transition due to no available // nodes. // NOTE: This only applies to out-of-sync nodes if they are the last available node -func zombieNodeCheckInterval(cfg NodeConfig) time.Duration { - interval := cfg.NodeNoNewHeadsThreshold() +func zombieNodeCheckInterval(noNewHeadsThreshold time.Duration) time.Duration { + interval := noNewHeadsThreshold if interval <= 0 || interval > queryTimeout { interval = queryTimeout } @@ -83,9 +83,9 @@ func (n *node) aliveLoop() { } } - noNewHeadsTimeoutThreshold := n.cfg.NodeNoNewHeadsThreshold() - pollFailureThreshold := n.cfg.NodePollFailureThreshold() - pollInterval := n.cfg.NodePollInterval() + noNewHeadsTimeoutThreshold := n.noNewHeadsThreshold + pollFailureThreshold := n.nodePoolCfg.PollFailureThreshold() + pollInterval := n.nodePoolCfg.PollInterval() lggr := n.lfcLog.Named("Alive").With("noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold, "pollInterval", pollInterval, "pollFailureThreshold", pollFailureThreshold) lggr.Tracew("Alive loop starting", "nodeState", n.State()) @@ -208,7 +208,7 @@ func (n *node) aliveLoop() { lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) // We don't necessarily want to wait the full timeout to check again, we should // check regularly and log noisily in this state - outOfSyncT.Reset(zombieNodeCheckInterval(n.cfg)) + outOfSyncT.Reset(zombieNodeCheckInterval(n.noNewHeadsThreshold)) continue } } @@ -230,13 +230,13 @@ func (n *node) syncStatus(num int64, td *utils.Big) (outOfSync bool, liveNodes i if n.nLiveNodes == nil { return // skip for tests } - threshold := n.cfg.NodeSyncThreshold() + threshold := n.nodePoolCfg.SyncThreshold() if threshold == 0 { return // disabled } // Check against best node ln, highest, greatest := n.nLiveNodes() - mode := n.cfg.NodeSelectionMode() + mode := n.nodePoolCfg.SelectionMode() switch mode { case NodeSelectionMode_HighestHead, NodeSelectionMode_RoundRobin, NodeSelectionMode_PriorityLevel: return num < highest-int64(threshold), ln @@ -320,7 +320,7 @@ func (n *node) outOfSyncLoop(isOutOfSync func(num int64, td *utils.Big) bool) { return } lggr.Debugw(msgReceivedBlock, "blockNumber", head.Number, "totalDifficulty", head.TotalDifficulty, "nodeState", n.State()) - case <-time.After(zombieNodeCheckInterval(n.cfg)): + case <-time.After(zombieNodeCheckInterval(n.noNewHeadsThreshold)): if n.nLiveNodes != nil { if l, _, _ := n.nLiveNodes(); l < 1 { lggr.Critical("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state") diff --git a/core/chains/evm/client/node_lifecycle_test.go b/core/chains/evm/client/node_lifecycle_test.go index 1b119618485..9fdd9bb64e1 100644 --- a/core/chains/evm/client/node_lifecycle_test.go +++ b/core/chains/evm/client/node_lifecycle_test.go @@ -12,6 +12,7 @@ import ( "github.com/tidwall/gjson" "go.uber.org/zap" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -27,13 +28,13 @@ func standardHandler(method string, _ gjson.Result) (resp testutils.JSONRPCRespo return } -func newTestNode(t *testing.T, cfg NodeConfig) *node { - return newTestNodeWithCallback(t, cfg, standardHandler) +func newTestNode(t *testing.T, cfg config.NodePool, noNewHeadsThresholds time.Duration) *node { + return newTestNodeWithCallback(t, cfg, noNewHeadsThresholds, standardHandler) } -func newTestNodeWithCallback(t *testing.T, cfg NodeConfig, callback testutils.JSONRPCHandler) *node { +func newTestNodeWithCallback(t *testing.T, cfg config.NodePool, noNewHeadsThreshold time.Duration, callback testutils.JSONRPCHandler) *node { s := testutils.NewWSServer(t, testutils.FixtureChainID, callback) - iN := NewNode(cfg, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) + iN := NewNode(cfg, noNewHeadsThreshold, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) n := iN.(*node) return n } @@ -68,8 +69,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { t.Parallel() t.Run("with no poll and sync timeouts, exits on close", func(t *testing.T) { - pollAndSyncTimeoutsDisabledCfg := TestNodeConfig{} - n := newTestNode(t, pollAndSyncTimeoutsDisabledCfg) + pollAndSyncTimeoutsDisabledCfg := TestNodePoolConfig{} + n := newTestNode(t, pollAndSyncTimeoutsDisabledCfg, 0*time.Second) dial(t, n) ch := make(chan struct{}) @@ -84,9 +85,9 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { t.Run("with no poll failures past threshold, stays alive", func(t *testing.T) { threshold := 5 - cfg := TestNodeConfig{PollFailureThreshold: uint32(threshold), PollInterval: testutils.TestInterval} + cfg := TestNodePoolConfig{NodePollFailureThreshold: uint32(threshold), NodePollInterval: testutils.TestInterval} var calls atomic.Int32 - n := newTestNodeWithCallback(t, cfg, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { + n := newTestNodeWithCallback(t, cfg, time.Second*0, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { switch method { case "eth_subscribe": resp.Result = `"0x00"` @@ -125,8 +126,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) t.Run("with threshold poll failures, transitions to unreachable", func(t *testing.T) { - syncTimeoutsDisabledCfg := TestNodeConfig{PollFailureThreshold: 3, PollInterval: testutils.TestInterval} - n := newTestNode(t, syncTimeoutsDisabledCfg) + syncTimeoutsDisabledCfg := TestNodePoolConfig{NodePollFailureThreshold: 3, NodePollInterval: testutils.TestInterval} + n := newTestNode(t, syncTimeoutsDisabledCfg, time.Second*0) dial(t, n) defer func() { assert.NoError(t, n.Close()) }() @@ -140,9 +141,9 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { t.Run("with threshold poll failures, but we are the last node alive, forcibly keeps it alive", func(t *testing.T) { threshold := 3 - cfg := TestNodeConfig{PollFailureThreshold: uint32(threshold), PollInterval: testutils.TestInterval} + cfg := TestNodePoolConfig{NodePollFailureThreshold: uint32(threshold), NodePollInterval: testutils.TestInterval} var calls atomic.Int32 - n := newTestNodeWithCallback(t, cfg, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { + n := newTestNodeWithCallback(t, cfg, time.Second*0, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { switch method { case "eth_subscribe": resp.Result = `"0x00"` @@ -177,8 +178,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) t.Run("if initial subscribe fails, transitions to unreachable", func(t *testing.T) { - pollDisabledCfg := TestNodeConfig{NoNewHeadsThreshold: testutils.TestInterval} - n := newTestNodeWithCallback(t, pollDisabledCfg, func(string, gjson.Result) (resp testutils.JSONRPCResponse) { return }) + pollDisabledCfg := TestNodePoolConfig{} + n := newTestNodeWithCallback(t, pollDisabledCfg, testutils.TestInterval, func(string, gjson.Result) (resp testutils.JSONRPCResponse) { return }) dial(t, n) defer func() { assert.NoError(t, n.Close()) }() @@ -197,7 +198,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { // NoNewHeadsThreshold needs to be positive but must be very large so // we don't time out waiting for a new head before we have a chance to // handle the server disconnect - cfg := TestNodeConfig{NoNewHeadsThreshold: testutils.WaitTimeout(t), PollInterval: 1 * time.Second} + cfg := TestNodePoolConfig{NodePollInterval: 1 * time.Second} chSubbed := make(chan struct{}, 1) chPolled := make(chan struct{}) s := testutils.NewWSServer(t, testutils.FixtureChainID, @@ -221,7 +222,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return }) - iN := NewNode(cfg, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) + iN := NewNode(cfg, testutils.WaitTimeout(t), logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) n := iN.(*node) dial(t, n) @@ -245,7 +246,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) t.Run("when no new heads received for threshold, transitions to out of sync", func(t *testing.T) { - cfg := TestNodeConfig{NoNewHeadsThreshold: 1 * time.Second} + cfg := TestNodePoolConfig{} chSubbed := make(chan struct{}, 2) s := testutils.NewWSServer(t, testutils.FixtureChainID, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { @@ -267,7 +268,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return }) - iN := NewNode(cfg, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) + iN := NewNode(cfg, 1*time.Second, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) n := iN.(*node) dial(t, n) @@ -288,7 +289,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { t.Run("when no new heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) { lggr, observedLogs := logger.TestLoggerObserved(t, zap.ErrorLevel) - pollDisabledCfg := TestNodeConfig{NoNewHeadsThreshold: testutils.TestInterval} + pollDisabledCfg := TestNodePoolConfig{} s := testutils.NewWSServer(t, testutils.FixtureChainID, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { switch method { @@ -305,7 +306,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return }) - iN := NewNode(pollDisabledCfg, lggr, *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) + iN := NewNode(pollDisabledCfg, testutils.TestInterval, lggr, *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) n := iN.(*node) n.nLiveNodes = func() (int, int64, *utils.Big) { return 1, 0, nil } dial(t, n) @@ -322,7 +323,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) t.Run("when behind more than SyncThreshold, transitions to out of sync", func(t *testing.T) { - cfg := TestNodeConfig{SyncThreshold: 10, PollFailureThreshold: 2, PollInterval: 100 * time.Millisecond, SelectionMode: NodeSelectionMode_HighestHead} + cfg := TestNodePoolConfig{NodeSyncThreshold: 10, NodePollFailureThreshold: 2, NodePollInterval: 100 * time.Millisecond, NodeSelectionMode: NodeSelectionMode_HighestHead} chSubbed := make(chan struct{}, 2) var highestHead atomic.Int64 const stall = 10 @@ -350,7 +351,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return }) - iN := NewNode(cfg, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) + iN := NewNode(cfg, 0*time.Second, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) n := iN.(*node) n.nLiveNodes = func() (count int, blockNumber int64, totalDifficulty *utils.Big) { return 2, highestHead.Load(), nil @@ -377,14 +378,14 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { state, num, _ := n.StateAndLatest() return state == NodeStateOutOfSync && num == stall }) - assert.GreaterOrEqual(t, highestHead.Load(), int64(stall+cfg.SyncThreshold)) + assert.GreaterOrEqual(t, highestHead.Load(), int64(stall+cfg.SyncThreshold())) // Otherwise, there may be data race on dial() vs Close() (accessing ws.rpc) testutils.WaitWithTimeout(t, chSubbed, "timed out waiting for initial subscription for OutOfSync") }) t.Run("when behind but SyncThreshold=0, stay alive", func(t *testing.T) { - cfg := TestNodeConfig{SyncThreshold: 0, PollFailureThreshold: 2, PollInterval: 100 * time.Millisecond, SelectionMode: NodeSelectionMode_HighestHead} + cfg := TestNodePoolConfig{NodeSyncThreshold: 0, NodePollFailureThreshold: 2, NodePollInterval: 100 * time.Millisecond, NodeSelectionMode: NodeSelectionMode_HighestHead} chSubbed := make(chan struct{}, 1) var highestHead atomic.Int64 const stall = 10 @@ -412,7 +413,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return }) - iN := NewNode(cfg, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) + iN := NewNode(cfg, 0*time.Second, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) n := iN.(*node) n.nLiveNodes = func() (count int, blockNumber int64, totalDifficulty *utils.Big) { return 2, highestHead.Load(), nil @@ -434,12 +435,12 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) assert.Equal(t, NodeStateAlive, n.state) - assert.GreaterOrEqual(t, highestHead.Load(), int64(stall+cfg.SyncThreshold)) + assert.GreaterOrEqual(t, highestHead.Load(), int64(stall+cfg.SyncThreshold())) }) t.Run("when behind more than SyncThreshold but we are the last live node, forcibly stays alive", func(t *testing.T) { lggr, observedLogs := logger.TestLoggerObserved(t, zap.ErrorLevel) - cfg := TestNodeConfig{SyncThreshold: 5, PollFailureThreshold: 2, PollInterval: 100 * time.Millisecond, SelectionMode: NodeSelectionMode_HighestHead} + cfg := TestNodePoolConfig{NodeSyncThreshold: 5, NodePollFailureThreshold: 2, NodePollInterval: 100 * time.Millisecond, NodeSelectionMode: NodeSelectionMode_HighestHead} chSubbed := make(chan struct{}, 1) var highestHead atomic.Int64 const stall = 10 @@ -471,7 +472,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return }) - iN := NewNode(cfg, lggr, *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) + iN := NewNode(cfg, 0*time.Second, lggr, *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) n := iN.(*node) n.nLiveNodes = func() (count int, blockNumber int64, totalDifficulty *utils.Big) { return 1, highestHead.Load(), nil @@ -494,7 +495,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { assert.Equal(t, NodeStateAlive, n.state) testutils.AssertEventually(t, func() bool { - return highestHead.Load() >= int64(stall+cfg.SyncThreshold) + return highestHead.Load() >= int64(stall+cfg.SyncThreshold()) }) testutils.WaitForLogMessageCount(t, observedLogs, msgCannotDisable, 1) @@ -510,8 +511,8 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { t.Parallel() t.Run("exits on close", func(t *testing.T) { - cfg := TestNodeConfig{} - n := newTestNode(t, cfg) + cfg := TestNodePoolConfig{} + n := newTestNode(t, cfg, time.Second*0) dial(t, n) n.setState(NodeStateOutOfSync) @@ -527,8 +528,8 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { }) t.Run("if initial subscribe fails, transitions to unreachable", func(t *testing.T) { - cfg := TestNodeConfig{} - n := newTestNodeWithCallback(t, cfg, func(string, gjson.Result) (resp testutils.JSONRPCResponse) { return }) + cfg := TestNodePoolConfig{} + n := newTestNodeWithCallback(t, cfg, time.Second*0, func(string, gjson.Result) (resp testutils.JSONRPCResponse) { return }) dial(t, n) n.setState(NodeStateOutOfSync) defer func() { assert.NoError(t, n.Close()) }() @@ -540,7 +541,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { }) t.Run("transitions to unreachable if remote RPC subscription channel closed", func(t *testing.T) { - cfg := TestNodeConfig{} + cfg := TestNodePoolConfig{} chSubbed := make(chan struct{}, 1) s := testutils.NewWSServer(t, testutils.FixtureChainID, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { @@ -556,7 +557,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { return }) - iN := NewNode(cfg, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) + iN := NewNode(cfg, time.Duration(time.Second), logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) n := iN.(*node) dial(t, n) @@ -584,7 +585,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { // we don't time out waiting for a new head before we have a chance to // handle the server disconnect lggr, observedLogs := logger.TestLoggerObserved(t, zap.DebugLevel) - cfg := TestNodeConfig{} + cfg := TestNodePoolConfig{} chSubbed := make(chan struct{}, 1) s := testutils.NewWSServer(t, testutils.FixtureChainID, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { @@ -603,7 +604,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { return }) - iN := NewNode(cfg, lggr, *s.WSURL(), nil, "test node", 0, testutils.FixtureChainID, 1) + iN := NewNode(cfg, time.Second*0, lggr, *s.WSURL(), nil, "test node", 0, testutils.FixtureChainID, 1) n := iN.(*node) start(t, n) @@ -638,7 +639,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { t.Run("transitions to alive if back in-sync", func(t *testing.T) { lggr, observedLogs := logger.TestLoggerObserved(t, zap.DebugLevel) - cfg := TestNodeConfig{SyncThreshold: 5, SelectionMode: NodeSelectionMode_HighestHead} + cfg := TestNodePoolConfig{NodeSyncThreshold: 5, NodeSelectionMode: NodeSelectionMode_HighestHead} chSubbed := make(chan struct{}, 1) const stall = 42 s := testutils.NewWSServer(t, testutils.FixtureChainID, @@ -658,10 +659,10 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { return }) - iN := NewNode(cfg, lggr, *s.WSURL(), nil, "test node", 0, testutils.FixtureChainID, 1) + iN := NewNode(cfg, time.Second*0, lggr, *s.WSURL(), nil, "test node", 0, testutils.FixtureChainID, 1) n := iN.(*node) n.nLiveNodes = func() (count int, blockNumber int64, totalDifficulty *utils.Big) { - return 2, stall + int64(cfg.SyncThreshold), nil + return 2, stall + int64(cfg.SyncThreshold()), nil } start(t, n) @@ -695,7 +696,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { }) t.Run("if no live nodes are available, forcibly marks this one alive again", func(t *testing.T) { - cfg := TestNodeConfig{NoNewHeadsThreshold: testutils.TestInterval} + cfg := TestNodePoolConfig{} chSubbed := make(chan struct{}, 1) s := testutils.NewWSServer(t, testutils.FixtureChainID, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { @@ -714,7 +715,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { return }) - iN := NewNode(cfg, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) + iN := NewNode(cfg, testutils.TestInterval, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, testutils.FixtureChainID, 1) n := iN.(*node) n.nLiveNodes = func() (int, int64, *utils.Big) { return 0, 0, nil } @@ -737,8 +738,8 @@ func TestUnit_NodeLifecycle_unreachableLoop(t *testing.T) { t.Parallel() t.Run("exits on close", func(t *testing.T) { - cfg := TestNodeConfig{} - n := newTestNode(t, cfg) + cfg := TestNodePoolConfig{} + n := newTestNode(t, cfg, time.Second*0) start(t, n) n.setState(NodeStateUnreachable) @@ -753,8 +754,8 @@ func TestUnit_NodeLifecycle_unreachableLoop(t *testing.T) { }) t.Run("on successful redial and verify, transitions to alive", func(t *testing.T) { - cfg := TestNodeConfig{} - n := newTestNode(t, cfg) + cfg := TestNodePoolConfig{} + n := newTestNode(t, cfg, time.Second*0) start(t, n) defer func() { assert.NoError(t, n.Close()) }() n.setState(NodeStateUnreachable) @@ -768,10 +769,10 @@ func TestUnit_NodeLifecycle_unreachableLoop(t *testing.T) { }) t.Run("on successful redial but failed verify, transitions to invalid chain ID", func(t *testing.T) { - cfg := TestNodeConfig{} + cfg := TestNodePoolConfig{} s := testutils.NewWSServer(t, testutils.FixtureChainID, standardHandler) lggr, observedLogs := logger.TestLoggerObserved(t, zap.ErrorLevel) - iN := NewNode(cfg, lggr, *s.WSURL(), nil, "test node", 0, big.NewInt(42), 1) + iN := NewNode(cfg, time.Second*0, lggr, *s.WSURL(), nil, "test node", 0, big.NewInt(42), 1) n := iN.(*node) defer func() { assert.NoError(t, n.Close()) }() start(t, n) @@ -788,9 +789,9 @@ func TestUnit_NodeLifecycle_unreachableLoop(t *testing.T) { }) t.Run("on failed redial, keeps trying to redial", func(t *testing.T) { - cfg := TestNodeConfig{} + cfg := TestNodePoolConfig{} lggr, observedLogs := logger.TestLoggerObserved(t, zap.DebugLevel) - iN := NewNode(cfg, lggr, *testutils.MustParseURL(t, "ws://test.invalid"), nil, "test node", 0, big.NewInt(42), 1) + iN := NewNode(cfg, time.Second*0, lggr, *testutils.MustParseURL(t, "ws://test.invalid"), nil, "test node", 0, big.NewInt(42), 1) n := iN.(*node) defer func() { assert.NoError(t, n.Close()) }() start(t, n) @@ -808,8 +809,8 @@ func TestUnit_NodeLifecycle_invalidChainIDLoop(t *testing.T) { t.Parallel() t.Run("exits on close", func(t *testing.T) { - cfg := TestNodeConfig{} - n := newTestNode(t, cfg) + cfg := TestNodePoolConfig{} + n := newTestNode(t, cfg, time.Second*0) start(t, n) n.setState(NodeStateInvalidChainID) @@ -824,8 +825,8 @@ func TestUnit_NodeLifecycle_invalidChainIDLoop(t *testing.T) { }) t.Run("on successful verify, transitions to alive", func(t *testing.T) { - cfg := TestNodeConfig{} - n := newTestNode(t, cfg) + cfg := TestNodePoolConfig{} + n := newTestNode(t, cfg, time.Second*0) dial(t, n) defer func() { assert.NoError(t, n.Close()) }() n.setState(NodeStateInvalidChainID) @@ -839,10 +840,10 @@ func TestUnit_NodeLifecycle_invalidChainIDLoop(t *testing.T) { }) t.Run("on failed verify, keeps checking", func(t *testing.T) { - cfg := TestNodeConfig{} + cfg := TestNodePoolConfig{} s := testutils.NewWSServer(t, testutils.FixtureChainID, standardHandler) lggr, observedLogs := logger.TestLoggerObserved(t, zap.ErrorLevel) - iN := NewNode(cfg, lggr, *s.WSURL(), nil, "test node", 0, big.NewInt(42), 1) + iN := NewNode(cfg, time.Second*0, lggr, *s.WSURL(), nil, "test node", 0, big.NewInt(42), 1) n := iN.(*node) defer func() { assert.NoError(t, n.Close()) }() dial(t, n) diff --git a/core/chains/evm/client/pool.go b/core/chains/evm/client/pool.go index 46f56ddd3f3..f9dca7e9cf8 100644 --- a/core/chains/evm/client/pool.go +++ b/core/chains/evm/client/pool.go @@ -57,13 +57,14 @@ type PoolConfig interface { // It is responsible for liveness checking and balancing queries across live nodes type Pool struct { utils.StartStopOnce - nodes []Node - sendonlys []SendOnlyNode - chainID *big.Int - chainType config.ChainType - logger logger.Logger - config PoolConfig - nodeSelector NodeSelector + nodes []Node + sendonlys []SendOnlyNode + chainID *big.Int + chainType config.ChainType + logger logger.Logger + selectionMode string + noNewHeadsThreshold time.Duration + nodeSelector NodeSelector activeMu sync.RWMutex activeNode Node @@ -72,13 +73,13 @@ type Pool struct { wg sync.WaitGroup } -func NewPool(logger logger.Logger, cfg PoolConfig, nodes []Node, sendonlys []SendOnlyNode, chainID *big.Int, chainType config.ChainType) *Pool { +func NewPool(logger logger.Logger, selectionMode string, noNewHeadsTreshold time.Duration, nodes []Node, sendonlys []SendOnlyNode, chainID *big.Int, chainType config.ChainType) *Pool { if chainID == nil { panic("chainID is required") } nodeSelector := func() NodeSelector { - switch cfg.NodeSelectionMode() { + switch selectionMode { case NodeSelectionMode_HighestHead: return NewHighestHeadNodeSelector(nodes) case NodeSelectionMode_RoundRobin: @@ -88,24 +89,25 @@ func NewPool(logger logger.Logger, cfg PoolConfig, nodes []Node, sendonlys []Sen case NodeSelectionMode_PriorityLevel: return NewPriorityLevelNodeSelector(nodes) default: - panic(fmt.Sprintf("unsupported NodeSelectionMode: %s", cfg.NodeSelectionMode())) + panic(fmt.Sprintf("unsupported NodeSelectionMode: %s", selectionMode)) } }() lggr := logger.Named("Pool").With("evmChainID", chainID.String()) p := &Pool{ - nodes: nodes, - sendonlys: sendonlys, - chainID: chainID, - chainType: chainType, - logger: lggr, - config: cfg, - nodeSelector: nodeSelector, - chStop: make(chan struct{}), + nodes: nodes, + sendonlys: sendonlys, + chainID: chainID, + chainType: chainType, + logger: lggr, + selectionMode: selectionMode, + noNewHeadsThreshold: noNewHeadsTreshold, + nodeSelector: nodeSelector, + chStop: make(chan struct{}), } - p.logger.Debugf("The pool is configured to use NodeSelectionMode: %s", cfg.NodeSelectionMode()) + p.logger.Debugf("The pool is configured to use NodeSelectionMode: %s", selectionMode) return p } diff --git a/core/chains/evm/client/pool_test.go b/core/chains/evm/client/pool_test.go index 59db2d3daf3..00c42597c36 100644 --- a/core/chains/evm/client/pool_test.go +++ b/core/chains/evm/client/pool_test.go @@ -157,7 +157,7 @@ func TestPool_Dial(t *testing.T) { for i, n := range test.sendNodes { sendNodes[i] = n.newSendOnlyNode(t, test.sendNodeChainID) } - p := evmclient.NewPool(logger.TestLogger(t), defaultConfig, nodes, sendNodes, test.poolChainID, "") + p := evmclient.NewPool(logger.TestLogger(t), defaultConfig.NodeSelectionMode(), time.Second*0, nodes, sendNodes, test.poolChainID, "") err := p.Dial(ctx) if err == nil { t.Cleanup(func() { assert.NoError(t, p.Close()) }) @@ -227,7 +227,7 @@ func (r *chainIDResps) newNode(t *testing.T, nodeChainID int64) evmclient.Node { } defer func() { r.id++ }() - return evmclient.NewNode(evmclient.TestNodeConfig{}, logger.TestLogger(t), *wsURL, httpURL, t.Name(), r.id, big.NewInt(nodeChainID), 0) + return evmclient.NewNode(evmclient.TestNodePoolConfig{}, time.Second*0, logger.TestLogger(t), *wsURL, httpURL, t.Name(), r.id, big.NewInt(nodeChainID), 0) } type chainIDService struct { @@ -250,7 +250,7 @@ func TestUnit_Pool_RunLoop(t *testing.T) { nodes := []evmclient.Node{n1, n2, n3} lggr, observedLogs := logger.TestLoggerObserved(t, zap.ErrorLevel) - p := evmclient.NewPool(lggr, defaultConfig, nodes, []evmclient.SendOnlyNode{}, &cltest.FixtureChainID, "") + p := evmclient.NewPool(lggr, defaultConfig.NodeSelectionMode(), time.Second*0, nodes, []evmclient.SendOnlyNode{}, &cltest.FixtureChainID, "") n1.On("String").Maybe().Return("n1") n2.On("String").Maybe().Return("n2") @@ -324,7 +324,7 @@ func TestUnit_Pool_BatchCallContextAll(t *testing.T) { sendonlys = append(sendonlys, s) } - p := evmclient.NewPool(logger.TestLogger(t), defaultConfig, nodes, sendonlys, &cltest.FixtureChainID, "") + p := evmclient.NewPool(logger.TestLogger(t), defaultConfig.NodeSelectionMode(), time.Second*0, nodes, sendonlys, &cltest.FixtureChainID, "") assert.True(t, p.ChainType().IsValid()) assert.False(t, p.ChainType().IsL2()) diff --git a/core/chains/evm/config/config.go b/core/chains/evm/config/config.go index 673ade92ccf..398bc1cc394 100644 --- a/core/chains/evm/config/config.go +++ b/core/chains/evm/config/config.go @@ -7,15 +7,9 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/smartcontractkit/chainlink/v2/core/assets" - evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/config" ) -// Deprecated, use EVM below -type ChainScopedOnlyConfig interface { - evmclient.NodeConfig -} - type EVM interface { HeadTracker() HeadTracker BalanceMonitor() BalanceMonitor @@ -23,6 +17,7 @@ type EVM interface { GasEstimator() GasEstimator OCR() OCR OCR2() OCR2 + NodePool() NodePool AutoCreateKey() bool BlockBackfillDepth() uint64 @@ -42,6 +37,7 @@ type EVM interface { NonceAutoSync() bool OperatorFactoryAddress() string RPCDefaultBatchSize() uint32 + NodeNoNewHeadsThreshold() time.Duration } type OCR interface { @@ -122,7 +118,6 @@ type BlockHistory interface { //go:generate mockery --quiet --name ChainScopedConfig --output ./mocks/ --case=underscore type ChainScopedConfig interface { config.AppConfig - ChainScopedOnlyConfig // Deprecated, to be replaced by EVM() below Validate() error EVM() EVM diff --git a/core/chains/evm/config/config_node_pool.go b/core/chains/evm/config/config_node_pool.go new file mode 100644 index 00000000000..aebe0b268ef --- /dev/null +++ b/core/chains/evm/config/config_node_pool.go @@ -0,0 +1,10 @@ +package config + +import "time" + +type NodePool interface { + PollFailureThreshold() uint32 + PollInterval() time.Duration + SelectionMode() string + SyncThreshold() uint32 +} diff --git a/core/chains/evm/config/config_node_pool_test.go b/core/chains/evm/config/config_node_pool_test.go new file mode 100644 index 00000000000..c04ce83997e --- /dev/null +++ b/core/chains/evm/config/config_node_pool_test.go @@ -0,0 +1,32 @@ +package config_test + +import ( + "math/big" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + + v2 "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/v2" + configtest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest/v2" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +func TestNodePoolConfig(t *testing.T) { + gcfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + id := utils.NewBig(big.NewInt(rand.Int63())) + c.EVM[0] = &v2.EVMConfig{ + ChainID: id, + Chain: v2.Defaults(id, &v2.Chain{}), + } + }) + cfg := evmtest.NewChainScopedConfig(t, gcfg) + + require.Equal(t, "HighestHead", cfg.EVM().NodePool().SelectionMode()) + require.Equal(t, uint32(5), cfg.EVM().NodePool().SyncThreshold()) + require.Equal(t, time.Duration(10000000000), cfg.EVM().NodePool().PollInterval()) + require.Equal(t, uint32(5), cfg.EVM().NodePool().PollFailureThreshold()) +} diff --git a/core/chains/evm/config/mocks/chain_scoped_config.go b/core/chains/evm/config/mocks/chain_scoped_config.go index 80ccd0de128..bf84164cd3e 100644 --- a/core/chains/evm/config/mocks/chain_scoped_config.go +++ b/core/chains/evm/config/mocks/chain_scoped_config.go @@ -307,76 +307,6 @@ func (_m *ChainScopedConfig) Mercury() coreconfig.Mercury { return r0 } -// NodeNoNewHeadsThreshold provides a mock function with given fields: -func (_m *ChainScopedConfig) NodeNoNewHeadsThreshold() time.Duration { - ret := _m.Called() - - var r0 time.Duration - if rf, ok := ret.Get(0).(func() time.Duration); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(time.Duration) - } - - return r0 -} - -// NodePollFailureThreshold provides a mock function with given fields: -func (_m *ChainScopedConfig) NodePollFailureThreshold() uint32 { - ret := _m.Called() - - var r0 uint32 - if rf, ok := ret.Get(0).(func() uint32); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(uint32) - } - - return r0 -} - -// NodePollInterval provides a mock function with given fields: -func (_m *ChainScopedConfig) NodePollInterval() time.Duration { - ret := _m.Called() - - var r0 time.Duration - if rf, ok := ret.Get(0).(func() time.Duration); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(time.Duration) - } - - return r0 -} - -// NodeSelectionMode provides a mock function with given fields: -func (_m *ChainScopedConfig) NodeSelectionMode() string { - ret := _m.Called() - - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - -// NodeSyncThreshold provides a mock function with given fields: -func (_m *ChainScopedConfig) NodeSyncThreshold() uint32 { - ret := _m.Called() - - var r0 uint32 - if rf, ok := ret.Get(0).(func() uint32); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(uint32) - } - - return r0 -} - // OCR provides a mock function with given fields: func (_m *ChainScopedConfig) OCR() coreconfig.OCR { ret := _m.Called() diff --git a/core/chains/evm/config/v2/chain_scoped.go b/core/chains/evm/config/v2/chain_scoped.go index 24ad1defcde..690d0b24b67 100644 --- a/core/chains/evm/config/v2/chain_scoped.go +++ b/core/chains/evm/config/v2/chain_scoped.go @@ -147,6 +147,22 @@ func (e *evmConfig) MinIncomingConfirmations() uint32 { return *e.c.MinIncomingConfirmations } +func (e *evmConfig) NodePool() config.NodePool { + return &nodePoolConfig{c: e.c.NodePool} +} + +func (e *evmConfig) NodeNoNewHeadsThreshold() time.Duration { + return e.c.NoNewHeadsThreshold.Duration() +} + +func (c *ChainScoped) EVM() config.EVM { + return &evmConfig{c: c.cfg} +} + +func (c *ChainScoped) BlockEmissionIdleWarningThreshold() time.Duration { + return c.EVM().NodeNoNewHeadsThreshold() +} + func (e *evmConfig) MinContractPayment() *assets.Link { return e.c.MinContractPayment } @@ -171,27 +187,3 @@ func (e *evmConfig) OperatorFactoryAddress() string { } return e.c.OperatorFactoryAddress.String() } - -func (c *ChainScoped) EVM() config.EVM { - return &evmConfig{c: c.cfg} -} - -func (c *ChainScoped) NodeNoNewHeadsThreshold() time.Duration { - return c.cfg.NoNewHeadsThreshold.Duration() -} - -func (c *ChainScoped) NodePollFailureThreshold() uint32 { - return *c.cfg.NodePool.PollFailureThreshold -} - -func (c *ChainScoped) NodePollInterval() time.Duration { - return c.cfg.NodePool.PollInterval.Duration() -} - -func (c *ChainScoped) NodeSelectionMode() string { - return *c.cfg.NodePool.SelectionMode -} - -func (c *ChainScoped) NodeSyncThreshold() uint32 { - return *c.cfg.NodePool.SyncThreshold -} diff --git a/core/chains/evm/config/v2/config_node_pool.go b/core/chains/evm/config/v2/config_node_pool.go new file mode 100644 index 00000000000..514c3d4822d --- /dev/null +++ b/core/chains/evm/config/v2/config_node_pool.go @@ -0,0 +1,25 @@ +package v2 + +import ( + "time" +) + +type nodePoolConfig struct { + c NodePool +} + +func (n *nodePoolConfig) PollFailureThreshold() uint32 { + return *n.c.PollFailureThreshold +} + +func (n *nodePoolConfig) PollInterval() time.Duration { + return n.c.PollInterval.Duration() +} + +func (n *nodePoolConfig) SelectionMode() string { + return *n.c.SelectionMode +} + +func (n *nodePoolConfig) SyncThreshold() uint32 { + return *n.c.SyncThreshold +}