Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove some closure allocations #7953

Merged
merged 9 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions src/Nethermind/Nethermind.Core/Crypto/PublicKey.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,7 @@ public Hash256 Hash

public byte[] Bytes { get; }

public byte[] PrefixedBytes
{
get
{
if (_prefixedBytes is null)
{
return LazyInitializer.EnsureInitialized(ref _prefixedBytes,
() => Core.Extensions.Bytes.Concat(0x04, Bytes));
}

return _prefixedBytes;
}
}
public byte[] PrefixedBytes => _prefixedBytes ??= Core.Extensions.Bytes.Concat(0x04, Bytes);

public bool Equals(PublicKey? other) => other is not null && Core.Extensions.Bytes.AreEqual(Bytes, other.Bytes);

Expand Down
27 changes: 22 additions & 5 deletions src/Nethermind/Nethermind.Db/SimpleFilePublicKeyDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,37 @@ public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteF
{
if (value is null)
{
_cacheSpan.TryRemove(key, out _);
if (_cacheSpan.TryRemove(key, out _))
{
_hasPendingChanges = true;
}
return;
}

bool setValue = true;
if (_cacheSpan.TryGetValue(key, out var existingValue))
{
if (!Bytes.AreEqual(existingValue, value))
{
setValue = false;
}
}
else

if (setValue)
{
_cache.AddOrUpdate(key.ToArray(), newValue => Add(value), (x, oldValue) => Update(oldValue, value));
_cacheSpan[key] = value;
_hasPendingChanges = true;
}
}

public KeyValuePair<byte[], byte[]>[] this[byte[][] keys] => keys.Select(k => new KeyValuePair<byte[], byte[]>(k, _cache.TryGetValue(k, out var value) ? value : null)).ToArray();

public void Remove(ReadOnlySpan<byte> key)
{
_hasPendingChanges = true;
_cacheSpan.TryRemove(key, out _);
if (_cacheSpan.TryRemove(key, out _))
{
_hasPendingChanges = true;
}
}

public bool KeyExists(ReadOnlySpan<byte> key)
Expand Down
37 changes: 24 additions & 13 deletions src/Nethermind/Nethermind.Network.Discovery/DiscoveryManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using Nethermind.Config;
using Nethermind.Core;
using Nethermind.Core.Crypto;
Expand All @@ -25,6 +26,8 @@ public class DiscoveryManager : IDiscoveryManager
private readonly INetworkStorage _discoveryStorage;

private readonly ConcurrentDictionary<MessageTypeKey, TaskCompletionSource<DiscoveryMsg>> _waitingEvents = new();
private readonly Func<Hash256, Node, INodeLifecycleManager> _createNodeLifecycleManager;
private readonly Func<Hash256, Node, INodeLifecycleManager> _createNodeLifecycleManagerPersisted;
private IMsgSender? _msgSender;

public DiscoveryManager(
Expand All @@ -41,6 +44,24 @@ public DiscoveryManager(
_discoveryStorage = discoveryStorage ?? throw new ArgumentNullException(nameof(discoveryStorage));
_nodeLifecycleManagerFactory.DiscoveryManager = this;
_outgoingMessageRateLimiter = new RateLimiter(discoveryConfig.MaxOutgoingMessagePerSecond);
_createNodeLifecycleManager = GetLifecycleManagerFunc(isPersisted: false);
_createNodeLifecycleManagerPersisted = GetLifecycleManagerFunc(isPersisted: true);
}

private Func<Hash256, Node, INodeLifecycleManager> GetLifecycleManagerFunc(bool isPersisted)
{
return (_, node) =>
{
Interlocked.Increment(ref _managersCreated);
INodeLifecycleManager manager = _nodeLifecycleManagerFactory.CreateNodeLifecycleManager(node);
manager.OnStateChanged += ManagerOnOnStateChanged;
if (!isPersisted)
{
_discoveryStorage.UpdateNodes(new[] { new NetworkNode(manager.ManagedNode.Id, manager.ManagedNode.Host, manager.ManagedNode.Port, manager.NodeStats.NewPersistedNodeReputation(DateTime.UtcNow)) });
}

return manager;
};
}

public IMsgSender MsgSender
Expand Down Expand Up @@ -120,18 +141,7 @@ public void OnIncomingMsg(DiscoveryMsg msg)
return null;
}

