Skip to content

Commit

Permalink
[Fix] Potential fix to missing tx index sometimes (related to reorgs) (
Browse files Browse the repository at this point in the history
…#6422)

* fixes

* update unit tests to reflect reorg

---------

Co-authored-by: Ahmad Mazen Radwan Bitar <am.bitar@diyarme.com>
  • Loading branch information
smartprogrammer93 and Ahmad Mazen Radwan Bitar authored Dec 29, 2023
1 parent 6a9c76d commit c4767a4
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void Setup()
_blockTree = Substitute.For<IBlockTree>();
_txPool = Substitute.For<ITxPool>();
_receiptStorage = Substitute.For<IReceiptStorage>();
_receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_blockTree, _receiptStorage, _logManager);
_receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_receiptStorage, _logManager);
_specProvider = Substitute.For<ISpecProvider>();
_userOperationPools[_testPoolAddress] = Substitute.For<IUserOperationPool>();
_filterStore = new FilterStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,17 +290,22 @@ public void When_TxLookupLimitIs_NegativeOne_DoNotIndexTxHash()
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().BeNull();
}

[Test]
public void Should_not_index_tx_hash_if_blockNumber_is_negative()
[TestCase(1L, false)]
[TestCase(10L, false)]
[TestCase(11L, true)]
public void Should_only_prune_index_tx_hashes_if_blockNumber_is_bigger_than_lookupLimit(long blockNumber, bool WillPruneOldIndicies)
{
_receiptConfig.TxLookupLimit = 10;
CreateStorage();
_blockTree.BlockAddedToMain +=
Raise.EventWith(new BlockReplacementEventArgs(Build.A.Block.WithNumber(1).TestObject));
Raise.EventWith(new BlockReplacementEventArgs(Build.A.Block.WithNumber(blockNumber).TestObject));
Thread.Sleep(100);
IEnumerable<ICall> calls = _blockTree.ReceivedCalls()
.Where(call => !call.GetMethodInfo().Name.EndsWith(nameof(_blockTree.BlockAddedToMain)));
calls.Should().BeEmpty();
.Where(call => call.GetMethodInfo().Name.EndsWith(nameof(_blockTree.FindBlock)));
if (WillPruneOldIndicies)
calls.Should().NotBeEmpty();
else
calls.Should().BeEmpty();
}

[Test]
Expand Down Expand Up @@ -342,27 +347,49 @@ public void When_NewHeadBlock_Remove_TxIndex_OfRemovedBlock()
[Test]
public async Task When_NewHeadBlock_Remove_TxIndex_OfRemovedBlock_Unless_ItsAlsoInNewBlock()
{
_receiptConfig.CompactTxIndex = _useCompactReceipts;
CreateStorage();
(Block block, TxReceipt[] receipts) = InsertBlock();
Block block2 = Build.A.Block
.WithParent(block)
.WithNumber(2)
.WithTransactions(Build.A.Transaction.SignedAndResolved(TestItem.PrivateKeyC).TestObject)
.TestObject;
_blockTree.FindBestSuggestedHeader().Returns(block2.Header);
_blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(block2));

if (_receiptConfig.CompactTxIndex)
{
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().BeEquivalentTo(Rlp.Encode(block.Number).Bytes);
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(Rlp.Encode(block.Number).Bytes);
}
else
{
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().NotBeNull();
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(block.Hash!.Bytes.ToArray());
}

Block newHead = Build.A.Block
Block block3 = Build.A.Block
.WithNumber(1)
.WithTransactions(block2.Transactions)
.WithExtraData(new byte[1])
.TestObject;
Block block4 = Build.A.Block
.WithNumber(2)
.WithTransactions(block.Transactions)
.WithExtraData(new byte[1])
.TestObject;
_blockTree.FindBestSuggestedHeader().Returns(newHead.Header);
_blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(newHead, block));
_blockTree.FindBestSuggestedHeader().Returns(block4.Header);
_blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(block3, block));
_blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(block4, block2));

await Task.Delay(100);
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().NotBeNull();
if (_receiptConfig.CompactTxIndex)
{
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block4.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(Rlp.Encode(block4.Number).Bytes);
}
else
{
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block4.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(block4.Hash!.Bytes.ToArray());
}
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,20 @@ public interface IReceiptMonitor : IDisposable

