From 46e82dac12bfc21a8cf12b1cea1acf35371944f6 Mon Sep 17 00:00:00 2001 From: DJGosnell Date: Tue, 30 Aug 2016 13:40:21 -0400 Subject: [PATCH] Timeout can now be disabled via the configurations. Updated default send/receive buffer size to 16KB. Timeout timer starts with the first client connection and ends with the last disconnected client. --- .../Program.cs | 16 ++++----- .../Results/i5-3470-8GB-16KB.md | 30 +++++++++++++++++ .../{i5-3470-8GB.md => i5-3470-8GB-8KB.md} | 0 DtronixMessageQueue.Tests/MqClientTests.cs | 7 +++- DtronixMessageQueue/MqClient.cs | 10 ++++-- DtronixMessageQueue/MqServer.cs | 33 ++++++++++++++----- DtronixMessageQueue/MqSocketConfig.cs | 8 +++-- DtronixMessageQueue/Socket/SocketConfig.cs | 4 +-- 8 files changed, 82 insertions(+), 26 deletions(-) create mode 100644 DtronixMessageQueue.Tests.Performance/Results/i5-3470-8GB-16KB.md rename DtronixMessageQueue.Tests.Performance/Results/{i5-3470-8GB.md => i5-3470-8GB-8KB.md} (100%) diff --git a/DtronixMessageQueue.Tests.Performance/Program.cs b/DtronixMessageQueue.Tests.Performance/Program.cs index 6daf632..149e462 100644 --- a/DtronixMessageQueue.Tests.Performance/Program.cs +++ b/DtronixMessageQueue.Tests.Performance/Program.cs @@ -163,7 +163,7 @@ private static void StartServer(int total_messages, int total_clients) { builder.Write("START"); var start_message = builder.ToMessage(true); - ConcurrentDictionary client_infos = new ConcurrentDictionary(); + ConcurrentDictionary clients_info = new ConcurrentDictionary(); server.Connected += (sender, session) => { @@ -171,11 +171,11 @@ private static void StartServer(int total_messages, int total_clients) { Session = session.Session, Runs = 0 }; - client_infos.TryAdd(session.Session, current_info); + clients_info.TryAdd(session.Session, current_info); - if (client_infos.Count == total_clients) { + if (clients_info.Count == total_clients) { - foreach (var mq_session in client_infos.Keys) { + foreach (var mq_session in clients_info.Keys) { mq_session.Send(start_message); } } @@ -183,11 +183,11 @@ private static void StartServer(int total_messages, int total_clients) { server.Closed += (session, value) => { ClientRunInfo info; - client_infos.TryRemove(value.Session, out info); + clients_info.TryRemove(value.Session, out info); }; server.IncomingMessage += (sender, args) => { - var client_info = client_infos[args.Session]; + var client_info = clients_info[args.Session]; // Count the total messages. client_info.Runs += args.Messages.Count; @@ -232,9 +232,7 @@ private static byte[] RandomBytes(int len) { static void InProcessTest() { var config = new MqSocketConfig { Ip = "127.0.0.1", - Port = 2828, - SendAndReceiveBufferSize = 8000, - FrameBufferSize = 8000 - MqFrame.HeaderLength + Port = 2828 }; diff --git a/DtronixMessageQueue.Tests.Performance/Results/i5-3470-8GB-16KB.md b/DtronixMessageQueue.Tests.Performance/Results/i5-3470-8GB-16KB.md new file mode 100644 index 0000000..da95d8e --- /dev/null +++ b/DtronixMessageQueue.Tests.Performance/Results/i5-3470-8GB-16KB.md @@ -0,0 +1,30 @@ +Intel(R) Core(TM) i5-3470 CPU @ 3.20GHz with 8 GB of RAM installed. + +FrameBufferSize: 15997; SendAndReceiveBufferSize: 16000 + +| Build | Messages | Msg Bytes | Milliseconds | MPS | MBps | +|---------|------------|-----------|--------------|------------|----------| +| Release | 1,000,000 | 200 | 1,273 | 785,545 | 157.11 | +| Release | 1,000,000 | 200 | 1,268 | 788,643 | 157.73 | +| Release | 1,000,000 | 200 | 1,244 | 803,858 | 160.77 | +| Release | 1,000,000 | 200 | 1,247 | 801,924 | 160.38 | +| Release | 1,000,000 | 200 | 1,233 | 811,030 | 162.21 | +| | | AVERAGES | 1,253 | 798,200 | 159.64 | + +| Build | Messages | Msg Bytes | Milliseconds | MPS | MBps | +|---------|------------|-----------|--------------|------------|----------| +| Release | 100,000 | 2,000 | 645 | 155,038 | 310.08 | +| Release | 100,000 | 2,000 | 638 | 156,739 | 313.48 | +| Release | 100,000 | 2,000 | 636 | 157,232 | 314.47 | +| Release | 100,000 | 2,000 | 637 | 156,985 | 313.97 | +| Release | 100,000 | 2,000 | 634 | 157,728 | 315.46 | +| | | AVERAGES | 638 | 156,744 | 313.49 | + +| Build | Messages | Msg Bytes | Milliseconds | MPS | MBps | +|---------|------------|-----------|--------------|------------|----------| +| Release | 10,000 | 60,048 | 1,893 | 5,282 | 317.21 | +| Release | 10,000 | 60,048 | 1,872 | 5,341 | 320.77 | +| Release | 10,000 | 60,048 | 1,858 | 5,382 | 323.19 | +| Release | 10,000 | 60,048 | 1,855 | 5,390 | 323.71 | +| Release | 10,000 | 60,048 | 1,864 | 5,364 | 322.15 | +| | | AVERAGES | 1,868 | 5,352 | 321.40 | \ No newline at end of file diff --git a/DtronixMessageQueue.Tests.Performance/Results/i5-3470-8GB.md b/DtronixMessageQueue.Tests.Performance/Results/i5-3470-8GB-8KB.md similarity index 100% rename from DtronixMessageQueue.Tests.Performance/Results/i5-3470-8GB.md rename to DtronixMessageQueue.Tests.Performance/Results/i5-3470-8GB-8KB.md diff --git a/DtronixMessageQueue.Tests/MqClientTests.cs b/DtronixMessageQueue.Tests/MqClientTests.cs index 77e3729..2ec68ef 100644 --- a/DtronixMessageQueue.Tests/MqClientTests.cs +++ b/DtronixMessageQueue.Tests/MqClientTests.cs @@ -205,7 +205,12 @@ public void Client_times_out() { } }; - StartAndWait(true, 1000); + StartAndWait(false, 1000); + + if (TestStatus.IsSet == false) { + throw new Exception("Socket did not timeout."); + } + } [Fact] diff --git a/DtronixMessageQueue/MqClient.cs b/DtronixMessageQueue/MqClient.cs index f487314..67239ed 100644 --- a/DtronixMessageQueue/MqClient.cs +++ b/DtronixMessageQueue/MqClient.cs @@ -24,17 +24,18 @@ public class MqClient : SocketClient { /// /// Timer used to verify that the sessions are still connected. /// - private Timer timeout_timer; + private readonly Timer timeout_timer; /// /// Initializes a new instance of a message queue. /// public MqClient(MqSocketConfig config) : base(config) { + + // Override the default connection limit and read/write workers. config.MaxConnections = 1; config.MaxReadWriteWorkers = 1; timeout_timer = new Timer(TimeoutCallback); - postmaster = new MqPostmaster(config); Setup(); @@ -43,7 +44,10 @@ public MqClient(MqSocketConfig config) : base(config) { protected override void OnConnect(MqSession session) { // Start the timeout timer. var ping_frequency = ((MqSocketConfig) Config).PingFrequency; - timeout_timer.Change(ping_frequency / 2, ping_frequency); + + if (ping_frequency > 0) { + timeout_timer.Change(ping_frequency/2, ping_frequency); + } base.OnConnect(session); } diff --git a/DtronixMessageQueue/MqServer.cs b/DtronixMessageQueue/MqServer.cs index d0ca48d..2cf8b7d 100644 --- a/DtronixMessageQueue/MqServer.cs +++ b/DtronixMessageQueue/MqServer.cs @@ -15,16 +15,21 @@ public class MqServer : SocketServer { /// public event EventHandler IncomingMessage; + /// + /// True if the timeout timer is running. False otherwise. + /// + private bool timeout_timer_running = false; + /// /// Timer used to verify that the sessions are still connected. /// - private Timer timeout_timer; + private readonly Timer timeout_timer; /// /// Initializes a new instance of a message queue. /// public MqServer(MqSocketConfig config) : base(config) { - timeout_timer = new Timer(TimeoutCallback, ConnectedSessions, 0, config.PingTimeout); + timeout_timer = new Timer(TimeoutCallback); postmaster = new MqPostmaster(config); @@ -38,15 +43,10 @@ public MqServer(MqSocketConfig config) : base(config) { /// /// Concurrent dictionary of the sessions. private void TimeoutCallback(object state) { - var sessions = state as ConcurrentDictionary; - if (sessions.IsEmpty) { - return; - } - var timout_int = ((MqSocketConfig) Config).PingTimeout; var timeout_time = DateTime.UtcNow.Subtract(new TimeSpan(0, 0, 0, 0, timout_int)); - foreach (var session in sessions.Values) { + foreach (var session in ConnectedSessions.Values) { if (session.LastReceived < timeout_time) { session.CloseConnection(SocketCloseReason.TimeOut); } @@ -68,14 +68,31 @@ protected override MqSession CreateSession(System.Net.Sockets.Socket socket) { session.Postmaster = postmaster; session.IncomingMessage += OnIncomingMessage; session.BaseSocket = this; + return session; } + protected override void OnConnect(MqSession session) { + // Start the timeout timer if it is not already running. + if (timeout_timer_running == false) { + timeout_timer.Change(0, ((MqSocketConfig)Config).PingTimeout); + timeout_timer_running = true; + } + + base.OnConnect(session); + } + protected override void OnClose(MqSession session, SocketCloseReason reason) { MqSession out_session; ConnectedSessions.TryRemove(session.Id, out out_session); + // If there are no clients connected, stop the timer. + if (ConnectedSessions.IsEmpty) { + timeout_timer.Change(Timeout.Infinite, Timeout.Infinite); + timeout_timer_running = false; + } + session.IncomingMessage -= OnIncomingMessage; session.Dispose(); base.OnClose(session, reason); diff --git a/DtronixMessageQueue/MqSocketConfig.cs b/DtronixMessageQueue/MqSocketConfig.cs index 9a4549c..60e8da5 100644 --- a/DtronixMessageQueue/MqSocketConfig.cs +++ b/DtronixMessageQueue/MqSocketConfig.cs @@ -7,17 +7,19 @@ public class MqSocketConfig : SocketConfig { /// Max size of the frame. Needs to be equal or smaller than SendAndReceiveBufferSize. /// Need to exclude the header length for a frame. /// - public int FrameBufferSize { get; set; } = 1024 * 4 - MqFrame.HeaderLength; + public int FrameBufferSize { get; set; } = 1024 * 16 - MqFrame.HeaderLength; /// /// (Client) - /// Milliseconds between pings. -1 Disables pings. + /// Milliseconds between pings. + /// 0 disables pings. /// - public int PingFrequency { get; set; } = -1; + public int PingFrequency { get; set; } = 0; /// /// (Server) /// Max milliseconds since the last received packet before the session is disconnected. + /// 0 disables the automatic disconnection functionality. /// public int PingTimeout { get; set; } = 60000; diff --git a/DtronixMessageQueue/Socket/SocketConfig.cs b/DtronixMessageQueue/Socket/SocketConfig.cs index b8b2eab..b5aa24b 100644 --- a/DtronixMessageQueue/Socket/SocketConfig.cs +++ b/DtronixMessageQueue/Socket/SocketConfig.cs @@ -7,7 +7,7 @@ public class SocketConfig { /// /// Maximum number of connections allowed. Only used by the server. /// - public int MaxConnections { get; set; } = 10; + public int MaxConnections { get; set; } = 1000; /// /// Maximum backlog for pending connections. @@ -18,7 +18,7 @@ public class SocketConfig { /// /// Size of the buffer for the sockets. /// - public int SendAndReceiveBufferSize { get; set; } = 1024*4; + public int SendAndReceiveBufferSize { get; set; } = 1024*16; /// /// Time in milliseconds it takes for the sending event to fail.