diff --git a/src/DtronixMessageQueue.Tests/ActionProcessorTests.cs b/src/DtronixMessageQueue.Tests/ActionProcessorTests.cs index 733ec35..c726059 100644 --- a/src/DtronixMessageQueue.Tests/ActionProcessorTests.cs +++ b/src/DtronixMessageQueue.Tests/ActionProcessorTests.cs @@ -12,20 +12,21 @@ namespace DtronixMessageQueue.Tests public class ActionProcessorTests { private readonly ITestOutputHelper _output; + private ManualResetEventSlim completeEvent = new ManualResetEventSlim(false); public ActionProcessorTests(ITestOutputHelper output) { _output = output; } - private ActionProcessor CreateProcessor(int threads, bool start, int rebalanceTime = 10000) + private ActionProcessor CreateProcessor(int threads, bool start, Action> complete = null, int rebalanceTime = 10000) { var processor = new ActionProcessor(new ActionProcessor.Config { ThreadName = "test", StartThreads = threads, RebalanceLoadPeriod = rebalanceTime - }); + }, complete); if (start) processor.Start(); @@ -125,34 +126,6 @@ public void Processor_adds_to_least_used_thread() Assert.Equal(firstRegisteredAction.ProcessorThread, thirdRegisteredAction.ProcessorThread); } - [Fact] - public void Processor_transfers_queued_actions_to_other_thread_on_removal() - { - var processor = CreateProcessor(2, true); - - var firstRegisteredAction = RegisterQueueGet(processor, () => Thread.Sleep(5000)); - processor.Queue(firstRegisteredAction.Id); - - var secondRegisteredAction = RegisterQueueGet(processor, () => Thread.Sleep(5000)); - processor.Queue(secondRegisteredAction.Id); - - var oldThread = firstRegisteredAction.ProcessorThread; - - processor.RemoveThread(1); - - Thread.Sleep(10); - - Assert.Equal(0, oldThread.RegisteredActionsCount); - - Assert.Equal(firstRegisteredAction.ProcessorThread, secondRegisteredAction.ProcessorThread); - - Assert.Equal(2, secondRegisteredAction.ProcessorThread.RegisteredActionsCount); - - Assert.Equal(2, secondRegisteredAction.ProcessorThread.Queued); - - } - - [Fact] public void Processor_queues_once() { @@ -184,10 +157,12 @@ public void Processor_queues_multiple() } [Fact] - public void Processor_balances_on_new_thread() + public void Processor_balances_on_removed_thread() { - var processor = CreateProcessor(2, true); - int interlockedInt = 0; + Action> complete = ap => completeEvent.Set(); + var processor = CreateProcessor(2, true, complete); + + var interlockedInt = 0; var action = (Action) (() => { @@ -199,7 +174,7 @@ public void Processor_balances_on_new_thread() var secondRegisteredAction = RegisterGet(processor, action); var thirdRegisteredAction = RegisterGet(processor, () => processor.RemoveThread(1)); - var totalLoops = 100; + const int totalLoops = 10; for (int i = 0; i < totalLoops; i++) { @@ -211,17 +186,50 @@ public void Processor_balances_on_new_thread() processor.Queue(thirdRegisteredAction.Id); } - Thread.Sleep(500); + Assert.True(completeEvent.Wait(2000)); Assert.Equal(totalLoops * 2, interlockedInt); Assert.Equal(3, thirdRegisteredAction.ProcessorThread.RegisteredActionsCount); } + [Fact] + public void Processor_balances_on_added_thread() + { + Action> complete = ap => completeEvent.Set(); + var processor = CreateProcessor(2, true, complete); + + var interlockedInt = 0; + var action = (Action)(() => + { + Interlocked.Increment(ref interlockedInt); + Thread.Sleep(1); + }); + var firstRegisteredAction = RegisterGet(processor, action); + var secondRegisteredAction = RegisterGet(processor, action); + var thirdRegisteredAction = RegisterGet(processor, () => processor.AddThread(1)); + const int totalLoops = 100; + for (int i = 0; i < totalLoops; i++) + { + processor.Queue(firstRegisteredAction.Id); + processor.Queue(secondRegisteredAction.Id); + + // Half way through the loop, remove a thread. + if (i == totalLoops / 2) + processor.Queue(thirdRegisteredAction.Id); + } + + Assert.True(completeEvent.Wait(2000)); + Assert.Equal(totalLoops * 2, interlockedInt); + + Assert.Equal(1, firstRegisteredAction.ProcessorThread.RegisteredActionsCount); + Assert.Equal(1, secondRegisteredAction.ProcessorThread.RegisteredActionsCount); + Assert.Equal(1, thirdRegisteredAction.ProcessorThread.RegisteredActionsCount); + } } } diff --git a/src/DtronixMessageQueue/ActionProcessor.cs b/src/DtronixMessageQueue/ActionProcessor.cs index e7172c8..e8fa221 100644 --- a/src/DtronixMessageQueue/ActionProcessor.cs +++ b/src/DtronixMessageQueue/ActionProcessor.cs @@ -53,7 +53,7 @@ public enum SortOrder /// /// Timer to execute the supervisor method. /// - private Timer _supervisorTimer; + private readonly Timer _supervisorTimer; /// /// Actions currently registered with this processor. @@ -61,7 +61,7 @@ public enum SortOrder private readonly ConcurrentDictionary _registeredActions; /// - /// Lock to ensure that two threads to rebalance at the same time. + /// Lock to ensure that two threads to re-balance at the same time. /// private readonly object _rebalanceLock = new object(); @@ -77,19 +77,38 @@ public enum SortOrder /// public int ThreadCount => _threads.Count; + /// + /// Internally called event used for testing. + /// Called when the processor has completed all queued items. + /// + private readonly Action> _onComplete; + /// /// Creates a new processor and with the specified configurations. /// /// Configurations to apply - public ActionProcessor(Config configs) + public ActionProcessor(Config configs) : this(configs, null) + { + + } + + /// + /// Creates a new processor and with the specified configurations. + /// + /// Configurations to apply + /// Method called when all actions have been processed and the threads are idle. + internal ActionProcessor(Config configs, Action> onComplete) { _configs = configs; _registeredActions = new ConcurrentDictionary(); + _onComplete = onComplete; _threads = new List(configs.StartThreads); + // Add the specified number of threads. AddThread(configs.StartThreads); + // Create the supervisor timer. _supervisorTimer = new Timer(Supervise); } @@ -106,12 +125,15 @@ public void AddThread(int count) _threads.Add(pThread); if (_isRunning) pThread.Start(); + + // Bind the idle event if it is not null for testing purposes. + if (_onComplete != null) + pThread.Idle = e => InvokeIfComplete(); } - // Rebalance the load if we are running - if (_isRunning) + // Re-balance the load if we are running + if (_isRunning) RebalanceLoad(false); - } /// @@ -130,6 +152,7 @@ public void RemoveThread(int count) $"Can not remove {count} threads since this would leave no threads running.", nameof(count)); + // Create a dictionary of ids and actions to re-queue once they have been placed in their final locations. var reQueueActions = new Dictionary(); for (var i = 0; i < count; i++) @@ -155,7 +178,7 @@ public void RemoveThread(int count) } } - // Requeue all the actions only once after they have moved around. + // Re-queue all the actions only once after they have moved around. foreach (var registeredAction in reQueueActions) { for (int i = 0; i < registeredAction.Value.QueuedCount; i++) @@ -167,11 +190,9 @@ public void RemoveThread(int count) registeredAction.Value.TransferLock.Reset(); } - // Rebalance the load if it is warranted. + // Re-balance the load if it is warranted. if (ShouldRebalance()) RebalanceLoad(false); - - } @@ -184,7 +205,6 @@ private void Supervise(object o) { if (ShouldRebalance()) RebalanceLoad(true); - } /// @@ -204,12 +224,13 @@ private bool ShouldRebalance() if (mostRegistered == leastRegistered) return false; - // Get the difference and if the min and max number of registrations is greater than 10, time to rebalance. - return Math.Abs(mostRegistered.RegisteredActionsCount - leastRegistered.RegisteredActionsCount) > 10; + // Get the difference and if the min and max number of registrations is greater than 10, time to re-balance. + return Math.Abs(mostRegistered.RegisteredActionsCount - leastRegistered.RegisteredActionsCount) > + _configs.RebalanceLoadDelta; } /// - /// Rebalances all the actions based upon their usage. + /// Re-balances all the actions based upon their usage. /// Distributes most active to least active round-robin style to all the threads. /// /// True if this is being called from the supervisor thread. False otherwise. @@ -255,15 +276,14 @@ private void RebalanceLoad(bool isSupervisor) if (_isRunning && !isSupervisor) _supervisorTimer.Change(_configs.RebalanceLoadPeriod, _configs.RebalanceLoadPeriod); } - } /// /// Stops and transfers all actions from one thread to a specified thread. /// - /// Source thread to stop and remove all actions from + /// Action to transfer to another processor thread. /// Thread to move all the actions to. - private void TransferAction(RegisteredAction registeredAction, ProcessorThread destination) + private static void TransferAction(RegisteredAction registeredAction, ProcessorThread destination) { // If we are moving to the same destination that it is already on, do nothing. if (registeredAction.ProcessorThread == destination) @@ -332,7 +352,6 @@ private bool Queue(T id, int limit) /// Action to execute when this Id is queued. public void Register(T id, Action action) { - var leastActiveProcessor = GetProcessorThread(SortOrder.LeastRegistered); var processAction = new RegisteredAction @@ -361,11 +380,30 @@ public void Deregister(T id) } } + /// + /// Retrieves action by a set ID. Used for testing only. + /// + /// + /// internal RegisteredAction GetActionById(T id) { return _registeredActions[id]; } + /// + /// Internal method called for testing when the processors are complete. + /// + private void InvokeIfComplete() + { + if (_threads.Sum(thread => thread.Queued) == 0) + _onComplete?.Invoke(this); + } + + /// + /// Gets a single processor thread based upon with specified conditions. + /// + /// Specifies to retrieve the most or least action registered processor. + /// Processor thread. private ProcessorThread GetProcessorThread(SortOrder order) { var processorSorter = _threads.Where(pt => pt.IsRunning); @@ -378,7 +416,6 @@ private ProcessorThread GetProcessorThread(SortOrder order) throw new InvalidOperationException("No ProcessorThreads are running."); return selectedProcessor; - } /// @@ -391,9 +428,17 @@ public void Start() _isRunning = true; foreach (var processorThread in _threads) + { processorThread.Start(); + // Bind the idle event if it is not null for testing purposes. + if (_onComplete != null) + processorThread.Idle = e => InvokeIfComplete(); + } + _supervisorTimer.Change(_configs.RebalanceLoadPeriod, _configs.RebalanceLoadPeriod); + + } /// @@ -428,11 +473,20 @@ public class Config public int StartThreads { get; set; } = Environment.ProcessorCount; /// - /// Number of milliseconds for this processor to automatically rebalance any registered actions if required. + /// Number of milliseconds for this processor to automatically re-balance any registered actions if required. /// Default is 10 seconds. /// public int RebalanceLoadPeriod { get; set; } = 10000; + /// + /// Number used to determine when re-balancing needs to occur. + /// When the difference between the thread with the max registered actions + /// and thread with the min number of registered actions is greater than this number, + /// a re-balancing will take place. + /// Default is 10. + /// + /// + public int RebalanceLoadDelta { get; set; } = 10; } /// @@ -470,6 +524,11 @@ internal class RegisteredAction /// queuing in an inactive thread. /// public ManualResetEventSlim TransferLock = new ManualResetEventSlim(true); + + /// + /// Integer of the total number of exceptions this action has thrown. + /// + public int ThrownExceptionsCount; } @@ -511,7 +570,7 @@ internal class ProcessorThread /// /// Contains all the actions registered on this thread. /// - private ConcurrentDictionary _registeredActions; + private readonly ConcurrentDictionary _registeredActions; private int _registeredActionsCount; @@ -535,6 +594,10 @@ internal class ProcessorThread /// public float IdleTime => _idleTime; + /// + /// Internal event called for testing when this thread has completed all registered work. + /// + public Action Idle; /// /// Starts a new managed thread to handle the actions passed to process. @@ -552,6 +615,7 @@ public ProcessorThread(string name) /// private void ThreadProcess() { + RegisteredAction action = null; // Restart the stop watch to clear any existing time data. _perfStopwatch.Restart(); @@ -561,18 +625,23 @@ private void ThreadProcess() // Update the idle time RollingEstimate(ref _idleTime, _perfStopwatch.ElapsedMilliseconds, 10); + // Wrap the entire loop in a try/catch for when the loop times out, it will not throw. try { - while (_actions.TryTake(out RegisteredAction action, 1000, _cancellationTokenSource.Token)) + while (_actions.TryTake(out action, 1000, _cancellationTokenSource.Token)) { + // If this action is not registered to this thread, add it back to be + // processed again at a later time in hopes that the processor thread will be finally + // set to an active processor. + if (action.ProcessorThread == null) + { + _actions.Add(action); + continue; + } // Decrement the total actions queued. Interlocked.Decrement(ref Queued); - // If this action is not registered to this thread, skip it. - if (action.ProcessorThread == null) - continue; - // If this action was transferred to another thread, queue the action up on that other thread. if (action.ProcessorThread != this) { @@ -590,12 +659,15 @@ private void ThreadProcess() _perfStopwatch.Restart(); // Catch any exceptions and ignore for now. - // TODO: Figure out a plan for handling exceptions thrown in the process. action.Action(); // Add this performance to the estimated rolling average. RollingEstimate(ref action.AverageUsageTime, _perfStopwatch.ElapsedMilliseconds, 10); + + // Called for testing purposes. + if(Idle != null && Queued == 0) + Idle.Invoke(this); } } catch (OperationCanceledException) @@ -604,10 +676,16 @@ private void ThreadProcess() } catch { - // ignored + if (action == null) + continue; + + // If an exception was thrown on an action processing, then increment the exception counter and add the performance data. + RollingEstimate(ref action.AverageUsageTime, _perfStopwatch.ElapsedMilliseconds, 10); + Interlocked.Increment(ref action.ThrownExceptionsCount); } } + // Stop the timer as we have existed the main loop. _perfStopwatch.Stop(); } @@ -627,6 +705,10 @@ public void Queue(RegisteredAction registeredAction, bool transferred) _actions.TryAdd(registeredAction); } + /// + /// Adds the specified action from the list of registered actions for this processor. + /// + /// Action to register. public void RegisterAction(RegisteredAction action) { Interlocked.Increment(ref _registeredActionsCount); @@ -634,6 +716,10 @@ public void RegisterAction(RegisteredAction action) action.ProcessorThread = this; } + /// + /// Removes the specified action from the list of registered actions for this processor. + /// + /// Action to de-register. public void DeregisterAction(RegisteredAction action) { Interlocked.Decrement(ref _registeredActionsCount); @@ -642,14 +728,19 @@ public void DeregisterAction(RegisteredAction action) removedAction.ProcessorThread = null; } + /// + /// Gets the requested number of actions registered in this processor. + /// + /// Number of actions to get. Use -1 for all actions. + /// Returns all, some or none actions. Always returns an array. May be an empty array. public RegisteredAction[] GetActions(int count) { if (count == -1) return _registeredActions - .Select(kvp => kvp.Value) - .OrderByDescending(ra => ra.AverageUsageTime) - .ToArray(); - + .Select(kvp => kvp.Value) + .OrderByDescending(ra => ra.AverageUsageTime) + .ToArray(); + return _registeredActions .Select(kvp => kvp.Value) .OrderByDescending(ra => ra.AverageUsageTime) @@ -706,7 +797,6 @@ public void Start() _cancellationTokenSource = new CancellationTokenSource(); _isRunning = true; _thread.Start(); - } /// @@ -736,10 +826,8 @@ private void RollingEstimate(ref float rollingEstimate, float update, int uiProp public override string ToString() { return - $"Name: {_name}; Running: {_isRunning}; Idle Time: {_idleTime}; Registered Actions: {_registeredActionsCount}, Queued: {Queued}"; + $"Name: {_name}; Running: {_isRunning}; Complete Time: {_idleTime}; Registered Actions: {_registeredActionsCount}, Queued: {Queued}"; } } - - } -} +} \ No newline at end of file