Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Potential fix to missing tx index sometimes (related to reorgs) #6422

Merged
merged 2 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void Setup()
_blockTree = Substitute.For<IBlockTree>();
_txPool = Substitute.For<ITxPool>();
_receiptStorage = Substitute.For<IReceiptStorage>();
_receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_blockTree, _receiptStorage, _logManager);
_receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_receiptStorage, _logManager);
_specProvider = Substitute.For<ISpecProvider>();
_userOperationPools[_testPoolAddress] = Substitute.For<IUserOperationPool>();
_filterStore = new FilterStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,17 +290,22 @@ public void When_TxLookupLimitIs_NegativeOne_DoNotIndexTxHash()
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().BeNull();
}

[Test]
public void Should_not_index_tx_hash_if_blockNumber_is_negative()
[TestCase(1L, false)]
[TestCase(10L, false)]
[TestCase(11L, true)]
public void Should_only_prune_index_tx_hashes_if_blockNumber_is_bigger_than_lookupLimit(long blockNumber, bool WillPruneOldIndicies)
{
_receiptConfig.TxLookupLimit = 10;
CreateStorage();
_blockTree.BlockAddedToMain +=
Raise.EventWith(new BlockReplacementEventArgs(Build.A.Block.WithNumber(1).TestObject));
Raise.EventWith(new BlockReplacementEventArgs(Build.A.Block.WithNumber(blockNumber).TestObject));
Thread.Sleep(100);
IEnumerable<ICall> calls = _blockTree.ReceivedCalls()
.Where(call => !call.GetMethodInfo().Name.EndsWith(nameof(_blockTree.BlockAddedToMain)));
calls.Should().BeEmpty();
.Where(call => call.GetMethodInfo().Name.EndsWith(nameof(_blockTree.FindBlock)));
if (WillPruneOldIndicies)
calls.Should().NotBeEmpty();
else
calls.Should().BeEmpty();
}

[Test]
Expand Down Expand Up @@ -342,27 +347,49 @@ public void When_NewHeadBlock_Remove_TxIndex_OfRemovedBlock()
[Test]
public async Task When_NewHeadBlock_Remove_TxIndex_OfRemovedBlock_Unless_ItsAlsoInNewBlock()
{
_receiptConfig.CompactTxIndex = _useCompactReceipts;
CreateStorage();
(Block block, TxReceipt[] receipts) = InsertBlock();
Block block2 = Build.A.Block
.WithParent(block)
.WithNumber(2)
.WithTransactions(Build.A.Transaction.SignedAndResolved(TestItem.PrivateKeyC).TestObject)
.TestObject;
_blockTree.FindBestSuggestedHeader().Returns(block2.Header);
_blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(block2));

if (_receiptConfig.CompactTxIndex)
{
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().BeEquivalentTo(Rlp.Encode(block.Number).Bytes);
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(Rlp.Encode(block.Number).Bytes);
}
else
{
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().NotBeNull();
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(block.Hash!.Bytes.ToArray());
}

Block newHead = Build.A.Block
Block block3 = Build.A.Block
.WithNumber(1)
.WithTransactions(block2.Transactions)
.WithExtraData(new byte[1])
.TestObject;
Block block4 = Build.A.Block
.WithNumber(2)
.WithTransactions(block.Transactions)
.WithExtraData(new byte[1])
.TestObject;
_blockTree.FindBestSuggestedHeader().Returns(newHead.Header);
_blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(newHead, block));
_blockTree.FindBestSuggestedHeader().Returns(block4.Header);
_blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(block3, block));
_blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(block4, block2));

await Task.Delay(100);
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[receipts[0].TxHash!.Bytes].Should().NotBeNull();
if (_receiptConfig.CompactTxIndex)
{
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block4.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(Rlp.Encode(block4.Number).Bytes);
}
else
{
_receiptsDb.GetColumnDb(ReceiptsColumns.Transactions)[block4.Transactions[0].Hash!.Bytes].Should().BeEquivalentTo(block4.Hash!.Bytes.ToArray());
}
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,20 @@ public interface IReceiptMonitor : IDisposable

