Skip to content

Commit

Permalink
Remove some closure allocations (#7953)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored Dec 23, 2024
1 parent 372e6e7 commit bf0206d
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 131 deletions.
14 changes: 1 addition & 13 deletions src/Nethermind/Nethermind.Core/Crypto/PublicKey.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,7 @@ public Hash256 Hash

public byte[] Bytes { get; }

public byte[] PrefixedBytes
{
get
{
if (_prefixedBytes is null)
{
return LazyInitializer.EnsureInitialized(ref _prefixedBytes,
() => Core.Extensions.Bytes.Concat(0x04, Bytes));
}

return _prefixedBytes;
}
}
public byte[] PrefixedBytes => _prefixedBytes ??= Core.Extensions.Bytes.Concat(0x04, Bytes);

public bool Equals(PublicKey? other) => other is not null && Core.Extensions.Bytes.AreEqual(Bytes, other.Bytes);

Expand Down
27 changes: 22 additions & 5 deletions src/Nethermind/Nethermind.Db/SimpleFilePublicKeyDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,37 @@ public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteF
{
if (value is null)
{
_cacheSpan.TryRemove(key, out _);
if (_cacheSpan.TryRemove(key, out _))
{
_hasPendingChanges = true;
}
return;
}

bool setValue = true;
if (_cacheSpan.TryGetValue(key, out var existingValue))
{
if (!Bytes.AreEqual(existingValue, value))
{
setValue = false;
}
}
else

if (setValue)
{
_cache.AddOrUpdate(key.ToArray(), newValue => Add(value), (x, oldValue) => Update(oldValue, value));
_cacheSpan[key] = value;
_hasPendingChanges = true;
}
}

public KeyValuePair<byte[], byte[]>[] this[byte[][] keys] => keys.Select(k => new KeyValuePair<byte[], byte[]>(k, _cache.TryGetValue(k, out var value) ? value : null)).ToArray();

public void Remove(ReadOnlySpan<byte> key)
{
_hasPendingChanges = true;
_cacheSpan.TryRemove(key, out _);
if (_cacheSpan.TryRemove(key, out _))
{
_hasPendingChanges = true;
}
}

public bool KeyExists(ReadOnlySpan<byte> key)
Expand Down
37 changes: 24 additions & 13 deletions src/Nethermind/Nethermind.Network.Discovery/DiscoveryManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using Nethermind.Config;
using Nethermind.Core;
using Nethermind.Core.Crypto;
Expand All @@ -25,6 +26,8 @@ public class DiscoveryManager : IDiscoveryManager
private readonly INetworkStorage _discoveryStorage;

private readonly ConcurrentDictionary<MessageTypeKey, TaskCompletionSource<DiscoveryMsg>> _waitingEvents = new();
private readonly Func<Hash256, Node, INodeLifecycleManager> _createNodeLifecycleManager;
private readonly Func<Hash256, Node, INodeLifecycleManager> _createNodeLifecycleManagerPersisted;
private IMsgSender? _msgSender;

public DiscoveryManager(
Expand All @@ -41,6 +44,24 @@ public DiscoveryManager(
_discoveryStorage = discoveryStorage ?? throw new ArgumentNullException(nameof(discoveryStorage));
_nodeLifecycleManagerFactory.DiscoveryManager = this;
_outgoingMessageRateLimiter = new RateLimiter(discoveryConfig.MaxOutgoingMessagePerSecond);
_createNodeLifecycleManager = GetLifecycleManagerFunc(isPersisted: false);
_createNodeLifecycleManagerPersisted = GetLifecycleManagerFunc(isPersisted: true);
}

private Func<Hash256, Node, INodeLifecycleManager> GetLifecycleManagerFunc(bool isPersisted)
{
return (_, node) =>
{
Interlocked.Increment(ref _managersCreated);
INodeLifecycleManager manager = _nodeLifecycleManagerFactory.CreateNodeLifecycleManager(node);
manager.OnStateChanged += ManagerOnOnStateChanged;
if (!isPersisted)
{
_discoveryStorage.UpdateNodes(new[] { new NetworkNode(manager.ManagedNode.Id, manager.ManagedNode.Host, manager.ManagedNode.Port, manager.NodeStats.NewPersistedNodeReputation(DateTime.UtcNow)) });
}

return manager;
};
}

public IMsgSender MsgSender
Expand Down Expand Up @@ -120,18 +141,7 @@ public void OnIncomingMsg(DiscoveryMsg msg)
return null;
}

return _nodeLifecycleManagers.GetOrAdd(node.IdHash, _ =>
{
Interlocked.Increment(ref _managersCreated);
INodeLifecycleManager manager = _nodeLifecycleManagerFactory.CreateNodeLifecycleManager(node);
manager.OnStateChanged += ManagerOnOnStateChanged;
if (!isPersisted)
{
_discoveryStorage.UpdateNodes(new[] { new NetworkNode(manager.ManagedNode.Id, manager.ManagedNode.Host, manager.ManagedNode.Port, manager.NodeStats.NewPersistedNodeReputation(DateTime.UtcNow)) });
}

return manager;
});
return _nodeLifecycleManagers.GetOrAdd(node.IdHash, isPersisted ? _createNodeLifecycleManagerPersisted : _createNodeLifecycleManager, node);
}