public class ReceiptCanonicalityMonitor : IReceiptMonitor
{
private readonly IBlockTree _blockTree;
private readonly IReceiptStorage _receiptStorage;
private readonly ILogger _logger;

public event EventHandler<ReceiptsEventArgs>? ReceiptsInserted;

public ReceiptCanonicalityMonitor(IBlockTree? blockTree, IReceiptStorage? receiptStorage, ILogManager? logManager)
public ReceiptCanonicalityMonitor(IReceiptStorage? receiptStorage, ILogManager? logManager)
{
_blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
_receiptStorage = receiptStorage ?? throw new ArgumentNullException(nameof(receiptStorage));
_logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
_blockTree.BlockAddedToMain += OnBlockAddedToMain;
_receiptStorage.ReceiptsInserted += OnBlockAddedToMain;
}

private void OnBlockAddedToMain(object sender, BlockReplacementEventArgs e)
{
_receiptStorage.EnsureCanonical(e.Block);

// we don't want this to be on main processing thread
Task.Run(() => TriggerReceiptInsertedEvent(e.Block, e.PreviousBlock));
}
Expand All @@ -59,7 +55,7 @@ private void TriggerReceiptInsertedEvent(Block newBlock, Block? previousBlock)

public void Dispose()
{
_blockTree.BlockAddedToMain -= OnBlockAddedToMain;
_receiptStorage.ReceiptsInserted -= OnBlockAddedToMain;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using Nethermind.Core;
using Nethermind.Core.Crypto;
using System;

namespace Nethermind.Blockchain.Receipts
{
Expand All @@ -14,5 +15,10 @@ public interface IReceiptStorage : IReceiptFinder
long MigratedBlockNumber { get; set; }
bool HasBlock(long blockNumber, Hash256 hash);
void EnsureCanonical(Block block);

/// <summary>
/// Receipts for a block are inserted
/// </summary>
event EventHandler<BlockReplacementEventArgs> ReceiptsInserted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,27 @@ namespace Nethermind.Blockchain.Receipts
public class InMemoryReceiptStorage : IReceiptStorage
{
private readonly bool _allowReceiptIterator;
private readonly IBlockTree? _blockTree;
private readonly ConcurrentDictionary<Hash256, TxReceipt[]> _receipts = new();

private readonly ConcurrentDictionary<Hash256, TxReceipt> _transactions = new();

public InMemoryReceiptStorage(bool allowReceiptIterator = true)
#pragma warning disable CS0067
public event EventHandler<BlockReplacementEventArgs> ReceiptsInserted;
#pragma warning restore CS0067

public InMemoryReceiptStorage(bool allowReceiptIterator = true, IBlockTree? blockTree = null)
{
_allowReceiptIterator = allowReceiptIterator;
_blockTree = blockTree;
if (_blockTree is not null)
_blockTree.BlockAddedToMain += BlockTree_BlockAddedToMain;
}

private void BlockTree_BlockAddedToMain(object? sender, BlockReplacementEventArgs e)
{
EnsureCanonical(e.Block);
ReceiptsInserted?.Invoke(this, e);
}

public Hash256 FindBlockHash(Hash256 txHash)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ public class NullReceiptStorage : IReceiptStorage
{
public static NullReceiptStorage Instance { get; } = new();

#pragma warning disable CS0067
public event EventHandler<BlockReplacementEventArgs> ReceiptsInserted;
#pragma warning restore CS0067

public Hash256? FindBlockHash(Hash256 hash) => null;

private NullReceiptStorage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class PersistentReceiptStorage : IReceiptStorage
private const int CacheSize = 64;
private readonly LruCache<ValueHash256, TxReceipt[]> _receiptsCache = new(CacheSize, CacheSize, "receipts");

public event EventHandler<BlockReplacementEventArgs> ReceiptsInserted;

public PersistentReceiptStorage(
IColumnsDb<ReceiptsColumns> receiptsDb,
ISpecProvider specProvider,
Expand Down Expand Up @@ -78,6 +80,8 @@ private void BlockTreeOnBlockAddedToMain(object? sender, BlockReplacementEventAr
{
RemoveBlockTx(e.PreviousBlock, e.Block);
}
EnsureCanonical(e.Block);
ReceiptsInserted?.Invoke(this, e);

// Dont block main loop
Task.Run(() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ protected virtual TxReceipt[] ProcessBlock(
// TODO: block processor pipeline
private void StoreTxReceipts(Block block, TxReceipt[] txReceipts)
{
// Setting canonical is done by ReceiptCanonicalityMonitor on block move to main
// Setting canonical is done when the BlockAddedToMain event is firec
_receiptStorage.Insert(block, txReceipts, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,13 @@ protected virtual async Task<TestBlockchain> Build(ISpecProvider? specProvider =

_trieStoreWatcher = new TrieStoreBoundaryWatcher(WorldStateManager, BlockTree, LogManager);

ReceiptStorage = new InMemoryReceiptStorage();
ReceiptStorage = new InMemoryReceiptStorage(blockTree: BlockTree);
VirtualMachine virtualMachine = new(new BlockhashProvider(BlockTree, LogManager), SpecProvider, LogManager);
TxProcessor = new TransactionProcessor(SpecProvider, State, virtualMachine, LogManager);
BlockPreprocessorStep = new RecoverSignatures(EthereumEcdsa, TxPool, SpecProvider, LogManager);
HeaderValidator = new HeaderValidator(BlockTree, Always.Valid, SpecProvider, LogManager);

new ReceiptCanonicalityMonitor(BlockTree, ReceiptStorage, LogManager);
new ReceiptCanonicalityMonitor(ReceiptStorage, LogManager);

BlockValidator = new BlockValidator(
new TxValidator(SpecProvider.ChainId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected virtual Task InitBlockchain()
IStateReader stateReader = setApi.StateReader!;
ITxPool txPool = _api.TxPool = CreateTxPool();

ReceiptCanonicalityMonitor receiptCanonicalityMonitor = new(getApi.BlockTree, getApi.ReceiptStorage, _api.LogManager);
ReceiptCanonicalityMonitor receiptCanonicalityMonitor = new(getApi.ReceiptStorage, _api.LogManager);
getApi.DisposeStack.Push(receiptCanonicalityMonitor);
_api.ReceiptMonitor = receiptCanonicalityMonitor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void Setup()
_filterStore = new FilterStore();
_jsonRpcDuplexClient = Substitute.For<IJsonRpcDuplexClient>();
_jsonSerializer = new EthereumJsonSerializer();
_receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_blockTree, _receiptStorage, _logManager);
_receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_receiptStorage, _logManager);
_syncConfig = new SyncConfig();

IJsonSerializer jsonSerializer = new EthereumJsonSerializer();
Expand Down Expand Up @@ -114,6 +114,7 @@ private JsonRpcResult GetBlockAddedToMainResult(BlockReplacementEventArgs blockR
}));

_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs);
_receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockReplacementEventArgs);
manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(1000)).Should().Be(shouldReceiveResult);

subscriptionId = newHeadSubscription.Id;
Expand All @@ -133,6 +134,7 @@ private List<JsonRpcResult> GetLogsSubscriptionResult(Filter filter, BlockReplac
}));

_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockEventArgs);
_receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockEventArgs);
semaphoreSlim.Wait(TimeSpan.FromMilliseconds(100));

subscriptionId = logsSubscription.Id;
Expand Down Expand Up @@ -713,6 +715,7 @@ public void LogsSubscription_should_not_send_logs_of_new_txs_on_ReceiptsInserted

BlockReplacementEventArgs blockEventArgs = new(block);
_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockEventArgs);
_receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockEventArgs);
manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200));

jsonRpcResults.Count.Should().Be(1);
Expand Down Expand Up @@ -892,6 +895,7 @@ public async Task MultipleSubscriptions_concurrent_fast_messages(int messages)
{
BlockReplacementEventArgs eventArgs = new(Build.A.Block.TestObject);
blockTree.BlockAddedToMain += Raise.EventWith(eventArgs);
_receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), eventArgs);
}
});

Expand Down Expand Up @@ -1148,6 +1152,7 @@ public void LogsSubscription_can_send_logs_with_removed_txs_when_inserted()
}));

_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockEventArgs);
_receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockEventArgs);
manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(2000));

jsonRpcResults.Count.Should().Be(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ public void EnsureCanonical(Block block)
{
}

public event EventHandler<ReceiptsEventArgs> ReceiptsInserted { add { } remove { } }
#pragma warning disable CS0067
public event EventHandler<BlockReplacementEventArgs> ReceiptsInserted;
#pragma warning restore CS0067
}
}
}

0 comments on commit c4767a4

Please sign in to comment.