Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run tx from same sender sequentially #7467

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
using Nethermind.Logging;
using Nethermind.State;
using Nethermind.Core.Eip2930;
using System.Linq;
using System.Collections.Generic;

namespace Nethermind.Consensus.Processing;

Expand Down Expand Up @@ -115,34 +117,40 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;

Transaction[] txs = block.Transactions;
// Group tx by sender address so nonces are correct (e.g. contract deployment)
List<IGrouping<Address, (Transaction, int)>> txGroups = [.. txs
.Select((tx, index) => (tx, index))
.GroupBy(indexedTx => indexedTx.tx.SenderAddress)
.OrderBy(g => g.Min(itx => itx.index))];

int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions, _ =>
Parallel.For(0, txGroups.Count, parallelOptions, _ =>
{
using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority();
IReadOnlyTxProcessorSource env = _envPool.Get();
SystemTransaction systemTransaction = _systemTransactionPool.Get();
Transaction? tx = null;
try
{
// Process transactions in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
int i = Interlocked.Increment(ref progress) - 1;
int groupId = Interlocked.Increment(ref progress) - 1;
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed > i) return;
IGrouping<Address, (Transaction, int)> group = txGroups[groupId];

// Skip if main processing is ahead of last tx in group
if (block.TransactionProcessed > group.Last().Item2) return;

tx = block.Transactions[i];
tx.CopyTo(systemTransaction);
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
if (spec.UseTxAccessLists)
foreach ((Transaction tx, int i) in group)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
tx.CopyTo(systemTransaction);
RunTransaction(spec, block, systemTransaction, scope, i);
}
TransactionResult result = scope.TransactionProcessor.Trace(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {systemTransaction?.Hash}", ex);
}
finally
{
Expand All @@ -152,6 +160,17 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp
});
}

private void RunTransaction(IReleaseSpec spec, Block block, SystemTransaction tx, IReadOnlyTxProcessingScope scope, int i)
{
if (spec.UseTxAccessLists)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
}
TransactionResult result = scope.TransactionProcessor.Trace(tx, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);

if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}

private class AddressWarmer(ParallelOptions parallelOptions, Block block, Hash256 stateRoot, AccessList? systemTxAccessList, BlockCachePreWarmer preWarmer)
: IThreadPoolWorkItem
{
Expand Down