diff --git a/src/Nethermind/Nethermind.AccountAbstraction.Test/UserOperationSubscribeTests.cs b/src/Nethermind/Nethermind.AccountAbstraction.Test/UserOperationSubscribeTests.cs index 409ed0d0beb..799ee6444db 100644 --- a/src/Nethermind/Nethermind.AccountAbstraction.Test/UserOperationSubscribeTests.cs +++ b/src/Nethermind/Nethermind.AccountAbstraction.Test/UserOperationSubscribeTests.cs @@ -62,7 +62,7 @@ public void Setup() _blockTree = Substitute.For(); _txPool = Substitute.For(); _receiptStorage = Substitute.For(); - _receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_blockTree, _receiptStorage, _logManager); + _receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_receiptStorage, _logManager); _specProvider = Substitute.For(); _userOperationPools[_testPoolAddress] = Substitute.For(); _filterStore = new FilterStore(); diff --git a/src/Nethermind/Nethermind.Blockchain.Test/Receipts/PersistentReceiptStorageTests.cs b/src/Nethermind/Nethermind.Blockchain.Test/Receipts/PersistentReceiptStorageTests.cs index aa0c2f505e0..6269c21cbb7 100644 --- a/src/Nethermind/Nethermind.Blockchain.Test/Receipts/PersistentReceiptStorageTests.cs +++ b/src/Nethermind/Nethermind.Blockchain.Test/Receipts/PersistentReceiptStorageTests.cs @@ -290,17 +290,22 @@ public void When_TxLookupLimitIs_NegativeOne_DoNotIndexTxHash() _receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().BeNull(); } - [Test] - public void Should_not_index_tx_hash_if_blockNumber_is_negative() + [TestCase(1L, false)] + [TestCase(10L, false)] + [TestCase(11L, true)] + public void Should_only_prune_index_tx_hashes_if_blockNumber_is_bigger_than_lookupLimit(long blockNumber, bool WillPruneOldIndicies) { _receiptConfig.TxLookupLimit = 10; CreateStorage(); _blockTree.BlockAddedToMain += - Raise.EventWith(new BlockReplacementEventArgs(Build.A.Block.WithNumber(1).TestObject)); + Raise.EventWith(new BlockReplacementEventArgs(Build.A.Block.WithNumber(blockNumber).TestObject)); Thread.Sleep(100); IEnumerable calls = _blockTree.ReceivedCalls() - .Where(call => !call.GetMethodInfo().Name.EndsWith(nameof(_blockTree.BlockAddedToMain))); - calls.Should().BeEmpty(); + .Where(call => call.GetMethodInfo().Name.EndsWith(nameof(_blockTree.FindBlock))); + if (WillPruneOldIndicies) + calls.Should().NotBeEmpty(); + else + calls.Should().BeEmpty(); } [Test] @@ -342,27 +347,49 @@ public void When_NewHeadBlock_Remove_TxIndex_OfRemovedBlock() [Test] public async Task When_NewHeadBlock_Remove_TxIndex_OfRemovedBlock_Unless_ItsAlsoInNewBlock() { + _receiptConfig.CompactTxIndex = _useCompactReceipts; CreateStorage(); (Block block, TxReceipt[] receipts) = InsertBlock(); + Block block2 = Build.A.Block + .WithParent(block) + .WithNumber(2) + .WithTransactions(Build.A.Transaction.SignedAndResolved(TestItem.PrivateKeyC).TestObject) + .TestObject; + _blockTree.FindBestSuggestedHeader().Returns(block2.Header); + _blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(block2)); if (_receiptConfig.CompactTxIndex) { - _receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().BeEquivalentTo(Rlp.Encode(block.Number).Bytes); + _receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(Rlp.Encode(block.Number).Bytes); } else { - _receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().NotBeNull(); + _receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(block.Hash!.Bytes.ToArray()); } - Block newHead = Build.A.Block + Block block3 = Build.A.Block .WithNumber(1) + .WithTransactions(block2.Transactions) + .WithExtraData(new byte[1]) + .TestObject; + Block block4 = Build.A.Block + .WithNumber(2) .WithTransactions(block.Transactions) + .WithExtraData(new byte[1]) .TestObject; - _blockTree.FindBestSuggestedHeader().Returns(newHead.Header); - _blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(newHead, block)); + _blockTree.FindBestSuggestedHeader().Returns(block4.Header); + _blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(block3, block)); + _blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(block4, block2)); await Task.Delay(100); - _receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().NotBeNull(); + if (_receiptConfig.CompactTxIndex) + { + _receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block4.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(Rlp.Encode(block4.Number).Bytes); + } + else + { + _receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block4.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(block4.Hash!.Bytes.ToArray()); + } } [Test] diff --git a/src/Nethermind/Nethermind.Blockchain/ReceiptCanonicalityMonitor.cs b/src/Nethermind/Nethermind.Blockchain/ReceiptCanonicalityMonitor.cs index d6d35d8b0ef..4d6b55283d9 100644 --- a/src/Nethermind/Nethermind.Blockchain/ReceiptCanonicalityMonitor.cs +++ b/src/Nethermind/Nethermind.Blockchain/ReceiptCanonicalityMonitor.cs @@ -16,24 +16,20 @@ public interface IReceiptMonitor : IDisposable public class ReceiptCanonicalityMonitor : IReceiptMonitor { - private readonly IBlockTree _blockTree; private readonly IReceiptStorage _receiptStorage; private readonly ILogger _logger; public event EventHandler? ReceiptsInserted; - public ReceiptCanonicalityMonitor(IBlockTree? blockTree, IReceiptStorage? receiptStorage, ILogManager? logManager) + public ReceiptCanonicalityMonitor(IReceiptStorage? receiptStorage, ILogManager? logManager) { - _blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree)); _receiptStorage = receiptStorage ?? throw new ArgumentNullException(nameof(receiptStorage)); _logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager)); - _blockTree.BlockAddedToMain += OnBlockAddedToMain; + _receiptStorage.ReceiptsInserted += OnBlockAddedToMain; } private void OnBlockAddedToMain(object sender, BlockReplacementEventArgs e) { - _receiptStorage.EnsureCanonical(e.Block); - // we don't want this to be on main processing thread Task.Run(() => TriggerReceiptInsertedEvent(e.Block, e.PreviousBlock)); } @@ -59,7 +55,7 @@ private void TriggerReceiptInsertedEvent(Block newBlock, Block? previousBlock) public void Dispose() { - _blockTree.BlockAddedToMain -= OnBlockAddedToMain; + _receiptStorage.ReceiptsInserted -= OnBlockAddedToMain; } } } diff --git a/src/Nethermind/Nethermind.Blockchain/Receipts/IReceiptStorage.cs b/src/Nethermind/Nethermind.Blockchain/Receipts/IReceiptStorage.cs index f1abcef89c9..236540cbe48 100644 --- a/src/Nethermind/Nethermind.Blockchain/Receipts/IReceiptStorage.cs +++ b/src/Nethermind/Nethermind.Blockchain/Receipts/IReceiptStorage.cs @@ -3,6 +3,7 @@ using Nethermind.Core; using Nethermind.Core.Crypto; +using System; namespace Nethermind.Blockchain.Receipts { @@ -14,5 +15,10 @@ public interface IReceiptStorage : IReceiptFinder long MigratedBlockNumber { get; set; } bool HasBlock(long blockNumber, Hash256 hash); void EnsureCanonical(Block block); + + /// + /// Receipts for a block are inserted + /// + event EventHandler ReceiptsInserted; } } diff --git a/src/Nethermind/Nethermind.Blockchain/Receipts/InMemoryReceiptStorage.cs b/src/Nethermind/Nethermind.Blockchain/Receipts/InMemoryReceiptStorage.cs index a118a0aec87..4b78996a0ac 100644 --- a/src/Nethermind/Nethermind.Blockchain/Receipts/InMemoryReceiptStorage.cs +++ b/src/Nethermind/Nethermind.Blockchain/Receipts/InMemoryReceiptStorage.cs @@ -11,13 +11,27 @@ namespace Nethermind.Blockchain.Receipts public class InMemoryReceiptStorage : IReceiptStorage { private readonly bool _allowReceiptIterator; + private readonly IBlockTree? _blockTree; private readonly ConcurrentDictionary _receipts = new(); private readonly ConcurrentDictionary _transactions = new(); - public InMemoryReceiptStorage(bool allowReceiptIterator = true) +#pragma warning disable CS0067 + public event EventHandler ReceiptsInserted; +#pragma warning restore CS0067 + + public InMemoryReceiptStorage(bool allowReceiptIterator = true, IBlockTree? blockTree = null) { _allowReceiptIterator = allowReceiptIterator; + _blockTree = blockTree; + if (_blockTree is not null) + _blockTree.BlockAddedToMain += BlockTree_BlockAddedToMain; + } + + private void BlockTree_BlockAddedToMain(object? sender, BlockReplacementEventArgs e) + { + EnsureCanonical(e.Block); + ReceiptsInserted?.Invoke(this, e); } public Hash256 FindBlockHash(Hash256 txHash) diff --git a/src/Nethermind/Nethermind.Blockchain/Receipts/NullReceiptStorage.cs b/src/Nethermind/Nethermind.Blockchain/Receipts/NullReceiptStorage.cs index 22f6b7fdc21..802c8d858e5 100644 --- a/src/Nethermind/Nethermind.Blockchain/Receipts/NullReceiptStorage.cs +++ b/src/Nethermind/Nethermind.Blockchain/Receipts/NullReceiptStorage.cs @@ -11,6 +11,10 @@ public class NullReceiptStorage : IReceiptStorage { public static NullReceiptStorage Instance { get; } = new(); +#pragma warning disable CS0067 + public event EventHandler ReceiptsInserted; +#pragma warning restore CS0067 + public Hash256? FindBlockHash(Hash256 hash) => null; private NullReceiptStorage() diff --git a/src/Nethermind/Nethermind.Blockchain/Receipts/PersistentReceiptStorage.cs b/src/Nethermind/Nethermind.Blockchain/Receipts/PersistentReceiptStorage.cs index ca972f1aa9a..a186d13d1d2 100644 --- a/src/Nethermind/Nethermind.Blockchain/Receipts/PersistentReceiptStorage.cs +++ b/src/Nethermind/Nethermind.Blockchain/Receipts/PersistentReceiptStorage.cs @@ -39,6 +39,8 @@ public class PersistentReceiptStorage : IReceiptStorage private const int CacheSize = 64; private readonly LruCache _receiptsCache = new(CacheSize, CacheSize, "receipts"); + public event EventHandler ReceiptsInserted; + public PersistentReceiptStorage( IColumnsDb receiptsDb, ISpecProvider specProvider, @@ -78,6 +80,8 @@ private void BlockTreeOnBlockAddedToMain(object? sender, BlockReplacementEventAr { RemoveBlockTx(e.PreviousBlock, e.Block); } + EnsureCanonical(e.Block); + ReceiptsInserted?.Invoke(this, e); // Dont block main loop Task.Run(() => diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs index 232c125c38b..3a8c660c9de 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs @@ -260,7 +260,7 @@ protected virtual TxReceipt[] ProcessBlock( // TODO: block processor pipeline private void StoreTxReceipts(Block block, TxReceipt[] txReceipts) { - // Setting canonical is done by ReceiptCanonicalityMonitor on block move to main + // Setting canonical is done when the BlockAddedToMain event is firec _receiptStorage.Insert(block, txReceipts, false); } diff --git a/src/Nethermind/Nethermind.Core.Test/Blockchain/TestBlockchain.cs b/src/Nethermind/Nethermind.Core.Test/Blockchain/TestBlockchain.cs index 4c83df1211f..4215fda3fa3 100644 --- a/src/Nethermind/Nethermind.Core.Test/Blockchain/TestBlockchain.cs +++ b/src/Nethermind/Nethermind.Core.Test/Blockchain/TestBlockchain.cs @@ -160,13 +160,13 @@ protected virtual async Task Build(ISpecProvider? specProvider = _trieStoreWatcher = new TrieStoreBoundaryWatcher(WorldStateManager, BlockTree, LogManager); - ReceiptStorage = new InMemoryReceiptStorage(); + ReceiptStorage = new InMemoryReceiptStorage(blockTree: BlockTree); VirtualMachine virtualMachine = new(new BlockhashProvider(BlockTree, LogManager), SpecProvider, LogManager); TxProcessor = new TransactionProcessor(SpecProvider, State, virtualMachine, LogManager); BlockPreprocessorStep = new RecoverSignatures(EthereumEcdsa, TxPool, SpecProvider, LogManager); HeaderValidator = new HeaderValidator(BlockTree, Always.Valid, SpecProvider, LogManager); - new ReceiptCanonicalityMonitor(BlockTree, ReceiptStorage, LogManager); + new ReceiptCanonicalityMonitor(ReceiptStorage, LogManager); BlockValidator = new BlockValidator( new TxValidator(SpecProvider.ChainId), diff --git a/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs b/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs index 798b19e6b91..f87e07b38f7 100644 --- a/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs +++ b/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs @@ -73,7 +73,7 @@ protected virtual Task InitBlockchain() IStateReader stateReader = setApi.StateReader!; ITxPool txPool = _api.TxPool = CreateTxPool(); - ReceiptCanonicalityMonitor receiptCanonicalityMonitor = new(getApi.BlockTree, getApi.ReceiptStorage, _api.LogManager); + ReceiptCanonicalityMonitor receiptCanonicalityMonitor = new(getApi.ReceiptStorage, _api.LogManager); getApi.DisposeStack.Push(receiptCanonicalityMonitor); _api.ReceiptMonitor = receiptCanonicalityMonitor; diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/Modules/SubscribeModuleTests.cs b/src/Nethermind/Nethermind.JsonRpc.Test/Modules/SubscribeModuleTests.cs index 123497a5aa1..e2c033e4925 100644 --- a/src/Nethermind/Nethermind.JsonRpc.Test/Modules/SubscribeModuleTests.cs +++ b/src/Nethermind/Nethermind.JsonRpc.Test/Modules/SubscribeModuleTests.cs @@ -64,7 +64,7 @@ public void Setup() _filterStore = new FilterStore(); _jsonRpcDuplexClient = Substitute.For(); _jsonSerializer = new EthereumJsonSerializer(); - _receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_blockTree, _receiptStorage, _logManager); + _receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_receiptStorage, _logManager); _syncConfig = new SyncConfig(); IJsonSerializer jsonSerializer = new EthereumJsonSerializer(); @@ -114,6 +114,7 @@ private JsonRpcResult GetBlockAddedToMainResult(BlockReplacementEventArgs blockR })); _blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs); + _receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockReplacementEventArgs); manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(1000)).Should().Be(shouldReceiveResult); subscriptionId = newHeadSubscription.Id; @@ -133,6 +134,7 @@ private List GetLogsSubscriptionResult(Filter filter, BlockReplac })); _blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockEventArgs); + _receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockEventArgs); semaphoreSlim.Wait(TimeSpan.FromMilliseconds(100)); subscriptionId = logsSubscription.Id; @@ -713,6 +715,7 @@ public void LogsSubscription_should_not_send_logs_of_new_txs_on_ReceiptsInserted BlockReplacementEventArgs blockEventArgs = new(block); _blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockEventArgs); + _receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockEventArgs); manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200)); jsonRpcResults.Count.Should().Be(1); @@ -892,6 +895,7 @@ public async Task MultipleSubscriptions_concurrent_fast_messages(int messages) { BlockReplacementEventArgs eventArgs = new(Build.A.Block.TestObject); blockTree.BlockAddedToMain += Raise.EventWith(eventArgs); + _receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), eventArgs); } }); @@ -1148,6 +1152,7 @@ public void LogsSubscription_can_send_logs_with_removed_txs_when_inserted() })); _blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockEventArgs); + _receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockEventArgs); manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); jsonRpcResults.Count.Should().Be(1); diff --git a/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/Migrations/ReceiptMigrationTests.cs b/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/Migrations/ReceiptMigrationTests.cs index 8762884e529..fe452fb0691 100644 --- a/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/Migrations/ReceiptMigrationTests.cs +++ b/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/Migrations/ReceiptMigrationTests.cs @@ -161,7 +161,9 @@ public void EnsureCanonical(Block block) { } - public event EventHandler ReceiptsInserted { add { } remove { } } +#pragma warning disable CS0067 + public event EventHandler ReceiptsInserted; +#pragma warning restore CS0067 } } }