diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 977df9435ef..ad460580281 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -138,6 +138,11 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) { rootCmd.PersistentFlags().DurationVar(&cfg.EvmCallTimeout, "rpc.evmtimeout", rpccfg.DefaultEvmCallTimeout, "Maximum amount of time to wait for the answer from EVM call.") rootCmd.PersistentFlags().DurationVar(&cfg.OverlayGetLogsTimeout, "rpc.overlay.getlogstimeout", rpccfg.DefaultOverlayGetLogsTimeout, "Maximum amount of time to wait for the answer from the overlay_getLogs call.") rootCmd.PersistentFlags().DurationVar(&cfg.OverlayReplayBlockTimeout, "rpc.overlay.replayblocktimeout", rpccfg.DefaultOverlayReplayBlockTimeout, "Maximum amount of time to wait for the answer to replay a single block when called from an overlay_getLogs call.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxLogs, "rpc.subscription.filters.maxlogs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxLogs, "Maximum number of logs to store per subscription.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "rpc.subscription.filters.maxheaders", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxHeaders, "Maximum number of block headers to store per subscription.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTxs, "rpc.subscription.filters.maxtxs", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs, "Maximum number of transactions to store per subscription.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "rpc.subscription.filters.maxaddresses", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses, "Maximum number of addresses per subscription to filter logs by.") + rootCmd.PersistentFlags().IntVar(&cfg.RpcFiltersConfig.RpcSubscriptionFiltersMaxTopics, "rpc.subscription.filters.maxtopics", rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics, "Maximum number of topics per subscription to filter logs by.") rootCmd.PersistentFlags().IntVar(&cfg.BatchLimit, utils.RpcBatchLimit.Name, utils.RpcBatchLimit.Value, utils.RpcBatchLimit.Usage) rootCmd.PersistentFlags().IntVar(&cfg.ReturnDataLimit, utils.RpcReturnDataLimit.Name, utils.RpcReturnDataLimit.Value, utils.RpcReturnDataLimit.Usage) rootCmd.PersistentFlags().BoolVar(&cfg.AllowUnprotectedTxs, utils.AllowUnprotectedTxs.Name, utils.AllowUnprotectedTxs.Value, utils.AllowUnprotectedTxs.Usage) @@ -267,6 +272,7 @@ func checkDbCompatibility(ctx context.Context, db kv.RoDB) error { func EmbeddedServices(ctx context.Context, erigonDB kv.RoDB, stateCacheCfg kvcache.CoherentConfig, + rpcFiltersConfig rpchelper.FiltersConfig, blockReader services.FullBlockReader, ethBackendServer remote.ETHBACKENDServer, txPoolServer txpool.TxpoolServer, miningServer txpool.MiningServer, stateDiffClient StateChangesClient, logger log.Logger, @@ -289,7 +295,7 @@ func EmbeddedServices(ctx context.Context, txPool = direct.NewTxPoolClient(txPoolServer) mining = direct.NewMiningClient(miningServer) - ff = rpchelper.New(ctx, eth, txPool, mining, func() {}, logger) + ff = rpchelper.New(ctx, rpcFiltersConfig, eth, txPool, mining, func() {}, logger) return } @@ -533,7 +539,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger } }() - ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot, logger) + ff = rpchelper.New(ctx, 
cfg.RpcFiltersConfig, eth, txPool, mining, onNewSnapshot, logger) return db, eth, txPool, mining, stateCache, blockReader, engine, ff, agg, err } diff --git a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go index 3673eaac5b0..d66522bbe0b 100644 --- a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go +++ b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go @@ -1,6 +1,7 @@ package httpcfg import ( + "github.com/ledgerwatch/erigon/turbo/rpchelper" "time" "github.com/ledgerwatch/erigon-lib/common/datadir" @@ -51,6 +52,7 @@ type HttpCfg struct { RpcAllowListFilePath string RpcBatchConcurrency uint RpcStreamingDisable bool + RpcFiltersConfig rpchelper.FiltersConfig DBReadConcurrency int TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum TxPoolApiAddr string diff --git a/erigon-lib/metrics/register.go b/erigon-lib/metrics/register.go index 4a2e68f55e4..40112af3a1b 100644 --- a/erigon-lib/metrics/register.go +++ b/erigon-lib/metrics/register.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "github.com/prometheus/client_golang/prometheus" ) // NewCounter registers and returns new counter with the given name. @@ -88,6 +89,26 @@ func GetOrCreateGauge(name string) Gauge { return &gauge{g} } +// GetOrCreateGaugeVec returns registered GaugeVec with the given name +// or creates a new GaugeVec if the registry doesn't contain a GaugeVec with +// the given name and labels. +// +// name must be a valid Prometheus-compatible metric with possible labels. +// labels are the names of the dimensions associated with the gauge vector. +// For instance, +// +// - foo, with labels []string{"bar", "baz"} +// +// The returned GaugeVec is safe to use from concurrent goroutines. +func GetOrCreateGaugeVec(name string, labels []string, help ...string) *prometheus.GaugeVec { + gv, err := defaultSet.GetOrCreateGaugeVec(name, labels, help...) + if err != nil { + panic(fmt.Errorf("could not get or create new gaugevec: %w", err)) + } + + return gv +} + // NewSummary creates and returns new summary with the given name. // // name must be valid Prometheus-compatible metric with possible labels. diff --git a/erigon-lib/metrics/set.go b/erigon-lib/metrics/set.go index ad4b164c239..b362994dd68 100644 --- a/erigon-lib/metrics/set.go +++ b/erigon-lib/metrics/set.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "reflect" "sort" "strings" "sync" @@ -16,15 +17,23 @@ type namedMetric struct { isAux bool } +type namedMetricVec struct { + name string + metric *prometheus.GaugeVec + isAux bool +} + // Set is a set of metrics. // // Metrics belonging to a set are exported separately from global metrics. // // Set.WritePrometheus must be called for exporting metrics from the set. type Set struct { - mu sync.Mutex - a []*namedMetric - m map[string]*namedMetric + mu sync.Mutex + a []*namedMetric + av []*namedMetricVec + m map[string]*namedMetric + vecs map[string]*namedMetricVec } var defaultSet = NewSet() @@ -34,7 +43,8 @@ var defaultSet = NewSet() // Pass the set to RegisterSet() function in order to export its metrics via global WritePrometheus() call. func NewSet() *Set { return &Set{ - m: make(map[string]*namedMetric), + m: make(map[string]*namedMetric), + vecs: make(map[string]*namedMetricVec), } } @@ -46,11 +56,18 @@ func (s *Set) Describe(ch chan<- *prometheus.Desc) { if !sort.SliceIsSorted(s.a, lessFunc) { sort.Slice(s.a, lessFunc) } + if !sort.SliceIsSorted(s.av, lessFunc) { + sort.Slice(s.av, lessFunc) + } sa := append([]*namedMetric(nil), s.a...) 
+ sav := append([]*namedMetricVec(nil), s.av...) s.mu.Unlock() for _, nm := range sa { ch <- nm.metric.Desc() } + for _, nmv := range sav { + nmv.metric.Describe(ch) + } } func (s *Set) Collect(ch chan<- prometheus.Metric) { @@ -61,11 +78,18 @@ func (s *Set) Collect(ch chan<- prometheus.Metric) { if !sort.SliceIsSorted(s.a, lessFunc) { sort.Slice(s.a, lessFunc) } + if !sort.SliceIsSorted(s.av, lessFunc) { + sort.Slice(s.av, lessFunc) + } sa := append([]*namedMetric(nil), s.a...) + sav := append([]*namedMetricVec(nil), s.av...) s.mu.Unlock() for _, nm := range sa { ch <- nm.metric } + for _, nmv := range sav { + nmv.metric.Collect(ch) + } } // NewHistogram creates and returns new histogram in s with the given name. @@ -308,6 +332,78 @@ func (s *Set) GetOrCreateGauge(name string, help ...string) (prometheus.Gauge, e return g, nil } +// GetOrCreateGaugeVec returns registered GaugeVec in s with the given name +// or creates new GaugeVec if s doesn't contain GaugeVec with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// - foo +// - foo{bar="baz"} +// - foo{bar="baz",aaa="b"} +// +// labels are the labels associated with the GaugeVec. +// +// The returned GaugeVec is safe to use from concurrent goroutines. +func (s *Set) GetOrCreateGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) { + s.mu.Lock() + nm := s.vecs[name] + s.mu.Unlock() + if nm == nil { + metric, err := newGaugeVec(name, labels, help...) + if err != nil { + return nil, fmt.Errorf("invalid metric name %q: %w", name, err) + } + + nmNew := &namedMetricVec{ + name: name, + metric: metric, + } + + s.mu.Lock() + nm = s.vecs[name] + if nm == nil { + nm = nmNew + s.vecs[name] = nm + s.av = append(s.av, nm) + } + s.mu.Unlock() + s.registerMetricVec(name, metric, false) + } + + if nm.metric == nil { + return nil, fmt.Errorf("metric %q is nil", name) + } + + metricType := reflect.TypeOf(nm.metric) + if metricType != reflect.TypeOf(&prometheus.GaugeVec{}) { + return nil, fmt.Errorf("metric %q isn't a GaugeVec. It is %s", name, metricType) + } + + return nm.metric, nil +} + +// newGaugeVec creates a new Prometheus GaugeVec. +func newGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) { + name, constLabels, err := parseMetric(name) + if err != nil { + return nil, err + } + + helpStr := "gauge metric" + if len(help) > 0 { + helpStr = strings.Join(help, ", ") + } + + gv := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: name, + Help: helpStr, + ConstLabels: constLabels, + }, labels) + + return gv, nil +} + const defaultSummaryWindow = 5 * time.Minute var defaultSummaryQuantiles = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.97: 0.003, 0.99: 0.001} @@ -432,6 +528,21 @@ func (s *Set) registerMetric(name string, m prometheus.Metric) { s.mustRegisterLocked(name, m) } +func (s *Set) registerMetricVec(name string, mv *prometheus.GaugeVec, isAux bool) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, exists := s.vecs[name]; !exists { + nmv := &namedMetricVec{ + name: name, + metric: mv, + isAux: isAux, + } + s.vecs[name] = nmv + s.av = append(s.av, nmv) + } +} + // mustRegisterLocked registers given metric with the given name. // // Panics if the given name was already registered before. 
diff --git a/eth/backend.go b/eth/backend.go index 86c3a2f62b1..cd2bcdb0613 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1023,7 +1023,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig } // start HTTP API httpRpcCfg := stack.Config().Http - ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC, + ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, httpRpcCfg.RpcFiltersConfig, blockReader, ethBackendRPC, s.txPoolGrpcServer, miningRPC, stateDiffClient, s.logger) if err != nil { return err diff --git a/go.mod b/go.mod index d712e06fa55..87476efe841 100644 --- a/go.mod +++ b/go.mod @@ -243,7 +243,7 @@ require ( github.com/pion/webrtc/v3 v3.2.40 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect - github.com/prometheus/client_golang v1.19.1 // indirect + github.com/prometheus/client_golang v1.19.1 github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index f808053c026..f8afd7b0625 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -94,6 +94,12 @@ var DefaultFlags = []cli.Flag{ &OverlayGetLogsFlag, &OverlayReplayBlockFlag, + &RpcSubscriptionFiltersMaxLogsFlag, + &RpcSubscriptionFiltersMaxHeadersFlag, + &RpcSubscriptionFiltersMaxTxsFlag, + &RpcSubscriptionFiltersMaxAddressesFlag, + &RpcSubscriptionFiltersMaxTopicsFlag, + &utils.SnapKeepBlocksFlag, &utils.SnapStopFlag, &utils.SnapStateStopFlag, diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go index dc6fa9d00b3..3cb2986d28c 100644 --- a/turbo/cli/flags.go +++ b/turbo/cli/flags.go @@ -28,6 +28,7 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/node/nodecfg" + "github.com/ledgerwatch/erigon/turbo/rpchelper" ) var ( @@ -256,6 +257,32 @@ var ( Value: rpccfg.DefaultOverlayReplayBlockTimeout, } + RpcSubscriptionFiltersMaxLogsFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxlogs", + Usage: "Maximum number of logs to store per subscription.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxLogs, + } + RpcSubscriptionFiltersMaxHeadersFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxheaders", + Usage: "Maximum number of block headers to store per subscription.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxHeaders, + } + RpcSubscriptionFiltersMaxTxsFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxtxs", + Usage: "Maximum number of transactions to store per subscription.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTxs, + } + RpcSubscriptionFiltersMaxAddressesFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxaddresses", + Usage: "Maximum number of addresses per subscription to filter logs by.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxAddresses, + } + RpcSubscriptionFiltersMaxTopicsFlag = cli.IntFlag{ + Name: "rpc.subscription.filters.maxtopics", + Usage: "Maximum number of topics per subscription to filter logs by.", + Value: rpchelper.DefaultFiltersConfig.RpcSubscriptionFiltersMaxTopics, + } + TxPoolCommitEvery = cli.DurationFlag{ Name: "txpool.commit.every", Usage: 
"How often transactions should be committed to the storage", @@ -522,14 +549,21 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name), DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name), RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name), - Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name), - Feecap: ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name), - MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name), - TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name), - BatchLimit: ctx.Int(utils.RpcBatchLimit.Name), - ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name), - AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name), - MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name), + RpcFiltersConfig: rpchelper.FiltersConfig{ + RpcSubscriptionFiltersMaxLogs: ctx.Int(RpcSubscriptionFiltersMaxLogsFlag.Name), + RpcSubscriptionFiltersMaxHeaders: ctx.Int(RpcSubscriptionFiltersMaxHeadersFlag.Name), + RpcSubscriptionFiltersMaxTxs: ctx.Int(RpcSubscriptionFiltersMaxTxsFlag.Name), + RpcSubscriptionFiltersMaxAddresses: ctx.Int(RpcSubscriptionFiltersMaxAddressesFlag.Name), + RpcSubscriptionFiltersMaxTopics: ctx.Int(RpcSubscriptionFiltersMaxTopicsFlag.Name), + }, + Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name), + Feecap: ctx.Float64(utils.RPCGlobalTxFeeCapFlag.Name), + MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name), + TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name), + BatchLimit: ctx.Int(utils.RpcBatchLimit.Name), + ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name), + AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name), + MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name), OtsMaxPageSize: ctx.Uint64(utils.OtsSearchMaxCapFlag.Name), diff --git a/turbo/jsonrpc/eth_block_test.go b/turbo/jsonrpc/eth_block_test.go index 52d1a805acc..2347e392cc6 100644 --- a/turbo/jsonrpc/eth_block_test.go +++ b/turbo/jsonrpc/eth_block_test.go @@ -74,7 +74,7 @@ func TestGetBlockByNumberWithPendingTag(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) expected := 1 header := &types.Header{ diff --git a/turbo/jsonrpc/eth_call_test.go b/turbo/jsonrpc/eth_call_test.go index f3d1af3e996..1b8b9efac9e 100644 --- a/turbo/jsonrpc/eth_call_test.go +++ b/turbo/jsonrpc/eth_call_test.go @@ -43,7 +43,7 @@ func TestEstimateGas(t *testing.T) { stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mock.Mock(t)) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) var from = libcommon.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") var to = libcommon.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e") diff --git a/turbo/jsonrpc/eth_filters_test.go b/turbo/jsonrpc/eth_filters_test.go index aa1494baedf..51801ba337c 100644 --- a/turbo/jsonrpc/eth_filters_test.go +++ 
b/turbo/jsonrpc/eth_filters_test.go @@ -31,7 +31,7 @@ func TestNewFilters(t *testing.T) { stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mock.Mock(t)) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 1e18, 100_000, false, 100_000, 128, log.New()) ptf, err := api.NewPendingTransactionFilter(ctx) @@ -60,7 +60,7 @@ func TestLogsSubscribeAndUnsubscribe_WithoutConcurrentMapIssue(t *testing.T) { m := mock.Mock(t) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) // generate some random topics topics := make([][]libcommon.Hash, 0) diff --git a/turbo/jsonrpc/eth_mining_test.go b/turbo/jsonrpc/eth_mining_test.go index 86c8de279c1..8aa131a3e47 100644 --- a/turbo/jsonrpc/eth_mining_test.go +++ b/turbo/jsonrpc/eth_mining_test.go @@ -24,7 +24,7 @@ func TestPendingBlock(t *testing.T) { m := mock.Mock(t) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mock.Mock(t)) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) engine := ethash.NewFaker() api := NewEthAPI(NewBaseApi(ff, stateCache, m.BlockReader, nil, false, rpccfg.DefaultEvmCallTimeout, engine, @@ -51,7 +51,7 @@ func TestPendingLogs(t *testing.T) { m := mock.Mock(t) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, nil, mining, func() {}, m.Log) expect := []byte{211} ch, id := ff.SubscribePendingLogs(1) diff --git a/turbo/jsonrpc/eth_subscribe_test.go b/turbo/jsonrpc/eth_subscribe_test.go index b593ab2762c..ab1d4df9817 100644 --- a/turbo/jsonrpc/eth_subscribe_test.go +++ b/turbo/jsonrpc/eth_subscribe_test.go @@ -49,7 +49,7 @@ func TestEthSubscribe(t *testing.T) { backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, m.BlockReader, logger, builder.NewLatestBlockBuiltStore()) backendClient := direct.NewEthBackendClientDirect(backendServer) backend := rpcservices.NewRemoteBackend(backendClient, m.DB, m.BlockReader) - ff := rpchelper.New(ctx, backend, nil, nil, func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, backend, nil, nil, func() {}, m.Log) newHeads, id := ff.SubscribeNewHeads(16) defer ff.UnsubscribeHeads(id) diff --git a/turbo/jsonrpc/send_transaction_test.go b/turbo/jsonrpc/send_transaction_test.go index 3eac26476c9..45c1a79f5e8 100644 --- a/turbo/jsonrpc/send_transaction_test.go +++ b/turbo/jsonrpc/send_transaction_test.go @@ -90,7 +90,7 @@ func TestSendRawTransaction(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mockSentry) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, mockSentry.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, 
txpool.NewMiningClient(conn), func() {}, mockSentry.Log) api := jsonrpc.NewEthAPI(newBaseApiForTest(mockSentry), mockSentry.DB, nil, txPool, nil, 5000000, 1e18, 100_000, false, 100_000, 128, logger) buf := bytes.NewBuffer(nil) @@ -142,7 +142,7 @@ func TestSendRawTransactionUnprotected(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, mockSentry) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, mockSentry.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, mockSentry.Log) api := jsonrpc.NewEthAPI(newBaseApiForTest(mockSentry), mockSentry.DB, nil, txPool, nil, 5000000, 1e18, 100_000, false, 100_000, 128, logger) // Enable unproteced txs flag diff --git a/turbo/jsonrpc/txpool_api_test.go b/turbo/jsonrpc/txpool_api_test.go index 26c0f5d6336..1f31ea57d85 100644 --- a/turbo/jsonrpc/txpool_api_test.go +++ b/turbo/jsonrpc/txpool_api_test.go @@ -38,7 +38,7 @@ func TestTxPoolContent(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) + ff := rpchelper.New(ctx, rpchelper.DefaultFiltersConfig, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) agg := m.HistoryV3Components() api := NewTxPoolAPI(NewBaseApi(ff, kvcache.New(kvcache.DefaultCoherentConfig), m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, txPool) diff --git a/turbo/rpchelper/config.go b/turbo/rpchelper/config.go new file mode 100644 index 00000000000..21610b6a508 --- /dev/null +++ b/turbo/rpchelper/config.go @@ -0,0 +1,22 @@ +package rpchelper + +// FiltersConfig defines the configuration settings for RPC subscription filters. +// Each field represents a limit on the number of respective items that can be stored per subscription. +type FiltersConfig struct { + RpcSubscriptionFiltersMaxLogs int // Maximum number of logs to store per subscription. Default: 0 (no limit) + RpcSubscriptionFiltersMaxHeaders int // Maximum number of block headers to store per subscription. Default: 0 (no limit) + RpcSubscriptionFiltersMaxTxs int // Maximum number of transactions to store per subscription. Default: 0 (no limit) + RpcSubscriptionFiltersMaxAddresses int // Maximum number of addresses per subscription to filter logs by. Default: 0 (no limit) + RpcSubscriptionFiltersMaxTopics int // Maximum number of topics per subscription to filter logs by. Default: 0 (no limit) +} + +// DefaultFiltersConfig defines the default settings for filter configurations. +// These default values set no limits on the number of logs, block headers, transactions, +// addresses, or topics that can be stored per subscription. 
+var DefaultFiltersConfig = FiltersConfig{ + RpcSubscriptionFiltersMaxLogs: 0, // No limit on the number of logs per subscription + RpcSubscriptionFiltersMaxHeaders: 0, // No limit on the number of block headers per subscription + RpcSubscriptionFiltersMaxTxs: 0, // No limit on the number of transactions per subscription + RpcSubscriptionFiltersMaxAddresses: 0, // No limit on the number of addresses per subscription to filter logs by + RpcSubscriptionFiltersMaxTopics: 0, // No limit on the number of topics per subscription to filter logs by +} diff --git a/turbo/rpchelper/filters.go b/turbo/rpchelper/filters.go index cf8a80eaf0d..bfe7f80db4a 100644 --- a/turbo/rpchelper/filters.go +++ b/turbo/rpchelper/filters.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/prometheus/client_golang/prometheus" "io" "reflect" "sync" @@ -48,12 +49,14 @@ type Filters struct { pendingHeadsStores *concurrent.SyncMap[HeadsSubID, []*types.Header] pendingTxsStores *concurrent.SyncMap[PendingTxsSubID, [][]types.Transaction] logger log.Logger + + config FiltersConfig } // New creates a new Filters instance, initializes it, and starts subscription goroutines for Ethereum events. // It requires a context, Ethereum backend, transaction pool client, mining client, snapshot callback function, // and a logger for logging events. -func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, onNewSnapshot func(), logger log.Logger) *Filters { +func New(ctx context.Context, config FiltersConfig, ethBackend ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, onNewSnapshot func(), logger log.Logger) *Filters { logger.Info("rpc filters: subscribing to Erigon events") ff := &Filters{ @@ -67,15 +70,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, pendingHeadsStores: concurrent.NewSyncMap[HeadsSubID, []*types.Header](), pendingTxsStores: concurrent.NewSyncMap[PendingTxsSubID, [][]types.Transaction](), logger: logger, + config: config, } go func() { if ethBackend == nil { return } + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Events"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Events"}).Dec() return default: } @@ -83,6 +89,7 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if err := ethBackend.Subscribe(ctx, ff.OnNewEvent); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Events"}).Dec() return default: } @@ -99,15 +106,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if ethBackend == nil { return } + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Logs"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Logs"}).Dec() return default: } if err := ethBackend.SubscribeLogs(ctx, ff.OnNewLogs, &ff.logsRequestor); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Logs"}).Dec() return default: } @@ -122,15 +132,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if txPool != nil { go func() { + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingTxs"}).Inc() for { 
select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingTxs"}).Dec() return default: } if err := ff.subscribeToPendingTransactions(ctx, txPool); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingTxs"}).Dec() return default: } @@ -145,15 +158,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, if !reflect.ValueOf(mining).IsNil() { //https://groups.google.com/g/golang-nuts/c/wnH302gBa4I go func() { + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingBlock"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingBlock"}).Dec() return default: } if err := ff.subscribeToPendingBlocks(ctx, mining); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingBlock"}).Dec() return default: } @@ -166,15 +182,18 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, } }() go func() { + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingLogs"}).Inc() for { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingLogs"}).Dec() return default: } if err := ff.subscribeToPendingLogs(ctx, mining); err != nil { select { case <-ctx.Done(): + activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "txPool_PendingLogs"}).Dec() return default: } @@ -414,26 +433,44 @@ func (ff *Filters) SubscribeLogs(size int, criteria filters.FilterCriteria) (<-c // If no addresses are specified, it means all addresses should be included f.allAddrs = 1 } else { + // Limit the number of addresses + addressCount := 0 for _, addr := range criteria.Addresses { - f.addrs.Put(addr, 1) + if ff.config.RpcSubscriptionFiltersMaxAddresses == 0 || addressCount < ff.config.RpcSubscriptionFiltersMaxAddresses { + f.addrs.Put(addr, 1) + addressCount++ + } else { + break + } } } - // Handle topics + // Handle topics and track the allowed topics if len(criteria.Topics) == 0 { // If no topics are specified, it means all topics should be included f.allTopics = 1 } else { + // Limit the number of topics + topicCount := 0 + allowedTopics := [][]libcommon.Hash{} for _, topics := range criteria.Topics { + allowedTopicsRow := []libcommon.Hash{} for _, topic := range topics { - f.topics.Put(topic, 1) + if ff.config.RpcSubscriptionFiltersMaxTopics == 0 || topicCount < ff.config.RpcSubscriptionFiltersMaxTopics { + f.topics.Put(topic, 1) + allowedTopicsRow = append(allowedTopicsRow, topic) + topicCount++ + } else { + break + } + } + if len(allowedTopicsRow) > 0 { + allowedTopics = append(allowedTopics, allowedTopicsRow) } } + f.topicsOriginal = allowedTopics } - // Store original topics for reference - f.topicsOriginal = criteria.Topics - // Add the filter to the list of log filters ff.logsSubs.addLogsFilters(f) @@ -602,12 +639,17 @@ func (ff *Filters) OnNewLogs(reply *remote.SubscribeLogsReply) { } // AddLogs adds logs to the store associated with the given subscription ID. 
-func (ff *Filters) AddLogs(id LogsSubID, logs *types.Log) { +func (ff *Filters) AddLogs(id LogsSubID, log *types.Log) { ff.logsStores.DoAndStore(id, func(st []*types.Log, ok bool) []*types.Log { if !ok { st = make([]*types.Log, 0) } - st = append(st, logs) + + maxLogs := ff.config.RpcSubscriptionFiltersMaxLogs + if maxLogs > 0 && len(st)+1 > maxLogs { + st = st[len(st)+1-maxLogs:] // Remove oldest logs to make space + } + st = append(st, log) return st }) } @@ -628,6 +670,11 @@ func (ff *Filters) AddPendingBlock(id HeadsSubID, block *types.Header) { if !ok { st = make([]*types.Header, 0) } + + maxHeaders := ff.config.RpcSubscriptionFiltersMaxHeaders + if maxHeaders > 0 && len(st) >= maxHeaders { + st = st[1:] // Remove the oldest header to make space + } st = append(st, block) return st }) @@ -649,6 +696,32 @@ func (ff *Filters) AddPendingTxs(id PendingTxsSubID, txs []types.Transaction) { if !ok { st = make([][]types.Transaction, 0) } + + // Calculate the total number of transactions in st + totalTxs := 0 + for _, txBatch := range st { + totalTxs += len(txBatch) + } + + maxTxs := ff.config.RpcSubscriptionFiltersMaxTxs + // If adding the new transactions would exceed maxTxs, remove oldest transactions + if maxTxs > 0 && totalTxs+len(txs) > maxTxs { + // Flatten st to a single slice + flatSt := make([]types.Transaction, 0, totalTxs) + for _, txBatch := range st { + flatSt = append(flatSt, txBatch...) + } + + // Remove the oldest transactions to make space for new ones + if len(flatSt)+len(txs) > maxTxs { + flatSt = flatSt[len(flatSt)+len(txs)-maxTxs:] + } + + // Convert flatSt back to [][]types.Transaction with a single batch + st = [][]types.Transaction{flatSt} + } + + // Append the new transactions as a new batch st = append(st, txs) return st }) diff --git a/turbo/rpchelper/filters_deadlock_test.go b/turbo/rpchelper/filters_deadlock_test.go index 6b143e27610..c76afbd5493 100644 --- a/turbo/rpchelper/filters_deadlock_test.go +++ b/turbo/rpchelper/filters_deadlock_test.go @@ -18,7 +18,8 @@ import ( func TestFiltersDeadlock_Test(t *testing.T) { t.Parallel() logger := log.New() - f := rpchelper.New(context.TODO(), nil, nil, nil, func() {}, logger) + config := rpchelper.FiltersConfig{} + f := rpchelper.New(context.TODO(), config, nil, nil, nil, func() {}, logger) crit := filters.FilterCriteria{ Addresses: nil, Topics: [][]libcommon.Hash{}, diff --git a/turbo/rpchelper/filters_test.go b/turbo/rpchelper/filters_test.go index b66a36e2901..edebc156334 100644 --- a/turbo/rpchelper/filters_test.go +++ b/turbo/rpchelper/filters_test.go @@ -2,6 +2,8 @@ package rpchelper import ( "context" + "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon/core/types" "testing" libcommon "github.com/ledgerwatch/erigon-lib/common" @@ -49,7 +51,7 @@ func TestFilters_GenerateSubscriptionID(t *testing.T) { v := <-subs _, ok := set[v] if ok { - t.Errorf("SubscriptionID Confict: %s", v) + t.Errorf("SubscriptionID Conflict: %s", v) return } set[v] = struct{}{} @@ -58,7 +60,8 @@ func TestFilters_GenerateSubscriptionID(t *testing.T) { func TestFilters_SingleSubscription_OnlyTopicsSubscribedAreBroadcast(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) subbedTopic := libcommon.BytesToHash([]byte{10, 20}) @@ -90,7 +93,8 @@ func TestFilters_SingleSubscription_OnlyTopicsSubscribedAreBroadcast(t *testing. 
func TestFilters_SingleSubscription_EmptyTopicsInCriteria_OnlyTopicsSubscribedAreBroadcast(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) var nilTopic libcommon.Hash subbedTopic := libcommon.BytesToHash([]byte{10, 20}) @@ -123,7 +127,8 @@ func TestFilters_SingleSubscription_EmptyTopicsInCriteria_OnlyTopicsSubscribedAr func TestFilters_TwoSubscriptionsWithDifferentCriteria(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) criteria1 := filters.FilterCriteria{ Addresses: nil, @@ -163,7 +168,8 @@ func TestFilters_TwoSubscriptionsWithDifferentCriteria(t *testing.T) { func TestFilters_ThreeSubscriptionsWithDifferentCriteria(t *testing.T) { t.Parallel() - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) criteria1 := filters.FilterCriteria{ Addresses: nil, @@ -238,7 +244,8 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) { return nil } - f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) + config := FiltersConfig{} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) f.logsRequestor.Store(loadRequester) // first request has no filters @@ -347,3 +354,133 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) { t.Error("6: expected topics to be empty") } } + +func TestFilters_AddLogs(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxLogs: 5} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + logID := LogsSubID("test-log") + logEntry := &types.Log{} + + // Add 10 logs to the store, but limit is 5 + for i := 0; i < 10; i++ { + f.AddLogs(logID, logEntry) + } + + logs, found := f.ReadLogs(logID) + if !found { + t.Error("expected to find logs in the store") + } + if len(logs) != 5 { + t.Errorf("expected 5 logs in the store, got %d", len(logs)) + } +} + +func TestFilters_AddLogs_Unlimited(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxLogs: 0} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + logID := LogsSubID("test-log") + logEntry := &types.Log{} + + // Add 10 logs to the store, limit is unlimited + for i := 0; i < 10; i++ { + f.AddLogs(logID, logEntry) + } + + logs, found := f.ReadLogs(logID) + if !found { + t.Error("expected to find logs in the store") + } + if len(logs) != 10 { + t.Errorf("expected 10 logs in the store, got %d", len(logs)) + } +} + +func TestFilters_AddPendingBlocks(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxHeaders: 3} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + headerID := HeadsSubID("test-header") + header := &types.Header{} + + // Add 5 headers to the store, but limit is 3 + for i := 0; i < 5; i++ { + f.AddPendingBlock(headerID, header) + } + + headers, found := f.ReadPendingBlocks(headerID) + if !found { + t.Error("expected to find headers in the store") + } + if len(headers) != 3 { + t.Errorf("expected 3 headers in the store, got %d", len(headers)) + } +} + +func TestFilters_AddPendingBlocks_Unlimited(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxHeaders: 0} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + headerID := 
HeadsSubID("test-header") + header := &types.Header{} + + // Add 5 headers to the store, limit is unlimited + for i := 0; i < 5; i++ { + f.AddPendingBlock(headerID, header) + } + + headers, found := f.ReadPendingBlocks(headerID) + if !found { + t.Error("expected to find headers in the store") + } + if len(headers) != 5 { + t.Errorf("expected 5 headers in the store, got %d", len(headers)) + } +} + +func TestFilters_AddPendingTxs(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxTxs: 4} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + txID := PendingTxsSubID("test-tx") + var tx types.Transaction = types.NewTransaction(0, libcommon.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"), uint256.NewInt(10), 50000, uint256.NewInt(10), nil) + tx, _ = tx.WithSignature(*types.LatestSignerForChainID(nil), libcommon.Hex2Bytes("9bea4c4daac7c7c52e093e6a4c35dbbcf8856f1af7b059ba20253e70848d094f8a8fae537ce25ed8cb5af9adac3f141af69bd515bd2ba031522df09b97dd72b100")) + + // Add 6 txs to the store, but limit is 4 + for i := 0; i < 6; i++ { + f.AddPendingTxs(txID, []types.Transaction{tx}) + } + + txs, found := f.ReadPendingTxs(txID) + if !found { + t.Error("expected to find txs in the store") + } + totalTxs := 0 + for _, batch := range txs { + totalTxs += len(batch) + } + if totalTxs != 4 { + t.Errorf("expected 4 txs in the store, got %d", totalTxs) + } +} + +func TestFilters_AddPendingTxs_Unlimited(t *testing.T) { + config := FiltersConfig{RpcSubscriptionFiltersMaxTxs: 0} + f := New(context.TODO(), config, nil, nil, nil, func() {}, log.New()) + txID := PendingTxsSubID("test-tx") + var tx types.Transaction = types.NewTransaction(0, libcommon.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"), uint256.NewInt(10), 50000, uint256.NewInt(10), nil) + tx, _ = tx.WithSignature(*types.LatestSignerForChainID(nil), libcommon.Hex2Bytes("9bea4c4daac7c7c52e093e6a4c35dbbcf8856f1af7b059ba20253e70848d094f8a8fae537ce25ed8cb5af9adac3f141af69bd515bd2ba031522df09b97dd72b100")) + + // Add 6 txs to the store, limit is unlimited + for i := 0; i < 6; i++ { + f.AddPendingTxs(txID, []types.Transaction{tx}) + } + + txs, found := f.ReadPendingTxs(txID) + if !found { + t.Error("expected to find txs in the store") + } + totalTxs := 0 + for _, batch := range txs { + totalTxs += len(batch) + } + if totalTxs != 6 { + t.Errorf("expected 6 txs in the store, got %d", totalTxs) + } +} diff --git a/turbo/rpchelper/logsfilter.go b/turbo/rpchelper/logsfilter.go index 07321609627..b4130ca3de4 100644 --- a/turbo/rpchelper/logsfilter.go +++ b/turbo/rpchelper/logsfilter.go @@ -105,9 +105,15 @@ func (a *LogsFilterAggregator) createFilterRequest() *remote.LogsFilterRequest { // provided LogsFilter. If the count for any address or topic reaches zero, it is removed from the aggregated filter. 
func (a *LogsFilterAggregator) subtractLogFilters(f *LogsFilter) { a.aggLogsFilter.allAddrs -= f.allAddrs + if f.allAddrs > 0 { + // Decrement the count for AllAddresses + activeSubscriptionsLogsAllAddressesGauge.Dec() + } f.addrs.Range(func(addr libcommon.Address, count int) error { a.aggLogsFilter.addrs.Do(addr, func(value int, exists bool) (int, bool) { if exists { + // Decrement the count for subscribed address + activeSubscriptionsLogsAddressesGauge.Dec() newValue := value - count if newValue <= 0 { return 0, false @@ -119,9 +125,15 @@ func (a *LogsFilterAggregator) subtractLogFilters(f *LogsFilter) { return nil }) a.aggLogsFilter.allTopics -= f.allTopics + if f.allTopics > 0 { + // Decrement the count for AllTopics + activeSubscriptionsLogsAllTopicsGauge.Dec() + } f.topics.Range(func(topic libcommon.Hash, count int) error { a.aggLogsFilter.topics.Do(topic, func(value int, exists bool) (int, bool) { if exists { + // Decrement the count for subscribed topic + activeSubscriptionsLogsTopicsGauge.Dec() newValue := value - count if newValue <= 0 { return 0, false @@ -141,14 +153,26 @@ func (a *LogsFilterAggregator) addLogsFilters(f *LogsFilter) { a.logsFilterLock.Lock() defer a.logsFilterLock.Unlock() a.aggLogsFilter.allAddrs += f.allAddrs + if f.allAddrs > 0 { + // Increment the count for AllAddresses + activeSubscriptionsLogsAllAddressesGauge.Inc() + } f.addrs.Range(func(addr libcommon.Address, count int) error { + // Increment the count for subscribed address + activeSubscriptionsLogsAddressesGauge.Inc() a.aggLogsFilter.addrs.DoAndStore(addr, func(value int, exists bool) int { return value + count }) return nil }) a.aggLogsFilter.allTopics += f.allTopics + if f.allTopics > 0 { + // Increment the count for AllTopics + activeSubscriptionsLogsAllTopicsGauge.Inc() + } f.topics.Range(func(topic libcommon.Hash, count int) error { + // Increment the count for subscribed topic + activeSubscriptionsLogsTopicsGauge.Inc() a.aggLogsFilter.topics.DoAndStore(topic, func(value int, exists bool) int { return value + count }) @@ -179,6 +203,10 @@ func (a *LogsFilterAggregator) getAggMaps() (map[libcommon.Address]int, map[libc func (a *LogsFilterAggregator) distributeLog(eventLog *remote.SubscribeLogsReply) error { a.logsFilterLock.RLock() defer a.logsFilterLock.RUnlock() + + var lg types2.Log + var topics []libcommon.Hash + a.logsFilters.Range(func(k LogsSubID, filter *LogsFilter) error { if filter.allAddrs == 0 { _, addrOk := filter.addrs.Get(gointerfaces.ConvertH160toAddress(eventLog.Address)) @@ -186,27 +214,34 @@ func (a *LogsFilterAggregator) distributeLog(eventLog *remote.SubscribeLogsReply return nil } } - var topics []libcommon.Hash + + // Pre-allocate topics slice to the required size to avoid multiple allocations + topics = topics[:0] + if cap(topics) < len(eventLog.Topics) { + topics = make([]libcommon.Hash, 0, len(eventLog.Topics)) + } for _, topic := range eventLog.Topics { topics = append(topics, gointerfaces.ConvertH256ToHash(topic)) } + if filter.allTopics == 0 { if !a.chooseTopics(filter, topics) { return nil } } - lg := &types2.Log{ - Address: gointerfaces.ConvertH160toAddress(eventLog.Address), - Topics: topics, - Data: eventLog.Data, - BlockNumber: eventLog.BlockNumber, - TxHash: gointerfaces.ConvertH256ToHash(eventLog.TransactionHash), - TxIndex: uint(eventLog.TransactionIndex), - BlockHash: gointerfaces.ConvertH256ToHash(eventLog.BlockHash), - Index: uint(eventLog.LogIndex), - Removed: eventLog.Removed, - } - filter.sender.Send(lg) + + // Reuse lg object to avoid creating new 
instances + lg.Address = gointerfaces.ConvertH160toAddress(eventLog.Address) + lg.Topics = topics + lg.Data = eventLog.Data + lg.BlockNumber = eventLog.BlockNumber + lg.TxHash = gointerfaces.ConvertH256ToHash(eventLog.TransactionHash) + lg.TxIndex = uint(eventLog.TransactionIndex) + lg.BlockHash = gointerfaces.ConvertH256ToHash(eventLog.BlockHash) + lg.Index = uint(eventLog.LogIndex) + lg.Removed = eventLog.Removed + + filter.sender.Send(&lg) return nil }) return nil diff --git a/turbo/rpchelper/metrics.go b/turbo/rpchelper/metrics.go new file mode 100644 index 00000000000..41e963d3285 --- /dev/null +++ b/turbo/rpchelper/metrics.go @@ -0,0 +1,19 @@ +package rpchelper + +import ( + "github.com/ledgerwatch/erigon-lib/metrics" +) + +const ( + filterLabelName = "filter" + clientLabelName = "client" +) + +var ( + activeSubscriptionsGauge = metrics.GetOrCreateGaugeVec("subscriptions", []string{filterLabelName}, "Current number of subscriptions") + activeSubscriptionsLogsAllAddressesGauge = metrics.GetOrCreateGauge("subscriptions_logs_all_addresses") + activeSubscriptionsLogsAllTopicsGauge = metrics.GetOrCreateGauge("subscriptions_logs_all_topics") + activeSubscriptionsLogsAddressesGauge = metrics.GetOrCreateGauge("subscriptions_logs_addresses") + activeSubscriptionsLogsTopicsGauge = metrics.GetOrCreateGauge("subscriptions_logs_topics") + activeSubscriptionsLogsClientGauge = metrics.GetOrCreateGaugeVec("subscriptions_logs_client", []string{clientLabelName}, "Current number of subscriptions by client") +)
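
For orientation, a minimal end-to-end sketch of the new per-subscription limits, mirroring the tests added above (a zero value for any limit means unlimited; all identifiers are from this patch):

package main

import (
	"context"

	"github.com/ledgerwatch/erigon/core/types"
	"github.com/ledgerwatch/erigon/turbo/rpchelper"
	"github.com/ledgerwatch/log/v3"
)

func main() {
	// Keep at most 5 logs per subscription; all other limits stay at 0 (unlimited).
	config := rpchelper.FiltersConfig{RpcSubscriptionFiltersMaxLogs: 5}
	ff := rpchelper.New(context.TODO(), config, nil, nil, nil, func() {}, log.New())

	logID := rpchelper.LogsSubID("example")
	for i := 0; i < 10; i++ {
		ff.AddLogs(logID, &types.Log{})
	}

	// Only the 5 most recent entries are retained; AddLogs trimmed the oldest.
	logs, _ := ff.ReadLogs(logID)
	_ = logs // len(logs) == 5
}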
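
The same limits can be set from the command line through the flags registered above, on both erigon and the standalone rpcdaemon; the values here are arbitrary examples, and 0 (the default) keeps the unlimited behaviour:

erigon --rpc.subscription.filters.maxlogs=60 \
	--rpc.subscription.filters.maxheaders=512 \
	--rpc.subscription.filters.maxtxs=10000 \
	--rpc.subscription.filters.maxaddresses=1000 \
	--rpc.subscription.filters.maxtopics=1000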