Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Reordering internal way wait handles are held.
Browse files Browse the repository at this point in the history
  • Loading branch information
DJGosnell committed Sep 26, 2016
1 parent a6ba5dd commit e352f54
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 58 deletions.
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue/DtronixMessageQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
<Compile Include="Rpc\RpcProxy.cs" />
<Compile Include="Rpc\RpcRemoteException.cs" />
<Compile Include="Rpc\DataContract\RpcRemoteExceptionDataContract.cs" />
<Compile Include="Rpc\RpcOperationWait.cs" />
<Compile Include="Rpc\RpcWaitHandle.cs" />
<Compile Include="Rpc\RpcServer.cs" />
<Compile Include="Rpc\RpcSession.cs" />
<Compile Include="Rpc\RpcStream.cs" />
Expand Down
4 changes: 2 additions & 2 deletions src/DtronixMessageQueue/Rpc/IProcessRpcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ public interface IProcessRpcSession {
/// Called to cancel a remote waiting operation on the recipient connection.
/// </summary>
/// <param name="id">Id of the waiting operation to cancel.</param>
void CancelWaitOperation(ushort id);
void CancelWaitHandle(ushort id);

/// <summary>
/// Creates a waiting operation for this session. Could be a remote cancellation request or a pending result request.
/// </summary>
/// <returns>Wait operation to wait on.</returns>
RpcOperationWait CreateWaitOperation();
RpcWaitHandle CreateWaitHandle();
}
}
12 changes: 0 additions & 12 deletions src/DtronixMessageQueue/Rpc/RpcCommandType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,5 @@ public enum RpcCommandType : byte {
/// Server is sending the result of the authentication request.
/// </summary>
AuthenticationResult = 2,



/// <summary>
/// Sends a request to the client/server session for a stream handle to be created to write to.
/// </summary>
RequestStreamHandle = 10,

/// <summary>
/// Sends a request to the client/server session for a stream handle be closed.
/// </summary>
CloseStreamHandle = 11,
}
}
19 changes: 17 additions & 2 deletions src/DtronixMessageQueue/Rpc/RpcMessageType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public enum RpcMessageType : byte {
Unset = 0,

/// <summary>
/// Type is a Rpc command and consumed internally.
/// Type is a Rpc command and consumed internally. (Currently unused)
/// </summary>
Command = 1,

Expand Down Expand Up @@ -39,6 +39,21 @@ public enum RpcMessageType : byte {
/// <summary>
/// Message used to cancel a pending operation.
/// </summary>
RpcCallCancellation = 6
RpcCallCancellation = 6,

/// <summary>
/// Sends a request to the client/server session for a stream handle to be created to write to.
/// </summary>
RequestStreamHandle = 10,

/// <summary>
/// Sends a request to the client/server session for a stream handle to be created to write to.
/// </summary>
RespondStreamHandle = 11,

/// <summary>
/// Sends a request to the client/server session for a stream handle be closed.
/// </summary>
CloseStreamHandle = 12,
}
}
7 changes: 3 additions & 4 deletions src/DtronixMessageQueue/Rpc/RpcProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public override IMessage Invoke(IMessage msg) {
}


RpcOperationWait return_wait = null;
RpcWaitHandle return_wait = null;

