diff --git a/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj b/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj index b113552..3a35a96 100644 --- a/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj +++ b/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj @@ -52,17 +52,20 @@ - + + - + + + @@ -74,6 +77,9 @@ PreserveNewest + + PreserveNewest + @@ -82,7 +88,7 @@ - + PreserveNewest diff --git a/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs b/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs deleted file mode 100644 index 4eb454a..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs +++ /dev/null @@ -1,342 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Diagnostics; -using System.Reflection; -using System.Threading; - -namespace DtronixMessageQueue.Tests.Performance -{ - class MqPerformanceTest : PerformanceTestBase - { - public MqPerformanceTest(string[] args) - { - string mode = null; - int totalLoops, totalMessages, totalFrames, frameSize, totalClients; - - if (args == null || args.Length == 0) - { - mode = "single-process"; - totalLoops = 1; - totalMessages = 1000000; - totalFrames = 4; - frameSize = 50; - totalClients = 1; - } - else if (args.Length == 7) - { - mode = args[1]; - totalLoops = int.Parse(args[2]); - totalMessages = int.Parse(args[3]); - totalFrames = int.Parse(args[4]); - frameSize = int.Parse(args[5]); - totalClients = int.Parse(args[6]); - } - else - { - Console.WriteLine("Invalid parameters passed to performance tester"); - return; - } - - var exePath = Assembly.GetExecutingAssembly().Location; - - if (mode == "setup") - { - Process.Start(exePath, - $"mq server {totalLoops} {totalMessages} {totalFrames} {frameSize} {totalClients}"); - - for (int i = 0; i < totalClients; i++) - { - Process.Start(exePath, - $"mq client {totalLoops} {totalMessages} {totalFrames} {frameSize} {totalClients}"); - } - } - else if (mode == "client") - { - Console.WriteLine("| Messages | Msg Bytes | Milliseconds | Msg/sec | MBps |"); - Console.WriteLine("|------------|-----------|--------------|------------|----------|"); - - StartClient(totalLoops, totalMessages, totalFrames, frameSize); - } - else if (mode == "server") - { - StartServer(totalMessages, totalClients); - } - else if (mode == "single-process") - { - MqInProcessTest(); - } - } - - - private static void StartClient(int totalLoops, int totalMessages, int totalFrames, int frameSize) - { - var cl = new MqClient(new MqConfig() - { - Ip = "127.0.0.1", - Port = 2828 - }); - - var stopwatch = new Stopwatch(); - var messageReader = new MqMessageReader(); - var messageSize = totalFrames * frameSize; - var message = new MqMessage(); - double[] totalValues = {0, 0, 0}; - - for (int i = 0; i < totalFrames; i++) - { - message.Add(new MqFrame(SequentialBytes(frameSize), MqFrameType.More, (MqConfig) cl.Config)); - } - - cl.IncomingMessage += (sender, args) => - { - MqMessage msg; - while (args.Messages.Count > 0) - { - msg = args.Messages.Dequeue(); - - messageReader.Message = msg; - var result = messageReader.ReadString(); - - if (result == "COMPLETE") - { - if (totalLoops-- > 0) - { - stopwatch.Stop(); - - var messagesPerSecond = - (int) ((double) totalMessages / stopwatch.ElapsedMilliseconds * 1000); - var msgSizeNoHeader = messageSize; - var mbps = totalMessages * (double) (msgSizeNoHeader) / stopwatch.ElapsedMilliseconds / 1000; - Console.WriteLine("| {0,10:N0} | {1,9:N0} | {2,12:N0} | {3,10:N0} | {4,8:N2} |", - totalMessages, - msgSizeNoHeader, stopwatch.ElapsedMilliseconds, messagesPerSecond, mbps); - - totalValues[0] += stopwatch.ElapsedMilliseconds; - totalValues[1] += messagesPerSecond; - totalValues[2] += mbps; - } - - if (totalLoops == 0) - { - Console.WriteLine("| | AVERAGES | {0,12:N0} | {1,10:N0} | {2,8:N2} |", - totalValues[0] / totalLoops, - totalValues[1] / totalLoops, totalValues[2] / totalLoops); - Console.WriteLine(); - Console.WriteLine("Test complete"); - } - - - cl.Close(); - } - else if (result == "START") - { - if (totalLoops > 0) - { - stopwatch.Restart(); - for (var i = 0; i < totalMessages; i++) - { - cl.Send(message); - } - } - } - } - }; - - cl.Connect(); - } - - private static void StartServer(int totalMessages, int totalClients) - { - var server = new MqServer(new MqConfig() - { - Ip = "127.0.0.1", - Port = 2828 - }); - - var builder = new MqMessageWriter((MqConfig) server.Config); - builder.Write("COMPLETE"); - - var completeMessage = builder.ToMessage(true); - - builder.Write("START"); - var startMessage = builder.ToMessage(true); - - ConcurrentDictionary clientsInfo = - new ConcurrentDictionary(); - - - server.Connected += (sender, session) => - { - var currentInfo = new ClientRunInfo() - { - Session = session.Session, - Runs = 0 - }; - clientsInfo.TryAdd(session.Session, currentInfo); - - if (clientsInfo.Count == totalClients) - { - foreach (var mqSession in clientsInfo.Keys) - { - mqSession.Send(startMessage); - } - } - }; - - server.Closed += (session, value) => - { - ClientRunInfo info; - clientsInfo.TryRemove(value.Session, out info); - }; - - server.IncomingMessage += (sender, args) => - { - var clientInfo = clientsInfo[args.Session]; - - // Count the total messages. - clientInfo.Runs += args.Messages.Count; - - if (clientInfo.Runs == totalMessages) - { - args.Session.Send(completeMessage); - args.Session.Send(startMessage); - clientInfo.Runs = 0; - } - }; - - - server.Start(); - } - - - static void MqInProcessTest() - { - var config = new MqConfig - { - Ip = "127.0.0.1", - Port = 2828 - }; - - - Console.WriteLine("FrameBufferSize: {0}; SendAndReceiveBufferSize: {1}\r\n", config.FrameBufferSize, - config.SendAndReceiveBufferSize); - - var smallMessage = new MqMessage - { - new MqFrame(SequentialBytes(50), MqFrameType.More, config), - new MqFrame(SequentialBytes(50), MqFrameType.More, config), - new MqFrame(SequentialBytes(50), MqFrameType.More, config), - new MqFrame(SequentialBytes(50), MqFrameType.Last, config) - }; - - MqInProcessPerformanceTests(1000000, 5, smallMessage, config); - - var medimumMessage = new MqMessage - { - new MqFrame(SequentialBytes(500), MqFrameType.More, config), - new MqFrame(SequentialBytes(500), MqFrameType.More, config), - new MqFrame(SequentialBytes(500), MqFrameType.More, config), - new MqFrame(SequentialBytes(500), MqFrameType.Last, config) - }; - - MqInProcessPerformanceTests(100000, 5, medimumMessage, config); - - var largeMessage = new MqMessage(); - - for (int i = 0; i < 20; i++) - { - largeMessage.Add(new MqFrame(SequentialBytes(3000), MqFrameType.More, config)); - } - - MqInProcessPerformanceTests(10000, 5, largeMessage, config); - - Console.WriteLine("Performance complete"); - - Console.ReadLine(); - } - - private static void MqInProcessPerformanceTests(int runs, int loops, MqMessage message, MqConfig config) - { - var server = new MqServer(config); - server.Start(); - - double[] totalValues = {0, 0, 0}; - - var count = 0; - var sw = new Stopwatch(); - var wait = new AutoResetEvent(false); - var completeTest = new AutoResetEvent(false); - - var client = new MqClient(config); - - Console.WriteLine("| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps |"); - Console.WriteLine("|---------|------------|-----------|--------------|------------|----------|"); - - - var messageSize = message.Size; - - server.IncomingMessage += (sender, args2) => - { - count += args2.Messages.Count; - - - if (count == runs) - { - sw.Stop(); - var mode = "Release"; - -#if DEBUG - mode = "Debug"; -#endif - - var messagesPerSecond = (int) ((double) runs / sw.ElapsedMilliseconds * 1000); - var msgSizeNoHeader = messageSize - 12; - var mbps = runs * (double) (msgSizeNoHeader) / sw.ElapsedMilliseconds / 1000; - Console.WriteLine("| {0,7} | {1,10:N0} | {2,9:N0} | {3,12:N0} | {4,10:N0} | {5,8:N2} |", mode, runs, - msgSizeNoHeader, sw.ElapsedMilliseconds, messagesPerSecond, mbps); - totalValues[0] += sw.ElapsedMilliseconds; - totalValues[1] += messagesPerSecond; - totalValues[2] += mbps; - - - wait.Set(); - } - }; - - - var send = new Action(() => - { - count = 0; - sw.Restart(); - for (var i = 0; i < runs; i++) - { - client.Send(message); - } - //MqServer sv = server; - wait.WaitOne(); - wait.Reset(); - }); - - client.Connected += (sender, args) => - { - for (var i = 0; i < loops; i++) - { - send(); - } - - Console.WriteLine("| | | AVERAGES | {0,12:N0} | {1,10:N0} | {2,8:N2} |", - totalValues[0] / loops, - totalValues[1] / loops, totalValues[2] / loops); - Console.WriteLine(); - - server.Stop(); - client.Close(); - completeTest.Set(); - }; - - client.Connect(); - - completeTest.WaitOne(); - } - } -} \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/MqThroughputTest.cs b/src/DtronixMessageQueue.Tests.Performance/MqThroughputTest.cs new file mode 100644 index 0000000..19017d5 --- /dev/null +++ b/src/DtronixMessageQueue.Tests.Performance/MqThroughputTest.cs @@ -0,0 +1,187 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Reflection; +using System.Threading; +using DtronixMessageQueue.Tests.Performance.TestSessions; + +namespace DtronixMessageQueue.Tests.Performance +{ + class MqThroughputTest : PerformanceTestBase + { + + List clients = new List(); + + private Timer _reportTimer; + private MqServer server; + + public MqThroughputTest(string[] args) + { + + string mode = args.Length < 2 ? "in-process" : args[1]; + string ip = "127.0.0.1"; + int totalFrames = 10; + int frameSize = 1024; + int totalClients = 3; + + if (args.Length == 6) + { + ip = args[2]; + totalFrames = int.Parse(args[3]); + frameSize = int.Parse(args[4]); + totalClients = int.Parse(args[5]); + } + + + if (mode == "setup") + { + var exePath = Assembly.GetExecutingAssembly().Location; + Process.Start(exePath, + $"mq-throughput server {ip} {totalFrames} {frameSize} {totalClients}"); + + for (int i = 0; i < 1; i++) + { + Process.Start(exePath, + $"mq-throughput client {ip} {totalFrames} {frameSize} {totalClients}"); + } + } + else if (mode == "server") + { + StartServer(); + + _reportTimer = new Timer(DisplayServerStatus); + } + else if (mode == "client") + { + + for (int i = 0; i < totalClients; i++) + { + StartClient(ip, totalFrames, frameSize); + } + + _reportTimer = new Timer(DisplayClientStatus); + + } + else if (mode == "in-process") + { + + StartServer(); + + for (int i = 0; i < totalClients; i++) + { + StartClient(ip, totalFrames, frameSize); + } + + _reportTimer = new Timer(DisplayClientStatus); + + } + else + { + Console.WriteLine("Invalid parameters passed to performance tester"); + } + + _reportTimer.Change(1000, 1000); + } + + + private void StartClient(string ip, int totalFrames, int frameSize) + { + var cl = new MqClient(new MqConfig{ + Ip = ip, + Port = 2828 + }); + + cl.SessionSetup += (sender, args) => + { + args.Session.IsServer = false; + args.Session.ConfigTest(frameSize, totalFrames); + + }; + + cl.Connected += (sender, args) => + { + clients.Add(args.Session); + args.Session.StartTest(); + }; + + cl.Connect(); + + + } + + + private void StartServer() + { + server = new MqServer(new MqConfig + { + Ip = "127.0.0.1", + Port = 2828 + }); + + + server.SessionSetup += (sender, args) => + { + args.Session.IsServer = true; + }; + + server.Connected += (sender, args) => + { + args.Session.StartTest(); + }; + + + server.Start(); + } + + + private void DisplayClientStatus(object state) + { + Console.Clear(); + + Console.WriteLine($"Clients Test Running.\r\n"); + Console.WriteLine("| Messages | Frames | Time | Msg/sec | Mbps | Runtime |"); + Console.WriteLine("|----------|----------|----------|----------|----------|----------|"); + + foreach (var client in clients) + { + if (client.TotalThroughTime == 0) + continue; + + var messagesPerSecond = (int)((double)client.TotalThroughMessages / client.TotalThroughTime * 1000); + var mbps = (double)client.TotalThroughBytes / client.TotalThroughTime / 1000; + //var mbps = runs * (double)(msgSizeNoHeader) / sw.ElapsedMilliseconds / 1000; + Console.WriteLine("| {0,8} | {1,8} | {2,8:N0} | {3,8:N0} | {4,8:N} | {5,8:N} |", + client.TotalThroughMessages, + client.TotalThroughFrames, + client.TotalThroughTime, + messagesPerSecond, + mbps, + (DateTime.Now - client.StartedTime).TotalMilliseconds/1000); + + /*if ((DateTime.Now - client.StartedTime).TotalSeconds > 10) + { + client.CancelTest = true; + }*/ + } + } + + private void DisplayServerStatus(object state) + { + Console.Clear(); + + Console.WriteLine($"Server Running."); + + int c = 0; + using (var e = server.GetSessionsEnumerator()) + { + while (e.MoveNext()) + c++; + } + + Console.WriteLine($"Total Clients: {c}"); + + } + } +} \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs b/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs index b6fe8b8..6c484fb 100644 --- a/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs +++ b/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs @@ -1,6 +1,7 @@ using System; using System.Management; using System.Runtime.InteropServices; +using DtronixMessageQueue.Tests.Performance.TestSessions; namespace DtronixMessageQueue.Tests.Performance { @@ -32,7 +33,7 @@ public static void WriteSysInfo() public class ClientRunInfo { public int Runs { get; set; } - public SimpleMqSession Session { get; set; } + public MqThroughputTest TestSession { get; set; } } public static byte[] SequentialBytes(int len) diff --git a/src/DtronixMessageQueue.Tests.Performance/Program.cs b/src/DtronixMessageQueue.Tests.Performance/Program.cs index d4df3e2..0531ceb 100644 --- a/src/DtronixMessageQueue.Tests.Performance/Program.cs +++ b/src/DtronixMessageQueue.Tests.Performance/Program.cs @@ -11,24 +11,25 @@ class Program static void Main(string[] args) { + var mode = args.Length == 0 ? null : args[0]; var fileName = string.Join("-", args); using (var cc = new ConsoleCopy($"MessageQueuePerformanceTest-{fileName}.txt")) { - PerformanceTestBase.WriteSysInfo(); + //PerformanceTestBase.WriteSysInfo(); - Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}"); + //Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}"); switch (mode) { - case "mq": + case "mq-throughput": Console.WriteLine("Running MQ performance tests.\r\n"); - new MqPerformanceTest(args); + new MqThroughputTest(args); break; default: - Console.WriteLine("Running RPC performance tests.\r\n"); - new RpcPerformanceTest(args); + Console.WriteLine("Running MQ performance tests.\r\n"); + new MqThroughputTest(args); break; } } diff --git a/src/DtronixMessageQueue.Tests.Performance/ServerMessageType.cs b/src/DtronixMessageQueue.Tests.Performance/ServerMessageType.cs new file mode 100644 index 0000000..e138025 --- /dev/null +++ b/src/DtronixMessageQueue.Tests.Performance/ServerMessageType.cs @@ -0,0 +1,10 @@ +namespace DtronixMessageQueue.Tests.Performance +{ + public enum ServerMessageType : byte + { + Unset = 0, + Ready = 1, + Complete = 2, + ThroughputTransfer = 3 + } +} \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/ServerMqPerformanceTests.cs b/src/DtronixMessageQueue.Tests.Performance/ServerMqPerformanceTests.cs index 7ba40dd..1444e38 100644 --- a/src/DtronixMessageQueue.Tests.Performance/ServerMqPerformanceTests.cs +++ b/src/DtronixMessageQueue.Tests.Performance/ServerMqPerformanceTests.cs @@ -3,16 +3,17 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using DtronixMessageQueue.Tests.Performance.TestSessions; namespace DtronixMessageQueue.Tests.Performance { public class ServerMqPerformanceTests { - private MqServer _server; + private MqServer _server; public ServerMqPerformanceTests(string[] args) { - _server = new MqServer(new MqConfig + _server = new MqServer(new MqConfig { Ip = "127.0.0.1", Port = 2828 diff --git a/src/DtronixMessageQueue.Tests.Performance/SimpleMqSession.cs b/src/DtronixMessageQueue.Tests.Performance/SimpleMqSession.cs deleted file mode 100644 index d9bf5a8..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/SimpleMqSession.cs +++ /dev/null @@ -1,131 +0,0 @@ -using System.Collections.Generic; -using System.Diagnostics; -using System.Threading; -using DtronixMessageQueue.Socket; - -namespace DtronixMessageQueue.Tests.Performance -{ - public class SimpleMqSession : MqSession - { - private MqMessageReader reader; - private MqMessageWriter writer; - - public bool IsServer { get; set; } - - public TestMode Mode { get; set; } - - private Stopwatch stopwatch = new Stopwatch(); - - private Timer _responseTimer; - private int _totalThroughBytes; - private int _totalThroughFrames; - private int _totalThroughMessages; - - protected override void OnSetup() - { - base.OnSetup(); - - reader = new MqMessageReader(); - writer = new MqMessageWriter(Config); - } - - protected override void OnIncomingMessage(object sender, IncomingMessageEventArgs e) - { - if (IsServer) - { - ServerMessage(e.Messages); - } - else - { - ClientMessage(e.Messages); - } - } - - private void ClientMessage(Queue message_queue) - { - - } - - public override void Close(SocketCloseReason reason) - { - if (_responseTimer != null) - { - _responseTimer.Change(-1, -1); - _responseTimer.Dispose(); - } - base.Close(reason); - - } - - private void ServerMessage(Queue message_queue) - { - - if (Mode == TestMode.Unset) - { - reader.Message = message_queue.Dequeue(); - - Mode = (TestMode)reader.ReadByte(); // Mode - - if (Mode == TestMode.Throughput) - { - _responseTimer = new Timer(ThroughputResponse); - writer.Write((byte)ServerMessageType.Ready); - Send(writer.ToMessage(true)); - - _responseTimer.Change(1000, 1000); - } - - } - - while (message_queue.Count > 0) - { - var message = message_queue.Dequeue(); - if (Mode == TestMode.Throughput) - { - Interlocked.Add(ref _totalThroughBytes, message.Size); - _totalThroughMessages++; - Interlocked.Add(ref _totalThroughFrames, message.Count); - } - - } - - } - - private void ThroughputResponse(object state) - { - using (var writer = new MqMessageWriter(Config)) - { - var throughBytes = _totalThroughBytes; - var throughMessages = _totalThroughMessages; - var throughFrames = _totalThroughFrames; - - _totalThroughBytes = 0; - _totalThroughMessages = 0; - _totalThroughFrames = 0; - - writer.Write((byte)ServerMessageType.ThroughputTransfer); - writer.Write(throughBytes); - writer.Write(throughMessages); - writer.Write(throughFrames); - writer.Write(stopwatch.ElapsedMilliseconds); - } - } - - - } - - public enum TestMode - { - Unset = 0, - Throughput = 1, - Repeat = 2 - } - - public enum ServerMessageType : byte - { - Unset = 0, - Ready = 1, - Complete = 2, - ThroughputTransfer = 3 - } -} \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/TestMode.cs b/src/DtronixMessageQueue.Tests.Performance/TestMode.cs new file mode 100644 index 0000000..2c6c821 --- /dev/null +++ b/src/DtronixMessageQueue.Tests.Performance/TestMode.cs @@ -0,0 +1,9 @@ +namespace DtronixMessageQueue.Tests.Performance +{ + public enum TestMode + { + Unset = 0, + Throughput = 1, + Repeat = 2 + } +} \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqBaseTestSession.cs b/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqBaseTestSession.cs new file mode 100644 index 0000000..aa1ac14 --- /dev/null +++ b/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqBaseTestSession.cs @@ -0,0 +1,68 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DtronixMessageQueue.Tests.Performance.TestSessions +{ + public abstract class MqBaseTestSession : MqSession + { + protected MqMessageReader Reader; + protected MqMessageWriter Writer; + protected readonly Stopwatch Stopwatch = new Stopwatch(); + protected Timer ResponseTimer; + + public bool IsServer { get; set; } + public bool CancelTest { get; set; } + + + protected Random Rand = new Random(); + + protected Thread TestThread; + + protected override void OnSetup() + { + base.OnSetup(); + + Reader = new MqMessageReader(); + Writer = new MqMessageWriter(Config); + + if (!IsServer) + { + TestThread = new Thread(TestThreadAction); + } + } + + protected override void OnIncomingMessage(object sender, IncomingMessageEventArgs e) + { + if (IsServer) + { + ServerMessage(e.Messages); + } + else + { + ClientMessage(e.Messages); + } + } + + + + + protected byte[] RandomBytes(int len) + { + var val = new byte[len]; + Rand.NextBytes(val); + return val; + } + + + + public abstract void StartTest(); + protected abstract void TestThreadAction(object state); + protected abstract void ClientMessage(Queue messageQueue); + protected abstract void ServerMessage(Queue messageQueue); + } +} diff --git a/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqThroughputTestSession.cs b/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqThroughputTestSession.cs new file mode 100644 index 0000000..90e812c --- /dev/null +++ b/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqThroughputTestSession.cs @@ -0,0 +1,133 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using DtronixMessageQueue.Socket; + +namespace DtronixMessageQueue.Tests.Performance.TestSessions +{ + public class MqThroughputTestSession : MqBaseTestSession + { + public long TotalThroughTime => _totalThroughTime; + public int TotalThroughMessages => _totalThroughMessages; + public int TotalThroughFrames => _totalThroughFrames; + public int TotalThroughBytes => _totalThroughBytes; + public DateTime StartedTime; + + private int _totalThroughBytes; + private int _totalThroughFrames; + private int _totalThroughMessages; + private long _totalThroughTime; + + + private int _configFrameSize; + private int _configFramesPerMessage; + + + public void ConfigTest(int frameSize, int framesPerMessage) + { + _configFrameSize = frameSize; + _configFramesPerMessage = framesPerMessage; + } + + public override void StartTest() + { + StartedTime = DateTime.Now; + TestThread.Start(this); + + if (IsServer) + { + ResponseTimer = new Timer(ThroughputResponse); + ResponseTimer.Change(1000, 1000); + Stopwatch.Restart(); + } + } + + protected override void ServerMessage(Queue messageQueue) + { + while (messageQueue.Count > 0) + { + var message = messageQueue.Dequeue(); + Interlocked.Add(ref _totalThroughBytes, message.Size); + _totalThroughMessages++; + Interlocked.Add(ref _totalThroughFrames, message.Count); + } + + } + + protected override void ClientMessage(Queue messageQueue) + { + while (messageQueue.Count > 0) + { + var message = messageQueue.Dequeue(); + Reader.Message = message; + var messageType = (ServerMessageType)Reader.ReadByte(); + + if (messageType == ServerMessageType.ThroughputTransfer) + { + _totalThroughBytes = Reader.ReadInt32(); + _totalThroughMessages = Reader.ReadInt32(); + _totalThroughFrames = Reader.ReadInt32(); + _totalThroughTime = Reader.ReadInt64(); + } + } + } + + private void ThroughputResponse(object state) + { + + using (var responseWriter = new MqMessageWriter(Config)) + { + var throughBytes = TotalThroughBytes; + var throughMessages = TotalThroughMessages; + var throughFrames = TotalThroughFrames; + + _totalThroughBytes = 0; + _totalThroughMessages = 0; + _totalThroughFrames = 0; + + responseWriter.Write((byte)ServerMessageType.ThroughputTransfer); + responseWriter.Write(throughBytes); + responseWriter.Write(throughMessages); + responseWriter.Write(throughFrames); + responseWriter.Write(Stopwatch.ElapsedMilliseconds); + + Stopwatch.Restart(); + + Send(responseWriter.ToMessage(true)); + } + } + + protected override void TestThreadAction(object state) + { + var session = (MqThroughputTestSession) state; + var message = new MqMessage(); + var frame = new MqFrame(RandomBytes(session._configFrameSize), session.Config); + + for (int i = 0; i < session._configFramesPerMessage; i++) + { + message.Add(frame); + } + + + // Send messages until it is called to stop. + while (!session.CancelTest) + { + session.Send(message); + } + + } + + public override void Close(SocketCloseReason reason) + { + if (ResponseTimer != null) + { + ResponseTimer.Change(-1, -1); + ResponseTimer.Dispose(); + } + base.Close(reason); + + } + } + +} \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/run-mq-client.bat b/src/DtronixMessageQueue.Tests.Performance/run-mq-client.bat new file mode 100644 index 0000000..3a2e56b --- /dev/null +++ b/src/DtronixMessageQueue.Tests.Performance/run-mq-client.bat @@ -0,0 +1 @@ +DMQPerf.exe mq-throughput client 127.0.0.1 10 1024 3 \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/run-mq-server.bat b/src/DtronixMessageQueue.Tests.Performance/run-mq-server.bat new file mode 100644 index 0000000..510892e --- /dev/null +++ b/src/DtronixMessageQueue.Tests.Performance/run-mq-server.bat @@ -0,0 +1 @@ +DMQPerf.exe mq-throughput server \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/run-mq.bat b/src/DtronixMessageQueue.Tests.Performance/run-mq.bat deleted file mode 100644 index 696a46e..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/run-mq.bat +++ /dev/null @@ -1,2 +0,0 @@ -DMQPerf.exe mq single-process 1 100000 4 50 10 -pause \ No newline at end of file diff --git a/src/DtronixMessageQueue/MqConfig.cs b/src/DtronixMessageQueue/MqConfig.cs index 7e0f14d..50aa2ad 100644 --- a/src/DtronixMessageQueue/MqConfig.cs +++ b/src/DtronixMessageQueue/MqConfig.cs @@ -16,5 +16,11 @@ public class MqConfig : SocketConfig /// 0 disables pings. /// public int PingFrequency { get; set; } = 0; + + /// + /// Sets a limit on the maximum outgoing queue size. + /// Once the outgoing queue reaches the maximum messages, the MqSession.Send will block. + /// + public int MaxQueuedOutgoingMessages { get; set; } = 50; } } \ No newline at end of file diff --git a/src/DtronixMessageQueue/MqSession.cs b/src/DtronixMessageQueue/MqSession.cs index 5abd4b1..b7e23b1 100644 --- a/src/DtronixMessageQueue/MqSession.cs +++ b/src/DtronixMessageQueue/MqSession.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; +using System.Threading; using System.Threading.Tasks; using DtronixMessageQueue.Socket; @@ -44,14 +45,17 @@ public abstract class MqSession : SocketSession /// Event fired when a new message has been processed by the Postmaster and ready to be read. /// public event EventHandler> IncomingMessage; + protected override void OnSetup() { _frameBuilder = new MqFrameBuilder(Config); + _sendingSemaphore = new Semaphore(Config.MaxQueuedOutgoingMessages, Config.MaxQueuedOutgoingMessages); } /// @@ -114,6 +118,7 @@ private void ProcessOutbox() while (_outbox.TryDequeue(out message)) { + _sendingSemaphore.Release(); //Console.WriteLine("Wrote " + message); message.PrepareSend(); foreach (var frame in message) @@ -281,6 +286,7 @@ public override void Close(SocketCloseReason reason) { while (_outbox.TryDequeue(out msg)) { + _sendingSemaphore.Release(); } } @@ -318,6 +324,7 @@ public void Send(MqMessage message) return; } + _sendingSemaphore.WaitOne(); lock (_outboxLock) { _outbox.Enqueue(message); diff --git a/src/DtronixMessageQueue/Socket/SessionHandler.cs b/src/DtronixMessageQueue/Socket/SessionHandler.cs index 3c862e2..1ad6a3c 100644 --- a/src/DtronixMessageQueue/Socket/SessionHandler.cs +++ b/src/DtronixMessageQueue/Socket/SessionHandler.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Net.Sockets; using System.Threading; @@ -182,5 +183,10 @@ protected virtual TSession CreateSession(System.Net.Sockets.Socket socket) session.Closed += (sender, args) => OnClose(session, args.CloseReason); return session; } + + public IEnumerator> GetSessionsEnumerator() + { + return ConnectedSessions.GetEnumerator(); + } } } \ No newline at end of file