Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Ported all waiting methods to a dedicated classes to be reused in str…
Browse files Browse the repository at this point in the history
…eam reading.
  • Loading branch information
DJGosnell committed Oct 11, 2016
1 parent 055bf58 commit 65f9313
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public RpcPerformanceTest(string[] args) {
Port = 2828
};

RpcSingleProcessTest(100, 4, config, RpcTestType.LngBlock);
//RpcSingleProcessTest(20, 4, config, RpcTestType.LngBlock);

RpcSingleProcessTest(200000, 4, config, RpcTestType.NoRetrun);
RpcSingleProcessTest(200000, 4, config, RpcTestType.NoReturn);

RpcSingleProcessTest(200000, 4, config, RpcTestType.Await);

Expand Down Expand Up @@ -89,7 +89,7 @@ private void RpcSingleProcessTest(int runs, int loops, RpcConfig config, RpcTest
service.TestNoReturnBlock();
break;

case RpcTestType.NoRetrun:
case RpcTestType.NoReturn:
service.TestNoReturn();
break;

Expand Down Expand Up @@ -143,7 +143,7 @@ private void RpcSingleProcessTest(int runs, int loops, RpcConfig config, RpcTest
}

enum RpcTestType {
NoRetrun,
NoReturn,
Return,
Exception,
Await,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ public async void TestNoReturnAwait() {

public void TestNoReturnLongBlocking() {
var number = Interlocked.Increment(ref call_count);
Console.WriteLine($"Started {number}");
Thread.Sleep(10000);
Console.WriteLine($"Completed {number}");

VerifyComplete();

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public override bool HandleMessage(MqMessage message) {
case ByteTransportMessageType.ResponseTransportHandle:
var id = message[0].ReadUInt16(2);

message_writer.Write((byte)ByteTransportMessageType.Write);
//message_writer.Write((byte)ByteTransportMessageType.Write);
break;

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class RpcCallMessageHandler<TSession, TConfig> : MessageHandler<TSession,
/// <summary>
/// Id byte which precedes all messages all messages of this type.
/// </summary>
public override byte Id => 1;
public sealed override byte Id => 1;

/// <summary>
/// Contains all services that can be remotely executed on this session.
Expand All @@ -36,7 +36,10 @@ public class RpcCallMessageHandler<TSession, TConfig> : MessageHandler<TSession,
/// </summary>
public readonly Dictionary<Type, RealProxy> RemoteServiceRealproxy = new Dictionary<Type, RealProxy>();

public readonly ResponseWait<TSession, TConfig> WaitOperations;

public RpcCallMessageHandler(TSession session) : base(session) {
WaitOperations = new ResponseWait<TSession, TConfig>(Id, Session);
}


Expand All @@ -55,10 +58,7 @@ public override bool HandleMessage(MqMessage message) {

// Remotely called to cancel a rpc call on this session.
var cancellation_id = message[0].ReadUInt16(2);
ResponseWaitHandle wait_handle;
if (RemoteWaitHandles.TryRemove(cancellation_id, out wait_handle)) {
wait_handle.TokenSource.Cancel();
}
WaitOperations.RemoteCancel(cancellation_id);
break;

case RpcCallMessageType.MethodCallNoReturn:
Expand Down Expand Up @@ -124,29 +124,19 @@ private void ProcessRpcCall(MqMessage message, RpcCallMessageType message_type)
// Determine if the last parameter is a cancellation token.
var last_param = method_info.GetParameters().LastOrDefault();

//var cancellation_source = new CancellationTokenSource();

// Number used to increase the number of parameters if there is a cancellation token.
int cancellation_token_param = 0;
ResponseWaitHandle cancellation_wait;
ResponseWaitHandle cancellation_wait = null;

// If the past parameter is a cancellation token, setup a return wait for this call to allow for remote cancellation.
if (rec_message_return_id != 0 && last_param?.ParameterType == typeof(CancellationToken)) {
cancellation_wait = new ResponseWaitHandle {
Token = cancellation_source.Token,
TokenSource = cancellation_source,
Id = rec_message_return_id
};

cancellation_wait = WaitOperations.CreateRemoteWaitHandle(rec_message_return_id);

// Set the number to 1 to increase the parameter number by one.
cancellation_token_param = 1;

// Add it to the main list of ongoing operations.
RemoteWaitHandles.TryAdd(rec_message_return_id, cancellation_wait);
cancellation_wait.TokenSource = new CancellationTokenSource();
cancellation_wait.Token = cancellation_wait.TokenSource.Token;
}

// Setup the parameters to pass to the invoked method.
object[] parameters = new object[rec_argument_count + cancellation_token_param];
object[] parameters = new object[rec_argument_count + (cancellation_wait == null ? 0 : 1)];

// Determine if we have any parameters to pass to the invoked method.
if (rec_argument_count > 0) {
Expand All @@ -161,8 +151,8 @@ private void ProcessRpcCall(MqMessage message, RpcCallMessageType message_type)
}

// Add the cancellation token to the parameters.
if (cancellation_token_param > 0) {
parameters[parameters.Length - 1] = cancellation_source.Token;
if (cancellation_wait != null) {
parameters[parameters.Length - 1] = cancellation_wait.Token;
}


Expand All @@ -178,9 +168,7 @@ private void ProcessRpcCall(MqMessage message, RpcCallMessageType message_type)
}
return;
} finally {
// Remove the cancellation wait if it exists.
LocalWaitHandles.TryRemove(rec_message_return_id, out cancellation_wait);

WaitOperations.RemoteComplete(rec_message_return_id);
}


Expand Down Expand Up @@ -236,9 +224,10 @@ private void ProcessRpcReturn(MqMessage message) {
// Read the return Id.
var return_id = serialization.MessageReader.ReadUInt16();

ResponseWaitHandle call_wait_handle;

ResponseWaitHandle call_wait_handle = WaitOperations.LocalGet(return_id);
// Try to get the outstanding wait from the return id. If it does not exist, the has already completed.
if (LocalWaitHandles.TryRemove(return_id, out call_wait_handle)) {
if (call_wait_handle != null) {
call_wait_handle.ReturnMessage = message;

// Release the wait event.
Expand Down
93 changes: 73 additions & 20 deletions src/DtronixMessageQueue/Rpc/ResponseWait.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using DtronixMessageQueue.Rpc.MessageHandlers;

namespace DtronixMessageQueue.Rpc {
class ResponseWait<TSession, TConfig>
public class ResponseWait<TSession, TConfig>
where TSession : RpcSession<TSession, TConfig>, new()
where TConfig : RpcConfig {
private readonly byte handler_id;
Expand All @@ -29,15 +29,16 @@ class ResponseWait<TSession, TConfig>
/// <summary>
/// Contains all outstanding call returns pending a return of data from the recipient connection.
/// </summary>
public readonly ConcurrentDictionary<ushort, ResponseWaitHandle> RemoteWaitHandles =
private readonly ConcurrentDictionary<ushort, ResponseWaitHandle> remote_wait_handles =
new ConcurrentDictionary<ushort, ResponseWaitHandle>();

/// <summary>
/// Contains all operations running on this session which are cancellable.
/// </summary>
public readonly ConcurrentDictionary<ushort, ResponseWaitHandle> LocalWaitHandles =
private readonly ConcurrentDictionary<ushort, ResponseWaitHandle> local_wait_handles =
new ConcurrentDictionary<ushort, ResponseWaitHandle>();


public ResponseWait(byte handler_id, TSession session) {
this.handler_id = handler_id;
this.session = session;
Expand All @@ -62,49 +63,101 @@ public ResponseWaitHandle CreateLocalWaitHandle() {
}

// Add the wait to the outstanding wait dictionary for retrieval later.
if (LocalWaitHandles.TryAdd(return_wait.Id, return_wait) == false) {
throw new InvalidOperationException($"Id {return_wait.Id} already exists in the return_wait_handles dictionary.");
if (local_wait_handles.TryAdd(return_wait.Id, return_wait) == false) {
throw new InvalidOperationException($"Id {return_wait.Id} already exists in the local handles dictionary.");
}

return return_wait;
}

public ResponseWaitHandle Create RemoteWaitHandle
public ResponseWaitHandle CreateRemoteWaitHandle(ushort id) {
var return_wait = new ResponseWaitHandle {
Id = id
};

// Add the wait to the outstanding wait dictionary for retrieval later.
if (local_wait_handles.TryAdd(return_wait.Id, return_wait) == false) {
throw new InvalidOperationException($"Id {return_wait.Id} already exists in the remote handles dictionary.");
}

return return_wait;
}


/// <summary>
/// Called to cancel a remote waiting operation on the recipient connection.
/// Called to cancel a localally called operation on this and the recipient connection.
/// </summary>
/// <param name="id">Id of the waiting operation to cancel.</param>
public void Cancel(ushort id) {
/// <param name="remote_cancel">True if this wait should notify the recipient.</param>
public void LocalCancel(ushort id, bool remote_cancel) {
ResponseWaitHandle call_wait_handle;

// Try to get the wait. If the Id does not exist, the wait operation has already been completed or removed.
if (LocalWaitHandles.TryRemove(id, out call_wait_handle)) {
call_wait_handle.Cancel();

var frame = new MqFrame(new byte[4], MqFrameType.Last, session.Config);
frame.Write(0, handler_id);
frame.Write(1, (byte)RpcCallMessageType.MethodCancel);
frame.Write(2, id);
if (!local_wait_handles.TryRemove(id, out call_wait_handle)) {
return;
}

session.Send(frame);
if (!remote_cancel) {
return;
}

var frame = new MqFrame(new byte[4], MqFrameType.Last, session.Config);
frame.Write(0, handler_id);
frame.Write(1, (byte) RpcCallMessageType.MethodCancel);
frame.Write(2, id);

session.Send(frame);
}

/// <summary>
/// Called to complete a localally called operation on this and the recipient connection.
/// </summary>
/// <param name="id">Id of the waiting operation to complete.</param>
public void LocalCompete(ushort id) {
ResponseWaitHandle call_wait_handle;

// Try to get the wait. If the Id does not exist, the wait operation has already been completed or removed.
local_wait_handles.TryRemove(id, out call_wait_handle);
}

/// <summary>
/// Called to cancel a remote waiting operation on the recipient connection.
/// Gets the local response wait handle
/// </summary>
/// <param name="id">Id of the waiting operation to get.</param>
public ResponseWaitHandle LocalGet(ushort id) {
ResponseWaitHandle call_wait_handle;

// Try to get the wait. If the Id does not exist, the wait operation has already been completed or removed.
local_wait_handles.TryGetValue(id, out call_wait_handle);

return call_wait_handle;
}



/// <summary>
/// Called to cancel a remotely called operation on this session.
/// </summary>
/// <param name="id">Id of the waiting operation to cancel.</param>
public void RequestedCancel(ushort id) {
public void RemoteCancel(ushort id) {
ResponseWaitHandle call_wait_handle;

// Try to get the wait. If the Id does not exist, the wait operation has already been completed or removed.
if (RemoteWaitHandles.TryRemove(id, out call_wait_handle)) {
call_wait_handle.Cancel();
if (remote_wait_handles.TryRemove(id, out call_wait_handle)) {
call_wait_handle.TokenSource?.Cancel();
}
}

/// <summary>
/// Called to complete a remotely called operation on this session.
/// </summary>
/// <param name="id">Id of the waiting operation to cancel.</param>
public void RemoteComplete(ushort id) {
ResponseWaitHandle call_wait_handle;

// Try to get the wait. If the Id does not exist, the wait operation has already been completed or removed.
remote_wait_handles.TryRemove(id, out call_wait_handle);
}

}
}
15 changes: 2 additions & 13 deletions src/DtronixMessageQueue/Rpc/ResponseWaitHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,17 @@ public class ResponseWaitHandle {
/// <summary>
/// Cancellation token for the request.
/// </summary>
public CancellationToken Token { get; }
public CancellationToken Token { get; set; }

/// <summary>
/// Cancellation token source for the request.
/// </summary>
private CancellationTokenSource token_source;
public CancellationTokenSource TokenSource { get; set; }

/// <summary>
/// Contains the time that this call wait was created to check for timeouts.
/// </summary>
public DateTime Created { get; } = DateTime.UtcNow;

public ResponseWaitHandle() : this(new CancellationTokenSource()) {
}

public ResponseWaitHandle(CancellationTokenSource cancellation_token_source) {
token_source = cancellation_token_source;
Token = cancellation_token_source.Token;
}

public void Cancel() {
token_source.Cancel();
}
}
}
4 changes: 2 additions & 2 deletions src/DtronixMessageQueue/Rpc/RpcProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public override IMessage Invoke(IMessage msg) {
serializer.MessageWriter.Write((byte) RpcCallMessageType.MethodCall);

// Create a wait operation to wait for the response.
return_wait = call_message_handler.CreateWaitHandle();
return_wait = call_message_handler.WaitOperations.CreateLocalWaitHandle();

// Byte[1,2] Wait Id which is used for returning the value and cancellation.
serializer.MessageWriter.Write(return_wait.Id);
Expand Down Expand Up @@ -130,7 +130,7 @@ public override IMessage Invoke(IMessage msg) {
} catch (OperationCanceledException) {

// If the operation was canceled, cancel the wait on this end and notify the other end.
call_message_handler.CancelWaitHandle(return_wait.Id);
call_message_handler.WaitOperations.LocalCancel(return_wait.Id, true);
throw new OperationCanceledException("Wait handle was canceled while waiting for a response.");
}

Expand Down

0 comments on commit 65f9313

Please sign in to comment.