Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added RPC filter limits, metrics, and better memory management. #10718

Merged

Conversation

bretep
Copy link
Contributor

@bretep bretep commented Jun 12, 2024

  • Introduced GetOrCreateGaugeVec function to manage Prometheus GaugeVec metrics.
  • Implemented various subscription filter limits (max logs, max headers, max transactions, max addresses, and max topics).
  • Updated various test files to utilize the new FiltersConfig and added comprehensive tests for the new filter limits.
  • Added new flags for subscription filter limits.
  • Improved log filtering and event handling with better memory management and metric tracking.
  • Added configuration to handle RPC subscription filter limits.
  • Minor typo fixes.

Problem

RPC nodes would frequently OOM. Based on metrics and pprof, I observed high memory utilization in the rpchelper:

Initial Observation After 2 Hours
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 69696.67MB, 95.37% of 73083.31MB total
Dropped 1081 nodes (cum <= 365.42MB)
Showing top 10 nodes out of 43
      flat  flat%   sum%        cum   cum%
65925.80MB 90.21% 90.21% 65925.80MB 90.21%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1
 1944.80MB  2.66% 92.87%  1944.80MB  2.66%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1
 1055.97MB  1.44% 94.31%  1055.97MB  1.44%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
  362.04MB   0.5% 94.81%   572.20MB  0.78%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
  257.73MB  0.35% 95.16%   587.85MB   0.8%  encoding/json.Marshal
   57.85MB 0.079% 95.24%  1658.66MB  2.27%  github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run
   51.48MB  0.07% 95.31%  2014.50MB  2.76%  github.com/ledgerwatch/erigon/rpc.(*handler).handleCallMsg
   23.01MB 0.031% 95.34%  1127.21MB  1.54%  github.com/ledgerwatch/erigon/core/vm.opCall
   11.50MB 0.016% 95.36%  1677.68MB  2.30%  github.com/ledgerwatch/erigon/core/vm.(*EVM).call
    6.50MB 0.0089% 95.37%   685.70MB  0.94%  github.com/ledgerwatch/erigon/turbo/transactions.DoCall
(pprof) list distributeLog.func1
Total: 71.37GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/logsfilter.go
   64.38GB    64.38GB (flat, cum) 90.21% of Total
         .          .    206:   a.logsFilters.Range(func(k LogsSubID, filter *LogsFilter) error {
         .          .    207:           if filter.allAddrs == 0 {
         .          .    208:                   _, addrOk := filter.addrs.Get(gointerfaces.ConvertH160toAddress(eventLog.Address))
         .          .    209:                   if !addrOk {
         .          .    210:                           return nil
         .          .    211:                   }
         .          .    212:           }
         .          .    213:           var topics []libcommon.Hash
         .          .    214:           for _, topic := range eventLog.Topics {
   27.01GB    27.01GB    215:                   topics = append(topics, gointerfaces.ConvertH256ToHash(topic))
         .          .    216:           }
         .          .    217:           if filter.allTopics == 0 {
         .          .    218:                   if !a.chooseTopics(filter, topics) {
         .          .    219:                           return nil
         .          .    220:                   }
         .          .    221:           }
   37.37GB    37.37GB    222:           lg := &types2.Log{
         .          .    223:                   Address:     gointerfaces.ConvertH160toAddress(eventLog.Address),
         .          .    224:                   Topics:      topics,
         .          .    225:                   Data:        eventLog.Data,
         .          .    226:                   BlockNumber: eventLog.BlockNumber,
         .          .    227:                   TxHash:      gointerfaces.ConvertH256ToHash(eventLog.TransactionHash),
(pprof) %

Actions Taken

  • Added Metrics
    Implemented metrics to gain visibility into memory utilization.
  • Optimized Allocations
    Optimized object and slice allocations, resulting in significant improvements in memory usage.
Observation After 2 Hours Post-Optimization
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 25.92GB, 94.14% of 27.53GB total
Dropped 785 nodes (cum <= 0.14GB)
Showing top 10 nodes out of 70
      flat  flat%   sum%        cum   cum%
   15.37GB 55.82% 55.82%    15.37GB 55.82%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1
    8.15GB 29.60% 85.42%     8.15GB 29.60%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
    0.64GB  2.33% 87.75%        1GB  3.65%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog
    0.50GB  1.82% 89.57%     0.65GB  2.35%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
    0.36GB  1.32% 90.89%     0.36GB  1.32%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1
    0.31GB  1.12% 92.01%     0.31GB  1.12%  bytes.growSlice
    0.30GB  1.08% 93.09%     0.30GB  1.08%  github.com/ledgerwatch/erigon/core/vm.(*Memory).Resize (inline)
    0.15GB  0.56% 93.65%     0.35GB  1.27%  github.com/ledgerwatch/erigon/core/state.(*IntraBlockState).AddSlotToAccessList
    0.07GB  0.24% 93.90%     0.23GB  0.84%  github.com/ledgerwatch/erigon/core/types.codecSelfer2.decLogs
    0.07GB  0.24% 94.14%     1.58GB  5.75%  github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run
(pprof) list AddLogs.func1
Total: 27.53GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
   15.37GB    15.37GB (flat, cum) 55.82% of Total
         .          .    644:   ff.logsStores.DoAndStore(id, func(st []*types.Log, ok bool) []*types.Log {
         .          .    645:           if !ok {
         .          .    646:                   st = make([]*types.Log, 0)
         .          .    647:           }
   15.37GB    15.37GB    648:           st = append(st, logs)
         .          .    649:           return st
         .          .    650:   })
         .          .    651:}
         .          .    652:
         .          .    653:// ReadLogs reads logs from the store associated with the given subscription ID.
(pprof) list AddPendingTxs.func1
Total: 27.53GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
    8.15GB     8.15GB (flat, cum) 29.60% of Total
         .          .    686:   ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction {
         .          .    687:           if !ok {
         .          .    688:                   st = make([][]types.Transaction, 0)
         .          .    689:           }
    8.15GB     8.15GB    690:           st = append(st, txs)
         .          .    691:           return st
         .          .    692:   })
         .          .    693:}
         .          .    694:
         .          .    695:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID.
