From adf3f438d8aae4c749e5ddacb3fe08afd6e695e7 Mon Sep 17 00:00:00 2001 From: Bret <787344+bretep@users.noreply.github.com> Date: Wed, 12 Jun 2024 23:41:32 -0700 Subject: [PATCH] Added RPC filter limits, metrics, and better memory management. (#10718) - Introduced `GetOrCreateGaugeVec` function to manage Prometheus `GaugeVec` metrics. - Implemented various subscription filter limits (max logs, max headers, max transactions, max addresses, and max topics). - Updated various test files to utilize the new `FiltersConfig` and added comprehensive tests for the new filter limits. - Added new flags for subscription filter limits. - Improved log filtering and event handling with better memory management and metric tracking. - Added configuration to handle RPC subscription filter limits. - Minor typo fixes. ------- ### Problem RPC nodes would frequently OOM. Based on metrics and pprof, I observed high memory utilization in the rpchelper: ##### Initial Observation After 2 Hours ```text File: erigon Type: inuse_space Entering interactive mode (type "help" for commands, "o" for options) (pprof) top Showing nodes accounting for 69696.67MB, 95.37% of 73083.31MB total Dropped 1081 nodes (cum <= 365.42MB) Showing top 10 nodes out of 43 flat flat% sum% cum cum% 65925.80MB 90.21% 90.21% 65925.80MB 90.21% github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1 1944.80MB 2.66% 92.87% 1944.80MB 2.66% github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1 1055.97MB 1.44% 94.31% 1055.97MB 1.44% github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 362.04MB 0.5% 94.81% 572.20MB 0.78% github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState 257.73MB 0.35% 95.16% 587.85MB 0.8% encoding/json.Marshal 57.85MB 0.079% 95.24% 1658.66MB 2.27% github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run 51.48MB 0.07% 95.31% 2014.50MB 2.76% github.com/ledgerwatch/erigon/rpc.(*handler).handleCallMsg 23.01MB 0.031% 95.34% 1127.21MB 1.54% github.com/ledgerwatch/erigon/core/vm.opCall 11.50MB 0.016% 95.36% 1677.68MB 2.30% github.com/ledgerwatch/erigon/core/vm.(*EVM).call 6.50MB 0.0089% 95.37% 685.70MB 0.94% github.com/ledgerwatch/erigon/turbo/transactions.DoCall (pprof) list distributeLog.func1 Total: 71.37GB ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/logsfilter.go 64.38GB 64.38GB (flat, cum) 90.21% of Total . . 206: a.logsFilters.Range(func(k LogsSubID, filter *LogsFilter) error { . . 207: if filter.allAddrs == 0 { . . 208: _, addrOk := filter.addrs.Get(gointerfaces.ConvertH160toAddress(eventLog.Address)) . . 209: if !addrOk { . . 210: return nil . . 211: } . . 212: } . . 213: var topics []libcommon.Hash . . 214: for _, topic := range eventLog.Topics { 27.01GB 27.01GB 215: topics = append(topics, gointerfaces.ConvertH256ToHash(topic)) . . 216: } . . 217: if filter.allTopics == 0 { . . 218: if !a.chooseTopics(filter, topics) { . . 219: return nil . . 220: } . . 221: } 37.37GB 37.37GB 222: lg := &types2.Log{ . . 223: Address: gointerfaces.ConvertH160toAddress(eventLog.Address), . . 224: Topics: topics, . . 225: Data: eventLog.Data, . . 226: BlockNumber: eventLog.BlockNumber, . . 227: TxHash: gointerfaces.ConvertH256ToHash(eventLog.TransactionHash), (pprof) % ``` ### Actions Taken - Added Metrics Implemented metrics to gain visibility into memory utilization. 
- Optimized Allocations Optimized object and slice allocations, resulting in significant improvements in memory usage. ##### Observation After 2 Hours Post-Optimization ```text File: erigon Type: inuse_space Entering interactive mode (type "help" for commands, "o" for options) (pprof) top Showing nodes accounting for 25.92GB, 94.14% of 27.53GB total Dropped 785 nodes (cum <= 0.14GB) Showing top 10 nodes out of 70 flat flat% sum% cum cum% 15.37GB 55.82% 55.82% 15.37GB 55.82% github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1 8.15GB 29.60% 85.42% 8.15GB 29.60% github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 0.64GB 2.33% 87.75% 1GB 3.65% github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog 0.50GB 1.82% 89.57% 0.65GB 2.35% github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState 0.36GB 1.32% 90.89% 0.36GB 1.32% github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1 0.31GB 1.12% 92.01% 0.31GB 1.12% bytes.growSlice 0.30GB 1.08% 93.09% 0.30GB 1.08% github.com/ledgerwatch/erigon/core/vm.(*Memory).Resize (inline) 0.15GB 0.56% 93.65% 0.35GB 1.27% github.com/ledgerwatch/erigon/core/state.(*IntraBlockState).AddSlotToAccessList 0.07GB 0.24% 93.90% 0.23GB 0.84% github.com/ledgerwatch/erigon/core/types.codecSelfer2.decLogs 0.07GB 0.24% 94.14% 1.58GB 5.75% github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run (pprof) list AddLogs.func1 Total: 27.53GB ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go 15.37GB 15.37GB (flat, cum) 55.82% of Total . . 644: ff.logsStores.DoAndStore(id, func(st []*types.Log, ok bool) []*types.Log { . . 645: if !ok { . . 646: st = make([]*types.Log, 0) . . 647: } 15.37GB 15.37GB 648: st = append(st, logs) . . 649: return st . . 650: }) . . 651:} . . 652: . . 653:// ReadLogs reads logs from the store associated with the given subscription ID. (pprof) list AddPendingTxs.func1 Total: 27.53GB ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go 8.15GB 8.15GB (flat, cum) 29.60% of Total . . 686: ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction { . . 687: if !ok { . . 688: st = make([][]types.Transaction, 0) . . 689: } 8.15GB 8.15GB 690: st = append(st, txs) . . 691: return st . . 692: }) . . 693:} . . 694: . . 695:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID. (pprof) % ``` ### Findings - Identified unbounded slices that could grow indefinitely, leading to memory leaks. This issue occurs if subscribers do not request updates using their subscription ID, especially when behind a load balancer that does not pin clients to RPC nodes. ### Solutions Considered 1. Architecture Design Changes - Implement architectural changes to pin clients to RPC nodes, ensuring that subscribers requesting updates with their subscription IDs always hit the same node, which cleans up the stored objects on each request. - Reason for not choosing: This still relies on clients with subscriptions to request updates; if they never do, and never call the unsubscribe function, the result is an unbounded memory leak. The RPC node operator has no control over the behavior of the clients. 2.
Implementing Timeouts - Introduce timeouts for subscriptions to automatically clean up unresponsive or inactive subscriptions. - Reason for not choosing: Implementing timeouts would inadvertently stop websocket subscriptions due to the current design of the code, leading to a potential loss of data for active users. This solution would be good but requires a lot more work. 3. Configurable Limits (Chosen Solution) - Set configurable limits for various subscription parameters (e.g., logs, headers, transactions, addresses, topics) to manage memory utilization effectively. - Reason for choosing: This approach provides flexibility to RPC node operators to configure limits as per their requirements. The default behavior remains unchanged, making it a non-breaking change. Additionally, it ensures current data is always available by pruning the oldest data first. ### Test with these configured limits ```bash --rpc.subscription.filters.maxlogs=60 --rpc.subscription.filters.maxheaders=60 --rpc.subscription.filters.maxtxs=10_000 --rpc.subscription.filters.maxaddresses=1_000 --rpc.subscription.filters.maxtopics=1_000 ``` ##### Observation After 10 Hours with Limits ```text File: erigon Type: inuse_space Entering interactive mode (type "help" for commands, "o" for options) (pprof) top Showing nodes accounting for 1146.51MB, 89.28% of 1284.17MB total Dropped 403 nodes (cum <= 6.42MB) Showing top 10 nodes out of 124 flat flat% sum% cum cum% 923.64MB 71.92% 71.92% 923.64MB 71.92% github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 54.59MB 4.25% 76.18% 54.59MB 4.25% github.com/ledgerwatch/erigon/rlp.(*Stream).Bytes 38.21MB 2.98% 79.15% 38.21MB 2.98% bytes.growSlice 29.34MB 2.28% 81.44% 29.34MB 2.28% github.com/ledgerwatch/erigon/p2p/rlpx.growslice 26.44MB 2.06% 83.49% 34.16MB 2.66% compress/flate.NewWriter 18.66MB 1.45% 84.95% 21.16MB 1.65% github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState 17.50MB 1.36% 86.31% 64.58MB 5.03% github.com/ledgerwatch/erigon/core/types.(*DynamicFeeTransaction).DecodeRLP 15MB 1.17% 87.48% 82.08MB 6.39% github.com/ledgerwatch/erigon/core/types.UnmarshalTransactionFromBinary 13.58MB 1.06% 88.54% 13.58MB 1.06% github.com/ledgerwatch/erigon/turbo/stages/headerdownload.(*HeaderDownload).addHeaderAsLink 9.55MB 0.74% 89.28% 9.55MB 0.74% github.com/ledgerwatch/erigon/turbo/rpchelper.newChanSub[...] (pprof) list AddPendingTxs.func1 Total: 1.25GB ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go 923.64MB 923.64MB (flat, cum) 71.92% of Total . . 698: ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction { . . 699: if !ok { . . 700: st = make([][]types.Transaction, 0) . . 701: } . . 702: . . 703: // Calculate the total number of transactions in st . . 704: totalTxs := 0 . . 705: for _, txBatch := range st { . . 706: totalTxs += len(txBatch) . . 707: } . . 708: . . 709: maxTxs := ff.config.RpcSubscriptionFiltersMaxTxs . . 710: // If adding the new transactions would exceed maxTxs, remove oldest transactions . . 711: if maxTxs > 0 && totalTxs+len(txs) > maxTxs { . . 712: // Flatten st to a single slice 918.69MB 918.69MB 713: flatSt := make([]types.Transaction, 0, totalTxs) . . 714: for _, txBatch := range st { . . 715: flatSt = append(flatSt, txBatch...) . . 716: } . . 717: . . 718: // Remove the oldest transactions to make space for new ones . . 
719: if len(flatSt)+len(txs) > maxTxs { . . 720: flatSt = flatSt[len(flatSt)+len(txs)-maxTxs:] . . 721: } . . 722: . . 723: // Convert flatSt back to [][]types.Transaction with a single batch . . 724: st = [][]types.Transaction{flatSt} . . 725: } . . 726: . . 727: // Append the new transactions as a new batch 5.95MB 4.95MB 728: st = append(st, txs) . . 729: return st . . 730: }) . . 731:} . . 732: . . 733:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID. ``` With these changes, the memory utilization has significantly improved, and the system is now more stable and predictable. ----- ## Graph of a single node over 7 days, before and after these changes [image] The blue line indicates a deployment; the sharp drop of the green line is an OOM. --- cmd/rpcdaemon/cli/config.go | 10 +- cmd/rpcdaemon/cli/httpcfg/http_cfg.go | 2 + erigon-lib/metrics/register.go | 21 ++++ erigon-lib/metrics/set.go | 119 +++++++++++++++++- eth/backend.go | 2 +- go.mod | 2 +- turbo/cli/default_flags.go | 6 + turbo/cli/flags.go | 50 ++++++-- turbo/jsonrpc/eth_block_test.go | 2 +- turbo/jsonrpc/eth_call_test.go | 2 +- turbo/jsonrpc/eth_filters_test.go | 4 +- turbo/jsonrpc/eth_mining_test.go | 4 +- turbo/jsonrpc/eth_subscribe_test.go | 2 +- turbo/jsonrpc/send_transaction_test.go | 4 +- turbo/jsonrpc/txpool_api_test.go | 2 +- turbo/rpchelper/config.go | 22 ++++ turbo/rpchelper/filters.go | 91 ++++++++++++-- turbo/rpchelper/filters_deadlock_test.go | 3 +- turbo/rpchelper/filters_test.go | 149 ++++++++++++++++++++++- turbo/rpchelper/logsfilter.go | 61 ++++++++-- turbo/rpchelper/metrics.go | 19 +++ 21 files changed, 522 insertions(+), 55 deletions(-) create mode 100644 turbo/rpchelper/config.go create mode 100644 turbo/rpchelper/metrics.go diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 977df9435ef..ad460580281 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -138,6 +138,11 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) { rootCmd.PersistentFlags().DurationVar(&cfg.EvmCallTimeout, "rpc.evmtimeout", rpccfg.DefaultEvmCallTimeout, "Maximum amount of time to wait for the answer from EVM call.") rootCmd.PersistentFlags().DurationVar(&cfg.OverlayGetLogsTimeout, "rpc.overlay.getlogstimeout", rpccfg.DefaultOverlayGetLogsTimeout, "Maximum amount of time to wait for the answer from the overlay_getLogs call.") rootCmd.PersistentFlags().DurationVar(&cfg.OverlayReplayBlockTimeout, "rpc.overlay.replayblocktimeout", rpccfg.DefaultOverlayReplayBlockTimeout, "Maximum amount of time to wait for the answer to replay a single block when called from an overlay_getLogs call.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxLogs, "rpc.subscription.filters.maxlogs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxLogs, "Maximum number of logs to store per subscription.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "rpc.subscription.filters.maxheaders", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "Maximum number of block headers to store per subscription.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTxs, "rpc.subscription.filters.maxtxs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs, "Maximum number of transactions to store per subscription.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "rpc.subscription.filters.maxaddresses",
rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "Maximum number of addresses per subscription to filter logs by.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTopics, "rpc.subscription.filters.maxtopics", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics, "Maximum number of topics per subscription to filter logs by.") rootCmd.PersistentFlags().IntVar(&cfg.BatchLimit, utils.RpcBatchLimit.Name, utils.RpcBatchLimit.Value, utils.RpcBatchLimit.Usage) rootCmd.PersistentFlags().IntVar(&cfg.ReturnDataLimit, utils.RpcReturnDataLimit.Name, utils.RpcReturnDataLimit.Value, utils.RpcReturnDataLimit.Usage) rootCmd.PersistentFlags().BoolVar(&cfg.AllowUnprotectedTxs, utils.AllowUnprotectedTxs.Name, utils.AllowUnprotectedTxs.Value, utils.AllowUnprotectedTxs.Usage) @@ -267,6 +272,7 @@ func checkDbCompatibility(ctx context.Context, db kv.RoDB) error { func EmbeddedServices(ctx context.Context, erigonDB kv.RoDB, stateCacheCfg kvcache.CoherentConfig, + rpcFiltersConfig rpchelper.FiltersConfig, blockReader services.FullBlockReader, ethBackendServer remote.ETHBACKENDServer, txPoolServer txpool.TxpoolServer, miningServer txpool.MiningServer, stateDiffClient StateChangesClient, logger log.Logger, @@ -289,7 +295,7 @@ func EmbeddedServices(ctx context.Context, txPool = direct.NewTxPoolClient(txPoolServer) mining = direct.NewMiningClient(miningServer) - ff = rpchelper.New(ctx, eth, txPool, mining, func() {}, logger) + ff = rpchelper.New(ctx, rpcFiltersConfig, eth, txPool, mining, func() {}, logger) return } @@ -533,7 +539,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger } }() - ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot, logger) + ff = rpchelper.New(ctx, cfg.RpcFiltersConfig, eth, txPool, mining, onNewSnapshot, logger) return db, eth, txPool, mining, stateCache, blockReader, engine, ff, agg, err } diff --git a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go index 3673eaac5b0..d66522bbe0b 100644 --- a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go +++ b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go @@ -1,6 +1,7 @@ package httpcfg import ( + "github.com/ledgerwatch/erigon/turbo/rpchelper" "time" "github.com/ledgerwatch/erigon-lib/common/datadir" @@ -51,6 +52,7 @@ type HttpCfg struct { RpcAllowListFilePath string RpcBatchConcurrency uint RpcStreamingDisable bool + RpcFiltersConfig rpchelper.FiltersConfig DBReadConcurrency int TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum TxPoolApiAddr string diff --git a/erigon-lib/metrics/register.go b/erigon-lib/metrics/register.go index 4a2e68f55e4..40112af3a1b 100644 --- a/erigon-lib/metrics/register.go +++ b/erigon-lib/metrics/register.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "github.com/prometheus/client_golang/prometheus" ) // NewCounter registers and returns new counter with the given name. @@ -88,6 +89,26 @@ func GetOrCreateGauge(name string) Gauge { return &gauge{g} } +// GetOrCreateGaugeVec returns registered GaugeVec with the given name +// or creates a new GaugeVec if the registry doesn't contain a GaugeVec with +// the given name and labels. +// +// name must be a valid Prometheus-compatible metric with possible labels. +// labels are the names of the dimensions associated with the gauge vector. +// For instance, +// +// - foo, with labels []string{"bar", "baz"} +// +// The returned GaugeVec is safe to use from concurrent goroutines. 
+func GetOrCreateGaugeVec(name string, labels []string, help ...string) *prometheus.GaugeVec { + gv, err := defaultSet.GetOrCreateGaugeVec(name, labels, help...) + if err != nil { + panic(fmt.Errorf("could not get or create new gaugevec: %w", err)) + } + + return gv +} + // NewSummary creates and returns new summary with the given name. // // name must be valid Prometheus-compatible metric with possible labels. diff --git a/erigon-lib/metrics/set.go b/erigon-lib/metrics/set.go index ad4b164c239..b362994dd68 100644 --- a/erigon-lib/metrics/set.go +++ b/erigon-lib/metrics/set.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "reflect" "sort" "strings" "sync" @@ -16,15 +17,23 @@ type namedMetric struct { isAux bool } +type namedMetricVec struct { + name string + metric *prometheus.GaugeVec + isAux bool +} + // Set is a set of metrics. // // Metrics belonging to a set are exported separately from global metrics. // // Set.WritePrometheus must be called for exporting metrics from the set. type Set struct { - mu sync.Mutex - a []*namedMetric - m map[string]*namedMetric + mu sync.Mutex + a []*namedMetric + av []*namedMetricVec + m map[string]*namedMetric + vecs map[string]*namedMetricVec } var defaultSet = NewSet() @@ -34,7 +43,8 @@ var defaultSet = NewSet() // Pass the set to RegisterSet() function in order to export its metrics via global WritePrometheus() call. func NewSet() *Set { return &Set{ - m: make(map[string]*namedMetric), + m: make(map[string]*namedMetric), + vecs: make(map[string]*namedMetricVec), } } @@ -46,11 +56,18 @@ func (s *Set) Describe(ch chan<- *prometheus.Desc) { if !sort.SliceIsSorted(s.a, lessFunc) { sort.Slice(s.a, lessFunc) } + if !sort.SliceIsSorted(s.av, lessFunc) { + sort.Slice(s.av, lessFunc) + } sa := append([]*namedMetric(nil), s.a...) + sav := append([]*namedMetricVec(nil), s.av...) s.mu.Unlock() for _, nm := range sa { ch <- nm.metric.Desc() } + for _, nmv := range sav { + nmv.metric.Describe(ch) + } } func (s *Set) Collect(ch chan<- prometheus.Metric) { @@ -61,11 +78,18 @@ func (s *Set) Collect(ch chan<- prometheus.Metric) { if !sort.SliceIsSorted(s.a, lessFunc) { sort.Slice(s.a, lessFunc) } + if !sort.SliceIsSorted(s.av, lessFunc) { + sort.Slice(s.av, lessFunc) + } sa := append([]*namedMetric(nil), s.a...) + sav := append([]*namedMetricVec(nil), s.av...) s.mu.Unlock() for _, nm := range sa { ch <- nm.metric } + for _, nmv := range sav { + nmv.metric.Collect(ch) + } } // NewHistogram creates and returns new histogram in s with the given name. @@ -308,6 +332,78 @@ func (s *Set) GetOrCreateGauge(name string, help ...string) (prometheus.Gauge, e return g, nil } +// GetOrCreateGaugeVec returns registered GaugeVec in s with the given name +// or creates new GaugeVec if s doesn't contain GaugeVec with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// labels are the labels associated with the GaugeVec. +// +// The returned GaugeVec is safe to use from concurrent goroutines. +func (s *Set) GetOrCreateGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) { + s.mu.Lock() + nm := s.vecs[name] + s.mu.Unlock() + if nm == nil { + metric, err := newGaugeVec(name, labels, help...) 
+ if err != nil { + return nil, fmt.Errorf("invalid metric name %q: %w", name, err) + } + + nmNew := &namedMetricVec{ + name: name, + metric: metric, + } + + s.mu.Lock() + nm = s.vecs[name] + if nm == nil { + nm = nmNew + s.vecs[name] = nm + s.av = append(s.av, nm) + } + s.mu.Unlock() + s.registerMetricVec(name, metric, false) + } + + if nm.metric == nil { + return nil, fmt.Errorf("metric %q is nil", name) + } + + metricType := reflect.TypeOf(nm.metric) + if metricType != reflect.TypeOf(&prometheus.GaugeVec{}) { + return nil, fmt.Errorf("metric %q isn't a GaugeVec. It is %s", name, metricType) + } + + return nm.metric, nil +} + +// newGaugeVec creates a new Prometheus GaugeVec. +func newGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) { + name, constLabels, err := parseMetric(name) + if err != nil { + return nil, err + } + + helpStr := "gauge metric" + if len(help) > 0 { + helpStr = strings.Join(help, ", ") + } + + gv := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: name, + Help: helpStr, + ConstLabels: constLabels, + }, labels) + + return gv, nil +} + const defaultSummaryWindow = 5 * time.Minute var defaultSummaryQuantiles = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.97: 0.003, 0.99: 0.001} @@ -432,6 +528,21 @@ func (s *Set) registerMetric(name string, m prometheus.Metric) { s.mustRegisterLocked(name, m) } +func (s *Set) registerMetricVec(name string, mv *prometheus.GaugeVec, isAux bool) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, exists := s.vecs[name]; !exists { + nmv := &namedMetricVec{ + name: name, + metric: mv, + isAux: isAux, + } + s.vecs[name] = nmv + s.av = append(s.av, nmv) + } +} + // mustRegisterLocked registers given metric with the given name. // // Panics if the given name was already registered before. 
diff --git a/eth/backend.go b/eth/backend.go index 86c3a2f62b1..cd2bcdb0613 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1023,7 +1023,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig } // start HTTP API httpRpcCfg := stack.Config().Http - ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC, + ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, httpRpcCfg.RpcFiltersConfig, blockReader, ethBackendRPC, s.txPoolGrpcServer, miningRPC, stateDiffClient, s.logger) if err != nil { return err diff --git a/go.mod b/go.mod index d712e06fa55..87476efe841 100644 --- a/go.mod +++ b/go.mod @@ -243,7 +243,7 @@ require ( github.com/pion/webrtc/v3 v3.2.40 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect - github.com/prometheus/client_golang v1.19.1 // indirect + github.com/prometheus/client_golang v1.19.1 github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index f808053c026..f8afd7b0625 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -94,6 +94,12 @@ var DefaultFlags = []cli.Flag{ &OverlayGetLogsFlag, &OverlayReplayBlockFlag, + &RpcSubscriptionFiltersMaxLogsFlag, + &RpcSubscriptionFiltersMaxHeadersFlag, + &RpcSubscriptionFiltersMaxTxsFlag, + &RpcSubscriptionFiltersMaxAddressesFlag, + &RpcSubscriptionFiltersMaxTopicsFlag, + &utils.SnapKeepBlocksFlag, &utils.SnapStopFlag, &utils.SnapStateStopFlag, diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go index dc6fa9d00b3..3cb2986d28c 100644 --- a/turbo/cli/flags.go +++ b/turbo/cli/flags.go @@ -28,6 +28,7 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/node/nodecfg" + "github.com/ledgerwatch/erigon/turbo/rpchelper" ) var ( @@ -256,6 +257,32 @@ var ( Value: rpccfg.DefaultOverlayReplayBlockTimeout, } + RpcSubscriptionFiltersMaxLogsFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxlogs", + Usage: "Maximum number of logs to store per subscription.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxLogs, + } + RpcSubscriptionFiltersMaxHeadersFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxheaders", + Usage: "Maximum number of block headers to store per subscription.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxHeaders, + } + RpcSubscriptionFiltersMaxTxsFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxtxs", + Usage: "Maximum number of transactions to store per subscription.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs, + } + RpcSubscriptionFiltersMaxAddressesFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxaddresses", + Usage: "Maximum number of addresses per subscription to filter logs by.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses, + } + RpcSubscriptionFiltersMaxTopicsFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxtopics", + Usage: "Maximum number of topics per subscription to filter logs by.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics, + } + TxPoolCommitEvery = cli.DurationFlag{ Name: "txpool.commit.every", Usage: 
"How often transactions should be committed to the storage", @@ -522,14 +549,21 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name), DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name), RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name), - Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name), - Feecap: ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name), - MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name), - TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name), - BatchLimit: ctx.Int(utils.RpcBatchLimit.Name), - ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name), - AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name), - MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name), + RpcFiltersConfig: rpchelper.FiltersConfig{ + RpcSubscriptionFiltersMaxLogs: ctx.Int(RpcSubscriptionFiltersMaxLogsFlag.Name), + RpcSubscriptionFiltersMaxHeaders: ctx.Int(RpcSubscriptionFiltersMaxHeadersFlag.Name), + RpcSubscriptionFiltersMaxTxs: ctx.Int(RpcSubscriptionFiltersMaxTxsFlag.Name), + RpcSubscriptionFiltersMaxAddresses: ctx.Int(RpcSubscriptionFiltersMaxAddressesFlag.Name), + RpcSubscriptionFiltersMaxTopics: ctx.Int(RpcSubscriptionFiltersMaxTopicsFlag.Name), + }, + Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name), + Feecap: ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name), + MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name), + TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name), + BatchLimit: ctx.Int(utils.RpcBatchLimit.Name), + ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name), + AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name), + MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name), OtsMaxPageSize: ctx.Uint64(utils.OtsSearchMaxCapFlag.Name), diff --git a/turbo/jsonrpc/eth_block_test.go b/turbo/jsonrpc/eth_block_test.go index 52d1a805acc..2347e392cc6 100644 --- a/turbo/jsonrpc/eth_block_test.go +++ b/turbo/jsonrpc/eth_block_test.go @@ -74,7 +74,7 @@ func TestGetBlockByNumberWithPendingTag(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) expected := 1 header := &types.Header{ diff --git a/turbo/jsonrpc/eth_call_test.go b/turbo/jsonrpc/eth_call_test.go index f3d1af3e996..1b8b9efac9e 100644 --- a/turbo/jsonrpc/eth_call_test.go +++ b/turbo/jsonrpc/eth_call_test.go @@ -43,7 +43,7 @@ func TestEstimateGas(t *testing.T) { stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mock.Mock(t)) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) var from = libcommon.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") var to = libcommon.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e") diff --git a/turbo/jsonrpc/eth_filters_test.go b/turbo/jsonrpc/eth_filters_test.go index aa1494baedf..51801ba337c 100644 --- a/turbo/jsonrpc/eth_filters_test.go +++ 
b/turbo/jsonrpc/eth_filters_test.go @@ -31,7 +31,7 @@ func TestNewFilters(t *testing.T) { stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mock.Mock(t)) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) ptf, err := api.NewPendingTransactionFilter(ctx) @@ -60,7 +60,7 @@ func TestLogsSubscribeAndUnsubscribe_WithoutConcurrentMapIssue(t *testing.T) { m := mock.Mock(t) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) // generate some random topics topics := make([][]libcommon.Hash, 0) diff --git a/turbo/jsonrpc/eth_mining_test.go b/turbo/jsonrpc/eth_mining_test.go index 86c8de279c1..8aa131a3e47 100644 --- a/turbo/jsonrpc/eth_mining_test.go +++ b/turbo/jsonrpc/eth_mining_test.go @@ -24,7 +24,7 @@ func TestPendingBlock(t *testing.T) { m := mock.Mock(t) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mock.Mock(t)) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) engine := ethash.NewFaker() api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, nil, false, rpccfg.DefaultEvmCallTimeout, engine, @@ -51,7 +51,7 @@ func TestPendingLogs(t *testing.T) { m := mock.Mock(t) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) expect := []byte{211} ch, id := ff.SubscribePendingLogs(1) diff --git a/turbo/jsonrpc/eth_subscribe_test.go b/turbo/jsonrpc/eth_subscribe_test.go index b593ab2762c..ab1d4df9817 100644 --- a/turbo/jsonrpc/eth_subscribe_test.go +++ b/turbo/jsonrpc/eth_subscribe_test.go @@ -49,7 +49,7 @@ func TestEthSubscribe(t *testing.T) { backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, m.BlockReader, logger, builder.NewLatestBlockBuiltStore()) backendClient := direct.NewEthBackendClientDirect(backendServer) backend := rpcservices.NewRemoteBackend(backendClient, m.DB, m.BlockReader) - ff := rpchelper.New(ctx, backend, nil, nil, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, backend, nil, nil, func() {}, m.Log) newHeads, id := ff.SubscribeNewHeads(16) defer ff.UnsubscribeHeads(id) diff --git a/turbo/jsonrpc/send_transaction_test.go b/turbo/jsonrpc/send_transaction_test.go index 3eac26476c9..45c1a79f5e8 100644 --- a/turbo/jsonrpc/send_transaction_test.go +++ b/turbo/jsonrpc/send_transaction_test.go @@ -90,7 +90,7 @@ func TestSendRawTransaction(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mockSentry) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, mockSentry.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, 
txpool.NewMiningClient(conn), func() {}, mockSentry.Log) api := jsonrpc.NewEthAPI(newBaseApiForTest(mockSentry), mockSentry.DB, nil, txPool, nil, 5000000, 1e18, 100_000, false, 100_000, 128, logger) buf := bytes.NewBuffer(nil) @@ -142,7 +142,7 @@ func TestSendRawTransactionUnprotected(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mockSentry) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, mockSentry.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, mockSentry.Log) api := jsonrpc.NewEthAPI(newBaseApiForTest(mockSentry), mockSentry.DB, nil, txPool, nil, 5000000, 1e18, 100_000, false, 100_000, 128, logger) // Enable unproteced txs flag diff --git a/turbo/jsonrpc/txpool_api_test.go b/turbo/jsonrpc/txpool_api_test.go index 26c0f5d6336..1f31ea57d85 100644 --- a/turbo/jsonrpc/txpool_api_test.go +++ b/turbo/jsonrpc/txpool_api_test.go @@ -38,7 +38,7 @@ func TestTxPoolContent(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) agg := m.HistoryV3Components() api := NewTxPoolAPI(NewBaseApi(ff, kvcache.New(kvcache.DefaultCoherentConfig), m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, txPool) diff --git a/turbo/rpchelper/config.go b/turbo/rpchelper/config.go new file mode 100644 index 00000000000..21610b6a508 --- /dev/null +++ b/turbo/rpchelper/config.go @@ -0,0 +1,22 @@ +package rpchelper + +// FiltersConfig defines the configuration settings for RPC subscription filters. +// Each field represents a limit on the number of respective items that can be stored per subscription. +type FiltersConfig struct { + RpcSubscriptionFiltersMaxLogs int // Maximum number of logs to store per subscription. Default: 0 (no limit) + RpcSubscriptionFiltersMaxHeaders int // Maximum number of block headers to store per subscription. Default: 0 (no limit) + RpcSubscriptionFiltersMaxTxs int // Maximum number of transactions to store per subscription. Default: 0 (no limit) + RpcSubscriptionFiltersMaxAddresses int // Maximum number of addresses per subscription to filter logs by. Default: 0 (no limit) + RpcSubscriptionFiltersMaxTopics int // Maximum number of topics per subscription to filter logs by. Default: 0 (no limit) +} + +// DefaultFiltersConfig defines the default settings for filter configurations. +// These default values set no limits on the number of logs, block headers, transactions, +// addresses, or topics that can be stored per subscription. 
+var DefaultFiltersConfig = FiltersConfig{ + RpcSubscriptionFiltersMaxLogs: 0, // No limit on the number of logs per subscription + RpcSubscriptionFiltersMaxHeaders: 0, // No limit on the number of block headers per subscription + RpcSubscriptionFiltersMaxTxs: 0, // No limit on the number of transactions per subscription + RpcSubscriptionFiltersMaxAddresses: 0, // No limit on the number of addresses per subscription to filter logs by + RpcSubscriptionFiltersMaxTopics: 0, // No limit on the number of topics per subscription to filter logs by +} diff --git a/turbo/rpchelper/filters.go b/turbo/rpchelper/filters.go index cf8a80eaf0d..bfe7f80db4a 100644 --- a/turbo/rpchelper/filters.go +++ b/turbo/rpchelper/filters.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/prometheus/client_golang/prometheus" "io" "reflect" "sync" @@ -48,12 +49,14 @@ type Filters struct { pendingHeadsStores *concurrent.SyncMap[HeadsSubID, []*types.Header] pendingTxsStores *concurrent.SyncMap[PendingTxsSubID, [][]types.Transaction] logger log.Logger + + config FiltersConfig } // New creates a new Filters instance, initializes it, and starts subscription goroutines for Ethereum events. // It requires a context, Ethereum backend, transaction pool client, mining client, snapshot callback function, // and a logger for logging events. -func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, onNewSnapshot func(), logger log.Logger) *Filters { +func New(ctx context.Context, config FiltersConfig, ethBackend ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, onNewSnapshot func(), logger log.Logger) *Filters { logger.Info("rpc filters: subscribing to Erigon events") ff := &Filters{ @@ -67,15 +70,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, pendingHeadsStores: concurrent.NewSyncMap[HeadsSubID, []*types.Header](), pendingTxsStores: concurrent.NewSyncMap[PendingTxsSubID, [][]types.Transaction](), logger: logger, + config: config, } go func() { if ethBackend == nil { return } + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Events"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Events"}).Dec() return default: } @@ -83,6 +89,7 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if err := ethBackend.Subscribe(ctx, ff.OnNewEvent); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Events"}).Dec() return default: } @@ -99,15 +106,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if ethBackend == nil { return } + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Logs"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Logs"}).Dec() return default: } if err := ethBackend.SubscribeLogs(ctx, ff.OnNewLogs, &ff.logsRequestor); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Logs"}).Dec() return default: } @@ -122,15 +132,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if txPool != nil { go func() { + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingTxs"}).Inc() for { 
select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingTxs"}).Dec() return default: } if err := ff.subscribeToPendingTransactions(ctx, txPool); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingTxs"}).Dec() return default: } @@ -145,15 +158,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if !reflect.ValueOf(mining).IsNil() { //https://groups.google.com/g/golang-nuts/c/wnH302gBa4I go func() { + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingBlock"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingBlock"}).Dec() return default: } if err := ff.subscribeToPendingBlocks(ctx, mining); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingBlock"}).Dec() return default: } @@ -166,15 +182,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, } }() go func() { + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingLogs"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingLogs"}).Dec() return default: } if err := ff.subscribeToPendingLogs(ctx, mining); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingLogs"}).Dec() return default: } @@ -414,26 +433,44 @@ func (ff *Filters) SubscribeLogs(size int, criteria filters.FilterCriteria) (<-c // If no addresses are specified, it means all addresses should be included f.allAddrs = 1 } else { + // Limit the number of addresses + addressCount := 0 for _, addr := range criteria.Addresses { - f.addrs.Put(addr, 1) + if ff.config.RpcSubscriptionFiltersMaxAddresses == 0 || addressCount < ff.config.RpcSubscriptionFiltersMaxAddresses { + f.addrs.Put(addr, 1) + addressCount++ + } else { + break + } } } - // Handle topics + // Handle topics and track the allowed topics if len(criteria.Topics) == 0 { // If no topics are specified, it means all topics should be included f.allTopics = 1 } else { + // Limit the number of topics + topicCount := 0 + allowedTopics := [][]libcommon.Hash{} for _, topics := range criteria.Topics { + allowedTopicsRow := []libcommon.Hash{} for _, topic := range topics { - f.topics.Put(topic, 1) + if ff.config.RpcSubscriptionFiltersMaxTopics == 0 || topicCount < ff.config.RpcSubscriptionFiltersMaxTopics { + f.topics.Put(topic, 1) + allowedTopicsRow = append(allowedTopicsRow, topic) + topicCount++ + } else { + break + } + } + if len(allowedTopicsRow) > 0 { + allowedTopics = append(allowedTopics, allowedTopicsRow) } } + f.topicsOriginal = allowedTopics } - // Store original topics for reference - f.topicsOriginal = criteria.Topics - // Add the filter to the list of log filters ff.logsSubs.addLogsFilters(f) @@ -602,12 +639,17 @@ func (ff *Filters) OnNewLogs(reply *remote.SubscribeLogsReply) { } // AddLogs adds logs to the store associated with the given subscription ID. 
-func (ff *Filters) AddLogs(id LogsSubID, logs *types.Log) { +func (ff *Filters) AddLogs(id LogsSubID, log *types.Log) { ff.logsStores.DoAndStore(id, func(st []*types.Log, ok bool) []*types.Log { if !ok { st = make([]*types.Log, 0) } - st = append(st, logs) + + maxLogs := ff.config.RpcSubscriptionFiltersMaxLogs + if maxLogs > 0 && len(st)+1 > maxLogs { + st = st[len(st)+1-maxLogs:] // Remove oldest logs to make space + } + st = append(st, log) return st }) } @@ -628,6 +670,11 @@ func (ff *Filters) AddPendingBlock(id HeadsSubID, block *types.Header) { if !ok { st = make([]*types.Header, 0) } + + maxHeaders := ff.config.RpcSubscriptionFiltersMaxHeaders + if maxHeaders > 0 && len(st) >= maxHeaders { + st = st[1:] // Remove the oldest header to make space + } st = append(st, block) return st }) @@ -649,6 +696,32 @@ func (ff *Filters) AddPendingTxs(id PendingTxsSubID, txs []types.Transaction) { if !ok { st = make([][]types.Transaction, 0) } + + // Calculate the total number of transactions in st + totalTxs := 0 + for _, txBatch := range st { + totalTxs += len(txBatch) + } + + maxTxs := ff.config.RpcSubscriptionFiltersMaxTxs + // If adding the new transactions would exceed maxTxs, remove oldest transactions + if maxTxs > 0 && totalTxs+len(txs) > maxTxs { + // Flatten st to a single slice + flatSt := make([]types.Transaction, 0, totalTxs) + for _, txBatch := range st { + flatSt = append(flatSt, txBatch...) + } + + // Remove the oldest transactions to make space for new ones + if len(flatSt)+len(txs) > maxTxs { + flatSt = flatSt[len(flatSt)+len(txs)-maxTxs:] + } + + // Convert flatSt back to [][]types.Transaction with a single batch + st = [][]types.Transaction{flatSt} + } + + // Append the new transactions as a new batch st = append(st, txs) return st }) diff --git a/turbo/rpchelper/filters_deadlock_test.go b/turbo/rpchelper/filters_deadlock_test.go index 6b143e27610..c76afbd5493 100644 --- a/turbo/rpchelper/filters_deadlock_test.go +++ b/turbo/rpchelper/filters_deadlock_test.go @@ -18,7 +18,8 @@ import ( func TestFiltersDeadlock_Test(t *testing.T) { t.Parallel() logger := log.New() - f := rpchelper.New(context.TODO(), nil, nil, nil, func() {}, logger) + config := rpchelper.FiltersConfig{} + f := rpchelper.New(context.TODO(), config, nil, nil, nil, func() {}, logger) crit := filters.FilterCriteria{ Addresses: nil, Topics: [][]libcommon.Hash{}, diff --git a/turbo/rpchelper/filters_test.go b/turbo/rpchelper/filters_test.go index b66a36e2901..edebc156334 100644 --- a/turbo/rpchelper/filters_test.go +++ b/turbo/rpchelper/filters_test.go @@ -2,6 +2,8 @@ package rpchelper import ( "context" + "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon/core/types" "testing" libcommon "github.com/ledgerwatch/erigon-lib/common" @@ -49,7 +51,7 @@ func TestFilters_GenerateSubscriptionID(t *testing.T) { v := <-subs _, ok := set[v] if ok { - t.Errorf("SubscriptionID Confict: %s", v) + t.Errorf("SubscriptionID Conflict: %s", v) return } set[v] = struct{}{} @@ -58,7 +60,8 @@ func TestFilters_GenerateSubscriptionID(t *testing.T) { func TestFilters_SingleSubscription_OnlyTopicsSubscribedAreBroadcast(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) subbedTopic := libcommon.BytesToHash([]byte{10, 20}) @@ -90,7 +93,8 @@ func TestFilters_SingleSubscription_OnlyTopicsSubscribedAreBroadcast(t *testing. 
func TestFilters_SingleSubscription_EmptyTopicsInCriteria_OnlyTopicsSubscribedAreBroadcast(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) var nilTopic libcommon.Hash subbedTopic := libcommon.BytesToHash([]byte{10, 20}) @@ -123,7 +127,8 @@ func TestFilters_SingleSubscription_EmptyTopicsInCriteria_OnlyTopicsSubscribedAr func TestFilters_TwoSubscriptionsWithDifferentCriteria(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) criteria1 := filters.FilterCriteria{ Addresses: nil, @@ -163,7 +168,8 @@ func TestFilters_TwoSubscriptionsWithDifferentCriteria(t *testing.T) { func TestFilters_ThreeSubscriptionsWithDifferentCriteria(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) criteria1 := filters.FilterCriteria{ Addresses: nil, @@ -238,7 +244,8 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) { return nil } - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) f.logsRequestor.Store(loadRequester) // first request has no filters @@ -347,3 +354,133 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) { t.Error("6: expected topics to be empty") } } + +func TestFilters_AddLogs(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxLogs: 5} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + logID := LogsSubID("test-log") + logEntry := &types.Log{} + + // Add 10 logs to the store, but limit is 5 + for i := 0; i < 10; i++ { + f.AddLogs(logID, logEntry) + } + + logs, found := f.ReadLogs(logID) + if !found { + t.Error("expected to find logs in the store") + } + if len(logs) != 5 { + t.Errorf("expected 5 logs in the store, got %d", len(logs)) + } +} + +func TestFilters_AddLogs_Unlimited(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxLogs: 0} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + logID := LogsSubID("test-log") + logEntry := &types.Log{} + + // Add 10 logs to the store, limit is unlimited + for i := 0; i < 10; i++ { + f.AddLogs(logID, logEntry) + } + + logs, found := f.ReadLogs(logID) + if !found { + t.Error("expected to find logs in the store") + } + if len(logs) != 10 { + t.Errorf("expected 10 logs in the store, got %d", len(logs)) + } +} + +func TestFilters_AddPendingBlocks(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxHeaders: 3} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + headerID := HeadsSubID("test-header") + header := &types.Header{} + + // Add 5 headers to the store, but limit is 3 + for i := 0; i < 5; i++ { + f.AddPendingBlock(headerID, header) + } + + headers, found := f.ReadPendingBlocks(headerID) + if !found { + t.Error("expected to find headers in the store") + } + if len(headers) != 3 { + t.Errorf("expected 3 headers in the store, got %d", len(headers)) + } +} + +func TestFilters_AddPendingBlocks_Unlimited(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxHeaders: 0} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + headerID := 
HeadsSubID("test-header") + header := &types.Header{} + + // Add 5 headers to the store, limit is unlimited + for i := 0; i < 5; i++ { + f.AddPendingBlock(headerID, header) + } + + headers, found := f.ReadPendingBlocks(headerID) + if !found { + t.Error("expected to find headers in the store") + } + if len(headers) != 5 { + t.Errorf("expected 5 headers in the store, got %d", len(headers)) + } +} + +func TestFilters_AddPendingTxs(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxTxs: 4} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + txID := PendingTxsSubID("test-tx") + var tx types.Transaction = types.NewTransaction(0, libcommon.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"), uint256.NewInt(10), 50000, uint256.NewInt(10), nil) + tx, _ = tx.WithSignature(*types.LatestSignerForChainID(nil), libcommon.Hex2Bytes("9bea4c4daac7c7c52e093e6a4c35dbbcf8856f1af7b059ba20253e70848d094f8a8fae537ce25ed8cb5af9adac3f141af69bd515bd2ba031522df09b97dd72b100")) + + // Add 6 txs to the store, but limit is 4 + for i := 0; i < 6; i++ { + f.AddPendingTxs(txID, []types.Transaction{tx}) + } + + txs, found := f.ReadPendingTxs(txID) + if !found { + t.Error("expected to find txs in the store") + } + totalTxs := 0 + for _, batch := range txs { + totalTxs += len(batch) + } + if totalTxs != 4 { + t.Errorf("expected 4 txs in the store, got %d", totalTxs) + } +} + +func TestFilters_AddPendingTxs_Unlimited(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxTxs: 0} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + txID := PendingTxsSubID("test-tx") + var tx types.Transaction = types.NewTransaction(0, libcommon.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"), uint256.NewInt(10), 50000, uint256.NewInt(10), nil) + tx, _ = tx.WithSignature(*types.LatestSignerForChainID(nil), libcommon.Hex2Bytes("9bea4c4daac7c7c52e093e6a4c35dbbcf8856f1af7b059ba20253e70848d094f8a8fae537ce25ed8cb5af9adac3f141af69bd515bd2ba031522df09b97dd72b100")) + + // Add 6 txs to the store, limit is unlimited + for i := 0; i < 6; i++ { + f.AddPendingTxs(txID, []types.Transaction{tx}) + } + + txs, found := f.ReadPendingTxs(txID) + if !found { + t.Error("expected to find txs in the store") + } + totalTxs := 0 + for _, batch := range txs { + totalTxs += len(batch) + } + if totalTxs != 6 { + t.Errorf("expected 6 txs in the store, got %d", totalTxs) + } +} diff --git a/turbo/rpchelper/logsfilter.go b/turbo/rpchelper/logsfilter.go index 07321609627..b4130ca3de4 100644 --- a/turbo/rpchelper/logsfilter.go +++ b/turbo/rpchelper/logsfilter.go @@ -105,9 +105,15 @@ func (a *LogsFilterAggregator) createFilterRequest() *remote.LogsFilterRequest { // provided LogsFilter. If the count for any address or topic reaches zero, it is removed from the aggregated filter. 
func (a *LogsFilterAggregator) subtractLogFilters(f *LogsFilter) { a.aggLogsFilter.allAddrs -= f.allAddrs + if f.allAddrs > 0 { + // Decrement the count for AllAddresses + activeSubscriptionsLogsAllAddressesGauge.Dec() + } f.addrs.Range(func(addr libcommon.Address, count int) error { a.aggLogsFilter.addrs.Do(addr, func(value int, exists bool) (int, bool) { if exists { + // Decrement the count for subscribed address + activeSubscriptionsLogsAddressesGauge.Dec() newValue := value - count if newValue <= 0 { return 0, false @@ -119,9 +125,15 @@ func (a *LogsFilterAggregator) subtractLogFilters(f *LogsFilter) { return nil }) a.aggLogsFilter.allTopics -= f.allTopics + if f.allTopics > 0 { + // Decrement the count for AllTopics + activeSubscriptionsLogsAllTopicsGauge.Dec() + } f.topics.Range(func(topic libcommon.Hash, count int) error { a.aggLogsFilter.topics.Do(topic, func(value int, exists bool) (int, bool) { if exists { + // Decrement the count for subscribed topic + activeSubscriptionsLogsTopicsGauge.Dec() newValue := value - count if newValue <= 0 { return 0, false @@ -141,14 +153,26 @@ func (a *LogsFilterAggregator) addLogsFilters(f *LogsFilter) { a.logsFilterLock.Lock() defer a.logsFilterLock.Unlock() a.aggLogsFilter.allAddrs += f.allAddrs + if f.allAddrs > 0 { + // Increment the count for AllAddresses + activeSubscriptionsLogsAllAddressesGauge.Inc() + } f.addrs.Range(func(addr libcommon.Address, count int) error { + // Increment the count for subscribed address + activeSubscriptionsLogsAddressesGauge.Inc() a.aggLogsFilter.addrs.DoAndStore(addr, func(value int, exists bool) int { return value + count }) return nil }) a.aggLogsFilter.allTopics += f.allTopics + if f.allTopics > 0 { + // Increment the count for AllTopics + activeSubscriptionsLogsAllTopicsGauge.Inc() + } f.topics.Range(func(topic libcommon.Hash, count int) error { + // Increment the count for subscribed topic + activeSubscriptionsLogsTopicsGauge.Inc() a.aggLogsFilter.topics.DoAndStore(topic, func(value int, exists bool) int { return value + count }) @@ -179,6 +203,10 @@ func (a *LogsFilterAggregator) getAggMaps() (map[libcommon.Address]int, map[libc func (a *LogsFilterAggregator) distributeLog(eventLog *remote.SubscribeLogsReply) error { a.logsFilterLock.RLock() defer a.logsFilterLock.RUnlock() + + var lg types2.Log + var topics []libcommon.Hash + a.logsFilters.Range(func(k LogsSubID, filter *LogsFilter) error { if filter.allAddrs == 0 { _, addrOk := filter.addrs.Get(gointerfaces.ConvertH160toAddress(eventLog.Address)) @@ -186,27 +214,34 @@ func (a *LogsFilterAggregator) distributeLog(eventLog *remote.SubscribeLogsReply return nil } } - var topics []libcommon.Hash + + // Pre-allocate topics slice to the required size to avoid multiple allocations + topics = topics[:0] + if cap(topics) < len(eventLog.Topics) { + topics = make([]libcommon.Hash, 0, len(eventLog.Topics)) + } for _, topic := range eventLog.Topics { topics = append(topics, gointerfaces.ConvertH256ToHash(topic)) } + if filter.allTopics == 0 { if !a.chooseTopics(filter, topics) { return nil } } - lg := &types2.Log{ - Address: gointerfaces.ConvertH160toAddress(eventLog.Address), - Topics: topics, - Data: eventLog.Data, - BlockNumber: eventLog.BlockNumber, - TxHash: gointerfaces.ConvertH256ToHash(eventLog.TransactionHash), - TxIndex: uint(eventLog.TransactionIndex), - BlockHash: gointerfaces.ConvertH256ToHash(eventLog.BlockHash), - Index: uint(eventLog.LogIndex), - Removed: eventLog.Removed, - } - filter.sender.Send(lg) + + // Reuse lg object to avoid creating new 
instances + lg.Address = gointerfaces.ConvertH160toAddress(eventLog.Address) + lg.Topics = topics + lg.Data = eventLog.Data + lg.BlockNumber = eventLog.BlockNumber + lg.TxHash = gointerfaces.ConvertH256ToHash(eventLog.TransactionHash) + lg.TxIndex = uint(eventLog.TransactionIndex) + lg.BlockHash = gointerfaces.ConvertH256ToHash(eventLog.BlockHash) + lg.Index = uint(eventLog.LogIndex) + lg.Removed = eventLog.Removed + + filter.sender.Send(&lg) return nil }) return nil diff --git a/turbo/rpchelper/metrics.go b/turbo/rpchelper/metrics.go new file mode 100644 index 00000000000..41e963d3285 --- /dev/null +++ b/turbo/rpchelper/metrics.go @@ -0,0 +1,19 @@ +package rpchelper + +import ( + "github.com/ledgerwatch/erigon-lib/metrics" +) + +const ( + filterLabelName = "filter" + clientLabelName = "client" +) + +var ( + activeSubscriptionsGauge = metrics.GetOrCreateGaugeVec("subscriptions", []string{filterLabelName}, "Current number of subscriptions") + activeSubscriptionsLogsAllAddressesGauge = metrics.GetOrCreateGauge("subscriptions_logs_all_addresses") + activeSubscriptionsLogsAllTopicsGauge = metrics.GetOrCreateGauge("subscriptions_logs_all_topics") + activeSubscriptionsLogsAddressesGauge = metrics.GetOrCreateGauge("subscriptions_logs_addresses") + activeSubscriptionsLogsTopicsGauge = metrics.GetOrCreateGauge("subscriptions_logs_topics") + activeSubscriptionsLogsClientGauge = metrics.GetOrCreateGaugeVec("subscriptions_logs_client", []string{clientLabelName}, "Current number of subscriptions by client") +)
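As a quick illustration of the pruning behavior this change introduces in `turbo/rpchelper`: the sketch below is only a usage example mirroring the new `TestFilters_AddLogs` test (assuming the import paths used elsewhere in this PR), not part of the change itself.

```go
package main

import (
	"context"
	"fmt"

	"github.com/ledgerwatch/erigon/core/types"
	"github.com/ledgerwatch/erigon/turbo/rpchelper"
	"github.com/ledgerwatch/log/v3"
)

func main() {
	// Keep at most 5 logs per subscription; 0 (the default) means no limit.
	cfg := rpchelper.FiltersConfig{RpcSubscriptionFiltersMaxLogs: 5}

	// Backends may be nil here, exactly as in the unit tests added by this PR.
	ff := rpchelper.New(context.Background(), cfg, nil, nil, nil, func() {}, log.New())

	id := rpchelper.LogsSubID("example-sub")
	for i := 0; i < 10; i++ {
		ff.AddLogs(id, &types.Log{}) // oldest entries are pruned once the limit is reached
	}

	logs, _ := ff.ReadLogs(id)
	fmt.Println("stored logs:", len(logs)) // 5, not 10
}
```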
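Similarly, a minimal sketch of the new `GetOrCreateGaugeVec` helper that backs the per-client subscription metrics, again only an illustration of the API added in `erigon-lib/metrics` (the real usage lives in `turbo/rpchelper/metrics.go` and `filters.go`):

```go
package main

import (
	"github.com/ledgerwatch/erigon-lib/metrics"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Registered once; later calls with the same name return the same vector.
	gv := metrics.GetOrCreateGaugeVec("subscriptions_logs_client", []string{"client"},
		"Current number of subscriptions by client")

	// Each label value gets its own series, e.g. subscriptions_logs_client{client="ethBackend_Logs"}.
	gv.With(prometheus.Labels{"client": "ethBackend_Logs"}).Inc()
	gv.With(prometheus.Labels{"client": "ethBackend_Logs"}).Dec()
}
```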