Skip to content

Commit

Permalink
Fix hang
Browse files Browse the repository at this point in the history
  • Loading branch information
asdacap committed Nov 25, 2024
1 parent 0fb4fc9 commit 202b678
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Nethermind.Synchronization.FastSync;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Peers;
using Nethermind.Trie;
using Nethermind.Trie.Pruning;
using NSubstitute;
using NUnit.Framework;
Expand Down Expand Up @@ -412,5 +413,53 @@ public async Task When_empty_response_received_with_no_peer_return_not_allocated
ctx.Feed.HandleResponse(request, peer: null)
.Should().Be(SyncResponseHandlingResult.NotAssigned);
}

/// <summary>
/// Sets up a remote side holding one account with a ten-slot storage trie plus two plain accounts,
/// mirrors only the account state locally, removes the local state root, flags the storage-bearing
/// account in <c>StateSyncPivot.UpdatedStorages</c>, runs the feed and then compares the trees.
/// </summary>
[Test]
[Repeat(TestRepeatCount)]
public async Task RepairPossiblyMissingStorage()
{
    DbContext dbContext = new(_logger, _logManager);

    // Register every scenario contract code in the remote code db, keyed by its hash.
    foreach (byte[] code in new[] { TrieScenarios.Code0, TrieScenarios.Code1, TrieScenarios.Code2, TrieScenarios.Code3 })
    {
        dbContext.RemoteCodeDb[Keccak.Compute(code).Bytes] = code;
    }

    // The storage trie is written through the remote trie store only.
    Hash256 accountWithStorage = TestItem.KeccakA;
    StorageTree remoteStorage = new(dbContext.RemoteTrieStore.GetTrieStore(accountWithStorage), LimboLogs.Instance);
    for (int slot = 0; slot < 10; slot++)
    {
        remoteStorage.Set((UInt256)slot, TestItem.Keccaks[slot].BytesToArray());
    }
    remoteStorage.Commit();

    // Both sides receive the same three accounts; only KeccakA points at the storage trie.
    void PopulateAccounts(StateTree tree)
    {
        tree.Set(TestItem.KeccakA, Build.An.Account.WithNonce(1).WithStorageRoot(remoteStorage.RootHash).TestObject);
        tree.Set(TestItem.KeccakB, Build.An.Account.WithNonce(1).TestObject);
        tree.Set(TestItem.KeccakC, Build.An.Account.WithNonce(1).TestObject);
        tree.Commit();
    }

    PopulateAccounts(dbContext.RemoteStateTree);

    // The local side gets the account state but none of the storage nodes.
    StateTree localState = dbContext.LocalStateTree;
    PopulateAccounts(localState);

    // Drop the local root node so that the sync actually starts.
    dbContext.LocalNodeStorage.Set(null, TreePath.Empty, localState.RootHash, null);

    await using IContainer container = PrepareDownloader(dbContext);
    StateSyncPivot pivot = container.Resolve<StateSyncPivot>();
    pivot.UpdatedStorages.Add(accountWithStorage);

    await ActivateAndWait(container.Resolve<SafeContext>());

    dbContext.CompareTrees("END");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ protected ContainerBuilder BuildTestContainerBuilder(DbContext dbContext, int sy
SyncDispatcherAllocateTimeoutMs = syncDispatcherAllocateTimeoutMs, // there is a test for requested nodes which get affected if allocate timeout
FastSync = true
}))
.AddSingleton<ILogManager>(_logManager)
.AddKeyedSingleton<IDb>(DbNames.Code, dbContext.LocalCodeDb)
.AddKeyedSingleton<IDb>(DbNames.State, dbContext.LocalStateDb)
.AddSingleton<INodeStorage>(dbContext.LocalNodeStorage)
Expand Down
112 changes: 30 additions & 82 deletions src/Nethermind/Nethermind.Synchronization/FastSync/TreeSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class TreeSync : ITreeSync
// below which need to be cleared atomically during reset root, hence the write lock, while allowing
// concurrent request handling with the read lock.
private readonly ReaderWriterLockSlim _syncStateLock = new();
private readonly ConcurrentDictionary<StateSyncBatch, object?> _pendingRequests = new();
private readonly ConcurrentDictionary<StateSyncBatch, object?> _ongoingRequests = new();
private Dictionary<StateSyncItem.NodeKey, HashSet<DependentItem>> _dependencies = new();
private readonly LruKeyCache<StateSyncItem.NodeKey> _alreadySavedNode = new(AlreadySavedCapacity, "saved nodes");
private readonly LruKeyCache<ValueHash256> _alreadySavedCode = new(AlreadySavedCapacity, "saved nodes");
Expand Down Expand Up @@ -113,7 +113,7 @@ public TreeSync([KeyFilter(DbNames.Code)] IDb codeDb, INodeStorage nodeStorage,

if (_logger.IsTrace) _logger.Trace($"After preparing a request of {requestItems.Count} from ({_pendingItems.Description}) nodes | {_dependencies.Count}");
if (_logger.IsTrace) _logger.Trace($"Adding pending request {result}");
_pendingRequests.TryAdd(result, null);
_ongoingRequests.TryAdd(result, null);

Interlocked.Increment(ref Metrics.StateSyncRequests);
return await Task.FromResult(result);
Expand Down Expand Up @@ -149,7 +149,7 @@ public SyncResponseHandlingResult HandleResponse(StateSyncBatch? batch, PeerInfo
_syncStateLock.EnterReadLock();
try
{
if (!_pendingRequests.TryRemove(batch, out _))
if (!_ongoingRequests.TryRemove(batch, out _))
{
if (_logger.IsDebug) _logger.Debug($"Cannot remove pending request {batch}");
return SyncResponseHandlingResult.OK;
Expand Down Expand Up @@ -293,7 +293,7 @@ shorter than the request */
? SyncResponseHandlingResult.LesserQuality
: SyncResponseHandlingResult.OK;

_data.DisplayProgressReport(_pendingRequests.Count, _branchProgress, _logger);
_data.DisplayProgressReport(_ongoingRequests.Count, _branchProgress, _logger);

long elapsedTime = (long)Stopwatch.GetElapsedTime(startTime).TotalMilliseconds;
long total = elapsedTime + _networkWatch.ElapsedMilliseconds;
Expand Down Expand Up @@ -334,7 +334,7 @@ shorter than the request */

public (bool continueProcessing, bool finishSyncRound) ValidatePrepareRequest(SyncMode currentSyncMode)
{
if (_rootSaved == 1 && _pendingRequests.Count == 0)
if (_rootSaved == 1)
{
if (_logger.IsInfo) _logger.Info("StateNode sync: falling asleep - root saved");
VerifyPostSyncCleanUp();
Expand Down Expand Up @@ -446,7 +446,7 @@ private void ResetStateRoot(long blockNumber, Hash256 stateRoot, SyncFeedState c
}
else
{
foreach ((StateSyncBatch pendingRequest, _) in _pendingRequests)
foreach ((StateSyncBatch pendingRequest, _) in _ongoingRequests)
{
// re-add the pending request
for (int i = 0; i < pendingRequest.RequestedNodes?.Count; i++)
Expand All @@ -458,7 +458,7 @@ private void ResetStateRoot(long blockNumber, Hash256 stateRoot, SyncFeedState c
}
}

_pendingRequests.Clear();
_ongoingRequests.Clear();

bool hasOnlyRootNode = false;

Expand Down Expand Up @@ -708,90 +708,38 @@ private void SaveNode(StateSyncItem syncItem, byte[] data)
PossiblySaveDependentNodes(syncItem.Key);
}

class OverlayNodeStorage(INodeStorage baseStorage, ValueHash256 rootKeccak, byte[] rootValue): INodeStorage
private bool VerifyStorageUpdated(StateSyncItem item, byte[] value)
{
public INodeStorage.KeyScheme Scheme
{
get => baseStorage.Scheme;
set => baseStorage.Scheme = value;
}

public bool RequirePath => baseStorage.RequirePath;

public byte[]? Get(Hash256? address, in TreePath path, in ValueHash256 keccak, ReadFlags readFlags = ReadFlags.None)
{
if (keccak == rootKeccak) return rootValue;
return baseStorage.Get(address, in path, in keccak, readFlags);
}

public void Set(Hash256? address, in TreePath path, in ValueHash256 hash, ReadOnlySpan<byte> data,
WriteFlags writeFlags = WriteFlags.None)
{
baseStorage.Set(address, in path, in hash, data, writeFlags);
}
DependentItem dependentItem = new DependentItem(item, value, _stateSyncPivot.UpdatedStorages.Count);
StateTree stateTree = new StateTree(new TrieStore(_nodeStorage, LimboLogs.Instance), LimboLogs.Instance);
stateTree.RootRef = new TrieNode(NodeType.Unknown, value);
// TODO: Remove this
if (_logger.IsWarn) _logger.Warn($"Updated storages count is {_stateSyncPivot.UpdatedStorages.Count}");

public INodeStorage.WriteBatch StartWriteBatch()
foreach (Hash256 updatedAddress in _stateSyncPivot.UpdatedStorages)
{
return baseStorage.StartWriteBatch();
}
Account? account = stateTree.Get(updatedAddress);

public bool KeyExists(Hash256? address, in TreePath path, in ValueHash256 hash)
{
if (hash == rootKeccak) return true;
return baseStorage.KeyExists(address, in path, in hash);
if (account?.StorageRoot is not null
&& AddNodeToPending(new StateSyncItem(account.StorageRoot, updatedAddress, TreePath.Empty, NodeDataType.Storage), dependentItem, "uncomplete storage") == AddNodeResult.Added)
{
// if (_logger.IsDebug) _logger.Debug($"Storage {updatedAddress} missing correct storage root {account.StorageRoot}");
if (_logger.IsWarn) _logger.Warn($"Storage {updatedAddress} missing correct storage root {account.StorageRoot}");
}
else
{
dependentItem.Counter--;
}
}

public void Flush(bool onlyWal)
if (dependentItem.Counter > 0)
{
baseStorage.Flush(onlyWal);
if (_logger.IsInfo) _logger.Info($"Queued extra {dependentItem.Counter} items for storage repair..");
}

public void Compact()
else
{
baseStorage.Compact();
if (_logger.IsInfo) _logger.Info($"Storage OK");
}
}

/// <summary>
/// For every address in <c>_stateSyncPivot.UpdatedStorages</c>, looks the account up in a state tree
/// rooted at <c>_rootNode</c> and checks whether the node for its storage root exists in
/// <c>_nodeStorage</c>. Missing storage roots are queued through <c>AddNodeToPending</c> with a
/// shared <see cref="DependentItem"/> built from <paramref name="item"/> and <paramref name="value"/>.
/// </summary>
/// <param name="item">Sync item whose hash is overlaid with <paramref name="value"/> while reading the tree.</param>
/// <param name="value">Node data returned by the overlay storage for <c>item.Hash</c>.</param>
/// <returns>
/// <c>true</c> when the dependent counter reached zero, i.e. every updated address either had no
/// account or already had its storage root stored; otherwise <c>false</c>.
/// </returns>
private bool VerifyStorageUpdated(StateSyncItem item, byte[] value)
{
// The counter starts at the number of updated addresses and is decremented below for each
// address that does not need a storage root queued.
DependentItem dependentItem = new DependentItem(item, value, _stateSyncPivot.UpdatedStorages.Count);
// OverlayNodeStorage answers lookups of item.Hash with `value` and delegates everything else to
// _nodeStorage — presumably because this node has not been persisted yet; confirm with callers.
StateTree stateTree = new StateTree(new TrieStore(new OverlayNodeStorage(_nodeStorage, item.Hash, value), LimboLogs.Instance), LimboLogs.Instance);
stateTree.RootHash = _rootNode;
// Reads of the tree and node storage happen under the state db read lock.
_stateDbLock.EnterReadLock();
try
{
if (_logger.IsWarn) _logger.Warn($"Updated storages count is {_stateSyncPivot.UpdatedStorages.Count}");
foreach (Hash256 updatedAddress in _stateSyncPivot.UpdatedStorages)
{
Account? account = stateTree.Get(updatedAddress);

if (account is not null)
{
// Probe the storage root node directly in the underlying storage (not the overlay).
if (_nodeStorage.Get(updatedAddress, TreePath.Empty, account.StorageRoot) is null)
{
// if (_logger.IsDebug) _logger.Debug($"Storage {updatedAddress} missing correct storage root {account.StorageRoot}");
if (_logger.IsWarn) _logger.Warn($"Storage {updatedAddress} missing correct storage root {account.StorageRoot}");

// Queue the storage root; the counter is left untouched for this address.
AddNodeToPending(new StateSyncItem(account.StorageRoot, updatedAddress, TreePath.Empty, NodeDataType.Storage), dependentItem, "uncomplete storage", missing: true);
}
else
{
// Storage root already stored: nothing to wait for.
dependentItem.Counter--;
if (_logger.IsWarn) _logger.Warn($"Storage {updatedAddress} has correct storage root {account.StorageRoot}");
}
}
else
{
// No account at this address in the tree: nothing to wait for.
dependentItem.Counter--;
if (_logger.IsWarn) _logger.Warn($"Storage {updatedAddress} has no account");
}
}
}
finally
{
_stateDbLock.ExitReadLock();
}

// Zero means no storage root had to be queued.
return dependentItem.Counter == 0;
}

Expand Down Expand Up @@ -822,7 +770,7 @@ private void CleanupMemory()
_syncStateLock.EnterWriteLock();
try
{
_pendingRequests.Clear();
_ongoingRequests.Clear();
_dependencies.Clear();
_alreadySavedNode.Clear();
_alreadySavedCode.Clear();
Expand Down

0 comments on commit 202b678

Please sign in to comment.