private void ManagerOnOnStateChanged(object? sender, NodeLifecycleState e)
Expand Down Expand Up @@ -168,7 +178,8 @@ public async Task SendMessageAsync(DiscoveryMsg discoveryMsg)
}
}

public async Task<bool> WasMessageReceived(Hash256 senderIdHash, MsgType msgType, int timeout)
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
public async ValueTask<bool> WasMessageReceived(Hash256 senderIdHash, MsgType msgType, int timeout)
{
TaskCompletionSource<DiscoveryMsg> completionSource = GetCompletionSource(senderIdHash, (int)msgType);
CancellationTokenSource delayCancellation = new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public interface IDiscoveryManager : IDiscoveryMsgListener
INodeLifecycleManager? GetNodeLifecycleManager(Node node, bool isPersisted = false);
void SendMessage(DiscoveryMsg discoveryMsg);
Task SendMessageAsync(DiscoveryMsg discoveryMsg);
Task<bool> WasMessageReceived(Hash256 senderIdHash, MsgType msgType, int timeout);
ValueTask<bool> WasMessageReceived(Hash256 senderIdHash, MsgType msgType, int timeout);
event EventHandler<NodeEventArgs> NodeDiscovered;

IReadOnlyCollection<INodeLifecycleManager> GetNodeLifecycleManagers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ public async Task SendMsg(DiscoveryMsg discoveryMsg)
}

IAddressedEnvelope<IByteBuffer> packet = new DatagramPacket(msgBuffer, discoveryMsg.FarAddress);

await _channel.WriteAndFlushAsync(packet).ContinueWith(t =>
try
{
if (t.IsFaulted)
{
if (_logger.IsTrace) _logger.Trace($"Error when sending a discovery message Msg: {discoveryMsg} ,Exp: {t.Exception}");
}
});
await _channel.WriteAndFlushAsync(packet);
}
catch (Exception e)
{
if (_logger.IsTrace) _logger.Trace($"Error when sending a discovery message Msg: {discoveryMsg} ,Exp: {e}");
}

Interlocked.Add(ref Metrics.DiscoveryBytesSent, size);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,22 @@ namespace Nethermind.Network.Discovery.Serializers;

public class NeighborsMsgSerializer : DiscoveryMsgSerializerBase, IZeroInnerMessageSerializer<NeighborsMsg>
{
private static readonly Func<RlpStream, Node> _decodeItem = static ctx =>
{
int lastPosition = ctx.ReadSequenceLength() + ctx.Position;
int count = ctx.PeekNumberOfItemsRemaining(lastPosition);

ReadOnlySpan<byte> ip = ctx.DecodeByteArraySpan();
IPEndPoint address = GetAddress(ip, ctx.DecodeInt());
if (count > 3)
{
ctx.DecodeInt();
}

ReadOnlySpan<byte> id = ctx.DecodeByteArraySpan();
return new Node(new PublicKey(id), address);
};

public NeighborsMsgSerializer(IEcdsa ecdsa,
IPrivateKeyGenerator nodeKey,
INodeIdResolver nodeIdResolver) : base(ecdsa, nodeKey, nodeIdResolver)
Expand Down Expand Up @@ -53,30 +69,16 @@ public NeighborsMsg Deserialize(IByteBuffer msgBytes)

NettyRlpStream rlp = new(Data);
rlp.ReadSequenceLength();
Node[] nodes = DeserializeNodes(rlp) as Node[];
Node[] nodes = DeserializeNodes(rlp);

long expirationTime = rlp.DecodeLong();
NeighborsMsg msg = new(FarPublicKey, expirationTime, nodes);
return msg;
}

private static Node?[] DeserializeNodes(RlpStream rlpStream)
private static Node[] DeserializeNodes(RlpStream rlpStream)
{
return rlpStream.DecodeArray(ctx =>
{
int lastPosition = ctx.ReadSequenceLength() + ctx.Position;
int count = ctx.PeekNumberOfItemsRemaining(lastPosition);

ReadOnlySpan<byte> ip = ctx.DecodeByteArraySpan();
IPEndPoint address = GetAddress(ip, ctx.DecodeInt());
if (count > 3)
{
ctx.DecodeInt();
}

ReadOnlySpan<byte> id = ctx.DecodeByteArraySpan();
return new Node(new PublicKey(id), address);
});
return rlpStream.DecodeArray<Node>(_decodeItem);
}

