Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Removed ref requirement for register/deregister.
Browse files Browse the repository at this point in the history
Limited number of queued actions to 1.
Actions are registered now before the action can be invoked and the method is invoked via the GUID.
ProcessActions are now created once and then queued accordingly.
  • Loading branch information
DJGosnell committed Jul 19, 2017
1 parent 019bf2f commit 097a327
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 77 deletions.
4 changes: 2 additions & 2 deletions src/DtronixMessageQueue.Tests.Performance/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ static void Main(string[] args)
Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}");

Console.WriteLine("MQ Performance tests.\r\n");
// new MqPerformanceTest().StartTest();
new MqPerformanceTest().StartTest();

//Console.ReadLine();
Console.ReadLine();

Console.WriteLine("RPC Performance tests.\r\n");
new RpcPerformanceTest(args);
Expand Down
11 changes: 6 additions & 5 deletions src/DtronixMessageQueue/MqSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ protected override void OnSetup()
_frameBuilder = new MqFrameBuilder(Config);
_sendingSemaphore = new SemaphoreSlim(Config.MaxQueuedOutgoingMessages, Config.MaxQueuedOutgoingMessages);
_receivingSemaphore = new SemaphoreSlim(Config.MaxQueuedInboundPackets, Config.MaxQueuedInboundPackets);

InboxProcessor.Register(Id, ProcessIncomingQueue);
OutboxProcessor.Register(Id, ProcessOutbox);
}

/// <summary>
Expand All @@ -67,8 +70,7 @@ protected override void HandleIncomingBytes(byte[] buffer)
_receivingSemaphore.Wait();

_inboxBytes.Enqueue(buffer);
var id = Id;
InboxProcessor.QueueProcess(ref id, ProcessIncomingQueue);
InboxProcessor.Queue(Id);
}

/// <summary>
Expand Down Expand Up @@ -276,7 +278,7 @@ public override void Close(SocketCloseReason reason)
msg = new MqMessage(closeFrame);
_outbox.Enqueue(msg);

// QueueProcess the last bit of data.
// Queue the last bit of data.
ProcessOutbox();
}

Expand Down Expand Up @@ -314,8 +316,7 @@ public void Send(MqMessage message)
_sendingSemaphore.Wait();
_outbox.Enqueue(message);

var id = Id;
OutboxProcessor.QueueProcess(ref id, ProcessOutbox);
OutboxProcessor.Queue(Id);
}

/// <summary>
Expand Down
109 changes: 50 additions & 59 deletions src/DtronixMessageQueue/Socket/SessionProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DtronixMessageQueue
namespace DtronixMessageQueue.Socket
{
/// <summary>
/// Handles all inbox and outbox queue processing
Expand All @@ -16,7 +14,8 @@ public class SessionProcessor<T>
{
private readonly string _name;
private readonly List<ProcessorThread> _activeThreads;
private readonly ConcurrentDictionary<T, ProcessorThread> _registeredSessions;
private readonly ConcurrentDictionary<T, ProcessAction> _registeredActions;
//private readonly ConcurrentDictionary<T, ProcessorThread> _registeredSessions;

public bool IsRunning { get; set; }

Expand All @@ -26,10 +25,12 @@ public SessionProcessor(string name) : this(Environment.ProcessorCount, name)

}


public SessionProcessor(int threads, string name)
{
_name = name;
_registeredSessions = new ConcurrentDictionary<T, ProcessorThread>();
//_registeredSessions = new ConcurrentDictionary<T, ProcessorThread>();
_registeredActions = new ConcurrentDictionary<T, ProcessAction>();

_activeThreads = new List<ProcessorThread>(threads);
for (int i = 0; i < threads; i++)
Expand All @@ -39,26 +40,42 @@ public SessionProcessor(int threads, string name)
}
}

public void QueueProcess(ref T id, Action action)
public void Queue(T id)
{
if (_registeredSessions.TryGetValue(id, out ProcessorThread processor))
if (_registeredActions.TryGetValue(id, out ProcessAction processAction))
{
processor.QueueProcess(ref id, action);
if (processAction.QueuedCount < 1)
{
processAction.ProcessorThread.Queue(processAction);
}
}
}

public void Register(ref T id)
public void Register(T id, Action action)
{
var leastActive = _activeThreads.OrderByDescending(pt => pt.IdleTime).First();
leastActive.Register(ref id);
_registeredSessions.TryAdd(id, leastActive);
var leastActiveProcessor = _activeThreads.OrderByDescending(pt => pt.IdleTime).First();


var processAction = new ProcessAction
{
Action = action,
Guid = id,
ProcessorThread = leastActiveProcessor
};

Interlocked.Increment(ref leastActiveProcessor.RegisteredActionsCount);

if (_registeredActions.TryAdd(id, processAction) == false)
{
throw new InvalidOperationException($"Id {id} is already registered.");
}
}

public void Unregister(ref T id)
public void Deregister(T id)
{
if (_registeredSessions.TryRemove(id, out ProcessorThread pthread))
if (_registeredActions.TryRemove(id, out ProcessAction processAction))
{
pthread.Unregister(ref id);
Interlocked.Increment(ref processAction.ProcessorThread.RegisteredActionsCount);
}
}

