diff --git a/src/DtronixMessageQueue.Tests.Gui/Tests/MqBaseTestSession.cs b/src/DtronixMessageQueue.Tests.Gui/Tests/MqBaseTestSession.cs index 697b269..5540310 100644 --- a/src/DtronixMessageQueue.Tests.Gui/Tests/MqBaseTestSession.cs +++ b/src/DtronixMessageQueue.Tests.Gui/Tests/MqBaseTestSession.cs @@ -72,9 +72,9 @@ protected byte[] RandomBytes(int len) return val; } - protected override void Send(byte[] buffer, int offset, int count) + protected override void Send(byte[] buffer, int offset, int count, bool pad) { - base.Send(buffer, offset, count); + base.Send(buffer, offset, count, pad); Interlocked.Add(ref TotalSent, count); } diff --git a/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj b/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj index ec96234..179e01e 100644 --- a/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj +++ b/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj @@ -36,8 +36,8 @@ bin\Nuget\ - - ..\packages\protobuf-net.2.3.12\lib\net40\protobuf-net.dll + + ..\packages\protobuf-net.2.3.13\lib\net40\protobuf-net.dll diff --git a/src/DtronixMessageQueue.Tests.Performance/Program.cs b/src/DtronixMessageQueue.Tests.Performance/Program.cs index 1f4ef19..785e5ca 100644 --- a/src/DtronixMessageQueue.Tests.Performance/Program.cs +++ b/src/DtronixMessageQueue.Tests.Performance/Program.cs @@ -30,7 +30,7 @@ static void Main(string[] args) Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}"); Console.WriteLine("MQ Performance tests.\r\n"); - new MqPerformanceTest().StartTest(); + //new MqPerformanceTest().StartTest(); Console.WriteLine("RPC Performance tests.\r\n"); new RpcPerformanceTest(args); diff --git a/src/DtronixMessageQueue.Tests.Performance/packages.config b/src/DtronixMessageQueue.Tests.Performance/packages.config index de0b989..fbe9d65 100644 --- a/src/DtronixMessageQueue.Tests.Performance/packages.config +++ b/src/DtronixMessageQueue.Tests.Performance/packages.config @@ -1,4 +1,4 @@  - + \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs b/src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs index 699946e..af37f40 100644 --- a/src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs +++ b/src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs @@ -123,10 +123,8 @@ public void Server_requests_authentication() { Server.Config.RequireAuthentication = true; - Client.Authenticate += (sender, e) => { TestComplete.Set(); }; - StartAndWait(); } diff --git a/src/DtronixMessageQueue/DtronixMessageQueue.csproj b/src/DtronixMessageQueue/DtronixMessageQueue.csproj index 6f414ed..c7bf00b 100644 --- a/src/DtronixMessageQueue/DtronixMessageQueue.csproj +++ b/src/DtronixMessageQueue/DtronixMessageQueue.csproj @@ -33,8 +33,8 @@ true - - ..\packages\protobuf-net.2.3.12\lib\net40\protobuf-net.dll + + ..\packages\protobuf-net.2.3.13\lib\net40\protobuf-net.dll diff --git a/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageHandler.cs b/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageHandler.cs index e5959e7..18ac634 100644 --- a/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageHandler.cs +++ b/src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageHandler.cs @@ -77,117 +77,113 @@ public void MethodCancelAction(byte actionId, MqMessage message) /// Message containing the Rpc call. private void ProcessRpcCallAction(byte actionId, MqMessage message) { - // Execute the processing on the worker thread. - Task.Run(() => - { - var messageType = (RpcCallMessageAction)actionId; + var messageType = (RpcCallMessageAction) actionId; - // Retrieve a serialization cache to work with. - var serialization = Session.SerializationCache.Get(message); - ushort recMessageReturnId = 0; + // Retrieve a serialization cache to work with. + var serialization = Session.SerializationCache.Get(message); + ushort recMessageReturnId = 0; - try + try + { + // Determine if this call has a return value. + if (messageType == RpcCallMessageAction.MethodCall) { - // Determine if this call has a return value. - if (messageType == RpcCallMessageAction.MethodCall) - { - recMessageReturnId = serialization.MessageReader.ReadUInt16(); - } - - // Read the string service name, method and number of arguments. - var recServiceName = serialization.MessageReader.ReadString(); - var recMethodName = serialization.MessageReader.ReadString(); - var recArgumentCount = serialization.MessageReader.ReadByte(); - - // Get the method info from the cache. - var serviceMethod = Session.ServiceMethodCache.GetMethodInfo(recServiceName, recMethodName); - - - // Verify that the requested service exists. - if (serviceMethod == null || !_serviceInstances.ContainsKey(recServiceName)) - throw new Exception($"Service '{recServiceName}' does not exist."); - - ResponseWaitHandle cancellationWait = null; + recMessageReturnId = serialization.MessageReader.ReadUInt16(); + } - // If the past parameter is a cancellation token, setup a return wait for this call to allow for remote cancellation. - if (recMessageReturnId != 0 && serviceMethod.HasCancellation) - { - cancellationWait = _remoteWaitOperations.CreateWaitHandle(recMessageReturnId); + // Read the string service name, method and number of arguments. + var recServiceName = serialization.MessageReader.ReadString(); + var recMethodName = serialization.MessageReader.ReadString(); + var recArgumentCount = serialization.MessageReader.ReadByte(); - cancellationWait.TokenSource = new CancellationTokenSource(); - cancellationWait.Token = cancellationWait.TokenSource.Token; - } + // Get the method info from the cache. + var serviceMethod = Session.ServiceMethodCache.GetMethodInfo(recServiceName, recMethodName); - // Setup the parameters to pass to the invoked method. - object[] parameters = new object[recArgumentCount + (cancellationWait == null ? 0 : 1)]; - // Determine if we have any parameters to pass to the invoked method. - if (recArgumentCount > 0) - { - serialization.PrepareDeserializeReader(); + // Verify that the requested service exists. + if (serviceMethod == null || !_serviceInstances.ContainsKey(recServiceName)) + throw new Exception($"Service '{recServiceName}' does not exist."); - // Parse each parameter to the parameter list. - for (var i = 0; i < recArgumentCount; i++) - parameters[i] = serialization.DeserializeFromReader(serviceMethod.ParameterTypes[i], i); - } + ResponseWaitHandle cancellationWait = null; - // Add the cancellation token to the parameters. - if (cancellationWait != null) - parameters[parameters.Length - 1] = cancellationWait.Token; + // If the past parameter is a cancellation token, setup a return wait for this call to allow for remote cancellation. + if (recMessageReturnId != 0 && serviceMethod.HasCancellation) + { + cancellationWait = _remoteWaitOperations.CreateWaitHandle(recMessageReturnId); + cancellationWait.TokenSource = new CancellationTokenSource(); + cancellationWait.Token = cancellationWait.TokenSource.Token; + } - object returnValue; - try - { - // Invoke the requested method. - returnValue = serviceMethod.Invoke(_serviceInstances[recServiceName], parameters); - } - catch (Exception ex) - { - // Determine if this method was waited on. If it was and an exception was thrown, - // Let the recipient session know an exception was thrown. - if (recMessageReturnId != 0 && - ex.InnerException?.GetType() != typeof(OperationCanceledException)) - { - SendRpcException(serialization, ex, recMessageReturnId); - } - return; - } - finally - { - _remoteWaitOperations.Remove(recMessageReturnId); - } + // Setup the parameters to pass to the invoked method. + var parameters = new object[recArgumentCount + (cancellationWait == null ? 0 : 1)]; + // Determine if we have any parameters to pass to the invoked method. + if (recArgumentCount > 0) + { + serialization.PrepareDeserializeReader(); - // Determine what to do with the return value. - if (messageType == RpcCallMessageAction.MethodCall) - { - // Reset the stream. - serialization.Stream.SetLength(0); - serialization.MessageWriter.Clear(); + // Parse each parameter to the parameter list. + for (var i = 0; i < recArgumentCount; i++) + parameters[i] = serialization.DeserializeFromReader(serviceMethod.ParameterTypes[i], i); + } - // Write the return id. - serialization.MessageWriter.Write(recMessageReturnId); + // Add the cancellation token to the parameters. + if (cancellationWait != null) + parameters[parameters.Length - 1] = cancellationWait.Token; - // Serialize the return value and add it to the stream. - serialization.SerializeToWriter(returnValue, 0); - // Send the return value message to the recipient. - SendHandlerMessage((byte)RpcCallMessageAction.MethodReturn, - serialization.MessageWriter.ToMessage(true)); - } + object returnValue; + try + { + // Invoke the requested method. + returnValue = serviceMethod.Invoke(_serviceInstances[recServiceName], parameters); } catch (Exception ex) { - // If an exception occurred, notify the recipient connection. - SendRpcException(serialization, ex, recMessageReturnId); + // Determine if this method was waited on. If it was and an exception was thrown, + // Let the recipient session know an exception was thrown. + if (recMessageReturnId != 0 && + ex.InnerException?.GetType() != typeof(OperationCanceledException)) + { + SendRpcException(serialization, ex, recMessageReturnId); + } + return; } finally { - // Return the serialization to the cache to be reused. - Session.SerializationCache.Put(serialization); + _remoteWaitOperations.Remove(recMessageReturnId); } - }); + + + // Determine what to do with the return value. + if (messageType == RpcCallMessageAction.MethodCall) + { + // Reset the stream. + serialization.Stream.SetLength(0); + serialization.MessageWriter.Clear(); + + // Write the return id. + serialization.MessageWriter.Write(recMessageReturnId); + + // Serialize the return value and add it to the stream. + serialization.SerializeToWriter(returnValue, 0); + + // Send the return value message to the recipient. + SendHandlerMessage((byte) RpcCallMessageAction.MethodReturn, + serialization.MessageWriter.ToMessage(true)); + } + } + catch (Exception ex) + { + // If an exception occurred, notify the recipient connection. + SendRpcException(serialization, ex, recMessageReturnId); + } + finally + { + // Return the serialization to the cache to be reused. + Session.SerializationCache.Put(serialization); + } } @@ -198,25 +194,21 @@ private void ProcessRpcCallAction(byte actionId, MqMessage message) /// Message containing the frames for the return value. private void ProcessRpcReturnAction(byte actionId, MqMessage message) { - // Execute the processing on the worker thread. - Task.Run(() => - { - // Read the return Id. - var returnId = message[0].ReadUInt16(0); + // Read the return Id. + var returnId = message[0].ReadUInt16(0); - ResponseWaitHandle callWaitHandle = ProxyWaitOperations.Remove(returnId); - // Try to get the outstanding wait from the return id. If it does not exist, the has already completed. - if (callWaitHandle != null) - { - callWaitHandle.Message = message; + ResponseWaitHandle callWaitHandle = ProxyWaitOperations.Remove(returnId); + // Try to get the outstanding wait from the return id. If it does not exist, the has already completed. + if (callWaitHandle != null) + { + callWaitHandle.Message = message; - callWaitHandle.MessageActionId = actionId; + callWaitHandle.MessageActionId = actionId; - // Release the wait event. - callWaitHandle.ReturnResetEvent.Set(); - } - }); + // Release the wait event. + callWaitHandle.ReturnResetEvent.Set(); + } } /// diff --git a/src/DtronixMessageQueue/Rpc/RpcClient.cs b/src/DtronixMessageQueue/Rpc/RpcClient.cs index 4ce524f..c6bcdad 100644 --- a/src/DtronixMessageQueue/Rpc/RpcClient.cs +++ b/src/DtronixMessageQueue/Rpc/RpcClient.cs @@ -18,6 +18,8 @@ public class RpcClient : MqClient /// public RpcServerInfoDataContract ServerInfo { get; set; } + public ActionProcessor RpcActionProcessor { get; } + /// /// Called to send authentication data to the server. /// @@ -28,12 +30,21 @@ public class RpcClient : MqClient /// public event EventHandler> Ready; + + /// /// Initializes a new instance of a Rpc client. /// /// Configurations for this client to use. public RpcClient(TConfig config) : base(config) { + RpcActionProcessor = new ActionProcessor(new ActionProcessor.Config + { + StartThreads = 1, + ThreadName = "RpcProcessor-Client" + }); + + RpcActionProcessor.Start(); } protected override TSession CreateSession(System.Net.Sockets.Socket sessionSocket) diff --git a/src/DtronixMessageQueue/Rpc/RpcServer.cs b/src/DtronixMessageQueue/Rpc/RpcServer.cs index 2995d2c..5e54286 100644 --- a/src/DtronixMessageQueue/Rpc/RpcServer.cs +++ b/src/DtronixMessageQueue/Rpc/RpcServer.cs @@ -18,6 +18,8 @@ public class RpcServer : MqServer /// public RpcServerInfoDataContract ServerInfo { get; } + public ActionProcessor RpcActionProcessor { get; } + /// /// Called to send authentication data to the server. /// @@ -44,6 +46,12 @@ public class RpcServer : MqServer public RpcServer(TConfig config, RpcServerInfoDataContract serverInfo) : base(config) { ServerInfo = serverInfo ?? new RpcServerInfoDataContract(); + RpcActionProcessor = new ActionProcessor(new ActionProcessor.Config + { + ThreadName = "RpcProcessor-Server" + }); + + RpcActionProcessor.Start(); } /// diff --git a/src/DtronixMessageQueue/Rpc/RpcSession.cs b/src/DtronixMessageQueue/Rpc/RpcSession.cs index 8cf624b..36c0bab 100644 --- a/src/DtronixMessageQueue/Rpc/RpcSession.cs +++ b/src/DtronixMessageQueue/Rpc/RpcSession.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -38,6 +39,8 @@ public abstract class RpcSession : MqSession public RpcClient Client { get; private set; } + private ActionProcessor _rpcActionProcessor; + /// /// Verify the authenticity of the newly connected client. /// @@ -61,6 +64,7 @@ public abstract class RpcSession : MqSession ProcessQueue = new ConcurrentQueue(); protected RpcSession() { @@ -76,9 +80,17 @@ protected override void OnSetup() // Determine if this session is running on the server or client to retrieve the worker thread pool. if (SocketHandler.Mode == TcpSocketMode.Server) + { Server = (RpcServer) SocketHandler; + _rpcActionProcessor = Server.RpcActionProcessor; + } else + { Client = (RpcClient) SocketHandler; + _rpcActionProcessor = Client.RpcActionProcessor; + } + + _rpcActionProcessor.Register(Id, ProcessRpc); SerializationCache = new SerializationCache(Config); @@ -86,6 +98,13 @@ protected override void OnSetup() MessageHandlers.Add(RpcCallHandler.Id, RpcCallHandler); } + protected override void OnDisconnected(CloseReason reason) + { + _rpcActionProcessor.Deregister(Id); + + base.OnDisconnected(reason); + } + /// /// Processes an incoming command frame from the connection. @@ -321,13 +340,19 @@ protected override void OnConnected() /// Event args for the message. protected override void OnIncomingMessage(object sender, IncomingMessageEventArgs e) { - MqMessage message; + var totalMessages = e.Messages.Count; + for (int i = 0; i < totalMessages; i++) + ProcessQueue.Enqueue(e.Messages.Dequeue()); + + _rpcActionProcessor.QueueOnce(Id); + } + + private void ProcessRpc() + { // Continue to parse the messages in this queue. - while (e.Messages.Count > 0) + while (ProcessQueue.TryDequeue(out var message)) { - message = e.Messages.Dequeue(); - // Read the first byte for the ID. var handlerId = message[0].ReadByte(0); var handledMessage = false; @@ -345,7 +370,7 @@ protected override void OnIncomingMessage(object sender, IncomingMessageEventArg Close(CloseReason.ApplicationError); return; } - + } // If the we can not handle this message, disconnect the session. diff --git a/src/DtronixMessageQueue/packages.config b/src/DtronixMessageQueue/packages.config index de0b989..fbe9d65 100644 --- a/src/DtronixMessageQueue/packages.config +++ b/src/DtronixMessageQueue/packages.config @@ -1,4 +1,4 @@  - + \ No newline at end of file