diff --git a/docs/dtronix-message-queue.md b/docs/dtronix-message-queue.md index dc25133..238255c 100644 --- a/docs/dtronix-message-queue.md +++ b/docs/dtronix-message-queue.md @@ -41,9 +41,8 @@ Each frame contains at the very minimum 1 byte. This byte is used to determine ### Types of Frames There are seven types of frames, but only six that are used to send across the wire. The Unset type is never used except as the initial state for the frame. -| Name | Frame bytes | MqFrameType | Message Length | Payload | Description | +| Name | Frame bytes | MqFrameType
(byte) | Message Length
(ushort?)| Payload
(byte[]?) | Description | |-----------|:-----------:|:-----------:|:----------------:|:-------:|--------------------------------------------------------| -| | | byte | ushort? | byte[]? | | | Unset | 0 | 0 | - | - | Initial state for all frames. | | Empty | 1 | 1 | - | - | No body | | More | \>= 3 | 2 | ushort [2 bytes] | byte[] | Contains a body. | @@ -52,7 +51,6 @@ There are seven types of frames, but only six that are used to send across the w | Command | \>= 3 | 5 | ushort [2 bytes] | byte[] | Command to be processed and consumed internally. | | Ping | 1 | 6 | - | - | Same as EmptyLast frame but consumed internally. | - ##### MqFrame Type Empty diff --git a/docs/performance-results/i5-3470-8GB-16KB.md b/docs/performance-results/i5-3470-8GB-16KB.md deleted file mode 100644 index 2f64324..0000000 --- a/docs/performance-results/i5-3470-8GB-16KB.md +++ /dev/null @@ -1,32 +0,0 @@ -Intel(R) Core(TM) i5-3470 CPU @ 3.20GHz with 8 GB of RAM installed. -DMQPerf.exe mq single-process 1 100000 4 50 10 -Running MQ performance tests. - -FrameBufferSize: 16381; SendAndReceiveBufferSize: 16384 - -| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | -|---------|------------|-----------|--------------|------------|----------| -| Release | 1,000,000 | 200 | 1,276 | 783,699 | 156.74 | -| Release | 1,000,000 | 200 | 1,254 | 797,448 | 159.49 | -| Release | 1,000,000 | 200 | 1,265 | 790,513 | 158.10 | -| Release | 1,000,000 | 200 | 1,260 | 793,650 | 158.73 | -| Release | 1,000,000 | 200 | 1,247 | 801,924 | 160.38 | -| | | AVERAGES | 1,260 | 793,447 | 158.69 | - -| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | -|---------|------------|-----------|--------------|------------|----------| -| Release | 100,000 | 2,000 | 536 | 186,567 | 373.13 | -| Release | 100,000 | 2,000 | 521 | 191,938 | 383.88 | -| Release | 100,000 | 2,000 | 518 | 193,050 | 386.10 | -| Release | 100,000 | 2,000 | 519 | 192,678 | 385.36 | -| Release | 100,000 | 2,000 | 518 | 193,050 | 386.10 | -| | | AVERAGES | 522 | 191,457 | 382.91 | - -| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | -|---------|------------|-----------|--------------|------------|----------| -| Release | 10,000 | 60,048 | 1,387 | 7,209 | 432.93 | -| Release | 10,000 | 60,048 | 1,379 | 7,251 | 435.45 | -| Release | 10,000 | 60,048 | 1,376 | 7,267 | 436.40 | -| Release | 10,000 | 60,048 | 1,377 | 7,262 | 436.08 | -| Release | 10,000 | 60,048 | 1,387 | 7,209 | 432.93 | -| | | AVERAGES | 1,381 | 7,240 | 434.76 | diff --git a/docs/performance-results/i7-6500U-16GB-16KB.md b/docs/performance-results/i7-6500U-16GB-16KB.md deleted file mode 100644 index 84a14e8..0000000 --- a/docs/performance-results/i7-6500U-16GB-16KB.md +++ /dev/null @@ -1,32 +0,0 @@ -Intel(R) Core(TM) i7-6500U CPU @ 2.50GHz with 16 GB of RAM installed. -DMQPerf.exe mq single-process 1 100000 4 50 10 -Running MQ performance tests. - -FrameBufferSize: 16381; SendAndReceiveBufferSize: 16384 - -| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | -|---------|------------|-----------|--------------|------------|----------| -| Release | 1,000,000 | 200 | 1,662 | 601,684 | 120.34 | -| Release | 1,000,000 | 200 | 1,624 | 615,763 | 123.15 | -| Release | 1,000,000 | 200 | 1,652 | 605,326 | 121.07 | -| Release | 1,000,000 | 200 | 1,639 | 610,128 | 122.03 | -| Release | 1,000,000 | 200 | 1,657 | 603,500 | 120.70 | -| | | AVERAGES | 1,647 | 607,280 | 121.46 | - -| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | -|---------|------------|-----------|--------------|------------|----------| -| Release | 100,000 | 2,000 | 657 | 152,207 | 304.41 | -| Release | 100,000 | 2,000 | 650 | 153,846 | 307.69 | -| Release | 100,000 | 2,000 | 652 | 153,374 | 306.75 | -| Release | 100,000 | 2,000 | 657 | 152,207 | 304.41 | -| Release | 100,000 | 2,000 | 650 | 153,846 | 307.69 | -| | | AVERAGES | 653 | 153,096 | 306.19 | - -| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | -|---------|------------|-----------|--------------|------------|----------| -| Release | 10,000 | 60,048 | 1,746 | 5,727 | 343.92 | -| Release | 10,000 | 60,048 | 1,740 | 5,747 | 345.10 | -| Release | 10,000 | 60,048 | 1,736 | 5,760 | 345.90 | -| Release | 10,000 | 60,048 | 1,756 | 5,694 | 341.96 | -| Release | 10,000 | 60,048 | 1,722 | 5,807 | 348.71 | -| | | AVERAGES | 1,740 | 5,747 | 345.12 | \ No newline at end of file diff --git a/docs/performance-results/i7-6700K-32GB-16KB.md b/docs/performance-results/i7-6700K-32GB-16KB.md deleted file mode 100644 index 04f693b..0000000 --- a/docs/performance-results/i7-6700K-32GB-16KB.md +++ /dev/null @@ -1,32 +0,0 @@ -Intel(R) Core(TM) i7-6700K CPU @ 4.00GHz with 32 GB of RAM installed. -DMQPerf.exe mq single-process 1 100000 4 50 10 -Running MQ performance tests. - -FrameBufferSize: 16381; SendAndReceiveBufferSize: 16384 - -| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | -|---------|------------|-----------|--------------|------------|----------| -| Release | 1,000,000 | 200 | 821 | 1,218,026 | 243.61 | -| Release | 1,000,000 | 200 | 775 | 1,290,322 | 258.06 | -| Release | 1,000,000 | 200 | 780 | 1,282,051 | 256.41 | -| Release | 1,000,000 | 200 | 786 | 1,272,264 | 254.45 | -| Release | 1,000,000 | 200 | 783 | 1,277,139 | 255.43 | -| | | AVERAGES | 789 | 1,267,960 | 253.59 | - -| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | -|---------|------------|-----------|--------------|------------|----------| -| Release | 100,000 | 2,000 | 315 | 317,460 | 634.92 | -| Release | 100,000 | 2,000 | 296 | 337,837 | 675.68 | -| Release | 100,000 | 2,000 | 296 | 337,837 | 675.68 | -| Release | 100,000 | 2,000 | 297 | 336,700 | 673.40 | -| Release | 100,000 | 2,000 | 305 | 327,868 | 655.74 | -| | | AVERAGES | 302 | 331,540 | 663.08 | - -| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | -|---------|------------|-----------|--------------|------------|----------| -| Release | 10,000 | 60,048 | 814 | 12,285 | 737.69 | -| Release | 10,000 | 60,048 | 836 | 11,961 | 718.28 | -| Release | 10,000 | 60,048 | 832 | 12,019 | 721.73 | -| Release | 10,000 | 60,048 | 814 | 12,285 | 737.69 | -| Release | 10,000 | 60,048 | 828 | 12,077 | 725.22 | -| | | AVERAGES | 825 | 12,125 | 728.12 | \ No newline at end of file diff --git a/docs/performance-results/i7-6800K-16GB.md b/docs/performance-results/i7-6800K-16GB.md new file mode 100644 index 0000000..a0ca8d3 --- /dev/null +++ b/docs/performance-results/i7-6800K-16GB.md @@ -0,0 +1,76 @@ +Intel(R) Core(TM) i7-6800K CPU @ 3.40GHz with 16 GB of RAM installed. +DMQPerf.exe +MQ Performance tests. + +FrameBufferSize: 16381; SendAndReceiveBufferSize: 16384 + +| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | +|---------|------------|-----------|--------------|------------|----------| +| Release | 1,000,000 | 200 | 1,490 | 671,140 | 134.23 | +| Release | 1,000,000 | 200 | 1,430 | 699,300 | 139.86 | +| Release | 1,000,000 | 200 | 1,532 | 652,741 | 130.55 | +| Release | 1,000,000 | 200 | 1,470 | 680,272 | 136.05 | +| Release | 1,000,000 | 200 | 1,481 | 675,219 | 135.04 | +| | | AVERAGES | 1,481 | 675,734 | 135.15 | + +| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | +|---------|------------|-----------|--------------|------------|----------| +| Release | 100,000 | 2,000 | 511 | 195,694 | 391.39 | +| Release | 100,000 | 2,000 | 492 | 203,252 | 406.50 | +| Release | 100,000 | 2,000 | 515 | 194,174 | 388.35 | +| Release | 100,000 | 2,000 | 486 | 205,761 | 411.52 | +| Release | 100,000 | 2,000 | 507 | 197,238 | 394.48 | +| | | AVERAGES | 502 | 199,224 | 398.45 | + +| Build | Messages | Msg Bytes | Milliseconds | Msg/sec | MBps | +|---------|------------|-----------|--------------|------------|----------| +| Release | 10,000 | 60,048 | 1,254 | 7,974 | 478.85 | +| Release | 10,000 | 60,048 | 1,243 | 8,045 | 483.09 | +| Release | 10,000 | 60,048 | 1,253 | 7,980 | 479.23 | +| Release | 10,000 | 60,048 | 1,251 | 7,993 | 480.00 | +| Release | 10,000 | 60,048 | 1,254 | 7,974 | 478.85 | +| | | AVERAGES | 1,251 | 7,993 | 480.01 | + +RPC Performance tests. + +| Build | Type | Calls | Milliseconds | RPC/sec | +|---------|-----------|------------|--------------|------------| +| Release | NoReturn | 200,000 | 1,692 | 118,203 | +| Release | NoReturn | 200,000 | 1,690 | 118,343 | +| Release | NoReturn | 200,000 | 1,698 | 117,785 | +| Release | NoReturn | 200,000 | 1,780 | 112,359 | +| | | AVERAGES | 1,715 | 116,673 | + +| Build | Type | Calls | Milliseconds | RPC/sec | +|---------|-----------|------------|--------------|------------| +| Release | Await | 200,000 | 3,256 | 61,425 | +| Release | Await | 200,000 | 3,182 | 62,853 | +| Release | Await | 200,000 | 3,136 | 63,775 | +| Release | Await | 200,000 | 3,142 | 63,653 | +| | | AVERAGES | 3,179 | 62,927 | + +| Build | Type | Calls | Milliseconds | RPC/sec | +|---------|-----------|------------|--------------|------------| +| Release | Block | 100 | 1,004 | 99 | +| Release | Block | 100 | 1,001 | 99 | +| Release | Block | 100 | 8 | 12,500 | +| Release | Block | 100 | 1,982 | 50 | +| Release | Block | 100 | 1,982 | 50 | +| | | AVERAGES | 999 | 3,187 | + +| Build | Type | Calls | Milliseconds | RPC/sec | +|---------|-----------|------------|--------------|------------| +| Release | Return | 10,000 | 1,060 | 9,433 | +| Release | Return | 10,000 | 1,060 | 9,433 | +| Release | Return | 10,000 | 1,065 | 9,389 | +| Release | Return | 10,000 | 1,061 | 9,425 | +| | | AVERAGES | 1,062 | 9,420 | + +| Build | Type | Calls | Milliseconds | RPC/sec | +|---------|-----------|------------|--------------|------------| +| Release | Exception | 10,000 | 3,634 | 2,751 | +| Release | Exception | 10,000 | 3,625 | 2,758 | +| Release | Exception | 10,000 | 3,792 | 2,637 | +| Release | Exception | 10,000 | 3,759 | 2,660 | +| | | AVERAGES | 3,703 | 2,702 | + diff --git a/docs/performance-results/rpc/i5-3470-8GB-rpc.md b/docs/performance-results/rpc/i5-3470-8GB-rpc.md deleted file mode 100644 index 975e7a9..0000000 --- a/docs/performance-results/rpc/i5-3470-8GB-rpc.md +++ /dev/null @@ -1,36 +0,0 @@ -Intel(R) Core(TM) i5-3470 CPU @ 3.20GHz with 8 GB of RAM installed. -DMQPerf.exe -Running RPC performance tests. - -| Build | Type | Calls | Milliseconds | RPC/sec | -|---------|-----------|------------|--------------|------------| -| Release | NoRetrun | 200,000 | 1,183 | 169,061 | -| Release | NoRetrun | 200,000 | 1,145 | 174,672 | -| Release | NoRetrun | 200,000 | 1,132 | 176,678 | -| Release | NoRetrun | 200,000 | 1,136 | 176,056 | -| | | AVERAGES | 1,149 | 174,117 | - -| Build | Type | Calls | Milliseconds | RPC/sec | -|---------|-----------|------------|--------------|------------| -| Release | Await | 200,000 | 2,403 | 83,229 | -| Release | Await | 200,000 | 2,038 | 98,135 | -| Release | Await | 200,000 | 2,126 | 94,073 | -| Release | Await | 200,000 | 2,071 | 96,571 | -| | | AVERAGES | 2,160 | 93,002 | - -| Build | Type | Calls | Milliseconds | RPC/sec | -|---------|-----------|------------|--------------|------------| -| Release | Return | 10,000 | 836 | 11,961 | -| Release | Return | 10,000 | 774 | 12,919 | -| Release | Return | 10,000 | 777 | 12,870 | -| Release | Return | 10,000 | 782 | 12,787 | -| | | AVERAGES | 792 | 12,634 | - -| Build | Type | Calls | Milliseconds | RPC/sec | -|---------|-----------|------------|--------------|------------| -| Release | Exception | 10,000 | 2,502 | 3,996 | -| Release | Exception | 10,000 | 2,267 | 4,411 | -| Release | Exception | 10,000 | 2,279 | 4,387 | -| Release | Exception | 10,000 | 2,272 | 4,401 | -| | | AVERAGES | 2,330 | 4,299 | - diff --git a/docs/performance-results/rpc/i7-6500U-16GB-rpc.md b/docs/performance-results/rpc/i7-6500U-16GB-rpc.md deleted file mode 100644 index 0ce2228..0000000 --- a/docs/performance-results/rpc/i7-6500U-16GB-rpc.md +++ /dev/null @@ -1,36 +0,0 @@ -Intel(R) Core(TM) i7-6500U CPU @ 2.50GHz with 16 GB of RAM installed. -DMQPerf.exe rpc -Running RPC performance tests. - -| Build | Type | Calls | Milliseconds | RPC/sec | -|---------|-----------|------------|--------------|------------| -| Release | NoRetrun | 200,000 | 1,719 | 116,346 | -| Release | NoRetrun | 200,000 | 1,701 | 117,577 | -| Release | NoRetrun | 200,000 | 1,653 | 120,992 | -| Release | NoRetrun | 200,000 | 1,652 | 121,065 | -| | | AVERAGES | 1,681 | 118,995 | - -| Build | Type | Calls | Milliseconds | RPC/sec | -|---------|-----------|------------|--------------|------------| -| Release | Await | 200,000 | 3,768 | 53,078 | -| Release | Await | 200,000 | 3,809 | 52,507 | -| Release | Await | 200,000 | 3,654 | 54,734 | -| Release | Await | 200,000 | 3,747 | 53,376 | -| | | AVERAGES | 3,745 | 53,424 | - -| Build | Type | Calls | Milliseconds | RPC/sec | -|---------|-----------|------------|--------------|------------| -| Release | Return | 10,000 | 919 | 10,881 | -| Release | Return | 10,000 | 893 | 11,198 | -| Release | Return | 10,000 | 864 | 11,574 | -| Release | Return | 10,000 | 864 | 11,574 | -| | | AVERAGES | 885 | 11,307 | - -| Build | Type | Calls | Milliseconds | RPC/sec | -|---------|-----------|------------|--------------|------------| -| Release | Exception | 10,000 | 4,713 | 2,121 | -| Release | Exception | 10,000 | 4,693 | 2,130 | -| Release | Exception | 10,000 | 4,675 | 2,139 | -| Release | Exception | 10,000 | 4,687 | 2,133 | -| | | AVERAGES | 4,692 | 2,131 | - diff --git a/src/DtronixMessageQueue.Tests.Gui/DtronixMessageQueue.Tests.Gui.csproj b/src/DtronixMessageQueue.Tests.Gui/DtronixMessageQueue.Tests.Gui.csproj index eb3d239..fd38552 100644 --- a/src/DtronixMessageQueue.Tests.Gui/DtronixMessageQueue.Tests.Gui.csproj +++ b/src/DtronixMessageQueue.Tests.Gui/DtronixMessageQueue.Tests.Gui.csproj @@ -135,8 +135,6 @@ DtronixMessageQueue - - - + \ No newline at end of file diff --git a/src/DtronixMessageQueue.Tests.Gui/Tests/PerformanceTest.cs b/src/DtronixMessageQueue.Tests.Gui/Tests/PerformanceTest.cs index 24ae787..c2ddd8a 100644 --- a/src/DtronixMessageQueue.Tests.Gui/Tests/PerformanceTest.cs +++ b/src/DtronixMessageQueue.Tests.Gui/Tests/PerformanceTest.cs @@ -100,6 +100,8 @@ public virtual void StopTest() { sessions.Current.Value.GetProxy().StopTest(); } + + Server.Stop(); } if (Client != null) diff --git a/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs b/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs index efb1046..f60e42a 100644 --- a/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs +++ b/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs @@ -80,8 +80,6 @@ public override void StartTest() MqInProcessPerformanceTests(10000, 5, _largeMessage, _config); - - Console.WriteLine("Performance complete"); } private void MqInProcessPerformanceTests(int totalMessages, int loops, MqMessage message, MqConfig config) diff --git a/src/DtronixMessageQueue.Tests/ActionProcessorTests.cs b/src/DtronixMessageQueue.Tests/ActionProcessorTests.cs index 485e01e..733ec35 100644 --- a/src/DtronixMessageQueue.Tests/ActionProcessorTests.cs +++ b/src/DtronixMessageQueue.Tests/ActionProcessorTests.cs @@ -18,11 +18,16 @@ public ActionProcessorTests(ITestOutputHelper output) _output = output; } - private ActionProcessor CreateProcessor(int threads, bool start) + private ActionProcessor CreateProcessor(int threads, bool start, int rebalanceTime = 10000) { - var processor = new ActionProcessor("test", threads); - - if(start) + var processor = new ActionProcessor(new ActionProcessor.Config + { + ThreadName = "test", + StartThreads = threads, + RebalanceLoadPeriod = rebalanceTime + }); + + if (start) processor.Start(); return processor; @@ -135,6 +140,8 @@ public void Processor_transfers_queued_actions_to_other_thread_on_removal() processor.RemoveThread(1); + Thread.Sleep(10); + Assert.Equal(0, oldThread.RegisteredActionsCount); Assert.Equal(firstRegisteredAction.ProcessorThread, secondRegisteredAction.ProcessorThread); @@ -163,32 +170,52 @@ public void Processor_queues_once() public void Processor_queues_multiple() { var processor = CreateProcessor(1, true); - - - var firstRegisteredAction = RegisterGet(processor, () => Thread.Sleep(5000)); + + processor.Queue(firstRegisteredAction.Id); + processor.Queue(firstRegisteredAction.Id); processor.Queue(firstRegisteredAction.Id); + // Wait a period of time for the processor to pickup the call. Thread.Sleep(50); - processor.Queue(firstRegisteredAction.Id); - processor.Queue(firstRegisteredAction.Id); - Assert.Equal(2, firstRegisteredAction.ProcessorThread.Queued); } [Fact] - public void Processor_balances_on_new_thread () + public void Processor_balances_on_new_thread() { - var processor = CreateProcessor(1, true); + var processor = CreateProcessor(2, true); + int interlockedInt = 0; + var action = (Action) (() => + { + Interlocked.Increment(ref interlockedInt); + Thread.Sleep(1); + }); + var firstRegisteredAction = RegisterGet(processor, action); + var secondRegisteredAction = RegisterGet(processor, action); + var thirdRegisteredAction = RegisterGet(processor, () => processor.RemoveThread(1)); - var firstRegisteredAction = RegisterGet(processor, () => Thread.Sleep(5000)); - RegisterGet(processor, () => Thread.Sleep(5000)); + var totalLoops = 100; + + for (int i = 0; i < totalLoops; i++) + { + processor.Queue(firstRegisteredAction.Id); + processor.Queue(secondRegisteredAction.Id); + + // Half way through the loop, remove a thread. + if (i == totalLoops / 2) + processor.Queue(thirdRegisteredAction.Id); + } + + Thread.Sleep(500); + + Assert.Equal(totalLoops * 2, interlockedInt); - Assert.Equal(2, firstRegisteredAction.ProcessorThread.RegisteredActionsCount); + Assert.Equal(3, thirdRegisteredAction.ProcessorThread.RegisteredActionsCount); } diff --git a/src/DtronixMessageQueue/ActionProcessor.cs b/src/DtronixMessageQueue/ActionProcessor.cs index abc599c..e7172c8 100644 --- a/src/DtronixMessageQueue/ActionProcessor.cs +++ b/src/DtronixMessageQueue/ActionProcessor.cs @@ -22,6 +22,8 @@ namespace DtronixMessageQueue /// public class ActionProcessor { + private readonly Config _configs; + /// /// Ordering when selecting a processor thread. /// @@ -38,11 +40,6 @@ public enum SortOrder LeastRegistered } - /// - /// Base name for all of the threads to use. - /// - private readonly string _name; - /// /// Current threads for this processor to use. /// @@ -64,38 +61,34 @@ public enum SortOrder private readonly ConcurrentDictionary _registeredActions; /// - /// True if the processor is running. + /// Lock to ensure that two threads to rebalance at the same time. /// - public bool IsRunning { get; set; } + private readonly object _rebalanceLock = new object(); + + private bool _isRunning; /// - /// Number of threads currently available to this processor. + /// True if the processor is running. /// - public int ThreadCount => _threads.Count; - + public bool IsRunning => _isRunning; /// - /// Creates a new processor and threads with the specified thread base name. - /// Will use the count of processors as the count of threads to create. + /// Number of threads currently available to this processor. /// - /// Base name to associate with this action processor. - public ActionProcessor(string name) : this(name, Environment.ProcessorCount) - { - } + public int ThreadCount => _threads.Count; /// - /// Creates a new processor and specified count of threads with the specified thread base name. + /// Creates a new processor and with the specified configurations. /// - /// Base name to associate with this action processor. - /// Number of threads to create. - public ActionProcessor(string name, int threads) + /// Configurations to apply + public ActionProcessor(Config configs) { - _name = name; + _configs = configs; _registeredActions = new ConcurrentDictionary(); - _threads = new List(threads); + _threads = new List(configs.StartThreads); - AddThread(threads); + AddThread(configs.StartThreads); _supervisorTimer = new Timer(Supervise); } @@ -106,68 +99,163 @@ public ActionProcessor(string name, int threads) /// Number of threads to add. public void AddThread(int count) { - - var totalRegisteredActions = _registeredActions.Count; - - var totalNewThreads = _threads.Count + count; - - var actionsPerThread = totalRegisteredActions / totalNewThreads; - for (var i = 0; i < count; i++) { var id = Interlocked.Increment(ref _threadId); - var pThread = new ProcessorThread($"dmq-{_name}-{id}"); + var pThread = new ProcessorThread($"dmq-{_configs.ThreadName}-{id}"); _threads.Add(pThread); - - for (int j = 0; j < actionsPerThread; j++) - { - var mostActiveProcessor = GetProcessorThread(SortOrder.MostRegistered); - var transferAction = mostActiveProcessor.GetActions(1); - - // If we did not get an action from this processor, repeat the loop. - // Usually only occurs if the thread has just recently removed the - if (transferAction != null) - TransferAction(transferAction[0], pThread); - - } + if (_isRunning) + pThread.Start(); } + + // Rebalance the load if we are running + if (_isRunning) + RebalanceLoad(false); + } /// /// Removes the least active thread and moves all queued actions to the next least active thread. /// Requires at least 2 threads to run. /// - /// Number of threads to remove - public void RemoveThread(int i) + /// Number of threads to remove + public void RemoveThread(int count) { if (_threads.Count < 2) - throw new InvalidOperationException($"Can not remove {i} threads. Must maintain at least one thread for execution. "); - - // Remove the least active thread from the list of active threads. - var leastActiveProcessor = GetProcessorThread(SortOrder.LeastRegistered); - _threads.Remove(leastActiveProcessor); + throw new InvalidOperationException( + $"Can not remove {count} threads. Must maintain at least one thread for execution."); - // Get the next least active processor thread. - var nextLeastActiveProcessor = GetProcessorThread(SortOrder.LeastRegistered); + if (_threads.Count - count < 1) + throw new ArgumentException( + $"Can not remove {count} threads since this would leave no threads running.", + nameof(count)); - var registeredActions = leastActiveProcessor.GetActions(-1); - + var reQueueActions = new Dictionary(); + + for (var i = 0; i < count; i++) + { + // Remove the least active thread from the list of active threads. + var leastActiveProcessor = GetProcessorThread(SortOrder.LeastRegistered); + _threads.Remove(leastActiveProcessor); - // Transfer the queues from one thread to the other. - foreach (var registeredAction in registeredActions) + // Get the next least active processor thread. + var nextLeastActiveProcessor = GetProcessorThread(SortOrder.LeastRegistered); + leastActiveProcessor.Stop(); + + var registeredActions = leastActiveProcessor.GetActions(-1); + + // Transfer the queues from one thread to the other. + foreach (var registeredAction in registeredActions) + { + // Transfer the action to the next least used thread. + TransferAction(registeredAction, nextLeastActiveProcessor); + + if (!reQueueActions.ContainsKey(registeredAction.Id)) + reQueueActions.Add(registeredAction.Id, registeredAction); + } + } + + // Requeue all the actions only once after they have moved around. + foreach (var registeredAction in reQueueActions) { - TransferAction(registeredAction, nextLeastActiveProcessor); + for (int i = 0; i < registeredAction.Value.QueuedCount; i++) + { + registeredAction.Value.ProcessorThread.Queue(registeredAction.Value, true); + } + + // Reset the queue to allow for adding again. + registeredAction.Value.TransferLock.Reset(); } - var queuedActions = leastActiveProcessor.Stop(); + // Rebalance the load if it is warranted. + if (ShouldRebalance()) + RebalanceLoad(false); + + + } + + + /// + /// Method invoked by a timer to review the current status of the threads and + /// re-arrange to balance out long running tasks across multiple threads. + /// + /// Timer reference. + private void Supervise(object o) + { + if (ShouldRebalance()) + RebalanceLoad(true); + + } + + /// + /// Returns true if the loading of the actions is unbalanced. + /// + /// True if the load should be rebalanced. + private bool ShouldRebalance() + { + var mostRegistered = _threads.OrderByDescending(pt => pt.RegisteredActionsCount).FirstOrDefault(); + var leastRegistered = _threads.OrderBy(pt => pt.RegisteredActionsCount).FirstOrDefault(); + + // No threads are running. + if (mostRegistered == null || leastRegistered == null) + return false; + // If these are equal, nothing to do. + if (mostRegistered == leastRegistered) + return false; - foreach (var queuedAction in queuedActions) + // Get the difference and if the min and max number of registrations is greater than 10, time to rebalance. + return Math.Abs(mostRegistered.RegisteredActionsCount - leastRegistered.RegisteredActionsCount) > 10; + } + + /// + /// Rebalances all the actions based upon their usage. + /// Distributes most active to least active round-robin style to all the threads. + /// + /// True if this is being called from the supervisor thread. False otherwise. + private void RebalanceLoad(bool isSupervisor) + { + lock (_rebalanceLock) { - nextLeastActiveProcessor.Queue(queuedAction, true); + var actions = _registeredActions.Select(kvp => kvp.Value) + .OrderByDescending(ra => ra.AverageUsageTime) + .ToArray(); + var totalActions = actions.Length; + + // Cache the total threads. + var threads = _threads.ToArray(); + var totalThreads = threads.Length; + var currentThreadNumber = 0; + + for (var i = 0; i < totalActions; i++) + { + // If the current thread is not active, get a new array of threads and reset to the beginning. + if (threads[currentThreadNumber].IsRunning == false) + { + threads = _threads.ToArray(); + totalThreads = threads.Length; + currentThreadNumber = 0; + i--; + + // If there is only one thread, and it is not running, quit the balancing since there is nothing to do. + if (threads.Length == 1 && threads[0].IsRunning == false) + return; + + continue; + } + // Round robin actions to all active threads. + TransferAction(actions[i], threads[currentThreadNumber]); + + // Reset to the beginning if we are at the end. + if (++currentThreadNumber >= totalThreads) + currentThreadNumber = 0; + } + + // Reset the supervisor to the default timespan again if this is not being called from the supervisor. + if (_isRunning && !isSupervisor) + _supervisorTimer.Change(_configs.RebalanceLoadPeriod, _configs.RebalanceLoadPeriod); } - } /// @@ -177,11 +265,15 @@ public void RemoveThread(int i) /// Thread to move all the actions to. private void TransferAction(RegisteredAction registeredAction, ProcessorThread destination) { + // If we are moving to the same destination that it is already on, do nothing. + if (registeredAction.ProcessorThread == destination) + return; + registeredAction.ProcessorThread.DeregisterAction(registeredAction); destination.RegisterAction(registeredAction); // Set the queue to continue to allow adds. - registeredAction.ResetEvent.Set(); + registeredAction.TransferLock.Set(); } @@ -276,7 +368,7 @@ internal RegisteredAction GetActionById(T id) private ProcessorThread GetProcessorThread(SortOrder order) { - var processorSorter = _threads.Where(pt => IsRunning); + var processorSorter = _threads.Where(pt => pt.IsRunning); var selectedProcessor = order == SortOrder.LeastRegistered ? processorSorter.OrderBy(pt => pt.RegisteredActionsCount).FirstOrDefault() @@ -294,13 +386,14 @@ private ProcessorThread GetProcessorThread(SortOrder order) /// public void Start() { - IsRunning = true; + if (_isRunning) + return; + + _isRunning = true; foreach (var processorThread in _threads) - { processorThread.Start(); - } - //_supervisorTimer.Change(10000, 10000); + _supervisorTimer.Change(_configs.RebalanceLoadPeriod, _configs.RebalanceLoadPeriod); } /// @@ -308,23 +401,38 @@ public void Start() /// public void Stop() { - IsRunning = false; + if (!_isRunning) + return; + + _isRunning = false; foreach (var processorThread in _threads) - { processorThread.Stop(); - } - //_supervisorTimer.Change(-1, -1); + _supervisorTimer.Change(-1, -1); } /// - /// Method invoked by a timer to review the current status of the threads and - /// re-arrange to balance out long running tasks across multiple threads. + /// Configurations for the processor. /// - /// Timer reference. - private void Supervise(object o) + public class Config { - // TODO: Add logic + /// + /// Base name for all the created threads. + /// + public string ThreadName { get; set; } + + /// + /// Initial number of threads to start. + /// Defaults to the number of logical processors + /// + public int StartThreads { get; set; } = Environment.ProcessorCount; + + /// + /// Number of milliseconds for this processor to automatically rebalance any registered actions if required. + /// Default is 10 seconds. + /// + public int RebalanceLoadPeriod { get; set; } = 10000; + } /// @@ -361,7 +469,7 @@ internal class RegisteredAction /// Called before adding this action to a queue to prevent /// queuing in an inactive thread. /// - public ManualResetEventSlim ResetEvent = new ManualResetEventSlim(true); + public ManualResetEventSlim TransferLock = new ManualResetEventSlim(true); } @@ -448,46 +556,55 @@ private void ThreadProcess() _perfStopwatch.Restart(); // Loop while the cancellation of the thread has not been requested. - while (IsRunning) + while (_isRunning) { // Update the idle time RollingEstimate(ref _idleTime, _perfStopwatch.ElapsedMilliseconds, 10); - while (_actions.TryTake(out RegisteredAction action, 1000, _cancellationTokenSource.Token)) + try { + while (_actions.TryTake(out RegisteredAction action, 1000, _cancellationTokenSource.Token)) + { - // Decrement the total actions queued and the current action being run. - Interlocked.Decrement(ref action.QueuedCount); - Interlocked.Decrement(ref Queued); + // Decrement the total actions queued. + Interlocked.Decrement(ref Queued); - // If this action was transferred to another thread, queue the action up on that other thread. - if (action.ProcessorThread != this) - { - action.ProcessorThread.Queue(action, true); - continue; - } + // If this action is not registered to this thread, skip it. + if (action.ProcessorThread == null) + continue; - // Update the idle time - RollingEstimate(ref _idleTime, _perfStopwatch.ElapsedMilliseconds, 10); + // If this action was transferred to another thread, queue the action up on that other thread. + if (action.ProcessorThread != this) + { + action.ProcessorThread.Queue(action, true); + continue; + } - // Restart the watch to time the runtime of the action. - _perfStopwatch.Restart(); + // Decrement only if this is going to run on this thread. + Interlocked.Decrement(ref action.QueuedCount); - // Catch any exceptions and ignore for now. - // TODO: Figure out a plan for handling exceptions thrown in the process. - try - { - action.Action(); - } - catch - { - // ignored - } + // Update the idle time + RollingEstimate(ref _idleTime, _perfStopwatch.ElapsedMilliseconds, 10); - // Add this performance to the estimated rolling average. - RollingEstimate(ref action.AverageUsageTime, _perfStopwatch.ElapsedMilliseconds, 10); + // Restart the watch to time the runtime of the action. + _perfStopwatch.Restart(); + // Catch any exceptions and ignore for now. + // TODO: Figure out a plan for handling exceptions thrown in the process. + action.Action(); + + // Add this performance to the estimated rolling average. + RollingEstimate(ref action.AverageUsageTime, _perfStopwatch.ElapsedMilliseconds, 10); + } + } + catch (OperationCanceledException) + { + // ignored + } + catch + { + // ignored } } @@ -505,9 +622,8 @@ public void Queue(RegisteredAction registeredAction, bool transferred) Interlocked.Increment(ref Queued); if (transferred == false) - { Interlocked.Increment(ref registeredAction.QueuedCount); - } + _actions.TryAdd(registeredAction); } @@ -522,7 +638,7 @@ public void DeregisterAction(RegisteredAction action) { Interlocked.Decrement(ref _registeredActionsCount); _registeredActions.TryRemove(action.Id, out var removedAction); - removedAction.ResetEvent.Reset(); + removedAction.TransferLock.Reset(); removedAction.ProcessorThread = null; } @@ -543,27 +659,22 @@ public RegisteredAction[] GetActions(int count) /// - /// Stops the loop and stops the thread from running - /// and returns all actions registered to execute. + /// Stops the loop, clears the calling queue and stops the thread from running /// - /// Returns all registered actions for this processor. - public RegisteredAction[] Stop() + public void Stop() { Pause(); - List actions = new List(); // Remove all the actions from the collection. while (_actions.Count > 1) - actions.Add(_actions.Take()); + _actions.Take(); - foreach (var registeredAction in actions) + foreach (var registeredAction in _registeredActions) { - // Puts a hold on all attempted queues to the - if (registeredAction.ResetEvent.IsSet) - registeredAction.ResetEvent.Reset(); + // Puts a hold on all attempted queues to the processor until this action has been transferred. + if (registeredAction.Value.TransferLock.IsSet) + registeredAction.Value.TransferLock.Reset(); } - - return actions.ToArray(); } /// diff --git a/src/DtronixMessageQueue/Socket/SessionHandler.cs b/src/DtronixMessageQueue/Socket/SessionHandler.cs index c34deda..e373320 100644 --- a/src/DtronixMessageQueue/Socket/SessionHandler.cs +++ b/src/DtronixMessageQueue/Socket/SessionHandler.cs @@ -94,8 +94,16 @@ protected SessionHandler(TConfig config, SocketMode mode) if (mode == SocketMode.Client) { - OutboxProcessor = new ActionProcessor($"{modeLower}-outbox", 1); - InboxProcessor = new ActionProcessor($"{modeLower}-inbox", 1); + OutboxProcessor = new ActionProcessor(new ActionProcessor.Config + { + ThreadName = $"{modeLower}-outbox", + StartThreads = 1 + }); + InboxProcessor = new ActionProcessor(new ActionProcessor.Config + { + ThreadName = $"{modeLower}-inbox", + StartThreads = 1 + }); } else { @@ -103,8 +111,16 @@ protected SessionHandler(TConfig config, SocketMode mode) ? Environment.ProcessorCount : config.ProcessorThreads; - OutboxProcessor = new ActionProcessor($"{modeLower}-outbox", processorThreads); - InboxProcessor = new ActionProcessor($"{modeLower}-inbox", processorThreads); + OutboxProcessor = new ActionProcessor(new ActionProcessor.Config + { + ThreadName = $"{modeLower}-outbox", + StartThreads = processorThreads + }); + InboxProcessor = new ActionProcessor(new ActionProcessor.Config + { + ThreadName = $"{modeLower}-inbox", + StartThreads = processorThreads + }); } OutboxProcessor.Start(); diff --git a/tools/RunTests.bat b/tools/RunTests.bat new file mode 100644 index 0000000..87ca207 --- /dev/null +++ b/tools/RunTests.bat @@ -0,0 +1 @@ +..\src\packages\xunit.runner.console.2.1.0\tools\xunit.console.exe ..\src\DtronixMessageQueue.Tests\bin\Debug\DtronixMessageQueue.Tests.dll \ No newline at end of file