Skip to content

Commit

Permalink
Added EndPoint RemoteEndPoint {get;} property to channel (#2111)
Browse files Browse the repository at this point in the history
* Added EndPoint RemoteEndPoint{get;} property

* Reformat code

* Reformat code

---------

Co-authored-by: christian <6939810+chkr1011@users.noreply.github.com>
  • Loading branch information
xljiulang and chkr1011 authored Dec 2, 2024
1 parent ef5b6c9 commit 8089c6b
Show file tree
Hide file tree
Showing 20 changed files with 91 additions and 55 deletions.
6 changes: 3 additions & 3 deletions Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,21 @@ public X509Certificate2 ClientCertificate
}
}

public string Endpoint
public EndPoint RemoteEndPoint
{
get
{
// mqtt over tcp
if (_connection.RemoteEndPoint != null)
{
return _connection.RemoteEndPoint.ToString();
return _connection.RemoteEndPoint;
}

// mqtt over websocket
var httpFeature = _connection.Features.Get<IHttpConnectionFeature>();
if (httpFeature?.RemoteIpAddress != null)
{
return new IPEndPoint(httpFeature.RemoteIpAddress, httpFeature.RemotePort).ToString();
return new IPEndPoint(httpFeature.RemoteIpAddress, httpFeature.RemotePort);
}

return null;
Expand Down
6 changes: 4 additions & 2 deletions Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Net;
using System.Net.WebSockets;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
Expand All @@ -28,7 +29,8 @@ public async Task RunWebSocketConnectionAsync(WebSocket webSocket, HttpContext h
{
ArgumentNullException.ThrowIfNull(webSocket);

var endpoint = $"{httpContext.Connection.RemoteIpAddress}:{httpContext.Connection.RemotePort}";
var remoteAddress = httpContext.Connection.RemoteIpAddress;
var remoteEndPoint = remoteAddress == null ? null : new IPEndPoint(remoteAddress, httpContext.Connection.RemotePort);

var clientCertificate = await httpContext.Connection.GetClientCertificateAsync().ConfigureAwait(false);
try
Expand All @@ -39,7 +41,7 @@ public async Task RunWebSocketConnectionAsync(WebSocket webSocket, HttpContext h
if (clientHandler != null)
{
var formatter = new MqttPacketFormatterAdapter(new MqttBufferWriter(4096, 65535));
var channel = new MqttWebSocketChannel(webSocket, endpoint, isSecureConnection, clientCertificate);
var channel = new MqttWebSocketChannel(webSocket, remoteEndPoint, isSecureConnection, clientCertificate);

using (var channelAdapter = new MqttChannelAdapter(channel, formatter, _logger))
{
Expand Down
3 changes: 2 additions & 1 deletion Source/MQTTnet.Benchmarks/SerializerBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using BenchmarkDotNet.Jobs;
using MQTTnet.Diagnostics.Logger;
using System.Buffers;
using System.Net;

namespace MQTTnet.Benchmarks
{
Expand Down Expand Up @@ -76,7 +77,7 @@ public BenchmarkMqttChannel(ArraySegment<byte> buffer)
_position = _buffer.Offset;
}

public string Endpoint { get; } = string.Empty;
public EndPoint RemoteEndPoint { get; set; }

public bool IsSecureConnection { get; } = false;

Expand Down
10 changes: 7 additions & 3 deletions Source/MQTTnet.Server/Events/ClientConnectedEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Net;
using MQTTnet.Formatter;
using MQTTnet.Packets;

Expand All @@ -14,11 +15,11 @@ public sealed class ClientConnectedEventArgs : EventArgs
{
readonly MqttConnectPacket _connectPacket;

public ClientConnectedEventArgs(MqttConnectPacket connectPacket, MqttProtocolVersion protocolVersion, string endpoint, IDictionary sessionItems)
public ClientConnectedEventArgs(MqttConnectPacket connectPacket, MqttProtocolVersion protocolVersion, EndPoint remoteEndPoint, IDictionary sessionItems)
{
_connectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket));
ProtocolVersion = protocolVersion;
Endpoint = endpoint;
RemoteEndPoint = remoteEndPoint;
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
}

Expand All @@ -35,7 +36,10 @@ public ClientConnectedEventArgs(MqttConnectPacket connectPacket, MqttProtocolVer
/// <summary>
/// Gets the endpoint of the connected client.
/// </summary>
public string Endpoint { get; }
public EndPoint RemoteEndPoint { get; }

[Obsolete("Use RemoteEndPoint instead.")]
public string Endpoint => RemoteEndPoint?.ToString();

/// <summary>
/// Gets the protocol version which is used by the connected client.
Expand Down
10 changes: 7 additions & 3 deletions Source/MQTTnet.Server/Events/ClientDisconnectedEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Net;
using MQTTnet.Packets;
using MQTTnet.Protocol;

Expand All @@ -18,12 +19,12 @@ public ClientDisconnectedEventArgs(
string clientId,
MqttDisconnectPacket disconnectPacket,
MqttClientDisconnectType disconnectType,
string endpoint,
EndPoint remoteEndPoint,
IDictionary sessionItems)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
DisconnectType = disconnectType;
Endpoint = endpoint;
RemoteEndPoint = remoteEndPoint;
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));

// The DISCONNECT packet can be null in case of a non clean disconnect or session takeover.
Expand All @@ -38,7 +39,10 @@ public ClientDisconnectedEventArgs(

public MqttClientDisconnectType DisconnectType { get; }

public string Endpoint { get; }
public EndPoint RemoteEndPoint { get; }

[Obsolete("Use RemoteEndPoint instead.")]
public string Endpoint => RemoteEndPoint?.ToString();

/// <summary>
/// Gets the reason code sent by the client.
Expand Down
10 changes: 7 additions & 3 deletions Source/MQTTnet.Server/Events/InterceptingPacketEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@

using System;
using System.Collections;
using System.Net;
using System.Threading;
using MQTTnet.Packets;

namespace MQTTnet.Server
{
public sealed class InterceptingPacketEventArgs : EventArgs
{
public InterceptingPacketEventArgs(CancellationToken cancellationToken, string clientId, string endpoint, MqttPacket packet, IDictionary sessionItems)
public InterceptingPacketEventArgs(CancellationToken cancellationToken, string clientId, EndPoint remoteEndPoint, MqttPacket packet, IDictionary sessionItems)
{
CancellationToken = cancellationToken;
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
Endpoint = endpoint;
RemoteEndPoint = remoteEndPoint;
Packet = packet ?? throw new ArgumentNullException(nameof(packet));
SessionItems = sessionItems;
}
Expand All @@ -34,7 +35,10 @@ public InterceptingPacketEventArgs(CancellationToken cancellationToken, string c
/// <summary>
/// Gets the endpoint of the sending or receiving client.
/// </summary>
public string Endpoint { get; }
public EndPoint RemoteEndPoint { get; }

[Obsolete("Use RemoteEndPoint instead.")]
public string Endpoint => RemoteEndPoint?.ToString();

/// <summary>
/// Gets or sets the MQTT packet which was received or will be sent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Net;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using MQTTnet.Adapter;
Expand Down Expand Up @@ -70,7 +71,10 @@ public ValidatingConnectionEventArgs(MqttConnectPacket connectPacket, IMqttChann
/// </summary>
public string ClientId => _connectPacket.ClientId;

public string Endpoint => ChannelAdapter.Endpoint;
public EndPoint RemoteEndPoint => ChannelAdapter.RemoteEndPoint;

[Obsolete("Use RemoteEndPoint instead.")]
public string Endpoint => RemoteEndPoint?.ToString();

public bool IsSecureConnection => ChannelAdapter.IsSecureConnection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ async Task AcceptClientConnectionsAsync(CancellationToken cancellationToken)
async Task TryHandleClientConnectionAsync(CrossPlatformSocket clientSocket)
{
Stream stream = null;
string remoteEndPoint = null;
EndPoint remoteEndPoint = null;

try
{
remoteEndPoint = clientSocket.RemoteEndPoint.ToString();
remoteEndPoint = clientSocket.RemoteEndPoint;

_logger.Verbose("TCP client '{0}' accepted (Local endpoint={1})", remoteEndPoint, _localEndPoint);

Expand Down
27 changes: 16 additions & 11 deletions Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,18 @@ namespace MQTTnet.Server.Internal;
public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification, IDisposable
{
readonly Dictionary<string, MqttConnectedClient> _clients = new(4096);

readonly AsyncLock _createConnectionSyncRoot = new();

readonly MqttServerEventContainer _eventContainer;
readonly MqttNetSourceLogger _logger;
readonly MqttServerOptions _options;

readonly MqttRetainedMessagesManager _retainedMessagesManager;
readonly IMqttNetLogger _rootLogger;

readonly ReaderWriterLockSlim _sessionsManagementLock = new();

// The _sessions dictionary contains all session, the _subscriberSessions hash set contains subscriber sessions only.
// See the MqttSubscription object for a detailed explanation.
readonly MqttSessionsStorage _sessionsStorage = new();
readonly HashSet<MqttSession> _subscriberSessions = new();
readonly HashSet<MqttSession> _subscriberSessions = [];

public MqttClientSessionsManager(MqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, MqttServerEventContainer eventContainer, IMqttNetLogger logger)
{
Expand Down Expand Up @@ -365,7 +361,11 @@ public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter

if (_eventContainer.ClientConnectedEvent.HasHandlers)
{
var eventArgs = new ClientConnectedEventArgs(connectPacket, channelAdapter.PacketFormatterAdapter.ProtocolVersion, channelAdapter.Endpoint, connectedClient.Session.Items);
var eventArgs = new ClientConnectedEventArgs(
connectPacket,
channelAdapter.PacketFormatterAdapter.ProtocolVersion,
channelAdapter.RemoteEndPoint,
connectedClient.Session.Items);

await _eventContainer.ClientConnectedEvent.TryInvokeAsync(eventArgs, _logger).ConfigureAwait(false);
}
Expand Down Expand Up @@ -403,7 +403,7 @@ public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter
}
}

var endpoint = connectedClient.Endpoint;
var endpoint = connectedClient.RemoteEndPoint;

if (connectedClient.Id != null && !connectedClient.IsTakenOver && _eventContainer.ClientDisconnectedEvent.HasHandlers)
{
Expand Down Expand Up @@ -591,7 +591,12 @@ async Task<MqttConnectedClient> CreateClientConnection(

if (_eventContainer.ClientDisconnectedEvent.HasHandlers)
{
var eventArgs = new ClientDisconnectedEventArgs(oldConnectedClient.Id, null, MqttClientDisconnectType.Takeover, oldConnectedClient.Endpoint, oldConnectedClient.Session.Items);
var eventArgs = new ClientDisconnectedEventArgs(
oldConnectedClient.Id,
null,
MqttClientDisconnectType.Takeover,
oldConnectedClient.RemoteEndPoint,
oldConnectedClient.Session.Items);

await _eventContainer.ClientDisconnectedEvent.TryInvokeAsync(eventArgs, _logger).ConfigureAwait(false);
}
Expand Down Expand Up @@ -655,14 +660,14 @@ async Task<MqttConnectPacket> ReceiveConnectPacket(IMqttChannelAdapter channelAd
}
catch (OperationCanceledException)
{
_logger.Warning("Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint);
_logger.Warning("Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.RemoteEndPoint);
}
catch (MqttCommunicationTimedOutException)
{
_logger.Warning("Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint);
_logger.Warning("Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.RemoteEndPoint);
}

_logger.Warning("Client '{0}': First received packet was no 'CONNECT' packet [MQTT-3.1.0-1].", channelAdapter.Endpoint);
_logger.Warning("Client '{0}': First received packet was no 'CONNECT' packet [MQTT-3.1.0-1].", channelAdapter.RemoteEndPoint);
return null;
}

Expand Down
17 changes: 8 additions & 9 deletions Source/MQTTnet.Server/Internal/MqttConnectedClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Net;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics.Logger;
using MQTTnet.Exceptions;
Expand Down Expand Up @@ -45,7 +46,7 @@ public MqttConnectedClient(
ConnectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket));

ChannelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter));
Endpoint = channelAdapter.Endpoint;
RemoteEndPoint = channelAdapter.RemoteEndPoint;
Session = session ?? throw new ArgumentNullException(nameof(session));

ArgumentNullException.ThrowIfNull(logger);
Expand All @@ -59,14 +60,14 @@ public MqttConnectedClient(

public MqttDisconnectPacket DisconnectPacket { get; private set; }

public string Endpoint { get; }

public string Id => ConnectPacket.ClientId;

public bool IsRunning { get; private set; }

public bool IsTakenOver { get; set; }

public EndPoint RemoteEndPoint { get; }

public MqttSession Session { get; }

public MqttClientStatistics Statistics { get; } = new();
Expand Down Expand Up @@ -338,7 +339,7 @@ async Task<MqttPacket> InterceptPacketAsync(MqttPacket packet, CancellationToken
return packet;
}

var interceptingPacketEventArgs = new InterceptingPacketEventArgs(cancellationToken, Id, Endpoint, packet, Session.Items);
var interceptingPacketEventArgs = new InterceptingPacketEventArgs(cancellationToken, Id, RemoteEndPoint, packet, Session.Items);
await _eventContainer.InterceptingOutboundPacketEvent.InvokeAsync(interceptingPacketEventArgs).ConfigureAwait(false);

if (!interceptingPacketEventArgs.ProcessPacket || packet == null)
Expand Down Expand Up @@ -384,7 +385,7 @@ async Task ReceivePackagesLoop(CancellationToken cancellationToken)

if (_eventContainer.InterceptingInboundPacketEvent.HasHandlers)
{
var interceptingPacketEventArgs = new InterceptingPacketEventArgs(cancellationToken, Id, Endpoint, currentPacket, Session.Items);
var interceptingPacketEventArgs = new InterceptingPacketEventArgs(cancellationToken, Id, RemoteEndPoint, currentPacket, Session.Items);
await _eventContainer.InterceptingInboundPacketEvent.InvokeAsync(interceptingPacketEventArgs).ConfigureAwait(false);
currentPacket = interceptingPacketEventArgs.Packet;
processPacket = interceptingPacketEventArgs.ProcessPacket;
Expand Down Expand Up @@ -560,10 +561,8 @@ async Task TrySendDisconnectPacket(MqttServerClientDisconnectOptions options)

var disconnectPacket = MqttDisconnectPacketFactory.Create(options);

using (var timeout = new CancellationTokenSource(_serverOptions.DefaultCommunicationTimeout))
{
await SendPacketAsync(disconnectPacket, timeout.Token).ConfigureAwait(false);
}
using var timeout = new CancellationTokenSource(_serverOptions.DefaultCommunicationTimeout);
await SendPacketAsync(disconnectPacket, timeout.Token).ConfigureAwait(false);
}
catch (Exception exception)
{
Expand Down
6 changes: 5 additions & 1 deletion Source/MQTTnet.Server/Status/MqttClientStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using MQTTnet.Formatter;
using MQTTnet.Server.Internal;
using System.Net;

namespace MQTTnet.Server;

Expand All @@ -22,7 +23,10 @@ public MqttClientStatus(MqttConnectedClient client)

public DateTime ConnectedTimestamp => _client.Statistics.ConnectedTimestamp;

public string Endpoint => _client.Endpoint;
public EndPoint RemoteEndPoint => _client.RemoteEndPoint;

[Obsolete("Use RemoteEndPoint instead.")]
public string Endpoint => RemoteEndPoint?.ToString();

/// <summary>
/// Gets or sets the client identifier.
Expand Down
4 changes: 3 additions & 1 deletion Source/MQTTnet.TestApp/ServerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public static async Task RunAsync()
{
try
{
var options = new MqttServerOptions();
var options = new MqttServerOptionsBuilder()
.WithDefaultEndpoint()
.Build();

// Extend the timestamp for all messages from clients.
// Protect several topics from being subscribed from every client.
Expand Down
Loading

0 comments on commit 8089c6b

Please sign in to comment.