From 8772920b4cb6f980bd0a6028c9f7c4f2dd879eb4 Mon Sep 17 00:00:00 2001 From: DJGosnell Date: Thu, 21 Jun 2018 10:44:05 -0400 Subject: [PATCH] Renamed tests to more reflect their actual contents. BREAKING CHANGE: Removed logic for RPC method cancellation. --- .../DtronixMessageQueue.Tests.csproj | 4 +- .../Rpc/{RpcClientTests.cs => RpcTests.cs} | 97 +++++++++---------- ...{CalculatorService.cs => SampleService.cs} | 24 ++--- .../MessageHandlers/RpcCallMessageAction.cs | 5 - .../MessageHandlers/RpcCallMessageHandler.cs | 37 +------ src/DtronixMessageQueue/Rpc/ResponseWait.cs | 20 +--- .../Rpc/ResponseWaitHandle.cs | 10 -- src/DtronixMessageQueue/Rpc/RpcConfig.cs | 10 +- src/DtronixMessageQueue/Rpc/RpcProxy.cs | 48 +++------ 9 files changed, 75 insertions(+), 180 deletions(-) rename src/DtronixMessageQueue.Tests/Rpc/{RpcClientTests.cs => RpcTests.cs} (80%) rename src/DtronixMessageQueue.Tests/Rpc/Services/Server/{CalculatorService.cs => SampleService.cs} (57%) diff --git a/src/DtronixMessageQueue.Tests/DtronixMessageQueue.Tests.csproj b/src/DtronixMessageQueue.Tests/DtronixMessageQueue.Tests.csproj index dcf53e8..51d0219 100644 --- a/src/DtronixMessageQueue.Tests/DtronixMessageQueue.Tests.csproj +++ b/src/DtronixMessageQueue.Tests/DtronixMessageQueue.Tests.csproj @@ -67,9 +67,9 @@ - + - + diff --git a/src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs b/src/DtronixMessageQueue.Tests/Rpc/RpcTests.cs similarity index 80% rename from src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs rename to src/DtronixMessageQueue.Tests/Rpc/RpcTests.cs index af37f40..f65d0cd 100644 --- a/src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs +++ b/src/DtronixMessageQueue.Tests/Rpc/RpcTests.cs @@ -7,7 +7,7 @@ namespace DtronixMessageQueue.Tests.Rpc { - public class RpcClientTests : RpcTestsBase + public class RpcTests : RpcTestsBase { public class Test @@ -19,17 +19,17 @@ public class Test [Test] public void Client_calls_proxy_method() { - Server.SessionSetup += (sender, args) => { args.Session.AddService(new CalculatorService()); }; + Server.SessionSetup += (sender, args) => { args.Session.AddService(new SampleService()); }; Client.SessionSetup += (sender, args) => { - args.Session.AddProxy("CalculatorService"); + args.Session.AddProxy("SampleService"); }; Client.Ready += (sender, args) => { - var service = Client.Session.GetProxy(); + var service = Client.Session.GetProxy(); var result = service.Add(100, 200); if (result != 300) @@ -46,13 +46,13 @@ public void Client_calls_proxy_method() [Test] public void Client_calls_proxy_method_sequential() { - Server.SessionSetup += (sender, args) => { args.Session.AddService(new CalculatorService()); }; + Server.SessionSetup += (sender, args) => { args.Session.AddService(new SampleService()); }; Client.Ready += (sender, args) => { - args.Session.AddProxy("CalculatorService"); - var service = Client.Session.GetProxy(); + args.Session.AddProxy("SampleService"); + var service = Client.Session.GetProxy(); Stopwatch stopwatch = Stopwatch.StartNew(); int addedInt = 0; @@ -79,45 +79,6 @@ public void Client_calls_proxy_method_sequential() StartAndWait(); } - [Test] - public void Client_calls_proxy_method_and_canceles() - { - Server.SessionSetup += (sender, args) => - { - var service = new CalculatorService(); - args.Session.AddService(service); - - service.LongRunningTaskCanceled += (o, eventArgs) => { TestComplete.Set(); }; - }; - - - Client.Ready += (sender, args) => - { - args.Session.AddProxy("CalculatorService"); - var service = Client.Session.GetProxy(); - var tokenSource = new CancellationTokenSource(); - - - bool threw = false; - try - { - tokenSource.CancelAfter(500); - service.LongRunningTask(1, 2, tokenSource.Token); - } - catch (OperationCanceledException) - { - threw = true; - } - - if (threw != true) - { - LastException = new Exception("Operation did not cancel."); - } - }; - - StartAndWait(); - } - [Test] public void Server_requests_authentication() { @@ -134,15 +95,15 @@ public void Server_does_not_request_authentication() Server.Config.RequireAuthentication = false; Server.SessionSetup += - (sender, args) => { args.Session.AddService(new CalculatorService()); }; + (sender, args) => { args.Session.AddService(new SampleService()); }; Client.Authenticate += (sender, e) => { }; Client.Ready += (sender, e) => { - e.Session.AddProxy("CalculatorService"); - var service = Client.Session.GetProxy(); + e.Session.AddProxy("SampleService"); + var service = Client.Session.GetProxy(); var result = service.Add(100, 200); @@ -166,7 +127,7 @@ public void Server_verifies_authentication() Server.Config.RequireAuthentication = true; Server.SessionSetup += - (sender, args) => { args.Session.AddService(new CalculatorService()); }; + (sender, args) => { args.Session.AddService(new SampleService()); }; Server.Authenticate += (sender, e) => { @@ -196,7 +157,7 @@ public void Server_disconnectes_from_failed_authentication() Server.Config.RequireAuthentication = true; Server.SessionSetup += - (sender, args) => { args.Session.AddService(new CalculatorService()); }; + (sender, args) => { args.Session.AddService(new SampleService()); }; Server.Authenticate += (sender, e) => { e.Authenticated = false; }; @@ -223,7 +184,7 @@ public void Client_disconnectes_from_failed_authentication() Server.Config.RequireAuthentication = true; Server.SessionSetup += - (sender, args) => { args.Session.AddService(new CalculatorService()); }; + (sender, args) => { args.Session.AddService(new SampleService()); }; Server.Authenticate += (sender, e) => { e.Authenticated = false; }; @@ -248,7 +209,7 @@ public void Client_notified_of_authentication_success() ServerConfig.RequireAuthentication = true; Server.SessionSetup += - (sender, args) => { args.Session.AddService(new CalculatorService()); }; + (sender, args) => { args.Session.AddService(new SampleService()); }; Server.Authenticate += (sender, e) => { e.Authenticated = true; }; @@ -316,5 +277,35 @@ public void Client_connects_disconnects_and_reconnects() StartAndWait(); } + + [Test] + public void Client_throws_on_invalid_argument_passed() + { + Client.Ready += (sender, args) => + { + args.Session.AddProxy("SampleService"); + var service = Client.Session.GetProxy(); + var tokenSource = new CancellationTokenSource(); + + + bool threw = false; + try + { + service.InvalidArguments(1, 2, tokenSource.Token); + } + catch (ArgumentException) + { + TestComplete.Set(); + threw = true; + } + + if (threw != true) + { + LastException = new Exception("Operation did not throw ArgumentException."); + } + }; + + StartAndWait(); + } } } \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests/Rpc/Services/Server/CalculatorService.cs b/src/DtronixMessageQueue.Tests/Rpc/Services/Server/SampleService.cs similarity index 57% rename from src/DtronixMessageQueue.Tests/Rpc/Services/Server/CalculatorService.cs rename to src/DtronixMessageQueue.Tests/Rpc/Services/Server/SampleService.cs index 141f65f..4957c94 100644 --- a/src/DtronixMessageQueue.Tests/Rpc/Services/Server/CalculatorService.cs +++ b/src/DtronixMessageQueue.Tests/Rpc/Services/Server/SampleService.cs @@ -1,12 +1,13 @@ using System; using System.Threading; +using System.Threading.Tasks; using DtronixMessageQueue.Rpc; namespace DtronixMessageQueue.Tests.Rpc.Services.Server { - public class CalculatorService : MarshalByRefObject, ICalculatorService + public class SampleService : MarshalByRefObject, ISampleService { - public string Name { get; } = "CalculatorService"; + public string Name { get; } = "SampleService"; public SimpleRpcSession Session { get; set; } public event EventHandler LongRunningTaskCanceled; @@ -33,30 +34,17 @@ public int Divide(int number1, int number2) return number1 / number2; } - public int LongRunningTask(int number1, int number2, CancellationToken token) + public void InvalidArguments(int number1, int number2, CancellationToken token) { - ManualResetEventSlim mre = new ManualResetEventSlim(); - - try - { - mre.Wait(token); - } - catch (Exception) - { - LongRunningTaskCanceled?.Invoke(this, EventArgs.Empty); - throw; - } - - return number1 / number2; } } - public interface ICalculatorService : IRemoteService + public interface ISampleService : IRemoteService { int Add(int number1, int number2); int Subtract(int number1, int number2); int Multiply(int number1, int number2); int Divide(int number1, int number2); - int LongRunningTask(int number1, int number2, CancellationToken token); + void InvalidArguments(int number1, int number2, CancellationToken token); } } \ No newline at end of file diff --git a/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageAction.cs b/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageAction.cs index 74abac1..ca5c01e 100644 --- a/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageAction.cs +++ b/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageAction.cs @@ -30,10 +30,5 @@ public enum RpcCallMessageAction : byte /// Message is a Rpc response. Message contains information about the exception thrown. /// MethodException = 4, - - /// - /// Message used to cancel a pending operation. - /// - MethodCancel = 5, } } \ No newline at end of file diff --git a/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageHandler.cs b/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageHandler.cs index 18ac634..2f7b4f5 100644 --- a/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageHandler.cs +++ b/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageHandler.cs @@ -43,8 +43,6 @@ public class RpcCallMessageHandler : MessageHandler(T instance) where T : IRemoteService - /// Cancels the specified action. - /// - /// byte associated with the RpcCallMessageAction enum.> - /// Message containing the cancellation information. - public void MethodCancelAction(byte actionId, MqMessage message) - { - var cancellationId = message[0].ReadUInt16(0); - _remoteWaitOperations.Cancel(cancellationId); - } - /// /// Processes the incoming Rpc call from the recipient connection. @@ -104,19 +91,8 @@ private void ProcessRpcCallAction(byte actionId, MqMessage message) if (serviceMethod == null || !_serviceInstances.ContainsKey(recServiceName)) throw new Exception($"Service '{recServiceName}' does not exist."); - ResponseWaitHandle cancellationWait = null; - - // If the past parameter is a cancellation token, setup a return wait for this call to allow for remote cancellation. - if (recMessageReturnId != 0 && serviceMethod.HasCancellation) - { - cancellationWait = _remoteWaitOperations.CreateWaitHandle(recMessageReturnId); - - cancellationWait.TokenSource = new CancellationTokenSource(); - cancellationWait.Token = cancellationWait.TokenSource.Token; - } - // Setup the parameters to pass to the invoked method. - var parameters = new object[recArgumentCount + (cancellationWait == null ? 0 : 1)]; + var parameters = new object[recArgumentCount]; // Determine if we have any parameters to pass to the invoked method. if (recArgumentCount > 0) @@ -128,11 +104,6 @@ private void ProcessRpcCallAction(byte actionId, MqMessage message) parameters[i] = serialization.DeserializeFromReader(serviceMethod.ParameterTypes[i], i); } - // Add the cancellation token to the parameters. - if (cancellationWait != null) - parameters[parameters.Length - 1] = cancellationWait.Token; - - object returnValue; try { @@ -143,11 +114,9 @@ private void ProcessRpcCallAction(byte actionId, MqMessage message) { // Determine if this method was waited on. If it was and an exception was thrown, // Let the recipient session know an exception was thrown. - if (recMessageReturnId != 0 && - ex.InnerException?.GetType() != typeof(OperationCanceledException)) - { + if (recMessageReturnId != 0) SendRpcException(serialization, ex, recMessageReturnId); - } + return; } finally diff --git a/src/DtronixMessageQueue/Rpc/ResponseWait.cs b/src/DtronixMessageQueue/Rpc/ResponseWait.cs index a5ad17f..e93dc0a 100644 --- a/src/DtronixMessageQueue/Rpc/ResponseWait.cs +++ b/src/DtronixMessageQueue/Rpc/ResponseWait.cs @@ -37,9 +37,8 @@ public T CreateWaitHandle(ushort? handleId) lock (_idLock) { if (++_id > ushort.MaxValue) - { _id = 0; - } + returnWait.Id = (ushort) _id; } } @@ -54,23 +53,6 @@ public T CreateWaitHandle(ushort? handleId) return returnWait; } - /// - /// Called to cancel a operation on this and the recipient connection if specified. - /// - /// Id of the waiting operation to cancel. - public void Cancel(ushort handleId) - { - T callWaitHandle; - - // Try to get the wait. If the Id does not exist, the wait operation has already been completed or removed. - if (!TryRemove(handleId, out callWaitHandle)) - { - return; - } - - callWaitHandle.TokenSource?.Cancel(); - } - /// /// Called to complete operation on this connection. /// diff --git a/src/DtronixMessageQueue/Rpc/ResponseWaitHandle.cs b/src/DtronixMessageQueue/Rpc/ResponseWaitHandle.cs index 055ca05..4227174 100644 --- a/src/DtronixMessageQueue/Rpc/ResponseWaitHandle.cs +++ b/src/DtronixMessageQueue/Rpc/ResponseWaitHandle.cs @@ -28,16 +28,6 @@ public class ResponseWaitHandle /// public byte MessageActionId { get; set; } - /// - /// Cancellation token for the request. - /// - public CancellationToken Token { get; set; } - - /// - /// Cancellation token source for the request. - /// - public CancellationTokenSource TokenSource { get; set; } - /// /// Contains the time that this call wait was created to check for timeouts. /// diff --git a/src/DtronixMessageQueue/Rpc/RpcConfig.cs b/src/DtronixMessageQueue/Rpc/RpcConfig.cs index 8009ba6..96bd914 100644 --- a/src/DtronixMessageQueue/Rpc/RpcConfig.cs +++ b/src/DtronixMessageQueue/Rpc/RpcConfig.cs @@ -9,13 +9,14 @@ public class RpcConfig : MqConfig { /// /// Number of threads used for executing RPC calls. + /// -1 sets number of threads to number of logical processors. /// - public int MaxExecutionThreads { get; set; } = 5; + public int MaxExecutionThreads { get; set; } = -1; /// /// Number of threads each session is allowed to use at a time from the main thread pool. /// - public int MaxSessionConcurrency { get; set; } = 5; + public int MaxSessionConcurrency { get; } = 1; /// /// Set to true if the client needs to pass authentication data to the server to connect. @@ -26,5 +27,10 @@ public class RpcConfig : MqConfig /// Set to the maximum stream of bytes that is allowed to be sent to the session. /// public long MaxByteTransportLength { get; set; } = Int64.MaxValue; + + /// + /// Number of seconds before a RPC call with a return value executes before timing out. + /// + public int RpcExecutionTimeout { get; set; } = 10000; } } \ No newline at end of file diff --git a/src/DtronixMessageQueue/Rpc/RpcProxy.cs b/src/DtronixMessageQueue/Rpc/RpcProxy.cs index ac41edb..65e947a 100644 --- a/src/DtronixMessageQueue/Rpc/RpcProxy.cs +++ b/src/DtronixMessageQueue/Rpc/RpcProxy.cs @@ -69,26 +69,6 @@ public override IMessage Invoke(IMessage msg) // Get the called method's arguments. object[] arguments = methodCall.Args; - CancellationToken cancellationToken = CancellationToken.None; - - // Check to see if the last argument of the method is a CancellationToken. - if (methodCall.ArgCount > 0) - { - var lastArgument = methodCall.Args.Last(); - - if (lastArgument is CancellationToken) - { - cancellationToken = (CancellationToken) lastArgument; - - // Remove the last argument from being serialized. - if (methodCall.ArgCount > 1) - { - arguments = methodCall.Args.Take(methodCall.ArgCount - 1).ToArray(); - } - } - } - - ResponseWaitHandle returnWait = null; RpcCallMessageAction callType; @@ -108,7 +88,6 @@ public override IMessage Invoke(IMessage msg) // Byte[0,1] Wait Id which is used for returning the value and cancellation. serializer.MessageWriter.Write(returnWait.Id); - returnWait.Token = cancellationToken; } // Write the name of this service class. @@ -123,7 +102,16 @@ public override IMessage Invoke(IMessage msg) // Serialize all arguments to the message. for (var i = 0; i < arguments.Length; i++) { - serializer.SerializeToWriter(arguments[i], i); + try + { + serializer.SerializeToWriter(arguments[i], i); + } + catch (Exception e) + { + return + new ReturnMessage(new ArgumentException($"Argument {i} can not be converted to a protobuf object.", e), methodCall); + } + } // Send the message over the session. @@ -138,27 +126,13 @@ public override IMessage Invoke(IMessage msg) // Wait for the completion of the remote call. try { - returnWait.ReturnResetEvent.Wait(returnWait.Token); + returnWait.ReturnResetEvent.Wait(_session.Config.RpcExecutionTimeout); } catch (OperationCanceledException) - { - // If the operation was canceled, cancel the wait on this end and notify the other end. - _callMessageHandler.ProxyWaitOperations.Cancel(returnWait.Id); - - var frame = new MqFrame(new byte[2], MqFrameType.Last, _session.Config); - frame.Write(0, returnWait.Id); - - _callMessageHandler.SendHandlerMessage((byte) RpcCallMessageAction.MethodCancel, new MqMessage(frame)); - throw new OperationCanceledException("Wait handle was canceled while waiting for a response."); - } - - // If the wait times out, alert the callee. - if (returnWait.ReturnResetEvent.IsSet == false) { throw new TimeoutException("Wait handle timed out waiting for a response."); } - try { // Start parsing the received message.