Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Fixed timeout error on client re-connections.
Browse files Browse the repository at this point in the history
Fixed code formatting.
Passing all of GUIDs by reference.
  • Loading branch information
DJGosnell committed Jul 18, 2017
1 parent 30ac960 commit f406776
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ public override void StartTest()
_config.SendAndReceiveBufferSize);


MqInProcessPerformanceTests(1000000, 2, _smallMessage, _config);

return;
MqInProcessPerformanceTests(1000000, 5, _smallMessage, _config);

MqInProcessPerformanceTests(100000, 5, _medimumMessage, _config);

Expand Down
36 changes: 20 additions & 16 deletions src/DtronixMessageQueue.Tests.Performance/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@
using System.Runtime.InteropServices;
using System.Threading;

namespace DtronixMessageQueue.Tests.Performance {
class Program {
namespace DtronixMessageQueue.Tests.Performance
{
class Program
{

[DllImport("kernel32.dll")]
[return: MarshalAs(UnmanagedType.Bool)]
static extern bool GetPhysicallyInstalledSystemMemory(out long total_memory_in_kilobytes);
[DllImport("kernel32.dll")]
[return: MarshalAs(UnmanagedType.Bool)]
static extern bool GetPhysicallyInstalledSystemMemory(out long total_memory_in_kilobytes);

static void Main(string[] args) {
var mode = args.Length == 0 ? null : args[0];
var file_name = string.Join("-", args);
using (var cc = new ConsoleCopy($"MessageQueuePerformanceTest-{file_name}.txt")) {
PerformanceTestBase.WriteSysInfo();
static void Main(string[] args)
{
var mode = args.Length == 0 ? null : args[0];
var file_name = string.Join("-", args);
using (var cc = new ConsoleCopy($"MessageQueuePerformanceTest-{file_name}.txt"))
{
PerformanceTestBase.WriteSysInfo();

Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}");
Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}");

Console.WriteLine("MQ Performance tests.\r\n");
new MqPerformanceTest().StartTest();
Expand All @@ -29,13 +33,13 @@ static void Main(string[] args) {

Console.WriteLine("RPC Performance tests.\r\n");
new RpcPerformanceTest(args);
}
}

Console.ReadLine();
Console.ReadLine();

}

}

}

}

}
12 changes: 5 additions & 7 deletions src/DtronixMessageQueue/MqSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using DtronixMessageQueue.Socket;

namespace DtronixMessageQueue
Expand Down Expand Up @@ -37,10 +36,6 @@ public abstract class MqSession<TSession, TConfig> : SocketSession<TSession, TCo
/// </summary>
private readonly ConcurrentQueue<byte[]> _inboxBytes = new ConcurrentQueue<byte[]>();

private Task _outboxTask;

private Task _inboxTask;

private SemaphoreSlim _sendingSemaphore;

private SemaphoreSlim _receivingSemaphore;
Expand Down Expand Up @@ -72,7 +67,8 @@ protected override void HandleIncomingBytes(byte[] buffer)
_receivingSemaphore.Wait();

_inboxBytes.Enqueue(buffer);
InboxProcessor.QueueProcess(Id, ProcessIncomingQueue);
var id = Id;
InboxProcessor.QueueProcess(ref id, ProcessIncomingQueue);
}

/// <summary>
Expand Down Expand Up @@ -317,7 +313,9 @@ public void Send(MqMessage message)

_sendingSemaphore.Wait();
_outbox.Enqueue(message);
OutboxProcessor.QueueProcess(Id, ProcessOutbox);

var id = Id;
OutboxProcessor.QueueProcess(ref id, ProcessOutbox);
}

/// <summary>
Expand Down
20 changes: 10 additions & 10 deletions src/DtronixMessageQueue/Socket/SessionProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,26 @@ public SessionProcessor(int threads, string name)
}
}

