Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Started work on cancellation notification.
Browse files Browse the repository at this point in the history
  • Loading branch information
DJGosnell committed Sep 14, 2016
1 parent 21253bf commit 11b5ec1
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 6 deletions.
3 changes: 2 additions & 1 deletion DtronixMessageQueue/Rpc/RpcMessageType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public enum RpcMessageType : byte {
RpcCall = 2,
RpcCallNoReturn = 3,
RpcCallReturn = 4,
RpcCallException = 5
RpcCallException = 5,
RpcCallCalcellation = 6
}
}
15 changes: 12 additions & 3 deletions DtronixMessageQueue/Rpc/RpcProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,22 @@ public override IMessage Invoke(IMessage msg) {
return new ReturnMessage(null, null, 0, method_call.LogicalCallContext, method_call);
}

return_wait.ReturnResetEvent.Wait(TimeSpan.FromSeconds(100), return_wait.Token);
return_wait.ReturnResetEvent.Wait(session.Config.SendTimeout, return_wait.Token);

if (return_wait.ReturnResetEvent.IsSet == false) {
string test = "wait;";
throw new TimeoutException("Wait handle timed out waiting for a response.");
}

if (return_wait.Token.IsCancellationRequested) {
store.MessageWriter.Clear();
store.MessageWriter.Write((byte)RpcMessageType.RpcCallCancellation);
store.MessageWriter.Write(return_wait.Id);

session.Send(store.MessageWriter.ToMessage());

throw new OperationCanceledException("Wait handle was canceled while waiting for a response.");
}

return_wait.Token.ThrowIfCancellationRequested();

try {
store.MessageReader.Message = return_wait.ReturnMessage;
Expand Down
28 changes: 26 additions & 2 deletions DtronixMessageQueue/Rpc/RpcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public class RpcSession<TSession> : MqSession<TSession>
/// </summary>
private readonly ConcurrentDictionary<ushort, RpcReturnCallWait> return_call_wait = new ConcurrentDictionary<ushort, RpcReturnCallWait>();

/// <summary>
/// Contains all operations running on this session which are cancellable.
/// </summary>
private readonly ConcurrentDictionary<ushort, RpcReturnCallWait> ongoing_operations = new ConcurrentDictionary<ushort, RpcReturnCallWait>();

private readonly Dictionary<string, IRemoteService<TSession>> services = new Dictionary<string, IRemoteService<TSession>>();
private readonly Dictionary<Type, IRemoteService<TSession>> remote_services_proxy = new Dictionary<Type, IRemoteService<TSession>>();
private readonly Dictionary<Type, RealProxy> remote_service_realproxy = new Dictionary<Type, RealProxy>();
Expand Down Expand Up @@ -74,9 +79,12 @@ public override void OnIncomingMessage(object sender, IncomingMessageEventArgs<T
ProcessRpcCommand(message, e);
break;

case RpcMessageType.RpcCallCancellation:
break;

case RpcMessageType.RpcCallNoReturn:
case RpcMessageType.RpcCall:
ProcessRpcCall(message, e, message_type);
ProcessRpcCall(message, message_type);
break;

case RpcMessageType.RpcCallException:
Expand Down Expand Up @@ -110,7 +118,7 @@ public void AddService<T>(T instance) where T : IRemoteService<TSession>{



private void ProcessRpcCall(MqMessage message, IncomingMessageEventArgs<TSession> e, RpcMessageType message_type) {
private void ProcessRpcCall(MqMessage message, RpcMessageType message_type) {
worker_thread_pool.QueueWorkItem(() => {
var store = Store.Get();
ushort message_return_id = 0;
Expand Down Expand Up @@ -151,6 +159,22 @@ private void ProcessRpcCall(MqMessage message, IncomingMessageEventArgs<TSession
PrefixStyle.Base128, i);
}
}


var last_param = method_info.GetParameters().LastOrDefault();

var cancellation_token = new CancellationTokenSource();

if (last_param?.ParameterType == typeof(CancellationToken)) {
var return_wait = new RpcReturnCallWait {
Token = new CancellationToken()
};

ongoing_operations.TryAdd(message_return_id, )
}



object return_value;
try {
return_value = method_info.Invoke(service, parameters);
Expand Down

0 comments on commit 11b5ec1

Please sign in to comment.