Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Changed AuthenticationResult to AuthenticationSuccess.
Browse files Browse the repository at this point in the history
Authentication failure is now handled through the Closed session event.
Updated all RPC calls to use MessageHandler SendHandlerMessage method.
SendHandlerMessage now prefixes messages with a header frame and strips this frame when passing it to the handler action.
  • Loading branch information
DJGosnell committed Oct 14, 2016
1 parent 4d9b99f commit cbdb53f
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 156 deletions.
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue.Tests/Mq/MqClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void Client_prevents_times_out() {
}
};

StartAndWait(false, 2000);
StartAndWait(false, 1500);

if (TestStatus.IsSet) {
throw new Exception("Client timed out.");
Expand Down
42 changes: 2 additions & 40 deletions src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,36 +238,6 @@ public void Client_disconnectes_from_failed_authentication() {
StartAndWait();
}

[Fact]
public void Client_notified_of_authentication_failure() {
Server.Config.RequireAuthentication = true;

Server.SessionSetup += (sender, args) => {
args.Session.AddService<ICalculatorService>(new CalculatorService());
};

Server.Authenticate += (sender, e) => {
e.Authenticated = false;
};

Client.AuthenticationResult += (sender, e) => {
if (e.Authenticated) {
LastException = new Exception("Client notified of authentication wrongly.");
}
TestStatus.Set();
};

Client.Closed += (sender, e) => {

};

Client.Authenticate += (sender, e) => {
e.AuthData = new byte[] {5, 4, 3, 2, 1};
};

StartAndWait();
}


[Fact]
public void Client_notified_of_authentication_success() {
Expand All @@ -281,7 +251,7 @@ public void Client_notified_of_authentication_success() {
e.Authenticated = true;
};

Client.AuthenticationResult += (sender, e) => {
Client.AuthenticationSuccess += (sender, e) => {
if (e.Authenticated == false) {
LastException = new Exception("Client notified of authentication wrongly.");
}
Expand All @@ -299,18 +269,10 @@ public void Client_notified_of_authentication_success() {
public void Client_times_out_on_auth_failure() {
Server.Config.RequireAuthentication = true;
Server.Config.ConnectionTimeout = 100;
bool auth_failure_called = false;

Client.AuthenticationResult += (sender, e) => {
auth_failure_called = !e.Authenticated;
};

Client.Closed += (sender, e) => {
if (auth_failure_called == false) {
LastException = new Exception("Client was not notified that the authentication failed.");
}
if (e.CloseReason != SocketCloseReason.AuthenticationFailure) {
LastException = new Exception("Client was disconnected for invalid reason.");
LastException = new Exception("Client was not notified that the authentication failed.");
}
TestStatus.Set();
};
Expand Down
4 changes: 2 additions & 2 deletions src/DtronixMessageQueue/DtronixMessageQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<Compile Include="MqSession.cs" />
<Compile Include="MqConfig.cs" />
<Compile Include="Rpc\MessageHandlers\ByteTransport.cs" />
<Compile Include="Rpc\MessageHandlers\ByteTransportMessageType.cs" />
<Compile Include="Rpc\MessageHandlers\ByteTransportMessageAction.cs" />
<Compile Include="Rpc\DataContract\RpcServerInfoDataContract.cs" />
<Compile Include="Rpc\IRemoteService.cs" />
<Compile Include="Rpc\MessageHandler.cs" />
Expand All @@ -65,7 +65,7 @@
<Compile Include="Rpc\RpcClient.cs" />
<Compile Include="Rpc\RpcCommandType.cs" />
<Compile Include="Rpc\RpcConfig.cs" />
<Compile Include="Rpc\MessageHandlers\RpcCallMessageType.cs" />
<Compile Include="Rpc\MessageHandlers\RpcCallMessageAction.cs" />
<Compile Include="Rpc\RpcProxy.cs" />
<Compile Include="Rpc\RpcRemoteException.cs" />
<Compile Include="Rpc\DataContract\RpcRemoteExceptionDataContract.cs" />
Expand Down
8 changes: 5 additions & 3 deletions src/DtronixMessageQueue/MqSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,7 @@ public override void Close(SocketCloseReason reason) {
// If we are passed a closing frame, then send it to the other connection.
if (close_frame != null) {
MqMessage msg;
// If we have an authentication error, we are simultaneously notifying the client of this and a session close.
// So this means we do not clear the queue since it contains an auth failure message and it should already be at the top of the stack.
if (outbox.IsEmpty == false && reason != SocketCloseReason.AuthenticationFailure) {
if (outbox.IsEmpty == false) {
while (outbox.TryDequeue(out msg)) {
}
}
Expand Down Expand Up @@ -297,6 +295,10 @@ public void Send(MqMessage message) {
if (is_running == false) {
return;
}
if (CurrentState == State.Closing) {
return;
}

lock (outbox_lock) {
outbox.Enqueue(message);
}
Expand Down
30 changes: 24 additions & 6 deletions src/DtronixMessageQueue/Rpc/MessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,52 @@ public abstract class MessageHandler<TSession, TConfig>
where TSession : RpcSession<TSession, TConfig>, new()
where TConfig : RpcConfig {

public delegate void ActionHandler(byte action_handler, MqMessage message);

/// <summary>
/// Id byte which precedes all messages all messages of this type.
/// </summary>
public abstract byte Id { get; }

protected TSession Session;

protected Dictionary<byte, Action<MqMessage>> handlers = new Dictionary<byte, Action<MqMessage>>();
protected Dictionary<byte, ActionHandler> Handlers = new Dictionary<byte, ActionHandler>();

protected MessageHandler(TSession session) {
Session = session;
}

public void HandleMessage(MqMessage message) {
public bool HandleMessage(MqMessage message) {
if (message[0][0] != Id) {
Session.Close(SocketCloseReason.ProtocolError);
}

// Read the type of message.
var message_type = message[0].ReadByte(1);

if (handlers.ContainsKey(message_type)) {
handlers[message_type].Invoke(message);
if (Handlers.ContainsKey(message_type)) {
message.RemoveAt(0);
Handlers[message_type].Invoke(message_type, message);
return true;
}

// Unknown message type passed. Disconnect the connection.
Session.Close(SocketCloseReason.ProtocolError);
return false;
}

public void SendHandlerMessage(byte action_id, MqMessage message) {
var header_frame = Session.CreateFrame(new byte[2], MqFrameType.More);
header_frame.Write(0, Id);
header_frame.Write(1, action_id);

if (message == null) {
message = new MqMessage(header_frame);
} else {
// Unknown message type passed. Disconnect the connection.
Session.Close(SocketCloseReason.ProtocolError);
message.Insert(0, header_frame);
}

Session.Send(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public async Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancellat
/// <param name="count">Number of bytes to write to the message from the buffer.</param>
public void Write(byte[] buffer, int index, int count) {
message_writer.Write((byte) 2);
message_writer.Write((byte) ByteTransportMessageType.Write);
message_writer.Write((byte) ByteTransportMessageAction.Write);
message_writer.Write(id);
message_writer.Write(buffer, index, count);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/// <summary>
/// Type of message which is being sent.
/// </summary>
public enum ByteTransportMessageType : byte {
public enum ByteTransportMessageAction : byte {

/// <summary>
/// Unknown default type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,32 @@ public class ByteTransportMessageHandler<TSession, TConfig> : MessageHandler<TSe


public ByteTransportMessageHandler(TSession session) : base(session) {
handlers.Add((byte)ByteTransportMessageType.RequestTransportHandle, RequestTransportHandle);
Handlers.Add((byte)ByteTransportMessageAction.RequestTransportHandle, RequestTransportHandle);
}

private void RequestTransportHandle(MqMessage message) {
ushort handle_id = message[0].ReadUInt16(2);
long size = message[0].ReadInt64(4);
private void RequestTransportHandle(byte action_id, MqMessage message) {
ushort handle_id = message[0].ReadUInt16(0);
long size = message[0].ReadInt64(2);



if (size > Session.Config.MaxByteTransportLength) {
var error_frame = Session.CreateFrame(new byte[2], MqFrameType.Last);
error_frame.Write(0, Id);
error_frame.Write(1, (byte)ByteTransportMessageType.ResponseTransportHandle);
error_frame.Write(1, (byte)ByteTransportMessageAction.ResponseTransportHandle);
}

ByteTransport<TSession, TConfig> transport = new ByteTransport<TSession, TConfig>(Session, handle_id, size);


receive_operation.CreateWaitHandle(handle_id);

RemoteTransports.TryAdd(handle_id, remote_transport);
//RemoteTransports.TryAdd(handle_id, remote_transport);

// Create response.
var response_frame = Session.CreateFrame(new byte[4], MqFrameType.Last);
response_frame.Write(0, Id);
response_frame.Write(1, (byte)ByteTransportMessageType.ResponseTransportHandle);
response_frame.Write(1, (byte)ByteTransportMessageAction.ResponseTransportHandle);
response_frame.Write(2, handle_id);

Session.Send(response_frame);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/// <summary>
/// Type of message which is being sent.
/// </summary>
public enum RpcCallMessageType : byte {
public enum RpcCallMessageAction : byte {

/// <summary>
/// Unknown default type.
Expand Down
Loading

0 comments on commit cbdb53f

Please sign in to comment.