Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Moved message handling into separate class to separate session functi…
Browse files Browse the repository at this point in the history
…onality.
  • Loading branch information
DJGosnell committed Sep 27, 2016
1 parent 9713a4f commit 22b2c26
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue/DtronixMessageQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
<Compile Include="Rpc\RpcClient.cs" />
<Compile Include="Rpc\RpcCommandType.cs" />
<Compile Include="Rpc\RpcConfig.cs" />
<Compile Include="Rpc\RpcMessageType.cs" />
<Compile Include="Rpc\RpcCallMessageType.cs" />
<Compile Include="Rpc\RpcProxy.cs" />
<Compile Include="Rpc\RpcRemoteException.cs" />
<Compile Include="Rpc\DataContract\RpcRemoteExceptionDataContract.cs" />
Expand Down
5 changes: 5 additions & 0 deletions src/DtronixMessageQueue/Rpc/MessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ public abstract class MessageHandler<TSession, TConfig>
where TSession : RpcSession<TSession, TConfig>, new()
where TConfig : RpcConfig {

/// <summary>
/// Id byte which precedes all messages all messages of this type.
/// </summary>
public abstract byte Id { get; }

protected TSession Session;

protected MessageHandler(TSession session) {
Expand Down
50 changes: 30 additions & 20 deletions src/DtronixMessageQueue/Rpc/RpcCallMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public class RpcCallMessageHandler<TSession, TConfig> : MessageHandler<TSession,
/// </summary>
private readonly object rpc_call_id_lock = new object();

/// <summary>
/// Id byte which precedes all messages all messages of this type.
/// </summary>
public override byte Id => 1;

/// <summary>
/// Contains all outstanding call returns pending a return of data from the recipient connection.
/// </summary>
Expand Down Expand Up @@ -60,33 +65,35 @@ public RpcCallMessageHandler(TSession session, IWorkItemsGroup thread_pool) : ba
this.thread_pool = thread_pool;
}



public override bool HandleMessage(MqMessage message) {
if (message[0][0] != 1) {
if (message[0][0] != Id) {
return false;
}

// Read the type of message.
var message_type = (RpcMessageType)message[0].ReadByte(0);
var message_type = (RpcCallMessageType)message[0].ReadByte(1);

switch (message_type) {

case RpcMessageType.RpcCallCancellation:
case RpcCallMessageType.MethodCancel:

// Remotely called to cancel a rpc call on this session.
var cancellation_id = message[0].ReadUInt16(1);
var cancellation_id = message[0].ReadUInt16(2);
RpcWaitHandle wait_handle;
if (remote_wait_handles.TryRemove(cancellation_id, out wait_handle)) {
wait_handle.TokenSource.Cancel();
}
break;

case RpcMessageType.RpcCallNoReturn:
case RpcMessageType.RpcCall:
case RpcCallMessageType.MethodCallNoReturn:
case RpcCallMessageType.MethodCall:
ProcessRpcCall(message, message_type);
break;

case RpcMessageType.RpcCallException:
case RpcMessageType.RpcCallReturn:
case RpcCallMessageType.MethodException:
case RpcCallMessageType.MethodReturn:
ProcessRpcReturn(message);
break;

Expand All @@ -105,7 +112,7 @@ public override bool HandleMessage(MqMessage message) {
/// </summary>
/// <param name="message">Message containing the Rpc call.</param>
/// <param name="message_type">Type of call this message is.</param>
private void ProcessRpcCall(MqMessage message, RpcMessageType message_type) {
private void ProcessRpcCall(MqMessage message, RpcCallMessageType message_type) {

// Execute the processing on the worker thread.
thread_pool.QueueWorkItem(() => {
Expand All @@ -115,11 +122,11 @@ private void ProcessRpcCall(MqMessage message, RpcMessageType message_type) {
ushort rec_message_return_id = 0;

try {
// Skip RpcMessageType
serialization.MessageReader.ReadByte();
// Skip Handler.Id & RpcMessageType
serialization.MessageReader.ReadBytes(2);

// Determine if this call has a return value.
if (message_type == RpcMessageType.RpcCall) {
if (message_type == RpcCallMessageType.MethodCall) {
rec_message_return_id = serialization.MessageReader.ReadUInt16();
}

Expand Down Expand Up @@ -211,13 +218,14 @@ private void ProcessRpcCall(MqMessage message, RpcMessageType message_type) {


// Determine what to do with the return value.
if (message_type == RpcMessageType.RpcCall) {
if (message_type == RpcCallMessageType.MethodCall) {
// Reset the stream.
serialization.Stream.SetLength(0);

// Write the Rpc call type and the id.
serialization.MessageWriter.Clear();
serialization.MessageWriter.Write((byte)RpcMessageType.RpcCallReturn);
serialization.MessageWriter.Write(Id);
serialization.MessageWriter.Write((byte)RpcCallMessageType.MethodReturn);
serialization.MessageWriter.Write(rec_message_return_id);

// Serialize the return value and add it to the stream.
Expand Down Expand Up @@ -255,8 +263,8 @@ private void ProcessRpcReturn(MqMessage message) {
var serialization = Session.SerializationCache.Get(message);
try {

// Skip message type byte.
serialization.MessageReader.ReadByte();
// Skip message type byte and message type.
serialization.MessageReader.ReadBytes(2);

// Read the return Id.
var return_id = serialization.MessageReader.ReadUInt16();
Expand Down Expand Up @@ -290,7 +298,8 @@ private void SendRpcException(SerializationCache.Serializer serialization, Excep
serialization.MessageWriter.Clear();

// Writer the Rpc call type and the return Id.
serialization.MessageWriter.Write((byte)RpcMessageType.RpcCallException);
serialization.MessageWriter.Write(Id);
serialization.MessageWriter.Write((byte)RpcCallMessageType.MethodException);
serialization.MessageWriter.Write(message_return_id);

// Get the exception information in a format that we can serialize.
Expand Down Expand Up @@ -339,9 +348,10 @@ public void CancelWaitHandle(ushort id) {

// Try to get the wait. If the Id does not exist, the wait operation has already been completed or removed.
if (local_wait_handles.TryRemove(id, out call_wait_handle)) {
var frame = new MqFrame(new byte[3], MqFrameType.Last, Session.Config);
frame.Write(0, (byte)RpcMessageType.RpcCallCancellation);
frame.Write(1, id);
var frame = new MqFrame(new byte[4], MqFrameType.Last, Session.Config);
frame.Write(0, Id);
frame.Write(1, (byte)RpcCallMessageType.MethodCancel);
frame.Write(2, id);

Session.Send(frame);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,38 @@
/// <summary>
/// Type of message which is being sent.
/// </summary>
public enum RpcMessageType : byte {
public enum RpcCallMessageType : byte {

/// <summary>
/// Unknown default type.
/// </summary>
Unset = 0,

/// <summary>
/// Type is a Rpc command and consumed internally. (Currently unused)
/// </summary>
Command = 1,

/// <summary>
/// Message is a standard Rpc call with a return value.
/// </summary>
RpcCall = 2,
MethodCall = 1,

/// <summary>
/// Message is a Rpc call with no return value.
/// </summary>
RpcCallNoReturn = 3,
MethodCallNoReturn = 2,

/// <summary>
/// Message is a Rpc response with a return value.
/// </summary>
RpcCallReturn = 4,
MethodReturn = 3,


/// <summary>
/// Message is a Rpc response. Message contains information about the exception thrown.
/// </summary>
RpcCallException = 5,
MethodException = 4,

/// <summary>
/// Message used to cancel a pending operation.
/// </summary>
RpcCallCancellation = 6,
MethodCancel = 5,

/// <summary>
/// Sends a request to the client/server session for a stream handle to be created to write to.
Expand Down
26 changes: 15 additions & 11 deletions src/DtronixMessageQueue/Rpc/RpcProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class RpcProxy<T, TSession, TConfig> : RealProxy
/// </summary>
private readonly T decorated;

private readonly RpcCallMessageHandler<TSession, TConfig> message_handler;
private readonly RpcCallMessageHandler<TSession, TConfig> call_message_handler;

/// <summary>
/// Session used to convey the proxied methods over.
Expand All @@ -40,9 +40,9 @@ public class RpcProxy<T, TSession, TConfig> : RealProxy
/// </summary>
/// <param name="decorated">Class to proxy method calls from.</param>
/// <param name="session">Session to convey proxied method calls over.</param>
public RpcProxy(T decorated, RpcSession<TSession, TConfig> session, RpcCallMessageHandler<TSession, TConfig> message_handler) : base(typeof(T)) {
public RpcProxy(T decorated, RpcSession<TSession, TConfig> session, RpcCallMessageHandler<TSession, TConfig> call_message_handler) : base(typeof(T)) {
this.decorated = decorated;
this.message_handler = message_handler;
this.call_message_handler = call_message_handler;
this.session = (TSession) session;
}

Expand Down Expand Up @@ -81,19 +81,20 @@ public override IMessage Invoke(IMessage msg) {


RpcWaitHandle return_wait = null;
serializer.MessageWriter.Write(call_message_handler.Id);

// Determine what kind of method we are calling.
if (method_info.ReturnType == typeof(void)) {

// Byte[0] The call has no return value so we are not waiting.
serializer.MessageWriter.Write((byte) RpcMessageType.RpcCallNoReturn);
serializer.MessageWriter.Write((byte) RpcCallMessageType.MethodCallNoReturn);
} else {

// Byte[0] The call has a return value so we are going to need to wait on the resposne.
serializer.MessageWriter.Write((byte) RpcMessageType.RpcCall);
serializer.MessageWriter.Write((byte) RpcCallMessageType.MethodCall);

// Create a wait operation to wait for the response.
return_wait = message_handler.CreateWaitHandle();
return_wait = call_message_handler.CreateWaitHandle();

// Byte[1,2] Wait Id which is used for returning the value and cancellation.
serializer.MessageWriter.Write(return_wait.Id);
Expand Down Expand Up @@ -128,7 +129,7 @@ public override IMessage Invoke(IMessage msg) {
} catch (OperationCanceledException) {

// If the operation was canceled, cancel the wait on this end and notify the other end.
message_handler.CancelWaitHandle(return_wait.Id);
call_message_handler.CancelWaitHandle(return_wait.Id);
throw new OperationCanceledException("Wait handle was canceled while waiting for a response.");
}

Expand All @@ -143,9 +144,12 @@ public override IMessage Invoke(IMessage msg) {

// Start parsing the received message.
serializer.MessageReader.Message = return_wait.ReturnMessage;


// Skip the Handler Id.
serializer.MessageReader.ReadByte();

// Read the first byte which dictates the type of message.
var return_type = (RpcMessageType)serializer.MessageReader.ReadByte();
var return_type = (RpcCallMessageType)serializer.MessageReader.ReadByte();

// Skip 2 bytes for the return ID
serializer.MessageReader.ReadBytes(2);
Expand All @@ -155,13 +159,13 @@ public override IMessage Invoke(IMessage msg) {


switch (return_type) {
case RpcMessageType.RpcCallReturn:
case RpcCallMessageType.MethodReturn:

// Deserialize the return value and return it to the local method call.
var return_value = serializer.DeserializeFromReader(method_info.ReturnType, 0);
return new ReturnMessage(return_value, null, 0, method_call.LogicalCallContext, method_call);

case RpcMessageType.RpcCallException:
case RpcCallMessageType.MethodException:

// Deserialize the exception and let the local method call receive it.
var return_exception = serializer.DeserializeFromReader(method_info.ReturnType, 0);
Expand Down
18 changes: 8 additions & 10 deletions src/DtronixMessageQueue/Rpc/RpcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public abstract class RpcSession<TSession, TConfig> : MqSession<TSession, TConfi
where TSession : RpcSession<TSession, TConfig>, new()
where TConfig : RpcConfig {

public List<MessageHandler<TSession, TConfig>> MessageHandlers { get; }
public Dictionary<byte, MessageHandler<TSession, TConfig>> MessageHandlers { get; }

protected RpcCallMessageHandler<TSession, TConfig> RpcCallHandler;
/// <summary>
Expand Down Expand Up @@ -74,7 +74,7 @@ public abstract class RpcSession<TSession, TConfig> : MqSession<TSession, TConfi
public bool Authenticated { get; private set; }

protected RpcSession() {
MessageHandlers = new List<MessageHandler<TSession, TConfig>>();
MessageHandlers = new Dictionary<byte, MessageHandler<TSession, TConfig>>();
}

/// <summary>
Expand All @@ -97,7 +97,7 @@ protected override void OnSetup() {

RpcCallHandler = new RpcCallMessageHandler<TSession, TConfig>((TSession)this, WorkerThreadPool);

MessageHandlers.Add(RpcCallHandler);
MessageHandlers.Add(RpcCallHandler.Id, RpcCallHandler);

}

Expand Down Expand Up @@ -306,13 +306,11 @@ protected override void OnIncomingMessage(object sender, IncomingMessageEventArg
// Continue to parse the messages in this queue.
while (e.Messages.Count > 0) {
message = e.Messages.Dequeue();
// Check to see if this can be handled by the main RpcCallHandler. If not,
if (RpcCallHandler.HandleMessage(message) == false) {
foreach (var message_handler in MessageHandlers) {
if (message_handler.HandleMessage(message)) {
break;
}
}

var handler_id = message[0].ReadByte(0);

if (MessageHandlers.ContainsKey(handler_id)) {
MessageHandlers[handler_id].HandleMessage(message);
}
}
}
Expand Down

0 comments on commit 22b2c26

Please sign in to comment.