Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Added automatic padding to the end of each packet.
Browse files Browse the repository at this point in the history
Started work on unifying SendReceiveBuffer size.
  • Loading branch information
DJGosnell committed Jun 11, 2018
1 parent ee43b2a commit a01ff29
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public MqPerformanceTest()
_config = new MqConfig
{
Address = "127.0.0.1:2828",
PingFrequency = 0
};

_smallMessage = new MqMessage
Expand Down Expand Up @@ -72,6 +73,7 @@ public override void StartTest()
Console.WriteLine("FrameBufferSize: {0}; SendAndReceiveBufferSize: {1}\r\n", _config.FrameBufferSize,
_config.SendAndReceiveBufferSize);

MqInProcessPerformanceTests(1, 1, _smallMessage, _config);

MqInProcessPerformanceTests(1000000, 5, _smallMessage, _config);

Expand Down Expand Up @@ -151,10 +153,6 @@ public void SendMessages(SimpleMqSession client, MqMessage message, int totalMes
{

client.Send(message);

//if(i % 50 == 1)
// Thread.Sleep(1);

}

if (!_loopSemaphore.WaitOne(timeout))
Expand Down
4 changes: 2 additions & 2 deletions src/DtronixMessageQueue.Tests.Performance/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ static void Main(string[] args)

Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}");

Console.WriteLine("MQ Performance tests.\r\n");
new MqPerformanceTest().StartTest();
//Console.WriteLine("MQ Performance tests.\r\n");
//new MqPerformanceTest().StartTest();

