Skip to content

Commit

Permalink
Fix pruning with LruCaheLowObject (#7114)
Browse files Browse the repository at this point in the history
Co-authored-by: Amirul Ashraf <asdacap@gmail.com>
  • Loading branch information
benaadams and asdacap authored May 31, 2024
1 parent 749fbb5 commit 33015b7
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public LruCacheLowObject(int maxCapacity, string name)

_name = name;
_maxCapacity = maxCapacity;
_cacheMap = new Dictionary<TKey, int>(maxCapacity);
_cacheMap = new Dictionary<TKey, int>(maxCapacity / 2);
_items = new LruCacheItem[maxCapacity];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public LruKeyCacheLowObject(int maxCapacity, string name)

_name = name;
_maxCapacity = maxCapacity;
_cacheMap = new Dictionary<TKey, int>(maxCapacity); // do not initialize it at the full capacity
_cacheMap = new Dictionary<TKey, int>(maxCapacity / 2); // do not initialize it at the full capacity
_items = new LruCacheItem[maxCapacity];
}

Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Core/IKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public interface IWriteOnlyKeyValueStore
/// is preferable. Unless you plan to reuse the array somehow (pool), then you'd just use span.
/// </summary>
public bool PreferWriteByArray => false;
void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value, WriteFlags flags = WriteFlags.None) => Set(key, value.ToArray(), flags);
void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value, WriteFlags flags = WriteFlags.None) => Set(key, value.IsNull() ? null : value.ToArray(), flags);
void Remove(ReadOnlySpan<byte> key) => Set(key, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public abstract class SyncPeerProtocolHandlerBase : ZeroProtocolHandlerBase, ISy
initialRequestSize: 4
);

protected LruKeyCacheLowObject<ValueHash256> NotifiedTransactions { get; } = new(2 * MemoryAllowance.MemPoolSize, "notifiedTransactions");
protected LruKeyCacheLowObject<ValueHash256>? _notifiedTransactions;
protected LruKeyCacheLowObject<ValueHash256> NotifiedTransactions => _notifiedTransactions ??= new(2 * MemoryAllowance.MemPoolSize, "notifiedTransactions");