return _nodeLifecycleManagers.GetOrAdd(node.IdHash, _ =>
{
Interlocked.Increment(ref _managersCreated);
INodeLifecycleManager manager = _nodeLifecycleManagerFactory.CreateNodeLifecycleManager(node);
manager.OnStateChanged += ManagerOnOnStateChanged;
if (!isPersisted)
{
_discoveryStorage.UpdateNodes(new[] { new NetworkNode(manager.ManagedNode.Id, manager.ManagedNode.Host, manager.ManagedNode.Port, manager.NodeStats.NewPersistedNodeReputation(DateTime.UtcNow)) });
}

return manager;
});
return _nodeLifecycleManagers.GetOrAdd(node.IdHash, isPersisted ? _createNodeLifecycleManagerPersisted : _createNodeLifecycleManager, node);
}

private void ManagerOnOnStateChanged(object? sender, NodeLifecycleState e)
Expand Down Expand Up @@ -168,7 +178,8 @@ public async Task SendMessageAsync(DiscoveryMsg discoveryMsg)
}
}

public async Task<bool> WasMessageReceived(Hash256 senderIdHash, MsgType msgType, int timeout)
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
public async ValueTask<bool> WasMessageReceived(Hash256 senderIdHash, MsgType msgType, int timeout)
{
TaskCompletionSource<DiscoveryMsg> completionSource = GetCompletionSource(senderIdHash, (int)msgType);
CancellationTokenSource delayCancellation = new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public interface IDiscoveryManager : IDiscoveryMsgListener
INodeLifecycleManager? GetNodeLifecycleManager(Node node, bool isPersisted = false);
void SendMessage(DiscoveryMsg discoveryMsg);
Task SendMessageAsync(DiscoveryMsg discoveryMsg);
Task<bool> WasMessageReceived(Hash256 senderIdHash, MsgType msgType, int timeout);
ValueTask<bool> WasMessageReceived(Hash256 senderIdHash, MsgType msgType, int timeout);
event EventHandler<NodeEventArgs> NodeDiscovered;

IReadOnlyCollection<INodeLifecycleManager> GetNodeLifecycleManagers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ public async Task SendMsg(DiscoveryMsg discoveryMsg)
}

IAddressedEnvelope<IByteBuffer> packet = new DatagramPacket(msgBuffer, discoveryMsg.FarAddress);

await _channel.WriteAndFlushAsync(packet).ContinueWith(t =>
try
{
if (t.IsFaulted)
{
if (_logger.IsTrace) _logger.Trace($"Error when sending a discovery message Msg: {discoveryMsg} ,Exp: {t.Exception}");
}
});
await _channel.WriteAndFlushAsync(packet);
}
catch (Exception e)
{
if (_logger.IsTrace) _logger.Trace($"Error when sending a discovery message Msg: {discoveryMsg} ,Exp: {e}");
}

Interlocked.Add(ref Metrics.DiscoveryBytesSent, size);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,22 @@ namespace Nethermind.Network.Discovery.Serializers;

public class NeighborsMsgSerializer : DiscoveryMsgSerializerBase, IZeroInnerMessageSerializer<NeighborsMsg>
{
private static readonly Func<RlpStream, Node> _decodeItem = static ctx =>
{
int lastPosition = ctx.ReadSequenceLength() + ctx.Position;
int count = ctx.PeekNumberOfItemsRemaining(lastPosition);

ReadOnlySpan<byte> ip = ctx.DecodeByteArraySpan();
IPEndPoint address = GetAddress(ip, ctx.DecodeInt());
if (count > 3)
{
ctx.DecodeInt();
}

ReadOnlySpan<byte> id = ctx.DecodeByteArraySpan();
return new Node(new PublicKey(id), address);
};

public NeighborsMsgSerializer(IEcdsa ecdsa,
IPrivateKeyGenerator nodeKey,
INodeIdResolver nodeIdResolver) : base(ecdsa, nodeKey, nodeIdResolver)
Expand Down Expand Up @@ -53,30 +69,16 @@ public NeighborsMsg Deserialize(IByteBuffer msgBytes)

NettyRlpStream rlp = new(Data);
rlp.ReadSequenceLength();
Node[] nodes = DeserializeNodes(rlp) as Node[];
Node[] nodes = DeserializeNodes(rlp);

long expirationTime = rlp.DecodeLong();
NeighborsMsg msg = new(FarPublicKey, expirationTime, nodes);
return msg;
}

