diff --git a/src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs b/src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs index a8f0fa5..3ea29cd 100644 --- a/src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs +++ b/src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs @@ -90,7 +90,7 @@ public void Client_calls_proxy_method_and_canceles() { bool threw = false; try { service.LongRunningTask(1, 2, token_source.Token); - } catch (OperationCanceledException ex) { + } catch (OperationCanceledException) { threw = true; } diff --git a/src/DtronixMessageQueue/DtronixMessageQueue.csproj b/src/DtronixMessageQueue/DtronixMessageQueue.csproj index 4c174e3..3a74aa8 100644 --- a/src/DtronixMessageQueue/DtronixMessageQueue.csproj +++ b/src/DtronixMessageQueue/DtronixMessageQueue.csproj @@ -81,7 +81,6 @@ - diff --git a/src/DtronixMessageQueue/MqPostmaster.cs b/src/DtronixMessageQueue/MqPostmaster.cs index b4169e4..58a2610 100644 --- a/src/DtronixMessageQueue/MqPostmaster.cs +++ b/src/DtronixMessageQueue/MqPostmaster.cs @@ -11,7 +11,10 @@ namespace DtronixMessageQueue { public class MqPostmaster : IDisposable where TSession : MqSession, new() { - private SmartThreadPool thread_pool; + /// + /// Internal thread pool for this instance. + /// + private readonly SmartThreadPool thread_pool; private class WorkerInfo { @@ -24,11 +27,6 @@ public enum WorkerType { public WorkerType Type; } - /// - /// Internal worker to review the current work being done. - /// - private readonly MqWorker supervisor; - /// /// Dictionary to prevent multiple writes occurring on the same session concurrently. /// @@ -54,7 +52,7 @@ public enum WorkerType { /// private readonly MqSocketConfig config; - private CancellationTokenSource cancellation_token_source = new CancellationTokenSource(); + private readonly CancellationTokenSource cancellation_token_source = new CancellationTokenSource(); /// /// Creates a new postmaster instance to handle reading and writing of all sessions. @@ -62,16 +60,7 @@ public enum WorkerType { public MqPostmaster(MqSocketConfig config) { this.config = config; - // Add a supervisor to review when it is needed to increase or decrease the worker numbers. - //supervisor = new MqWorker(SupervisorWork, "postmaster_supervisor"); - - // Create one reader and one writer workers to start off with. - - - - //supervisor.Start(); - - thread_pool = new SmartThreadPool(config.IdleWorkerTimeout, config.MaxReadWriteWorkers); + thread_pool = new SmartThreadPool(config.IdleWorkerTimeout, config.MaxReadWriteWorkers, 4); var writer_info = new WorkerInfo { Type = WorkerInfo.WorkerType.Writer, diff --git a/src/DtronixMessageQueue/MqSocketConfig.cs b/src/DtronixMessageQueue/MqSocketConfig.cs index 6c23a17..def7a93 100644 --- a/src/DtronixMessageQueue/MqSocketConfig.cs +++ b/src/DtronixMessageQueue/MqSocketConfig.cs @@ -1,7 +1,9 @@ -using DtronixMessageQueue.Socket; +using System; +using DtronixMessageQueue.Socket; namespace DtronixMessageQueue { public class MqSocketConfig : SocketConfig { + private int max_read_write_workers = 4; /// /// Max size of the frame. Needs to be equal or smaller than SendAndReceiveBufferSize. @@ -25,13 +27,19 @@ public class MqSocketConfig : SocketConfig { /// - /// (Server) + /// (Server/Client) /// Max number of workers used to read/write. + /// Minimum of 4 required. /// /// - /// Value of 20 would make a maximum of 20 readers and 20 writers. Total of 40 workers. + /// Writers and readers share the total thread count specified here. /// - public int MaxReadWriteWorkers { get; set; } = 20; + public int MaxReadWriteWorkers { + get { return max_read_write_workers; } + set { + max_read_write_workers = Math.Max(4, value); + } + } /// diff --git a/src/DtronixMessageQueue/MqWorker.cs b/src/DtronixMessageQueue/MqWorker.cs deleted file mode 100644 index ee65cbc..0000000 --- a/src/DtronixMessageQueue/MqWorker.cs +++ /dev/null @@ -1,155 +0,0 @@ -using System; -using System.Diagnostics; -using System.Threading; - -namespace DtronixMessageQueue { - - /// - /// Worker which creates a new thread and monitor's its usage. - /// Loops the passed action worker until the worker had been canceled. - /// - public class MqWorker : IDisposable { - - /// - /// Average time this worker spent idle. Used to determine if this worker is still needed. - /// - private long average_idle_time = 0; - - /// - /// Average time this worker spent working. Used to determine if this worker is still needed. - /// - private long average_work_time = 0; - - /// - /// Stopwatch used to monitor the idle time. - /// - private readonly Stopwatch idle_stopwatch = new Stopwatch(); - - /// - /// Stopwatch used to monitor the work time. - /// - private readonly Stopwatch work_stopwatch = new Stopwatch(); - - /// - /// Token source used to cancel this worker. - /// - private readonly CancellationTokenSource cancellation_source = new CancellationTokenSource(); - - /// - /// Token used to cancel this worker. - /// - public CancellationToken Token { get; } - - /// - /// Average time this worker remains idle. - /// The smaller the number, the more work being done. - /// - public long AverageIdleTime => average_idle_time; - - /// - /// True if this worker is idling. - /// - public bool IsIdling => idle_stopwatch.IsRunning; - - /// - /// True if this worker is working. - /// - public bool IsWorking => work_stopwatch.IsRunning; - - /// - /// Work action to perform. - /// - private readonly Action work; - - /// - /// Thread this worker uses to perform the work. - /// - private readonly Thread worker_thread; - - /// - /// Creates an instance o - /// - /// - /// - public MqWorker(Action work, string name) { - idle_stopwatch.Start(); - this.work = work; - Token = cancellation_source.Token; - worker_thread = new Thread(ProcessQueue) { - Name = name, - IsBackground = true, - Priority = ThreadPriority.Normal - }; - //worker_task = new Task(ProcessQueue, Token, Token, TaskCreationOptions.LongRunning); - } - - /// - /// Start the worker. - /// - public void Start() { - worker_thread.Start(this); - //worker_task.Start(); - } - - /// - /// Called by the worker action when the work has completed to signal the worker is idle. - /// - public void StartIdle() { - idle_stopwatch.Restart(); - - if (work_stopwatch.IsRunning) { - work_stopwatch.Stop(); - - average_work_time = average_work_time == 0 - ? work_stopwatch.ElapsedMilliseconds - : (work_stopwatch.ElapsedMilliseconds + average_work_time) / 2; - } - } - - /// - /// Called by the worker action when the work has started to signal the worker is busy. - /// - public void StartWork() { - work_stopwatch.Restart(); - idle_stopwatch.Stop(); - - average_idle_time = average_idle_time == 0 - ? idle_stopwatch.ElapsedMilliseconds - : (idle_stopwatch.ElapsedMilliseconds + average_idle_time) / 2; - } - - /// - /// Interrupt the worker loop and keep the worker in an idle state. - /// - public void Stop() { - cancellation_source.Cancel(); - } - - /// - /// Called by the new thread to loop the worker's action. - /// - /// - private void ProcessQueue(object o) { - while (Token.IsCancellationRequested == false) { - try { - work?.Invoke((MqWorker)o); - } catch (Exception) { - // ignored - } - } - } - - /// - /// Stops the thread and releases resources. - /// - public void Dispose() { - if (worker_thread.IsAlive) { - Stop(); - } - - idle_stopwatch.Stop(); - work_stopwatch.Stop(); - worker_thread.Abort(); - } - } -} \ No newline at end of file diff --git a/src/DtronixMessageQueue/Properties/AssemblyInfo.cs b/src/DtronixMessageQueue/Properties/AssemblyInfo.cs index ae15b07..135ce20 100644 --- a/src/DtronixMessageQueue/Properties/AssemblyInfo.cs +++ b/src/DtronixMessageQueue/Properties/AssemblyInfo.cs @@ -35,5 +35,5 @@ // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("0.8.0.0")] -[assembly: AssemblyFileVersion("0.8.0.0")] \ No newline at end of file +[assembly: AssemblyVersion("0.9.0.0")] +[assembly: AssemblyFileVersion("0.9.0.0")] \ No newline at end of file