diff --git a/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs b/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs index c875a42..e488fe5 100644 --- a/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs +++ b/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs @@ -47,7 +47,6 @@ public MqPerformanceTest(string[] args) { } else if (mode == "client") { - WriteSysInfo(); Console.WriteLine("| Messages | Msg Bytes | Milliseconds | Msg/sec | MBps |"); Console.WriteLine("|------------|-----------|--------------|------------|----------|"); @@ -57,10 +56,7 @@ public MqPerformanceTest(string[] args) { StartServer(total_messages, total_clients); } else if (mode == "single-process") { - using (var cc = new ConsoleCopy("MessageQueuePerformanceTest.txt")) { - WriteSysInfo(); - MqInProcessTest(); - } + MqInProcessTest(); } } diff --git a/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs b/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs index a719505..e5bae89 100644 --- a/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs +++ b/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs @@ -26,7 +26,7 @@ public static void WriteSysInfo() { long mem_kb; GetPhysicallyInstalledSystemMemory(out mem_kb); - Console.WriteLine(" with " + (mem_kb / 1024 / 1024) + " GB of RAM installed.\r\n"); + Console.WriteLine(" with " + (mem_kb / 1024 / 1024) + " GB of RAM installed."); } diff --git a/src/DtronixMessageQueue.Tests.Performance/Program.cs b/src/DtronixMessageQueue.Tests.Performance/Program.cs index 2837a15..9582597 100644 --- a/src/DtronixMessageQueue.Tests.Performance/Program.cs +++ b/src/DtronixMessageQueue.Tests.Performance/Program.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Diagnostics; +using System.Linq; using System.Management; using System.Reflection; using System.Runtime.InteropServices; @@ -15,16 +16,23 @@ class Program { static void Main(string[] args) { var mode = args.Length == 0 ? null : args[0]; - switch (mode) { - case "mq": - Console.WriteLine("Running MQ performance tests.\r\n"); - new MqPerformanceTest(args); - break; - - default: - Console.WriteLine("Running RPC performance tests.\r\n"); - new RpcPerformanceTest(args); - break; + var file_name = string.Join("-", args); + using (var cc = new ConsoleCopy($"MessageQueuePerformanceTest-{file_name}.txt")) { + PerformanceTestBase.WriteSysInfo(); + + Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}"); + + switch (mode) { + case "mq": + Console.WriteLine("Running MQ performance tests.\r\n"); + new MqPerformanceTest(args); + break; + + default: + Console.WriteLine("Running RPC performance tests.\r\n"); + new RpcPerformanceTest(args); + break; + } } Console.ReadLine(); diff --git a/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs b/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs index 706e5a8..e937925 100644 --- a/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs +++ b/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs @@ -16,7 +16,7 @@ public RpcPerformanceTest(string[] args) { Port = 2828 }; - RpcSingleProcessTest(100000, 4, config, RpcTestType.NoRetrun); + RpcSingleProcessTest(200000, 4, config, RpcTestType.NoRetrun); RpcSingleProcessTest(10000, 4, config, RpcTestType.Return); @@ -35,7 +35,7 @@ private void RpcSingleProcessTest(int runs, int loops, RpcConfig config, RpcTest var complete_test = new AutoResetEvent(false); var client = new RpcClient(config); - server.Connected += (sender, args) => { + server.SessionSetup += (sender, args) => { test_service = new TestService(); args.Session.AddService(test_service); diff --git a/src/DtronixMessageQueue.Tests/Mq/MqTestsBase.cs b/src/DtronixMessageQueue.Tests/Mq/MqTestsBase.cs index 11d44e7..d9f3aad 100644 --- a/src/DtronixMessageQueue.Tests/Mq/MqTestsBase.cs +++ b/src/DtronixMessageQueue.Tests/Mq/MqTestsBase.cs @@ -18,7 +18,7 @@ public class MqTestsBase : IDisposable { public Exception LastException { get; set; } - public TimeSpan TestTimeout { get; set; } = new TimeSpan(0, 0, 0, 0, 60000); + public TimeSpan TestTimeout { get; set; } = new TimeSpan(0, 0, 0, 0, 2000); public ManualResetEventSlim TestStatus { get; set; } = new ManualResetEventSlim(false); diff --git a/src/DtronixMessageQueue/DtronixMessageQueue.csproj b/src/DtronixMessageQueue/DtronixMessageQueue.csproj index f9492e2..ea46419 100644 --- a/src/DtronixMessageQueue/DtronixMessageQueue.csproj +++ b/src/DtronixMessageQueue/DtronixMessageQueue.csproj @@ -67,6 +67,7 @@ + diff --git a/src/DtronixMessageQueue/MqMessage.cs b/src/DtronixMessageQueue/MqMessage.cs index 7c0a1e5..962ed37 100644 --- a/src/DtronixMessageQueue/MqMessage.cs +++ b/src/DtronixMessageQueue/MqMessage.cs @@ -12,13 +12,13 @@ public class MqMessage : IList { /// /// Internal frames for this message. /// - internal readonly List Frames = new List(); + private readonly List frames = new List(); /// /// Total number of frames in this message. /// - public int Count => Frames.Count; + public int Count => frames.Count; /// /// Whether or not this is a read only message. Always returns false. @@ -32,15 +32,15 @@ public class MqMessage : IList { /// The zero-based index of the frame to get or set. /// is not a valid index in the message. public MqFrame this[int index] { - get { return Frames[index]; } - set { Frames[index] = value; } + get { return frames[index]; } + set { frames[index] = value; } } /// /// The total size of the raw frames (headers + body) contained in this message. /// - public int Size => Frames.Sum(frame => frame.FrameSize); + public int Size => frames.Sum(frame => frame.FrameSize); public MqMessage() { } @@ -53,7 +53,7 @@ public MqMessage(MqFrame frame) { /// Fixes any mistakes for the frames' FrameType set. Called before frames are processed by the outbox. /// public void PrepareSend() { - var mq_frames = Frames.ToArray(); + var mq_frames = frames.ToArray(); // Set frame's FrameType appropriately. foreach (var frame in mq_frames) { @@ -73,7 +73,7 @@ public void PrepareSend() { /// /// An enumerator for the frames public IEnumerator GetEnumerator() { - return new List(Frames).GetEnumerator(); + return new List(frames).GetEnumerator(); } /// @@ -81,7 +81,7 @@ public IEnumerator GetEnumerator() { /// /// An enumerator for the frames IEnumerator IEnumerable.GetEnumerator() { - return new List(Frames).GetEnumerator(); + return new List(frames).GetEnumerator(); } /// @@ -89,7 +89,7 @@ IEnumerator IEnumerable.GetEnumerator() { /// /// Frame to add public void Add(MqFrame frame) { - Frames.Add(frame); + frames.Add(frame); } /// @@ -98,7 +98,7 @@ public void Add(MqFrame frame) { /// Frames to add public void AddRange(IEnumerable frames) { foreach (var frame in frames) { - Frames.Add(frame); + this.frames.Add(frame); } } @@ -106,7 +106,7 @@ public void AddRange(IEnumerable frames) { /// Removes all frames from the message. /// public void Clear() { - Frames.Clear(); + frames.Clear(); } /// @@ -115,7 +115,7 @@ public void Clear() { /// The frame to locate in the message. /// True if the frame is in the message; otherwise false. public bool Contains(MqFrame frame) { - return Frames.Contains(frame); + return frames.Contains(frame); } /// @@ -127,7 +127,7 @@ public bool Contains(MqFrame frame) { /// is less than 0. /// The number of elements in the source message is greater than the available space from to the end of the destination . public void CopyTo(MqFrame[] array, int array_index) { - Frames.CopyTo(array, array_index); + frames.CopyTo(array, array_index); } /// @@ -136,7 +136,7 @@ public void CopyTo(MqFrame[] array, int array_index) { /// Frame to remove /// public bool Remove(MqFrame frame) { - return Frames.Remove(frame); + return frames.Remove(frame); } /// @@ -145,7 +145,7 @@ public bool Remove(MqFrame frame) { /// The zero-based index of the first occurrence of frame within the message, if found; otherwise, –1. /// The object to locate in the message. public int IndexOf(MqFrame frame) { - return Frames.IndexOf(frame); + return frames.IndexOf(frame); } @@ -156,7 +156,7 @@ public int IndexOf(MqFrame frame) { /// The object to insert. The value can be null for reference types. /// is less than 0.-or- is greater than the total frames. public void Insert(int index, MqFrame frame) { - Frames.Insert(index, frame); + frames.Insert(index, frame); } /// @@ -166,7 +166,7 @@ public void Insert(int index, MqFrame frame) { /// /// is less than 0.-or- is equal to or greater than the total frames. public void RemoveAt(int index) { - Frames.RemoveAt(index); + frames.RemoveAt(index); } /// @@ -174,8 +174,8 @@ public void RemoveAt(int index) { /// /// string representation of this message. public override string ToString() { - var size = Frames.Sum(frame => frame.DataLength); - return $"MqMessage with {Frames.Count} frames totaling {size:N0} bytes."; + var size = frames.Sum(frame => frame.DataLength); + return $"MqMessage with {frames.Count} frames totaling {size:N0} bytes."; } } diff --git a/src/DtronixMessageQueue/MqMessageReader.cs b/src/DtronixMessageQueue/MqMessageReader.cs index 0703df4..bdd19cb 100644 --- a/src/DtronixMessageQueue/MqMessageReader.cs +++ b/src/DtronixMessageQueue/MqMessageReader.cs @@ -89,7 +89,7 @@ public MqMessage Message { /// /// Total length of the bytes in this message. /// - public int Length => message.Frames.Sum(frm => frm.DataLength); + public int Length => message.Sum(frm => frm.DataLength); /// /// Unused. Stream.Null @@ -101,7 +101,7 @@ public MqMessage Message { /// public bool IsAtEnd { get { - var last_frame = message.Frames[message.Frames.Count - 1]; + var last_frame = message[message.Count - 1]; return current_frame == last_frame && last_frame.DataLength == frame_position; } } diff --git a/src/DtronixMessageQueue/MqMessageWriter.cs b/src/DtronixMessageQueue/MqMessageWriter.cs index 48dee36..f944f96 100644 --- a/src/DtronixMessageQueue/MqMessageWriter.cs +++ b/src/DtronixMessageQueue/MqMessageWriter.cs @@ -278,7 +278,7 @@ public override void Write(decimal value) { /// Value to write to the message. public void Write(MqMessage value) { InternalFinalizeFrame(); - frames.AddRange(value.Frames); + frames.AddRange(value); } /// diff --git a/src/DtronixMessageQueue/MqSession.cs b/src/DtronixMessageQueue/MqSession.cs index c2457c7..3b04419 100644 --- a/src/DtronixMessageQueue/MqSession.cs +++ b/src/DtronixMessageQueue/MqSession.cs @@ -125,13 +125,13 @@ private void SendBufferQueue(Queue buffer_queue, int length) { /// /// True if messages were sent. False if nothing was sent. internal bool ProcessOutbox() { - MqMessage result; + MqMessage message; var length = 0; var buffer_queue = new Queue(); - while (outbox.TryDequeue(out result)) { - result.PrepareSend(); - foreach (var frame in result.Frames) { + while (outbox.TryDequeue(out message)) { + message.PrepareSend(); + foreach (var frame in message) { var frame_size = frame.FrameSize; // If this would overflow the max client buffer size, send the full buffer queue. diff --git a/src/DtronixMessageQueue/Socket/ISetupSocketSession.cs b/src/DtronixMessageQueue/Socket/ISetupSocketSession.cs new file mode 100644 index 0000000..b45bc09 --- /dev/null +++ b/src/DtronixMessageQueue/Socket/ISetupSocketSession.cs @@ -0,0 +1,15 @@ +namespace DtronixMessageQueue.Socket { + public interface ISetupSocketSession + where TConfig : SocketConfig { + + /// + /// Sets up this socket with the specified configurations. + /// + /// Socket this session is to use. + /// Argument pool for this session to use. Pulls two asyncevents for reading and writing and returns them at the end of this socket's life. + /// Socket configurations this session is to use. + void Setup(System.Net.Sockets.Socket session_socket, SocketAsyncEventArgsPool socket_args_pool, TConfig session_config); + + void Start(); + } +} \ No newline at end of file diff --git a/src/DtronixMessageQueue/Socket/SocketBase.cs b/src/DtronixMessageQueue/Socket/SocketBase.cs index 56de4fd..d366545 100644 --- a/src/DtronixMessageQueue/Socket/SocketBase.cs +++ b/src/DtronixMessageQueue/Socket/SocketBase.cs @@ -112,7 +112,9 @@ protected void Setup() { /// New session instance. protected virtual TSession CreateSession(System.Net.Sockets.Socket socket) { var session = new TSession(); - SocketSession.Setup(session, socket, AsyncPool, Config); + + ((ISetupSocketSession) session).Setup(socket, AsyncPool, Config); + SessionSetup?.Invoke(this, new SessionEventArgs(session)); session.Closed += (sender, args) => OnClose(session, args.CloseReason); return session; diff --git a/src/DtronixMessageQueue/Socket/SocketClient.cs b/src/DtronixMessageQueue/Socket/SocketClient.cs index 7267bb1..aa8eb01 100644 --- a/src/DtronixMessageQueue/Socket/SocketClient.cs +++ b/src/DtronixMessageQueue/Socket/SocketClient.cs @@ -52,8 +52,9 @@ public void Connect(IPEndPoint end_point) { event_arg.Completed += (sender, args) => { if (args.LastOperation == SocketAsyncOperation.Connect) { Session = CreateSession(MainSocket); - - Session.Start(); + + ((ISetupSocketSession)Session).Start(); + OnConnect(Session); } }; diff --git a/src/DtronixMessageQueue/Socket/SocketServer.cs b/src/DtronixMessageQueue/Socket/SocketServer.cs index 454fb13..99f21f2 100644 --- a/src/DtronixMessageQueue/Socket/SocketServer.cs +++ b/src/DtronixMessageQueue/Socket/SocketServer.cs @@ -109,7 +109,7 @@ private void AcceptCompleted(SocketAsyncEventArgs e) { ConnectedSessions.TryAdd(session.Id, session); // Start the session. - session.Start(); + ((ISetupSocketSession)session).Start(); // Invoke the events. OnConnect(session); diff --git a/src/DtronixMessageQueue/Socket/SocketSession.cs b/src/DtronixMessageQueue/Socket/SocketSession.cs index 64de053..075bd74 100644 --- a/src/DtronixMessageQueue/Socket/SocketSession.cs +++ b/src/DtronixMessageQueue/Socket/SocketSession.cs @@ -9,7 +9,7 @@ namespace DtronixMessageQueue.Socket { /// Base socket session to be sub-classes by the implementer. /// /// Configuration for this connection. - public abstract class SocketSession : IDisposable + public abstract class SocketSession : IDisposable, ISetupSocketSession where TConfig : SocketConfig { /// @@ -119,25 +119,25 @@ protected SocketSession() { CurrentState = State.Connecting; } + /// /// Sets up this socket with the specified configurations. /// - /// The session to setup. - /// Socket this session is to use. - /// Argument pool for this session to use. Pulls two asyncevents for reading and writing and returns them at the end of this socket's life. - /// Socket configurations this session is to use. - public static void Setup(SocketSession session, System.Net.Sockets.Socket socket, SocketAsyncEventArgsPool args_pool, TConfig config) { - session.config = config; - session.args_pool = args_pool; - session.send_args = args_pool.Pop(); - session.send_args.Completed += session.IoCompleted; - session.receive_args = args_pool.Pop(); - session.receive_args.Completed += session.IoCompleted; - - session.socket = socket; - session.write_semaphore = new SemaphoreSlim(1, 1); - - if(config.SendTimeout > 0) + /// Socket this session is to use. + /// Argument pool for this session to use. Pulls two asyncevents for reading and writing and returns them at the end of this socket's life. + /// Socket configurations this session is to use. + void ISetupSocketSession.Setup(System.Net.Sockets.Socket session_socket, SocketAsyncEventArgsPool socket_args_pool, TConfig session_config) { + config = session_config; + args_pool = socket_args_pool; + send_args = args_pool.Pop(); + send_args.Completed += IoCompleted; + receive_args = args_pool.Pop(); + receive_args.Completed += IoCompleted; + + socket = session_socket; + write_semaphore = new SemaphoreSlim(1, 1); + + if (config.SendTimeout > 0) socket.SendTimeout = config.SendTimeout; if (config.SendAndReceiveBufferSize > 0) @@ -149,15 +149,15 @@ public static void Setup(SocketSession session, System.Net.Sockets.Sock socket.NoDelay = true; socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true); - session.OnSetup(); + OnSetup(); } - internal void Start() { + void ISetupSocketSession.Start() { if (CurrentState == State.Connecting) { // Start receiving data. CurrentState = State.Connected; socket.ReceiveAsync(receive_args); - + } }