Console.WriteLine("RPC Performance tests.\r\n");
new RpcPerformanceTest(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public RpcPerformanceTest(string[] args)

RpcSingleProcessTest(10000, 4, config, RpcTestType.Return);

RpcSingleProcessTest(10000, 4, config, RpcTestType.Exception);
RpcSingleProcessTest(100, 4, config, RpcTestType.Exception);


}
Expand Down Expand Up @@ -81,7 +81,7 @@ private void RpcSingleProcessTest(int runs, int loops, RpcConfig config, RpcTest

var send = new Action(() =>
{

var service = client.Session.GetProxy<ITestService>();
service.ResetTest();

Expand Down
188 changes: 101 additions & 87 deletions src/DtronixMessageQueue.Tests.Performance/Services/Server/TestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,91 +9,105 @@
using System.Xml.Serialization;
using DtronixMessageQueue.Rpc;

namespace DtronixMessageQueue.Tests.Performance.Services.Server {
class TestService : MarshalByRefObject, ITestService {
public string Name { get; } = "TestService";
public SimpleRpcSession Session { get; set; }

public event EventHandler<SimpleRpcSession> Completed;

private int call_count = 0;
private int total_calls = 0;

private bool completed = false;

public void TestNoReturn() {
var number = Interlocked.Increment(ref call_count);
//Console.Write($"{Thread.CurrentThread.ManagedThreadId} ");

VerifyComplete();

}

public async void TestNoReturnAwait() {
var number = Interlocked.Increment(ref call_count);
await Task.Delay(1000);

VerifyComplete();

}

public void TestNoReturnLongBlocking() {
var number = Interlocked.Increment(ref call_count);
Thread.Sleep(10000);
VerifyComplete();

}


public void TestNoReturnBlock() {
Task.Factory.StartNew(() => {
var number = Interlocked.Increment(ref call_count);

Thread.Sleep(1000);
VerifyComplete();
}, TaskCreationOptions.LongRunning);
}

public int TestIncrement() {
call_count++;
VerifyComplete();
return call_count;
}

public void TestSetup(int calls) {
total_calls = calls;
}

public bool ResetTest() {
call_count = 0;
completed = false;
return true;
}

public int TestException() {
call_count++;
VerifyComplete();
throw new Exception("This is a test exception");
}

private void VerifyComplete() {
if (completed == false && total_calls == call_count) {
completed = true;
Completed?.Invoke(this, Session);
}
}


}

internal interface ITestService : IRemoteService<SimpleRpcSession, RpcConfig> {
void TestNoReturn();
void TestNoReturnAwait();
void TestNoReturnBlock();
void TestNoReturnLongBlocking();
int TestIncrement();
void TestSetup(int calls);
bool ResetTest();
int TestException();
}
namespace DtronixMessageQueue.Tests.Performance.Services.Server
{
class TestService : MarshalByRefObject, ITestService
{
public string Name { get; } = "TestService";
public SimpleRpcSession Session { get; set; }

public event EventHandler<SimpleRpcSession> Completed;

private int call_count = 0;
private int total_calls = 0;

private bool completed = false;

public void TestNoReturn()
{
var number = Interlocked.Increment(ref call_count);
//Console.Write($"{Thread.CurrentThread.ManagedThreadId} ");

VerifyComplete();

}

public async void TestNoReturnAwait()
{
var number = Interlocked.Increment(ref call_count);
await Task.Delay(1000);

VerifyComplete();

}

public void TestNoReturnLongBlocking()
{
var number = Interlocked.Increment(ref call_count);
Thread.Sleep(10000);
VerifyComplete();

}


public void TestNoReturnBlock()
{
Task.Factory.StartNew(() =>
{
var number = Interlocked.Increment(ref call_count);

Thread.Sleep(1000);
VerifyComplete();
}, TaskCreationOptions.LongRunning);
}

public int TestIncrement()
{
call_count++;
VerifyComplete();
return call_count;
}

public void TestSetup(int calls)
{
total_calls = calls;
}

public bool ResetTest()
{
call_count = 0;
completed = false;
return true;
}

public int TestException()
{
call_count++;
VerifyComplete();
throw new Exception("This is a test exception");
}

private void VerifyComplete()
{
if (completed == false && total_calls == call_count)
{
completed = true;
Completed?.Invoke(this, Session);
}
}


}

internal interface ITestService : IRemoteService<SimpleRpcSession, RpcConfig>
{
void TestNoReturn();
void TestNoReturnAwait();
void TestNoReturnBlock();
void TestNoReturnLongBlocking();
int TestIncrement();
void TestSetup(int calls);
bool ResetTest();
int TestException();
}
}
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue.Tests/Mq/MqServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void Server_accepts_new_connection_after_max()
client2.Connected += (sender, args) => TestComplete.Set();
client.Connect();

TestComplete.Wait(new TimeSpan(0, 0, 0, 0, 1000));
TestComplete.Wait(new TimeSpan(0, 0, 0, 0, 2000));

if (TestComplete.IsSet == false)
{
Expand Down
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue/DtronixMessageQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
<Compile Include="Rpc\SerializationCache.cs" />
<Compile Include="Rpc\ServiceMethodCache.cs" />
<Compile Include="TcpSocket\BufferManager.cs" />
<Compile Include="TcpSocket\EncryptedSocketSession.cs" />
<None Include="TcpSocket\EncryptedSocketSession.cs" />
<Compile Include="TcpSocket\ISecureSocketSession.cs" />
<Compile Include="TcpSocket\ISetupSocketSession.cs" />
<Compile Include="SessionEventArgs.cs" />
Expand Down
7 changes: 3 additions & 4 deletions src/DtronixMessageQueue/MqSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,13 @@ private void ProcessOutbox()

while (_outbox.TryDequeue(out message))
{

if(CurrentState != State.Closed)
_sendingSemaphore.Release();

message.PrepareSend();
foreach (var frame in message)
for (var i = 0; i < message.Count; i++)
{
var frameSize = frame.FrameSize;
var frameSize = message[i].FrameSize;

// If this would overflow the max client buffer size, send the full buffer queue.
if (length + frameSize > Config.FrameBufferSize + MqFrame.HeaderLength)
Expand All @@ -127,7 +126,7 @@ private void ProcessOutbox()
// Reset the length to 0;
length = 0;
}
bufferQueue.Enqueue(frame.RawFrame());
bufferQueue.Enqueue(message[i].RawFrame());

// Increment the total buffer length.
length += frameSize;
Expand Down
4 changes: 2 additions & 2 deletions src/DtronixMessageQueue/TcpSocket/EncryptedSocketSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ private enum EncryptedMessageType : byte
PartialMessage = 2,
}

protected override void Send(byte[] buffer, int offset, int length)
protected override void Send(byte[] buffer, int offset, int length, bool last)
{
base.Send(buffer, offset, length);
base.Send(buffer, offset, length, last);
}


Expand Down
6 changes: 4 additions & 2 deletions src/DtronixMessageQueue/TcpSocket/TcpSocketHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,10 @@ protected void Setup()
var maxConnections = Config.MaxConnections + 1;

// preallocate pool of SocketAsyncEventArgs objects
AsyncManager = new SocketAsyncEventArgsManager(Config.SendAndReceiveBufferSize * maxConnections * 2,
Config.SendAndReceiveBufferSize);
var bufferSize = Config.SendAndReceiveBufferSize / 16 * 16 + 16;

AsyncManager = new SocketAsyncEventArgsManager(bufferSize * 2,
bufferSize);
}

/// <summary>
Expand Down
Loading

0 comments on commit a01ff29

Please sign in to comment.