Feature/FlushWAL on flush. #7642

Merged: 10 commits, Oct 24, 2024
Changes from all commits
@@ -132,7 +132,7 @@ protected override async Task RunFullPruning(CancellationToken cancellationToken
}
}

[Test, MaxTime(Timeout.MaxTestTime), Retry(5)]
[Test, MaxTime(Timeout.LongTestTime)]
public async Task prune_on_disk_multiple_times()
{
using PruningTestBlockchain chain = await PruningTestBlockchain.Create(new PruningConfig { FullPruningMinimumDelayHours = 0 });
@@ -142,7 +142,7 @@ public async Task prune_on_disk_multiple_times()
}
}

[Test, MaxTime(Timeout.MaxTestTime), Retry(5)]
[Test, MaxTime(Timeout.LongTestTime)]
public async Task prune_on_disk_only_once()
{
using PruningTestBlockchain chain = await PruningTestBlockchain.Create(new PruningConfig { FullPruningMinimumDelayHours = 10 });
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Blockchain.Test/Timeout.cs
@@ -4,6 +4,7 @@
namespace Nethermind.Blockchain.Test;
internal class Timeout
{
public const int LongTestTime = 60_000;
public const int MaxTestTime = 10_000;
public const int MaxWaitTime = 1_000;
}
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Core.Test/TestMemColumnDb.cs
@@ -31,4 +31,5 @@ public IColumnsWriteBatch<TKey> StartWriteBatch()
return new InMemoryColumnWriteBatch<TKey>(this);
}
public void Dispose() { }
public void Flush(bool onlyWal = false) { }
}
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Core.Test/TestMemDb.cs
@@ -109,7 +109,7 @@ public override IWriteBatch StartWriteBatch()
return new InMemoryWriteBatch(this);
}

public override void Flush()
public override void Flush(bool onlyWal)
{
FlushCount++;
}
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
@@ -128,9 +128,9 @@ public bool KeyExists(ReadOnlySpan<byte> key)
return _mainDb.KeyExistsWithColumn(key, _columnFamily);
}

public void Flush()
public void Flush(bool onlyWal)
{
_mainDb.Flush();
_mainDb.Flush(onlyWal);
}

public void Compact()
15 changes: 10 additions & 5 deletions src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs
@@ -1321,23 +1321,28 @@ private void FlushOnTooManyWrites()
}
}

public void Flush()
public void Flush(bool onlyWal = false)
{
ObjectDisposedException.ThrowIf(_isDisposing, this);

InnerFlush();
InnerFlush(onlyWal);
}

public virtual void Compact()
{
_db.CompactRange(Keccak.Zero.BytesToArray(), Keccak.MaxValue.BytesToArray());
}

private void InnerFlush()
private void InnerFlush(bool onlyWal)
{
try
{
_rocksDbNative.rocksdb_flush(_db.Handle, FlushOptions.DefaultFlushOptions.Handle);
_rocksDbNative.rocksdb_flush_wal(_db.Handle, true);

if (!onlyWal)
{
_rocksDbNative.rocksdb_flush(_db.Handle, FlushOptions.DefaultFlushOptions.Handle);
}
}
catch (RocksDbSharpException e)
{
@@ -1439,7 +1444,7 @@ public void Dispose()
dbMetricsUpdater.Dispose();
}

InnerFlush();
InnerFlush(false);
ReleaseUnmanagedResources();

_dbsByPath.Remove(_fullPath!, out _);
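To spell out the contract the DbOnTheRocks hunk above introduces, here is a minimal, self-contained sketch (IFlushable and FakeRocksDb are made-up stand-ins, not Nethermind types): the write-ahead log is always flushed, and the memtables are flushed only when a full flush is requested.

// Illustrative sketch only, not Nethermind code.
using System;

interface IFlushable
{
    void Flush(bool onlyWal = false);
}

class FakeRocksDb : IFlushable
{
    public int WalFlushes;
    public int MemtableFlushes;

    public void Flush(bool onlyWal = false)
    {
        WalFlushes++;             // mirrors rocksdb_flush_wal above: always called
        if (!onlyWal)
        {
            MemtableFlushes++;    // mirrors rocksdb_flush above: only on a full flush
        }
    }
}

class FlushDemo
{
    static void Main()
    {
        FakeRocksDb db = new();
        db.Flush(onlyWal: true);  // e.g. after a snapshot's write batches are written
        db.Flush();               // full flush, e.g. before memory pruning or on Dispose
        Console.WriteLine($"wal={db.WalFlushes}, memtable={db.MemtableFlushes}"); // wal=2, memtable=1
    }
}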
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Db.Rpc/RpcColumnsDb.cs
@@ -46,5 +46,6 @@ public IColumnsWriteBatch<T> StartWriteBatch()
return new InMemoryColumnWriteBatch<T>(this);
}
public void Dispose() { }
public void Flush(bool onlyWal = false) { }
}
}
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Db.Rpc/RpcDb.cs
@@ -73,6 +73,8 @@ public bool KeyExists(ReadOnlySpan<byte> key)

public IDb Innermost => this; // record db is just a helper DB here
public void Flush() { }
public void Flush(bool onlyWal = false) { }

public void Clear() { }

public IEnumerable<KeyValuePair<byte[], byte[]>> GetAll(bool ordered = false) => _recordDb.GetAll();
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Db/CompressingDb.cs
@@ -137,7 +137,7 @@ public IEnumerable<byte[]> GetAllValues(bool ordered = false) =>

public bool KeyExists(ReadOnlySpan<byte> key) => _wrapped.KeyExists(key);

public void Flush() => _wrapped.Flush();
public void Flush(bool onlyWal) => _wrapped.Flush(onlyWal);

public void Clear() => _wrapped.Clear();

6 changes: 3 additions & 3 deletions src/Nethermind/Nethermind.Db/FullPruning/FullPruningDb.cs
@@ -145,11 +145,11 @@ public void Remove(ReadOnlySpan<byte> key)
public IDb Innermost => this;

// we need to flush both DB's
public void Flush()
public void Flush(bool onlyWal)
{
_currentDb.Flush();
_currentDb.Flush(onlyWal);
IDb? cloningDb = _pruningContext?.CloningDb;
cloningDb?.Flush();
cloningDb?.Flush(onlyWal);
}

// we need to clear both DB's
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Db/IDb.cs
@@ -23,7 +23,7 @@ public interface IDbMeta
{
DbMetric GatherMetric(bool includeSharedCache = false) => new DbMetric();

void Flush() { }
void Flush(bool onlyWal = false);
void Clear() { }
void Compact() { }

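The IDb.cs hunk above is the pivot of the PR: Flush loses its empty default body and gains an optional onlyWal parameter, so every database implementation now has to provide the method, while existing parameterless call sites keep compiling. A small sketch of that effect, using hypothetical names rather than the real interfaces:

// Compile-and-run sketch; IDbMetaLike and MiniDb are illustrative stand-ins.
interface IDbMetaLike
{
    void Flush(bool onlyWal = false);   // no default body: implementations must provide it
}

class MiniDb : IDbMetaLike
{
    // No-op, in the style of TestMemColumnDb / RpcColumnsDb in this diff.
    public void Flush(bool onlyWal = false) { }
}

class InterfaceDemo
{
    static void Main()
    {
        IDbMetaLike db = new MiniDb();
        db.Flush();               // existing call sites keep compiling: defaults to a full flush
        db.Flush(onlyWal: true);  // new mode: only the write-ahead log is made durable
    }
}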
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Db/MemColumnsDb.cs
@@ -36,5 +36,6 @@ public IColumnsWriteBatch<TKey> StartWriteBatch()
return new InMemoryColumnWriteBatch<TKey>(this);
}
public void Dispose() { }
public void Flush(bool onlyWal = false) { }
}
}
4 changes: 1 addition & 3 deletions src/Nethermind/Nethermind.Db/MemDb.cs
@@ -77,9 +77,7 @@ public bool KeyExists(ReadOnlySpan<byte> key)

public IDb Innermost => this;

public virtual void Flush()
{
}
public virtual void Flush(bool onlyWal = false) { }

public void Clear()
{
3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Db/NullDb.cs
@@ -43,7 +43,8 @@ public bool KeyExists(ReadOnlySpan<byte> key)
return false;
}

public void Flush() { }
public void Flush(bool onlyWal = false) { }

public void Clear() { }

public IEnumerable<KeyValuePair<byte[], byte[]>> GetAll(bool ordered = false) => Enumerable.Empty<KeyValuePair<byte[], byte[]>>();
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Db/ReadOnlyColumnsDb.cs
@@ -44,5 +44,7 @@ public void Dispose()
readOnlyColumn.Value.Dispose();
}
}

public void Flush(bool onlyWal = false) { }
}
}
6 changes: 1 addition & 5 deletions src/Nethermind/Nethermind.Db/ReadOnlyDb.cs
@@ -67,11 +67,7 @@ public void Remove(ReadOnlySpan<byte> key) { }

public bool KeyExists(ReadOnlySpan<byte> key) => _memDb.KeyExists(key) || wrappedDb.KeyExists(key);

public void Flush()
{
wrappedDb.Flush();
_memDb.Flush();
}
public void Flush(bool onlyWal) { }

public void Clear() => throw new InvalidOperationException();

3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Db/SimpleFilePublicKeyDb.cs
@@ -86,7 +86,8 @@ public bool KeyExists(ReadOnlySpan<byte> key)
return _cache.ContainsKey(key);
}

public void Flush() { }
public void Flush(bool onlyWal = false) { }

public void Clear()
{
File.Delete(DbPath);
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Init/InitializeStateDb.cs
@@ -119,9 +119,9 @@ public Task Execute(CancellationToken cancellationToken)
var minimumWriteBufferMb = 0.2 * pruningConfig.CacheMb;
if (totalWriteBufferMb < minimumWriteBufferMb)
{
int minimumWriteBufferNumber = (int)Math.Ceiling((minimumWriteBufferMb * 1.MB()) / dbConfig.StateDbWriteBufferSize);
long minimumWriteBufferSize = (int)Math.Ceiling((minimumWriteBufferMb * 1.MB()) / dbConfig.StateDbWriteBufferNumber);

if (_logger.IsWarn) _logger.Warn($"Detected {totalWriteBufferMb}MB of maximum write buffer size. Write buffer size should be at least 20% of pruning cache MB or memory pruning may slow down. Try setting `--Db.{nameof(dbConfig.WriteBufferNumber)} {minimumWriteBufferNumber}`.");
if (_logger.IsWarn) _logger.Warn($"Detected {totalWriteBufferMb}MB of maximum write buffer size. Write buffer size should be at least 20% of pruning cache MB or memory pruning may slow down. Try setting `--Db.{nameof(dbConfig.StateDbWriteBufferSize)} {minimumWriteBufferSize}`.");
}

pruningStrategy = Prune
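As a rough numeric illustration of the corrected hint in the InitializeStateDb.cs hunk above (all values below are invented, purely to show what the new suggestion computes):

// Hypothetical numbers: PruningConfig.CacheMb = 1280, StateDbWriteBufferNumber = 4.
using System;

double cacheMb = 1280;
double minimumWriteBufferMb = 0.2 * cacheMb;                  // 256 MB minimum total write buffer
long stateDbWriteBufferNumber = 4;

long minimumWriteBufferSize =
    (long)Math.Ceiling(minimumWriteBufferMb * 1_048_576 / stateDbWriteBufferNumber);

// Suggests --Db.StateDbWriteBufferSize 67108864 (64 MB per buffer); the old hint
// instead computed a buffer *count* from the configured buffer *size*.
Console.WriteLine(minimumWriteBufferSize);                     // 67108864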
@@ -687,7 +687,7 @@ private void SaveNode(StateSyncItem syncItem, byte[] data)
{
if (_logger.IsInfo) _logger.Info($"Saving root {syncItem.Hash} of {_branchProgress.CurrentSyncBlock}");

_nodeStorage.Flush();
_nodeStorage.Flush(onlyWal: false);
_codeDb.Flush();

Interlocked.Exchange(ref _rootSaved, 1);
26 changes: 24 additions & 2 deletions src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs
@@ -78,6 +78,26 @@ public void Memory_with_one_node_is_288()
trieNode.GetMemorySize(false) + ExpectedPerNodeKeyMemorySize);
}

[Test]
public void Flush_ShouldBeCalledOnEachPersist()
{
TrieNode trieNode = new(NodeType.Leaf, Keccak.Zero);

TestMemDb testMemDb = new TestMemDb();
using TrieStore fullTrieStore = CreateTrieStore(persistenceStrategy: Archive.Instance, kvStore: testMemDb);
PatriciaTree pt = new PatriciaTree(fullTrieStore.GetTrieStore(null), LimboLogs.Instance);

for (int i = 0; i < 4; i++)
{
pt.Set(TestItem.KeccakA.BytesToArray(), TestItem.Keccaks[i].BytesToArray());
using (ICommitter? committer = fullTrieStore.BeginStateBlockCommit(i + 1, trieNode))
{
pt.Commit();
}
}

testMemDb.FlushCount.Should().Be(4);
}

[Test]
public void Pruning_off_cache_should_not_change_commit_node()
@@ -978,8 +998,9 @@ public async Task Will_Not_RemovePastKeys_OnSnapshot_DuringFullPruning()
pruningStrategy: new TestPruningStrategy(true, true, 2, 100000),
persistenceStrategy: isPruningPersistenceStrategy);

IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null);
TreePath emptyPath = TreePath.Empty;
TaskCompletionSource tcs = new TaskCompletionSource();
fullTrieStore.OnMemoryPruneCompleted += (sender, args) => tcs.TrySetResult();

for (int i = 0; i < 64; i++)
{
Expand All @@ -990,7 +1011,8 @@ public async Task Will_Not_RemovePastKeys_OnSnapshot_DuringFullPruning()
}

// Pruning is done in background
await Task.Delay(TimeSpan.FromMilliseconds(10));
await tcs.Task;
tcs = new TaskCompletionSource();
}

memDb.Count.Should().Be(61);
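The second test change above swaps a fixed Task.Delay for a TaskCompletionSource that is completed by the new OnMemoryPruneCompleted event. A distilled, self-contained version of that synchronization pattern follows; PruningSource is a made-up type and only the event name mirrors the real one:

using System;
using System.Threading.Tasks;

class PruningSource
{
    public event EventHandler? OnMemoryPruneCompleted;

    // Pretend background prune that signals completion, like the ContinueWith in TrieStore.Prune.
    public void Prune() =>
        Task.Run(() => OnMemoryPruneCompleted?.Invoke(this, EventArgs.Empty));
}

class EventWaitDemo
{
    static async Task Main()
    {
        PruningSource source = new();
        TaskCompletionSource tcs = new();
        source.OnMemoryPruneCompleted += (_, _) => tcs.TrySetResult();

        source.Prune();
        await tcs.Task;   // deterministic: the test no longer guesses how long pruning takes
        Console.WriteLine("prune observed");
    }
}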
3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Trie/INodeStorage.cs
@@ -30,7 +30,8 @@ public interface INodeStorage
/// <summary>
/// Used by StateSync to make sure values are flushed.
/// </summary>
void Flush();
/// <param name="onlyWal">True if only WAL file should be flushed, not memtable.</param>
void Flush(bool onlyWal);
void Compact();

public enum KeyScheme
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Trie/NodeStorage.cs
@@ -184,11 +184,11 @@ public void Set(Hash256? address, in TreePath path, in ValueHash256 keccak, Read
_keyValueStore.PutSpan(GetExpectedPath(stackalloc byte[StoragePathLength], address, path, keccak), data, writeFlags);
}

public void Flush()
public void Flush(bool onlyWal)
{
if (_keyValueStore is IDb db)
{
db.Flush();
db.Flush(onlyWal);
}
}

36 changes: 23 additions & 13 deletions src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs
@@ -360,6 +360,9 @@ private void FinishBlockCommit(BlockCommitSet set, TrieNode? root)

public event EventHandler<ReorgBoundaryReached>? ReorgBoundaryReached;

// Used in testing to not have to wait for condition.
public event EventHandler OnMemoryPruneCompleted;

public byte[]? TryLoadRlp(Hash256? address, in TreePath path, Hash256 keccak, INodeStorage? nodeStorage, ReadFlags readFlags = ReadFlags.None)
{
nodeStorage ??= _nodeStorage;
@@ -457,14 +460,15 @@ public void Prune()
// otherwise, it may not fit the whole dirty cache.
// Additionally, if (WriteBufferSize * (WriteBufferNumber - 1)) is already more than 20% of pruning
// cache, it is likely that there are enough space for it on most time, except for syncing maybe.
_nodeStorage.Flush();
_nodeStorage.Flush(onlyWal: false);
lock (_dirtyNodesLock)
{
long start = Stopwatch.GetTimestamp();
if (_logger.IsDebug) _logger.Debug($"Locked {nameof(TrieStore)} for pruning.");

long memoryUsedByDirtyCache = MemoryUsedByDirtyCache;
if (!_pruningTaskCancellationTokenSource.IsCancellationRequested && _pruningStrategy.ShouldPrune(memoryUsedByDirtyCache))
if (!_pruningTaskCancellationTokenSource.IsCancellationRequested &&
_pruningStrategy.ShouldPrune(memoryUsedByDirtyCache))
{
// Most of the time in memory pruning is on `PrunePersistedRecursively`. So its
// usually faster to just SaveSnapshot causing most of the entry to be persisted.
@@ -497,6 +501,11 @@ public void Prune()
if (_logger.IsError) _logger.Error("Pruning failed with exception.", e);
}
});

_pruningTask.ContinueWith((_) =>
{
OnMemoryPruneCompleted?.Invoke(this, EventArgs.Empty);
});
}
}

@@ -673,9 +682,7 @@ public void WaitForPruning()
private ConcurrentQueue<BlockCommitSet> CommitSetQueue =>
(_commitSetQueue ?? CreateQueueAtomic(ref _commitSetQueue));

#if DEBUG
private BlockCommitSet? _lastCommitSet = null;
#endif

private long _memoryUsedByDirtyCache;

@@ -703,18 +710,20 @@ private BlockCommitSet CreateCommitSet(long blockNumber)
{
if (_logger.IsDebug) _logger.Debug($"Beginning new {nameof(BlockCommitSet)} - {blockNumber}");

// TODO: this throws on reorgs, does it not? let us recreate it in test
#if DEBUG
Debug.Assert(_lastCommitSet == null || blockNumber == _lastCommitSet.BlockNumber + 1 || _lastCommitSet.BlockNumber == 0, $"Newly begun block is not a successor of the last one.");
Debug.Assert(_lastCommitSet == null || _lastCommitSet.IsSealed, "Not sealed when beginning new block");
#endif
if (_lastCommitSet is not null)
{
Debug.Assert(_lastCommitSet.IsSealed, "Not sealed when beginning new block");

if (_lastCommitSet.BlockNumber != blockNumber - 1 && blockNumber != 0 && _lastCommitSet.BlockNumber != 0)
{
if (_logger.IsInfo) _logger.Info($"Non consecutive block commit. This is likely a reorg. Last block commit: {_lastCommitSet.BlockNumber}. New block commit: {blockNumber}.");
}
}

BlockCommitSet commitSet = new(blockNumber);
CommitSetQueue.Enqueue(commitSet);

#if DEBUG
_lastCommitSet = commitSet;
#endif

LatestCommittedBlockNumber = Math.Max(blockNumber, LatestCommittedBlockNumber);
// Why are we announcing **before** committing next block??
@@ -757,7 +766,7 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path)
}
}

if (_logger.IsDebug) _logger.Debug($"Persisting from root {commitSet.Root} in {commitSet.BlockNumber}");
if (_logger.IsInfo) _logger.Info($"Persisting from root {commitSet.Root?.Keccak} in block {commitSet.BlockNumber}");

long start = Stopwatch.GetTimestamp();

@@ -793,8 +802,9 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path)
disposeQueue.CompleteAdding();
Task.WaitAll(_disposeTasks);

// Dispose top level last in case something goes wrong, at least the root wont be stored
// Dispose top level last in case something goes wrong, at least the root won't be stored
topLevelWriteBatch.Dispose();
_nodeStorage.Flush(onlyWal: true);

long elapsedMilliseconds = (long)Stopwatch.GetElapsedTime(start).TotalMilliseconds;
Metrics.SnapshotPersistenceTime = elapsedMilliseconds;
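Putting the TrieStore changes together, here is a sketch of the ordering the last hunk establishes, with hypothetical stand-in types rather than the real INodeStorage: write batches are disposed with the top-level batch last, then only the WAL is flushed, while a full memtable flush still happens before memory pruning (Prune hunk above) and when the database is disposed.

using System;

interface INodeStorageLike
{
    void Flush(bool onlyWal);
}

static class SnapshotPersistenceSketch
{
    // Mirrors the tail of the snapshot persistence path in this hunk.
    public static void PersistSnapshot(IDisposable topLevelWriteBatch, INodeStorageLike nodeStorage)
    {
        // Dispose the top-level write batch last so that, if something goes wrong,
        // at least the root is not stored.
        topLevelWriteBatch.Dispose();

        // Only the WAL is flushed per snapshot; full flushes remain on Prune and Dispose.
        nodeStorage.Flush(onlyWal: true);
    }
}

class NoopBatch : IDisposable
{
    public void Dispose() => Console.WriteLine("top-level write batch disposed last");
}

class NoopStorage : INodeStorageLike
{
    public void Flush(bool onlyWal) => Console.WriteLine($"Flush(onlyWal: {onlyWal})");
}

class OrderingDemo
{
    static void Main() =>
        SnapshotPersistenceSketch.PersistSnapshot(new NoopBatch(), new NoopStorage());
}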