Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Timeout can now be disabled via the configurations.
Browse files Browse the repository at this point in the history
Updated default send/receive buffer size to 16KB.
Timeout timer starts with the first client connection and ends with the last disconnected client.
  • Loading branch information
DJGosnell committed Aug 30, 2016
1 parent 44239fd commit 46e82da
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 26 deletions.
16 changes: 7 additions & 9 deletions DtronixMessageQueue.Tests.Performance/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,31 +163,31 @@ private static void StartServer(int total_messages, int total_clients) {
builder.Write("START");
var start_message = builder.ToMessage(true);

ConcurrentDictionary<MqSession, ClientRunInfo> client_infos = new ConcurrentDictionary<MqSession, ClientRunInfo>();
ConcurrentDictionary<MqSession, ClientRunInfo> clients_info = new ConcurrentDictionary<MqSession, ClientRunInfo>();


server.Connected += (sender, session) => {
var current_info = new ClientRunInfo() {
Session = session.Session,
Runs = 0
};
client_infos.TryAdd(session.Session, current_info);
clients_info.TryAdd(session.Session, current_info);

if (client_infos.Count == total_clients) {
if (clients_info.Count == total_clients) {

foreach (var mq_session in client_infos.Keys) {
foreach (var mq_session in clients_info.Keys) {
mq_session.Send(start_message);
}
}
};

server.Closed += (session, value) => {
ClientRunInfo info;
client_infos.TryRemove(value.Session, out info);
clients_info.TryRemove(value.Session, out info);
};

server.IncomingMessage += (sender, args) => {
var client_info = client_infos[args.Session];
var client_info = clients_info[args.Session];

// Count the total messages.
client_info.Runs += args.Messages.Count;
Expand Down Expand Up @@ -232,9 +232,7 @@ private static byte[] RandomBytes(int len) {
static void InProcessTest() {
var config = new MqSocketConfig {
Ip = "127.0.0.1",
Port = 2828,
SendAndReceiveBufferSize = 8000,
FrameBufferSize = 8000 - MqFrame.HeaderLength
Port = 2828
};


Expand Down
30 changes: 30 additions & 0 deletions DtronixMessageQueue.Tests.Performance/Results/i5-3470-8GB-16KB.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Intel(R) Core(TM) i5-3470 CPU @ 3.20GHz with 8 GB of RAM installed.

FrameBufferSize: 15997; SendAndReceiveBufferSize: 16000

| Build | Messages | Msg Bytes | Milliseconds | MPS | MBps |
|---------|------------|-----------|--------------|------------|----------|
| Release | 1,000,000 | 200 | 1,273 | 785,545 | 157.11 |
| Release | 1,000,000 | 200 | 1,268 | 788,643 | 157.73 |
| Release | 1,000,000 | 200 | 1,244 | 803,858 | 160.77 |
| Release | 1,000,000 | 200 | 1,247 | 801,924 | 160.38 |
| Release | 1,000,000 | 200 | 1,233 | 811,030 | 162.21 |
| | | AVERAGES | 1,253 | 798,200 | 159.64 |

| Build | Messages | Msg Bytes | Milliseconds | MPS | MBps |
|---------|------------|-----------|--------------|------------|----------|
| Release | 100,000 | 2,000 | 645 | 155,038 | 310.08 |
| Release | 100,000 | 2,000 | 638 | 156,739 | 313.48 |
| Release | 100,000 | 2,000 | 636 | 157,232 | 314.47 |
| Release | 100,000 | 2,000 | 637 | 156,985 | 313.97 |
| Release | 100,000 | 2,000 | 634 | 157,728 | 315.46 |
| | | AVERAGES | 638 | 156,744 | 313.49 |

| Build | Messages | Msg Bytes | Milliseconds | MPS | MBps |
|---------|------------|-----------|--------------|------------|----------|
| Release | 10,000 | 60,048 | 1,893 | 5,282 | 317.21 |
| Release | 10,000 | 60,048 | 1,872 | 5,341 | 320.77 |
| Release | 10,000 | 60,048 | 1,858 | 5,382 | 323.19 |
| Release | 10,000 | 60,048 | 1,855 | 5,390 | 323.71 |
| Release | 10,000 | 60,048 | 1,864 | 5,364 | 322.15 |
| | | AVERAGES | 1,868 | 5,352 | 321.40 |
7 changes: 6 additions & 1 deletion DtronixMessageQueue.Tests/MqClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,12 @@ public void Client_times_out() {
}
};

StartAndWait(true, 1000);
StartAndWait(false, 1000);

if (TestStatus.IsSet == false) {
throw new Exception("Socket did not timeout.");
}

}

[Fact]
Expand Down
10 changes: 7 additions & 3 deletions DtronixMessageQueue/MqClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ public class MqClient : SocketClient<MqSession> {
/// <summary>
/// Timer used to verify that the sessions are still connected.
/// </summary>
private Timer timeout_timer;
private readonly Timer timeout_timer;


/// <summary>
/// Initializes a new instance of a message queue.
/// </summary>
public MqClient(MqSocketConfig config) : base(config) {

// Override the default connection limit and read/write workers.
config.MaxConnections = 1;
config.MaxReadWriteWorkers = 1;
timeout_timer = new Timer(TimeoutCallback);

postmaster = new MqPostmaster(config);

Setup();
Expand All @@ -43,7 +44,10 @@ public MqClient(MqSocketConfig config) : base(config) {
protected override void OnConnect(MqSession session) {
// Start the timeout timer.
var ping_frequency = ((MqSocketConfig) Config).PingFrequency;
timeout_timer.Change(ping_frequency / 2, ping_frequency);

if (ping_frequency > 0) {
timeout_timer.Change(ping_frequency/2, ping_frequency);
}

base.OnConnect(session);
}
Expand Down
33 changes: 25 additions & 8 deletions DtronixMessageQueue/MqServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@ public class MqServer : SocketServer<MqSession> {
/// </summary>
public event EventHandler<IncomingMessageEventArgs> IncomingMessage;

/// <summary>
/// True if the timeout timer is running. False otherwise.
/// </summary>
private bool timeout_timer_running = false;

/// <summary>
/// Timer used to verify that the sessions are still connected.
/// </summary>
private Timer timeout_timer;
private readonly Timer timeout_timer;

/// <summary>
/// Initializes a new instance of a message queue.
/// </summary>
public MqServer(MqSocketConfig config) : base(config) {
timeout_timer = new Timer(TimeoutCallback, ConnectedSessions, 0, config.PingTimeout);
timeout_timer = new Timer(TimeoutCallback);

postmaster = new MqPostmaster(config);

Expand All @@ -38,15 +43,10 @@ public MqServer(MqSocketConfig config) : base(config) {
/// </summary>
/// <param name="state">Concurrent dictionary of the sessions.</param>
private void TimeoutCallback(object state) {
var sessions = state as ConcurrentDictionary<Guid, MqSession>;
if (sessions.IsEmpty) {
return;
}

var timout_int = ((MqSocketConfig) Config).PingTimeout;
var timeout_time = DateTime.UtcNow.Subtract(new TimeSpan(0, 0, 0, 0, timout_int));

foreach (var session in sessions.Values) {
foreach (var session in ConnectedSessions.Values) {
if (session.LastReceived < timeout_time) {
session.CloseConnection(SocketCloseReason.TimeOut);
}
Expand All @@ -68,14 +68,31 @@ protected override MqSession CreateSession(System.Net.Sockets.Socket socket) {
session.Postmaster = postmaster;
session.IncomingMessage += OnIncomingMessage;
session.BaseSocket = this;

return session;
}

protected override void OnConnect(MqSession session) {
// Start the timeout timer if it is not already running.
if (timeout_timer_running == false) {
timeout_timer.Change(0, ((MqSocketConfig)Config).PingTimeout);
timeout_timer_running = true;
}

base.OnConnect(session);
}


protected override void OnClose(MqSession session, SocketCloseReason reason) {
MqSession out_session;
ConnectedSessions.TryRemove(session.Id, out out_session);

// If there are no clients connected, stop the timer.
if (ConnectedSessions.IsEmpty) {
timeout_timer.Change(Timeout.Infinite, Timeout.Infinite);
timeout_timer_running = false;
}

session.IncomingMessage -= OnIncomingMessage;
session.Dispose();
base.OnClose(session, reason);
Expand Down
8 changes: 5 additions & 3 deletions DtronixMessageQueue/MqSocketConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ public class MqSocketConfig : SocketConfig {
/// Max size of the frame. Needs to be equal or smaller than SendAndReceiveBufferSize.
/// Need to exclude the header length for a frame.
/// </summary>
public int FrameBufferSize { get; set; } = 1024 * 4 - MqFrame.HeaderLength;
public int FrameBufferSize { get; set; } = 1024 * 16 - MqFrame.HeaderLength;

/// <summary>
/// (Client)
/// Milliseconds between pings. -1 Disables pings.
/// Milliseconds between pings.
/// 0 disables pings.
/// </summary>
public int PingFrequency { get; set; } = -1;
public int PingFrequency { get; set; } = 0;

/// <summary>
/// (Server)
/// Max milliseconds since the last received packet before the session is disconnected.
/// 0 disables the automatic disconnection functionality.
/// </summary>
public int PingTimeout { get; set; } = 60000;

Expand Down
4 changes: 2 additions & 2 deletions DtronixMessageQueue/Socket/SocketConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class SocketConfig {
/// <summary>
/// Maximum number of connections allowed. Only used by the server.
/// </summary>
public int MaxConnections { get; set; } = 10;
public int MaxConnections { get; set; } = 1000;

/// <summary>
/// Maximum backlog for pending connections.
Expand All @@ -18,7 +18,7 @@ public class SocketConfig {
/// <summary>
/// Size of the buffer for the sockets.
/// </summary>
public int SendAndReceiveBufferSize { get; set; } = 1024*4;
public int SendAndReceiveBufferSize { get; set; } = 1024*16;

/// <summary>
/// Time in milliseconds it takes for the sending event to fail.
Expand Down

0 comments on commit 46e82da

Please sign in to comment.