Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Fixed tests.
Browse files Browse the repository at this point in the history
Added ability for authentication timeouts from the client side.
  • Loading branch information
DJGosnell committed Jun 27, 2017
1 parent 32c14c1 commit 3963e0d
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue.Tests/Mq/MqClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void Client_times_out_while_connecting_for_too_long() {
}
};

StartAndWait(false, 1000, false);
StartAndWait(false, 10000, false);

if (TestStatus.IsSet == false) {
throw new Exception("Socket did not timeout.");
Expand Down
11 changes: 6 additions & 5 deletions src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ public void Client_notified_of_authentication_success() {
}

[Fact]
public void Client_times_out_on_auth_failure() {
public void Client_times_out_on_long_auth() {
Server.Config.RequireAuthentication = true;
Server.Config.ConnectionTimeout = 100;
Client.Config.ConnectionTimeout = 100;

Client.Closed += (sender, e) => {
if (e.CloseReason != SocketCloseReason.TimeOut) {
Expand All @@ -277,11 +277,12 @@ public void Client_times_out_on_auth_failure() {
TestStatus.Set();
};

Client.Authenticate += (sender, e) => {
Thread.Sleep(200);
Server.Authenticate += (sender, e) =>
{
Thread.Sleep(500);
};

StartAndWait();
StartAndWait(true, 5000, true);
}

[Fact]
Expand Down
4 changes: 2 additions & 2 deletions src/DtronixMessageQueue.Tests/Rpc/RpcTestsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public static int FreeTcpPort() {
return port;
}

public void StartAndWait(bool timeout_error = true, int timeout_length = -1) {
if (Server.IsRunning == false) {
public void StartAndWait(bool timeout_error = true, int timeout_length = -1, bool start_server = true) {
if (Server.IsRunning == false && start_server) {
Server.Start();
}
if (Client.IsRunning == false) {
Expand Down
4 changes: 4 additions & 0 deletions src/DtronixMessageQueue/MqClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public void Send(MqMessage message) {
}

public void Close() {
if (Session == null)
{
return;
}
Session.IncomingMessage -= OnIncomingMessage;
Session.Close(SocketCloseReason.ClientClosing);
Session.Dispose();
Expand Down
31 changes: 28 additions & 3 deletions src/DtronixMessageQueue/Rpc/RpcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public abstract class RpcSession<TSession, TConfig> : MqSession<TSession, TConfi
/// </summary>
public bool Authenticated { get; private set; }

private Task auth_timeout;

private CancellationTokenSource auth_timeout_cancel = new CancellationTokenSource();

protected RpcSession() {
MessageHandlers = new Dictionary<byte, MessageHandler<TSession, TConfig>>();
}
Expand Down Expand Up @@ -152,9 +156,27 @@ protected override void ProcessCommand(MqFrame frame) {
var auth_message = serializer.MessageWriter.ToMessage(true);
auth_message[0].FrameType = MqFrameType.Command;

// RpcCommand:byte; RpcCommandType:byte; AuthData:byte[];
Send(auth_message);
} else {
auth_timeout = new Task(async () =>
{
try
{
await Task.Delay(Config.ConnectionTimeout, auth_timeout_cancel.Token);
}
catch
{
return;
}

if(!auth_timeout_cancel.IsCancellationRequested)
Close(SocketCloseReason.TimeOut);
});

// RpcCommand:byte; RpcCommandType:byte; AuthData:byte[];
Send(auth_message);

auth_timeout.Start();

} else {
// If no authentication is required, set this client to authenticated.
Authenticated = true;

Expand Down Expand Up @@ -216,6 +238,9 @@ protected override void ProcessCommand(MqFrame frame) {
} else if (rpc_command_type == RpcCommandType.AuthenticationResult) {
// RpcCommand:byte; RpcCommandType:byte; AuthResult:bool;

// Cancel the timeout request.
auth_timeout_cancel.Cancel();

// Ensure that this command is running on the client.
if (BaseSocket.Mode != SocketMode.Client) {
Close(SocketCloseReason.ProtocolError);
Expand Down
78 changes: 55 additions & 23 deletions src/DtronixMessageQueue/Socket/SocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace DtronixMessageQueue.Socket {
namespace DtronixMessageQueue.Socket
{

/// <summary>
/// Base functionality for all client connections to a remote server.
Expand All @@ -12,7 +14,8 @@ namespace DtronixMessageQueue.Socket {
/// <typeparam name="TConfig">Configuration for this connection.</typeparam>
public class SocketClient<TSession, TConfig> : SessionHandler<TSession, TConfig>
where TSession : SocketSession<TSession, TConfig>, new()
where TConfig : SocketConfig {
where TConfig : SocketConfig
{

/// <summary>
/// True if the client is connected to a server.
Expand All @@ -29,57 +32,83 @@ public class SocketClient<TSession, TConfig> : SessionHandler<TSession, TConfig>
/// Creates a socket client with the specified configurations.
/// </summary>
/// <param name="config">Configurations to use.</param>
public SocketClient(TConfig config) : base(config, SocketMode.Client) {
public SocketClient(TConfig config) : base(config, SocketMode.Client)
{
}

/// <summary>
/// Connects to the configured endpoint.
/// </summary>
public void Connect() {
public void Connect()
{
Connect(new IPEndPoint(IPAddress.Parse(Config.Ip), Config.Port));
}


/// <summary>
/// Task which will run when a connection times out.
/// </summary>
private Task connection_timeout_task;

/// <summary>
/// Cancellation token to cancel the timeout event for connections.
/// </summary>
private CancellationTokenSource connection_timeout_cancellation;

/// <summary>
/// Connects to the specified endpoint.
/// </summary>
/// <param name="end_point">Endpoint to connect to.</param>
public void Connect(IPEndPoint end_point) {
if (MainSocket != null && Session?.CurrentState != SocketSession<TSession, TConfig>.State.Closed) {
public void Connect(IPEndPoint end_point)
{
if (MainSocket != null && Session?.CurrentState != SocketSession<TSession, TConfig>.State.Closed)
{
throw new InvalidOperationException("Client is in the process of connecting.");
}

MainSocket = new System.Net.Sockets.Socket(end_point.AddressFamily, SocketType.Stream, ProtocolType.Tcp) {
MainSocket = new System.Net.Sockets.Socket(end_point.AddressFamily, SocketType.Stream, ProtocolType.Tcp)
{
NoDelay = true
};

// Set to true if the client connection either timed out or was canceled.
bool timed_out = false;
connection_timeout_cancellation = new CancellationTokenSource();

Timer connection_timer = null;
connection_timeout_task = new Task(async () =>
{
try
{
await Task.Delay(Config.ConnectionTimeout, connection_timeout_cancellation.Token);
}
catch
{
return;
}

connection_timer = new Timer(o => {
timed_out = true;
MainSocket.Close();
connection_timer.Change(Timeout.Infinite, Timeout.Infinite);
connection_timer.Dispose();

OnClose(null, SocketCloseReason.TimeOut);
MainSocket.Close();
});

var event_arg = new SocketAsyncEventArgs {




var event_arg = new SocketAsyncEventArgs
{
RemoteEndPoint = end_point
};

event_arg.Completed += (sender, args) => {
if (timed_out) {
if (timed_out)
{
return;
}
if (args.LastOperation == SocketAsyncOperation.Connect) {
if (args.LastOperation == SocketAsyncOperation.Connect)
{

// Stop the timeout timer.
connection_timer.Change(Timeout.Infinite, Timeout.Infinite);
connection_timer.Dispose();
connection_timeout_cancellation.Cancel();

Session = CreateSession(MainSocket);
Session.Connected += (sndr, e) => OnConnect(Session);
Expand All @@ -90,21 +119,24 @@ public void Connect(IPEndPoint end_point) {
}
};



MainSocket.ConnectAsync(event_arg);

connection_timer.Change(Config.ConnectionTimeout, Timeout.Infinite);
connection_timeout_task.Start();


}

protected override void OnClose(TSession session, SocketCloseReason reason) {
protected override void OnClose(TSession session, SocketCloseReason reason)
{
MainSocket.Close();

TSession sess_out;

// If the session is null, the connection timed out while trying to connect.
if (session != null) {
if (session != null)
{
ConnectedSessions.TryRemove(Session.Id, out sess_out);
}

Expand Down

0 comments on commit 3963e0d

Please sign in to comment.