Expand All @@ -82,8 +99,11 @@ public void Stop()

private class ProcessAction
{
public T Guid { get; set; }
public Action Action { get; set; }
public T Guid;
public Action Action;
public float AverageUsageTime;
public ProcessorThread ProcessorThread;
public int QueuedCount;
}


Expand All @@ -96,30 +116,23 @@ private class ProcessorThread
private CancellationTokenSource _cancellationTokenSource;
private Stopwatch _perfStopwatch;
private float _idleTime;

private readonly ConcurrentDictionary<T, float> _guidPerformance;

private int _registeredGuids;

public int RegisteredActionsCount = 0;



public int Queued => _queued;

public bool IsRunning => _isRunning;

public int RegisteredGuids => _registeredGuids;

public float IdleTime => _idleTime;

public ProcessorThread(string name)
{
_queued = 0;
_thread = new Thread(Process);
_thread.Name = name;
_thread.IsBackground = true;
_thread = new Thread(Process)
{
Name = name,
IsBackground = true
};

_actions = new BlockingCollection<ProcessAction>();
_guidPerformance = new ConcurrentDictionary<T, float>();
}

private void Process()
Expand All @@ -130,6 +143,7 @@ private void Process()
while (_actions.TryTake(out ProcessAction action, 10000, _cancellationTokenSource.Token))
{

Interlocked.Decrement(ref action.QueuedCount);
Interlocked.Decrement(ref _queued);
// Update the idle time
RollingEstimate(ref _idleTime, _perfStopwatch.ElapsedMilliseconds, 10);
Expand All @@ -141,44 +155,21 @@ private void Process()


// Add this performance to the estimated rolling average.
if (_guidPerformance.TryGetValue(action.Guid, out float previousAverage))
{
RollingEstimate(ref previousAverage, _perfStopwatch.ElapsedMilliseconds, 10);
}
RollingEstimate(ref action.AverageUsageTime, _perfStopwatch.ElapsedMilliseconds, 10);


}
}
_perfStopwatch.Stop();

}

public void QueueProcess(ref T id, Action action)
public void Queue(ProcessAction processAction)
{
Interlocked.Increment(ref _queued);
Interlocked.Increment(ref processAction.QueuedCount);
_actions.TryAdd(processAction);

_actions.TryAdd(new ProcessAction
{
Guid = id,
Action = action
});

}

public void Register(ref T id)
{
if (_guidPerformance.TryAdd(id, 0))
{
Interlocked.Increment(ref _registeredGuids);
}
}

public void Unregister(ref T id)
{
float perfValue;
if (_guidPerformance.TryRemove(id, out perfValue))
{
Interlocked.Decrement(ref _registeredGuids);
}
}

public void Stop()
Expand Down
20 changes: 9 additions & 11 deletions src/DtronixMessageQueue/Socket/SocketSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,13 @@ protected SocketSession()
/// <param name="socketArgsManager">Argument pool for this session to use. Pulls two asyncevents for reading and writing and returns them at the end of this socket's life.</param>
/// <param name="sessionConfig">Socket configurations this session is to use.</param>
/// <param name="sessionHandler">Handler base which is handling this session.</param>
public static TSession Create(System.Net.Sockets.Socket sessionSocket, SocketAsyncEventArgsManager socketArgsManager,
TConfig sessionConfig, SessionHandler<TSession, TConfig> sessionHandler, SessionProcessor<Guid> inboxProcessor,
/// <param name="inboxProcessor">Processor which handles all inboxdata.</param>
/// /// <param name="outboxProcessor">Processor which handles all outbox data.</param>
public static TSession Create(System.Net.Sockets.Socket sessionSocket,
SocketAsyncEventArgsManager socketArgsManager,
TConfig sessionConfig,
SessionHandler<TSession, TConfig> sessionHandler,
SessionProcessor<Guid> inboxProcessor,
SessionProcessor<Guid> outboxProcessor)
{
var session = new TSession
Expand All @@ -170,14 +175,9 @@ public static TSession Create(System.Net.Sockets.Socket sessionSocket, SocketAsy
session._receiveArgs = session._argsPool.Create();
session._receiveArgs.Completed += session.IoCompleted;

var id = session.Id;

session.InboxProcessor = inboxProcessor;
inboxProcessor.Register(ref id);

session.OutboxProcessor = outboxProcessor;
outboxProcessor.Register(ref id);


if (session._config.SendTimeout > 0)
session._socket.SendTimeout = session._config.SendTimeout;
Expand Down Expand Up @@ -402,10 +402,8 @@ public virtual void Close(SocketCloseReason reason)
_argsPool.Free(_sendArgs);
_argsPool.Free(_receiveArgs);

var id = Id;

InboxProcessor.Unregister(ref id);
OutboxProcessor.Unregister(ref id);
InboxProcessor.Deregister(Id);
OutboxProcessor.Deregister(Id);

CurrentState = State.Closed;

Expand Down

0 comments on commit 097a327

Please sign in to comment.