Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Added documentation.
Browse files Browse the repository at this point in the history
Cleaned up tests.
Added some internal actions which can be called by the tests.
  • Loading branch information
DJGosnell committed Jul 26, 2017
1 parent aa044c4 commit 0d25ff9
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 72 deletions.
78 changes: 43 additions & 35 deletions src/DtronixMessageQueue.Tests/ActionProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ namespace DtronixMessageQueue.Tests
public class ActionProcessorTests
{
private readonly ITestOutputHelper _output;
private ManualResetEventSlim completeEvent = new ManualResetEventSlim(false);

public ActionProcessorTests(ITestOutputHelper output)
{
_output = output;
}

private ActionProcessor<Guid> CreateProcessor(int threads, bool start, int rebalanceTime = 10000)
private ActionProcessor<Guid> CreateProcessor(int threads, bool start, Action<ActionProcessor<Guid>> complete = null, int rebalanceTime = 10000)
{
var processor = new ActionProcessor<Guid>(new ActionProcessor<Guid>.Config
{
ThreadName = "test",
StartThreads = threads,
RebalanceLoadPeriod = rebalanceTime
});
}, complete);

if (start)
processor.Start();
Expand Down Expand Up @@ -125,34 +126,6 @@ public void Processor_adds_to_least_used_thread()
Assert.Equal(firstRegisteredAction.ProcessorThread, thirdRegisteredAction.ProcessorThread);
}

[Fact]
public void Processor_transfers_queued_actions_to_other_thread_on_removal()
{
var processor = CreateProcessor(2, true);

var firstRegisteredAction = RegisterQueueGet(processor, () => Thread.Sleep(5000));
processor.Queue(firstRegisteredAction.Id);

var secondRegisteredAction = RegisterQueueGet(processor, () => Thread.Sleep(5000));
processor.Queue(secondRegisteredAction.Id);

var oldThread = firstRegisteredAction.ProcessorThread;

processor.RemoveThread(1);

Thread.Sleep(10);

Assert.Equal(0, oldThread.RegisteredActionsCount);

Assert.Equal(firstRegisteredAction.ProcessorThread, secondRegisteredAction.ProcessorThread);

Assert.Equal(2, secondRegisteredAction.ProcessorThread.RegisteredActionsCount);

Assert.Equal(2, secondRegisteredAction.ProcessorThread.Queued);

}


[Fact]
public void Processor_queues_once()
{
Expand Down Expand Up @@ -184,10 +157,12 @@ public void Processor_queues_multiple()
}

[Fact]
public void Processor_balances_on_new_thread()
public void Processor_balances_on_removed_thread()
{
var processor = CreateProcessor(2, true);
int interlockedInt = 0;
Action<ActionProcessor<Guid>> complete = ap => completeEvent.Set();
var processor = CreateProcessor(2, true, complete);

var interlockedInt = 0;

var action = (Action) (() =>
{
Expand All @@ -199,7 +174,7 @@ public void Processor_balances_on_new_thread()
var secondRegisteredAction = RegisterGet(processor, action);
var thirdRegisteredAction = RegisterGet(processor, () => processor.RemoveThread(1));

var totalLoops = 100;
const int totalLoops = 10;

for (int i = 0; i < totalLoops; i++)
{
Expand All @@ -211,17 +186,50 @@ public void Processor_balances_on_new_thread()
processor.Queue(thirdRegisteredAction.Id);
}

Thread.Sleep(500);
Assert.True(completeEvent.Wait(2000));

Assert.Equal(totalLoops * 2, interlockedInt);

Assert.Equal(3, thirdRegisteredAction.ProcessorThread.RegisteredActionsCount);
}

[Fact]
public void Processor_balances_on_added_thread()
{
Action<ActionProcessor<Guid>> complete = ap => completeEvent.Set();
var processor = CreateProcessor(2, true, complete);

var interlockedInt = 0;

var action = (Action)(() =>
{
Interlocked.Increment(ref interlockedInt);
Thread.Sleep(1);
});

var firstRegisteredAction = RegisterGet(processor, action);
var secondRegisteredAction = RegisterGet(processor, action);
var thirdRegisteredAction = RegisterGet(processor, () => processor.AddThread(1));

const int totalLoops = 100;

for (int i = 0; i < totalLoops; i++)
{
processor.Queue(firstRegisteredAction.Id);
processor.Queue(secondRegisteredAction.Id);

// Half way through the loop, remove a thread.
if (i == totalLoops / 2)
processor.Queue(thirdRegisteredAction.Id);
}

Assert.True(completeEvent.Wait(2000));

Assert.Equal(totalLoops * 2, interlockedInt);

Assert.Equal(1, firstRegisteredAction.ProcessorThread.RegisteredActionsCount);
Assert.Equal(1, secondRegisteredAction.ProcessorThread.RegisteredActionsCount);
Assert.Equal(1, thirdRegisteredAction.ProcessorThread.RegisteredActionsCount);
}
}
}
Loading

0 comments on commit 0d25ff9

Please sign in to comment.