Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dht] Fix a DHT initialization issue #546

Merged
merged 1 commit into from
Jul 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 18 additions & 40 deletions src/MonoTorrent.Dht/MonoTorrent.Dht/DhtEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Threading.Tasks;

Expand Down Expand Up @@ -73,8 +74,6 @@ public class DhtEngine : IDisposable, IDhtEngine

internal static MainLoop MainLoop { get; } = new MainLoop ("DhtLoop");

#region Properties

public TimeSpan AnnounceInterval => DefaultAnnounceInternal;

public bool Disposed { get; private set; }
Expand All @@ -89,20 +88,18 @@ public class DhtEngine : IDisposable, IDhtEngine
internal NodeId LocalId => RoutingTable.LocalNode.Id;
internal MessageLoop MessageLoop { get; }
public int NodeCount => RoutingTable.CountNodes ();
IEnumerable<Node> PendingNodes { get; set; }
internal RoutingTable RoutingTable { get; }
internal TokenManager TokenManager { get; }
internal Dictionary<NodeId, List<Node>> Torrents { get; }

#endregion Properties

#region Constructors

public DhtEngine ()
{
var monitor = new TransferMonitor ();
BucketRefreshTimeout = TimeSpan.FromMinutes (15);
MessageLoop = new MessageLoop (this, monitor);
Monitor = monitor;
PendingNodes = Array.Empty<Node> ();
RoutingTable = new RoutingTable ();
State = DhtState.NotReady;
TokenManager = new TokenManager ();
Expand All @@ -115,36 +112,16 @@ public DhtEngine ()
});
}

#endregion Constructors

public void Add (IEnumerable<ReadOnlyMemory<byte>> nodes)
{
// Maybe we should pipeline all our tasks to ensure we don't flood the DHT engine.
// I don't think it's *bad* that we can run several initialise tasks simultaenously
// but it might be better to run them sequentially instead. We should also
// run GetPeers and Announce tasks sequentially.
InitializeAsync (Node.FromCompactNode (nodes));
}

internal void Add (IEnumerable<Node> nodes)
{
if (nodes == null)
throw new ArgumentNullException (nameof (nodes));

foreach (Node n in nodes)
Add (n);
}

internal async void Add (Node node)
{
if (node == null)
throw new ArgumentNullException (nameof (node));

try {
await MainLoop;
await SendQueryAsync (new Ping (RoutingTable.LocalNode.Id), node);
} catch {
// Ignore?
if (State == DhtState.NotReady) {
PendingNodes = Node.FromCompactNode (nodes);
} else {
// Maybe we should pipeline all our tasks to ensure we don't flood the DHT engine.
// I don't think it's *bad* that we can run several initialise tasks simultaenously
// but it might be better to run them sequentially instead. We should also
// run GetPeers and Announce tasks sequentially.
InitializeAsync (Node.FromCompactNode (nodes));
}
}

Expand Down Expand Up @@ -278,12 +255,13 @@ internal async Task<SendQueryEventArgs> SendQueryAsync (QueryMessage query, Node
return e;
}

public async Task StartAsync ()
{
await StartAsync (Array.Empty<byte> ());
}
public Task StartAsync ()
=> StartAsync (PendingNodes);

public Task StartAsync (ReadOnlyMemory<byte> initialNodes)
=> StartAsync (Node.FromCompactNode (BEncodedString.FromMemory (initialNodes)).Concat (PendingNodes));

public async Task StartAsync (ReadOnlyMemory<byte> initialNodes)
async Task StartAsync (IEnumerable<Node> nodes)
{
CheckDisposed ();

Expand All @@ -292,7 +270,7 @@ public async Task StartAsync (ReadOnlyMemory<byte> initialNodes)
if (RoutingTable.NeedsBootstrap) {
RaiseStateChanged (DhtState.Initialising);
// HACK: We want to disambiguate between 'decode one' and 'decode many' when using a Span<byte>
InitializeAsync (Node.FromCompactNode (BEncodedString.FromMemory (initialNodes)));
InitializeAsync (nodes);
} else {
RaiseStateChanged (DhtState.Ready);
}
Expand Down
28 changes: 28 additions & 0 deletions src/Tests/Tests.MonoTorrent.Dht/MonoTorrent.Dht/DhtEngineTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;

using NUnit.Framework;

namespace MonoTorrent.Dht
{
[TestFixture]
public class DhtEngineTests
{
[Test]
public void AddRawNodesBeforeStarting ()
{
int count = 0;
var engine = new DhtEngine ();
engine.MessageLoop.QuerySent += (o, e) => count++;
engine.Add (new ReadOnlyMemory<byte>[] { new byte[100] });
Assert.AreEqual (0, engine.MessageLoop.PendingQueries);
Assert.AreEqual (0, count);
Assert.AreEqual (0, engine.RoutingTable.CountNodes ());
Assert.AreEqual (0, engine.MessageLoop.DhtMessageFactory.RegisteredMessages);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void SendPing_Synchronous ()
public void SendPing_Asynchronous ()
=> SendPing (true);

void SendPing (bool asynchronous)
async void SendPing (bool asynchronous)
{
listener.SendAsynchronously = asynchronous;

Expand All @@ -103,8 +103,10 @@ void SendPing (bool asynchronous)

Assert.AreEqual (NodeState.Unknown, node.State, "#1");

await engine.StartAsync ();

// Should cause an implicit Ping to be sent to the node to verify it's alive.
engine.Add (node);
engine.Add (new[] { node.CompactNode ().AsMemory () });

Assert.IsTrue (tcs.Task.Wait (1000), "#1a");
Assert.IsTrue (node.LastSeen < TimeSpan.FromSeconds (1), "#2");
Expand Down