public class ReceiptCanonicalityMonitor : IReceiptMonitor
{
private readonly IBlockTree _blockTree;
private readonly IReceiptStorage _receiptStorage;
private readonly ILogger _logger;

public event EventHandler<ReceiptsEventArgs>? ReceiptsInserted;

public ReceiptCanonicalityMonitor(IBlockTree? blockTree, IReceiptStorage? receiptStorage, ILogManager? logManager)
public ReceiptCanonicalityMonitor(IReceiptStorage? receiptStorage, ILogManager? logManager)
{
_blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
_receiptStorage = receiptStorage ?? throw new ArgumentNullException(nameof(receiptStorage));
_logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
_blockTree.BlockAddedToMain += OnBlockAddedToMain;
_receiptStorage.ReceiptsInserted += OnBlockAddedToMain;
}

private void OnBlockAddedToMain(object sender, BlockReplacementEventArgs e)
{
_receiptStorage.EnsureCanonical(e.Block);

// we don't want this to be on main processing thread
Task.Run(() => TriggerReceiptInsertedEvent(e.Block, e.PreviousBlock));
}
Expand All @@ -59,7 +55,7 @@ private void TriggerReceiptInsertedEvent(Block newBlock, Block? previousBlock)

public void Dispose()
{
_blockTree.BlockAddedToMain -= OnBlockAddedToMain;
_receiptStorage.ReceiptsInserted -= OnBlockAddedToMain;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using Nethermind.Core;
using Nethermind.Core.Crypto;
using System;

namespace Nethermind.Blockchain.Receipts
{
Expand All @@ -14,5 +15,10 @@ public interface IReceiptStorage : IReceiptFinder
long MigratedBlockNumber { get; set; }
bool HasBlock(long blockNumber, Hash256 hash);
void EnsureCanonical(Block block);

/// <summary>
/// Receipts for a block are inserted
/// </summary>
event EventHandler<BlockReplacementEventArgs> ReceiptsInserted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,27 @@ namespace Nethermind.Blockchain.Receipts
public class InMemoryReceiptStorage : IReceiptStorage
{
private readonly bool _allowReceiptIterator;
private readonly IBlockTree? _blockTree;
private readonly ConcurrentDictionary<Hash256, TxReceipt[]> _receipts = new();

private readonly ConcurrentDictionary<Hash256, TxReceipt> _transactions = new();

public InMemoryReceiptStorage(bool allowReceiptIterator = true)
#pragma warning disable CS0067
public event EventHandler<BlockReplacementEventArgs> ReceiptsInserted;
#pragma warning restore CS0067

public InMemoryReceiptStorage(bool allowReceiptIterator = true, IBlockTree? blockTree = null)
{
_allowReceiptIterator = allowReceiptIterator;
_blockTree = blockTree;
if (_blockTree is not null)
_blockTree.BlockAddedToMain += BlockTree_BlockAddedToMain;
}

private void BlockTree_BlockAddedToMain(object? sender, BlockReplacementEventArgs e)
{
EnsureCanonical(e.Block);
ReceiptsInserted?.Invoke(this, e);
}

public Hash256 FindBlockHash(Hash256 txHash)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ public class NullReceiptStorage : IReceiptStorage
{
public static NullReceiptStorage Instance { get; } = new();

#pragma warning disable CS0067
public event EventHandler<BlockReplacementEventArgs> ReceiptsInserted;
#pragma warning restore CS0067

public Hash256? FindBlockHash(Hash256 hash) => null;

private NullReceiptStorage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class PersistentReceiptStorage : IReceiptStorage
private const int CacheSize = 64;
private readonly LruCache<ValueHash256, TxReceipt[]> _receiptsCache = new(CacheSize, CacheSize, "receipts");

public event EventHandler<BlockReplacementEventArgs> ReceiptsInserted;

public PersistentReceiptStorage(
IColumnsDb<ReceiptsColumns> receiptsDb,
ISpecProvider specProvider,
Expand Down Expand Up @@ -78,6 +80,8 @@ private void BlockTreeOnBlockAddedToMain(object? sender, BlockReplacementEventAr
{
RemoveBlockTx(e.PreviousBlock, e.Block);
}
EnsureCanonical(e.Block);
ReceiptsInserted?.Invoke(this, e);

// Dont block main loop
Task.Run(() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ protected virtual TxReceipt[] ProcessBlock(
// TODO: block processor pipeline
private void StoreTxReceipts(Block block, TxReceipt[] txReceipts)
{
// Setting canonical is done by ReceiptCanonicalityMonitor on block move to main
// Setting canonical is done when the BlockAddedToMain event is firec
_receiptStorage.Insert(block, txReceipts, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,13 @@ protected virtual async Task<TestBlockchain> Build(ISpecProvider? specProvider =

_trieStoreWatcher = new TrieStoreBoundaryWatcher(WorldStateManager, BlockTree, LogManager);

ReceiptStorage = new InMemoryReceiptStorage();
ReceiptStorage = new InMemoryReceiptStorage(blockTree: BlockTree);
VirtualMachine virtualMachine = new(new BlockhashProvider(BlockTree, LogManager), SpecProvider, LogManager);
TxProcessor = new TransactionProcessor(SpecProvider, State, virtualMachine, LogManager);
BlockPreprocessorStep = new RecoverSignatures(EthereumEcdsa, TxPool, SpecProvider, LogManager);
HeaderValidator = new HeaderValidator(BlockTree, Always.Valid, SpecProvider, LogManager);

new ReceiptCanonicalityMonitor(BlockTree, ReceiptStorage, LogManager);
new ReceiptCanonicalityMonitor(ReceiptStorage, LogManager);

BlockValidator = new BlockValidator(
new TxValidator(SpecProvider.ChainId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected virtual Task InitBlockchain()
IStateReader stateReader = setApi.StateReader!;
ITxPool txPool = _api.TxPool = CreateTxPool();

ReceiptCanonicalityMonitor receiptCanonicalityMonitor = new(getApi.BlockTree, getApi.ReceiptStorage, _api.LogManager);
ReceiptCanonicalityMonitor receiptCanonicalityMonitor = new(getApi.ReceiptStorage, _api.LogManager);
getApi.DisposeStack.Push(receiptCanonicalityMonitor);
_api.ReceiptMonitor = receiptCanonicalityMonitor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void Setup()
_filterStore = new FilterStore();
_jsonRpcDuplexClient = Substitute.For<IJsonRpcDuplexClient>();
_jsonSerializer = new EthereumJsonSerializer();
_receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_blockTree, _receiptStorage, _logManager);
_receiptCanonicalityMonitor = new ReceiptCanonicalityMonitor(_receiptStorage, _logManager);
_syncConfig = new SyncConfig();

IJsonSerializer jsonSerializer = new EthereumJsonSerializer();
Expand Down Expand Up @@ -114,6 +114,7 @@ private JsonRpcResult GetBlockAddedToMainResult(BlockReplacementEventArgs blockR
}));

_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs);
_receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockReplacementEventArgs);
manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(1000)).Should().Be(shouldReceiveResult);

subscriptionId = newHeadSubscription.Id;
Expand All @@ -133,6 +134,7 @@ private List<JsonRpcResult> GetLogsSubscriptionResult(Filter filter, BlockReplac
}));

_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockEventArgs);
_receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockEventArgs);
semaphoreSlim.Wait(TimeSpan.FromMilliseconds(100));

subscriptionId = logsSubscription.Id;
Expand Down Expand Up @@ -713,6 +715,7 @@ public void LogsSubscription_should_not_send_logs_of_new_txs_on_ReceiptsInserted

BlockReplacementEventArgs blockEventArgs = new(block);
_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockEventArgs);
_receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockEventArgs);
manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200));

jsonRpcResults.Count.Should().Be(1);
Expand Down Expand Up @@ -892,6 +895,7 @@ public async Task MultipleSubscriptions_concurrent_fast_messages(int messages)
{
BlockReplacementEventArgs eventArgs = new(Build.A.Block.TestObject);
blockTree.BlockAddedToMain += Raise.EventWith(eventArgs);
_receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), eventArgs);
}
});

Expand Down Expand Up @@ -1148,6 +1152,7 @@ public void LogsSubscription_can_send_logs_with_removed_txs_when_inserted()
}));

_blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockEventArgs);
_receiptStorage.ReceiptsInserted += Raise.EventWith(new object(), blockEventArgs);
manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(2000));

jsonRpcResults.Count.Should().Be(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ public void EnsureCanonical(Block block)
{
}

public event EventHandler<ReceiptsEventArgs> ReceiptsInserted { add { } remove { } }
#pragma warning disable CS0067
public event EventHandler<BlockReplacementEventArgs> ReceiptsInserted;
#pragma warning restore CS0067
}
}
}