// Determine what kind of method we are calling.
if (method_info.ReturnType == typeof(void)) {
Expand All @@ -90,7 +90,7 @@ public override IMessage Invoke(IMessage msg) {
serializer.MessageWriter.Write((byte) RpcMessageType.RpcCall);

// Create a wait operation to wait for the response.
return_wait = ((IProcessRpcSession)session).CreateWaitOperation();
return_wait = ((IProcessRpcSession)session).CreateWaitHandle();

// Byte[1,2] Wait Id which is used for returning the value and cancellation.
serializer.MessageWriter.Write(return_wait.Id);
Expand Down Expand Up @@ -125,7 +125,7 @@ public override IMessage Invoke(IMessage msg) {
} catch (OperationCanceledException) {

// If the operation was canceled, cancel the wait on this end and notify the other end.
((IProcessRpcSession)session).CancelWaitOperation(return_wait.Id);
((IProcessRpcSession)session).CancelWaitHandle(return_wait.Id);
throw new OperationCanceledException("Wait handle was canceled while waiting for a response.");
}

Expand Down Expand Up @@ -155,7 +155,6 @@ public override IMessage Invoke(IMessage msg) {
case RpcMessageType.RpcCallReturn:

// Deserialize the return value and return it to the local method call.

var return_value = serializer.DeserializeFromReader(method_info.ReturnType, 0);
return new ReturnMessage(return_value, null, 0, method_call.LogicalCallContext, method_call);

Expand Down
65 changes: 31 additions & 34 deletions src/DtronixMessageQueue/Rpc/RpcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ public abstract class RpcSession<TSession, TConfig> : MqSession<TSession, TConfi
/// <summary>
/// Contains all outstanding call returns pending a return of data from the recipient connection.
/// </summary>
private readonly ConcurrentDictionary<ushort, RpcOperationWait> outstanding_waits =
new ConcurrentDictionary<ushort, RpcOperationWait>();
private readonly ConcurrentDictionary<ushort, RpcWaitHandle> remote_wait_handles =
new ConcurrentDictionary<ushort, RpcWaitHandle>();

/// <summary>
/// Contains all operations running on this session which are cancellable.
/// </summary>
private readonly ConcurrentDictionary<ushort, RpcOperationWait> ongoing_operations =
new ConcurrentDictionary<ushort, RpcOperationWait>();
private readonly ConcurrentDictionary<ushort, RpcWaitHandle> local_wait_handles =
new ConcurrentDictionary<ushort, RpcWaitHandle>();

/// <summary>
/// Contains all services that can be remotely executed on this session.
Expand All @@ -74,8 +74,8 @@ public abstract class RpcSession<TSession, TConfig> : MqSession<TSession, TConfi
/// <summary>
/// Contains all active stream handles for this session.
/// </summary>
private readonly ConcurrentDictionary<ushort, RpcOperationWait> stream_handles =
new ConcurrentDictionary<ushort, RpcOperationWait>();
private readonly ConcurrentDictionary<ushort, RpcWaitHandle> stream_handles =
new ConcurrentDictionary<ushort, RpcWaitHandle>();

/// <summary>
/// Server base socket for this session.
Expand Down Expand Up @@ -278,13 +278,6 @@ protected override void ProcessCommand(MqFrame frame) {
Ready?.Invoke(this, new SessionEventArgs<TSession, TConfig>((TSession) this));
});

} else if (rpc_command_type == RpcCommandType.RequestStreamHandle) {
// RpcCommand:byte; RpcCommandType:byte; AuthResult:bool;
Send(new MqFrame());
var sream_handle_id = frame.ReadUInt16(2);

} else if (rpc_command_type == RpcCommandType.RequestStreamHandle) {

} else {
Close(SocketCloseReason.ProtocolError);
}
Expand Down Expand Up @@ -345,18 +338,14 @@ protected override void OnIncomingMessage(object sender, IncomingMessageEventArg
var message_type = (RpcMessageType) message[0].ReadByte(0);

switch (message_type) {
case RpcMessageType.Command:
// Reserved for future use.
//ProcessRpcCommand(message);
break;

case RpcMessageType.RpcCallCancellation:

// Remotely called to cancel a rpc call on this session.
var cancellation_id = message[0].ReadUInt16(1);
RpcOperationWait wait_operation;
if (ongoing_operations.TryRemove(cancellation_id, out wait_operation)) {
wait_operation.TokenSource.Cancel();
RpcWaitHandle wait_handle;
if (remote_wait_handles.TryRemove(cancellation_id, out wait_handle)) {
wait_handle.TokenSource.Cancel();
}
break;

Expand All @@ -370,6 +359,10 @@ protected override void OnIncomingMessage(object sender, IncomingMessageEventArg
ProcessRpcReturn(message);
break;

case RpcMessageType.RequestStreamHandle:
SetupStreamHandle(message);
break;

default:
// Unknown message type passed. Disconnect the connection.
e.Session.Close(SocketCloseReason.ProtocolError);
Expand All @@ -380,6 +373,10 @@ protected override void OnIncomingMessage(object sender, IncomingMessageEventArg
}
}

private void SetupStreamHandle(MqMessage message) {
throw new NotImplementedException();
}


/// <summary>
/// Adds a proxy interface and instance to the current session to allow for remote method proxying.
Expand Down Expand Up @@ -460,11 +457,11 @@ private void ProcessRpcCall(MqMessage message, RpcMessageType message_type) {

// Number used to increase the number of parameters if there is a cancellation token.
int cancellation_token_param = 0;
RpcOperationWait cancellation_wait;
RpcWaitHandle cancellation_wait;

// If the past parameter is a cancellation token, setup a return wait for this call to allow for remote cancellation.
if (rec_message_return_id != 0 && last_param?.ParameterType == typeof(CancellationToken)) {
cancellation_wait = new RpcOperationWait {
cancellation_wait = new RpcWaitHandle {
Token = cancellation_source.Token,
TokenSource = cancellation_source,
Id = rec_message_return_id
Expand All @@ -474,7 +471,7 @@ private void ProcessRpcCall(MqMessage message, RpcMessageType message_type) {
cancellation_token_param = 1;

// Add it to the main list of ongoing operations.
ongoing_operations.TryAdd(rec_message_return_id, cancellation_wait);
remote_wait_handles.TryAdd(rec_message_return_id, cancellation_wait);
}

// Setup the parameters to pass to the invoked method.
Expand Down Expand Up @@ -518,7 +515,7 @@ private void ProcessRpcCall(MqMessage message, RpcMessageType message_type) {
return;
} finally {
// Remove the cancellation wait if it exists.
ongoing_operations.TryRemove(rec_message_return_id, out cancellation_wait);
local_wait_handles.TryRemove(rec_message_return_id, out cancellation_wait);

}

Expand Down Expand Up @@ -575,13 +572,13 @@ private void ProcessRpcReturn(MqMessage mq_message) {
// Read the return Id.
var return_id = serialization.MessageReader.ReadUInt16();

RpcOperationWait call_wait;
RpcWaitHandle call_wait_handle;
// Try to get the outstanding wait from the return id. If it does not exist, the has already completed.
if (outstanding_waits.TryRemove(return_id, out call_wait)) {
call_wait.ReturnMessage = mq_message;
if (local_wait_handles.TryRemove(return_id, out call_wait_handle)) {
call_wait_handle.ReturnMessage = mq_message;

// Release the wait event.
call_wait.ReturnResetEvent.Set();
call_wait_handle.ReturnResetEvent.Set();
}

} finally {
Expand Down Expand Up @@ -622,8 +619,8 @@ private void SendRpcException(SerializationCache.Serializer serialization, Excep
/// Creates a waiting operation for this session. Could be a remote cancellation request or a pending result request.
/// </summary>
/// <returns>Wait operation to wait on.</returns>
RpcOperationWait IProcessRpcSession.CreateWaitOperation() {
var return_wait = new RpcOperationWait {
RpcWaitHandle IProcessRpcSession.CreateWaitHandle() {
var return_wait = new RpcWaitHandle {
ReturnResetEvent = new ManualResetEventSlim()
};

Expand All @@ -636,7 +633,7 @@ RpcOperationWait IProcessRpcSession.CreateWaitOperation() {
}

// Add the wait to the outstanding wait dictionary for retrieval later.
if (outstanding_waits.TryAdd(return_wait.Id, return_wait) == false) {
if (local_wait_handles.TryAdd(return_wait.Id, return_wait) == false) {
throw new InvalidOperationException($"Id {return_wait.Id} already exists in the return_wait_handles dictionary.");
}

Expand All @@ -647,11 +644,11 @@ RpcOperationWait IProcessRpcSession.CreateWaitOperation() {
/// Called to cancel a remote waiting operation on the recipient connection.
/// </summary>
/// <param name="id">Id of the waiting operation to cancel.</param>
void IProcessRpcSession.CancelWaitOperation(ushort id) {
RpcOperationWait call_wait;
void IProcessRpcSession.CancelWaitHandle(ushort id) {
RpcWaitHandle call_wait_handle;

// Try to get the wait. If the Id does not exist, the wait operation has already been completed or removed.
if (outstanding_waits.TryRemove(id, out call_wait)) {
if (local_wait_handles.TryRemove(id, out call_wait_handle)) {
var frame = new MqFrame(new byte[3], MqFrameType.Last, Config);
frame.Write(0, (byte) RpcMessageType.RpcCallCancellation);
frame.Write(1, id);
Expand Down
2 changes: 0 additions & 2 deletions src/DtronixMessageQueue/Rpc/RpcStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class RpcStream<TSession, TConfig> : Stream
/// <param name="session">Session to write to.</param>
public RpcStream(RpcSession<TSession, TConfig> session) {
this.session = session;

session.
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace DtronixMessageQueue.Rpc {
/// <summary>
/// Class which represents a task which is waiting on another event to occur.
/// </summary>
public class RpcOperationWait {
public class RpcWaitHandle {

/// <summary>
/// Id of this wait operation. Used to coordinate between client/server.
Expand Down

0 comments on commit e352f54

Please sign in to comment.