From 2af8c770a39bda9381a8300af4b2fc442c82db3b Mon Sep 17 00:00:00 2001 From: DJGosnell Date: Sun, 11 Sep 2016 22:30:54 -0400 Subject: [PATCH] Added documentation. Started sub-threadding work on RPC calls. --- .../RpcClientTests.cs | 25 +++++ .../Services/Server/CalculatorService.cs | 15 +++ .../DtronixMessageQueue.Rpc.csproj | 2 +- DtronixMessageQueue.Rpc/RpcProxy.cs | 6 +- DtronixMessageQueue.Rpc/RpcReturnCallWait.cs | 20 +++- DtronixMessageQueue.Rpc/RpcSession.cs | 100 +++++++++++------- DtronixMessageQueue.Rpc/SerializationStore.cs | 50 +++++++++ .../RpcPerformanceTest.cs | 2 +- 8 files changed, 176 insertions(+), 44 deletions(-) create mode 100644 DtronixMessageQueue.Rpc/SerializationStore.cs diff --git a/DtronixMessageQueue.Rpc.Tests/RpcClientTests.cs b/DtronixMessageQueue.Rpc.Tests/RpcClientTests.cs index a023f95..3b50f81 100644 --- a/DtronixMessageQueue.Rpc.Tests/RpcClientTests.cs +++ b/DtronixMessageQueue.Rpc.Tests/RpcClientTests.cs @@ -68,5 +68,30 @@ public void Client_calls_proxy_method_sequential() { StartAndWait(); } + + [Fact] + public void Client_calls_proxy_method_and_canceles() { + + Server.Connected += (sender, args) => { + args.Session.AddService(new CalculatorService()); + }; + + + Client.Connected += (sender, args) => { + args.Session.AddProxy(new CalculatorService()); + var service = Client.Session.GetProxy(); + Stopwatch stopwatch = Stopwatch.StartNew(); + + int added_int = 0; + for (int i = 0; i < 10; i++) { + added_int = service.Add(added_int, 1); + } + + Output.WriteLine($"{stopwatch.ElapsedMilliseconds}"); + TestStatus.Set(); + }; + + StartAndWait(); + } } } diff --git a/DtronixMessageQueue.Rpc.Tests/Services/Server/CalculatorService.cs b/DtronixMessageQueue.Rpc.Tests/Services/Server/CalculatorService.cs index e9f3a71..ce893f2 100644 --- a/DtronixMessageQueue.Rpc.Tests/Services/Server/CalculatorService.cs +++ b/DtronixMessageQueue.Rpc.Tests/Services/Server/CalculatorService.cs @@ -1,10 +1,13 @@ using System; +using System.Threading; namespace DtronixMessageQueue.Rpc.Tests.Services.Server { public class CalculatorService : MarshalByRefObject, ICalculatorService { public string Name { get; } = "CalculatorService"; public SimpleRpcSession Session { get; set; } + public event EventHandler LongRunningTaskCanceled; + public int Add(int number_1, int number_2) { return number_1 + number_2; } @@ -20,5 +23,17 @@ public int Multiply(int number_1, int number_2) { public int Divide(int number_1, int number_2) { return number_1/number_2; } + + public int LongRunningTask(int number_1, int number_2, CancellationToken token) { + ManualResetEventSlim mre = new ManualResetEventSlim(); + + mre.Wait(token); + + if (mre.IsSet == false) { + LongRunningTaskCanceled?.Invoke(this, EventArgs.Empty); + } + + return number_1 / number_2; + } } } diff --git a/DtronixMessageQueue.Rpc/DtronixMessageQueue.Rpc.csproj b/DtronixMessageQueue.Rpc/DtronixMessageQueue.Rpc.csproj index 02dd169..71b46fd 100644 --- a/DtronixMessageQueue.Rpc/DtronixMessageQueue.Rpc.csproj +++ b/DtronixMessageQueue.Rpc/DtronixMessageQueue.Rpc.csproj @@ -44,7 +44,7 @@ - + diff --git a/DtronixMessageQueue.Rpc/RpcProxy.cs b/DtronixMessageQueue.Rpc/RpcProxy.cs index 9858ee9..5ccc59e 100644 --- a/DtronixMessageQueue.Rpc/RpcProxy.cs +++ b/DtronixMessageQueue.Rpc/RpcProxy.cs @@ -26,7 +26,7 @@ public override IMessage Invoke(IMessage msg) { var method_call = msg as IMethodCallMessage; var method_info = method_call.MethodBase as MethodInfo; - var store = session.ReadWriteStore.Get(); + var store = session.Store.Get(); object[] arguments = method_call.Args; CancellationToken cancellation_token = CancellationToken.None; @@ -54,6 +54,7 @@ public override IMessage Invoke(IMessage msg) { return_wait = session.CreateReturnCallWait(); store.MessageWriter.Write(return_wait.Id); + return_wait.Token = cancellation_token; } store.MessageWriter.Write(decorated.Name); @@ -79,6 +80,7 @@ public override IMessage Invoke(IMessage msg) { return_wait.ReturnResetEvent.Wait(return_wait.Token); + return_wait.Token.ThrowIfCancellationRequested(); try { store.MessageReader.Message = return_wait.ReturnMessage; @@ -106,7 +108,7 @@ public override IMessage Invoke(IMessage msg) { } } finally { - session.ReadWriteStore.Put(store); + session.Store.Put(store); } } } diff --git a/DtronixMessageQueue.Rpc/RpcReturnCallWait.cs b/DtronixMessageQueue.Rpc/RpcReturnCallWait.cs index 39e4eb6..e1c9223 100644 --- a/DtronixMessageQueue.Rpc/RpcReturnCallWait.cs +++ b/DtronixMessageQueue.Rpc/RpcReturnCallWait.cs @@ -1,10 +1,28 @@ -using System.Threading; +using System; +using System.Threading; namespace DtronixMessageQueue.Rpc { public class RpcReturnCallWait { public ushort Id { get; set; } + + /// + /// Reset event used to hold the requesting + /// public ManualResetEventSlim ReturnResetEvent { get; set; } + + /// + /// Message that the other session receives from the connection that is associated with the return value for this request. + /// public MqMessage ReturnMessage { get; set; } + + /// + /// Cancellation token for the request. + /// public CancellationToken Token { get; set; } + + /// + /// Contains the time that this call wait was created to check for timeouts. + /// + public DateTime Created { get; } = DateTime.UtcNow; } } diff --git a/DtronixMessageQueue.Rpc/RpcSession.cs b/DtronixMessageQueue.Rpc/RpcSession.cs index fbb162e..4144006 100644 --- a/DtronixMessageQueue.Rpc/RpcSession.cs +++ b/DtronixMessageQueue.Rpc/RpcSession.cs @@ -5,6 +5,7 @@ using System.Reflection; using System.Runtime.Remoting.Proxies; using System.Threading; +using System.Threading.Tasks; using DtronixMessageQueue.Socket; using ProtoBuf; using ProtoBuf.Meta; @@ -13,19 +14,29 @@ namespace DtronixMessageQueue.Rpc { public class RpcSession : MqSession where TSession : RpcSession, new() { - + /// + /// Current call Id wich gets incremented for each call return request. + /// private int rpc_call_id; - private object rpc_call_id_lock = new object(); + /// + /// Lock to increment and loop return ID. + /// + private readonly object rpc_call_id_lock = new object(); - public BsonReadWriteStore ReadWriteStore { get; private set; } + /// + /// Store which contains instances of all classes for serialization and destabilization of data. + /// + public SerializationStore Store { get; private set; } - private ConcurrentDictionary return_call_wait = - new ConcurrentDictionary(); + /// + /// Contains all outstanding call returns pending a return of data from the other end of the connection. + /// + private readonly ConcurrentDictionary return_call_wait = new ConcurrentDictionary(); - private Dictionary> services = new Dictionary>(); - private Dictionary> remote_services_proxy = new Dictionary>(); - private Dictionary remote_service_realproxy = new Dictionary(); + private readonly Dictionary> services = new Dictionary>(); + private readonly Dictionary> remote_services_proxy = new Dictionary>(); + private readonly Dictionary remote_service_realproxy = new Dictionary(); public RpcServer Server { get; set; } @@ -33,7 +44,7 @@ public class RpcSession : MqSession protected override void OnSetup() { base.OnSetup(); - ReadWriteStore = new BsonReadWriteStore((MqSocketConfig)Config); + Store = new SerializationStore((MqSocketConfig)Config); } @@ -89,7 +100,7 @@ public void AddService(T instance) where T : IRemoteService{ private void ProcessRpcCall(MqMessage message, IncomingMessageEventArgs e, RpcMessageType message_type) { - var store = ReadWriteStore.Get(); + var store = Store.Get(); ushort message_return_id = 0; try { store.MessageReader.Message = message; @@ -128,49 +139,60 @@ private void ProcessRpcCall(MqMessage message, IncomingMessageEventArgs { + object return_value; + try { + return_value = method_info.Invoke(service, parameters); + } catch (Exception ex) { + SendRpcException(store, ex, message_return_id); + return; + } + - var return_value = method_info.Invoke(service, parameters); + switch (message_type) { + case RpcMessageType.RpcCall: + store.Stream.SetLength(0); - switch (message_type) { - case RpcMessageType.RpcCall: - store.Stream.SetLength(0); + store.MessageWriter.Clear(); + store.MessageWriter.Write((byte)RpcMessageType.RpcCallReturn); + store.MessageWriter.Write(message_return_id); - store.MessageWriter.Clear(); - store.MessageWriter.Write((byte)RpcMessageType.RpcCallReturn); - store.MessageWriter.Write(message_return_id); + RuntimeTypeModel.Default.SerializeWithLengthPrefix(store.Stream, return_value, return_value.GetType(), PrefixStyle.Base128, 0); - RuntimeTypeModel.Default.SerializeWithLengthPrefix(store.Stream, return_value, return_value.GetType(), PrefixStyle.Base128, 0); + store.MessageWriter.Write(store.Stream.ToArray()); - store.MessageWriter.Write(store.Stream.ToArray()); + Send(store.MessageWriter.ToMessage(true)); - Send(store.MessageWriter.ToMessage(true)); + break; + case RpcMessageType.RpcCallNoReturn: + break; + } - break; - case RpcMessageType.RpcCallNoReturn: - break; - default: - throw new ArgumentOutOfRangeException(nameof(message_type), message_type, null); - } + Store.Put(store); + }); } catch (Exception ex) { - store.Stream.SetLength(0); + SendRpcException(store, ex, message_return_id); + Store.Put(store); + } - store.MessageWriter.Clear(); - store.MessageWriter.Write((byte)RpcMessageType.RpcCallException); - store.MessageWriter.Write(message_return_id); + } - var exception = new RpcRemoteExceptionDataContract(ex is TargetInvocationException ? ex.InnerException : ex); + private void SendRpcException(SerializationStore.Store store, Exception ex, ushort message_return_id) { + store.Stream.SetLength(0); - RuntimeTypeModel.Default.SerializeWithLengthPrefix(store.Stream, exception, exception.GetType(), PrefixStyle.Base128, 1); + store.MessageWriter.Clear(); + store.MessageWriter.Write((byte)RpcMessageType.RpcCallException); + store.MessageWriter.Write(message_return_id); - store.MessageWriter.Write(store.Stream.ToArray()); + var exception = new RpcRemoteExceptionDataContract(ex is TargetInvocationException ? ex.InnerException : ex); - Send(store.MessageWriter.ToMessage(true)); + RuntimeTypeModel.Default.SerializeWithLengthPrefix(store.Stream, exception, exception.GetType(), PrefixStyle.Base128, 1); - } finally { - ReadWriteStore.Put(store); - } + store.MessageWriter.Write(store.Stream.ToArray()); + + Send(store.MessageWriter.ToMessage(true)); } @@ -195,7 +217,7 @@ public RpcReturnCallWait CreateReturnCallWait() { private void ProcessRpcReturn(MqMessage mq_message) { - var store = ReadWriteStore.Get(); + var store = Store.Get(); try { store.MessageReader.Message = mq_message; @@ -212,7 +234,7 @@ private void ProcessRpcReturn(MqMessage mq_message) { call_wait.ReturnResetEvent.Set(); } finally { - ReadWriteStore.Put(store); + Store.Put(store); } } diff --git a/DtronixMessageQueue.Rpc/SerializationStore.cs b/DtronixMessageQueue.Rpc/SerializationStore.cs new file mode 100644 index 0000000..4ecc9ce --- /dev/null +++ b/DtronixMessageQueue.Rpc/SerializationStore.cs @@ -0,0 +1,50 @@ +using System.Collections.Concurrent; +using System.IO; + +namespace DtronixMessageQueue.Rpc { + public class SerializationStore { + private readonly MqSocketConfig config; + + public class Store { + public MqMessageWriter MessageWriter; + public MqMessageReader MessageReader; + public MemoryStream Stream; + } + + private readonly ConcurrentQueue reader_writers = new ConcurrentQueue(); + + public SerializationStore(MqSocketConfig config) { + this.config = config; + } + + public Store Get() { + Store store; + if (reader_writers.TryDequeue(out store) == false) { + + var mq_writer = new MqMessageWriter(config); + var mq_reader = new MqMessageReader(); + + store = new Store { + MessageWriter = mq_writer, + MessageReader = mq_reader, + Stream = new MemoryStream() + + }; + } else { + store.MessageWriter.Clear(); + store.Stream.SetLength(0); + } + + // + + + return store; + } + + public void Put(Store store) { + reader_writers.Enqueue(store); + } + + + } +} diff --git a/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs b/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs index 5a1b418..041f5ca 100644 --- a/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs +++ b/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs @@ -16,7 +16,7 @@ public RpcPerformanceTest(string[] args) { Port = 2828 }; - RpcSingleProcessTest(100000, 4, config, RpcTestType.NoRetrun); + RpcSingleProcessTest(100, 4, config, RpcTestType.NoRetrun); RpcSingleProcessTest(1000, 4, config, RpcTestType.Return);