public void QueueProcess(T id, Action action)
public void QueueProcess(ref T id, Action action)
{
if (_registeredSessions.TryGetValue(id, out ProcessorThread processor))
{
processor.QueueProcess(id, action);
processor.QueueProcess(ref id, action);
}
}

public void Register(T id)
public void Register(ref T id)
{
var leastActive = _activeThreads.OrderByDescending(pt => pt.IdleTime).First();
leastActive.Register(id);
leastActive.Register(ref id);
_registeredSessions.TryAdd(id, leastActive);
}

public void Unregister(T id)
public void Unregister(ref T id)
{
if (_registeredSessions.TryRemove(id, out ProcessorThread pthread))
{
pthread.Unregister(id);
pthread.Unregister(ref id);
}
}

Expand Down Expand Up @@ -99,7 +99,7 @@ private class ProcessorThread

private readonly ConcurrentDictionary<T, float> _guidPerformance;

private int _registeredGuids = 0;
private int _registeredGuids;



Expand Down Expand Up @@ -151,7 +151,7 @@ private void Process()

}

public void QueueProcess(T id, Action action)
public void QueueProcess(ref T id, Action action)
{
Interlocked.Increment(ref _queued);

Expand All @@ -163,15 +163,15 @@ public void QueueProcess(T id, Action action)

}

public void Register(T id)
public void Register(ref T id)
{
if (_guidPerformance.TryAdd(id, 0))
{
Interlocked.Increment(ref _registeredGuids);
}
}

public void Unregister(T id)
public void Unregister(ref T id)
{
float perfValue;
if (_guidPerformance.TryRemove(id, out perfValue))
Expand Down
32 changes: 16 additions & 16 deletions src/DtronixMessageQueue/Socket/SocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,6 @@ public void Connect(IPEndPoint endPoint)
_connectionTimeoutCancellation = new CancellationTokenSource();


Task.Run(async () =>
{
try
{
await Task.Delay(Config.ConnectionTimeout, _connectionTimeoutCancellation.Token);
}
catch
{
return;
}

timedOut = true;
OnClose(null, SocketCloseReason.TimeOut);
MainSocket.Close();
}, _connectionTimeoutCancellation.Token);


var eventArg = new SocketAsyncEventArgs
Expand Down Expand Up @@ -113,8 +98,23 @@ public void Connect(IPEndPoint endPoint)
}
};


MainSocket.ConnectAsync(eventArg);

Task.Run(async () =>
{
try
{
await Task.Delay(Config.ConnectionTimeout, _connectionTimeoutCancellation.Token);
}
catch
{
return;
}

timedOut = true;
OnClose(null, SocketCloseReason.TimeOut);
MainSocket.Close();
}, _connectionTimeoutCancellation.Token);
}

protected override void OnClose(TSession session, SocketCloseReason reason)
Expand Down
12 changes: 8 additions & 4 deletions src/DtronixMessageQueue/Socket/SocketSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,13 @@ public static TSession Create(System.Net.Sockets.Socket sessionSocket, SocketAsy
session._receiveArgs = session._argsPool.Create();
session._receiveArgs.Completed += session.IoCompleted;

var id = session.Id;

session.InboxProcessor = inboxProcessor;
inboxProcessor.Register(session.Id);
inboxProcessor.Register(ref id);

session.OutboxProcessor = outboxProcessor;
outboxProcessor.Register(session.Id);
outboxProcessor.Register(ref id);


if (session._config.SendTimeout > 0)
Expand Down Expand Up @@ -400,8 +402,10 @@ public virtual void Close(SocketCloseReason reason)
_argsPool.Free(_sendArgs);
_argsPool.Free(_receiveArgs);

InboxProcessor.Unregister(Id);
OutboxProcessor.Unregister(Id);
var id = Id;

InboxProcessor.Unregister(ref id);
OutboxProcessor.Unregister(ref id);

CurrentState = State.Closed;

Expand Down

0 comments on commit f406776

Please sign in to comment.