(pprof) %

Findings

  • Identified unbound slices that could grow indefinitely, leading to memory leaks. This issue occurs if subscribers do not request updates using their subscription ID, especially when behind a load balancer that does not pin clients to RPC nodes.

Solutions Considered

  1. Architecture Design Changes
    • Implement architectural changes to pin clients to RPC Nodes ensuring subscribers that request updates using their subscription IDs will hit the same node, which does clean up the objecs on each request.
    • Reason for not choosing: This still relies on clients with subscriptions to request updates and if they never do and do not call the unsubscribe function it's an unbound memory leak. The RPC node operator has no control over the behavior of the clients.
  2. Implementing Timeouts
    • Introduce timeouts for subscriptions to automatically clean up unresponsive or inactive subscriptions.
    • Reason for not choosing: Implementing timeouts would inadvertently stop websocket subscriptions due to the current design of the code, leading to a potential loss of data for active users. This solution would be good but requires a lot more work.
  3. Configurable Limits (Chosen Solution)
    • Set configurable limits for various subscription parameters (e.g., logs, headers, transactions, addresses, topics) to manage memory utilization effectively.
    • Reason for choosing: This approach provides flexibility to RPC node operators to configure limits as per their requirements. The default behavior remains unchanged, making it a non-breaking change. Additionally, it ensures current data is always available by pruning the oldest data first.

Test with these configured limits

	--rpc.subscription.filters.maxlogs=60
	--rpc.subscription.filters.maxheaders=60
	--rpc.subscription.filters.maxtxs=10_000
	--rpc.subscription.filters.maxaddresses=1_000
	--rpc.subscription.filters.maxtopics=1_000
Observation After 10 Hours with Limits
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 1146.51MB, 89.28% of 1284.17MB total
Dropped 403 nodes (cum <= 6.42MB)
Showing top 10 nodes out of 124
      flat  flat%   sum%        cum   cum%
  923.64MB 71.92% 71.92%   923.64MB 71.92%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
   54.59MB  4.25% 76.18%    54.59MB  4.25%  github.com/ledgerwatch/erigon/rlp.(*Stream).Bytes
   38.21MB  2.98% 79.15%    38.21MB  2.98%  bytes.growSlice
   29.34MB  2.28% 81.44%    29.34MB  2.28%  github.com/ledgerwatch/erigon/p2p/rlpx.growslice
   26.44MB  2.06% 83.49%    34.16MB  2.66%  compress/flate.NewWriter
   18.66MB  1.45% 84.95%    21.16MB  1.65%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
   17.50MB  1.36% 86.31%    64.58MB  5.03%  github.com/ledgerwatch/erigon/core/types.(*DynamicFeeTransaction).DecodeRLP
      15MB  1.17% 87.48%    82.08MB  6.39%  github.com/ledgerwatch/erigon/core/types.UnmarshalTransactionFromBinary
   13.58MB  1.06% 88.54%    13.58MB  1.06%  github.com/ledgerwatch/erigon/turbo/stages/headerdownload.(*HeaderDownload).addHeaderAsLink
    9.55MB  0.74% 89.28%     9.55MB  0.74%  github.com/ledgerwatch/erigon/turbo/rpchelper.newChanSub[...]
(pprof) list AddPendingTxs.func1
Total: 1.25GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
  923.64MB   923.64MB (flat, cum) 71.92% of Total
         .          .    698:   ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction {
         .          .    699:           if !ok {
         .          .    700:                   st = make([][]types.Transaction, 0)
         .          .    701:           }
         .          .    702:
         .          .    703:           // Calculate the total number of transactions in st
         .          .    704:           totalTxs := 0
         .          .    705:           for _, txBatch := range st {
         .          .    706:                   totalTxs += len(txBatch)
         .          .    707:           }
         .          .    708:
         .          .    709:           maxTxs := ff.config.RpcSubscriptionFiltersMaxTxs
         .          .    710:           // If adding the new transactions would exceed maxTxs, remove oldest transactions
         .          .    711:           if maxTxs > 0 && totalTxs+len(txs) > maxTxs {
         .          .    712:                   // Flatten st to a single slice
  918.69MB   918.69MB    713:                   flatSt := make([]types.Transaction, 0, totalTxs)
         .          .    714:                   for _, txBatch := range st {
         .          .    715:                           flatSt = append(flatSt, txBatch...)
         .          .    716:                   }
         .          .    717:
         .          .    718:                   // Remove the oldest transactions to make space for new ones
         .          .    719:                   if len(flatSt)+len(txs) > maxTxs {
         .          .    720:                           flatSt = flatSt[len(flatSt)+len(txs)-maxTxs:]
         .          .    721:                   }
         .          .    722:
         .          .    723:                   // Convert flatSt back to [][]types.Transaction with a single batch
         .          .    724:                   st = [][]types.Transaction{flatSt}
         .          .    725:           }
         .          .    726:
         .          .    727:           // Append the new transactions as a new batch
    5.95MB     4.95MB    728:           st = append(st, txs)
         .          .    729:           return st
         .          .    730:   })
         .          .    731:}
         .          .    732:
         .          .    733:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID.

With these changes, the memory utilization has significantly improved, and the system is now more stable and predictable.


Graph of a single node over 7-days before and after these changes

image

Blue line indicates a deployment
Sharp drop of green line is an OOM

@bretep bretep force-pushed the rpchelper-add-limits-memory-management branch from c637b33 to f90b9d1 Compare June 12, 2024 17:56
- Introduced `GetOrCreateGaugeVec` function to manage Prometheus `GaugeVec` metrics.
- Implemented various subscription filter limits (max logs, max headers, max transactions, max addresses, and max topics).
- Updated various test files to utilize the new `FiltersConfig` and added comprehensive tests for the new filter limits.
- Added new flags for subscription filter limits.
- Improved log filtering and event handling with better memory management and metric tracking.
- Added configuration to handle RPC subscription filter limits.
- Minor typo fixes.
@bretep bretep force-pushed the rpchelper-add-limits-memory-management branch from f90b9d1 to 540663b Compare June 12, 2024 18:09
@AskAlexSharov
Copy link
Collaborator

