Skip to content

Commit

Permalink
Refactor/Split dispatcher and downloader (#5778)
Browse files Browse the repository at this point in the history
  • Loading branch information
asdacap authored Jun 6, 2023
1 parent ace844b commit 3a96f5e
Show file tree
Hide file tree
Showing 26 changed files with 256 additions and 188 deletions.
1 change: 0 additions & 1 deletion src/Nethermind/Nethermind.Init/Steps/InitializeNetwork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ private async Task Initialize(CancellationToken cancellationToken)
SyncReport syncReport = new(_api.SyncPeerPool!, _api.NodeStatsManager!, _api.SyncModeSelector, _syncConfig, _api.Pivot, _api.LogManager);

_api.BlockDownloaderFactory ??= new BlockDownloaderFactory(
_syncConfig.MaxProcessingThreads,
_api.SpecProvider!,
_api.BlockTree!,
_api.ReceiptStorage!,
Expand Down
2 changes: 0 additions & 2 deletions src/Nethermind/Nethermind.Merge.Plugin/MergePlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,10 @@ public Task InitSynchronization()
SyncReport syncReport = new(_api.SyncPeerPool, _api.NodeStatsManager, _api.SyncModeSelector, _syncConfig, _beaconPivot, _api.LogManager);

_api.BlockDownloaderFactory = new MergeBlockDownloaderFactory(
_syncConfig.MaxProcessingThreads,
_poSSwitcher,
_beaconPivot,
_api.SpecProvider,
_api.BlockTree,
_blockCacheService,
_api.ReceiptStorage!,
_api.BlockValidator!,
_api.SealValidator!,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using Nethermind.Logging;
using Nethermind.Synchronization.FastBlocks;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Peers;

namespace Nethermind.Merge.Plugin.Synchronization;

public class BeaconHeadersSyncDownloader : HeadersSyncDownloader
{
public BeaconHeadersSyncDownloader(ILogManager logManager) : base(logManager)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

namespace Nethermind.Merge.Plugin.Synchronization
{
public class MergeBlockDownloader : BlockDownloader
public class MergeBlockDownloader : BlockDownloader, ISyncDownloader<BlocksRequest>
{
private readonly IBeaconPivot _beaconPivot;
private readonly IBlockTree _blockTree;
Expand All @@ -41,7 +41,6 @@ public class MergeBlockDownloader : BlockDownloader
private readonly ISyncProgressResolver _syncProgressResolver;

public MergeBlockDownloader(
int maxNumberOfProcessingThread,
IPoSSwitcher posSwitcher,
IBeaconPivot beaconPivot,
ISyncFeed<BlocksRequest?>? feed,
Expand All @@ -57,9 +56,8 @@ public MergeBlockDownloader(
ISyncProgressResolver syncProgressResolver,
ILogManager logManager,
SyncBatchSize? syncBatchSize = null)
: base(maxNumberOfProcessingThread, feed, syncPeerPool, blockTree, blockValidator, sealValidator, syncReport, receiptStorage,
specProvider, new MergeBlocksSyncPeerAllocationStrategyFactory(posSwitcher, beaconPivot, logManager),
betterPeerStrategy, logManager, syncBatchSize)
: base(feed, syncPeerPool, blockTree, blockValidator, sealValidator, syncReport, receiptStorage,
specProvider, betterPeerStrategy, logManager, syncBatchSize)
{
_blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
_specProvider = specProvider ?? throw new ArgumentNullException(nameof(specProvider));
Expand All @@ -74,7 +72,7 @@ public MergeBlockDownloader(
_logger = logManager.GetClassLogger();
}

protected override async Task Dispatch(PeerInfo bestPeer, BlocksRequest? blocksRequest, CancellationToken cancellation)
public override async Task Dispatch(PeerInfo bestPeer, BlocksRequest? blocksRequest, CancellationToken cancellation)
{
if (_beaconPivot.BeaconPivotExists() == false && _poSSwitcher.HasEverReachedTerminalBlock() == false)
{
Expand All @@ -86,7 +84,7 @@ protected override async Task Dispatch(PeerInfo bestPeer, BlocksRequest? blocksR

if (blocksRequest == null)
{
if (Logger.IsWarn) Logger.Warn($"NULL received for dispatch in {nameof(BlockDownloader)}");
if (_logger.IsWarn) _logger.Warn($"NULL received for dispatch in {nameof(BlockDownloader)}");
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
using Nethermind.Consensus.Validators;
using Nethermind.Core.Specs;
using Nethermind.Logging;
using Nethermind.Merge.Plugin.Handlers;
using Nethermind.Merge.Plugin.InvalidChainTracker;
using Nethermind.Synchronization;
using Nethermind.Synchronization.Blocks;
using Nethermind.Synchronization.ParallelSync;
Expand All @@ -35,15 +33,12 @@ public class MergeBlockDownloaderFactory : IBlockDownloaderFactory
private readonly ISyncReport _syncReport;
private readonly ISyncProgressResolver _syncProgressResolver;
private readonly IChainLevelHelper _chainLevelHelper;
private readonly int _maxNumberOfProcessingThread;

public MergeBlockDownloaderFactory(
int maxNumberOfProcessingThread,
IPoSSwitcher poSSwitcher,
IBeaconPivot beaconPivot,
ISpecProvider specProvider,
IBlockTree blockTree,
IBlockCacheService blockCacheService,
IReceiptStorage receiptStorage,
IBlockValidator blockValidator,
ISealValidator sealValidator,
Expand All @@ -54,7 +49,6 @@ public MergeBlockDownloaderFactory(
ISyncProgressResolver syncProgressResolver,
ILogManager logManager)
{
_maxNumberOfProcessingThread = maxNumberOfProcessingThread;
_poSSwitcher = poSSwitcher ?? throw new ArgumentNullException(nameof(poSSwitcher));
_beaconPivot = beaconPivot ?? throw new ArgumentNullException(nameof(beaconPivot));
_specProvider = specProvider ?? throw new ArgumentNullException(nameof(specProvider));
Expand All @@ -73,9 +67,25 @@ public MergeBlockDownloaderFactory(
public BlockDownloader Create(ISyncFeed<BlocksRequest?> syncFeed)
{
return new MergeBlockDownloader(
_maxNumberOfProcessingThread, _poSSwitcher, _beaconPivot, syncFeed, _syncPeerPool, _blockTree, _blockValidator,
_sealValidator, _syncReport, _receiptStorage, _specProvider, _betterPeerStrategy, _chainLevelHelper,
_syncProgressResolver, _logManager);
_poSSwitcher,
_beaconPivot,
syncFeed,
_syncPeerPool,
_blockTree,
_blockValidator,
_sealValidator,
_syncReport,
_receiptStorage,
_specProvider,
_betterPeerStrategy,
_chainLevelHelper,
_syncProgressResolver,
_logManager);
}

public IPeerAllocationStrategyFactory<BlocksRequest> CreateAllocationStrategyFactory()
{
return new MergeBlocksSyncPeerAllocationStrategyFactory(_poSSwitcher, _beaconPivot, _logManager);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,15 @@ private void StartBeaconHeadersComponents()
FastBlocksPeerAllocationStrategyFactory fastFactory = new();
BeaconHeadersSyncFeed beaconHeadersFeed =
new(_poSSwitcher, _syncMode, _blockTree, _syncPeerPool, _syncConfig, _syncReport, _pivot, _mergeConfig, _invalidChainTracker, _logManager);
BeaconHeadersSyncDispatcher beaconHeadersDispatcher =
new(_syncConfig.MaxProcessingThreads, beaconHeadersFeed!, _syncPeerPool, fastFactory, _logManager);
beaconHeadersDispatcher.Start(_syncCancellation!.Token).ContinueWith(t =>
BeaconHeadersSyncDownloader beaconHeadersDownloader = new(_logManager);

SyncDispatcher<HeadersSyncBatch> dispatcher = CreateDispatcher(
beaconHeadersFeed!,
beaconHeadersDownloader,
fastFactory
);

dispatcher.Start(_syncCancellation!.Token).ContinueWith(t =>
{
if (t.IsFaulted)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public async Task Does_not_deadlock_on_replace_peer()

CancellationTokenSource cts = new CancellationTokenSource();

Task ignored = ctx.BlockDownloader.Start(cts.Token);
Task ignored = ctx.Dispatcher.Start(cts.Token);
await Task.Delay(TimeSpan.FromMilliseconds(100));

// Feed should activate and allocate the first peer
Expand Down Expand Up @@ -328,8 +328,6 @@ public async Task No_old_bodies_and_receipts()

ctx.BeaconPivot.EnsurePivot(blockTrees.SyncedTree.FindHeader(64, BlockTreeLookupOptions.None));

BlockDownloader downloader = ctx.BlockDownloader;

SyncPeerMock syncPeer = new(syncedTree, false, Response.AllCorrect, 34000000);
PeerInfo peerInfo = new(syncPeer);

Expand All @@ -348,7 +346,7 @@ public async Task No_old_bodies_and_receipts()
ctx.Feed.Activate();

CancellationTokenSource cts = new();
downloader.Start(cts.Token);
ctx.Dispatcher.Start(cts.Token);

Assert.That(
() => ctx.BlockTree.BestKnownNumber,
Expand Down Expand Up @@ -470,7 +468,6 @@ public override BlockDownloader BlockDownloader
get
{
return _mergeBlockDownloader ??= new(
0,
PosSwitcher,
BeaconPivot,
Feed,
Expand All @@ -487,5 +484,10 @@ public override BlockDownloader BlockDownloader
LimboLogs.Instance);
}
}

private IPeerAllocationStrategyFactory<BlocksRequest>? _peerAllocationStrategy;
protected virtual IPeerAllocationStrategyFactory<BlocksRequest> PeerAllocationStrategy =>
_peerAllocationStrategy ??= new MergeBlocksSyncPeerAllocationStrategyFactory(PosSwitcher, BeaconPivot, LimboLogs.Instance);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,6 @@ public IBlockValidator BlockValidator

private BlockDownloader _blockDownloader;
public virtual BlockDownloader BlockDownloader => _blockDownloader ??= new BlockDownloader(
0,
Feed,
PeerPool,
BlockTree,
Expand All @@ -1003,12 +1002,26 @@ public IBlockValidator BlockValidator
NullSyncReport.Instance,
ReceiptStorage,
SpecProvider,
new BlocksSyncPeerAllocationStrategyFactory(),
BetterPeerStrategy,
LimboLogs.Instance,
SyncBatchSize
);

private SyncDispatcher<BlocksRequest> _dispatcher;
public SyncDispatcher<BlocksRequest> Dispatcher => _dispatcher ??= new SyncDispatcher<BlocksRequest>(
0,
Feed,
BlockDownloader,
PeerPool,
PeerAllocationStrategy,
LimboLogs.Instance
);

private IPeerAllocationStrategyFactory<BlocksRequest>? _peerAllocationStrategy;

protected virtual IPeerAllocationStrategyFactory<BlocksRequest> PeerAllocationStrategy =>
_peerAllocationStrategy ??= new BlocksSyncPeerAllocationStrategyFactory();

public Context(BlockTree? blockTree = null)
{
if (blockTree != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@

namespace Nethermind.Synchronization.Test.FastSync.SnapProtocolTests
{
public class StateSyncDispatcherTester : StateSyncDispatcher
public class StateSyncDispatcherTester : SyncDispatcher<StateSyncBatch>
{
private readonly ISyncDownloader<StateSyncBatch> _downloader;

public StateSyncDispatcherTester(
ISyncFeed<StateSyncBatch> syncFeed,
ISyncDownloader<StateSyncBatch> downloader,
ISyncPeerPool syncPeerPool,
IPeerAllocationStrategyFactory<StateSyncBatch> peerAllocationStrategy,
ILogManager logManager) : base(0, syncFeed, syncPeerPool, peerAllocationStrategy, logManager)
ILogManager logManager) : base(0, syncFeed, downloader, syncPeerPool, peerAllocationStrategy, logManager)
{
_downloader = downloader;
}

public async Task ExecuteDispatch(StateSyncBatch batch, int times)
Expand All @@ -31,7 +35,7 @@ public async Task ExecuteDispatch(StateSyncBatch batch, int times)

for (int i = 0; i < times; i++)
{
await base.Dispatch(allocation.Current, batch, CancellationToken.None);
await _downloader.Dispatch(allocation.Current, batch, CancellationToken.None);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void Setup()

var feed = Substitute.For<ISyncFeed<StateSyncBatch>>();
_dispatcher =
new StateSyncDispatcherTester(feed, _pool, new StateSyncAllocationStrategyFactory(), _logManager);
new StateSyncDispatcherTester(feed, new StateSyncDownloader(_logManager), _pool, new StateSyncAllocationStrategyFactory(), _logManager);
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ protected SafeContext PrepareDownloaderWithPeer(DbContext dbContext, params ISyn
ctx.SyncModeSelector = StaticSelector.StateNodesWithFastBlocks;
ctx.TreeFeed = new(SyncMode.StateNodes, dbContext.LocalCodeDb, dbContext.LocalStateDb, blockTree, _logManager);
ctx.Feed = new StateSyncFeed(ctx.SyncModeSelector, ctx.TreeFeed, _logManager);
ctx.StateSyncDispatcher =
new StateSyncDispatcher(0, ctx.Feed, ctx.Pool, new StateSyncAllocationStrategyFactory(), _logManager);
ctx.Downloader = new StateSyncDownloader(_logManager);
ctx.StateSyncDispatcher = new SyncDispatcher<StateSyncBatch>(0, ctx.Feed, ctx.Downloader, ctx.Pool, new StateSyncAllocationStrategyFactory(), _logManager);
ctx.StateSyncDispatcher.Start(CancellationToken.None);
return ctx;
}
Expand Down Expand Up @@ -145,7 +145,8 @@ protected class SafeContext
public ISyncPeerPool Pool;
public TreeSync TreeFeed;
public StateSyncFeed Feed;
public StateSyncDispatcher StateSyncDispatcher;
public StateSyncDownloader Downloader;
public SyncDispatcher<StateSyncBatch> StateSyncDispatcher;
}

protected class DbContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public async Task Setup()
Pivot pivot = new(syncConfig);
SyncReport syncReport = new(_pool, stats, syncModeSelector, syncConfig, pivot, LimboLogs.Instance);
BlockDownloaderFactory blockDownloaderFactory = new(
0,
MainnetSpecProvider.Instance,
_blockTree,
_receiptStorage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,10 @@ public TestBatch(int start, int length)
public int[] Result { get; set; }
}

private class TestDispatcher : SyncDispatcher<TestBatch>
private class TestDownloader : ISyncDownloader<TestBatch>
{
public TestDispatcher(int maxNumberOfProcessingThread, ISyncFeed<TestBatch> syncFeed, ISyncPeerPool syncPeerPool, IPeerAllocationStrategyFactory<TestBatch> peerAllocationStrategy)
: base(maxNumberOfProcessingThread, syncFeed, syncPeerPool, peerAllocationStrategy, LimboLogs.Instance)
{
}

private int _failureSwitch;

protected override async Task Dispatch(PeerInfo allocation, TestBatch request, CancellationToken cancellationToken)
public async Task Dispatch(PeerInfo peerInfo, TestBatch request, CancellationToken cancellationToken)
{
if (++_failureSwitch % 2 == 0)
{
Expand Down Expand Up @@ -259,7 +253,14 @@ public override async Task<TestBatch> PrepareRequest(CancellationToken token = d
public async Task Simple_test_sync()
{
TestSyncFeed syncFeed = new();
TestDispatcher dispatcher = new(0, syncFeed, new TestSyncPeerPool(), new StaticPeerAllocationStrategyFactory<TestBatch>(FirstFree.Instance));
TestDownloader downloader = new TestDownloader();
SyncDispatcher<TestBatch> dispatcher = new(
0,
syncFeed,
downloader,
new TestSyncPeerPool(),
new StaticPeerAllocationStrategyFactory<TestBatch>(FirstFree.Instance),
LimboLogs.Instance);
Task executorTask = dispatcher.Start(CancellationToken.None);
syncFeed.Activate();
await executorTask;
Expand All @@ -277,7 +278,16 @@ public async Task Test_release_before_processing_complete(bool isMultiSync, int
{
TestSyncFeed syncFeed = new(isMultiSync, 999999);
syncFeed.LockResponse();
TestDispatcher dispatcher = new(processingThread, syncFeed, new TestSyncPeerPool(peerCount), new StaticPeerAllocationStrategyFactory<TestBatch>(FirstFree.Instance));

TestDownloader downloader = new TestDownloader();
SyncDispatcher<TestBatch> dispatcher = new(
processingThread,
syncFeed,
downloader,
new TestSyncPeerPool(peerCount),
new StaticPeerAllocationStrategyFactory<TestBatch>(FirstFree.Instance),
LimboLogs.Instance);

Task executorTask = dispatcher.Start(CancellationToken.None);
syncFeed.Activate();
await Task.Delay(100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ private SyncTestContext CreateSyncManager(int index)
Pivot pivot = new(syncConfig);
SyncReport syncReport = new(syncPeerPool, nodeStatsManager, selector, syncConfig, pivot, LimboLogs.Instance);
BlockDownloaderFactory blockDownloaderFactory = new(
0,
MainnetSpecProvider.Instance,
tree,
NullReceiptStorage.Instance,
Expand Down
Loading

0 comments on commit 3a96f5e

Please sign in to comment.