Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Added MqSession property to IncomingMessageEventArgs.
Browse files Browse the repository at this point in the history
Started performance tests for multiple clients.
Fixed issue with Postmaster creating endless outbox workers.
  • Loading branch information
DJGosnell committed Aug 17, 2016
1 parent 0a65457 commit 6db6a92
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 50 deletions.
118 changes: 117 additions & 1 deletion DtronixMessageQueue.Tests.Performance/Program.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Management;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using DtronixMessageQueue;
using SuperSocket.SocketBase.Config;

Expand Down Expand Up @@ -33,6 +36,8 @@ static void Main(string[] args) {
new MqFrame(RandomBytes(50), MqFrameType.Last)
};

MqPerformanceTests(10000, 1, small_message);

MqPerformanceTests(1000000, 5, small_message);

var medimum_message = new MqMessage {
Expand All @@ -58,6 +63,117 @@ static void Main(string[] args) {
Console.ReadLine();
}

private class ClientRunInfo {
public MqClient Client { get; set; }
public int Runs { get; set; }
public WaitHandle WaitHandle { get; set; }
public ManualResetEventSlim ManualResetEventSlim { get; set; }
public MqSession Session { get; set; }
public Stopwatch Stopwatch { get; set; }

}

private static void MqMultiClientPerformanceTests(int runs, int clients, MqMessage message) {
var server = new MqServer(new ServerConfig {
Ip = "127.0.0.1",
Port = 2828
});
server.Start();
Dictionary<MqSession, ClientRunInfo> client_infos = new Dictionary<MqSession, ClientRunInfo>();

var connection_reset = new ManualResetEventSlim();
ClientRunInfo current_info = null;

server.NewSessionConnected += session => {
current_info.Session = session;
client_infos.Add(session, current_info);

connection_reset.Set();

Console.Write(client_infos.Count + " ");
};

Console.Write("Clients connecting to server:");
for (int i = 0; i < clients; i++) {
var reset = new ManualResetEventSlim();
var cl = new MqClient();
current_info = new ClientRunInfo() {
Client = cl,
ManualResetEventSlim = reset,
WaitHandle = reset.WaitHandle,
Runs = 0,
Stopwatch = new Stopwatch()

};


cl.ConnectAsync("127.0.0.1").Wait();

connection_reset.Wait();

}

Console.WriteLine("\r\nClients connected to server Starting tests.");

Console.WriteLine("| Build | Messages | Msg Bytes | Milliseconds | MPS | MBps |");
Console.WriteLine("|---------|------------|-----------|--------------|------------|----------|");


var message_size = message.Size;

server.IncomingMessage += (sender, args) => {
MqMessage message_out;

var client_info = client_infos[args.Session];
while (args.Mailbox.Inbox.TryDequeue(out message_out)) {
client_info.Runs++;
}

if (client_info.Runs == runs) {
client_info.Stopwatch.Stop();
var mode = "Release";

#if DEBUG
mode = "Debug";
#endif

var messages_per_second = (int)((double)runs / sw.ElapsedMilliseconds * 1000);
var msg_size_no_header = message_size - 12;
var mbps = runs * (double)(msg_size_no_header) / sw.ElapsedMilliseconds / 1000;
Console.WriteLine("| {0,7} | {1,10:N0} | {2,9:N0} | {3,12:N0} | {4,10:N0} | {5,8:N2} |", mode, runs, msg_size_no_header, sw.ElapsedMilliseconds, messages_per_second, mbps);

client_info.ManualResetEventSlim.Set();
}

};


var client = new MqClient();


foreach (var client_run_info in client_infos) {
Task.Run(() => {
client_run_info.Value.Stopwatch.Restart();
for (var i = 0; i < runs; i++) {
client.Send(message);
}
MqServer sv = server;
client_run_info.Value.ManualResetEventSlim.Wait();
client_run_info.Value.ManualResetEventSlim.Reset();
});
}

var resets = client_infos.Values.


ManualResetEvent.WaitAll()

Console.WriteLine("Complete");
server.Stop();
client.Close().Wait();*/

}

private static void MqPerformanceTests(int runs, int loops, MqMessage message) {
var server = new MqServer(new ServerConfig {
Ip = "127.0.0.1",
Expand All @@ -71,7 +187,6 @@ private static void MqPerformanceTests(int runs, int loops, MqMessage message) {
var sw = new Stopwatch();
var wait = new AutoResetEvent(false);

var client = new MqClient();

Console.WriteLine("| Build | Messages | Msg Bytes | Milliseconds | MPS | MBps |");
Console.WriteLine("|---------|------------|-----------|--------------|------------|----------|");
Expand Down Expand Up @@ -109,6 +224,7 @@ private static void MqPerformanceTests(int runs, int loops, MqMessage message) {
};


var client = new MqClient();

var send = new Action(() => {
count = 0;
Expand Down
4 changes: 3 additions & 1 deletion DtronixMessageQueue/IncomingMessageEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
namespace DtronixMessageQueue {
public class IncomingMessageEventArgs : EventArgs {
public MqMailbox Mailbox { get; set; }
public MqSession Session { get; set; }

public IncomingMessageEventArgs(MqMailbox mailbox) {
public IncomingMessageEventArgs(MqMailbox mailbox, MqSession session) {
Mailbox = mailbox;
Session = session;
}
}
}
5 changes: 3 additions & 2 deletions DtronixMessageQueue/MqMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private void SendBufferQueue(Queue<byte[]> buffer_queue, int length) {
session.Send(buffer, 0, buffer.Length);
}

postmaster.SignalWriteComplete(this);

}


Expand Down Expand Up @@ -184,6 +184,7 @@ internal void ProcessOutbox() {
// Send the last of the buffer queue.
SendBufferQueue(buffer_queue, length);
}
//postmaster.SignalWriteComplete(this);
}

/// <summary>
Expand Down Expand Up @@ -232,7 +233,7 @@ internal async void ProcessIncomingQueue() {
//postmaster.SignalReadComplete(this);

if (new_message) {
IncomingMessage?.Invoke(this, new IncomingMessageEventArgs(this));
IncomingMessage?.Invoke(this, new IncomingMessageEventArgs(this, session));
}
}

Expand Down
61 changes: 18 additions & 43 deletions DtronixMessageQueue/MqPostmaster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,7 @@ public class MqPostmaster : IDisposable {

public MqPostmaster() {
// Add a supervisor to review when it is needed to increase or decrease the worker numbers.
supervisor = new MqWorker(async (worker) => {
while (true) {

if (ProcessWorkers(read_workers)) {
CreateReadWorker();
}

if (ProcessWorkers( write_workers)) {
//CreateWriteWorker();

}

await Task.Delay(500);
}
}, "postmaster_supervisor");
supervisor = new MqWorker(SupervisorWork, "postmaster_supervisor");

// Create one reader and one writer workers to start off with.
CreateReadWorker();
Expand All @@ -48,40 +34,29 @@ public MqPostmaster() {
supervisor.Start();
}

private void SupervisorWork(MqWorker worker) {
while (worker.Token.IsCancellationRequested == false) {
if (ProcessWorkers(read_workers)) {
CreateReadWorker();
}

public bool SignalWrite(MqMailbox mailbox) {
return ongoing_write_operations.TryAdd(mailbox, true) && write_operations.TryAdd(mailbox);
}

public bool SignalRead(MqMailbox mailbox) {
return ongoing_read_operations.TryAdd(mailbox, true) && read_operations.TryAdd(mailbox);
if (ProcessWorkers(write_workers)) {
CreateWriteWorker();
}
Thread.Sleep(500);
}
}

public void SignalWriteComplete(MqMailbox mailbox) {
bool out_mailbox;
ongoing_write_operations.TryRemove(mailbox, out out_mailbox);
public MqPostmaster(MqServer server) {
}


public bool SignalReadComplete(MqMailbox mailbox) {
bool out_mailbox;
return ongoing_read_operations.TryRemove(mailbox, out out_mailbox);
public bool SignalWrite(MqMailbox mailbox) {
return ongoing_write_operations.TryAdd(mailbox, true) && write_operations.TryAdd(mailbox);
}

private async void SuperviseWorkers(MqWorker worker) {
while (true) {

//if (ProcessWorkers(read_workers)) {
//CreateReadWorker();
//}

//if (ProcessWorkers( write_workers)) {
//CreateWriteWorker();

//}

await Task.Delay(500);
}
public bool SignalRead(MqMailbox mailbox) {
return ongoing_read_operations.TryAdd(mailbox, true) && read_operations.TryAdd(mailbox);
}


Expand Down Expand Up @@ -110,15 +85,15 @@ public void CreateWriteWorker() {
Console.WriteLine("Created write worker.");
var writer_worker = new MqWorker(worker => {
MqMailbox mailbox = null;
bool out_mailbox;

try {
worker.StartIdle();
while (write_operations.TryTake(out mailbox, 60000, worker.Token)) {
worker.StartWork();
mailbox.ProcessOutbox();
worker.StartIdle();
//bool out_mailbox;
//ongoing_write_operations.TryRemove(mailbox, out out_mailbox);
ongoing_write_operations.TryRemove(mailbox, out out_mailbox);

}
} catch (ThreadAbortException) {
Expand Down
6 changes: 3 additions & 3 deletions DtronixMessageQueue/MqWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public long AverageIdleTime {
private Thread worker_thread;

public MqWorker(Action<MqWorker> work, string name) {
idle_stopwatch.Start();
this.work = work;
Token = cancellation_source.Token;
worker_thread = new Thread(ProcessQueue) {
Expand All @@ -58,7 +59,6 @@ public MqWorker(Action<MqWorker> work, string name) {
/// Start the worker.
/// </summary>
public void Start() {
idle_stopwatch.Start();
worker_thread.Start(this);
//worker_task.Start();
}
Expand Down Expand Up @@ -104,9 +104,9 @@ private void ProcessQueue(object o) {
}

public void Dispose() {
/*if (worker_task.IsCanceled == false) {
if (worker_thread.IsAlive) {
Stop();
}*/
}
}
}
}

0 comments on commit 6db6a92

Please sign in to comment.