private static Node?[] DeserializeNodes(RlpStream rlpStream)
private static Node[] DeserializeNodes(RlpStream rlpStream)
{
return rlpStream.DecodeArray(ctx =>
{
int lastPosition = ctx.ReadSequenceLength() + ctx.Position;
int count = ctx.PeekNumberOfItemsRemaining(lastPosition);

ReadOnlySpan<byte> ip = ctx.DecodeByteArraySpan();
IPEndPoint address = GetAddress(ip, ctx.DecodeInt());
if (count > 3)
{
ctx.DecodeInt();
}

ReadOnlySpan<byte> id = ctx.DecodeByteArraySpan();
return new Node(new PublicKey(id), address);
});
return rlpStream.DecodeArray<Node>(_decodeItem);
}

private static int GetNodesLength(Node[] nodes, out int contentLength)
Expand Down
42 changes: 21 additions & 21 deletions src/Nethermind/Nethermind.Network.Test/P2P/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,12 @@ public void Can_enable_snappy()
}

[Test]
public void Enabling_snappy_on_disconnected_will_not_cause_trouble()
public async Task Enabling_snappy_on_disconnected_will_not_cause_trouble()
{
Session session = new(30312, new Node(TestItem.PublicKeyA, "127.0.0.1", 8545), _channel, NullDisconnectsAnalyzer.Instance, LimboLogs.Instance);
session.Handshake(TestItem.PublicKeyA);
session.Init(5, _channelHandlerContext, _packetSender);
session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Remote, "test");
await session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Remote, "test");
session.EnableSnappy();
}

Expand Down Expand Up @@ -194,7 +194,7 @@ void addProtocol()
task.Start();

await Task.Delay(20);
session.InitiateDisconnect(DisconnectReason.Other, "test");
await session.InitiateDisconnect(DisconnectReason.Other, "test");
await Task.Delay(10);
shouldStop = true;
}
Expand Down Expand Up @@ -229,7 +229,7 @@ public void Best_state_reached_is_correct()
}

[Test]
public void Cannot_dispose_unless_disconnected()
public async Task Cannot_dispose_unless_disconnected()
{
Session session = new(30312, new Node(TestItem.PublicKeyA, "127.0.0.1", 8545), _channel, NullDisconnectsAnalyzer.Instance, LimboLogs.Instance);
session.Handshake(TestItem.PublicKeyA);
Expand All @@ -246,7 +246,7 @@ public void Cannot_dispose_unless_disconnected()
session.AddProtocolHandler(bbb);
session.AddProtocolHandler(ccc);

session.InitiateDisconnect(DisconnectReason.Other, "test");
await session.InitiateDisconnect(DisconnectReason.Other, "test");
session.Dispose();

aaa.Received().DisconnectProtocol(DisconnectReason.Other, "test");
Expand All @@ -272,15 +272,15 @@ public void Raises_event_on_disconnecting()
}

[Test]
public void Raises_event_on_disconnected()
public async Task Raises_event_on_disconnected()
{
bool wasCalled = false;
Session session = new(30312, new Node(TestItem.PublicKeyA, "127.0.0.1", 8545), _channel, NullDisconnectsAnalyzer.Instance, LimboLogs.Instance);
session.Disconnected += (s, e) => wasCalled = true;

session.Handshake(TestItem.PublicKeyA);
session.Init(5, _channelHandlerContext, _packetSender);
session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Local, "test");
await session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Local, "test");
Assert.That(wasCalled, Is.True);
}

Expand Down Expand Up @@ -315,53 +315,53 @@ public void Do_not_disconnects_after_initiating_disconnect_on_static_node()
}

[Test]
public void Error_on_channel_when_disconnecting_channels_does_not_prevent_the_event()
public async Task Error_on_channel_when_disconnecting_channels_does_not_prevent_the_event()
{
bool wasCalled = false;
Session session = new(30312, new Node(TestItem.PublicKeyA, "127.0.0.1", 8545), _channel, NullDisconnectsAnalyzer.Instance, LimboLogs.Instance);
_channel.DisconnectAsync().Returns(Task.FromException<Exception>(new Exception()));
session.Disconnected += (s, e) => wasCalled = true;
session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Local, "test");
await session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Local, "test");
Assert.That(wasCalled, Is.True);
}

