From 3a96f5eb399bf2ab878ce06098953691e02a2652 Mon Sep 17 00:00:00 2001 From: Amirul Ashraf Date: Tue, 6 Jun 2023 18:40:58 +0800 Subject: [PATCH] Refactor/Split dispatcher and downloader (#5778) --- .../Steps/InitializeNetwork.cs | 1 - .../Nethermind.Merge.Plugin/MergePlugin.cs | 2 - .../BeaconHeadersSyncDispatcher.cs | 22 ----- .../BeaconHeadersSyncDownloader.cs | 16 ++++ .../Synchronization/MergeBlockDownloader.cs | 12 +-- .../MergeBlockDownloaderFactory.cs | 28 ++++-- .../Synchronization/MergeSynchronizer.cs | 12 ++- .../BlockDownloaderTests.Merge.cs | 12 ++- .../BlockDownloaderTests.cs | 17 +++- .../StateSyncDispatcherTester.cs | 10 +- .../StateSyncDispatcherTests.cs | 2 +- .../FastSync/StateSyncFeedTestsBase.cs | 7 +- .../OldStyleFullSynchronizerTests.cs | 1 - .../ParallelSync/SyncDispatcherTests.cs | 30 ++++-- .../SyncThreadTests.cs | 1 - .../SynchronizerTests.cs | 3 - .../Blocks/BlockDownloader.cs | 45 ++++----- .../Blocks/IBlockDownloaderFactory.cs | 12 +-- ...cDispatcher.cs => BodiesSyncDownloader.cs} | 15 ++- ...Dispatcher.cs => HeadersSyncDownloader.cs} | 16 ++-- ...ispatcher.cs => ReceiptsSyncDownloader.cs} | 15 ++- .../ParallelSync/ISyncDownloader.cs | 15 +++ .../ParallelSync/SyncDispatcher.cs | 23 +++-- ...yncDispatcher.cs => SnapSyncDownloader.cs} | 18 ++-- ...ncDispatcher.cs => StateSyncDownloader.cs} | 15 ++- .../Synchronizer.cs | 94 ++++++++++++++----- 26 files changed, 256 insertions(+), 188 deletions(-) delete mode 100644 src/Nethermind/Nethermind.Merge.Plugin/Synchronization/BeaconHeadersSyncDispatcher.cs create mode 100644 src/Nethermind/Nethermind.Merge.Plugin/Synchronization/BeaconHeadersSyncDownloader.cs rename src/Nethermind/Nethermind.Synchronization/FastBlocks/{BodiesSyncDispatcher.cs => BodiesSyncDownloader.cs} (71%) rename src/Nethermind/Nethermind.Synchronization/FastBlocks/{HeadersSyncDispatcher.cs => HeadersSyncDownloader.cs} (69%) rename src/Nethermind/Nethermind.Synchronization/FastBlocks/{ReceiptsSyncDispatcher.cs => ReceiptsSyncDownloader.cs} (71%) create mode 100644 src/Nethermind/Nethermind.Synchronization/ParallelSync/ISyncDownloader.cs rename src/Nethermind/Nethermind.Synchronization/SnapSync/{SnapSyncDispatcher.cs => SnapSyncDownloader.cs} (73%) rename src/Nethermind/Nethermind.Synchronization/StateSync/{StateSyncDispatcher.cs => StateSyncDownloader.cs} (93%) diff --git a/src/Nethermind/Nethermind.Init/Steps/InitializeNetwork.cs b/src/Nethermind/Nethermind.Init/Steps/InitializeNetwork.cs index 1b4b1cfb403..ff978bbd7cc 100644 --- a/src/Nethermind/Nethermind.Init/Steps/InitializeNetwork.cs +++ b/src/Nethermind/Nethermind.Init/Steps/InitializeNetwork.cs @@ -141,7 +141,6 @@ private async Task Initialize(CancellationToken cancellationToken) SyncReport syncReport = new(_api.SyncPeerPool!, _api.NodeStatsManager!, _api.SyncModeSelector, _syncConfig, _api.Pivot, _api.LogManager); _api.BlockDownloaderFactory ??= new BlockDownloaderFactory( - _syncConfig.MaxProcessingThreads, _api.SpecProvider!, _api.BlockTree!, _api.ReceiptStorage!, diff --git a/src/Nethermind/Nethermind.Merge.Plugin/MergePlugin.cs b/src/Nethermind/Nethermind.Merge.Plugin/MergePlugin.cs index 176ed86ca84..1352d8e9d1b 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/MergePlugin.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/MergePlugin.cs @@ -424,12 +424,10 @@ public Task InitSynchronization() SyncReport syncReport = new(_api.SyncPeerPool, _api.NodeStatsManager, _api.SyncModeSelector, _syncConfig, _beaconPivot, _api.LogManager); _api.BlockDownloaderFactory = new MergeBlockDownloaderFactory( - _syncConfig.MaxProcessingThreads, _poSSwitcher, _beaconPivot, _api.SpecProvider, _api.BlockTree, - _blockCacheService, _api.ReceiptStorage!, _api.BlockValidator!, _api.SealValidator!, diff --git a/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/BeaconHeadersSyncDispatcher.cs b/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/BeaconHeadersSyncDispatcher.cs deleted file mode 100644 index e90923052c7..00000000000 --- a/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/BeaconHeadersSyncDispatcher.cs +++ /dev/null @@ -1,22 +0,0 @@ -// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited -// SPDX-License-Identifier: LGPL-3.0-only - -using Nethermind.Logging; -using Nethermind.Synchronization.FastBlocks; -using Nethermind.Synchronization.ParallelSync; -using Nethermind.Synchronization.Peers; - -namespace Nethermind.Merge.Plugin.Synchronization; - -public class BeaconHeadersSyncDispatcher : HeadersSyncDispatcher -{ - public BeaconHeadersSyncDispatcher( - int maxNumberOfProcessingThread, - ISyncFeed syncFeed, - ISyncPeerPool syncPeerPool, - IPeerAllocationStrategyFactory peerAllocationStrategy, - ILogManager logManager) - : base(maxNumberOfProcessingThread, syncFeed, syncPeerPool, peerAllocationStrategy, logManager) - { - } -} diff --git a/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/BeaconHeadersSyncDownloader.cs b/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/BeaconHeadersSyncDownloader.cs new file mode 100644 index 00000000000..3a5c69510a5 --- /dev/null +++ b/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/BeaconHeadersSyncDownloader.cs @@ -0,0 +1,16 @@ +// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using Nethermind.Logging; +using Nethermind.Synchronization.FastBlocks; +using Nethermind.Synchronization.ParallelSync; +using Nethermind.Synchronization.Peers; + +namespace Nethermind.Merge.Plugin.Synchronization; + +public class BeaconHeadersSyncDownloader : HeadersSyncDownloader +{ + public BeaconHeadersSyncDownloader(ILogManager logManager) : base(logManager) + { + } +} diff --git a/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloader.cs b/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloader.cs index d494b915ca9..7e19c6a2fbc 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloader.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloader.cs @@ -26,7 +26,7 @@ namespace Nethermind.Merge.Plugin.Synchronization { - public class MergeBlockDownloader : BlockDownloader + public class MergeBlockDownloader : BlockDownloader, ISyncDownloader { private readonly IBeaconPivot _beaconPivot; private readonly IBlockTree _blockTree; @@ -41,7 +41,6 @@ public class MergeBlockDownloader : BlockDownloader private readonly ISyncProgressResolver _syncProgressResolver; public MergeBlockDownloader( - int maxNumberOfProcessingThread, IPoSSwitcher posSwitcher, IBeaconPivot beaconPivot, ISyncFeed? feed, @@ -57,9 +56,8 @@ public MergeBlockDownloader( ISyncProgressResolver syncProgressResolver, ILogManager logManager, SyncBatchSize? syncBatchSize = null) - : base(maxNumberOfProcessingThread, feed, syncPeerPool, blockTree, blockValidator, sealValidator, syncReport, receiptStorage, - specProvider, new MergeBlocksSyncPeerAllocationStrategyFactory(posSwitcher, beaconPivot, logManager), - betterPeerStrategy, logManager, syncBatchSize) + : base(feed, syncPeerPool, blockTree, blockValidator, sealValidator, syncReport, receiptStorage, + specProvider, betterPeerStrategy, logManager, syncBatchSize) { _blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree)); _specProvider = specProvider ?? throw new ArgumentNullException(nameof(specProvider)); @@ -74,7 +72,7 @@ public MergeBlockDownloader( _logger = logManager.GetClassLogger(); } - protected override async Task Dispatch(PeerInfo bestPeer, BlocksRequest? blocksRequest, CancellationToken cancellation) + public override async Task Dispatch(PeerInfo bestPeer, BlocksRequest? blocksRequest, CancellationToken cancellation) { if (_beaconPivot.BeaconPivotExists() == false && _poSSwitcher.HasEverReachedTerminalBlock() == false) { @@ -86,7 +84,7 @@ protected override async Task Dispatch(PeerInfo bestPeer, BlocksRequest? blocksR if (blocksRequest == null) { - if (Logger.IsWarn) Logger.Warn($"NULL received for dispatch in {nameof(BlockDownloader)}"); + if (_logger.IsWarn) _logger.Warn($"NULL received for dispatch in {nameof(BlockDownloader)}"); return; } diff --git a/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloaderFactory.cs b/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloaderFactory.cs index 82606920221..4a2a58a9531 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloaderFactory.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloaderFactory.cs @@ -9,8 +9,6 @@ using Nethermind.Consensus.Validators; using Nethermind.Core.Specs; using Nethermind.Logging; -using Nethermind.Merge.Plugin.Handlers; -using Nethermind.Merge.Plugin.InvalidChainTracker; using Nethermind.Synchronization; using Nethermind.Synchronization.Blocks; using Nethermind.Synchronization.ParallelSync; @@ -35,15 +33,12 @@ public class MergeBlockDownloaderFactory : IBlockDownloaderFactory private readonly ISyncReport _syncReport; private readonly ISyncProgressResolver _syncProgressResolver; private readonly IChainLevelHelper _chainLevelHelper; - private readonly int _maxNumberOfProcessingThread; public MergeBlockDownloaderFactory( - int maxNumberOfProcessingThread, IPoSSwitcher poSSwitcher, IBeaconPivot beaconPivot, ISpecProvider specProvider, IBlockTree blockTree, - IBlockCacheService blockCacheService, IReceiptStorage receiptStorage, IBlockValidator blockValidator, ISealValidator sealValidator, @@ -54,7 +49,6 @@ public MergeBlockDownloaderFactory( ISyncProgressResolver syncProgressResolver, ILogManager logManager) { - _maxNumberOfProcessingThread = maxNumberOfProcessingThread; _poSSwitcher = poSSwitcher ?? throw new ArgumentNullException(nameof(poSSwitcher)); _beaconPivot = beaconPivot ?? throw new ArgumentNullException(nameof(beaconPivot)); _specProvider = specProvider ?? throw new ArgumentNullException(nameof(specProvider)); @@ -73,9 +67,25 @@ public MergeBlockDownloaderFactory( public BlockDownloader Create(ISyncFeed syncFeed) { return new MergeBlockDownloader( - _maxNumberOfProcessingThread, _poSSwitcher, _beaconPivot, syncFeed, _syncPeerPool, _blockTree, _blockValidator, - _sealValidator, _syncReport, _receiptStorage, _specProvider, _betterPeerStrategy, _chainLevelHelper, - _syncProgressResolver, _logManager); + _poSSwitcher, + _beaconPivot, + syncFeed, + _syncPeerPool, + _blockTree, + _blockValidator, + _sealValidator, + _syncReport, + _receiptStorage, + _specProvider, + _betterPeerStrategy, + _chainLevelHelper, + _syncProgressResolver, + _logManager); + } + + public IPeerAllocationStrategyFactory CreateAllocationStrategyFactory() + { + return new MergeBlocksSyncPeerAllocationStrategyFactory(_poSSwitcher, _beaconPivot, _logManager); } } } diff --git a/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeSynchronizer.cs b/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeSynchronizer.cs index 50b63f56e25..a07e2a25b64 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeSynchronizer.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeSynchronizer.cs @@ -82,9 +82,15 @@ private void StartBeaconHeadersComponents() FastBlocksPeerAllocationStrategyFactory fastFactory = new(); BeaconHeadersSyncFeed beaconHeadersFeed = new(_poSSwitcher, _syncMode, _blockTree, _syncPeerPool, _syncConfig, _syncReport, _pivot, _mergeConfig, _invalidChainTracker, _logManager); - BeaconHeadersSyncDispatcher beaconHeadersDispatcher = - new(_syncConfig.MaxProcessingThreads, beaconHeadersFeed!, _syncPeerPool, fastFactory, _logManager); - beaconHeadersDispatcher.Start(_syncCancellation!.Token).ContinueWith(t => + BeaconHeadersSyncDownloader beaconHeadersDownloader = new(_logManager); + + SyncDispatcher dispatcher = CreateDispatcher( + beaconHeadersFeed!, + beaconHeadersDownloader, + fastFactory + ); + + dispatcher.Start(_syncCancellation!.Token).ContinueWith(t => { if (t.IsFaulted) { diff --git a/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.Merge.cs b/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.Merge.cs index 6a2a48ac845..07b5ef2778d 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.Merge.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.Merge.cs @@ -275,7 +275,7 @@ public async Task Does_not_deadlock_on_replace_peer() CancellationTokenSource cts = new CancellationTokenSource(); - Task ignored = ctx.BlockDownloader.Start(cts.Token); + Task ignored = ctx.Dispatcher.Start(cts.Token); await Task.Delay(TimeSpan.FromMilliseconds(100)); // Feed should activate and allocate the first peer @@ -328,8 +328,6 @@ public async Task No_old_bodies_and_receipts() ctx.BeaconPivot.EnsurePivot(blockTrees.SyncedTree.FindHeader(64, BlockTreeLookupOptions.None)); - BlockDownloader downloader = ctx.BlockDownloader; - SyncPeerMock syncPeer = new(syncedTree, false, Response.AllCorrect, 34000000); PeerInfo peerInfo = new(syncPeer); @@ -348,7 +346,7 @@ public async Task No_old_bodies_and_receipts() ctx.Feed.Activate(); CancellationTokenSource cts = new(); - downloader.Start(cts.Token); + ctx.Dispatcher.Start(cts.Token); Assert.That( () => ctx.BlockTree.BestKnownNumber, @@ -470,7 +468,6 @@ public override BlockDownloader BlockDownloader get { return _mergeBlockDownloader ??= new( - 0, PosSwitcher, BeaconPivot, Feed, @@ -487,5 +484,10 @@ public override BlockDownloader BlockDownloader LimboLogs.Instance); } } + + private IPeerAllocationStrategyFactory? _peerAllocationStrategy; + protected virtual IPeerAllocationStrategyFactory PeerAllocationStrategy => + _peerAllocationStrategy ??= new MergeBlocksSyncPeerAllocationStrategyFactory(PosSwitcher, BeaconPivot, LimboLogs.Instance); + } } diff --git a/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs index e23ddd7ca61..12568668c87 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs @@ -994,7 +994,6 @@ public IBlockValidator BlockValidator private BlockDownloader _blockDownloader; public virtual BlockDownloader BlockDownloader => _blockDownloader ??= new BlockDownloader( - 0, Feed, PeerPool, BlockTree, @@ -1003,12 +1002,26 @@ public IBlockValidator BlockValidator NullSyncReport.Instance, ReceiptStorage, SpecProvider, - new BlocksSyncPeerAllocationStrategyFactory(), BetterPeerStrategy, LimboLogs.Instance, SyncBatchSize ); + private SyncDispatcher _dispatcher; + public SyncDispatcher Dispatcher => _dispatcher ??= new SyncDispatcher( + 0, + Feed, + BlockDownloader, + PeerPool, + PeerAllocationStrategy, + LimboLogs.Instance + ); + + private IPeerAllocationStrategyFactory? _peerAllocationStrategy; + + protected virtual IPeerAllocationStrategyFactory PeerAllocationStrategy => + _peerAllocationStrategy ??= new BlocksSyncPeerAllocationStrategyFactory(); + public Context(BlockTree? blockTree = null) { if (blockTree != null) diff --git a/src/Nethermind/Nethermind.Synchronization.Test/FastSync/SnapProtocolTests/StateSyncDispatcherTester.cs b/src/Nethermind/Nethermind.Synchronization.Test/FastSync/SnapProtocolTests/StateSyncDispatcherTester.cs index 27b5734dbd9..779dd54c030 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/FastSync/SnapProtocolTests/StateSyncDispatcherTester.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/FastSync/SnapProtocolTests/StateSyncDispatcherTester.cs @@ -15,14 +15,18 @@ namespace Nethermind.Synchronization.Test.FastSync.SnapProtocolTests { - public class StateSyncDispatcherTester : StateSyncDispatcher + public class StateSyncDispatcherTester : SyncDispatcher { + private readonly ISyncDownloader _downloader; + public StateSyncDispatcherTester( ISyncFeed syncFeed, + ISyncDownloader downloader, ISyncPeerPool syncPeerPool, IPeerAllocationStrategyFactory peerAllocationStrategy, - ILogManager logManager) : base(0, syncFeed, syncPeerPool, peerAllocationStrategy, logManager) + ILogManager logManager) : base(0, syncFeed, downloader, syncPeerPool, peerAllocationStrategy, logManager) { + _downloader = downloader; } public async Task ExecuteDispatch(StateSyncBatch batch, int times) @@ -31,7 +35,7 @@ public async Task ExecuteDispatch(StateSyncBatch batch, int times) for (int i = 0; i < times; i++) { - await base.Dispatch(allocation.Current, batch, CancellationToken.None); + await _downloader.Dispatch(allocation.Current, batch, CancellationToken.None); } } } diff --git a/src/Nethermind/Nethermind.Synchronization.Test/FastSync/SnapProtocolTests/StateSyncDispatcherTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/FastSync/SnapProtocolTests/StateSyncDispatcherTests.cs index a79943b7d6c..26717fd067f 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/FastSync/SnapProtocolTests/StateSyncDispatcherTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/FastSync/SnapProtocolTests/StateSyncDispatcherTests.cs @@ -48,7 +48,7 @@ public void Setup() var feed = Substitute.For>(); _dispatcher = - new StateSyncDispatcherTester(feed, _pool, new StateSyncAllocationStrategyFactory(), _logManager); + new StateSyncDispatcherTester(feed, new StateSyncDownloader(_logManager), _pool, new StateSyncAllocationStrategyFactory(), _logManager); } [Test] diff --git a/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedTestsBase.cs b/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedTestsBase.cs index a86886dbb9b..2f45993fd2d 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedTestsBase.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedTestsBase.cs @@ -113,8 +113,8 @@ protected SafeContext PrepareDownloaderWithPeer(DbContext dbContext, params ISyn ctx.SyncModeSelector = StaticSelector.StateNodesWithFastBlocks; ctx.TreeFeed = new(SyncMode.StateNodes, dbContext.LocalCodeDb, dbContext.LocalStateDb, blockTree, _logManager); ctx.Feed = new StateSyncFeed(ctx.SyncModeSelector, ctx.TreeFeed, _logManager); - ctx.StateSyncDispatcher = - new StateSyncDispatcher(0, ctx.Feed, ctx.Pool, new StateSyncAllocationStrategyFactory(), _logManager); + ctx.Downloader = new StateSyncDownloader(_logManager); + ctx.StateSyncDispatcher = new SyncDispatcher(0, ctx.Feed, ctx.Downloader, ctx.Pool, new StateSyncAllocationStrategyFactory(), _logManager); ctx.StateSyncDispatcher.Start(CancellationToken.None); return ctx; } @@ -145,7 +145,8 @@ protected class SafeContext public ISyncPeerPool Pool; public TreeSync TreeFeed; public StateSyncFeed Feed; - public StateSyncDispatcher StateSyncDispatcher; + public StateSyncDownloader Downloader; + public SyncDispatcher StateSyncDispatcher; } protected class DbContext diff --git a/src/Nethermind/Nethermind.Synchronization.Test/OldStyleFullSynchronizerTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/OldStyleFullSynchronizerTests.cs index 45148eaca15..06a84baba15 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/OldStyleFullSynchronizerTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/OldStyleFullSynchronizerTests.cs @@ -70,7 +70,6 @@ public async Task Setup() Pivot pivot = new(syncConfig); SyncReport syncReport = new(_pool, stats, syncModeSelector, syncConfig, pivot, LimboLogs.Instance); BlockDownloaderFactory blockDownloaderFactory = new( - 0, MainnetSpecProvider.Instance, _blockTree, _receiptStorage, diff --git a/src/Nethermind/Nethermind.Synchronization.Test/ParallelSync/SyncDispatcherTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/ParallelSync/SyncDispatcherTests.cs index 8201bc7bffb..0c50884b9f9 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/ParallelSync/SyncDispatcherTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/ParallelSync/SyncDispatcherTests.cs @@ -129,16 +129,10 @@ public TestBatch(int start, int length) public int[] Result { get; set; } } - private class TestDispatcher : SyncDispatcher + private class TestDownloader : ISyncDownloader { - public TestDispatcher(int maxNumberOfProcessingThread, ISyncFeed syncFeed, ISyncPeerPool syncPeerPool, IPeerAllocationStrategyFactory peerAllocationStrategy) - : base(maxNumberOfProcessingThread, syncFeed, syncPeerPool, peerAllocationStrategy, LimboLogs.Instance) - { - } - private int _failureSwitch; - - protected override async Task Dispatch(PeerInfo allocation, TestBatch request, CancellationToken cancellationToken) + public async Task Dispatch(PeerInfo peerInfo, TestBatch request, CancellationToken cancellationToken) { if (++_failureSwitch % 2 == 0) { @@ -259,7 +253,14 @@ public override async Task PrepareRequest(CancellationToken token = d public async Task Simple_test_sync() { TestSyncFeed syncFeed = new(); - TestDispatcher dispatcher = new(0, syncFeed, new TestSyncPeerPool(), new StaticPeerAllocationStrategyFactory(FirstFree.Instance)); + TestDownloader downloader = new TestDownloader(); + SyncDispatcher dispatcher = new( + 0, + syncFeed, + downloader, + new TestSyncPeerPool(), + new StaticPeerAllocationStrategyFactory(FirstFree.Instance), + LimboLogs.Instance); Task executorTask = dispatcher.Start(CancellationToken.None); syncFeed.Activate(); await executorTask; @@ -277,7 +278,16 @@ public async Task Test_release_before_processing_complete(bool isMultiSync, int { TestSyncFeed syncFeed = new(isMultiSync, 999999); syncFeed.LockResponse(); - TestDispatcher dispatcher = new(processingThread, syncFeed, new TestSyncPeerPool(peerCount), new StaticPeerAllocationStrategyFactory(FirstFree.Instance)); + + TestDownloader downloader = new TestDownloader(); + SyncDispatcher dispatcher = new( + processingThread, + syncFeed, + downloader, + new TestSyncPeerPool(peerCount), + new StaticPeerAllocationStrategyFactory(FirstFree.Instance), + LimboLogs.Instance); + Task executorTask = dispatcher.Start(CancellationToken.None); syncFeed.Activate(); await Task.Delay(100); diff --git a/src/Nethermind/Nethermind.Synchronization.Test/SyncThreadTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/SyncThreadTests.cs index d0cf78e4ee8..a3ca85cfec9 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/SyncThreadTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/SyncThreadTests.cs @@ -352,7 +352,6 @@ private SyncTestContext CreateSyncManager(int index) Pivot pivot = new(syncConfig); SyncReport syncReport = new(syncPeerPool, nodeStatsManager, selector, syncConfig, pivot, LimboLogs.Instance); BlockDownloaderFactory blockDownloaderFactory = new( - 0, MainnetSpecProvider.Instance, tree, NullReceiptStorage.Instance, diff --git a/src/Nethermind/Nethermind.Synchronization.Test/SynchronizerTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/SynchronizerTests.cs index c6fd06e1b0f..fbe906a0866 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/SynchronizerTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/SynchronizerTests.cs @@ -348,12 +348,10 @@ ISyncConfig GetSyncConfig() => { SyncReport syncReport = new(SyncPeerPool, stats, syncModeSelector, syncConfig, beaconPivot, _logManager); blockDownloaderFactory = new MergeBlockDownloaderFactory( - 0, poSSwitcher, beaconPivot, MainnetSpecProvider.Instance, BlockTree, - blockCacheService, NullReceiptStorage.Instance, Always.Valid, Always.Valid, @@ -386,7 +384,6 @@ ISyncConfig GetSyncConfig() => { SyncReport syncReport = new(SyncPeerPool, stats, syncModeSelector, syncConfig, pivot, _logManager); blockDownloaderFactory = new BlockDownloaderFactory( - 0, MainnetSpecProvider.Instance, BlockTree, NullReceiptStorage.Instance, diff --git a/src/Nethermind/Nethermind.Synchronization/Blocks/BlockDownloader.cs b/src/Nethermind/Nethermind.Synchronization/Blocks/BlockDownloader.cs index 9ddf2fc762a..375cb96eed7 100644 --- a/src/Nethermind/Nethermind.Synchronization/Blocks/BlockDownloader.cs +++ b/src/Nethermind/Nethermind.Synchronization/Blocks/BlockDownloader.cs @@ -25,7 +25,7 @@ namespace Nethermind.Synchronization.Blocks { - public class BlockDownloader : SyncDispatcher + public class BlockDownloader : ISyncDownloader { public const int MaxReorganizationLength = SyncBatchSize.Max * 2; @@ -33,6 +33,7 @@ public class BlockDownloader : SyncDispatcher public static readonly TimeSpan SyncBatchDownloadTimeUpperBound = TimeSpan.FromMilliseconds(8000); public static readonly TimeSpan SyncBatchDownloadTimeLowerBound = TimeSpan.FromMilliseconds(5000); + private readonly ISyncFeed _feed; private readonly IBlockTree _blockTree; private readonly IBlockValidator _blockValidator; private readonly ISealValidator _sealValidator; @@ -42,6 +43,8 @@ public class BlockDownloader : SyncDispatcher private readonly ISpecProvider _specProvider; private readonly IBetterPeerStrategy _betterPeerStrategy; private readonly ILogger _logger; + private readonly ISyncPeerPool _syncPeerPool; + private readonly Guid _sealValidatorUserGuid = Guid.NewGuid(); private readonly Random _rnd = new(); private bool _cancelDueToBetterPeer; @@ -53,7 +56,6 @@ public class BlockDownloader : SyncDispatcher private readonly int[] _ancestorJumps = { 1, 2, 3, 8, 16, 32, 64, 128, 256, 384, 512, 640, 768, 896, 1024 }; public BlockDownloader( - int maxNumberOfProcessingThread, ISyncFeed? feed, ISyncPeerPool? syncPeerPool, IBlockTree? blockTree, @@ -62,19 +64,19 @@ public BlockDownloader( ISyncReport? syncReport, IReceiptStorage? receiptStorage, ISpecProvider? specProvider, - IPeerAllocationStrategyFactory blockSyncPeerAllocationStrategyFactory, IBetterPeerStrategy betterPeerStrategy, ILogManager? logManager, SyncBatchSize? syncBatchSize = null) - : base(maxNumberOfProcessingThread, feed, syncPeerPool, blockSyncPeerAllocationStrategyFactory, logManager) { + _feed = feed; + _syncPeerPool = syncPeerPool ?? throw new ArgumentNullException(nameof(syncPeerPool)); _blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree)); _blockValidator = blockValidator ?? throw new ArgumentNullException(nameof(blockValidator)); _sealValidator = sealValidator ?? throw new ArgumentNullException(nameof(sealValidator)); _syncReport = syncReport ?? throw new ArgumentNullException(nameof(syncReport)); _receiptStorage = receiptStorage ?? throw new ArgumentNullException(nameof(receiptStorage)); _specProvider = specProvider ?? throw new ArgumentNullException(nameof(specProvider)); - _betterPeerStrategy = betterPeerStrategy ?? throw new ArgumentNullException(nameof(betterPeerStrategy)); ; + _betterPeerStrategy = betterPeerStrategy ?? throw new ArgumentNullException(nameof(betterPeerStrategy)); _logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager)); _receiptsRecovery = new ReceiptsRecovery(new EthereumEcdsa(_specProvider.ChainId, logManager), _specProvider); @@ -96,14 +98,11 @@ private void BlockTreeOnNewHeadBlock(object? sender, BlockEventArgs e) protected PeerInfo? _previousBestPeer = null; - protected override async Task Dispatch( - PeerInfo bestPeer, - BlocksRequest? blocksRequest, - CancellationToken cancellation) + public virtual async Task Dispatch(PeerInfo bestPeer, BlocksRequest? blocksRequest, CancellationToken cancellation) { if (blocksRequest is null) { - if (Logger.IsWarn) Logger.Warn($"NULL received for dispatch in {nameof(BlockDownloader)}"); + if (_logger.IsWarn) _logger.Warn($"NULL received for dispatch in {nameof(BlockDownloader)}"); return; } @@ -120,17 +119,17 @@ protected override async Task Dispatch( SyncEvent?.Invoke(this, new SyncEventArgs(bestPeer.SyncPeer, Synchronization.SyncEvent.Started)); if ((blocksRequest.Options & DownloaderOptions.WithBodies) == DownloaderOptions.WithBodies) { - if (Logger.IsDebug) Logger.Debug("Downloading bodies"); + if (_logger.IsDebug) _logger.Debug("Downloading bodies"); await DownloadBlocks(bestPeer, blocksRequest, cancellation) .ContinueWith(t => HandleSyncRequestResult(t, bestPeer), cancellation); - if (Logger.IsDebug) Logger.Debug("Finished downloading bodies"); + if (_logger.IsDebug) _logger.Debug("Finished downloading bodies"); } else { - if (Logger.IsDebug) Logger.Debug("Downloading headers"); + if (_logger.IsDebug) _logger.Debug("Downloading headers"); await DownloadHeaders(bestPeer, blocksRequest, cancellation) .ContinueWith(t => HandleSyncRequestResult(t, bestPeer), cancellation); - if (Logger.IsDebug) Logger.Debug("Finished downloading headers"); + if (_logger.IsDebug) _logger.Debug("Finished downloading headers"); } } finally @@ -206,7 +205,7 @@ bool HasMoreToSync() break; } - SyncPeerPool.ReportNoSyncProgress(bestPeer, AllocationContexts.Blocks); + _syncPeerPool.ReportNoSyncProgress(bestPeer, AllocationContexts.Blocks); return 0; } @@ -426,8 +425,6 @@ private ValueTask DownloadFailHandler(Task downloadTask, string entities) return default; } - private readonly Guid _sealValidatorUserGuid = Guid.NewGuid(); - protected virtual async Task RequestHeaders(PeerInfo peer, CancellationToken cancellation, long currentNumber, int headersToRequest) { _sealValidator.HintValidationRange(_sealValidatorUserGuid, currentNumber - 1028, currentNumber + 30000); @@ -719,27 +716,19 @@ protected void AdjustSyncBatchSize(TimeSpan downloadTime) } } - protected override async Task Allocate(BlocksRequest? request) + public void OnAllocate(SyncPeerAllocation allocation) { - if (request is null) - { - throw new InvalidOperationException($"NULL received for dispatch in {nameof(BlockDownloader)}"); - } - - SyncPeerAllocation allocation = await base.Allocate(request); CancellationTokenSource cancellation = new(); _allocationWithCancellation = new AllocationWithCancellation(allocation, cancellation); allocation.Cancelled += AllocationOnCancelled; allocation.Replaced += AllocationOnReplaced; - return allocation; } - protected override void Free(SyncPeerAllocation allocation) + public void BeforeFree(SyncPeerAllocation allocation) { allocation.Cancelled -= AllocationOnCancelled; allocation.Replaced -= AllocationOnReplaced; - base.Free(allocation); } private void AllocationOnCancelled(object? sender, AllocationChangeEventArgs e) @@ -774,7 +763,7 @@ private void AllocationOnReplaced(object? sender, AllocationChangeEventArgs e) BlockHeader? bestSuggested = _blockTree.BestSuggestedHeader; if (_betterPeerStrategy.Compare(bestSuggested, newPeer?.SyncPeer) < 0) { - Feed.Activate(); + _feed.Activate(); } } diff --git a/src/Nethermind/Nethermind.Synchronization/Blocks/IBlockDownloaderFactory.cs b/src/Nethermind/Nethermind.Synchronization/Blocks/IBlockDownloaderFactory.cs index bf88a7f8941..91dd37531ec 100644 --- a/src/Nethermind/Nethermind.Synchronization/Blocks/IBlockDownloaderFactory.cs +++ b/src/Nethermind/Nethermind.Synchronization/Blocks/IBlockDownloaderFactory.cs @@ -18,7 +18,6 @@ namespace Nethermind.Synchronization.Blocks { public class BlockDownloaderFactory : IBlockDownloaderFactory { - private readonly int _maxNumberOfProcessingThread; private readonly ISpecProvider _specProvider; private readonly IBlockTree _blockTree; private readonly IReceiptStorage _receiptStorage; @@ -30,7 +29,6 @@ public class BlockDownloaderFactory : IBlockDownloaderFactory private readonly ISyncReport _syncReport; public BlockDownloaderFactory( - int maxNumberOfProcessingThread, ISpecProvider specProvider, IBlockTree blockTree, IReceiptStorage receiptStorage, @@ -41,7 +39,6 @@ public BlockDownloaderFactory( ISyncReport syncReport, ILogManager logManager) { - _maxNumberOfProcessingThread = maxNumberOfProcessingThread; _specProvider = specProvider ?? throw new ArgumentNullException(nameof(specProvider)); _blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree)); _receiptStorage = receiptStorage ?? throw new ArgumentNullException(nameof(receiptStorage)); @@ -51,13 +48,11 @@ public BlockDownloaderFactory( _betterPeerStrategy = betterPeerStrategy ?? throw new ArgumentNullException(nameof(betterPeerStrategy)); _syncReport = syncReport ?? throw new ArgumentNullException(nameof(syncReport)); _logManager = logManager ?? throw new ArgumentNullException(nameof(logManager)); - } public BlockDownloader Create(ISyncFeed syncFeed) { return new( - _maxNumberOfProcessingThread, syncFeed, _syncPeerPool, _blockTree, @@ -66,14 +61,19 @@ public BlockDownloader Create(ISyncFeed syncFeed) _syncReport, _receiptStorage, _specProvider, - new BlocksSyncPeerAllocationStrategyFactory(), _betterPeerStrategy, _logManager); } + + public IPeerAllocationStrategyFactory CreateAllocationStrategyFactory() + { + return new BlocksSyncPeerAllocationStrategyFactory(); + } } public interface IBlockDownloaderFactory { BlockDownloader Create(ISyncFeed syncFeed); + IPeerAllocationStrategyFactory CreateAllocationStrategyFactory(); } } diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncDispatcher.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncDownloader.cs similarity index 71% rename from src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncDispatcher.cs rename to src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncDownloader.cs index 38be0034969..9622fb0f4cd 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncDispatcher.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncDownloader.cs @@ -15,19 +15,16 @@ namespace Nethermind.Synchronization.FastBlocks { - public class BodiesSyncDispatcher : SyncDispatcher + public class BodiesSyncDownloader : ISyncDownloader { - public BodiesSyncDispatcher( - int maxNumberOfProcessingThread, - ISyncFeed syncFeed, - ISyncPeerPool syncPeerPool, - IPeerAllocationStrategyFactory peerAllocationStrategy, - ILogManager logManager) - : base(maxNumberOfProcessingThread, syncFeed, syncPeerPool, peerAllocationStrategy, logManager) + private ILogger Logger; + + public BodiesSyncDownloader(ILogManager logManager) { + Logger = logManager.GetClassLogger(); } - protected override async Task Dispatch(PeerInfo peerInfo, BodiesSyncBatch batch, CancellationToken cancellationToken) + public async Task Dispatch(PeerInfo peerInfo, BodiesSyncBatch batch, CancellationToken cancellationToken) { ISyncPeer peer = peerInfo.SyncPeer; batch.ResponseSourcePeer = peerInfo; diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/HeadersSyncDispatcher.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/HeadersSyncDownloader.cs similarity index 69% rename from src/Nethermind/Nethermind.Synchronization/FastBlocks/HeadersSyncDispatcher.cs rename to src/Nethermind/Nethermind.Synchronization/FastBlocks/HeadersSyncDownloader.cs index 9249a2d57da..89548d213cf 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/HeadersSyncDispatcher.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/HeadersSyncDownloader.cs @@ -5,26 +5,22 @@ using System.Threading; using System.Threading.Tasks; using Nethermind.Blockchain.Synchronization; -using Nethermind.Core; using Nethermind.Logging; using Nethermind.Synchronization.ParallelSync; using Nethermind.Synchronization.Peers; namespace Nethermind.Synchronization.FastBlocks { - public class HeadersSyncDispatcher : SyncDispatcher + public class HeadersSyncDownloader : ISyncDownloader { - public HeadersSyncDispatcher( - int maxNumberOfProcessingThread, - ISyncFeed syncFeed, - ISyncPeerPool syncPeerPool, - IPeerAllocationStrategyFactory peerAllocationStrategy, - ILogManager logManager) - : base(maxNumberOfProcessingThread, syncFeed, syncPeerPool, peerAllocationStrategy, logManager) + private ILogger Logger; + + public HeadersSyncDownloader(ILogManager logManager) { + Logger = logManager.GetClassLogger(); } - protected override async Task Dispatch( + async Task ISyncDownloader.Dispatch( PeerInfo peerInfo, HeadersSyncBatch batch, CancellationToken cancellationToken) diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncDispatcher.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncDownloader.cs similarity index 71% rename from src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncDispatcher.cs rename to src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncDownloader.cs index 01cc247f0f2..364f8408e86 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncDispatcher.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncDownloader.cs @@ -15,19 +15,16 @@ namespace Nethermind.Synchronization.FastBlocks { - public class ReceiptsSyncDispatcher : SyncDispatcher + public class ReceiptsSyncDispatcher : ISyncDownloader { - public ReceiptsSyncDispatcher( - int maxNumberOfProcessingThread, - ISyncFeed syncFeed, - ISyncPeerPool syncPeerPool, - IPeerAllocationStrategyFactory peerAllocationStrategy, - ILogManager logManager) - : base(maxNumberOfProcessingThread, syncFeed, syncPeerPool, peerAllocationStrategy, logManager) + private ILogger Logger; + + public ReceiptsSyncDispatcher(ILogManager logManager) { + Logger = logManager.GetClassLogger(); } - protected override async Task Dispatch(PeerInfo peerInfo, ReceiptsSyncBatch batch, CancellationToken cancellationToken) + public async Task Dispatch(PeerInfo peerInfo, ReceiptsSyncBatch batch, CancellationToken cancellationToken) { ISyncPeer peer = peerInfo.SyncPeer; batch.ResponseSourcePeer = peerInfo; diff --git a/src/Nethermind/Nethermind.Synchronization/ParallelSync/ISyncDownloader.cs b/src/Nethermind/Nethermind.Synchronization/ParallelSync/ISyncDownloader.cs new file mode 100644 index 00000000000..a892f0a9792 --- /dev/null +++ b/src/Nethermind/Nethermind.Synchronization/ParallelSync/ISyncDownloader.cs @@ -0,0 +1,15 @@ +// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System.Threading; +using System.Threading.Tasks; +using Nethermind.Synchronization.Peers; + +namespace Nethermind.Synchronization.ParallelSync; + +public interface ISyncDownloader +{ + public Task Dispatch(PeerInfo peerInfo, T request, CancellationToken cancellationToken); + public void BeforeFree(SyncPeerAllocation allocation) { } + public void OnAllocate(SyncPeerAllocation allocation) { } +} diff --git a/src/Nethermind/Nethermind.Synchronization/ParallelSync/SyncDispatcher.cs b/src/Nethermind/Nethermind.Synchronization/ParallelSync/SyncDispatcher.cs index b96582a6632..d20edf05dca 100644 --- a/src/Nethermind/Nethermind.Synchronization/ParallelSync/SyncDispatcher.cs +++ b/src/Nethermind/Nethermind.Synchronization/ParallelSync/SyncDispatcher.cs @@ -10,28 +10,31 @@ namespace Nethermind.Synchronization.ParallelSync { - public abstract class SyncDispatcher + public class SyncDispatcher { private readonly object _feedStateManipulation = new(); private SyncFeedState _currentFeedState = SyncFeedState.Dormant; private IPeerAllocationStrategyFactory PeerAllocationStrategyFactory { get; } - protected ILogger Logger { get; } - protected ISyncFeed Feed { get; } - protected ISyncPeerPool SyncPeerPool { get; } + private ILogger Logger { get; } + private ISyncFeed Feed { get; } + private ISyncDownloader Downloader { get; } + private ISyncPeerPool SyncPeerPool { get; } private readonly SemaphoreSlim _concurrentProcessingSemaphore; - protected SyncDispatcher( + public SyncDispatcher( int maxNumberOfProcessingThread, ISyncFeed? syncFeed, + ISyncDownloader? downloader, ISyncPeerPool? syncPeerPool, IPeerAllocationStrategyFactory? peerAllocationStrategy, ILogManager? logManager) { Logger = logManager?.GetClassLogger>() ?? throw new ArgumentNullException(nameof(logManager)); Feed = syncFeed ?? throw new ArgumentNullException(nameof(syncFeed)); + Downloader = downloader ?? throw new ArgumentNullException(nameof(downloader)); SyncPeerPool = syncPeerPool ?? throw new ArgumentNullException(nameof(syncPeerPool)); PeerAllocationStrategyFactory = peerAllocationStrategy ?? throw new ArgumentNullException(nameof(peerAllocationStrategy)); @@ -49,8 +52,6 @@ protected SyncDispatcher( private TaskCompletionSource? _dormantStateTask = new(TaskCreationOptions.RunContinuationsAsynchronously); - protected abstract Task Dispatch(PeerInfo peerInfo, T request, CancellationToken cancellationToken); - public async Task Start(CancellationToken cancellationToken) { UpdateState(Feed.CurrentState); @@ -134,7 +135,7 @@ private async Task DoDispatch(CancellationToken cancellationToken, PeerInfo? all { try { - await Dispatch(allocatedPeer, request, cancellationToken); + await Downloader.Dispatch(allocatedPeer, request, cancellationToken); } catch (ConcurrencyLimitReachedException) { @@ -196,14 +197,16 @@ private void DoHandleResponse(T request, PeerInfo? allocatedPeer = null) } } - protected virtual void Free(SyncPeerAllocation allocation) + private void Free(SyncPeerAllocation allocation) { + Downloader.BeforeFree(allocation); SyncPeerPool.Free(allocation); } - protected virtual async Task Allocate(T request) + protected async Task Allocate(T request) { SyncPeerAllocation allocation = await SyncPeerPool.Allocate(PeerAllocationStrategyFactory.Create(request), Feed.Contexts, 1000); + Downloader.OnAllocate(allocation); return allocation; } diff --git a/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapSyncDispatcher.cs b/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapSyncDownloader.cs similarity index 73% rename from src/Nethermind/Nethermind.Synchronization/SnapSync/SnapSyncDispatcher.cs rename to src/Nethermind/Nethermind.Synchronization/SnapSync/SnapSyncDownloader.cs index 35e9ae262bb..7a9c95a8612 100644 --- a/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapSyncDispatcher.cs +++ b/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapSyncDownloader.cs @@ -4,29 +4,23 @@ using System; using System.Threading; using System.Threading.Tasks; -using Nethermind.Blockchain; using Nethermind.Blockchain.Synchronization; using Nethermind.Logging; -using Nethermind.State.Snap; -using Nethermind.Synchronization.FastSync; using Nethermind.Synchronization.ParallelSync; using Nethermind.Synchronization.Peers; namespace Nethermind.Synchronization.SnapSync { - public class SnapSyncDispatcher : SyncDispatcher + public class SnapSyncDownloader : ISyncDownloader { - public SnapSyncDispatcher( - int maxNumberOfProcessingThread, - ISyncFeed? syncFeed, - ISyncPeerPool? syncPeerPool, - IPeerAllocationStrategyFactory? peerAllocationStrategy, - ILogManager? logManager) - : base(maxNumberOfProcessingThread, syncFeed, syncPeerPool, peerAllocationStrategy, logManager) + private ILogger Logger; + + public SnapSyncDownloader(ILogManager? logManager) { + Logger = logManager.GetClassLogger(); } - protected override async Task Dispatch(PeerInfo peerInfo, SnapSyncBatch batch, CancellationToken cancellationToken) + public async Task Dispatch(PeerInfo peerInfo, SnapSyncBatch batch, CancellationToken cancellationToken) { ISyncPeer peer = peerInfo.SyncPeer; diff --git a/src/Nethermind/Nethermind.Synchronization/StateSync/StateSyncDispatcher.cs b/src/Nethermind/Nethermind.Synchronization/StateSync/StateSyncDownloader.cs similarity index 93% rename from src/Nethermind/Nethermind.Synchronization/StateSync/StateSyncDispatcher.cs rename to src/Nethermind/Nethermind.Synchronization/StateSync/StateSyncDownloader.cs index 361a1fe3c82..8c5f1970058 100644 --- a/src/Nethermind/Nethermind.Synchronization/StateSync/StateSyncDispatcher.cs +++ b/src/Nethermind/Nethermind.Synchronization/StateSync/StateSyncDownloader.cs @@ -18,19 +18,16 @@ namespace Nethermind.Synchronization.StateSync { - public class StateSyncDispatcher : SyncDispatcher + public class StateSyncDownloader : ISyncDownloader { - public StateSyncDispatcher( - int maxNumberOfProcessingThread, - ISyncFeed syncFeed, - ISyncPeerPool syncPeerPool, - IPeerAllocationStrategyFactory peerAllocationStrategy, - ILogManager logManager) - : base(maxNumberOfProcessingThread, syncFeed, syncPeerPool, peerAllocationStrategy, logManager) + private ILogger Logger; + + public StateSyncDownloader(ILogManager logManager) { + Logger = logManager.GetClassLogger(); } - protected override async Task Dispatch(PeerInfo peerInfo, StateSyncBatch batch, CancellationToken cancellationToken) + public async Task Dispatch(PeerInfo peerInfo, StateSyncBatch batch, CancellationToken cancellationToken) { if (batch?.RequestedNodes is null || batch.RequestedNodes.Count == 0) { diff --git a/src/Nethermind/Nethermind.Synchronization/Synchronizer.cs b/src/Nethermind/Nethermind.Synchronization/Synchronizer.cs index 34a30614c93..d9bf542fa51 100644 --- a/src/Nethermind/Nethermind.Synchronization/Synchronizer.cs +++ b/src/Nethermind/Nethermind.Synchronization/Synchronizer.cs @@ -132,7 +132,14 @@ private void StartFullSyncComponents() _fullSyncFeed = new FullSyncFeed(_syncMode, LimboLogs.Instance); BlockDownloader fullSyncBlockDownloader = _blockDownloaderFactory.Create(_fullSyncFeed); fullSyncBlockDownloader.SyncEvent += DownloaderOnSyncEvent; - fullSyncBlockDownloader.Start(_syncCancellation!.Token).ContinueWith(t => + + SyncDispatcher dispatcher = CreateDispatcher( + _fullSyncFeed, + fullSyncBlockDownloader, + _blockDownloaderFactory.CreateAllocationStrategyFactory() + ); + + dispatcher.Start(_syncCancellation!.Token).ContinueWith(t => { if (t.IsFaulted) { @@ -145,11 +152,41 @@ private void StartFullSyncComponents() }); } + private void StartFastSyncComponents() + { + _fastSyncFeed = new FastSyncFeed(_syncMode, _syncConfig, _logManager); + BlockDownloader downloader = _blockDownloaderFactory.Create(_fastSyncFeed); + downloader.SyncEvent += DownloaderOnSyncEvent; + + SyncDispatcher dispatcher = CreateDispatcher( + _fastSyncFeed, + downloader, + _blockDownloaderFactory.CreateAllocationStrategyFactory() + ); + + dispatcher.Start(_syncCancellation!.Token).ContinueWith(t => + { + if (t.IsFaulted) + { + if (_logger.IsError) _logger.Error("Fast sync failed", t.Exception); + } + else + { + if (_logger.IsInfo) _logger.Info("Fast sync blocks downloader task completed."); + } + }); + } + private void StartStateSyncComponents() { TreeSync treeSync = new(SyncMode.StateNodes, _dbProvider.CodeDb, _dbProvider.StateDb, _blockTree, _logManager); _stateSyncFeed = new StateSyncFeed(_syncMode, treeSync, _logManager); - StateSyncDispatcher stateSyncDispatcher = new(_syncConfig.MaxProcessingThreads, _stateSyncFeed!, _syncPeerPool, new StateSyncAllocationStrategyFactory(), _logManager); + SyncDispatcher stateSyncDispatcher = CreateDispatcher( + _stateSyncFeed, + new StateSyncDownloader(_logManager), + new StateSyncAllocationStrategyFactory() + ); + Task syncDispatcherTask = stateSyncDispatcher.Start(_syncCancellation.Token).ContinueWith(t => { if (t.IsFaulted) @@ -166,7 +203,11 @@ private void StartStateSyncComponents() private void StartSnapSyncComponents() { _snapSyncFeed = new SnapSyncFeed(_syncMode, _snapProvider, _logManager); - SnapSyncDispatcher dispatcher = new(_syncConfig.MaxProcessingThreads, _snapSyncFeed!, _syncPeerPool, new SnapSyncAllocationStrategyFactory(), _logManager); + SyncDispatcher dispatcher = CreateDispatcher( + _snapSyncFeed, + new SnapSyncDownloader(_logManager), + new SnapSyncAllocationStrategyFactory() + ); Task _ = dispatcher.Start(_syncCancellation!.Token).ContinueWith(t => { @@ -186,7 +227,12 @@ private void StartFastBlocksComponents() FastBlocksPeerAllocationStrategyFactory fastFactory = new(); _headersFeed = new HeadersSyncFeed(_syncMode, _blockTree, _syncPeerPool, _syncConfig, _syncReport, _logManager); - HeadersSyncDispatcher headersDispatcher = new(_syncConfig.MaxProcessingThreads, _headersFeed!, _syncPeerPool, fastFactory, _logManager); + SyncDispatcher headersDispatcher = CreateDispatcher( + _headersFeed, + new HeadersSyncDownloader(_logManager), + fastFactory + ); + Task headersTask = headersDispatcher.Start(_syncCancellation!.Token).ContinueWith(t => { if (t.IsFaulted) @@ -204,7 +250,13 @@ private void StartFastBlocksComponents() if (_syncConfig.DownloadBodiesInFastSync) { _bodiesFeed = new BodiesSyncFeed(_syncMode, _blockTree, _syncPeerPool, _syncConfig, _syncReport, _specProvider, _logManager); - BodiesSyncDispatcher bodiesDispatcher = new(_syncConfig.MaxProcessingThreads, _bodiesFeed!, _syncPeerPool, fastFactory, _logManager); + + SyncDispatcher bodiesDispatcher = CreateDispatcher( + _bodiesFeed, + new BodiesSyncDownloader(_logManager), + fastFactory + ); + Task bodiesTask = bodiesDispatcher.Start(_syncCancellation.Token).ContinueWith(t => { if (t.IsFaulted) @@ -221,7 +273,13 @@ private void StartFastBlocksComponents() if (_syncConfig.DownloadReceiptsInFastSync) { _receiptsFeed = new ReceiptsSyncFeed(_syncMode, _specProvider, _blockTree, _receiptStorage, _syncPeerPool, _syncConfig, _syncReport, _logManager); - ReceiptsSyncDispatcher receiptsDispatcher = new(_syncConfig.MaxProcessingThreads, _receiptsFeed!, _syncPeerPool, fastFactory, _logManager); + + SyncDispatcher receiptsDispatcher = CreateDispatcher( + _receiptsFeed, + new ReceiptsSyncDispatcher(_logManager), + fastFactory + ); + Task receiptsTask = receiptsDispatcher.Start(_syncCancellation.Token).ContinueWith(t => { if (t.IsFaulted) @@ -237,23 +295,15 @@ private void StartFastBlocksComponents() } } - private void StartFastSyncComponents() + protected SyncDispatcher CreateDispatcher(ISyncFeed feed, ISyncDownloader downloader, IPeerAllocationStrategyFactory peerAllocationStrategyFactory) { - _fastSyncFeed = new FastSyncFeed(_syncMode, _syncConfig, _logManager); - BlockDownloader downloader = _blockDownloaderFactory.Create(_fastSyncFeed); - downloader.SyncEvent += DownloaderOnSyncEvent; - - downloader.Start(_syncCancellation!.Token).ContinueWith(t => - { - if (t.IsFaulted) - { - if (_logger.IsError) _logger.Error("Fast sync failed", t.Exception); - } - else - { - if (_logger.IsInfo) _logger.Info("Fast sync blocks downloader task completed."); - } - }); + return new( + _syncConfig.MaxProcessingThreads, + feed!, + downloader, + _syncPeerPool, + peerAllocationStrategyFactory, + _logManager); } private NodeStatsEventType Convert(SyncEvent syncEvent)