Added RPC filter limits, metrics, and better memory management. (#10718)
- Introduced `GetOrCreateGaugeVec` function to manage Prometheus
`GaugeVec` metrics.
- Implemented various subscription filter limits (max logs, max headers,
max transactions, max addresses, and max topics).
- Updated various test files to utilize the new `FiltersConfig` and
added comprehensive tests for the new filter limits.
- Added new flags for subscription filter limits.
- Improved log filtering and event handling with better memory
management and metric tracking.
- Added configuration to handle RPC subscription filter limits.
- Minor typo fixes.

-------
RPC nodes would frequently OOM. Based on metrics and pprof, I observed high memory utilization in the `rpchelper` package:
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 69696.67MB, 95.37% of 73083.31MB total
Dropped 1081 nodes (cum <= 365.42MB)
Showing top 10 nodes out of 43
      flat  flat%   sum%        cum   cum%
65925.80MB 90.21% 90.21% 65925.80MB 90.21%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1
 1944.80MB  2.66% 92.87%  1944.80MB  2.66%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1
 1055.97MB  1.44% 94.31%  1055.97MB  1.44%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
  362.04MB   0.5% 94.81%   572.20MB  0.78%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
  257.73MB  0.35% 95.16%   587.85MB   0.8%  encoding/json.Marshal
   57.85MB 0.079% 95.24%  1658.66MB  2.27%  github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run
   51.48MB  0.07% 95.31%  2014.50MB  2.76%  github.com/ledgerwatch/erigon/rpc.(*handler).handleCallMsg
   23.01MB 0.031% 95.34%  1127.21MB  1.54%  github.com/ledgerwatch/erigon/core/vm.opCall
   11.50MB 0.016% 95.36%  1677.68MB  2.30%  github.com/ledgerwatch/erigon/core/vm.(*EVM).call
    6.50MB 0.0089% 95.37%   685.70MB  0.94%  github.com/ledgerwatch/erigon/turbo/transactions.DoCall
(pprof) list distributeLog.func1
Total: 71.37GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/logsfilter.go
   64.38GB    64.38GB (flat, cum) 90.21% of Total
         .          .    206:   a.logsFilters.Range(func(k LogsSubID, filter *LogsFilter) error {
         .          .    207:           if filter.allAddrs == 0 {
         .          .    208:                   _, addrOk := filter.addrs.Get(gointerfaces.ConvertH160toAddress(eventLog.Address))
         .          .    209:                   if !addrOk {
         .          .    210:                           return nil
         .          .    211:                   }
         .          .    212:           }
         .          .    213:           var topics []libcommon.Hash
         .          .    214:           for _, topic := range eventLog.Topics {
   27.01GB    27.01GB    215:                   topics = append(topics, gointerfaces.ConvertH256ToHash(topic))
         .          .    216:           }
         .          .    217:           if filter.allTopics == 0 {
         .          .    218:                   if !a.chooseTopics(filter, topics) {
         .          .    219:                           return nil
         .          .    220:                   }
         .          .    221:           }
   37.37GB    37.37GB    222:           lg := &types2.Log{
         .          .    223:                   Address:     gointerfaces.ConvertH160toAddress(eventLog.Address),
         .          .    224:                   Topics:      topics,
         .          .    225:                   Data:        eventLog.Data,
         .          .    226:                   BlockNumber: eventLog.BlockNumber,
         .          .    227:                   TxHash:      gointerfaces.ConvertH256ToHash(eventLog.TransactionHash),
(pprof) %
```

- Added metrics: implemented metrics to gain visibility into memory utilization; a usage sketch of the new `GetOrCreateGaugeVec` helper follows this list.
- Optimized allocations: reduced object and slice allocations, significantly improving memory usage; the profile below shows the result, and a sketch of the allocation pattern follows it.
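For the metrics piece, the commit adds a `GetOrCreateGaugeVec` helper to `erigon-lib/metrics` (see the diff below). A minimal usage sketch, with the signature taken from the diff; the metric name and labels are illustrative, not what the commit registers:
```go
package rpchelperexample

import "github.com/ledgerwatch/erigon-lib/metrics"

// activeSubscriptions is a hypothetical gauge vector, registered once at
// package init. GetOrCreateGaugeVec panics on an invalid metric name, so
// a package-level var surfaces mistakes early.
var activeSubscriptions = metrics.GetOrCreateGaugeVec(
	"rpc_subscription_active_filters", // illustrative name, not from the commit
	[]string{"filter_type"},           // one time series per filter kind
	"number of active RPC subscription filters",
)

func onSubscribe(filterType string)   { activeSubscriptions.WithLabelValues(filterType).Inc() }
func onUnsubscribe(filterType string) { activeSubscriptions.WithLabelValues(filterType).Dec() }
```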
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 25.92GB, 94.14% of 27.53GB total
Dropped 785 nodes (cum <= 0.14GB)
Showing top 10 nodes out of 70
      flat  flat%   sum%        cum   cum%
   15.37GB 55.82% 55.82%    15.37GB 55.82%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1
    8.15GB 29.60% 85.42%     8.15GB 29.60%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
    0.64GB  2.33% 87.75%        1GB  3.65%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog
    0.50GB  1.82% 89.57%     0.65GB  2.35%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
    0.36GB  1.32% 90.89%     0.36GB  1.32%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1
    0.31GB  1.12% 92.01%     0.31GB  1.12%  bytes.growSlice
    0.30GB  1.08% 93.09%     0.30GB  1.08%  github.com/ledgerwatch/erigon/core/vm.(*Memory).Resize (inline)
    0.15GB  0.56% 93.65%     0.35GB  1.27%  github.com/ledgerwatch/erigon/core/state.(*IntraBlockState).AddSlotToAccessList
    0.07GB  0.24% 93.90%     0.23GB  0.84%  github.com/ledgerwatch/erigon/core/types.codecSelfer2.decLogs
    0.07GB  0.24% 94.14%     1.58GB  5.75%  github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run
(pprof) list AddLogs.func1
Total: 27.53GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
   15.37GB    15.37GB (flat, cum) 55.82% of Total
         .          .    644:   ff.logsStores.DoAndStore(id, func(st []*types.Log, ok bool) []*types.Log {
         .          .    645:           if !ok {
         .          .    646:                   st = make([]*types.Log, 0)
         .          .    647:           }
   15.37GB    15.37GB    648:           st = append(st, logs)
         .          .    649:           return st
         .          .    650:   })
         .          .    651:}
         .          .    652:
         .          .    653:// ReadLogs reads logs from the store associated with the given subscription ID.
(pprof) list AddPendingTxs.func1
Total: 27.53GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
    8.15GB     8.15GB (flat, cum) 29.60% of Total
         .          .    686:   ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction {
         .          .    687:           if !ok {
         .          .    688:                   st = make([][]types.Transaction, 0)
         .          .    689:           }
    8.15GB     8.15GB    690:           st = append(st, txs)
         .          .    691:           return st
         .          .    692:   })
         .          .    693:}
         .          .    694:
         .          .    695:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID.
(pprof) %
```
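The first profile shows where those allocations were going: `distributeLog` rebuilt the topics slice and the `types2.Log` object inside the per-filter `Range` callback, once for every registered filter, for every event. A self-contained sketch of the hoisting pattern with generic stand-in types; the commit's actual rewrite may differ in detail:
```go
package main

import "fmt"

type event struct {
	topics []string
	data   []byte
}

type logMsg struct {
	topics []string
	data   []byte
}

// distribute fans one event out to all subscribers. Converting the event
// once, before the loop, turns O(subscribers) allocations per event into
// O(1); subscribers share the same effectively-immutable value.
func distribute(ev event, subs []chan *logMsg) {
	topics := make([]string, 0, len(ev.topics)) // preallocated to final size
	topics = append(topics, ev.topics...)
	lg := &logMsg{topics: topics, data: ev.data} // single allocation per event

	for _, ch := range subs {
		select {
		case ch <- lg:
		default: // drop rather than block on a slow subscriber
		}
	}
}

func main() {
	a, b := make(chan *logMsg, 1), make(chan *logMsg, 1)
	distribute(event{topics: []string{"Transfer"}, data: []byte{1}}, []chan *logMsg{a, b})
	fmt.Println(len((<-a).topics), len((<-b).topics))
}
```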
- Identified unbounded slices that could grow indefinitely, leaking memory. This happens when subscribers never request updates for their subscription ID, for example when they sit behind a load balancer that does not pin clients to specific RPC nodes.
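The chosen fix (option 3 below) bounds these per-subscription stores. A minimal sketch of the trim-oldest idea, assuming a cap like the new `RpcSubscriptionFiltersMaxLogs` setting; it illustrates the approach, not the commit's exact `AddLogs` body:
```go
package main

import "fmt"

type Log struct{ BlockNumber uint64 }

// appendBounded appends incoming logs to a per-subscription store and
// evicts the oldest entries once the cap is exceeded. maxLogs <= 0 keeps
// the old unbounded behaviour, so defaults stay non-breaking.
func appendBounded(st, logs []*Log, maxLogs int) []*Log {
	st = append(st, logs...)
	if maxLogs > 0 && len(st) > maxLogs {
		st = st[len(st)-maxLogs:] // prune oldest first; newest stay available
	}
	return st
}

func main() {
	var st []*Log
	for i := uint64(0); i < 5; i++ {
		st = appendBounded(st, []*Log{{BlockNumber: i}}, 3)
	}
	fmt.Println(len(st), st[0].BlockNumber) // 3 2: only the newest three remain
}
```
One caveat: re-slicing keeps the old backing array alive, so an implementation that wants the memory released promptly would copy into a fresh slice, which is what the `AddPendingTxs` listing further below does.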

Options considered:

1. Architecture Design Changes
- Pin clients to RPC nodes so that a subscriber requesting updates with its subscription ID always hits the same node, which cleans up the stored objects on each request.
- Reason for not choosing: this still relies on subscribed clients actually requesting updates; if they never do, and never call unsubscribe, the memory leak remains unbounded, and the RPC node operator has no control over client behavior.
2. Implementing Timeouts
- Introduce timeouts for subscriptions to automatically clean up unresponsive or inactive subscriptions.
- Reason for not choosing: with the current design of the code, timeouts would also tear down healthy websocket subscriptions, losing data for active users. This approach has merit but requires substantially more work.
3. Configurable Limits (Chosen Solution)
- Set configurable limits for the various subscription stores (logs, headers, transactions, addresses, topics) to manage memory utilization effectively.
- Reason for choosing: this gives RPC node operators the flexibility to configure limits for their own requirements. The default behavior remains unchanged, making it a non-breaking change, and because the oldest data is pruned first, the most recent data is always available. Example flags:

```bash
	--rpc.subscription.filters.maxlogs=60
	--rpc.subscription.filters.maxheaders=60
	--rpc.subscription.filters.maxtxs=10_000
	--rpc.subscription.filters.maxaddresses=1_000
	--rpc.subscription.filters.maxtopics=1_000
```
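These flags populate the new `rpchelper.FiltersConfig`, which is threaded through `rpchelper.New` (see the diffs below). A sketch of the mapping; the struct is redeclared locally so the example compiles on its own, with field names taken from the diff:
```go
package main

import "fmt"

// FiltersConfig mirrors the struct added in turbo/rpchelper; field names
// are from this commit's diff. Non-positive values skip the cap checks,
// preserving the previous unbounded behaviour.
type FiltersConfig struct {
	RpcSubscriptionFiltersMaxLogs      int
	RpcSubscriptionFiltersMaxHeaders   int
	RpcSubscriptionFiltersMaxTxs       int
	RpcSubscriptionFiltersMaxAddresses int
	RpcSubscriptionFiltersMaxTopics    int
}

func main() {
	// Values matching the example flags above.
	cfg := FiltersConfig{
		RpcSubscriptionFiltersMaxLogs:      60,
		RpcSubscriptionFiltersMaxHeaders:   60,
		RpcSubscriptionFiltersMaxTxs:       10_000,
		RpcSubscriptionFiltersMaxAddresses: 1_000,
		RpcSubscriptionFiltersMaxTopics:    1_000,
	}
	fmt.Printf("%+v\n", cfg)
}
```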
With the limits in place, in-use memory on the same workload drops from tens of gigabytes to roughly 1.3 GB:
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 1146.51MB, 89.28% of 1284.17MB total
Dropped 403 nodes (cum <= 6.42MB)
Showing top 10 nodes out of 124
      flat  flat%   sum%        cum   cum%
  923.64MB 71.92% 71.92%   923.64MB 71.92%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
   54.59MB  4.25% 76.18%    54.59MB  4.25%  github.com/ledgerwatch/erigon/rlp.(*Stream).Bytes
   38.21MB  2.98% 79.15%    38.21MB  2.98%  bytes.growSlice
   29.34MB  2.28% 81.44%    29.34MB  2.28%  github.com/ledgerwatch/erigon/p2p/rlpx.growslice
   26.44MB  2.06% 83.49%    34.16MB  2.66%  compress/flate.NewWriter
   18.66MB  1.45% 84.95%    21.16MB  1.65%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
   17.50MB  1.36% 86.31%    64.58MB  5.03%  github.com/ledgerwatch/erigon/core/types.(*DynamicFeeTransaction).DecodeRLP
      15MB  1.17% 87.48%    82.08MB  6.39%  github.com/ledgerwatch/erigon/core/types.UnmarshalTransactionFromBinary
   13.58MB  1.06% 88.54%    13.58MB  1.06%  github.com/ledgerwatch/erigon/turbo/stages/headerdownload.(*HeaderDownload).addHeaderAsLink
    9.55MB  0.74% 89.28%     9.55MB  0.74%  github.com/ledgerwatch/erigon/turbo/rpchelper.newChanSub[...]
(pprof) list AddPendingTxs.func1
Total: 1.25GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
  923.64MB   923.64MB (flat, cum) 71.92% of Total
         .          .    698:   ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction {
         .          .    699:           if !ok {
         .          .    700:                   st = make([][]types.Transaction, 0)
         .          .    701:           }
         .          .    702:
         .          .    703:           // Calculate the total number of transactions in st
         .          .    704:           totalTxs := 0
         .          .    705:           for _, txBatch := range st {
         .          .    706:                   totalTxs += len(txBatch)
         .          .    707:           }
         .          .    708:
         .          .    709:           maxTxs := ff.config.RpcSubscriptionFiltersMaxTxs
         .          .    710:           // If adding the new transactions would exceed maxTxs, remove oldest transactions
         .          .    711:           if maxTxs > 0 && totalTxs+len(txs) > maxTxs {
         .          .    712:                   // Flatten st to a single slice
  918.69MB   918.69MB    713:                   flatSt := make([]types.Transaction, 0, totalTxs)
         .          .    714:                   for _, txBatch := range st {
         .          .    715:                           flatSt = append(flatSt, txBatch...)
         .          .    716:                   }
         .          .    717:
         .          .    718:                   // Remove the oldest transactions to make space for new ones
         .          .    719:                   if len(flatSt)+len(txs) > maxTxs {
         .          .    720:                           flatSt = flatSt[len(flatSt)+len(txs)-maxTxs:]
         .          .    721:                   }
         .          .    722:
         .          .    723:                   // Convert flatSt back to [][]types.Transaction with a single batch
         .          .    724:                   st = [][]types.Transaction{flatSt}
         .          .    725:           }
         .          .    726:
         .          .    727:           // Append the new transactions as a new batch
    5.95MB     4.95MB    728:           st = append(st, txs)
         .          .    729:           return st
         .          .    730:   })
         .          .    731:}
         .          .    732:
         .          .    733:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID.
```

With these changes, memory utilization has improved significantly, and the system is now stable and predictable. `AddPendingTxs.func1` still tops the bounded profile because the trim path itself allocates (the `flatSt` copy at line 713 accounts for most of its ~924 MB), but that usage is now capped by the `maxtxs` limit rather than unbounded.

-----
<img width="1567" alt="image"
src="https://github.com/ledgerwatch/erigon/assets/787344/264c9757-93cf-4be8-9883-5ca3187acd73">

The blue line indicates a deployment; the sharp drop in the green line is an OOM.
bretep authored and taratorio committed Sep 5, 2024
1 parent 1089150 commit c66cff0
Showing 22 changed files with 524 additions and 57 deletions.
10 changes: 8 additions & 2 deletions cmd/rpcdaemon/cli/config.go
@@ -140,6 +140,11 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().DurationVar(&cfg.EvmCallTimeout, "rpc.evmtimeout", rpccfg.DefaultEvmCallTimeout, "Maximum amount of time to wait for the answer from EVM call.")
rootCmd.PersistentFlags().DurationVar(&cfg.OverlayGetLogsTimeout, "rpc.overlay.getlogstimeout", rpccfg.DefaultOverlayGetLogsTimeout, "Maximum amount of time to wait for the answer from the overlay_getLogs call.")
rootCmd.PersistentFlags().DurationVar(&cfg.OverlayReplayBlockTimeout, "rpc.overlay.replayblocktimeout", rpccfg.DefaultOverlayReplayBlockTimeout, "Maximum amount of time to wait for the answer to replay a single block when called from an overlay_getLogs call.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxLogs, "rpc.subscription.filters.maxlogs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxLogs, "Maximum number of logs to store per subscription.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "rpc.subscription.filters.maxheaders", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "Maximum number of block headers to store per subscription.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTxs, "rpc.subscription.filters.maxtxs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs, "Maximum number of transactions to store per subscription.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "rpc.subscription.filters.maxaddresses", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "Maximum number of addresses per subscription to filter logs by.")
rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTopics, "rpc.subscription.filters.maxtopics", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics, "Maximum number of topics per subscription to filter logs by.")
rootCmd.PersistentFlags().IntVar(&cfg.BatchLimit, utils.RpcBatchLimit.Name, utils.RpcBatchLimit.Value, utils.RpcBatchLimit.Usage)
rootCmd.PersistentFlags().IntVar(&cfg.ReturnDataLimit, utils.RpcReturnDataLimit.Name, utils.RpcReturnDataLimit.Value, utils.RpcReturnDataLimit.Usage)
rootCmd.PersistentFlags().BoolVar(&cfg.AllowUnprotectedTxs, utils.AllowUnprotectedTxs.Name, utils.AllowUnprotectedTxs.Value, utils.AllowUnprotectedTxs.Usage)
@@ -269,6 +274,7 @@ func checkDbCompatibility(ctx context.Context, db kv.RoDB) error {

func EmbeddedServices(ctx context.Context,
erigonDB kv.RoDB, stateCacheCfg kvcache.CoherentConfig,
rpcFiltersConfig rpchelper.FiltersConfig,
blockReader services.FullBlockReader, ethBackendServer remote.ETHBACKENDServer, txPoolServer txpool.TxpoolServer,
miningServer txpool.MiningServer, stateDiffClient StateChangesClient,
logger log.Logger,
@@ -291,7 +297,7 @@ func EmbeddedServices(ctx context.Context,

txPool = direct.NewTxPoolClient(txPoolServer)
mining = direct.NewMiningClient(miningServer)
ff = rpchelper.New(ctx, eth, txPool, mining, func() {}, logger)
ff = rpchelper.New(ctx, rpcFiltersConfig, eth, txPool, mining, func() {}, logger)

return
}
@@ -541,7 +547,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}
}()

ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot, logger)
ff = rpchelper.New(ctx, cfg.RpcFiltersConfig, eth, txPool, mining, onNewSnapshot, logger)
return db, eth, txPool, mining, stateCache, blockReader, engine, ff, agg, err
}

2 changes: 2 additions & 0 deletions cmd/rpcdaemon/cli/httpcfg/http_cfg.go
@@ -1,6 +1,7 @@
package httpcfg

import (
"github.com/ledgerwatch/erigon/turbo/rpchelper"
"time"

"github.com/ledgerwatch/erigon-lib/common/datadir"
@@ -51,6 +52,7 @@ type HttpCfg struct {
RpcAllowListFilePath string
RpcBatchConcurrency uint
RpcStreamingDisable bool
RpcFiltersConfig rpchelper.FiltersConfig
DBReadConcurrency int
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
TxPoolApiAddr string
21 changes: 21 additions & 0 deletions erigon-lib/metrics/register.go
@@ -2,6 +2,7 @@ package metrics

import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
)

// NewCounter registers and returns new counter with the given name.
@@ -88,6 +89,26 @@ func GetOrCreateGauge(name string) Gauge {
return &gauge{g}
}

// GetOrCreateGaugeVec returns registered GaugeVec with the given name
// or creates a new GaugeVec if the registry doesn't contain a GaugeVec with
// the given name and labels.
//
// name must be a valid Prometheus-compatible metric with possible labels.
// labels are the names of the dimensions associated with the gauge vector.
// For instance,
//
// - foo, with labels []string{"bar", "baz"}
//
// The returned GaugeVec is safe to use from concurrent goroutines.
func GetOrCreateGaugeVec(name string, labels []string, help ...string) *prometheus.GaugeVec {
gv, err := defaultSet.GetOrCreateGaugeVec(name, labels, help...)
if err != nil {
panic(fmt.Errorf("could not get or create new gaugevec: %w", err))
}

return gv
}

// NewSummary creates and returns new summary with the given name.
//
// name must be valid Prometheus-compatible metric with possible labels.
119 changes: 115 additions & 4 deletions erigon-lib/metrics/set.go
@@ -2,6 +2,7 @@ package metrics

import (
"fmt"
"reflect"
"sort"
"strings"
"sync"
@@ -16,15 +17,23 @@ type namedMetric struct {
isAux bool
}

type namedMetricVec struct {
name string
metric *prometheus.GaugeVec
isAux bool
}

// Set is a set of metrics.
//
// Metrics belonging to a set are exported separately from global metrics.
//
// Set.WritePrometheus must be called for exporting metrics from the set.
type Set struct {
mu sync.Mutex
a []*namedMetric
m map[string]*namedMetric
mu sync.Mutex
a []*namedMetric
av []*namedMetricVec
m map[string]*namedMetric
vecs map[string]*namedMetricVec
}

var defaultSet = NewSet()
@@ -34,7 +43,8 @@ var defaultSet = NewSet()
// Pass the set to RegisterSet() function in order to export its metrics via global WritePrometheus() call.
func NewSet() *Set {
return &Set{
m: make(map[string]*namedMetric),
m: make(map[string]*namedMetric),
vecs: make(map[string]*namedMetricVec),
}
}

@@ -46,11 +56,18 @@ func (s *Set) Describe(ch chan<- *prometheus.Desc) {
if !sort.SliceIsSorted(s.a, lessFunc) {
sort.Slice(s.a, lessFunc)
}
if !sort.SliceIsSorted(s.av, lessFunc) {
sort.Slice(s.av, lessFunc)
}
sa := append([]*namedMetric(nil), s.a...)
sav := append([]*namedMetricVec(nil), s.av...)
s.mu.Unlock()
for _, nm := range sa {
ch <- nm.metric.Desc()
}
for _, nmv := range sav {
nmv.metric.Describe(ch)
}
}

func (s *Set) Collect(ch chan<- prometheus.Metric) {
@@ -61,11 +78,18 @@ func (s *Set) Collect(ch chan<- prometheus.Metric) {
if !sort.SliceIsSorted(s.a, lessFunc) {
sort.Slice(s.a, lessFunc)
}
if !sort.SliceIsSorted(s.av, lessFunc) {
sort.Slice(s.av, lessFunc)
}
sa := append([]*namedMetric(nil), s.a...)
sav := append([]*namedMetricVec(nil), s.av...)
s.mu.Unlock()
for _, nm := range sa {
ch <- nm.metric
}
for _, nmv := range sav {
nmv.metric.Collect(ch)
}
}

// NewHistogram creates and returns new histogram in s with the given name.
@@ -307,6 +331,78 @@ func (s *Set) GetOrCreateGauge(name string, help ...string) (prometheus.Gauge, e
return g, nil
}

// GetOrCreateGaugeVec returns registered GaugeVec in s with the given name
// or creates new GaugeVec if s doesn't contain GaugeVec with the given name.
//
// name must be valid Prometheus-compatible metric with possible labels.
// For instance,
//
// - foo
// - foo{bar="baz"}
// - foo{bar="baz",aaa="b"}
//
// labels are the labels associated with the GaugeVec.
//
// The returned GaugeVec is safe to use from concurrent goroutines.
func (s *Set) GetOrCreateGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) {
s.mu.Lock()
nm := s.vecs[name]
s.mu.Unlock()
if nm == nil {
metric, err := newGaugeVec(name, labels, help...)
if err != nil {
return nil, fmt.Errorf("invalid metric name %q: %w", name, err)
}

nmNew := &namedMetricVec{
name: name,
metric: metric,
}

s.mu.Lock()
nm = s.vecs[name]
if nm == nil {
nm = nmNew
s.vecs[name] = nm
s.av = append(s.av, nm)
}
s.mu.Unlock()
s.registerMetricVec(name, metric, false)
}

if nm.metric == nil {
return nil, fmt.Errorf("metric %q is nil", name)
}

metricType := reflect.TypeOf(nm.metric)
if metricType != reflect.TypeOf(&prometheus.GaugeVec{}) {
return nil, fmt.Errorf("metric %q isn't a GaugeVec. It is %s", name, metricType)
}

return nm.metric, nil
}

// newGaugeVec creates a new Prometheus GaugeVec.
func newGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) {
name, constLabels, err := parseMetric(name)
if err != nil {
return nil, err
}

helpStr := "gauge metric"
if len(help) > 0 {
helpStr = strings.Join(help, ", ")
}

gv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: name,
Help: helpStr,
ConstLabels: constLabels,
}, labels)

return gv, nil
}

const defaultSummaryWindow = 5 * time.Minute

var defaultSummaryQuantiles = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.97: 0.003, 0.99: 0.001}
@@ -431,6 +527,21 @@ func (s *Set) registerMetric(name string, m prometheus.Metric) {
s.mustRegisterLocked(name, m)
}

func (s *Set) registerMetricVec(name string, mv *prometheus.GaugeVec, isAux bool) {
s.mu.Lock()
defer s.mu.Unlock()

if _, exists := s.vecs[name]; !exists {
nmv := &namedMetricVec{
name: name,
metric: mv,
isAux: isAux,
}
s.vecs[name] = nmv
s.av = append(s.av, nmv)
}
}

// mustRegisterLocked registers given metric with the given name.
//
// Panics if the given name was already registered before.
2 changes: 1 addition & 1 deletion eth/backend.go
@@ -951,7 +951,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig
}
// start HTTP API
httpRpcCfg := stack.Config().Http
ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC,
ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, httpRpcCfg.RpcFiltersConfig, blockReader, ethBackendRPC,
s.txPoolGrpcServer, miningRPC, stateDiffClient, s.logger)
if err != nil {
return err
2 changes: 1 addition & 1 deletion go.mod
@@ -245,7 +245,7 @@ require (
github.com/pion/webrtc/v3 v3.1.42 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -749,8 +749,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
6 changes: 6 additions & 0 deletions turbo/cli/default_flags.go
@@ -93,6 +93,12 @@ var DefaultFlags = []cli.Flag{
&OverlayGetLogsFlag,
&OverlayReplayBlockFlag,

&RpcSubscriptionFiltersMaxLogsFlag,
&RpcSubscriptionFiltersMaxHeadersFlag,
&RpcSubscriptionFiltersMaxTxsFlag,
&RpcSubscriptionFiltersMaxAddressesFlag,
&RpcSubscriptionFiltersMaxTopicsFlag,

&utils.SnapKeepBlocksFlag,
&utils.SnapStopFlag,
&utils.DbPageSizeFlag,
50 changes: 42 additions & 8 deletions turbo/cli/flags.go
@@ -26,6 +26,7 @@ import (
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/node/nodecfg"
"github.com/ledgerwatch/erigon/turbo/rpchelper"
)

var (
@@ -246,6 +247,32 @@
Value: rpccfg.DefaultOverlayReplayBlockTimeout,
}

RpcSubscriptionFiltersMaxLogsFlag = cli.IntFlag{
Name: "rpc.subscription.filters.maxlogs",
Usage: "Maximum number of logs to store per subscription.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxLogs,
}
RpcSubscriptionFiltersMaxHeadersFlag = cli.IntFlag{
Name: "rpc.subscription.filters.maxheaders",
Usage: "Maximum number of block headers to store per subscription.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxHeaders,
}
RpcSubscriptionFiltersMaxTxsFlag = cli.IntFlag{
Name: "rpc.subscription.filters.maxtxs",
Usage: "Maximum number of transactions to store per subscription.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs,
}
RpcSubscriptionFiltersMaxAddressesFlag = cli.IntFlag{
Name: "rpc.subscription.filters.maxaddresses",
Usage: "Maximum number of addresses per subscription to filter logs by.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses,
}
RpcSubscriptionFiltersMaxTopicsFlag = cli.IntFlag{
Name: "rpc.subscription.filters.maxtopics",
Usage: "Maximum number of topics per subscription to filter logs by.",
Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics,
}

TxPoolCommitEvery = cli.DurationFlag{
Name: "txpool.commit.every",
Usage: "How often transactions should be committed to the storage",
@@ -487,14 +514,21 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
Feecap: ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name),
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),
RpcFiltersConfig: rpchelper.FiltersConfig{
RpcSubscriptionFiltersMaxLogs: ctx.Int(RpcSubscriptionFiltersMaxLogsFlag.Name),
RpcSubscriptionFiltersMaxHeaders: ctx.Int(RpcSubscriptionFiltersMaxHeadersFlag.Name),
RpcSubscriptionFiltersMaxTxs: ctx.Int(RpcSubscriptionFiltersMaxTxsFlag.Name),
RpcSubscriptionFiltersMaxAddresses: ctx.Int(RpcSubscriptionFiltersMaxAddressesFlag.Name),
RpcSubscriptionFiltersMaxTopics: ctx.Int(RpcSubscriptionFiltersMaxTopicsFlag.Name),
},
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
Feecap: ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name),
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),

OtsMaxPageSize: ctx.Uint64(utils.OtsSearchMaxCapFlag.Name),

2 changes: 1 addition & 1 deletion turbo/jsonrpc/eth_block_test.go
@@ -74,7 +74,7 @@ func TestGetBlockByNumberWithPendingTag(t *testing.T) {

ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m)
txPool := txpool.NewTxpoolClient(conn)
ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log)
ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log)

expected := 1
header := &types.Header{