diff --git a/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs b/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs index 97ab451..eed88ac 100644 --- a/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs +++ b/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs @@ -16,7 +16,7 @@ public RpcPerformanceTest(string[] args) { Port = 2828 }; - RpcSingleProcessTest(1000, 10, config, RpcTestType.NoRetrun); + RpcSingleProcessTest(100000, 4, config, RpcTestType.NoRetrun); } diff --git a/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs b/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs index 0c730df..9c8f2c0 100644 --- a/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs +++ b/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs @@ -20,7 +20,7 @@ class TestService : MarshalByRefObject, ITestService { public void TestNoReturn() { var number = Interlocked.Increment(ref call_count); - Console.Write(number + " "); + //Console.Write(number + " "); VerifyComplete(); } diff --git a/DtronixMessageQueue.Tests/MqClientTests.cs b/DtronixMessageQueue.Tests/MqClientTests.cs index 777dc78..897f067 100644 --- a/DtronixMessageQueue.Tests/MqClientTests.cs +++ b/DtronixMessageQueue.Tests/MqClientTests.cs @@ -15,6 +15,8 @@ public MqClientTests(ITestOutputHelper output) : base(output) { [Theory] [InlineData(1, false)] [InlineData(1, true)] + [InlineData(100, true)] + [InlineData(1000, true)] public void Client_should_send_data_to_server(int number, bool validate) { var message_source = GenerateRandomMessage(4, 50); int received_messages = 0; diff --git a/DtronixMessageQueue.Tests/MqServerTests.cs b/DtronixMessageQueue.Tests/MqServerTests.cs index fbf624c..f0fd52e 100644 --- a/DtronixMessageQueue.Tests/MqServerTests.cs +++ b/DtronixMessageQueue.Tests/MqServerTests.cs @@ -15,6 +15,8 @@ public MqServerTests(ITestOutputHelper output) : base(output) { [Theory] [InlineData(1, false)] [InlineData(1, true)] + [InlineData(100, true)] + [InlineData(1000, true)] public void Server_should_send_data_to_client(int number, bool validate) { var message_source = GenerateRandomMessage(4, 50); diff --git a/DtronixMessageQueue/MqPostmaster.cs b/DtronixMessageQueue/MqPostmaster.cs index 3319a50..66b6fbb 100644 --- a/DtronixMessageQueue/MqPostmaster.cs +++ b/DtronixMessageQueue/MqPostmaster.cs @@ -155,9 +155,14 @@ public void CreateWriteWorker() { worker.StartIdle(); while (write_operations.TryTake(out session, 60000, worker.Token)) { worker.StartWork(); - session.ProcessOutbox(); + + if (session.ProcessOutbox()) { + write_operations.TryAdd(session); + } else { + ongoing_write_operations.TryRemove(session, out out_session); + } + worker.StartIdle(); - ongoing_write_operations.TryRemove(session, out out_session); } } catch (ThreadAbortException) { } catch (Exception) { @@ -186,9 +191,13 @@ public void CreateReadWorker() { worker.StartIdle(); while (read_operations.TryTake(out session, 60000, worker.Token)) { worker.StartWork(); - session.ProcessIncomingQueue(); + if (session.ProcessIncomingQueue()) { + read_operations.TryAdd(session); + } else { + ongoing_read_operations.TryRemove(session, out out_session); + } + worker.StartIdle(); - ongoing_read_operations.TryRemove(session, out out_session); } } catch (ThreadAbortException) { diff --git a/DtronixMessageQueue/MqSession.cs b/DtronixMessageQueue/MqSession.cs index 09fb455..9343e4e 100644 --- a/DtronixMessageQueue/MqSession.cs +++ b/DtronixMessageQueue/MqSession.cs @@ -115,7 +115,8 @@ private void SendBufferQueue(Queue buffer_queue, int length) { /// /// Internally called method by the Postmaster on a different thread to send all messages in the outbox. /// - internal void ProcessOutbox() { + /// True if messages were sent. False if nothing was sent. + internal bool ProcessOutbox() { MqMessage result; var length = 0; var buffer_queue = new Queue(); @@ -139,16 +140,20 @@ internal void ProcessOutbox() { } } - if (buffer_queue.Count > 0) { - // Send the last of the buffer queue. - SendBufferQueue(buffer_queue, length); + if (buffer_queue.Count == 0) { + return false; } + + // Send the last of the buffer queue. + SendBufferQueue(buffer_queue, length); + return true; } /// /// Internal method called by the Postmaster on a different thread to process all bytes in the inbox. /// - internal void ProcessIncomingQueue() { + /// True if incoming queue was processed; False if nothing was available for process. + internal bool ProcessIncomingQueue() { if (message == null) { message = new MqMessage(); } @@ -201,9 +206,13 @@ internal void ProcessIncomingQueue() { } //Postmaster.SignalReadComplete(this); - if (messages != null) { - OnIncomingMessage(this, new IncomingMessageEventArgs(messages, (TSession)this)); + if (messages == null) { + return false; } + + OnIncomingMessage(this, new IncomingMessageEventArgs(messages, (TSession)this)); + return true; + } ///