Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added RPC filter limits, metrics, and better memory management. (#10718) #11894

Merged
merged 1 commit into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().DurationVar(&cfg.EvmCallTimeout, "rpc.evmtimeout", rpccfg.DefaultEvmCallTimeout, "Maximum amount of time to wait for the answer from EVM call.")
rootCmd.PersistentFlags().DurationVar(&cfg.OverlayGetLogsTimeout, "rpc.overlay.getlogstimeout", rpccfg.DefaultOverlayGetLogsTimeout, "Maximum amount of time to wait for the answer from the overlay_getLogs call.")
rootCmd.PersistentFlags().DurationVar(&cfg.OverlayReplayBlockTimeout, "rpc.overlay.replayblocktimeout", rpccfg.DefaultOverlayReplayBlockTimeout, "Maximum amount of time to wait for the answer to replay a single block when called from an overlay_getLogs call.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxLogs, "rpc.subscription.filters.maxlogs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxLogs, "Maximum number of logs to store per subscription.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "rpc.subscription.filters.maxheaders", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "Maximum number of block headers to store per subscription.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTxs, "rpc.subscription.filters.maxtxs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs, "Maximum number of transactions to store per subscription.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "rpc.subscription.filters.maxaddresses", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "Maximum number of addresses per subscription to filter logs by.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTopics, "rpc.subscription.filters.maxtopics", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics, "Maximum number of topics per subscription to filter logs by.")
rootCmd.PersistentFlags().IntVar(&cfg.BatchLimit, utils.RpcBatchLimit.Name, utils.RpcBatchLimit.Value, utils.RpcBatchLimit.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.ReturnDataLimit, utils.RpcReturnDataLimit.Name, utils.RpcReturnDataLimit.Value, utils.RpcReturnDataLimit.Usage)
rootCmd.PersistentFlags().BoolVar(&cfg.AllowUnprotectedTxs, utils.AllowUnprotectedTxs.Name, utils.AllowUnprotectedTxs.Value, utils.AllowUnprotectedTxs.Usage)
Expand Down Expand Up @@ -269,6 +274,7 @@ func checkDbCompatibility(ctx context.Context, db kv.RoDB) error {

func EmbeddedServices(ctx context.Context,
erigonDB kv.RoDB, stateCacheCfg kvcache.CoherentConfig,
rpcFiltersConfig rpchelper.FiltersConfig,
blockReader services.FullBlockReader, ethBackendServer remote.ETHBACKENDServer, txPoolServer txpool.TxpoolServer,
miningServer txpool.MiningServer, stateDiffClient StateChangesClient,
logger log.Logger,
Expand All @@ -291,7 +297,7 @@ func EmbeddedServices(ctx context.Context,

txPool = direct.NewTxPoolClient(txPoolServer)
mining = direct.NewMiningClient(miningServer)
ff = rpchelper.New(ctx, eth, txPool, mining, func() {}, logger)
ff = rpchelper.New(ctx, rpcFiltersConfig, eth, txPool, mining, func() {}, logger)

return
}
Expand Down Expand Up @@ -541,7 +547,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}
}()

ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot, logger)
ff = rpchelper.New(ctx, cfg.RpcFiltersConfig, eth, txPool, mining, onNewSnapshot, logger)
return db, eth, txPool, mining, stateCache, blockReader, engine, ff, agg, err
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/rpcdaemon/cli/httpcfg/http_cfg.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package httpcfg

import (
"github.com/ledgerwatch/erigon/turbo/rpchelper"
"time"

"github.com/ledgerwatch/erigon-lib/common/datadir"
Expand Down Expand Up @@ -51,6 +52,7 @@ type HttpCfg struct {
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
RpcFiltersConfig rpchelper.FiltersConfig
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string
Expand Down
21 changes: 21 additions & 0 deletions erigon-lib/metrics/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
)

// NewCounter registers and returns new counter with the given name.
Expand Down Expand Up @@ -88,6 +89,26 @@ func GetOrCreateGauge(name string) Gauge {
return &gauge{g}
}

// GetOrCreateGaugeVec returns registered GaugeVec with the given name
// or creates a new GaugeVec if the registry doesn't contain a GaugeVec with
// the given name and labels.
//
// name must be a valid Prometheus-compatible metric with possible labels.
// labels are the names of the dimensions associated with the gauge vector.
// For instance,
//
// - foo, with labels []string{"bar", "baz"}
//
// The returned GaugeVec is safe to use from concurrent goroutines.
func GetOrCreateGaugeVec(name string, labels []string, help ...string) *prometheus.GaugeVec {
gv, err := defaultSet.GetOrCreateGaugeVec(name, labels, help...)
if err != nil {
panic(fmt.Errorf("could not get or create new gaugevec: %w", err))
}

return gv
}

// NewSummary creates and returns new summary with the given name.
//
// name must be valid Prometheus-compatible metric with possible labels.
Expand Down
119 changes: 115 additions & 4 deletions erigon-lib/metrics/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"fmt"
"reflect"
"sort"
"strings"
"sync"
Expand All @@ -16,15 +17,23 @@ type namedMetric struct {
isAux bool
}

type namedMetricVec struct {
name string
metric *prometheus.GaugeVec
isAux bool
}

// Set is a set of metrics.
//
// Metrics belonging to a set are exported separately from global metrics.
//
// Set.WritePrometheus must be called for exporting metrics from the set.
type Set struct {
mu sync.Mutex
a []*namedMetric
m map[string]*namedMetric
mu sync.Mutex
a []*namedMetric
av []*namedMetricVec
m map[string]*namedMetric
vecs map[string]*namedMetricVec
}

var defaultSet = NewSet()
Expand All @@ -34,7 +43,8 @@ var defaultSet = NewSet()
// Pass the set to RegisterSet() function in order to export its metrics via global WritePrometheus() call.
func NewSet() *Set {
return &Set{
m: make(map[string]*namedMetric),
m: make(map[string]*namedMetric),
vecs: make(map[string]*namedMetricVec),
}
}

Expand All @@ -46,11 +56,18 @@ func (s *Set) Describe(ch chan<- *prometheus.Desc) {
if !sort.SliceIsSorted(s.a, lessFunc) {
sort.Slice(s.a, lessFunc)
}
if !sort.SliceIsSorted(s.av, lessFunc) {
sort.Slice(s.av, lessFunc)
}
sa := append([]*namedMetric(nil), s.a...)
sav := append([]*namedMetricVec(nil), s.av...)
s.mu.Unlock()
for _, nm := range sa {
ch <- nm.metric.Desc()
}
for _, nmv := range sav {
nmv.metric.Describe(ch)
}
}

func (s *Set) Collect(ch chan<- prometheus.Metric) {
Expand All @@ -61,11 +78,18 @@ func (s *Set) Collect(ch chan<- prometheus.Metric) {
if !sort.SliceIsSorted(s.a, lessFunc) {
sort.Slice(s.a, lessFunc)
}
if !sort.SliceIsSorted(s.av, lessFunc) {
sort.Slice(s.av, lessFunc)
}
sa := append([]*namedMetric(nil), s.a...)
sav := append([]*namedMetricVec(nil), s.av...)
s.mu.Unlock()
for _, nm := range sa {
ch <- nm.metric
}
for _, nmv := range sav {
nmv.metric.Collect(ch)
}
}

// NewHistogram creates and returns new histogram in s with the given name.
Expand Down Expand Up @@ -307,6 +331,78 @@ func (s *Set) GetOrCreateGauge(name string, help ...string) (prometheus.Gauge, e
return g, nil
}

// GetOrCreateGaugeVec returns registered GaugeVec in s with the given name
// or creates new GaugeVec if s doesn't contain GaugeVec with the given name.
//
// name must be valid Prometheus-compatible metric with possible labels.
// For instance,
//
// - foo
// - foo{bar="baz"}
// - foo{bar="baz",aaa="b"}
//
// labels are the labels associated with the GaugeVec.
//
// The returned GaugeVec is safe to use from concurrent goroutines.
func (s *Set) GetOrCreateGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) {
s.mu.Lock()
nm := s.vecs[name]
s.mu.Unlock()
if nm == nil {
metric, err := newGaugeVec(name, labels, help...)
if err != nil {
return nil, fmt.Errorf("invalid metric name %q: %w", name, err)
}

nmNew := &namedMetricVec{
name: name,
metric: metric,
}

s.mu.Lock()
nm = s.vecs[name]
if nm == nil {
nm = nmNew
s.vecs[name] = nm
s.av = append(s.av, nm)
}
s.mu.Unlock()
s.registerMetricVec(name, metric, false)
}

if nm.metric == nil {
return nil, fmt.Errorf("metric %q is nil", name)
}

metricType := reflect.TypeOf(nm.metric)
if metricType != reflect.TypeOf(&prometheus.GaugeVec{}) {
return nil, fmt.Errorf("metric %q isn't a GaugeVec. It is %s", name, metricType)
}

return nm.metric, nil
}

// newGaugeVec creates a new Prometheus GaugeVec.
func newGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) {
name, constLabels, err := parseMetric(name)
if err != nil {
return nil, err
}

helpStr := "gauge metric"
if len(help) > 0 {
helpStr = strings.Join(help, ", ")
}

gv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: name,
Help: helpStr,
ConstLabels: constLabels,
}, labels)

