Skip to content

Commit

Permalink
Move EVM node pool config (#9656)
Browse files Browse the repository at this point in the history
* Move EVM node pool config

* Update after merge

* Add test

* Update after merge
  • Loading branch information
george-dorin authored Jun 21, 2023
1 parent a1a8f33 commit 5fa2f37
Show file tree
Hide file tree
Showing 16 changed files with 215 additions and 233 deletions.
13 changes: 7 additions & 6 deletions core/chains/evm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"net/url"
"time"

"github.com/pkg/errors"
"github.com/smartcontractkit/sqlx"
Expand Down Expand Up @@ -93,7 +94,7 @@ func newChain(ctx context.Context, cfg evmconfig.ChainScopedConfig, nodes []*v2.
client = evmclient.NewNullClient(chainID, l)
} else if opts.GenEthClient == nil {
var err2 error
client, err2 = newEthClientFromChain(cfg, l, chainID, chainType, nodes)
client, err2 = newEthClientFromChain(cfg.EVM().NodePool(), cfg.EVM().NodeNoNewHeadsThreshold(), l, chainID, chainType, nodes)
if err2 != nil {
return nil, errors.Wrapf(err2, "failed to instantiate eth client for chain with ID %s", cfg.EVM().ChainID().String())
}
Expand Down Expand Up @@ -278,30 +279,30 @@ func (c *chain) Logger() logger.Logger { return c.logger }
func (c *chain) BalanceMonitor() monitor.BalanceMonitor { return c.balanceMonitor }
func (c *chain) GasEstimator() gas.EvmFeeEstimator { return c.gasEstimator }

func newEthClientFromChain(cfg evmclient.NodeConfig, lggr logger.Logger, chainID *big.Int, chainType config.ChainType, nodes []*v2.Node) (evmclient.Client, error) {
func newEthClientFromChain(cfg evmconfig.NodePool, noNewHeadsThreshold time.Duration, lggr logger.Logger, chainID *big.Int, chainType config.ChainType, nodes []*v2.Node) (evmclient.Client, error) {
var primaries []evmclient.Node
var sendonlys []evmclient.SendOnlyNode
for i, node := range nodes {
if node.SendOnly != nil && *node.SendOnly {
sendonly := evmclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL), *node.Name, chainID)
sendonlys = append(sendonlys, sendonly)
} else {
primary, err := newPrimary(cfg, lggr, node, int32(i), chainID)
primary, err := newPrimary(cfg, noNewHeadsThreshold, lggr, node, int32(i), chainID)
if err != nil {
return nil, err
}
primaries = append(primaries, primary)
}
}
return evmclient.NewClientWithNodes(lggr, cfg, primaries, sendonlys, chainID, chainType)
return evmclient.NewClientWithNodes(lggr, cfg.SelectionMode(), noNewHeadsThreshold, primaries, sendonlys, chainID, chainType)
}

func newPrimary(cfg evmclient.NodeConfig, lggr logger.Logger, n *v2.Node, id int32, chainID *big.Int) (evmclient.Node, error) {
func newPrimary(cfg evmconfig.NodePool, noNewHeadsThreshold time.Duration, lggr logger.Logger, n *v2.Node, id int32, chainID *big.Int) (evmclient.Node, error) {
if n.SendOnly != nil && *n.SendOnly {
return nil, errors.New("cannot cast send-only node to primary")
}

return evmclient.NewNode(cfg, lggr, (url.URL)(*n.WSURL), (*url.URL)(n.HTTPURL), *n.Name, id, chainID, *n.Order), nil
return evmclient.NewNode(cfg, noNewHeadsThreshold, lggr, (url.URL)(*n.WSURL), (*url.URL)(n.HTTPURL), *n.Name, id, chainID, *n.Order), nil
}

func EnsureChains(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig, ids []utils.Big) error {
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ var _ htrktypes.Client[*evmtypes.Head, ethereum.Subscription, *big.Int, common.H

// NewClientWithNodes instantiates a client from a list of nodes
// Currently only supports one primary
func NewClientWithNodes(logger logger.Logger, cfg PoolConfig, primaryNodes []Node, sendOnlyNodes []SendOnlyNode, chainID *big.Int, chainType config.ChainType) (*client, error) {
pool := NewPool(logger, cfg, primaryNodes, sendOnlyNodes, chainID, chainType)
func NewClientWithNodes(logger logger.Logger, selectionMode string, noNewHeadsThreshold time.Duration, primaryNodes []Node, sendOnlyNodes []SendOnlyNode, chainID *big.Int, chainType config.ChainType) (*client, error) {
pool := NewPool(logger, selectionMode, noNewHeadsThreshold, primaryNodes, sendOnlyNodes, chainID, chainType)
return &client{
logger: logger,
pool: pool,
Expand Down
6 changes: 3 additions & 3 deletions core/chains/evm/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func mustNewClient(t *testing.T, wsURL string, sendonlys ...url.URL) evmclient.C
}

func mustNewClientWithChainID(t *testing.T, wsURL string, chainID *big.Int, sendonlys ...url.URL) evmclient.Client {
cfg := evmclient.TestNodeConfig{
SelectionMode: evmclient.NodeSelectionMode_RoundRobin,
cfg := evmclient.TestNodePoolConfig{
NodeSelectionMode: evmclient.NodeSelectionMode_RoundRobin,
}
c, err := evmclient.NewClientWithTestNode(t, cfg, wsURL, nil, sendonlys, 42, chainID)
c, err := evmclient.NewClientWithTestNode(t, cfg, time.Second*0, wsURL, nil, sendonlys, 42, chainID)
require.NoError(t, err)
return c
}
Expand Down
27 changes: 13 additions & 14 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,24 @@ import (

"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

type TestNodeConfig struct {
NoNewHeadsThreshold time.Duration
PollFailureThreshold uint32
PollInterval time.Duration
SelectionMode string
SyncThreshold uint32
type TestNodePoolConfig struct {
NodePollFailureThreshold uint32
NodePollInterval time.Duration
NodeSelectionMode string
NodeSyncThreshold uint32
}

func (tc TestNodeConfig) NodeNoNewHeadsThreshold() time.Duration { return tc.NoNewHeadsThreshold }
func (tc TestNodeConfig) NodePollFailureThreshold() uint32 { return tc.PollFailureThreshold }
func (tc TestNodeConfig) NodePollInterval() time.Duration { return tc.PollInterval }
func (tc TestNodeConfig) NodeSelectionMode() string { return tc.SelectionMode }
func (tc TestNodeConfig) NodeSyncThreshold() uint32 { return tc.SyncThreshold }
func (tc TestNodePoolConfig) PollFailureThreshold() uint32 { return tc.NodePollFailureThreshold }
func (tc TestNodePoolConfig) PollInterval() time.Duration { return tc.NodePollInterval }
func (tc TestNodePoolConfig) SelectionMode() string { return tc.NodeSelectionMode }
func (tc TestNodePoolConfig) SyncThreshold() uint32 { return tc.NodeSyncThreshold }

func NewClientWithTestNode(t *testing.T, cfg NodeConfig, rpcUrl string, rpcHTTPURL *url.URL, sendonlyRPCURLs []url.URL, id int32, chainID *big.Int) (*client, error) {
func NewClientWithTestNode(t *testing.T, nodePoolCfg config.NodePool, noNewHeadsThreshold time.Duration, rpcUrl string, rpcHTTPURL *url.URL, sendonlyRPCURLs []url.URL, id int32, chainID *big.Int) (*client, error) {
parsed, err := url.ParseRequestURI(rpcUrl)
if err != nil {
return nil, err
Expand All @@ -38,7 +37,7 @@ func NewClientWithTestNode(t *testing.T, cfg NodeConfig, rpcUrl string, rpcHTTPU
}

lggr := logger.TestLogger(t)
n := NewNode(cfg, lggr, *parsed, rpcHTTPURL, "eth-primary-0", id, chainID, 1)
n := NewNode(nodePoolCfg, noNewHeadsThreshold, lggr, *parsed, rpcHTTPURL, "eth-primary-0", id, chainID, 1)
n.(*node).setLatestReceived(0, utils.NewBigI(0))
primaries := []Node{n}

Expand All @@ -51,7 +50,7 @@ func NewClientWithTestNode(t *testing.T, cfg NodeConfig, rpcUrl string, rpcHTTPU
sendonlys = append(sendonlys, s)
}

pool := NewPool(lggr, cfg, primaries, sendonlys, chainID, "")
pool := NewPool(lggr, nodePoolCfg.SelectionMode(), noNewHeadsThreshold, primaries, sendonlys, chainID, "")
c := &client{logger: lggr, pool: pool}
t.Cleanup(c.Close)
return c, nil
Expand Down
30 changes: 12 additions & 18 deletions core/chains/evm/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand Down Expand Up @@ -130,13 +131,14 @@ type rawclient struct {
// It must have a ws url and may have a http url
type node struct {
utils.StartStopOnce
lfcLog logger.Logger
rpcLog logger.Logger
name string
id int32
chainID *big.Int
cfg NodeConfig
order int32
lfcLog logger.Logger
rpcLog logger.Logger
name string
id int32
chainID *big.Int
nodePoolCfg config.NodePool
noNewHeadsThreshold time.Duration
order int32

ws rawclient
http *rawclient
Expand Down Expand Up @@ -169,22 +171,14 @@ type node struct {
nLiveNodes func() (count int, blockNumber int64, totalDifficulty *utils.Big)
}

// NodeConfig allows configuration of the node
type NodeConfig interface {
NodeNoNewHeadsThreshold() time.Duration
NodePollFailureThreshold() uint32
NodePollInterval() time.Duration
NodeSelectionMode() string
NodeSyncThreshold() uint32
}

// NewNode returns a new *node as Node
func NewNode(nodeCfg NodeConfig, lggr logger.Logger, wsuri url.URL, httpuri *url.URL, name string, id int32, chainID *big.Int, nodeOrder int32) Node {
func NewNode(nodeCfg config.NodePool, noNewHeadsThreshold time.Duration, lggr logger.Logger, wsuri url.URL, httpuri *url.URL, name string, id int32, chainID *big.Int, nodeOrder int32) Node {
n := new(node)
n.name = name
n.id = id
n.chainID = chainID
n.cfg = nodeCfg
n.nodePoolCfg = nodeCfg
n.noNewHeadsThreshold = noNewHeadsThreshold
n.ws.uri = wsuri
n.order = nodeOrder
if httpuri != nil {
Expand Down
3 changes: 2 additions & 1 deletion core/chains/evm/client/node_fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"testing"
"time"

"github.com/ethereum/go-ethereum"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -42,7 +43,7 @@ func TestUnit_Node_StateTransitions(t *testing.T) {
t.Parallel()

s := testutils.NewWSServer(t, testutils.FixtureChainID, nil)
iN := NewNode(TestNodeConfig{}, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, nil, 1)
iN := NewNode(TestNodePoolConfig{}, time.Second*0, logger.TestLogger(t), *s.WSURL(), nil, "test node", 42, nil, 1)
n := iN.(*node)

assert.Equal(t, NodeStateUndialed, n.State())
Expand Down
18 changes: 9 additions & 9 deletions core/chains/evm/client/node_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ var (
// state change in case we have to force a state transition due to no available
// nodes.
// NOTE: This only applies to out-of-sync nodes if they are the last available node
func zombieNodeCheckInterval(cfg NodeConfig) time.Duration {
interval := cfg.NodeNoNewHeadsThreshold()
func zombieNodeCheckInterval(noNewHeadsThreshold time.Duration) time.Duration {
interval := noNewHeadsThreshold
if interval <= 0 || interval > queryTimeout {
interval = queryTimeout
}
Expand Down Expand Up @@ -83,9 +83,9 @@ func (n *node) aliveLoop() {
}
}

noNewHeadsTimeoutThreshold := n.cfg.NodeNoNewHeadsThreshold()
pollFailureThreshold := n.cfg.NodePollFailureThreshold()
pollInterval := n.cfg.NodePollInterval()
noNewHeadsTimeoutThreshold := n.noNewHeadsThreshold
pollFailureThreshold := n.nodePoolCfg.PollFailureThreshold()
pollInterval := n.nodePoolCfg.PollInterval()

lggr := n.lfcLog.Named("Alive").With("noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold, "pollInterval", pollInterval, "pollFailureThreshold", pollFailureThreshold)
lggr.Tracew("Alive loop starting", "nodeState", n.State())
Expand Down Expand Up @@ -208,7 +208,7 @@ func (n *node) aliveLoop() {
lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
// We don't necessarily want to wait the full timeout to check again, we should
// check regularly and log noisily in this state
outOfSyncT.Reset(zombieNodeCheckInterval(n.cfg))
outOfSyncT.Reset(zombieNodeCheckInterval(n.noNewHeadsThreshold))
continue
}
}
Expand All @@ -230,13 +230,13 @@ func (n *node) syncStatus(num int64, td *utils.Big) (outOfSync bool, liveNodes i
if n.nLiveNodes == nil {
return // skip for tests
}
threshold := n.cfg.NodeSyncThreshold()
threshold := n.nodePoolCfg.SyncThreshold()
if threshold == 0 {
return // disabled
}
// Check against best node
ln, highest, greatest := n.nLiveNodes()
mode := n.cfg.NodeSelectionMode()
mode := n.nodePoolCfg.SelectionMode()
switch mode {
case NodeSelectionMode_HighestHead, NodeSelectionMode_RoundRobin, NodeSelectionMode_PriorityLevel:
return num < highest-int64(threshold), ln
Expand Down Expand Up @@ -320,7 +320,7 @@ func (n *node) outOfSyncLoop(isOutOfSync func(num int64, td *utils.Big) bool) {
return
}
lggr.Debugw(msgReceivedBlock, "blockNumber", head.Number, "totalDifficulty", head.TotalDifficulty, "nodeState", n.State())
case <-time.After(zombieNodeCheckInterval(n.cfg)):
case <-time.After(zombieNodeCheckInterval(n.noNewHeadsThreshold)):
if n.nLiveNodes != nil {
if l, _, _ := n.nLiveNodes(); l < 1 {
lggr.Critical("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state")
Expand Down
Loading

0 comments on commit 5fa2f37

Please sign in to comment.