yes, stateful filters - definitely needed this love.

@@ -46,11 +56,18 @@ func (s *Set) Describe(ch chan<- *prometheus.Desc) {
if !sort.SliceIsSorted(s.a, lessFunc) {
sort.Slice(s.a, lessFunc)
}
if !sort.SliceIsSorted(s.av, lessFunc) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bretep try remove this check and always call sort.Slice - it may improve speed - if slices are usually non-sorted. (not in this PR).

// labels are the labels associated with the GaugeVec.
//
// The returned GaugeVec is safe to use from concurrent goroutines.
func (s *Set) GetOrCreateGaugeVec(name string, labels []string, help ...string) (*prometheus.GaugeVec, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Erigon moved from victoria metrics to prometheus - metrics lib. Are you sure Prometheus metric lib doesn't have this?

return
default:
}

if err := ethBackend.Subscribe(ctx, ff.OnNewEvent); err != nil {
select {
case <-ctx.Done():
activeSubscriptionsLogsClientGauge.With(prometheus.Labels{clientLabelName: "ethBackend_Events"}).Dec()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in kv_interface.go we have pre-allocated metrics with labels:

DbCommitSync   = metrics.GetOrCreateSummary(`db_commit_seconds{phase="sync"}`)

maybe it's better to pre-alloc them instead of call .With at runtime?

for _, topic := range topics {
f.topics.Put(topic, 1)
if ff.config.RpcSubscriptionFiltersMaxTopics == 0 || topicCount < ff.config.RpcSubscriptionFiltersMaxTopics {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can reduce nesting depth by:

if !(ff.config.RpcSubscriptionFiltersMaxTopics == 0 || topicCount < ff.config.RpcSubscriptionFiltersMaxTopics) {
   break
}
// loop body

@AskAlexSharov AskAlexSharov merged commit adf3f43 into erigontech:main Jun 13, 2024
9 of 10 checks passed
@bretep
Copy link
Contributor Author

bretep commented Jun 13, 2024

Thanks @AskAlexSharov I'll take a look at what you suggested. :)

taratorio pushed a commit that referenced this pull request Sep 5, 2024
- Introduced `GetOrCreateGaugeVec` function to manage Prometheus
`GaugeVec` metrics.
- Implemented various subscription filter limits (max logs, max headers,
max transactions, max addresses, and max topics).
- Updated various test files to utilize the new `FiltersConfig` and
added comprehensive tests for the new filter limits.
- Added new flags for subscription filter limits.
- Improved log filtering and event handling with better memory
management and metric tracking.
- Added configuration to handle RPC subscription filter limits.
- Minor typo fixes.

-------
RPC nodes would frequently OOM. Based on metrics and pprof, I observed
high memory utilization in the rpchelper:
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 69696.67MB, 95.37% of 73083.31MB total
Dropped 1081 nodes (cum <= 365.42MB)
Showing top 10 nodes out of 43
      flat  flat%   sum%        cum   cum%
65925.80MB 90.21% 90.21% 65925.80MB 90.21%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1
 1944.80MB  2.66% 92.87%  1944.80MB  2.66%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1
 1055.97MB  1.44% 94.31%  1055.97MB  1.44%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
  362.04MB   0.5% 94.81%   572.20MB  0.78%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
  257.73MB  0.35% 95.16%   587.85MB   0.8%  encoding/json.Marshal
   57.85MB 0.079% 95.24%  1658.66MB  2.27%  github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run
   51.48MB  0.07% 95.31%  2014.50MB  2.76%  github.com/ledgerwatch/erigon/rpc.(*handler).handleCallMsg
   23.01MB 0.031% 95.34%  1127.21MB  1.54%  github.com/ledgerwatch/erigon/core/vm.opCall
   11.50MB 0.016% 95.36%  1677.68MB  2.30%  github.com/ledgerwatch/erigon/core/vm.(*EVM).call
    6.50MB 0.0089% 95.37%   685.70MB  0.94%  github.com/ledgerwatch/erigon/turbo/transactions.DoCall
(pprof) list distributeLog.func1
Total: 71.37GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/logsfilter.go
   64.38GB    64.38GB (flat, cum) 90.21% of Total
         .          .    206:   a.logsFilters.Range(func(k LogsSubID, filter *LogsFilter) error {
         .          .    207:           if filter.allAddrs == 0 {
         .          .    208:                   _, addrOk := filter.addrs.Get(gointerfaces.ConvertH160toAddress(eventLog.Address))
         .          .    209:                   if !addrOk {
         .          .    210:                           return nil
         .          .    211:                   }
         .          .    212:           }
         .          .    213:           var topics []libcommon.Hash
         .          .    214:           for _, topic := range eventLog.Topics {
   27.01GB    27.01GB    215:                   topics = append(topics, gointerfaces.ConvertH256ToHash(topic))
         .          .    216:           }
         .          .    217:           if filter.allTopics == 0 {
         .          .    218:                   if !a.chooseTopics(filter, topics) {
         .          .    219:                           return nil
         .          .    220:                   }
         .          .    221:           }
   37.37GB    37.37GB    222:           lg := &types2.Log{
         .          .    223:                   Address:     gointerfaces.ConvertH160toAddress(eventLog.Address),
         .          .    224:                   Topics:      topics,
         .          .    225:                   Data:        eventLog.Data,
         .          .    226:                   BlockNumber: eventLog.BlockNumber,
         .          .    227:                   TxHash:      gointerfaces.ConvertH256ToHash(eventLog.TransactionHash),
(pprof) %
```

- Added Metrics
    Implemented metrics to gain visibility into memory utilization.
- Optimized Allocations
Optimized object and slice allocations, resulting in significant
improvements in memory usage.
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 25.92GB, 94.14% of 27.53GB total
Dropped 785 nodes (cum <= 0.14GB)
Showing top 10 nodes out of 70
      flat  flat%   sum%        cum   cum%
   15.37GB 55.82% 55.82%    15.37GB 55.82%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1
    8.15GB 29.60% 85.42%     8.15GB 29.60%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
    0.64GB  2.33% 87.75%        1GB  3.65%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog
    0.50GB  1.82% 89.57%     0.65GB  2.35%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
    0.36GB  1.32% 90.89%     0.36GB  1.32%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1
    0.31GB  1.12% 92.01%     0.31GB  1.12%  bytes.growSlice
    0.30GB  1.08% 93.09%     0.30GB  1.08%  github.com/ledgerwatch/erigon/core/vm.(*Memory).Resize (inline)
    0.15GB  0.56% 93.65%     0.35GB  1.27%  github.com/ledgerwatch/erigon/core/state.(*IntraBlockState).AddSlotToAccessList
    0.07GB  0.24% 93.90%     0.23GB  0.84%  github.com/ledgerwatch/erigon/core/types.codecSelfer2.decLogs
    0.07GB  0.24% 94.14%     1.58GB  5.75%  github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run
(pprof) list AddLogs.func1
Total: 27.53GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
   15.37GB    15.37GB (flat, cum) 55.82% of Total
         .          .    644:   ff.logsStores.DoAndStore(id, func(st []*types.Log, ok bool) []*types.Log {
         .          .    645:           if !ok {
         .          .    646:                   st = make([]*types.Log, 0)
         .          .    647:           }
   15.37GB    15.37GB    648:           st = append(st, logs)
         .          .    649:           return st
         .          .    650:   })
         .          .    651:}
         .          .    652:
         .          .    653:// ReadLogs reads logs from the store associated with the given subscription ID.
(pprof) list AddPendingTxs.func1
Total: 27.53GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
    8.15GB     8.15GB (flat, cum) 29.60% of Total
         .          .    686:   ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction {
         .          .    687:           if !ok {
         .          .    688:                   st = make([][]types.Transaction, 0)
         .          .    689:           }
    8.15GB     8.15GB    690:           st = append(st, txs)
         .          .    691:           return st
         .          .    692:   })
         .          .    693:}
         .          .    694:
         .          .    695:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID.
(pprof) %
```
- Identified unbound slices that could grow indefinitely, leading to
memory leaks. This issue occurs if subscribers do not request updates
using their subscription ID, especially when behind a load balancer that
does not pin clients to RPC nodes.

1. Architecture Design Changes
- Implement architectural changes to pin clients to RPC Nodes ensuring
subscribers that request updates using their subscription IDs will hit
the same node, which does clean up the objecs on each request.
- Reason for not choosing: This still relies on clients with
subscriptions to request updates and if they never do and do not call
the unsubscribe function it's an unbound memory leak. The RPC node
operator has no control over the behavior of the clients.
2. Implementing Timeouts
- Introduce timeouts for subscriptions to automatically clean up
unresponsive or inactive subscriptions.
- Reason for not choosing: Implementing timeouts would inadvertently
stop websocket subscriptions due to the current design of the code,
leading to a potential loss of data for active users. This solution
would be good but requires a lot more work.
3. Configurable Limits (Chosen Solution)
- Set configurable limits for various subscription parameters (e.g.,
logs, headers, transactions, addresses, topics) to manage memory
utilization effectively.
- Reason for choosing: This approach provides flexibility to RPC node
operators to configure limits as per their requirements. The default
behavior remains unchanged, making it a non-breaking change.
Additionally, it ensures current data is always available by pruning the
oldest data first.

```bash
	--rpc.subscription.filters.maxlogs=60
	--rpc.subscription.filters.maxheaders=60
	--rpc.subscription.filters.maxtxs=10_000
	--rpc.subscription.filters.maxaddresses=1_000
	--rpc.subscription.filters.maxtopics=1_000
```
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 1146.51MB, 89.28% of 1284.17MB total
Dropped 403 nodes (cum <= 6.42MB)
Showing top 10 nodes out of 124
      flat  flat%   sum%        cum   cum%
  923.64MB 71.92% 71.92%   923.64MB 71.92%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
   54.59MB  4.25% 76.18%    54.59MB  4.25%  github.com/ledgerwatch/erigon/rlp.(*Stream).Bytes
   38.21MB  2.98% 79.15%    38.21MB  2.98%  bytes.growSlice
   29.34MB  2.28% 81.44%    29.34MB  2.28%  github.com/ledgerwatch/erigon/p2p/rlpx.growslice
   26.44MB  2.06% 83.49%    34.16MB  2.66%  compress/flate.NewWriter
   18.66MB  1.45% 84.95%    21.16MB  1.65%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
   17.50MB  1.36% 86.31%    64.58MB  5.03%  github.com/ledgerwatch/erigon/core/types.(*DynamicFeeTransaction).DecodeRLP
      15MB  1.17% 87.48%    82.08MB  6.39%  github.com/ledgerwatch/erigon/core/types.UnmarshalTransactionFromBinary
   13.58MB  1.06% 88.54%    13.58MB  1.06%  github.com/ledgerwatch/erigon/turbo/stages/headerdownload.(*HeaderDownload).addHeaderAsLink
    9.55MB  0.74% 89.28%     9.55MB  0.74%  github.com/ledgerwatch/erigon/turbo/rpchelper.newChanSub[...]
(pprof) list AddPendingTxs.func1
Total: 1.25GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
  923.64MB   923.64MB (flat, cum) 71.92% of Total
         .          .    698:   ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction {
         .          .    699:           if !ok {
         .          .    700:                   st = make([][]types.Transaction, 0)
         .          .    701:           }
         .          .    702:
         .          .    703:           // Calculate the total number of transactions in st
         .          .    704:           totalTxs := 0
         .          .    705:           for _, txBatch := range st {
         .          .    706:                   totalTxs += len(txBatch)
         .          .    707:           }
         .          .    708:
         .          .    709:           maxTxs := ff.config.RpcSubscriptionFiltersMaxTxs
         .          .    710:           // If adding the new transactions would exceed maxTxs, remove oldest transactions
         .          .    711:           if maxTxs > 0 && totalTxs+len(txs) > maxTxs {
         .          .    712:                   // Flatten st to a single slice
  918.69MB   918.69MB    713:                   flatSt := make([]types.Transaction, 0, totalTxs)
         .          .    714:                   for _, txBatch := range st {
         .          .    715:                           flatSt = append(flatSt, txBatch...)
         .          .    716:                   }
         .          .    717:
         .          .    718:                   // Remove the oldest transactions to make space for new ones
         .          .    719:                   if len(flatSt)+len(txs) > maxTxs {
         .          .    720:                           flatSt = flatSt[len(flatSt)+len(txs)-maxTxs:]
         .          .    721:                   }
         .          .    722:
         .          .    723:                   // Convert flatSt back to [][]types.Transaction with a single batch
         .          .    724:                   st = [][]types.Transaction{flatSt}
         .          .    725:           }
         .          .    726:
         .          .    727:           // Append the new transactions as a new batch
    5.95MB     4.95MB    728:           st = append(st, txs)
         .          .    729:           return st
         .          .    730:   })
         .          .    731:}
         .          .    732:
         .          .    733:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID.
```

With these changes, the memory utilization has significantly improved,
and the system is now more stable and predictable.

-----
<img width="1567" alt="image"
src="https://github.com/ledgerwatch/erigon/assets/787344/264c9757-93cf-4be8-9883-5ca3187acd73">

Blue line indicates a deployment
Sharp drop of green line is an OOM
@taratorio taratorio added this to the v2.60.7-fixes milestone Sep 5, 2024
taratorio pushed a commit that referenced this pull request Sep 5, 2024
- Introduced `GetOrCreateGaugeVec` function to manage Prometheus
`GaugeVec` metrics.
- Implemented various subscription filter limits (max logs, max headers,
max transactions, max addresses, and max topics).
- Updated various test files to utilize the new `FiltersConfig` and
added comprehensive tests for the new filter limits.
- Added new flags for subscription filter limits.
- Improved log filtering and event handling with better memory
management and metric tracking.
- Added configuration to handle RPC subscription filter limits.
- Minor typo fixes.

-------
RPC nodes would frequently OOM. Based on metrics and pprof, I observed
high memory utilization in the rpchelper:
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 69696.67MB, 95.37% of 73083.31MB total
Dropped 1081 nodes (cum <= 365.42MB)
Showing top 10 nodes out of 43
      flat  flat%   sum%        cum   cum%
65925.80MB 90.21% 90.21% 65925.80MB 90.21%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1
 1944.80MB  2.66% 92.87%  1944.80MB  2.66%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1
 1055.97MB  1.44% 94.31%  1055.97MB  1.44%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
  362.04MB   0.5% 94.81%   572.20MB  0.78%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
  257.73MB  0.35% 95.16%   587.85MB   0.8%  encoding/json.Marshal
   57.85MB 0.079% 95.24%  1658.66MB  2.27%  github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run
   51.48MB  0.07% 95.31%  2014.50MB  2.76%  github.com/ledgerwatch/erigon/rpc.(*handler).handleCallMsg
   23.01MB 0.031% 95.34%  1127.21MB  1.54%  github.com/ledgerwatch/erigon/core/vm.opCall
   11.50MB 0.016% 95.36%  1677.68MB  2.30%  github.com/ledgerwatch/erigon/core/vm.(*EVM).call
    6.50MB 0.0089% 95.37%   685.70MB  0.94%  github.com/ledgerwatch/erigon/turbo/transactions.DoCall
(pprof) list distributeLog.func1
Total: 71.37GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/logsfilter.go
   64.38GB    64.38GB (flat, cum) 90.21% of Total
         .          .    206:   a.logsFilters.Range(func(k LogsSubID, filter *LogsFilter) error {
         .          .    207:           if filter.allAddrs == 0 {
         .          .    208:                   _, addrOk := filter.addrs.Get(gointerfaces.ConvertH160toAddress(eventLog.Address))
         .          .    209:                   if !addrOk {
         .          .    210:                           return nil
         .          .    211:                   }
         .          .    212:           }
         .          .    213:           var topics []libcommon.Hash
         .          .    214:           for _, topic := range eventLog.Topics {
   27.01GB    27.01GB    215:                   topics = append(topics, gointerfaces.ConvertH256ToHash(topic))
         .          .    216:           }
         .          .    217:           if filter.allTopics == 0 {
         .          .    218:                   if !a.chooseTopics(filter, topics) {
         .          .    219:                           return nil
         .          .    220:                   }
         .          .    221:           }
   37.37GB    37.37GB    222:           lg := &types2.Log{
         .          .    223:                   Address:     gointerfaces.ConvertH160toAddress(eventLog.Address),
         .          .    224:                   Topics:      topics,
         .          .    225:                   Data:        eventLog.Data,
         .          .    226:                   BlockNumber: eventLog.BlockNumber,
         .          .    227:                   TxHash:      gointerfaces.ConvertH256ToHash(eventLog.TransactionHash),
(pprof) %
```

- Added Metrics
    Implemented metrics to gain visibility into memory utilization.
- Optimized Allocations
Optimized object and slice allocations, resulting in significant
improvements in memory usage.
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 25.92GB, 94.14% of 27.53GB total
Dropped 785 nodes (cum <= 0.14GB)
Showing top 10 nodes out of 70
      flat  flat%   sum%        cum   cum%
   15.37GB 55.82% 55.82%    15.37GB 55.82%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1
    8.15GB 29.60% 85.42%     8.15GB 29.60%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
    0.64GB  2.33% 87.75%        1GB  3.65%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog
    0.50GB  1.82% 89.57%     0.65GB  2.35%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
    0.36GB  1.32% 90.89%     0.36GB  1.32%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1
    0.31GB  1.12% 92.01%     0.31GB  1.12%  bytes.growSlice
    0.30GB  1.08% 93.09%     0.30GB  1.08%  github.com/ledgerwatch/erigon/core/vm.(*Memory).Resize (inline)
    0.15GB  0.56% 93.65%     0.35GB  1.27%  github.com/ledgerwatch/erigon/core/state.(*IntraBlockState).AddSlotToAccessList
    0.07GB  0.24% 93.90%     0.23GB  0.84%  github.com/ledgerwatch/erigon/core/types.codecSelfer2.decLogs
    0.07GB  0.24% 94.14%     1.58GB  5.75%  github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run
(pprof) list AddLogs.func1
Total: 27.53GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
   15.37GB    15.37GB (flat, cum) 55.82% of Total
         .          .    644:   ff.logsStores.DoAndStore(id, func(st []*types.Log, ok bool) []*types.Log {
         .          .    645:           if !ok {
         .          .    646:                   st = make([]*types.Log, 0)
         .          .    647:           }
   15.37GB    15.37GB    648:           st = append(st, logs)
         .          .    649:           return st
         .          .    650:   })
         .          .    651:}
         .          .    652:
         .          .    653:// ReadLogs reads logs from the store associated with the given subscription ID.
(pprof) list AddPendingTxs.func1
Total: 27.53GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
    8.15GB     8.15GB (flat, cum) 29.60% of Total
         .          .    686:   ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction {
         .          .    687:           if !ok {
         .          .    688:                   st = make([][]types.Transaction, 0)
         .          .    689:           }
    8.15GB     8.15GB    690:           st = append(st, txs)
         .          .    691:           return st
         .          .    692:   })
         .          .    693:}
         .          .    694:
         .          .    695:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID.
(pprof) %
```
- Identified unbound slices that could grow indefinitely, leading to
memory leaks. This issue occurs if subscribers do not request updates
using their subscription ID, especially when behind a load balancer that
does not pin clients to RPC nodes.

1. Architecture Design Changes
- Implement architectural changes to pin clients to RPC Nodes ensuring
subscribers that request updates using their subscription IDs will hit
the same node, which does clean up the objecs on each request.
- Reason for not choosing: This still relies on clients with
subscriptions to request updates and if they never do and do not call
the unsubscribe function it's an unbound memory leak. The RPC node
operator has no control over the behavior of the clients.
2. Implementing Timeouts
- Introduce timeouts for subscriptions to automatically clean up
unresponsive or inactive subscriptions.
- Reason for not choosing: Implementing timeouts would inadvertently
stop websocket subscriptions due to the current design of the code,
leading to a potential loss of data for active users. This solution
would be good but requires a lot more work.
3. Configurable Limits (Chosen Solution)
- Set configurable limits for various subscription parameters (e.g.,
logs, headers, transactions, addresses, topics) to manage memory
utilization effectively.
- Reason for choosing: This approach provides flexibility to RPC node
operators to configure limits as per their requirements. The default
behavior remains unchanged, making it a non-breaking change.
Additionally, it ensures current data is always available by pruning the
oldest data first.

```bash
	--rpc.subscription.filters.maxlogs=60
	--rpc.subscription.filters.maxheaders=60
	--rpc.subscription.filters.maxtxs=10_000
	--rpc.subscription.filters.maxaddresses=1_000
	--rpc.subscription.filters.maxtopics=1_000
```
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 1146.51MB, 89.28% of 1284.17MB total
Dropped 403 nodes (cum <= 6.42MB)
Showing top 10 nodes out of 124
      flat  flat%   sum%        cum   cum%
  923.64MB 71.92% 71.92%   923.64MB 71.92%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
   54.59MB  4.25% 76.18%    54.59MB  4.25%  github.com/ledgerwatch/erigon/rlp.(*Stream).Bytes
   38.21MB  2.98% 79.15%    38.21MB  2.98%  bytes.growSlice
   29.34MB  2.28% 81.44%    29.34MB  2.28%  github.com/ledgerwatch/erigon/p2p/rlpx.growslice
   26.44MB  2.06% 83.49%    34.16MB  2.66%  compress/flate.NewWriter
   18.66MB  1.45% 84.95%    21.16MB  1.65%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
   17.50MB  1.36% 86.31%    64.58MB  5.03%  github.com/ledgerwatch/erigon/core/types.(*DynamicFeeTransaction).DecodeRLP
      15MB  1.17% 87.48%    82.08MB  6.39%  github.com/ledgerwatch/erigon/core/types.UnmarshalTransactionFromBinary
   13.58MB  1.06% 88.54%    13.58MB  1.06%  github.com/ledgerwatch/erigon/turbo/stages/headerdownload.(*HeaderDownload).addHeaderAsLink
    9.55MB  0.74% 89.28%     9.55MB  0.74%  github.com/ledgerwatch/erigon/turbo/rpchelper.newChanSub[...]
(pprof) list AddPendingTxs.func1
Total: 1.25GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
  923.64MB   923.64MB (flat, cum) 71.92% of Total
         .          .    698:   ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction {
         .          .    699:           if !ok {
         .          .    700:                   st = make([][]types.Transaction, 0)
         .          .    701:           }
         .          .    702:
         .          .    703:           // Calculate the total number of transactions in st
         .          .    704:           totalTxs := 0
         .          .    705:           for _, txBatch := range st {
         .          .    706:                   totalTxs += len(txBatch)
         .          .    707:           }
         .          .    708:
         .          .    709:           maxTxs := ff.config.RpcSubscriptionFiltersMaxTxs
         .          .    710:           // If adding the new transactions would exceed maxTxs, remove oldest transactions
         .          .    711:           if maxTxs > 0 && totalTxs+len(txs) > maxTxs {
         .          .    712:                   // Flatten st to a single slice
  918.69MB   918.69MB    713:                   flatSt := make([]types.Transaction, 0, totalTxs)
         .          .    714:                   for _, txBatch := range st {
         .          .    715:                           flatSt = append(flatSt, txBatch...)
         .          .    716:                   }
         .          .    717:
         .          .    718:                   // Remove the oldest transactions to make space for new ones
         .          .    719:                   if len(flatSt)+len(txs) > maxTxs {
         .          .    720:                           flatSt = flatSt[len(flatSt)+len(txs)-maxTxs:]
         .          .    721:                   }
         .          .    722:
         .          .    723:                   // Convert flatSt back to [][]types.Transaction with a single batch
         .          .    724:                   st = [][]types.Transaction{flatSt}
         .          .    725:           }
         .          .    726:
         .          .    727:           // Append the new transactions as a new batch
    5.95MB     4.95MB    728:           st = append(st, txs)
         .          .    729:           return st
         .          .    730:   })
         .          .    731:}
         .          .    732:
         .          .    733:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID.
```

With these changes, the memory utilization has significantly improved,
and the system is now more stable and predictable.

-----
<img width="1567" alt="image"
src="https://github.com/ledgerwatch/erigon/assets/787344/264c9757-93cf-4be8-9883-5ca3187acd73">

Blue line indicates a deployment
Sharp drop of green line is an OOM
taratorio added a commit that referenced this pull request Sep 6, 2024
…) (#11894)

relates to #11890
cherry pick from E3
adf3f43

-----

- Introduced `GetOrCreateGaugeVec` function to manage Prometheus
`GaugeVec` metrics.
- Implemented various subscription filter limits (max logs, max headers,
max transactions, max addresses, and max topics).
- Updated various test files to utilize the new `FiltersConfig` and
added comprehensive tests for the new filter limits.
- Added new flags for subscription filter limits.
- Improved log filtering and event handling with better memory
management and metric tracking.
- Added configuration to handle RPC subscription filter limits.
- Minor typo fixes.

-------
RPC nodes would frequently OOM. Based on metrics and pprof, I observed
high memory utilization in the rpchelper:
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 69696.67MB, 95.37% of 73083.31MB total
Dropped 1081 nodes (cum <= 365.42MB)
Showing top 10 nodes out of 43
      flat  flat%   sum%        cum   cum%
65925.80MB 90.21% 90.21% 65925.80MB 90.21%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1
 1944.80MB  2.66% 92.87%  1944.80MB  2.66%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1
 1055.97MB  1.44% 94.31%  1055.97MB  1.44%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
  362.04MB   0.5% 94.81%   572.20MB  0.78%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
  257.73MB  0.35% 95.16%   587.85MB   0.8%  encoding/json.Marshal
   57.85MB 0.079% 95.24%  1658.66MB  2.27%  github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run
   51.48MB  0.07% 95.31%  2014.50MB  2.76%  github.com/ledgerwatch/erigon/rpc.(*handler).handleCallMsg
   23.01MB 0.031% 95.34%  1127.21MB  1.54%  github.com/ledgerwatch/erigon/core/vm.opCall
   11.50MB 0.016% 95.36%  1677.68MB  2.30%  github.com/ledgerwatch/erigon/core/vm.(*EVM).call
    6.50MB 0.0089% 95.37%   685.70MB  0.94%  github.com/ledgerwatch/erigon/turbo/transactions.DoCall
(pprof) list distributeLog.func1
Total: 71.37GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/logsfilter.go
   64.38GB    64.38GB (flat, cum) 90.21% of Total
         .          .    206:   a.logsFilters.Range(func(k LogsSubID, filter *LogsFilter) error {
         .          .    207:           if filter.allAddrs == 0 {
         .          .    208:                   _, addrOk := filter.addrs.Get(gointerfaces.ConvertH160toAddress(eventLog.Address))
         .          .    209:                   if !addrOk {
         .          .    210:                           return nil
         .          .    211:                   }
         .          .    212:           }
         .          .    213:           var topics []libcommon.Hash
         .          .    214:           for _, topic := range eventLog.Topics {
   27.01GB    27.01GB    215:                   topics = append(topics, gointerfaces.ConvertH256ToHash(topic))
         .          .    216:           }
         .          .    217:           if filter.allTopics == 0 {
         .          .    218:                   if !a.chooseTopics(filter, topics) {
         .          .    219:                           return nil
         .          .    220:                   }
         .          .    221:           }
   37.37GB    37.37GB    222:           lg := &types2.Log{
         .          .    223:                   Address:     gointerfaces.ConvertH160toAddress(eventLog.Address),
         .          .    224:                   Topics:      topics,
         .          .    225:                   Data:        eventLog.Data,
         .          .    226:                   BlockNumber: eventLog.BlockNumber,
         .          .    227:                   TxHash:      gointerfaces.ConvertH256ToHash(eventLog.TransactionHash),
(pprof) %
```

- Added Metrics Implemented metrics to gain visibility into memory
utilization.
- Optimized Allocations Optimized object and slice allocations,
resulting in significant improvements in memory usage.
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 25.92GB, 94.14% of 27.53GB total
Dropped 785 nodes (cum <= 0.14GB)
Showing top 10 nodes out of 70
      flat  flat%   sum%        cum   cum%
   15.37GB 55.82% 55.82%    15.37GB 55.82%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1
    8.15GB 29.60% 85.42%     8.15GB 29.60%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
    0.64GB  2.33% 87.75%        1GB  3.65%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog
    0.50GB  1.82% 89.57%     0.65GB  2.35%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
    0.36GB  1.32% 90.89%     0.36GB  1.32%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1
    0.31GB  1.12% 92.01%     0.31GB  1.12%  bytes.growSlice
    0.30GB  1.08% 93.09%     0.30GB  1.08%  github.com/ledgerwatch/erigon/core/vm.(*Memory).Resize (inline)
    0.15GB  0.56% 93.65%     0.35GB  1.27%  github.com/ledgerwatch/erigon/core/state.(*IntraBlockState).AddSlotToAccessList
    0.07GB  0.24% 93.90%     0.23GB  0.84%  github.com/ledgerwatch/erigon/core/types.codecSelfer2.decLogs
    0.07GB  0.24% 94.14%     1.58GB  5.75%  github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run
(pprof) list AddLogs.func1
Total: 27.53GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
   15.37GB    15.37GB (flat, cum) 55.82% of Total
         .          .    644:   ff.logsStores.DoAndStore(id, func(st []*types.Log, ok bool) []*types.Log {
         .          .    645:           if !ok {
         .          .    646:                   st = make([]*types.Log, 0)
         .          .    647:           }
   15.37GB    15.37GB    648:           st = append(st, logs)
         .          .    649:           return st
         .          .    650:   })
         .          .    651:}
         .          .    652:
         .          .    653:// ReadLogs reads logs from the store associated with the given subscription ID.
(pprof) list AddPendingTxs.func1
Total: 27.53GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
    8.15GB     8.15GB (flat, cum) 29.60% of Total
         .          .    686:   ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction {
         .          .    687:           if !ok {
         .          .    688:                   st = make([][]types.Transaction, 0)
         .          .    689:           }
    8.15GB     8.15GB    690:           st = append(st, txs)
         .          .    691:           return st
         .          .    692:   })
         .          .    693:}
         .          .    694:
         .          .    695:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID.
(pprof) %
```
- Identified unbound slices that could grow indefinitely, leading to
memory leaks. This issue occurs if subscribers do not request updates
using their subscription ID, especially when behind a load balancer that
does not pin clients to RPC nodes.

1. Architecture Design Changes
- Implement architectural changes to pin clients to RPC Nodes ensuring
subscribers that request updates using their subscription IDs will hit
the same node, which does clean up the objecs on each request.
- Reason for not choosing: This still relies on clients with
subscriptions to request updates and if they never do and do not call
the unsubscribe function it's an unbound memory leak. The RPC node
operator has no control over the behavior of the clients.
2. Implementing Timeouts
- Introduce timeouts for subscriptions to automatically clean up
unresponsive or inactive subscriptions.
- Reason for not choosing: Implementing timeouts would inadvertently
stop websocket subscriptions due to the current design of the code,
leading to a potential loss of data for active users. This solution
would be good but requires a lot more work.
3. Configurable Limits (Chosen Solution)
- Set configurable limits for various subscription parameters (e.g.,
logs, headers, transactions, addresses, topics) to manage memory
utilization effectively.
- Reason for choosing: This approach provides flexibility to RPC node
operators to configure limits as per their requirements. The default
behavior remains unchanged, making it a non-breaking change.
Additionally, it ensures current data is always available by pruning the
oldest data first.

```bash
	--rpc.subscription.filters.maxlogs=60
	--rpc.subscription.filters.maxheaders=60
	--rpc.subscription.filters.maxtxs=10_000
	--rpc.subscription.filters.maxaddresses=1_000
	--rpc.subscription.filters.maxtopics=1_000
```
```text
File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 1146.51MB, 89.28% of 1284.17MB total
Dropped 403 nodes (cum <= 6.42MB)
Showing top 10 nodes out of 124
      flat  flat%   sum%        cum   cum%
  923.64MB 71.92% 71.92%   923.64MB 71.92%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
   54.59MB  4.25% 76.18%    54.59MB  4.25%  github.com/ledgerwatch/erigon/rlp.(*Stream).Bytes
   38.21MB  2.98% 79.15%    38.21MB  2.98%  bytes.growSlice
   29.34MB  2.28% 81.44%    29.34MB  2.28%  github.com/ledgerwatch/erigon/p2p/rlpx.growslice
   26.44MB  2.06% 83.49%    34.16MB  2.66%  compress/flate.NewWriter
   18.66MB  1.45% 84.95%    21.16MB  1.65%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
   17.50MB  1.36% 86.31%    64.58MB  5.03%  github.com/ledgerwatch/erigon/core/types.(*DynamicFeeTransaction).DecodeRLP
      15MB  1.17% 87.48%    82.08MB  6.39%  github.com/ledgerwatch/erigon/core/types.UnmarshalTransactionFromBinary
   13.58MB  1.06% 88.54%    13.58MB  1.06%  github.com/ledgerwatch/erigon/turbo/stages/headerdownload.(*HeaderDownload).addHeaderAsLink
    9.55MB  0.74% 89.28%     9.55MB  0.74%  github.com/ledgerwatch/erigon/turbo/rpchelper.newChanSub[...]
(pprof) list AddPendingTxs.func1
Total: 1.25GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
  923.64MB   923.64MB (flat, cum) 71.92% of Total
         .          .    698:   ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction {
         .          .    699:           if !ok {
         .          .    700:                   st = make([][]types.Transaction, 0)
         .          .    701:           }
         .          .    702:
         .          .    703:           // Calculate the total number of transactions in st
         .          .    704:           totalTxs := 0
         .          .    705:           for _, txBatch := range st {
         .          .    706:                   totalTxs += len(txBatch)
         .          .    707:           }
         .          .    708:
         .          .    709:           maxTxs := ff.config.RpcSubscriptionFiltersMaxTxs
         .          .    710:           // If adding the new transactions would exceed maxTxs, remove oldest transactions
         .          .    711:           if maxTxs > 0 && totalTxs+len(txs) > maxTxs {
         .          .    712:                   // Flatten st to a single slice
  918.69MB   918.69MB    713:                   flatSt := make([]types.Transaction, 0, totalTxs)
         .          .    714:                   for _, txBatch := range st {
         .          .    715:                           flatSt = append(flatSt, txBatch...)
         .          .    716:                   }
         .          .    717:
         .          .    718:                   // Remove the oldest transactions to make space for new ones
         .          .    719:                   if len(flatSt)+len(txs) > maxTxs {
         .          .    720:                           flatSt = flatSt[len(flatSt)+len(txs)-maxTxs:]
         .          .    721:                   }
         .          .    722:
         .          .    723:                   // Convert flatSt back to [][]types.Transaction with a single batch
         .          .    724:                   st = [][]types.Transaction{flatSt}
         .          .    725:           }
         .          .    726:
         .          .    727:           // Append the new transactions as a new batch
    5.95MB     4.95MB    728:           st = append(st, txs)
         .          .    729:           return st
         .          .    730:   })
         .          .    731:}
         .          .    732:
         .          .    733:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID.
```

With these changes, the memory utilization has significantly improved,
and the system is now more stable and predictable.

-----
<img width="1567" alt="image"

src="https://github.com/ledgerwatch/erigon/assets/787344/264c9757-93cf-4be8-9883-5ca3187acd73">

Blue line indicates a deployment
Sharp drop of green line is an OOM

Co-authored-by: Bret <787344+bretep@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants