diff --git a/DtronixMessageQueue.Tests.Performance/Program.cs b/DtronixMessageQueue.Tests.Performance/Program.cs index 4b78f68..a08ea01 100644 --- a/DtronixMessageQueue.Tests.Performance/Program.cs +++ b/DtronixMessageQueue.Tests.Performance/Program.cs @@ -1,8 +1,11 @@ using System; +using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Management; using System.Runtime.InteropServices; using System.Threading; +using System.Threading.Tasks; using DtronixMessageQueue; using SuperSocket.SocketBase.Config; @@ -33,6 +36,8 @@ static void Main(string[] args) { new MqFrame(RandomBytes(50), MqFrameType.Last) }; + MqPerformanceTests(10000, 1, small_message); + MqPerformanceTests(1000000, 5, small_message); var medimum_message = new MqMessage { @@ -58,6 +63,117 @@ static void Main(string[] args) { Console.ReadLine(); } + private class ClientRunInfo { + public MqClient Client { get; set; } + public int Runs { get; set; } + public WaitHandle WaitHandle { get; set; } + public ManualResetEventSlim ManualResetEventSlim { get; set; } + public MqSession Session { get; set; } + public Stopwatch Stopwatch { get; set; } + + } + + private static void MqMultiClientPerformanceTests(int runs, int clients, MqMessage message) { + var server = new MqServer(new ServerConfig { + Ip = "127.0.0.1", + Port = 2828 + }); + server.Start(); + Dictionary client_infos = new Dictionary(); + + var connection_reset = new ManualResetEventSlim(); + ClientRunInfo current_info = null; + + server.NewSessionConnected += session => { + current_info.Session = session; + client_infos.Add(session, current_info); + + connection_reset.Set(); + + Console.Write(client_infos.Count + " "); + }; + + Console.Write("Clients connecting to server:"); + for (int i = 0; i < clients; i++) { + var reset = new ManualResetEventSlim(); + var cl = new MqClient(); + current_info = new ClientRunInfo() { + Client = cl, + ManualResetEventSlim = reset, + WaitHandle = reset.WaitHandle, + Runs = 0, + Stopwatch = new Stopwatch() + + }; + + + cl.ConnectAsync("127.0.0.1").Wait(); + + connection_reset.Wait(); + + } + + Console.WriteLine("\r\nClients connected to server Starting tests."); + + Console.WriteLine("| Build | Messages | Msg Bytes | Milliseconds | MPS | MBps |"); + Console.WriteLine("|---------|------------|-----------|--------------|------------|----------|"); + + + var message_size = message.Size; + + server.IncomingMessage += (sender, args) => { + MqMessage message_out; + + var client_info = client_infos[args.Session]; + while (args.Mailbox.Inbox.TryDequeue(out message_out)) { + client_info.Runs++; + } + + if (client_info.Runs == runs) { + client_info.Stopwatch.Stop(); + var mode = "Release"; + +#if DEBUG + mode = "Debug"; +#endif + + var messages_per_second = (int)((double)runs / sw.ElapsedMilliseconds * 1000); + var msg_size_no_header = message_size - 12; + var mbps = runs * (double)(msg_size_no_header) / sw.ElapsedMilliseconds / 1000; + Console.WriteLine("| {0,7} | {1,10:N0} | {2,9:N0} | {3,12:N0} | {4,10:N0} | {5,8:N2} |", mode, runs, msg_size_no_header, sw.ElapsedMilliseconds, messages_per_second, mbps); + + client_info.ManualResetEventSlim.Set(); + } + + }; + + + var client = new MqClient(); + + + foreach (var client_run_info in client_infos) { + Task.Run(() => { + client_run_info.Value.Stopwatch.Restart(); + for (var i = 0; i < runs; i++) { + client.Send(message); + } + MqServer sv = server; + client_run_info.Value.ManualResetEventSlim.Wait(); + client_run_info.Value.ManualResetEventSlim.Reset(); + }); + } + + var resets = client_infos.Values. + + + ManualResetEvent.WaitAll() + + Console.WriteLine("Complete"); + server.Stop(); + client.Close().Wait();*/ + + } + private static void MqPerformanceTests(int runs, int loops, MqMessage message) { var server = new MqServer(new ServerConfig { Ip = "127.0.0.1", @@ -71,7 +187,6 @@ private static void MqPerformanceTests(int runs, int loops, MqMessage message) { var sw = new Stopwatch(); var wait = new AutoResetEvent(false); - var client = new MqClient(); Console.WriteLine("| Build | Messages | Msg Bytes | Milliseconds | MPS | MBps |"); Console.WriteLine("|---------|------------|-----------|--------------|------------|----------|"); @@ -109,6 +224,7 @@ private static void MqPerformanceTests(int runs, int loops, MqMessage message) { }; + var client = new MqClient(); var send = new Action(() => { count = 0; diff --git a/DtronixMessageQueue/IncomingMessageEventArgs.cs b/DtronixMessageQueue/IncomingMessageEventArgs.cs index 29d652e..11704ac 100644 --- a/DtronixMessageQueue/IncomingMessageEventArgs.cs +++ b/DtronixMessageQueue/IncomingMessageEventArgs.cs @@ -7,9 +7,11 @@ namespace DtronixMessageQueue { public class IncomingMessageEventArgs : EventArgs { public MqMailbox Mailbox { get; set; } + public MqSession Session { get; set; } - public IncomingMessageEventArgs(MqMailbox mailbox) { + public IncomingMessageEventArgs(MqMailbox mailbox, MqSession session) { Mailbox = mailbox; + Session = session; } } } \ No newline at end of file diff --git a/DtronixMessageQueue/MqMailbox.cs b/DtronixMessageQueue/MqMailbox.cs index 3722c49..9bacb3e 100644 --- a/DtronixMessageQueue/MqMailbox.cs +++ b/DtronixMessageQueue/MqMailbox.cs @@ -150,7 +150,7 @@ private void SendBufferQueue(Queue buffer_queue, int length) { session.Send(buffer, 0, buffer.Length); } - postmaster.SignalWriteComplete(this); + } @@ -184,6 +184,7 @@ internal void ProcessOutbox() { // Send the last of the buffer queue. SendBufferQueue(buffer_queue, length); } + //postmaster.SignalWriteComplete(this); } /// @@ -232,7 +233,7 @@ internal async void ProcessIncomingQueue() { //postmaster.SignalReadComplete(this); if (new_message) { - IncomingMessage?.Invoke(this, new IncomingMessageEventArgs(this)); + IncomingMessage?.Invoke(this, new IncomingMessageEventArgs(this, session)); } } diff --git a/DtronixMessageQueue/MqPostmaster.cs b/DtronixMessageQueue/MqPostmaster.cs index 2f71f4b..9e7a648 100644 --- a/DtronixMessageQueue/MqPostmaster.cs +++ b/DtronixMessageQueue/MqPostmaster.cs @@ -24,21 +24,7 @@ public class MqPostmaster : IDisposable { public MqPostmaster() { // Add a supervisor to review when it is needed to increase or decrease the worker numbers. - supervisor = new MqWorker(async (worker) => { - while (true) { - - if (ProcessWorkers(read_workers)) { - CreateReadWorker(); - } - - if (ProcessWorkers( write_workers)) { - //CreateWriteWorker(); - - } - - await Task.Delay(500); - } - }, "postmaster_supervisor"); + supervisor = new MqWorker(SupervisorWork, "postmaster_supervisor"); // Create one reader and one writer workers to start off with. CreateReadWorker(); @@ -48,40 +34,29 @@ public MqPostmaster() { supervisor.Start(); } + private void SupervisorWork(MqWorker worker) { + while (worker.Token.IsCancellationRequested == false) { + if (ProcessWorkers(read_workers)) { + CreateReadWorker(); + } - public bool SignalWrite(MqMailbox mailbox) { - return ongoing_write_operations.TryAdd(mailbox, true) && write_operations.TryAdd(mailbox); - } - - public bool SignalRead(MqMailbox mailbox) { - return ongoing_read_operations.TryAdd(mailbox, true) && read_operations.TryAdd(mailbox); + if (ProcessWorkers(write_workers)) { + CreateWriteWorker(); + } + Thread.Sleep(500); + } } - public void SignalWriteComplete(MqMailbox mailbox) { - bool out_mailbox; - ongoing_write_operations.TryRemove(mailbox, out out_mailbox); + public MqPostmaster(MqServer server) { } - public bool SignalReadComplete(MqMailbox mailbox) { - bool out_mailbox; - return ongoing_read_operations.TryRemove(mailbox, out out_mailbox); + public bool SignalWrite(MqMailbox mailbox) { + return ongoing_write_operations.TryAdd(mailbox, true) && write_operations.TryAdd(mailbox); } - private async void SuperviseWorkers(MqWorker worker) { - while (true) { - - //if (ProcessWorkers(read_workers)) { - //CreateReadWorker(); - //} - - //if (ProcessWorkers( write_workers)) { - //CreateWriteWorker(); - - //} - - await Task.Delay(500); - } + public bool SignalRead(MqMailbox mailbox) { + return ongoing_read_operations.TryAdd(mailbox, true) && read_operations.TryAdd(mailbox); } @@ -110,6 +85,7 @@ public void CreateWriteWorker() { Console.WriteLine("Created write worker."); var writer_worker = new MqWorker(worker => { MqMailbox mailbox = null; + bool out_mailbox; try { worker.StartIdle(); @@ -117,8 +93,7 @@ public void CreateWriteWorker() { worker.StartWork(); mailbox.ProcessOutbox(); worker.StartIdle(); - //bool out_mailbox; - //ongoing_write_operations.TryRemove(mailbox, out out_mailbox); + ongoing_write_operations.TryRemove(mailbox, out out_mailbox); } } catch (ThreadAbortException) { diff --git a/DtronixMessageQueue/MqWorker.cs b/DtronixMessageQueue/MqWorker.cs index 72ac429..cf088c4 100644 --- a/DtronixMessageQueue/MqWorker.cs +++ b/DtronixMessageQueue/MqWorker.cs @@ -44,6 +44,7 @@ public long AverageIdleTime { private Thread worker_thread; public MqWorker(Action work, string name) { + idle_stopwatch.Start(); this.work = work; Token = cancellation_source.Token; worker_thread = new Thread(ProcessQueue) { @@ -58,7 +59,6 @@ public MqWorker(Action work, string name) { /// Start the worker. /// public void Start() { - idle_stopwatch.Start(); worker_thread.Start(this); //worker_task.Start(); } @@ -104,9 +104,9 @@ private void ProcessQueue(object o) { } public void Dispose() { - /*if (worker_task.IsCanceled == false) { + if (worker_thread.IsAlive) { Stop(); - }*/ + } } } } \ No newline at end of file