Skip to content

Commit

Permalink
Fixes for P2P message disposing (#7578)
Browse files Browse the repository at this point in the history
Co-authored-by: lukasz.rozmej <lukasz.rozmej@gmail.com>
  • Loading branch information
2 people authored and rjnrohit committed Oct 10, 2024
1 parent c4b349f commit f763d99
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 67 deletions.
10 changes: 0 additions & 10 deletions src/Nethermind/Nethermind.Network.Test/P2P/PacketSenderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public void Does_send_on_active_channel()
PacketSender packetSender = new(serializer, LimboLogs.Instance, TimeSpan.Zero);
packetSender.HandlerAdded(context);
packetSender.Enqueue(testMessage);
testMessage.WasDisposed.Should().BeTrue();

context.Received(1).WriteAndFlushAsync(Arg.Any<IByteBuffer>());
}
Expand All @@ -62,7 +61,6 @@ public void Does_not_try_to_send_on_inactive_channel()
PacketSender packetSender = new(serializer, LimboLogs.Instance, TimeSpan.Zero);
packetSender.HandlerAdded(context);
packetSender.Enqueue(testMessage);
testMessage.WasDisposed.Should().BeTrue();

context.Received(0).WriteAndFlushAsync(Arg.Any<IByteBuffer>());
}
Expand All @@ -88,7 +86,6 @@ public async Task Send_after_delay_if_specified()
PacketSender packetSender = new(serializer, LimboLogs.Instance, delay);
packetSender.HandlerAdded(context);
packetSender.Enqueue(testMessage);
testMessage.WasDisposed.Should().BeTrue();

await context.Received(0).WriteAndFlushAsync(Arg.Any<IByteBuffer>());

Expand All @@ -101,13 +98,6 @@ private class TestMessage : P2PMessage
{
public override int PacketType { get; } = 0;
public override string Protocol { get; } = "";

public bool WasDisposed { get; set; }
public override void Dispose()
{
base.Dispose();
WasDisposed = true;
}
}
}
}
41 changes: 31 additions & 10 deletions src/Nethermind/Nethermind.Network.Test/P2P/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using Nethermind.Network.Rlpx;
using Nethermind.Stats.Model;
using NSubstitute;
using NSubstitute.ReceivedExtensions;
using NUnit.Framework;

namespace Nethermind.Network.Test.P2P;
Expand Down Expand Up @@ -452,20 +451,24 @@ public void Can_deliver_messages()
session.AddProtocolHandler(bbb);
session.AddProtocolHandler(ccc);

_packetSender.Enqueue(PingMessage.Instance).Returns(10);
session.DeliverMessage(PingMessage.Instance);
_packetSender.Received().Enqueue(PingMessage.Instance);
var message = new TestMessage();
_packetSender.Enqueue(message).Returns(10);
session.DeliverMessage(message);
_packetSender.Received().Enqueue(message);
message.WasDisposed.Should().BeTrue();

Metrics.P2PBytesSent.Should().Be(10);
}

[Test]
public void Cannot_deliver_before_initialized()
{
var message = new TestMessage();
Session session = new(30312, new Node(TestItem.PublicKeyA, "127.0.0.1", 8545), _channel, NullDisconnectsAnalyzer.Instance, LimboLogs.Instance);
Assert.Throws<InvalidOperationException>(() => session.DeliverMessage(PingMessage.Instance));
Assert.Throws<InvalidOperationException>(() => session.DeliverMessage(message));
session.Handshake(TestItem.PublicKeyA);
Assert.Throws<InvalidOperationException>(() => session.DeliverMessage(PingMessage.Instance));
Assert.Throws<InvalidOperationException>(() => session.DeliverMessage(message));
message.WasDisposed.Should().BeTrue();
session.Init(5, _channelHandlerContext, _packetSender);
IProtocolHandler p2p = BuildHandler("p2p", 10);
session.AddProtocolHandler(p2p);
Expand Down Expand Up @@ -494,8 +497,10 @@ public void Stops_delivering_messages_after_disconnect()

session.InitiateDisconnect(DisconnectReason.Other);

session.DeliverMessage(PingMessage.Instance);
_packetSender.DidNotReceive().Enqueue(Arg.Any<PingMessage>());
var message = new TestMessage();
session.DeliverMessage(message);
_packetSender.DidNotReceive().Enqueue(Arg.Any<TestMessage>());
message.WasDisposed.Should().BeTrue();
}

[Test]
Expand Down Expand Up @@ -538,18 +543,21 @@ public void Protocol_handler_can_send_message_on_disconnect()
IProtocolHandler p2p = BuildHandler("p2p", 10);
session.AddProtocolHandler(p2p);

var message = new TestMessage();
p2p.When(it => it.DisconnectProtocol(Arg.Any<DisconnectReason>(), Arg.Any<string>()))
.Do((_) =>
{
session.DeliverMessage(PingMessage.Instance);
session.DeliverMessage(message);
});

session.Init(5, _channelHandlerContext, _packetSender);
session.InitiateDisconnect(DisconnectReason.Other);

_packetSender
.Received()
.Enqueue(PingMessage.Instance);
.Enqueue(message);

message.WasDisposed.Should().BeTrue();
}

[Test, Retry(3)]
Expand Down Expand Up @@ -619,4 +627,17 @@ public void Updates_local_and_remote_metrics_on_disconnects()
Assert.That(afterLocal, Is.EqualTo(beforeLocal));
Assert.That(afterRemote, Is.EqualTo(beforeRemote + 1));
}