private static int GetNodesLength(Node[] nodes, out int contentLength)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System.Numerics;
using System.Threading;
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using Nethermind.Blockchain;
Expand Down Expand Up @@ -184,12 +183,6 @@ public Context ReceiveDisconnect()
return this;
}

public Context Wait(int i)
{
Thread.Sleep(i);
return this;
}

public Context VerifyInitialized()
{
Assert.That(_currentSession.State, Is.EqualTo(SessionState.Initialized));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,7 @@ public override void Init()

// We are expecting to receive Hello message anytime from the handshake completion,
// irrespective of sending Hello from our side
CheckProtocolInitTimeout().ContinueWith(x =>
{
if (x.IsFaulted && Logger.IsError)
{
Logger.Error("Error during p2pProtocol handler timeout logic", x.Exception);
}
});
_ = CheckProtocolInitTimeout();
}

public override void HandleMessage(Packet msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,32 @@ protected internal void Send<T>(T message) where T : P2PMessage

protected async Task CheckProtocolInitTimeout()
{
Task<MessageBase> receivedInitMsgTask = _initCompletionSource.Task;
CancellationTokenSource delayCancellation = new();
Task firstTask = await Task.WhenAny(receivedInitMsgTask, Task.Delay(InitTimeout, delayCancellation.Token));

if (firstTask != receivedInitMsgTask)
try
{
if (Logger.IsTrace)
Task<MessageBase> receivedInitMsgTask = _initCompletionSource.Task;
CancellationTokenSource delayCancellation = new();
Task firstTask = await Task.WhenAny(receivedInitMsgTask, Task.Delay(InitTimeout, delayCancellation.Token));

if (firstTask != receivedInitMsgTask)
{
Logger.Trace($"Disconnecting due to timeout for protocol init message ({Name}): {Session.RemoteNodeId}");
}
if (Logger.IsTrace)
{
Logger.Trace($"Disconnecting due to timeout for protocol init message ({Name}): {Session.RemoteNodeId}");
}

Session.InitiateDisconnect(DisconnectReason.ProtocolInitTimeout, "protocol init timeout");
Session.InitiateDisconnect(DisconnectReason.ProtocolInitTimeout, "protocol init timeout");
}
else
{
delayCancellation.Cancel();
}
}
else
catch (Exception e)
{
delayCancellation.Cancel();
if (Logger.IsError)
{
Logger.Error("Error during p2pProtocol handler timeout logic", e);
}
}
}

Expand Down
63 changes: 36 additions & 27 deletions src/Nethermind/Nethermind.Network/P2P/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -487,33 +487,7 @@ public void MarkDisconnected(DisconnectReason disconnectReason, DisconnectType d

Disconnecting?.Invoke(this, new DisconnectEventArgs(disconnectReason, disconnectType, details));

//Possible in case of disconnect before p2p initialization
if (_context is null)
{
//in case pipeline did not get to p2p - no disconnect delay
_channel.DisconnectAsync().ContinueWith(x =>
{
if (x.IsFaulted && _logger.IsTrace)
_logger.Trace($"Error while disconnecting on channel on {this} : {x.Exception}");
});
}
else
{
Task delayTask =
disconnectType == DisconnectType.Local
? Task.Delay(Timeouts.Disconnection)
: Task.CompletedTask;
delayTask.ContinueWith(t =>
{
if (_logger.IsTrace)
_logger.Trace($"{this} disconnecting now after {Timeouts.Disconnection.TotalMilliseconds} milliseconds");
_context.DisconnectAsync().ContinueWith(x =>
{
if (x.IsFaulted && _logger.IsTrace)
_logger.Trace($"Error while disconnecting on context on {this} : {x.Exception}");
});
});
}
_ = DisconnectAsync(disconnectType);

lock (_sessionStateLock)
{
Expand All @@ -530,6 +504,41 @@ public void MarkDisconnected(DisconnectReason disconnectReason, DisconnectType d
_logger.Error($"DEBUG/ERROR No subscriptions for session disconnected event on {this}");
}

private async Task DisconnectAsync(DisconnectType disconnectType)
{
//Possible in case of disconnect before p2p initialization
if (_context is null)
{
//in case pipeline did not get to p2p - no disconnect delay
try
{
await _channel.DisconnectAsync();
}
catch (Exception e)
{
if (_logger.IsTrace)
_logger.Trace($"Error while disconnecting on context on {this} : {e}");
}
}
else
{
if (disconnectType == DisconnectType.Local)
{
await Task.Delay(Timeouts.Disconnection);
}

try
{
await _context.DisconnectAsync();
}
catch (Exception e)
{
if (_logger.IsTrace)
_logger.Trace($"Error while disconnecting on context on {this} : {e}");
}
}
}

public event EventHandler<DisconnectEventArgs> Disconnecting;
public event EventHandler<DisconnectEventArgs> Disconnected;
public event EventHandler<EventArgs> HandshakeComplete;
Expand Down
Loading

0 comments on commit bf0206d

Please sign in to comment.