[Test]
public void Error_on_context_when_disconnecting_channels_does_not_prevent_the_event()
public async Task Error_on_context_when_disconnecting_channels_does_not_prevent_the_event()
{
bool wasCalled = false;
Session session = new(30312, new Node(TestItem.PublicKeyA, "127.0.0.1", 8545), _channel, NullDisconnectsAnalyzer.Instance, LimboLogs.Instance);
_channelHandlerContext.DisconnectAsync().Returns(Task.FromException<Exception>(new Exception()));
session.Disconnected += (s, e) => wasCalled = true;
session.Handshake(TestItem.PublicKeyA);
session.Init(5, _channelHandlerContext, _packetSender);
session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Local, "test");
await session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Local, "test");
Assert.That(wasCalled, Is.True);
}

[Test]
public void Can_disconnect_many_times()
public async Task Can_disconnect_many_times()
{
int wasCalledTimes = 0;
Session session = new(30312, new Node(TestItem.PublicKeyA, "127.0.0.1", 8545), _channel, NullDisconnectsAnalyzer.Instance, LimboLogs.Instance);
session.Disconnecting += (s, e) => wasCalledTimes++;

session.Handshake(TestItem.PublicKeyA);
session.Init(5, _channelHandlerContext, _packetSender);
session.InitiateDisconnect(DisconnectReason.Other);
session.InitiateDisconnect(DisconnectReason.Other);
session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Local, "test");
session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Remote, "test");
await session.InitiateDisconnect(DisconnectReason.Other);
await session.InitiateDisconnect(DisconnectReason.Other);
await session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Local, "test");
await session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Remote, "test");
Assert.That(wasCalledTimes, Is.EqualTo(1));
}

[Test]
public void Can_disconnect_before_init()
public async Task Can_disconnect_before_init()
{
int wasCalledTimes = 0;
Session session = new(30312, new Node(TestItem.PublicKeyA, "127.0.0.1", 8545), _channel, NullDisconnectsAnalyzer.Instance, LimboLogs.Instance);
session.Disconnecting += (s, e) => wasCalledTimes++;
session.Handshake(TestItem.PublicKeyA);
session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Remote, "test");
await session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Remote, "test");
session.Init(5, _channelHandlerContext, _packetSender);
Assert.That(wasCalledTimes, Is.EqualTo(1));
}
Expand Down Expand Up @@ -597,7 +597,7 @@ public void Can_receive_messages()
}

[Test]
public void Updates_local_and_remote_metrics_on_disconnects()
public async Task Updates_local_and_remote_metrics_on_disconnects()
{
Session session = new(30312, new Node(TestItem.PublicKeyA, "127.0.0.1", 8545), _channel, new MetricsDisconnectsAnalyzer(), LimboLogs.Instance);
session.Handshake(TestItem.PublicKeyA);
Expand All @@ -607,7 +607,7 @@ public void Updates_local_and_remote_metrics_on_disconnects()

long beforeLocal = Network.Metrics.LocalDisconnectsTotal.GetValueOrDefault(DisconnectReason.Other);
long beforeRemote = Network.Metrics.RemoteDisconnectsTotal.GetValueOrDefault(DisconnectReason.Other);
session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Local, string.Empty);
await session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Local, string.Empty);
long afterLocal = Network.Metrics.LocalDisconnectsTotal.GetValueOrDefault(DisconnectReason.Other);
long afterRemote = Network.Metrics.RemoteDisconnectsTotal.GetValueOrDefault(DisconnectReason.Other);
Assert.That(afterLocal, Is.EqualTo(beforeLocal + 1));
Expand All @@ -621,7 +621,7 @@ public void Updates_local_and_remote_metrics_on_disconnects()

beforeLocal = Network.Metrics.LocalDisconnectsTotal.GetValueOrDefault(DisconnectReason.Other);
beforeRemote = Network.Metrics.RemoteDisconnectsTotal.GetValueOrDefault(DisconnectReason.Other);
session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Remote, string.Empty);
await session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Remote, string.Empty);
afterLocal = Network.Metrics.LocalDisconnectsTotal.GetValueOrDefault(DisconnectReason.Other);
afterRemote = Network.Metrics.RemoteDisconnectsTotal.GetValueOrDefault(DisconnectReason.Other);
Assert.That(afterLocal, Is.EqualTo(beforeLocal));
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Network.Test/PeerManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ public void DisconnectAllSessions()

foreach (Session session in clone)
{
session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Remote, "test");
_ = session.MarkDisconnected(DisconnectReason.Other, DisconnectType.Remote, "test");
}
}

Expand Down
Loading
Loading