return gv, nil
}

const defaultSummaryWindow = 5 * time.Minute

var defaultSummaryQuantiles = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.97: 0.003, 0.99: 0.001}
Expand Down Expand Up @@ -431,6 +527,21 @@ func (s *Set) registerMetric(name string, m prometheus.Metric) {
s.mustRegisterLocked(name, m)
}

func (s *Set) registerMetricVec(name string, mv *prometheus.GaugeVec, isAux bool) {
s.mu.Lock()
defer s.mu.Unlock()

if _, exists := s.vecs[name]; !exists {
nmv := &namedMetricVec{
name: name,
metric: mv,
isAux: isAux,
}
s.vecs[name] = nmv
s.av = append(s.av, nmv)
}
}

// mustRegisterLocked registers given metric with the given name.
//
// Panics if the given name was already registered before.
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig
}
// start HTTP API
httpRpcCfg := stack.Config().Http
ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC,
ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, httpRpcCfg.RpcFiltersConfig, blockReader, ethBackendRPC,
s.txPoolGrpcServer, miningRPC, stateDiffClient, s.logger)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ require (
github.com/pion/webrtc/v3 v3.1.42 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -749,8 +749,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand Down
6 changes: 6 additions & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ var DefaultFlags = []cli.Flag{
&OverlayGetLogsFlag,
&OverlayReplayBlockFlag,

&RpcSubscriptionFiltersMaxLogsFlag,
&RpcSubscriptionFiltersMaxHeadersFlag,
&RpcSubscriptionFiltersMaxTxsFlag,
&RpcSubscriptionFiltersMaxAddressesFlag,
&RpcSubscriptionFiltersMaxTopicsFlag,

&utils.SnapKeepBlocksFlag,
&utils.SnapStopFlag,
&utils.DbPageSizeFlag,
Expand Down
50 changes: 42 additions & 8 deletions turbo/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/node/nodecfg"
"github.com/ledgerwatch/erigon/turbo/rpchelper"
)

var (
Expand Down Expand Up @@ -246,6 +247,32 @@ var (
Value: rpccfg.DefaultOverlayReplayBlockTimeout,
}

RpcSubscriptionFiltersMaxLogsFlag = cli.IntFlag{
Name: "rpc.subscription.filters.maxlogs",
Usage: "Maximum number of logs to store per subscription.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxLogs,
}
RpcSubscriptionFiltersMaxHeadersFlag = cli.IntFlag{
Name: "rpc.subscription.filters.maxheaders",
Usage: "Maximum number of block headers to store per subscription.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxHeaders,
}
RpcSubscriptionFiltersMaxTxsFlag = cli.IntFlag{
Name: "rpc.subscription.filters.maxtxs",
Usage: "Maximum number of transactions to store per subscription.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs,
}
RpcSubscriptionFiltersMaxAddressesFlag = cli.IntFlag{
Name: "rpc.subscription.filters.maxaddresses",
Usage: "Maximum number of addresses per subscription to filter logs by.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses,
}
RpcSubscriptionFiltersMaxTopicsFlag = cli.IntFlag{
Name: "rpc.subscription.filters.maxtopics",
Usage: "Maximum number of topics per subscription to filter logs by.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics,
}

TxPoolCommitEvery = cli.DurationFlag{
Name: "txpool.commit.every",
Usage: "How often transactions should be committed to the storage",
Expand Down Expand Up @@ -487,14 +514,21 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
Feecap: ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name),
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),
RpcFiltersConfig: rpchelper.FiltersConfig{
RpcSubscriptionFiltersMaxLogs: ctx.Int(RpcSubscriptionFiltersMaxLogsFlag.Name),
RpcSubscriptionFiltersMaxHeaders: ctx.Int(RpcSubscriptionFiltersMaxHeadersFlag.Name),
RpcSubscriptionFiltersMaxTxs: ctx.Int(RpcSubscriptionFiltersMaxTxsFlag.Name),
RpcSubscriptionFiltersMaxAddresses: ctx.Int(RpcSubscriptionFiltersMaxAddressesFlag.Name),
RpcSubscriptionFiltersMaxTopics: ctx.Int(RpcSubscriptionFiltersMaxTopicsFlag.Name),
},
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
Feecap: ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name),
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),

OtsMaxPageSize: ctx.Uint64(utils.OtsSearchMaxCapFlag.Name),

Expand Down
2 changes: 1 addition & 1 deletion turbo/jsonrpc/eth_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestGetBlockByNumberWithPendingTag(t *testing.T) {

ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m)
txPool := txpool.NewTxpoolClient(conn)
ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log)
ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log)

expected := 1
header := &types.Header{
Expand Down
Loading
Loading