private class TestMessage : P2PMessage
{
public override int PacketType => P2PMessageCode.Ping;
public override string Protocol => "p2p";

public bool WasDisposed { get; set; }
public override void Dispose()
{
base.Dispose();
WasDisposed = true;
}
}
}
16 changes: 4 additions & 12 deletions src/Nethermind/Nethermind.Network/P2P/PacketSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,12 @@ public PacketSender(IMessageSerializationService messageSerializationService, IL

public int Enqueue<T>(T message) where T : P2PMessage
{
IByteBuffer buffer;
try
if (!_context.Channel.IsWritable || !_context.Channel.Active)
{
if (!_context.Channel.IsWritable || !_context.Channel.Active)
{
return 0;
}

buffer = _messageSerializationService.ZeroSerialize(message);
}
finally
{
message.Dispose();
return 0;
}

IByteBuffer buffer = _messageSerializationService.ZeroSerialize(message);
int length = buffer.ReadableBytes;

// Running in background
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,39 @@ CancellationToken token
)
{
Task<TResponse> task = request.CompletionSource.Task;
using CancellationTokenSource delayCancellation = new();
using CancellationTokenSource compositeCancellation =
CancellationTokenSource.CreateLinkedTokenSource(token, delayCancellation.Token);
Task firstTask = await Task.WhenAny(task, Task.Delay(Timeouts.Eth, compositeCancellation.Token));
if (firstTask.IsCanceled)
bool success = false;
try
{
token.ThrowIfCancellationRequested();
}
using CancellationTokenSource delayCancellation = new();
using CancellationTokenSource compositeCancellation = CancellationTokenSource.CreateLinkedTokenSource(token, delayCancellation.Token);
Task firstTask = await Task.WhenAny(task, Task.Delay(Timeouts.Eth, compositeCancellation.Token));
if (firstTask.IsCanceled)
{
token.ThrowIfCancellationRequested();
}

if (firstTask == task)
{
delayCancellation.Cancel();
long elapsed = request.FinishMeasuringTime();
long bytesPerMillisecond = (long)((decimal)request.ResponseSize / Math.Max(1, elapsed));
if (Logger.IsTrace) Logger.Trace($"{this} speed is {request.ResponseSize}/{elapsed} = {bytesPerMillisecond}");
StatsManager.ReportTransferSpeedEvent(Session.Node, speedType, bytesPerMillisecond);
if (firstTask == task)
{
await delayCancellation.CancelAsync();
long elapsed = request.FinishMeasuringTime();
long bytesPerMillisecond = (long)((decimal)request.ResponseSize / Math.Max(1, elapsed));
if (Logger.IsTrace) Logger.Trace($"{this} speed is {request.ResponseSize}/{elapsed} = {bytesPerMillisecond}");
StatsManager.ReportTransferSpeedEvent(Session.Node, speedType, bytesPerMillisecond);

return await task;
}
success = true;
return await task;
}

CleanupTimeoutTask(task);
StatsManager.ReportTransferSpeedEvent(Session.Node, speedType, 0L);
throw new TimeoutException($"{Session} Request timeout in {describeRequestFunc(request.Message)}");
StatsManager.ReportTransferSpeedEvent(Session.Node, speedType, 0L);
throw new TimeoutException($"{Session} Request timeout in {describeRequestFunc(request.Message)}");
}
finally
{
if (!success)
{
CleanupTimeoutTask(task);
}
}
}

private static void CleanupTimeoutTask<TResponse>(Task<TResponse> task)
Expand Down
39 changes: 23 additions & 16 deletions src/Nethermind/Nethermind.Network/P2P/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,31 +207,38 @@ public void ReceiveMessage(ZeroPacket zeroPacket)

public int DeliverMessage<T>(T message) where T : P2PMessage
{
lock (_sessionStateLock)
try
{
if (State < SessionState.Initialized)
lock (_sessionStateLock)
{
throw new InvalidOperationException($"{nameof(DeliverMessage)} called {this}");
}
if (State < SessionState.Initialized)
{
throw new InvalidOperationException($"{nameof(DeliverMessage)} called {this}");
}

// Must allow sending out packet when `DisconnectingProtocols` so that we can send out disconnect reason
// and hello (part of protocol)
if (IsClosed)
{
return 1;
// Must allow sending out packet when `DisconnectingProtocols` so that we can send out disconnect reason
// and hello (part of protocol)
if (IsClosed)
{
return 1;
}
}
}

if (_logger.IsTrace) _logger.Trace($"P2P to deliver {message.Protocol}.{message.PacketType} on {this}");
if (_logger.IsTrace) _logger.Trace($"P2P to deliver {message.Protocol}.{message.PacketType} on {this}");

message.AdaptivePacketType = _resolver.ResolveAdaptiveId(message.Protocol, message.PacketType);
int size = _packetSender.Enqueue(message);
message.AdaptivePacketType = _resolver.ResolveAdaptiveId(message.Protocol, message.PacketType);
int size = _packetSender.Enqueue(message);

RecordOutgoingMessageMetric(message, size);
RecordOutgoingMessageMetric(message, size);

Interlocked.Add(ref Metrics.P2PBytesSent, size);
Interlocked.Add(ref Metrics.P2PBytesSent, size);

return size;
return size;
}
finally
{
message.Dispose();
}
}

public void ReceiveMessage(Packet packet)
Expand Down

0 comments on commit f763d99

Please sign in to comment.