diff --git a/src/DtronixMessageQueue.Tests.Performance/App.config b/src/DtronixMessageQueue.Tests.Performance/App.config index c067d06..846dde6 100644 --- a/src/DtronixMessageQueue.Tests.Performance/App.config +++ b/src/DtronixMessageQueue.Tests.Performance/App.config @@ -1,9 +1,8 @@  - - - - + + + diff --git a/src/DtronixMessageQueue.Tests.Performance/ConsoleCopy.cs b/src/DtronixMessageQueue.Tests.Performance/ConsoleCopy.cs index 015c1a6..e67da9c 100644 --- a/src/DtronixMessageQueue.Tests.Performance/ConsoleCopy.cs +++ b/src/DtronixMessageQueue.Tests.Performance/ConsoleCopy.cs @@ -1,81 +1,74 @@ using System; +using System.Collections.Generic; using System.IO; +using System.Linq; using System.Text; +using System.Threading.Tasks; -namespace DtronixMessageQueue.Tests.Performance -{ - class ConsoleCopy : IDisposable - { - FileStream _fileStream; - StreamWriter _fileWriter; - TextWriter _doubleWriter; - TextWriter _oldOut; +namespace DtronixMessageQueue.Tests.Performance { + class ConsoleCopy : IDisposable { - class DoubleWriter : TextWriter - { - TextWriter _one; - TextWriter _two; + FileStream fileStream; + StreamWriter fileWriter; + TextWriter doubleWriter; + TextWriter oldOut; - public DoubleWriter(TextWriter one, TextWriter two) - { - _one = one; - _two = two; - } + class DoubleWriter : TextWriter { - public override Encoding Encoding - { - get { return _one.Encoding; } - } + TextWriter one; + TextWriter two; - public override void Flush() - { - _one.Flush(); - _two.Flush(); - } + public DoubleWriter(TextWriter one, TextWriter two) { + this.one = one; + this.two = two; + } - public override void Write(char value) - { - _one.Write(value); - _two.Write(value); - } - } + public override Encoding Encoding { + get { return one.Encoding; } + } - public ConsoleCopy(string path) - { - _oldOut = Console.Out; + public override void Flush() { + one.Flush(); + two.Flush(); + } - try - { - _fileStream = File.Create(path); + public override void Write(char value) { + one.Write(value); + two.Write(value); + } - _fileWriter = new StreamWriter(_fileStream); - _fileWriter.AutoFlush = true; + } - _doubleWriter = new DoubleWriter(_fileWriter, _oldOut); - } - catch (Exception e) - { - Console.WriteLine("Cannot open file for writing"); - Console.WriteLine(e.Message); - return; - } - Console.SetOut(_doubleWriter); - } + public ConsoleCopy(string path) { + oldOut = Console.Out; - public void Dispose() - { - Console.SetOut(_oldOut); - if (_fileWriter != null) - { - _fileWriter.Flush(); - _fileWriter.Close(); - _fileWriter = null; - } - if (_fileStream != null) - { - _fileStream.Close(); - _fileStream = null; - } - } - } -} \ No newline at end of file + try { + fileStream = File.Create(path); + + fileWriter = new StreamWriter(fileStream); + fileWriter.AutoFlush = true; + + doubleWriter = new DoubleWriter(fileWriter, oldOut); + } catch (Exception e) { + Console.WriteLine("Cannot open file for writing"); + Console.WriteLine(e.Message); + return; + } + Console.SetOut(doubleWriter); + } + + public void Dispose() { + Console.SetOut(oldOut); + if (fileWriter != null) { + fileWriter.Flush(); + fileWriter.Close(); + fileWriter = null; + } + if (fileStream != null) { + fileStream.Close(); + fileStream = null; + } + } + + } +} diff --git a/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj b/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj index 7a67afe..fc01480 100644 --- a/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj +++ b/src/DtronixMessageQueue.Tests.Performance/DtronixMessageQueue.Tests.Performance.csproj @@ -34,9 +34,6 @@ bin\Nuget\ - - - @@ -51,21 +48,16 @@ - + - - - - + - - @@ -75,9 +67,7 @@ PreserveNewest - - PreserveNewest - + @@ -86,7 +76,7 @@ - + PreserveNewest diff --git a/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs b/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs new file mode 100644 index 0000000..7a2d4da --- /dev/null +++ b/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs @@ -0,0 +1,176 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Management; +using System.Reflection; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DtronixMessageQueue.Tests.Performance { + class MqPerformanceTest : PerformanceTestBase { + + private readonly MqConfig _config; + private readonly MqMessage _smallMessage; + private readonly MqMessage _medimumMessage; + private readonly MqMessage _largeMessage; + private Stopwatch _sw; + private Semaphore _loopSemaphore; + private int count; + private double[] totalValues; + private Semaphore _testCompleteSemaphore; + private MqServer _server; + private MqClient _client; + + public MqPerformanceTest() + { + + _config = new MqConfig + { + Ip = "127.0.0.1", + Port = 2828 + }; + + _smallMessage = new MqMessage { + new MqFrame(SequentialBytes(50), MqFrameType.More, _config), + new MqFrame(SequentialBytes(50), MqFrameType.More, _config), + new MqFrame(SequentialBytes(50), MqFrameType.More, _config), + new MqFrame(SequentialBytes(50), MqFrameType.Last, _config) + }; + + _medimumMessage = new MqMessage { + new MqFrame(SequentialBytes(500), MqFrameType.More, _config), + new MqFrame(SequentialBytes(500), MqFrameType.More, _config), + new MqFrame(SequentialBytes(500), MqFrameType.More, _config), + new MqFrame(SequentialBytes(500), MqFrameType.Last, _config) + }; + + _largeMessage = new MqMessage(); + + for (int i = 0; i < 20; i++) + { + _largeMessage.Add(new MqFrame(SequentialBytes(3000), MqFrameType.More, _config)); + } + + _sw = new Stopwatch(); + _loopSemaphore = new Semaphore(0, 1); + _testCompleteSemaphore = new Semaphore(0, 1); + + count = 0; + } + + + public override void StartTest() { + + Console.WriteLine("FrameBufferSize: {0}; SendAndReceiveBufferSize: {1}\r\n", _config.FrameBufferSize, + _config.SendAndReceiveBufferSize); + + + MqInProcessPerformanceTests(1000000, 5, _smallMessage, _config); + + + MqInProcessPerformanceTests(100000, 5, _medimumMessage, _config); + + + MqInProcessPerformanceTests(10000, 5, _largeMessage, _config); + + Console.WriteLine("Performance complete"); + } + + private void MqInProcessPerformanceTests(int totalMessages, int loops, MqMessage message, MqConfig config) { + _server = new MqServer(config); + _server.Start(); + + totalValues = new [] {0d, 0d, 0d}; + + + _client = new MqClient(config); + + Console.WriteLine("| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps |"); + Console.WriteLine("|---------|------------|-----------|--------------|------------|----------|"); + + + + _server.IncomingMessage += (sender, args2) => { + count += args2.Messages.Count; + + + if (count == totalMessages) + { + ReportResults(totalMessages, _sw.ElapsedMilliseconds, message.Size); + _loopSemaphore.Release(); + + } + }; + + + + _client.Connected += (sender, args) => { + for (var i = 0; i < loops; i++) + { + SendMessages(args.Session, message, totalMessages, 5000); + } + + Console.WriteLine("| | | AVERAGES | {0,12:N0} | {1,10:N0} | {2,8:N2} |", totalValues[0]/loops, + totalValues[1]/loops, totalValues[2]/loops); + Console.WriteLine(); + + _server.Stop(); + _client.Close(); + + _testCompleteSemaphore.Release(); + + }; + + _client.Connect(); + + _testCompleteSemaphore.WaitOne(30000); + + + } + + public void SendMessages(SimpleMqSession client, MqMessage message, int totalMessages, int timeout) + { + + count = 0; + _sw.Restart(); + + for (var i = 0; i < totalMessages; i++) + { + client.Send(message); + } + + if (!_loopSemaphore.WaitOne(timeout)) + { + Console.WriteLine($"Test failed to complete with {count} of {totalMessages} loops performed."); + + ReportResults(totalMessages, _sw.ElapsedMilliseconds, message.Size); + } + + + + } + + private void ReportResults(int totalMessages, long milliseconds, int messageSize) + { + var mode = "Release"; + +#if DEBUG + mode = "Debug"; +#endif + + var messagesPerSecond = (int)((double)totalMessages / milliseconds * 1000); + var msgSizeNoHeader = messageSize - 12; + var mbps = totalMessages * (double)(msgSizeNoHeader) / milliseconds / 1000; + Console.WriteLine("| {0,7} | {1,10:N0} | {2,9:N0} | {3,12:N0} | {4,10:N0} | {5,8:N2} |", mode, totalMessages, + msgSizeNoHeader, milliseconds, messagesPerSecond, mbps); + totalValues[0] += milliseconds; + totalValues[1] += messagesPerSecond; + totalValues[2] += mbps; + } + } + +} diff --git a/src/DtronixMessageQueue.Tests.Performance/MqThroughputTest.cs b/src/DtronixMessageQueue.Tests.Performance/MqThroughputTest.cs deleted file mode 100644 index 19017d5..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/MqThroughputTest.cs +++ /dev/null @@ -1,187 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Reflection; -using System.Threading; -using DtronixMessageQueue.Tests.Performance.TestSessions; - -namespace DtronixMessageQueue.Tests.Performance -{ - class MqThroughputTest : PerformanceTestBase - { - - List clients = new List(); - - private Timer _reportTimer; - private MqServer server; - - public MqThroughputTest(string[] args) - { - - string mode = args.Length < 2 ? "in-process" : args[1]; - string ip = "127.0.0.1"; - int totalFrames = 10; - int frameSize = 1024; - int totalClients = 3; - - if (args.Length == 6) - { - ip = args[2]; - totalFrames = int.Parse(args[3]); - frameSize = int.Parse(args[4]); - totalClients = int.Parse(args[5]); - } - - - if (mode == "setup") - { - var exePath = Assembly.GetExecutingAssembly().Location; - Process.Start(exePath, - $"mq-throughput server {ip} {totalFrames} {frameSize} {totalClients}"); - - for (int i = 0; i < 1; i++) - { - Process.Start(exePath, - $"mq-throughput client {ip} {totalFrames} {frameSize} {totalClients}"); - } - } - else if (mode == "server") - { - StartServer(); - - _reportTimer = new Timer(DisplayServerStatus); - } - else if (mode == "client") - { - - for (int i = 0; i < totalClients; i++) - { - StartClient(ip, totalFrames, frameSize); - } - - _reportTimer = new Timer(DisplayClientStatus); - - } - else if (mode == "in-process") - { - - StartServer(); - - for (int i = 0; i < totalClients; i++) - { - StartClient(ip, totalFrames, frameSize); - } - - _reportTimer = new Timer(DisplayClientStatus); - - } - else - { - Console.WriteLine("Invalid parameters passed to performance tester"); - } - - _reportTimer.Change(1000, 1000); - } - - - private void StartClient(string ip, int totalFrames, int frameSize) - { - var cl = new MqClient(new MqConfig{ - Ip = ip, - Port = 2828 - }); - - cl.SessionSetup += (sender, args) => - { - args.Session.IsServer = false; - args.Session.ConfigTest(frameSize, totalFrames); - - }; - - cl.Connected += (sender, args) => - { - clients.Add(args.Session); - args.Session.StartTest(); - }; - - cl.Connect(); - - - } - - - private void StartServer() - { - server = new MqServer(new MqConfig - { - Ip = "127.0.0.1", - Port = 2828 - }); - - - server.SessionSetup += (sender, args) => - { - args.Session.IsServer = true; - }; - - server.Connected += (sender, args) => - { - args.Session.StartTest(); - }; - - - server.Start(); - } - - - private void DisplayClientStatus(object state) - { - Console.Clear(); - - Console.WriteLine($"Clients Test Running.\r\n"); - Console.WriteLine("| Messages | Frames | Time | Msg/sec | Mbps | Runtime |"); - Console.WriteLine("|----------|----------|----------|----------|----------|----------|"); - - foreach (var client in clients) - { - if (client.TotalThroughTime == 0) - continue; - - var messagesPerSecond = (int)((double)client.TotalThroughMessages / client.TotalThroughTime * 1000); - var mbps = (double)client.TotalThroughBytes / client.TotalThroughTime / 1000; - //var mbps = runs * (double)(msgSizeNoHeader) / sw.ElapsedMilliseconds / 1000; - Console.WriteLine("| {0,8} | {1,8} | {2,8:N0} | {3,8:N0} | {4,8:N} | {5,8:N} |", - client.TotalThroughMessages, - client.TotalThroughFrames, - client.TotalThroughTime, - messagesPerSecond, - mbps, - (DateTime.Now - client.StartedTime).TotalMilliseconds/1000); - - /*if ((DateTime.Now - client.StartedTime).TotalSeconds > 10) - { - client.CancelTest = true; - }*/ - } - } - - private void DisplayServerStatus(object state) - { - Console.Clear(); - - Console.WriteLine($"Server Running."); - - int c = 0; - using (var e = server.GetSessionsEnumerator()) - { - while (e.MoveNext()) - c++; - } - - Console.WriteLine($"Total Clients: {c}"); - - } - } -} \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs b/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs index 6c484fb..368057d 100644 --- a/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs +++ b/src/DtronixMessageQueue.Tests.Performance/PerformanceTestBase.cs @@ -1,62 +1,60 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Management; using System.Runtime.InteropServices; -using DtronixMessageQueue.Tests.Performance.TestSessions; - -namespace DtronixMessageQueue.Tests.Performance -{ - class PerformanceTestBase - { - private Random _rand = new Random(); - - [DllImport("kernel32.dll")] - [return: MarshalAs(UnmanagedType.Bool)] - static extern bool GetPhysicallyInstalledSystemMemory(out long totalMemoryInKilobytes); - - - public static void WriteSysInfo() - { - ManagementObjectSearcher mos = new ManagementObjectSearcher("root\\CIMV2", "SELECT * FROM Win32_Processor"); - foreach (var o in mos.Get()) - { - var mo = (ManagementObject) o; - Console.Write(mo["Name"]); - } - - - long memKb; - GetPhysicallyInstalledSystemMemory(out memKb); - Console.WriteLine(" with " + (memKb / 1024 / 1024) + " GB of RAM installed."); - } - - - public class ClientRunInfo - { - public int Runs { get; set; } - public MqThroughputTest TestSession { get; set; } - } - - public static byte[] SequentialBytes(int len) - { - var number = 0; - var val = new byte[len]; - - for (var i = 0; i < len; i++) - { - val[i] = (byte) number++; - if (number > 255) - { - number = 0; - } - } - return val; - } - - public byte[] RandomBytes(int len) - { - var val = new byte[len]; - _rand.NextBytes(val); - return val; - } - } -} \ No newline at end of file +using System.Text; +using System.Threading.Tasks; + +namespace DtronixMessageQueue.Tests.Performance { + abstract class PerformanceTestBase { + + private Random rand = new Random(); + + [DllImport("kernel32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + static extern bool GetPhysicallyInstalledSystemMemory(out long total_memory_in_kilobytes); + + + public static void WriteSysInfo() { + ManagementObjectSearcher mos = new ManagementObjectSearcher("root\\CIMV2", "SELECT * FROM Win32_Processor"); + foreach (var o in mos.Get()) { + var mo = (ManagementObject)o; + Console.Write(mo["Name"]); + } + + + long mem_kb; + GetPhysicallyInstalledSystemMemory(out mem_kb); + Console.WriteLine(" with " + (mem_kb / 1024 / 1024) + " GB of RAM installed."); + } + + + public class ClientRunInfo { + public int Runs { get; set; } + public SimpleMqSession Session { get; set; } + + } + + public static byte[] SequentialBytes(int len) { + var number = 0; + var val = new byte[len]; + + for (var i = 0; i < len; i++) { + val[i] = (byte)number++; + if (number > 255) { + number = 0; + } + } + return val; + } + + public byte[] RandomBytes(int len) { + var val = new byte[len]; + rand.NextBytes(val); + return val; + } + + public abstract void StartTest(); + } +} diff --git a/src/DtronixMessageQueue.Tests.Performance/Program.cs b/src/DtronixMessageQueue.Tests.Performance/Program.cs index 0531ceb..fa949cf 100644 --- a/src/DtronixMessageQueue.Tests.Performance/Program.cs +++ b/src/DtronixMessageQueue.Tests.Performance/Program.cs @@ -1,40 +1,39 @@ using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Linq; +using System.Management; +using System.Reflection; using System.Runtime.InteropServices; +using System.Threading; -namespace DtronixMessageQueue.Tests.Performance -{ - class Program - { - [DllImport("kernel32.dll")] - [return: MarshalAs(UnmanagedType.Bool)] - static extern bool GetPhysicallyInstalledSystemMemory(out long totalMemoryInKilobytes); - - static void Main(string[] args) - { - - var mode = args.Length == 0 ? null : args[0]; - var fileName = string.Join("-", args); - using (var cc = new ConsoleCopy($"MessageQueuePerformanceTest-{fileName}.txt")) - { - //PerformanceTestBase.WriteSysInfo(); - - //Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}"); - - switch (mode) - { - case "mq-throughput": - Console.WriteLine("Running MQ performance tests.\r\n"); - new MqThroughputTest(args); - break; - - default: - Console.WriteLine("Running MQ performance tests.\r\n"); - new MqThroughputTest(args); - break; - } - } - - Console.ReadLine(); - } - } -} \ No newline at end of file +namespace DtronixMessageQueue.Tests.Performance { + class Program { + + [DllImport("kernel32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + static extern bool GetPhysicallyInstalledSystemMemory(out long total_memory_in_kilobytes); + + static void Main(string[] args) { + var mode = args.Length == 0 ? null : args[0]; + var file_name = string.Join("-", args); + using (var cc = new ConsoleCopy($"MessageQueuePerformanceTest-{file_name}.txt")) { + PerformanceTestBase.WriteSysInfo(); + + Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}"); + + Console.WriteLine("MQ Performance tests.\r\n"); + new MqPerformanceTest().StartTest(); + + Console.WriteLine("RPC Performance tests.\r\n"); + new RpcPerformanceTest(args); + } + + Console.ReadLine(); + + } + + + } + +} diff --git a/src/DtronixMessageQueue.Tests.Performance/Properties/AssemblyInfo.cs b/src/DtronixMessageQueue.Tests.Performance/Properties/AssemblyInfo.cs index 6f7fddd..7b5b5b7 100644 --- a/src/DtronixMessageQueue.Tests.Performance/Properties/AssemblyInfo.cs +++ b/src/DtronixMessageQueue.Tests.Performance/Properties/AssemblyInfo.cs @@ -4,7 +4,6 @@ // General Information about an assembly is controlled through the following // set of attributes. Change these attribute values to modify the information // associated with an assembly. - [assembly: AssemblyTitle("MqPerformanceTests")] [assembly: AssemblyDescription("")] [assembly: AssemblyConfiguration("")] @@ -17,11 +16,9 @@ // Setting ComVisible to false makes the types in this assembly not visible // to COM components. If you need to access a type in this assembly from // COM, set the ComVisible attribute to true on that type. - [assembly: ComVisible(false)] // The following GUID is for the ID of the typelib if this project is exposed to COM - [assembly: Guid("1df3332e-9b24-45b9-934b-43542793986c")] // Version information for an assembly consists of the following four values: @@ -34,6 +31,5 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] - [assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] \ No newline at end of file +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs b/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs index 8af26ba..2e21f38 100644 --- a/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs +++ b/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs @@ -1,160 +1,153 @@ using System; +using System.Collections.Generic; using System.Diagnostics; +using System.Linq; +using System.Text; using System.Threading; +using System.Threading.Tasks; using DtronixMessageQueue.Rpc; using DtronixMessageQueue.Tests.Performance.Services.Server; -namespace DtronixMessageQueue.Tests.Performance -{ - class RpcPerformanceTest - { - public RpcPerformanceTest(string[] args) - { - var config = new RpcConfig - { - Ip = "127.0.0.1", - Port = 2828 - }; +namespace DtronixMessageQueue.Tests.Performance { + class RpcPerformanceTest { + public RpcPerformanceTest(string[] args) { + var config = new RpcConfig { + Ip = "127.0.0.1", + Port = 2828 + }; - //RpcSingleProcessTest(20, 4, config, RpcTestType.LngBlock); + //RpcSingleProcessTest(20, 4, config, RpcTestType.LngBlock); - RpcSingleProcessTest(200000, 4, config, RpcTestType.NoReturn); + RpcSingleProcessTest(200000, 4, config, RpcTestType.NoReturn); - RpcSingleProcessTest(200000, 4, config, RpcTestType.Await); + RpcSingleProcessTest(200000, 4, config, RpcTestType.Await); - RpcSingleProcessTest(100, 4, config, RpcTestType.Block); + RpcSingleProcessTest(100, 4, config, RpcTestType.Block); - RpcSingleProcessTest(10000, 4, config, RpcTestType.Return); + RpcSingleProcessTest(10000, 4, config, RpcTestType.Return); - RpcSingleProcessTest(10000, 4, config, RpcTestType.Exception); - } + RpcSingleProcessTest(10000, 4, config, RpcTestType.Exception); - private void RpcSingleProcessTest(int runs, int loops, RpcConfig config, RpcTestType type) - { - var server = new RpcServer(config, null); - TestService testService; - double[] totalValues = {0, 0}; - var sw = new Stopwatch(); - var wait = new AutoResetEvent(false); - var completeTest = new AutoResetEvent(false); - var client = new RpcClient(config); + } - server.SessionSetup += (sender, args) => - { - testService = new TestService(); - args.Session.AddService(testService); - testService.Completed += (o, session) => - { - sw.Stop(); - var mode = "Release"; + private void RpcSingleProcessTest(int runs, int loops, RpcConfig config, RpcTestType type) { + var server = new RpcServer(config, null); + TestService test_service; + double[] total_values = { 0, 0 }; + var sw = new Stopwatch(); + var wait = new AutoResetEvent(false); + var complete_test = new AutoResetEvent(false); + var client = new RpcClient(config); + + server.SessionSetup += (sender, args) => { + test_service = new TestService(); + args.Session.AddService(test_service); + + test_service.Completed += (o, session) => { + sw.Stop(); + var mode = "Release"; #if DEBUG mode = "Debug"; #endif - var messagesPerSecond = (int) ((double) runs / sw.ElapsedMilliseconds * 1000); - Console.WriteLine("| {0,7} | {1,9} | {2,10:N0} | {3,12:N0} | {4,8:N0} |", mode, type, runs, - sw.ElapsedMilliseconds, messagesPerSecond); - totalValues[0] += sw.ElapsedMilliseconds; - totalValues[1] += messagesPerSecond; - - - wait.Set(); - }; - }; - - - server.Start(); - - - Console.WriteLine("| Build | Type | Calls | Milliseconds | RPC/sec |"); - Console.WriteLine("|---------|-----------|------------|--------------|------------|"); - - - var send = new Action(() => - { - var service = client.Session.GetProxy(); - service.ResetTest(); - - sw.Restart(); - for (var i = 0; i < runs; i++) - { - switch (type) - { - case RpcTestType.LngBlock: - service.TestNoReturnLongBlocking(); - break; - - case RpcTestType.Block: - service.TestNoReturnBlock(); - break; - - case RpcTestType.NoReturn: - service.TestNoReturn(); - break; - - case RpcTestType.Await: - service.TestNoReturnAwait(); - break; - - case RpcTestType.Return: - service.TestIncrement(); - break; - - case RpcTestType.Exception: - try - { - service.TestException(); - } - catch - { - //ignored - } - - break; - } - } - - wait.WaitOne(); - wait.Reset(); - }); - - - client.Ready += (sender, args) => - { - Thread.Sleep(300); - args.Session.AddProxy("TestService"); - var service = client.Session.GetProxy(); - service.TestSetup(runs); - - for (var i = 0; i < loops; i++) - { - send(); - } - - Console.WriteLine("| | | AVERAGES | {0,12:N0} | {1,10:N0} |", totalValues[0] / loops, - totalValues[1] / loops); - Console.WriteLine(); - - server.Stop(); - client.Close(); - completeTest.Set(); - }; - - client.Connect(); - - completeTest.WaitOne(); - } - } - - enum RpcTestType - { - NoReturn, - Return, - Exception, - Await, - Block, - LngBlock - } -} \ No newline at end of file + var messages_per_second = (int)((double)runs / sw.ElapsedMilliseconds * 1000); + Console.WriteLine("| {0,7} | {1,9} | {2,10:N0} | {3,12:N0} | {4,8:N0} |", mode, type, runs, sw.ElapsedMilliseconds, messages_per_second); + total_values[0] += sw.ElapsedMilliseconds; + total_values[1] += messages_per_second; + + + wait.Set(); + }; + + }; + + + server.Start(); + + + Console.WriteLine("| Build | Type | Calls | Milliseconds | RPC/sec |"); + Console.WriteLine("|---------|-----------|------------|--------------|------------|"); + + + + var send = new Action(() => { + + var service = client.Session.GetProxy(); + service.ResetTest(); + + sw.Restart(); + for (var i = 0; i < runs; i++) { + switch (type) { + + case RpcTestType.LngBlock: + service.TestNoReturnLongBlocking(); + break; + + case RpcTestType.Block: + service.TestNoReturnBlock(); + break; + + case RpcTestType.NoReturn: + service.TestNoReturn(); + break; + + case RpcTestType.Await: + service.TestNoReturnAwait(); + break; + + case RpcTestType.Return: + service.TestIncrement(); + break; + + case RpcTestType.Exception: + try { + service.TestException(); + } catch { + //ignored + } + + break; + } + + } + + wait.WaitOne(); + wait.Reset(); + }); + + + client.Ready += (sender, args) => { + Thread.Sleep(300); + args.Session.AddProxy("TestService"); + var service = client.Session.GetProxy(); + service.TestSetup(runs); + + for (var i = 0; i < loops; i++) { + send(); + } + + Console.WriteLine("| | | AVERAGES | {0,12:N0} | {1,10:N0} |", total_values[0]/loops, total_values[1]/loops); + Console.WriteLine(); + + server.Stop(); + client.Close(); + complete_test.Set(); + }; + + client.Connect(); + + complete_test.WaitOne(); + } + } + + enum RpcTestType { + NoReturn, + Return, + Exception, + Await, + Block, + LngBlock + } +} diff --git a/src/DtronixMessageQueue.Tests.Performance/ServerMessageType.cs b/src/DtronixMessageQueue.Tests.Performance/ServerMessageType.cs deleted file mode 100644 index e138025..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/ServerMessageType.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace DtronixMessageQueue.Tests.Performance -{ - public enum ServerMessageType : byte - { - Unset = 0, - Ready = 1, - Complete = 2, - ThroughputTransfer = 3 - } -} \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/ServerMqPerformanceTests.cs b/src/DtronixMessageQueue.Tests.Performance/ServerMqPerformanceTests.cs deleted file mode 100644 index 1444e38..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/ServerMqPerformanceTests.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using DtronixMessageQueue.Tests.Performance.TestSessions; - -namespace DtronixMessageQueue.Tests.Performance -{ - public class ServerMqPerformanceTests - { - private MqServer _server; - - public ServerMqPerformanceTests(string[] args) - { - _server = new MqServer(new MqConfig - { - Ip = "127.0.0.1", - Port = 2828 - }); - - _server.SessionSetup += (sender, eventArgs) => - { - eventArgs.Session.IsServer = true; - }; - - - } - - public void Start() - { - _server.Start(); - } - } -} diff --git a/src/DtronixMessageQueue.Tests.Performance/Services/Server/CalculatorService.cs b/src/DtronixMessageQueue.Tests.Performance/Services/Server/CalculatorService.cs index 5d34439..21d802e 100644 --- a/src/DtronixMessageQueue.Tests.Performance/Services/Server/CalculatorService.cs +++ b/src/DtronixMessageQueue.Tests.Performance/Services/Server/CalculatorService.cs @@ -1,30 +1,24 @@ using System; -namespace DtronixMessageQueue.Tests.Performance.Services.Server -{ - public class CalculatorService : MarshalByRefObject, ICalculatorService - { - public string Name { get; } = "CalculatorService"; - public SimpleRpcSession Session { get; set; } +namespace DtronixMessageQueue.Tests.Performance.Services.Server { + public class CalculatorService : MarshalByRefObject, ICalculatorService { + public string Name { get; } = "CalculatorService"; + public SimpleRpcSession Session { get; set; } - public int Add(int number1, int number2) - { - return number1 + number2; - } + public int Add(int number_1, int number_2) { + return number_1 + number_2; + } - public int Subtract(int number1, int number2) - { - return number1 - number2; - } + public int Subtract(int number_1, int number_2) { + return number_1 - number_2; + } - public int Multiply(int number1, int number2) - { - return number1 * number2; - } + public int Multiply(int number_1, int number_2) { + return number_1*number_2; + } - public int Divide(int number1, int number2) - { - return number1 / number2; - } - } -} \ No newline at end of file + public int Divide(int number_1, int number_2) { + return number_1/number_2; + } + } +} diff --git a/src/DtronixMessageQueue.Tests.Performance/Services/Server/ICalculatorService.cs b/src/DtronixMessageQueue.Tests.Performance/Services/Server/ICalculatorService.cs index 3bd1afe..01d80e8 100644 --- a/src/DtronixMessageQueue.Tests.Performance/Services/Server/ICalculatorService.cs +++ b/src/DtronixMessageQueue.Tests.Performance/Services/Server/ICalculatorService.cs @@ -1,12 +1,10 @@ using DtronixMessageQueue.Rpc; -namespace DtronixMessageQueue.Tests.Performance.Services.Server -{ - public interface ICalculatorService : IRemoteService - { - int Add(int number1, int number2); - int Subtract(int number1, int number2); - int Multiply(int number1, int number2); - int Divide(int number1, int number2); - } -} \ No newline at end of file +namespace DtronixMessageQueue.Tests.Performance.Services.Server { + public interface ICalculatorService : IRemoteService { + int Add(int number_1, int number_2); + int Subtract(int number_1, int number_2); + int Multiply(int number_1, int number_2); + int Divide(int number_1, int number_2); + } +} diff --git a/src/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs b/src/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs index 0666628..5aae6e8 100644 --- a/src/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs +++ b/src/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs @@ -1,102 +1,99 @@ using System; +using System.Collections.Generic; +using System.Configuration; +using System.Diagnostics; +using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; +using System.Xml.Serialization; using DtronixMessageQueue.Rpc; -namespace DtronixMessageQueue.Tests.Performance.Services.Server -{ - class TestService : MarshalByRefObject, ITestService - { - public string Name { get; } = "TestService"; - public SimpleRpcSession Session { get; set; } - - public event EventHandler Completed; - - private int _callCount = 0; - private int _totalCalls = 0; - - private bool _completed = false; - - public void TestNoReturn() - { - var number = Interlocked.Increment(ref _callCount); - //Console.Write($"{Thread.CurrentThread.ManagedThreadId} "); - - VerifyComplete(); - } - - public async void TestNoReturnAwait() - { - var number = Interlocked.Increment(ref _callCount); - await Task.Delay(1000); - - VerifyComplete(); - } - - public void TestNoReturnLongBlocking() - { - var number = Interlocked.Increment(ref _callCount); - Thread.Sleep(10000); - VerifyComplete(); - } - - - public void TestNoReturnBlock() - { - Task.Factory.StartNew(() => - { - var number = Interlocked.Increment(ref _callCount); - - Thread.Sleep(1000); - VerifyComplete(); - }, TaskCreationOptions.LongRunning); - } - - public int TestIncrement() - { - _callCount++; - VerifyComplete(); - return _callCount; - } - - public void TestSetup(int calls) - { - _totalCalls = calls; - } - - public bool ResetTest() - { - _callCount = 0; - _completed = false; - return true; - } - - public int TestException() - { - _callCount++; - VerifyComplete(); - throw new Exception("This is a test exception"); - } - - private void VerifyComplete() - { - if (_completed == false && _totalCalls == _callCount) - { - _completed = true; - Completed?.Invoke(this, Session); - } - } - } - - internal interface ITestService : IRemoteService - { - void TestNoReturn(); - void TestNoReturnAwait(); - void TestNoReturnBlock(); - void TestNoReturnLongBlocking(); - int TestIncrement(); - void TestSetup(int calls); - bool ResetTest(); - int TestException(); - } -} \ No newline at end of file +namespace DtronixMessageQueue.Tests.Performance.Services.Server { + class TestService : MarshalByRefObject, ITestService { + public string Name { get; } = "TestService"; + public SimpleRpcSession Session { get; set; } + + public event EventHandler Completed; + + private int call_count = 0; + private int total_calls = 0; + + private bool completed = false; + + public void TestNoReturn() { + var number = Interlocked.Increment(ref call_count); + //Console.Write($"{Thread.CurrentThread.ManagedThreadId} "); + + VerifyComplete(); + + } + + public async void TestNoReturnAwait() { + var number = Interlocked.Increment(ref call_count); + await Task.Delay(1000); + + VerifyComplete(); + + } + + public void TestNoReturnLongBlocking() { + var number = Interlocked.Increment(ref call_count); + Thread.Sleep(10000); + VerifyComplete(); + + } + + + public void TestNoReturnBlock() { + Task.Factory.StartNew(() => { + var number = Interlocked.Increment(ref call_count); + + Thread.Sleep(1000); + VerifyComplete(); + }, TaskCreationOptions.LongRunning); + } + + public int TestIncrement() { + call_count++; + VerifyComplete(); + return call_count; + } + + public void TestSetup(int calls) { + total_calls = calls; + } + + public bool ResetTest() { + call_count = 0; + completed = false; + return true; + } + + public int TestException() { + call_count++; + VerifyComplete(); + throw new Exception("This is a test exception"); + } + + private void VerifyComplete() { + if (completed == false && total_calls == call_count) { + completed = true; + Completed?.Invoke(this, Session); + } + } + + + } + + internal interface ITestService : IRemoteService { + void TestNoReturn(); + void TestNoReturnAwait(); + void TestNoReturnBlock(); + void TestNoReturnLongBlocking(); + int TestIncrement(); + void TestSetup(int calls); + bool ResetTest(); + int TestException(); + } +} diff --git a/src/DtronixMessageQueue.Tests.Performance/SimpleMqSession.cs b/src/DtronixMessageQueue.Tests.Performance/SimpleMqSession.cs new file mode 100644 index 0000000..d7b6935 --- /dev/null +++ b/src/DtronixMessageQueue.Tests.Performance/SimpleMqSession.cs @@ -0,0 +1,4 @@ +namespace DtronixMessageQueue.Tests.Performance { + public class SimpleMqSession : MqSession { + } +} diff --git a/src/DtronixMessageQueue.Tests.Performance/SimpleRpcSession.cs b/src/DtronixMessageQueue.Tests.Performance/SimpleRpcSession.cs index 58fec58..67f5199 100644 --- a/src/DtronixMessageQueue.Tests.Performance/SimpleRpcSession.cs +++ b/src/DtronixMessageQueue.Tests.Performance/SimpleRpcSession.cs @@ -1,8 +1,8 @@ -using DtronixMessageQueue.Rpc; +using System; +using DtronixMessageQueue.Rpc; -namespace DtronixMessageQueue.Tests.Performance -{ - public class SimpleRpcSession : RpcSession - { - } -} \ No newline at end of file +namespace DtronixMessageQueue.Tests.Performance { + + public class SimpleRpcSession : RpcSession { + } +} diff --git a/src/DtronixMessageQueue.Tests.Performance/TestMode.cs b/src/DtronixMessageQueue.Tests.Performance/TestMode.cs deleted file mode 100644 index 2c6c821..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/TestMode.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace DtronixMessageQueue.Tests.Performance -{ - public enum TestMode - { - Unset = 0, - Throughput = 1, - Repeat = 2 - } -} \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqBaseTestSession.cs b/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqBaseTestSession.cs deleted file mode 100644 index aa1ac14..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqBaseTestSession.cs +++ /dev/null @@ -1,68 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace DtronixMessageQueue.Tests.Performance.TestSessions -{ - public abstract class MqBaseTestSession : MqSession - { - protected MqMessageReader Reader; - protected MqMessageWriter Writer; - protected readonly Stopwatch Stopwatch = new Stopwatch(); - protected Timer ResponseTimer; - - public bool IsServer { get; set; } - public bool CancelTest { get; set; } - - - protected Random Rand = new Random(); - - protected Thread TestThread; - - protected override void OnSetup() - { - base.OnSetup(); - - Reader = new MqMessageReader(); - Writer = new MqMessageWriter(Config); - - if (!IsServer) - { - TestThread = new Thread(TestThreadAction); - } - } - - protected override void OnIncomingMessage(object sender, IncomingMessageEventArgs e) - { - if (IsServer) - { - ServerMessage(e.Messages); - } - else - { - ClientMessage(e.Messages); - } - } - - - - - protected byte[] RandomBytes(int len) - { - var val = new byte[len]; - Rand.NextBytes(val); - return val; - } - - - - public abstract void StartTest(); - protected abstract void TestThreadAction(object state); - protected abstract void ClientMessage(Queue messageQueue); - protected abstract void ServerMessage(Queue messageQueue); - } -} diff --git a/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqThroughputTestSession.cs b/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqThroughputTestSession.cs deleted file mode 100644 index 90e812c..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/TestSessions/MqThroughputTestSession.cs +++ /dev/null @@ -1,133 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Threading; -using DtronixMessageQueue.Socket; - -namespace DtronixMessageQueue.Tests.Performance.TestSessions -{ - public class MqThroughputTestSession : MqBaseTestSession - { - public long TotalThroughTime => _totalThroughTime; - public int TotalThroughMessages => _totalThroughMessages; - public int TotalThroughFrames => _totalThroughFrames; - public int TotalThroughBytes => _totalThroughBytes; - public DateTime StartedTime; - - private int _totalThroughBytes; - private int _totalThroughFrames; - private int _totalThroughMessages; - private long _totalThroughTime; - - - private int _configFrameSize; - private int _configFramesPerMessage; - - - public void ConfigTest(int frameSize, int framesPerMessage) - { - _configFrameSize = frameSize; - _configFramesPerMessage = framesPerMessage; - } - - public override void StartTest() - { - StartedTime = DateTime.Now; - TestThread.Start(this); - - if (IsServer) - { - ResponseTimer = new Timer(ThroughputResponse); - ResponseTimer.Change(1000, 1000); - Stopwatch.Restart(); - } - } - - protected override void ServerMessage(Queue messageQueue) - { - while (messageQueue.Count > 0) - { - var message = messageQueue.Dequeue(); - Interlocked.Add(ref _totalThroughBytes, message.Size); - _totalThroughMessages++; - Interlocked.Add(ref _totalThroughFrames, message.Count); - } - - } - - protected override void ClientMessage(Queue messageQueue) - { - while (messageQueue.Count > 0) - { - var message = messageQueue.Dequeue(); - Reader.Message = message; - var messageType = (ServerMessageType)Reader.ReadByte(); - - if (messageType == ServerMessageType.ThroughputTransfer) - { - _totalThroughBytes = Reader.ReadInt32(); - _totalThroughMessages = Reader.ReadInt32(); - _totalThroughFrames = Reader.ReadInt32(); - _totalThroughTime = Reader.ReadInt64(); - } - } - } - - private void ThroughputResponse(object state) - { - - using (var responseWriter = new MqMessageWriter(Config)) - { - var throughBytes = TotalThroughBytes; - var throughMessages = TotalThroughMessages; - var throughFrames = TotalThroughFrames; - - _totalThroughBytes = 0; - _totalThroughMessages = 0; - _totalThroughFrames = 0; - - responseWriter.Write((byte)ServerMessageType.ThroughputTransfer); - responseWriter.Write(throughBytes); - responseWriter.Write(throughMessages); - responseWriter.Write(throughFrames); - responseWriter.Write(Stopwatch.ElapsedMilliseconds); - - Stopwatch.Restart(); - - Send(responseWriter.ToMessage(true)); - } - } - - protected override void TestThreadAction(object state) - { - var session = (MqThroughputTestSession) state; - var message = new MqMessage(); - var frame = new MqFrame(RandomBytes(session._configFrameSize), session.Config); - - for (int i = 0; i < session._configFramesPerMessage; i++) - { - message.Add(frame); - } - - - // Send messages until it is called to stop. - while (!session.CancelTest) - { - session.Send(message); - } - - } - - public override void Close(SocketCloseReason reason) - { - if (ResponseTimer != null) - { - ResponseTimer.Change(-1, -1); - ResponseTimer.Dispose(); - } - base.Close(reason); - - } - } - -} \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/TestStartArgs.cs b/src/DtronixMessageQueue.Tests.Performance/TestStartArgs.cs deleted file mode 100644 index 0a63536..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/TestStartArgs.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace DtronixMessageQueue.Tests.Performance -{ - public class TestStartArgs - { - public StartMode Mode { get; set; } - - - - public enum StartMode - { - ServerRepeat, - ServerRespond, - Client - } - } -} diff --git a/src/DtronixMessageQueue.Tests.Performance/packages.config b/src/DtronixMessageQueue.Tests.Performance/packages.config new file mode 100644 index 0000000..8e3aad2 --- /dev/null +++ b/src/DtronixMessageQueue.Tests.Performance/packages.config @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/run-mq-client.bat b/src/DtronixMessageQueue.Tests.Performance/run-mq-client.bat deleted file mode 100644 index 3a2e56b..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/run-mq-client.bat +++ /dev/null @@ -1 +0,0 @@ -DMQPerf.exe mq-throughput client 127.0.0.1 10 1024 3 \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/run-mq-multi-client.bat b/src/DtronixMessageQueue.Tests.Performance/run-mq-multi-client.bat deleted file mode 100644 index e2c5c07..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/run-mq-multi-client.bat +++ /dev/null @@ -1,2 +0,0 @@ -DMQPerf.exe mq setup 5 100000 4 50 10 -pause \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/run-mq-server.bat b/src/DtronixMessageQueue.Tests.Performance/run-mq-server.bat deleted file mode 100644 index 510892e..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/run-mq-server.bat +++ /dev/null @@ -1 +0,0 @@ -DMQPerf.exe mq-throughput server \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Performance/run-rpc.bat b/src/DtronixMessageQueue.Tests.Performance/run-rpc.bat deleted file mode 100644 index 882b5f8..0000000 --- a/src/DtronixMessageQueue.Tests.Performance/run-rpc.bat +++ /dev/null @@ -1,2 +0,0 @@ -DMQPerf.exe rpc -pause \ No newline at end of file diff --git a/src/DtronixMessageQueue/MqSession.cs b/src/DtronixMessageQueue/MqSession.cs index 560ff24..93e21ab 100644 --- a/src/DtronixMessageQueue/MqSession.cs +++ b/src/DtronixMessageQueue/MqSession.cs @@ -45,7 +45,7 @@ public abstract class MqSession : SocketSession /// Event fired when a new message has been processed by the Postmaster and ready to be read. @@ -56,7 +56,7 @@ public abstract class MqSession : SocketSession @@ -327,7 +327,7 @@ public void Send(MqMessage message) return; } - _sendingSemaphore.WaitOne(); + _sendingSemaphore.Wait(); lock (_outboxLock) {