Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Removed mailbox and merged functionality into MqSession.
Browse files Browse the repository at this point in the history
Fixed issues with the performance tester.
  • Loading branch information
DJGosnell committed Aug 28, 2016
1 parent 9c416cb commit fc18a63
Show file tree
Hide file tree
Showing 12 changed files with 275 additions and 342 deletions.
26 changes: 13 additions & 13 deletions DtronixMessageQueue.Tests.Performance/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ static void Main(string[] args){

if (args == null || args.Length == 0) {
mode = "single-process";
total_loops = 5;
total_messages = 10000;
total_loops = 1;
total_messages = 1000000;
total_frames = 4;
frame_size = 50;
total_clients = 1;
Expand Down Expand Up @@ -114,7 +114,9 @@ private static void StartClient(int total_loops, int total_messages, int total_f

cl.IncomingMessage += (sender, args) => {
MqMessage msg;
while(args.Mailbox.Inbox.TryDequeue(out msg)) {
while (args.Messages.Count > 0) {
msg = args.Messages.Dequeue();

message_reader.Message = msg;
var result = message_reader.ReadString();

Expand Down Expand Up @@ -146,12 +148,12 @@ private static void StartClient(int total_loops, int total_messages, int total_f

cl.Close();
}else if (result == "START") {
stopwatch.Restart();
for (var i = 0; i < total_messages; i++) {
cl.Send(message);
if (total_loops > 0) {
stopwatch.Restart();
for (var i = 0; i < total_messages; i++) {
cl.Send(message);
}
}

Thread.Sleep(500);
}
}
};
Expand Down Expand Up @@ -197,12 +199,10 @@ private static void StartServer(int total_messages, int total_clients) {
};

server.IncomingMessage += (sender, args) => {
MqMessage message_out;

var client_info = client_infos[args.Session];
while (args.Mailbox.Inbox.TryDequeue(out message_out)) {
client_info.Runs++;
}

// Count the total messages.
client_info.Runs += args.Messages.Count;

if (client_info.Runs == total_messages) {
args.Session.Send(complete_message);
Expand Down
9 changes: 5 additions & 4 deletions DtronixMessageQueue.Tests/MqClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public MqClientTests(ITestOutputHelper output) : base(output) {
[Theory]
[InlineData(1, false)]
[InlineData(1, true)]
//[InlineData(50, true)]
public void Client_should_send_data_to_server(int number, bool validate) {
var message_source = GenerateRandomMessage(4, 50);
int received_messages = 0;
Expand All @@ -32,8 +31,9 @@ public void Client_should_send_data_to_server(int number, bool validate) {
Server.IncomingMessage += (sender, args) => {
MqMessage message;

while (args.Mailbox.Inbox.TryDequeue(out message)) {
received_messages++;
while (args.Messages.Count > 0) {
message = args.Messages.Dequeue();
Interlocked.Increment(ref received_messages);
if (validate) {
CompareMessages(message_source, message);
}
Expand Down Expand Up @@ -62,7 +62,8 @@ public void Client_does_not_send_empty_message() {
Server.IncomingMessage += (sender, args) => {
MqMessage message;

while (args.Mailbox.Inbox.TryDequeue(out message)) {
while (args.Messages.Count > 0) {
message = args.Messages.Dequeue();
if (message.Count != 2) {
LastException = new Exception("Server received an empty message.");
}
Expand Down
17 changes: 12 additions & 5 deletions DtronixMessageQueue.Tests/MqServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public MqServerTests(ITestOutputHelper output) : base(output) {
[Theory]
[InlineData(1, false)]
[InlineData(1, true)]
[InlineData(50, true)]
public void Server_should_send_data_to_client(int number, bool validate) {
var message_source = GenerateRandomMessage(4, 50);

Expand All @@ -31,15 +30,23 @@ public void Server_should_send_data_to_client(int number, bool validate) {

};

int client_message_count = 0;
Client.IncomingMessage += (sender, args) => {
MqMessage message;
args.Mailbox.Inbox.TryDequeue(out message);

if (validate) {
CompareMessages(message_source, message);
client_message_count += args.Messages.Count;

while (args.Messages.Count > 0) {
message = args.Messages.Dequeue();

if (validate) {
CompareMessages(message_source, message);
}
}

TestStatus.Set();
if (client_message_count == number) {
TestStatus.Set();
}
};

StartAndWait();
Expand Down
2 changes: 0 additions & 2 deletions DtronixMessageQueue/DtronixMessageQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
<Compile Include="MqMessageWriter.cs" />
<Compile Include="MqSession.cs" />
<Compile Include="Socket\BufferManager.cs" />
<Compile Include="Socket\LogWrapper.cs" />
<Compile Include="Socket\SessionConnectedEventArgs.cs" />
<Compile Include="Socket\SessionClosedEventArgs.cs" />
<Compile Include="Socket\SocketAsyncEventArgsPool.cs" />
Expand All @@ -68,7 +67,6 @@
<Compile Include="MqClient.cs" />
<Compile Include="MqFrame.cs" />
<Compile Include="MqFrameType.cs" />
<Compile Include="MqMailbox.cs" />
<Compile Include="MqMessage.cs" />
<Compile Include="MqServer.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
Expand Down
13 changes: 6 additions & 7 deletions DtronixMessageQueue/IncomingMessageEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,22 @@ namespace DtronixMessageQueue {
public class IncomingMessageEventArgs : EventArgs {

/// <summary>
/// Reference to the mailbox with the new message.
/// Messages ready to be read.
/// </summary>
public MqMailbox Mailbox { get; set; }
public Queue<MqMessage> Messages { get; }

/// <summary>
/// If this message is on the server, this will contain the reference to the connected session of the client.
/// </summary>
public MqSession Session { get; set; }
public MqSession Session { get; }

/// <summary>
/// Creates an instance of the event args.
/// </summary>
/// <param name="mailbox">Mailbox with the new message</param>
/// <param name="messages">Messages read and ready to be used.</param>
/// <param name="session">Server session. Null if this is on the client.</param>
/// <param name="client">Client. Null if this is on the server.</param>
public IncomingMessageEventArgs(MqMailbox mailbox, MqSession session) {
Mailbox = mailbox;
public IncomingMessageEventArgs(Queue<MqMessage> messages, MqSession session) {
Messages = messages;
Session = session;
}
}
Expand Down
10 changes: 5 additions & 5 deletions DtronixMessageQueue/MqClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ private void OnIncomingMessage(object sender, IncomingMessageEventArgs e) {

protected override MqSession CreateSession(System.Net.Sockets.Socket socket) {
var session = base.CreateSession(socket);
session.Mailbox = new MqMailbox(postmaster, session);
session.Mailbox.IncomingMessage += OnIncomingMessage;
session.Postmaster = postmaster;
session.IncomingMessage += OnIncomingMessage;

return session;
}
Expand All @@ -72,14 +72,14 @@ public void Send(MqMessage message) {
}

// Enqueue the outgoing message to be processed by the postmaster.
Session.Mailbox.EnqueueOutgoingMessage(message);
Session.EnqueueOutgoingMessage(message);
}


public void Close() {
Session.Mailbox.IncomingMessage -= OnIncomingMessage;
Session.IncomingMessage -= OnIncomingMessage;
Session.CloseConnection(SocketCloseReason.ClientClosing);
Session.Mailbox.Dispose();
Session.Dispose();
}

/// <summary>
Expand Down
Loading

0 comments on commit fc18a63

Please sign in to comment.