protected SyncPeerProtocolHandlerBase(ISession session,
IMessageSerializationService serializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public class Eth62ProtocolHandler : SyncPeerProtocolHandlerBase, IZeroProtocolHa
protected readonly ITxPool _txPool;
private readonly IGossipPolicy _gossipPolicy;
private readonly ITxGossipPolicy _txGossipPolicy;
private readonly LruKeyCache<Hash256AsKey> _lastBlockNotificationCache = new(10, "LastBlockNotificationCache");
private LruKeyCache<Hash256AsKey>? _lastBlockNotificationCache;
private LruKeyCache<Hash256AsKey> LastBlockNotificationCache => _lastBlockNotificationCache ??= new(10, "LastBlockNotificationCache");

public Eth62ProtocolHandler(ISession session,
IMessageSerializationService serializer,
Expand Down Expand Up @@ -338,7 +339,7 @@ public override void NotifyOfNewBlock(Block block, SendBlockMode mode)
return;
}

if (_lastBlockNotificationCache.Set(block.Hash))
if (LastBlockNotificationCache.Set(block.Hash))
{
switch (mode)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public StateSyncItem(Hash256 hash, byte[]? accountPathNibbles, byte[]? pathNibbl
public readonly struct NodeKey(Hash256? address, TreePath? path, Hash256 hash) : IEquatable<NodeKey>
{
private readonly ValueHash256 Address = address ?? default;
private readonly TreePath? Path = path ?? default;
private readonly TreePath? Path = path;
private readonly ValueHash256 Hash = hash;

public readonly bool Equals(NodeKey other)
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Trie/INodeStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public interface INodeStorage
/// Used by StateSync to make sure values are flushed.
/// </summary>
void Flush();
void Compact();

public enum KeyScheme
{
Expand Down
8 changes: 8 additions & 0 deletions src/Nethermind/Nethermind.Trie/NodeStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ public void Flush()
}
}

public void Compact()
{
if (_keyValueStore is IDb db)
{
db.Compact();
}
}

private class WriteBatch : INodeStorage.WriteBatch
{
private readonly IWriteBatch _writeBatch;
Expand Down
60 changes: 38 additions & 22 deletions src/Nethermind/Nethermind.Trie/PreCachedTrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,65 +10,81 @@

namespace Nethermind.Trie;

public class PreCachedTrieStore(ITrieStore inner,
ConcurrentDictionary<NodeKey, byte[]?> preBlockCache)
: ITrieStore
public class PreCachedTrieStore : ITrieStore
{
private readonly ITrieStore _inner;
private readonly ConcurrentDictionary<NodeKey, byte[]?> _preBlockCache;
private readonly Func<NodeKey, byte[]> _loadRlp;
private readonly Func<NodeKey, byte[]> _tryLoadRlp;

public PreCachedTrieStore(ITrieStore inner,
ConcurrentDictionary<NodeKey, byte[]?> preBlockCache)
{
_inner = inner;
_preBlockCache = preBlockCache;

// Capture the delegate once for default path to avoid the allocation of the lambda per call
_loadRlp = (NodeKey key) => _inner.LoadRlp(key.Address, in key.Path, key.Hash, flags: ReadFlags.None);
_tryLoadRlp = (NodeKey key) => _inner.TryLoadRlp(key.Address, in key.Path, key.Hash, flags: ReadFlags.None);
}

public void Dispose()
{
inner.Dispose();
_inner.Dispose();
}

public void CommitNode(long blockNumber, Hash256? address, in NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None)
{
inner.CommitNode(blockNumber, address, in nodeCommitInfo, writeFlags);
_inner.CommitNode(blockNumber, address, in nodeCommitInfo, writeFlags);
}

public void FinishBlockCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags = WriteFlags.None)
{
inner.FinishBlockCommit(trieType, blockNumber, address, root, writeFlags);
preBlockCache.Clear();
_inner.FinishBlockCommit(trieType, blockNumber, address, root, writeFlags);
_preBlockCache.Clear();
}

public bool IsPersisted(Hash256? address, in TreePath path, in ValueHash256 keccak)
{
byte[]? rlp = preBlockCache.GetOrAdd(new(address, in path, in keccak),
key => inner.TryLoadRlp(key.Address, in key.Path, key.Hash));
byte[]? rlp = _preBlockCache.GetOrAdd(new(address, in path, in keccak),
key => _inner.TryLoadRlp(key.Address, in key.Path, key.Hash));

return rlp is not null;
}

public IReadOnlyTrieStore AsReadOnly(INodeStorage? keyValueStore = null) => inner.AsReadOnly(keyValueStore);
public IReadOnlyTrieStore AsReadOnly(INodeStorage? keyValueStore = null) => _inner.AsReadOnly(keyValueStore);

public event EventHandler<ReorgBoundaryReached>? ReorgBoundaryReached
{
add => inner.ReorgBoundaryReached += value;
remove => inner.ReorgBoundaryReached -= value;
add => _inner.ReorgBoundaryReached += value;
remove => _inner.ReorgBoundaryReached -= value;
}

public IReadOnlyKeyValueStore TrieNodeRlpStore => inner.TrieNodeRlpStore;
public IReadOnlyKeyValueStore TrieNodeRlpStore => _inner.TrieNodeRlpStore;

public void Set(Hash256? address, in TreePath path, in ValueHash256 keccak, byte[] rlp)
{
preBlockCache[new(address, in path, in keccak)] = rlp;
inner.Set(address, in path, in keccak, rlp);
_preBlockCache[new(address, in path, in keccak)] = rlp;
_inner.Set(address, in path, in keccak, rlp);
}

public bool HasRoot(Hash256 stateRoot) => inner.HasRoot(stateRoot);
public bool HasRoot(Hash256 stateRoot) => _inner.HasRoot(stateRoot);

public IScopedTrieStore GetTrieStore(Hash256? address) => new ScopedTrieStore(this, address);

public TrieNode FindCachedOrUnknown(Hash256? address, in TreePath path, Hash256 hash) => inner.FindCachedOrUnknown(address, in path, hash);
public TrieNode FindCachedOrUnknown(Hash256? address, in TreePath path, Hash256 hash) => _inner.FindCachedOrUnknown(address, in path, hash);

public byte[]? LoadRlp(Hash256? address, in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) =>
preBlockCache.GetOrAdd(new(address, in path, hash),
key => inner.LoadRlp(key.Address, in key.Path, key.Hash, flags));
_preBlockCache.GetOrAdd(new(address, in path, hash),
flags == ReadFlags.None ? _loadRlp :
key => _inner.LoadRlp(key.Address, in key.Path, key.Hash, flags));

public byte[]? TryLoadRlp(Hash256? address, in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) =>
preBlockCache.GetOrAdd(new(address, in path, hash),
key => inner.TryLoadRlp(key.Address, in key.Path, key.Hash, flags));
_preBlockCache.GetOrAdd(new(address, in path, hash),
flags == ReadFlags.None ? _tryLoadRlp :
key => _inner.TryLoadRlp(key.Address, in key.Path, key.Hash, flags));

public INodeStorage.KeyScheme Scheme => inner.Scheme;
public INodeStorage.KeyScheme Scheme => _inner.Scheme;
}

public class NodeKey : IEquatable<NodeKey>
Expand Down
Loading

0 comments on commit 33015b7

Please sign in to comment.