Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce TxPool allocations #6970

Merged
merged 7 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;

using Nethermind.Core.Collections;
using Nethermind.Core.Threading;
using Nethermind.Logging;
Expand Down Expand Up @@ -90,11 +92,43 @@ protected SortedPool(int capacity, IComparer<TValue> comparer, ILogManager logMa
/// </summary>
public TValue[] GetSnapshot()
{
using var lockRelease = Lock.Acquire();
TValue[]? snapshot = Volatile.Read(ref _snapshot);
if (snapshot is not null)
{
return snapshot;
}

return GetSnapShotLocked();
}

private TValue[] GetSnapShotLocked()
{
using var handle = Lock.Acquire();

TValue[]? snapshot = _snapshot;
snapshot ??= _snapshot = _buckets.SelectMany(b => b.Value).ToArray();
if (snapshot is not null)
{
return snapshot;
}

var count = 0;
foreach (KeyValuePair<TGroupKey, EnhancedSortedSet<TValue>> bucket in _buckets)
{
count += bucket.Value.Count;
}

snapshot = new TValue[count];
var index = 0;
foreach (KeyValuePair<TGroupKey, EnhancedSortedSet<TValue>> bucket in _buckets)
{
foreach (TValue value in bucket.Value)
{
snapshot[index] = value;
index++;
}
}

_snapshot = snapshot;
return snapshot;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ namespace Nethermind.TxPool.Collections
{
public class TxDistinctSortedPool : DistinctValueSortedPool<ValueHash256, Transaction, AddressAsKey>
{
public delegate void UpdateGroupDelegate(in AccountStruct account, EnhancedSortedSet<Transaction> transactions, ref Transaction? lastElement, UpdateTransactionDelegate updateTx);
public delegate void UpdateTransactionDelegate(EnhancedSortedSet<Transaction> bucket, Transaction tx, in UInt256? changedGasBottleneck, Transaction? lastElement);

private readonly UpdateTransactionDelegate _updateTx;
private readonly List<Transaction> _transactionsToRemove = new();
protected int _poolCapacity;
private readonly ILogger _logger;


public TxDistinctSortedPool(int capacity, IComparer<Transaction> comparer, ILogManager logManager)
: base(capacity, comparer, CompetingTransactionEqualityComparer.Instance, logManager)
{
_poolCapacity = capacity;
_logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
_updateTx = UpdateTransaction;
}

protected override IComparer<Transaction> GetUniqueComparer(IComparer<Transaction> comparer) => comparer.GetPoolUniqueTxComparer();
Expand Down Expand Up @@ -70,7 +72,7 @@ protected override void UpdateGroup(AddressAsKey groupKey, EnhancedSortedSet<Tra
}
}

public void UpdatePool(IAccountStateProvider accounts, Func<Address, AccountStruct, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements)
public void UpdatePool(IAccountStateProvider accounts, UpdateGroupDelegate updateElements)
{
using McsLock.Disposable lockRelease = Lock.Acquire();

Expand All @@ -80,46 +82,48 @@ public void UpdatePool(IAccountStateProvider accounts, Func<Address, AccountStru
Debug.Assert(bucket.Count > 0);

accounts.TryGetAccount(address, out AccountStruct account);
UpdateGroupNonLocked(address, account, bucket, changingElements);
UpdateGroupNonLocked(account, bucket, updateElements);
}
}

private void UpdateGroupNonLocked(Address groupKey, AccountStruct groupValue, EnhancedSortedSet<Transaction> bucket, Func<Address, AccountStruct, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements)
private void UpdateGroupNonLocked(AccountStruct groupValue, EnhancedSortedSet<Transaction> bucket, UpdateGroupDelegate updateElements)
{
_transactionsToRemove.Clear();
Transaction? lastElement = bucket.Max;

foreach ((Transaction tx, UInt256? changedGasBottleneck) in changingElements(groupKey, groupValue, bucket))
updateElements(groupValue, bucket, ref lastElement, _updateTx);

ReadOnlySpan<Transaction> txs = CollectionsMarshal.AsSpan(_transactionsToRemove);
for (int i = 0; i < txs.Length; i++)
{
if (changedGasBottleneck is null)
{
_transactionsToRemove.Add(tx);
}
else if (Equals(lastElement, tx))
{
bool reAdd = _worstSortedValues.Remove(tx);
tx.GasBottleneck = changedGasBottleneck;
if (reAdd)
{
_worstSortedValues.Add(tx, tx.Hash!);
}
TryRemoveNonLocked(txs[i].Hash!, evicted: false, out _, out _);
}
}

UpdateWorstValue();
}
else
private void UpdateTransaction(EnhancedSortedSet<Transaction> bucket, Transaction tx, in UInt256? changedGasBottleneck, Transaction? lastElement)
{
if (changedGasBottleneck is null)
{
_transactionsToRemove.Add(tx);
}
else if (Equals(lastElement, tx))
{
bool reAdd = _worstSortedValues.Remove(tx);
tx.GasBottleneck = changedGasBottleneck;
if (reAdd)
{
tx.GasBottleneck = changedGasBottleneck;
_worstSortedValues.Add(tx, tx.Hash!);
}
}

ReadOnlySpan<Transaction> txs = CollectionsMarshal.AsSpan(_transactionsToRemove);
for (int i = 0; i < txs.Length; i++)
UpdateWorstValue();
}
else
{
TryRemoveNonLocked(txs[i].Hash!, evicted: false, out _, out _);
tx.GasBottleneck = changedGasBottleneck;
}
}

public void UpdateGroup(Address groupKey, AccountStruct groupValue, Func<Address, AccountStruct, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements)
public void UpdateGroup(Address groupKey, AccountStruct groupValue, UpdateGroupDelegate updateElements)
{
using McsLock.Disposable lockRelease = Lock.Acquire();

Expand All @@ -128,7 +132,7 @@ public void UpdateGroup(Address groupKey, AccountStruct groupValue, Func<Address
{
Debug.Assert(bucket.Count > 0);

UpdateGroupNonLocked(groupKey, groupValue, bucket, changingElements);
UpdateGroupNonLocked(groupValue, bucket, updateElements);
}
}

Expand Down
36 changes: 17 additions & 19 deletions src/Nethermind/Nethermind.TxPool/TxPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
using Nethermind.Logging;
using Nethermind.TxPool.Collections;
using Nethermind.TxPool.Filters;
using static Nethermind.TxPool.Collections.TxDistinctSortedPool;

using ITimer = Nethermind.Core.Timers.ITimer;

[assembly: InternalsVisibleTo("Nethermind.Blockchain.Test")]
Expand Down Expand Up @@ -52,7 +54,8 @@ public class TxPool : ITxPool, IDisposable

private readonly Channel<BlockReplacementEventArgs> _headBlocksChannel = Channel.CreateUnbounded<BlockReplacementEventArgs>(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = true });

private readonly Func<Address, AccountStruct, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> _updateBucket;
private readonly UpdateGroupDelegate _updateBucket;
private readonly UpdateGroupDelegate _updateBucketAdded;

/// <summary>
/// Indexes transactions
Expand Down Expand Up @@ -102,6 +105,7 @@ public TxPool(IEthereumEcdsa ecdsa,

// Capture closures once rather than per invocation
_updateBucket = UpdateBucket;
_updateBucketAdded = UpdateBucketWithAddedTransaction;

_broadcaster = new TxBroadcaster(comparer, TimerFactory.Default, txPoolConfig, chainHeadInfoProvider, logManager, transactionsGossipPolicy);

Expand Down Expand Up @@ -460,7 +464,7 @@ private AcceptTxResult AddCore(Transaction tx, ref TxFilteringState state, bool
return AcceptTxResult.FeeTooLowToCompete;
}

relevantPool.UpdateGroup(tx.SenderAddress!, state.SenderAccount, UpdateBucketWithAddedTransaction);
relevantPool.UpdateGroup(tx.SenderAddress!, state.SenderAccount, _updateBucketAdded);
Metrics.PendingTransactionsAdded++;
if (tx.Supports1559) { Metrics.Pending1559TransactionsAdded++; }
if (tx.SupportsBlobs) { Metrics.PendingBlobTransactionsAdded++; }
Expand All @@ -484,23 +488,19 @@ private AcceptTxResult AddCore(Transaction tx, ref TxFilteringState state, bool
return AcceptTxResult.Accepted;
}

private IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)> UpdateBucketWithAddedTransaction(
Address address, AccountStruct account, EnhancedSortedSet<Transaction> transactions)
private void UpdateBucketWithAddedTransaction(in AccountStruct account, EnhancedSortedSet<Transaction> transactions, ref Transaction? lastElement, UpdateTransactionDelegate updateTx)
{
if (transactions.Count != 0)
{
UInt256 balance = account.Balance;
long currentNonce = (long)(account.Nonce);

foreach (var changedTx in UpdateGasBottleneck(transactions, currentNonce, balance))
{
yield return changedTx;
}
UpdateGasBottleneck(transactions, currentNonce, balance, lastElement, updateTx);
}
}

private IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)> UpdateGasBottleneck(
EnhancedSortedSet<Transaction> transactions, long currentNonce, UInt256 balance)
private void UpdateGasBottleneck(
EnhancedSortedSet<Transaction> transactions, long currentNonce, UInt256 balance, Transaction? lastElement, UpdateTransactionDelegate updateTx)
{
UInt256? previousTxBottleneck = null;
int i = 0;
Expand All @@ -513,7 +513,7 @@ private AcceptTxResult AddCore(Transaction tx, ref TxFilteringState state, bool
if (tx.Nonce < currentNonce)
{
_broadcaster.StopBroadcast(tx.Hash!);
yield return (tx, null);
updateTx(transactions, tx, changedGasBottleneck: null, lastElement);
}
else
{
Expand All @@ -535,14 +535,14 @@ private AcceptTxResult AddCore(Transaction tx, ref TxFilteringState state, bool
{
// balance too low, remove tx from the pool
_broadcaster.StopBroadcast(tx.Hash!);
yield return (tx, null);
updateTx(transactions, tx, changedGasBottleneck: null, lastElement);
}
gasBottleneck = UInt256.Min(effectiveGasPrice, previousTxBottleneck ?? 0);
}

if (tx.GasBottleneck != gasBottleneck)
{
yield return (tx, gasBottleneck);
updateTx(transactions, tx, gasBottleneck, lastElement);
}

previousTxBottleneck = gasBottleneck;
Expand All @@ -557,7 +557,7 @@ private void UpdateBuckets()
_blobTransactions.UpdatePool(_accounts, _updateBucket);
}

private IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)> UpdateBucket(Address address, AccountStruct account, EnhancedSortedSet<Transaction> transactions)
private void UpdateBucket(in AccountStruct account, EnhancedSortedSet<Transaction> transactions, ref Transaction? lastElement, UpdateTransactionDelegate updateTx)
{
if (transactions.Count != 0)
{
Expand Down Expand Up @@ -597,15 +597,13 @@ private void UpdateBuckets()
// transaction removed from TxPool because of insufficient balance should have opportunity
// to come back in the future, so it is removed from long term cache as well.
_hashCache.DeleteFromLongTerm(transaction.Hash!);
yield return (transaction, null);

updateTx(transactions, transaction, changedGasBottleneck: null, lastElement);
}
}
else
{
foreach (var changedTx in UpdateGasBottleneck(transactions, currentNonce, balance))
{
yield return changedTx;
}
UpdateGasBottleneck(transactions, currentNonce, balance, lastElement, updateTx);
}
}
}
Expand Down