Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Added logic for removal and transfer of actions to another active thr…
Browse files Browse the repository at this point in the history
…ead.

Still not building.
  • Loading branch information
DJGosnell committed Jul 24, 2017
1 parent 12adb69 commit 488461c
Showing 1 changed file with 31 additions and 19 deletions.
50 changes: 31 additions & 19 deletions src/DtronixMessageQueue/ActionProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,29 +128,38 @@ public void RemoveThread(int i)
// Get the next least active processor thread.
var nextLeastActiveProcessor = GetProcessorThread();

var registeredActions = leastActiveProcessor.GetActions(-1);


// Transfer the queues from one thread to the other.
TransferActions(leastActiveProcessor, nextLeastActiveProcessor);
foreach (var registeredAction in registeredActions)
{
TransferAction(registeredAction, nextLeastActiveProcessor);
}

var queuedActions = leastActiveProcessor.Stop();


foreach (var queuedAction in queuedActions)
{
nextLeastActiveProcessor.Queue(queuedAction, true);
}


}

/// <summary>
/// Stops and transfers all actions from one thread to a specified thread.
/// </summary>
/// <param name="source">Source thread to stop and remove all actions from</param>
/// <param name="destination">Thread to move all the actions to.</param>
private void TransferActions(ProcessorThread source, ProcessorThread destination, int count)
private void TransferAction(RegisteredAction registeredAction, ProcessorThread destination)
{
registeredAction.ProcessorThread.DeregisterAction(registeredAction);
destination.RegisterAction(registeredAction);

var registeredActions = source.GetActions(count);

// Filter down to the actions being transferred.
foreach (var registeredAction in registeredActions)
{
source.DeregisterAction(registeredAction);
destination.RegisterAction(registeredAction);

// Set the queue to continue to allow adds.
registeredAction.ResetEvent.Set();
}
// Set the queue to continue to allow adds.
registeredAction.ResetEvent.Set();
}


Expand Down Expand Up @@ -489,15 +498,18 @@ public void DeregisterAction(RegisteredAction action)
{
Interlocked.Decrement(ref _registeredActionsCount);
_registeredActions.TryRemove(action.Id, out var removedAction);
removedAction.ResetEvent.Reset();
removedAction.ProcessorThread = null;
}

public RegisteredAction[] GetActions(int count)
{
if (count == -1)
{
return
}
return _registeredActions
.Select(kvp => kvp.Value)
.OrderByDescending(ra => ra.AverageUsageTime)
.ToArray();

return _registeredActions
.Select(kvp => kvp.Value)
.OrderByDescending(ra => ra.AverageUsageTime)
Expand All @@ -514,11 +526,11 @@ public RegisteredAction[] GetActions(int count)
public RegisteredAction[] Stop()
{
Pause();
var actions = _actions.ToArray();
List<RegisteredAction> actions = new List<RegisteredAction>();

// Remove all the actions from the collection.
while (_actions.Count > 1)
_actions.Take();
actions.Add(_actions.Take());

foreach (var registeredAction in actions)
{
Expand All @@ -527,7 +539,7 @@ public RegisteredAction[] Stop()
registeredAction.ResetEvent.Reset();
}

return actions;
return actions.ToArray();
}

/// <summary>
Expand Down

0 comments on commit 488461c

Please sign in to comment.