From a01ff29f82a76e8a484e4d520a0a90c7ea56ed38 Mon Sep 17 00:00:00 2001 From: DJGosnell Date: Mon, 11 Jun 2018 17:33:05 -0400 Subject: [PATCH] Added automatic padding to the end of each packet. Started work on unifying SendReceiveBuffer size. --- .../MqPerformanceTest.cs | 6 +- .../Program.cs | 4 +- .../RpcPerformanceTest.cs | 4 +- .../Services/Server/TestService.cs | 188 ++++++++++-------- .../Mq/MqServerTests.cs | 2 +- .../DtronixMessageQueue.csproj | 2 +- src/DtronixMessageQueue/MqSession.cs | 7 +- .../TcpSocket/EncryptedSocketSession.cs | 4 +- .../TcpSocket/TcpSocketHandler.cs | 6 +- .../TcpSocket/TcpSocketSession.cs | 29 ++- 10 files changed, 140 insertions(+), 112 deletions(-) diff --git a/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs b/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs index 50d64b8..ba55b85 100644 --- a/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs +++ b/src/DtronixMessageQueue.Tests.Performance/MqPerformanceTest.cs @@ -33,6 +33,7 @@ public MqPerformanceTest() _config = new MqConfig { Address = "127.0.0.1:2828", + PingFrequency = 0 }; _smallMessage = new MqMessage @@ -72,6 +73,7 @@ public override void StartTest() Console.WriteLine("FrameBufferSize: {0}; SendAndReceiveBufferSize: {1}\r\n", _config.FrameBufferSize, _config.SendAndReceiveBufferSize); + MqInProcessPerformanceTests(1, 1, _smallMessage, _config); MqInProcessPerformanceTests(1000000, 5, _smallMessage, _config); @@ -151,10 +153,6 @@ public void SendMessages(SimpleMqSession client, MqMessage message, int totalMes { client.Send(message); - - //if(i % 50 == 1) - // Thread.Sleep(1); - } if (!_loopSemaphore.WaitOne(timeout)) diff --git a/src/DtronixMessageQueue.Tests.Performance/Program.cs b/src/DtronixMessageQueue.Tests.Performance/Program.cs index 1f4ef19..aefc403 100644 --- a/src/DtronixMessageQueue.Tests.Performance/Program.cs +++ b/src/DtronixMessageQueue.Tests.Performance/Program.cs @@ -29,8 +29,8 @@ static void Main(string[] args) Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}"); - Console.WriteLine("MQ Performance tests.\r\n"); - new MqPerformanceTest().StartTest(); + //Console.WriteLine("MQ Performance tests.\r\n"); + //new MqPerformanceTest().StartTest(); Console.WriteLine("RPC Performance tests.\r\n"); new RpcPerformanceTest(args); diff --git a/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs b/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs index 0f48940..8d3ebaa 100644 --- a/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs +++ b/src/DtronixMessageQueue.Tests.Performance/RpcPerformanceTest.cs @@ -29,7 +29,7 @@ public RpcPerformanceTest(string[] args) RpcSingleProcessTest(10000, 4, config, RpcTestType.Return); - RpcSingleProcessTest(10000, 4, config, RpcTestType.Exception); + RpcSingleProcessTest(100, 4, config, RpcTestType.Exception); } @@ -81,7 +81,7 @@ private void RpcSingleProcessTest(int runs, int loops, RpcConfig config, RpcTest var send = new Action(() => { - + var service = client.Session.GetProxy(); service.ResetTest(); diff --git a/src/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs b/src/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs index 5aae6e8..4de53f7 100644 --- a/src/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs +++ b/src/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs @@ -9,91 +9,105 @@ using System.Xml.Serialization; using DtronixMessageQueue.Rpc; -namespace DtronixMessageQueue.Tests.Performance.Services.Server { - class TestService : MarshalByRefObject, ITestService { - public string Name { get; } = "TestService"; - public SimpleRpcSession Session { get; set; } - - public event EventHandler Completed; - - private int call_count = 0; - private int total_calls = 0; - - private bool completed = false; - - public void TestNoReturn() { - var number = Interlocked.Increment(ref call_count); - //Console.Write($"{Thread.CurrentThread.ManagedThreadId} "); - - VerifyComplete(); - - } - - public async void TestNoReturnAwait() { - var number = Interlocked.Increment(ref call_count); - await Task.Delay(1000); - - VerifyComplete(); - - } - - public void TestNoReturnLongBlocking() { - var number = Interlocked.Increment(ref call_count); - Thread.Sleep(10000); - VerifyComplete(); - - } - - - public void TestNoReturnBlock() { - Task.Factory.StartNew(() => { - var number = Interlocked.Increment(ref call_count); - - Thread.Sleep(1000); - VerifyComplete(); - }, TaskCreationOptions.LongRunning); - } - - public int TestIncrement() { - call_count++; - VerifyComplete(); - return call_count; - } - - public void TestSetup(int calls) { - total_calls = calls; - } - - public bool ResetTest() { - call_count = 0; - completed = false; - return true; - } - - public int TestException() { - call_count++; - VerifyComplete(); - throw new Exception("This is a test exception"); - } - - private void VerifyComplete() { - if (completed == false && total_calls == call_count) { - completed = true; - Completed?.Invoke(this, Session); - } - } - - - } - - internal interface ITestService : IRemoteService { - void TestNoReturn(); - void TestNoReturnAwait(); - void TestNoReturnBlock(); - void TestNoReturnLongBlocking(); - int TestIncrement(); - void TestSetup(int calls); - bool ResetTest(); - int TestException(); - } +namespace DtronixMessageQueue.Tests.Performance.Services.Server +{ + class TestService : MarshalByRefObject, ITestService + { + public string Name { get; } = "TestService"; + public SimpleRpcSession Session { get; set; } + + public event EventHandler Completed; + + private int call_count = 0; + private int total_calls = 0; + + private bool completed = false; + + public void TestNoReturn() + { + var number = Interlocked.Increment(ref call_count); + //Console.Write($"{Thread.CurrentThread.ManagedThreadId} "); + + VerifyComplete(); + + } + + public async void TestNoReturnAwait() + { + var number = Interlocked.Increment(ref call_count); + await Task.Delay(1000); + + VerifyComplete(); + + } + + public void TestNoReturnLongBlocking() + { + var number = Interlocked.Increment(ref call_count); + Thread.Sleep(10000); + VerifyComplete(); + + } + + + public void TestNoReturnBlock() + { + Task.Factory.StartNew(() => + { + var number = Interlocked.Increment(ref call_count); + + Thread.Sleep(1000); + VerifyComplete(); + }, TaskCreationOptions.LongRunning); + } + + public int TestIncrement() + { + call_count++; + VerifyComplete(); + return call_count; + } + + public void TestSetup(int calls) + { + total_calls = calls; + } + + public bool ResetTest() + { + call_count = 0; + completed = false; + return true; + } + + public int TestException() + { + call_count++; + VerifyComplete(); + throw new Exception("This is a test exception"); + } + + private void VerifyComplete() + { + if (completed == false && total_calls == call_count) + { + completed = true; + Completed?.Invoke(this, Session); + } + } + + + } + + internal interface ITestService : IRemoteService + { + void TestNoReturn(); + void TestNoReturnAwait(); + void TestNoReturnBlock(); + void TestNoReturnLongBlocking(); + int TestIncrement(); + void TestSetup(int calls); + bool ResetTest(); + int TestException(); + } } diff --git a/src/DtronixMessageQueue.Tests/Mq/MqServerTests.cs b/src/DtronixMessageQueue.Tests/Mq/MqServerTests.cs index 2b0c98f..409b304 100644 --- a/src/DtronixMessageQueue.Tests/Mq/MqServerTests.cs +++ b/src/DtronixMessageQueue.Tests/Mq/MqServerTests.cs @@ -93,7 +93,7 @@ public void Server_accepts_new_connection_after_max() client2.Connected += (sender, args) => TestComplete.Set(); client.Connect(); - TestComplete.Wait(new TimeSpan(0, 0, 0, 0, 1000)); + TestComplete.Wait(new TimeSpan(0, 0, 0, 0, 2000)); if (TestComplete.IsSet == false) { diff --git a/src/DtronixMessageQueue/DtronixMessageQueue.csproj b/src/DtronixMessageQueue/DtronixMessageQueue.csproj index 56788a7..085cedf 100644 --- a/src/DtronixMessageQueue/DtronixMessageQueue.csproj +++ b/src/DtronixMessageQueue/DtronixMessageQueue.csproj @@ -74,7 +74,7 @@ - + diff --git a/src/DtronixMessageQueue/MqSession.cs b/src/DtronixMessageQueue/MqSession.cs index 01bc8f1..1088d5b 100644 --- a/src/DtronixMessageQueue/MqSession.cs +++ b/src/DtronixMessageQueue/MqSession.cs @@ -110,14 +110,13 @@ private void ProcessOutbox() while (_outbox.TryDequeue(out message)) { - if(CurrentState != State.Closed) _sendingSemaphore.Release(); message.PrepareSend(); - foreach (var frame in message) + for (var i = 0; i < message.Count; i++) { - var frameSize = frame.FrameSize; + var frameSize = message[i].FrameSize; // If this would overflow the max client buffer size, send the full buffer queue. if (length + frameSize > Config.FrameBufferSize + MqFrame.HeaderLength) @@ -127,7 +126,7 @@ private void ProcessOutbox() // Reset the length to 0; length = 0; } - bufferQueue.Enqueue(frame.RawFrame()); + bufferQueue.Enqueue(message[i].RawFrame()); // Increment the total buffer length. length += frameSize; diff --git a/src/DtronixMessageQueue/TcpSocket/EncryptedSocketSession.cs b/src/DtronixMessageQueue/TcpSocket/EncryptedSocketSession.cs index c9e2d63..0f72912 100644 --- a/src/DtronixMessageQueue/TcpSocket/EncryptedSocketSession.cs +++ b/src/DtronixMessageQueue/TcpSocket/EncryptedSocketSession.cs @@ -271,9 +271,9 @@ private enum EncryptedMessageType : byte PartialMessage = 2, } - protected override void Send(byte[] buffer, int offset, int length) + protected override void Send(byte[] buffer, int offset, int length, bool last) { - base.Send(buffer, offset, length); + base.Send(buffer, offset, length, last); } diff --git a/src/DtronixMessageQueue/TcpSocket/TcpSocketHandler.cs b/src/DtronixMessageQueue/TcpSocket/TcpSocketHandler.cs index 7cfc702..5ec6f66 100644 --- a/src/DtronixMessageQueue/TcpSocket/TcpSocketHandler.cs +++ b/src/DtronixMessageQueue/TcpSocket/TcpSocketHandler.cs @@ -212,8 +212,10 @@ protected void Setup() var maxConnections = Config.MaxConnections + 1; // preallocate pool of SocketAsyncEventArgs objects - AsyncManager = new SocketAsyncEventArgsManager(Config.SendAndReceiveBufferSize * maxConnections * 2, - Config.SendAndReceiveBufferSize); + var bufferSize = Config.SendAndReceiveBufferSize / 16 * 16 + 16; + + AsyncManager = new SocketAsyncEventArgsManager(bufferSize * 2, + bufferSize); } /// diff --git a/src/DtronixMessageQueue/TcpSocket/TcpSocketSession.cs b/src/DtronixMessageQueue/TcpSocket/TcpSocketSession.cs index 62aa269..a8a3c06 100644 --- a/src/DtronixMessageQueue/TcpSocket/TcpSocketSession.cs +++ b/src/DtronixMessageQueue/TcpSocket/TcpSocketSession.cs @@ -213,7 +213,7 @@ public static TSession Create(TlsSocketSessionCreateArguments ServiceMethodCache = args.ServiceMethodCache }; - session._receiveTransformedBuffer = new byte[args.SessionConfig.SendAndReceiveBufferSize]; + session._receiveTransformedBuffer = new byte[args.SessionConfig.SendAndReceiveBufferSize + 3]; session._sendArgs.Completed += session.IoCompleted; session._receiveArgs.Completed += session.IoCompleted; @@ -301,7 +301,7 @@ private void SecureConnectionReceive(byte[] buffer) { KeySize = 256, Mode = CipherMode.CBC, - Padding = PaddingMode.PKCS7 + Padding = PaddingMode.None }; // Server side. @@ -346,7 +346,7 @@ private void SecureConnectionReceive(byte[] buffer) { KeySize = 256, Mode = CipherMode.CBC, - Padding = PaddingMode.PKCS7, + Padding = PaddingMode.None, Key = key, IV = iv }; @@ -368,8 +368,11 @@ private void SecureConnectionComplete() } // Set the encryptor and decryptor. + /*_decryptor = new PlainCryptoTransform(); + _encryptor = new PlainCryptoTransform(); //*/ + _decryptor = _aes.CreateDecryptor(); - _encryptor = _aes.CreateEncryptor(); + _encryptor = _aes.CreateEncryptor(); //*/ _negotiationStream.Close(); _negotiationStream = null; @@ -522,7 +525,7 @@ protected virtual void Send(byte[] buffer, int offset, int length) if (Socket == null || Socket.Connected == false) return; - if(length + 3 >= Config.SendAndReceiveBufferSize) + if(length > Config.SendAndReceiveBufferSize) throw new ArgumentException("Too large"); _writeSemaphore.Wait(-1); @@ -545,8 +548,19 @@ protected virtual void Send(byte[] buffer, int offset, int length) sendLength += TransformDataBuffer(buffer, offset, length, _sendArgs.Buffer, _sendArgs.Offset + sendLength, _sendBuffer, ref _sendBufferLength, _encryptor, false, false); + // If this is the last frame in a packet, pad the ending. + if (length + 3 != sendLength) + { + var paddingLength = 16 - _sendBufferLength; + sendLength += TransformDataBuffer(paddingBuffer[paddingLength - 1], 0, paddingLength, + _sendArgs.Buffer, + _sendArgs.Offset + sendLength, _sendBuffer, + ref _sendBufferLength, _encryptor, false, false); + } + if (sendLength == 0) return; + _sendArgs.SetBuffer(_sendArgs.Offset, sendLength); @@ -563,14 +577,15 @@ protected virtual void Send(byte[] buffer, int offset, int length) /// public void Flush() { + if (Socket == null || Socket.Connected == false) return; - _writeSemaphore.Wait(-1); - if (_sendBufferLength == 0) return; + //_writeSemaphore.Wait(-1); + var paddingLength = 16 - _sendBufferLength; // Padding