Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Migrated session setup to use explicit interface implementation.
Browse files Browse the repository at this point in the history
Made Frames private in MqMessage.  Frames are accessed by using the MqMessage class directly.
Fixed performance tests not setting up.
  • Loading branch information
DJGosnell committed Sep 15, 2016
1 parent 72d4067 commit 0108201
Show file tree
Hide file tree
Showing 15 changed files with 92 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public MqPerformanceTest(string[] args) {


} else if (mode == "client") {
WriteSysInfo();
Console.WriteLine("| Messages | Msg Bytes | Milliseconds | Msg/sec | MBps |");
Console.WriteLine("|------------|-----------|--------------|------------|----------|");

Expand All @@ -57,10 +56,7 @@ public MqPerformanceTest(string[] args) {
StartServer(total_messages, total_clients);

} else if (mode == "single-process") {
using (var cc = new ConsoleCopy("MessageQueuePerformanceTest.txt")) {
WriteSysInfo();
MqInProcessTest();
}
MqInProcessTest();

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static void WriteSysInfo() {

long mem_kb;
GetPhysicallyInstalledSystemMemory(out mem_kb);
Console.WriteLine(" with " + (mem_kb / 1024 / 1024) + " GB of RAM installed.\r\n");
Console.WriteLine(" with " + (mem_kb / 1024 / 1024) + " GB of RAM installed.");
}


Expand Down
28 changes: 18 additions & 10 deletions src/DtronixMessageQueue.Tests.Performance/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Management;
using System.Reflection;
using System.Runtime.InteropServices;
Expand All @@ -15,16 +16,23 @@ class Program {

static void Main(string[] args) {
var mode = args.Length == 0 ? null : args[0];
switch (mode) {
case "mq":
Console.WriteLine("Running MQ performance tests.\r\n");
new MqPerformanceTest(args);
break;

default:
Console.WriteLine("Running RPC performance tests.\r\n");
new RpcPerformanceTest(args);
break;
var file_name = string.Join("-", args);
using (var cc = new ConsoleCopy($"MessageQueuePerformanceTest-{file_name}.txt")) {
PerformanceTestBase.WriteSysInfo();

Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}");

switch (mode) {
case "mq":
Console.WriteLine("Running MQ performance tests.\r\n");
new MqPerformanceTest(args);
break;

default:
Console.WriteLine("Running RPC performance tests.\r\n");
new RpcPerformanceTest(args);
break;
}
}

Console.ReadLine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public RpcPerformanceTest(string[] args) {
Port = 2828
};

RpcSingleProcessTest(100000, 4, config, RpcTestType.NoRetrun);
RpcSingleProcessTest(200000, 4, config, RpcTestType.NoRetrun);

RpcSingleProcessTest(10000, 4, config, RpcTestType.Return);

Expand All @@ -35,7 +35,7 @@ private void RpcSingleProcessTest(int runs, int loops, RpcConfig config, RpcTest
var complete_test = new AutoResetEvent(false);
var client = new RpcClient<SimpleRpcSession, RpcConfig>(config);

server.Connected += (sender, args) => {
server.SessionSetup += (sender, args) => {
test_service = new TestService();
args.Session.AddService(test_service);

Expand Down
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue.Tests/Mq/MqTestsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class MqTestsBase : IDisposable {

public Exception LastException { get; set; }

public TimeSpan TestTimeout { get; set; } = new TimeSpan(0, 0, 0, 0, 60000);
public TimeSpan TestTimeout { get; set; } = new TimeSpan(0, 0, 0, 0, 2000);

public ManualResetEventSlim TestStatus { get; set; } = new ManualResetEventSlim(false);

Expand Down
1 change: 1 addition & 0 deletions src/DtronixMessageQueue/DtronixMessageQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
<Compile Include="Rpc\RpcSession.cs" />
<Compile Include="Rpc\SerializationStore.cs" />
<Compile Include="Socket\BufferManager.cs" />
<Compile Include="Socket\ISetupSocketSession.cs" />
<Compile Include="Socket\SessionEventArgs.cs" />
<Compile Include="Socket\SessionClosedEventArgs.cs" />
<Compile Include="Socket\SocketAsyncEventArgsPool.cs" />
Expand Down
38 changes: 19 additions & 19 deletions src/DtronixMessageQueue/MqMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ public class MqMessage : IList<MqFrame> {
/// <summary>
/// Internal frames for this message.
/// </summary>
internal readonly List<MqFrame> Frames = new List<MqFrame>();
private readonly List<MqFrame> frames = new List<MqFrame>();


/// <summary>
/// Total number of frames in this message.
/// </summary>
public int Count => Frames.Count;
public int Count => frames.Count;

/// <summary>
/// Whether or not this is a read only message. Always returns false.
Expand All @@ -32,15 +32,15 @@ public class MqMessage : IList<MqFrame> {
/// <param name="index">The zero-based index of the frame to get or set.</param>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="index" /> is not a valid index in the message.</exception>
public MqFrame this[int index] {
get { return Frames[index]; }
set { Frames[index] = value; }
get { return frames[index]; }
set { frames[index] = value; }
}


/// <summary>
/// The total size of the raw frames (headers + body) contained in this message.
/// </summary>
public int Size => Frames.Sum(frame => frame.FrameSize);
public int Size => frames.Sum(frame => frame.FrameSize);

public MqMessage() {
}
Expand All @@ -53,7 +53,7 @@ public MqMessage(MqFrame frame) {
/// Fixes any mistakes for the frames' FrameType set. Called before frames are processed by the outbox.
/// </summary>
public void PrepareSend() {
var mq_frames = Frames.ToArray();
var mq_frames = frames.ToArray();

// Set frame's FrameType appropriately.
foreach (var frame in mq_frames) {
Expand All @@ -73,23 +73,23 @@ public void PrepareSend() {
/// </summary>
/// <returns>An enumerator for the frames</returns>
public IEnumerator<MqFrame> GetEnumerator() {
return new List<MqFrame>(Frames).GetEnumerator();
return new List<MqFrame>(frames).GetEnumerator();
}

/// <summary>
/// Returns an enumerator that iterates through the frames.
/// </summary>
/// <returns>An enumerator for the frames</returns>
IEnumerator IEnumerable.GetEnumerator() {
return new List<MqFrame>(Frames).GetEnumerator();
return new List<MqFrame>(frames).GetEnumerator();
}

/// <summary>
/// Adds a frame to this message.
/// </summary>
/// <param name="frame">Frame to add</param>
public void Add(MqFrame frame) {
Frames.Add(frame);
frames.Add(frame);
}

/// <summary>
Expand All @@ -98,15 +98,15 @@ public void Add(MqFrame frame) {
/// <param name="frames">Frames to add</param>
public void AddRange(IEnumerable<MqFrame> frames) {
foreach (var frame in frames) {
Frames.Add(frame);
this.frames.Add(frame);
}
}

/// <summary>
/// Removes all frames from the message.
/// </summary>
public void Clear() {
Frames.Clear();
frames.Clear();
}

/// <summary>
Expand All @@ -115,7 +115,7 @@ public void Clear() {
/// <param name="frame">The frame to locate in the message.</param>
/// <returns>True if the frame is in the message; otherwise false.</returns>
public bool Contains(MqFrame frame) {
return Frames.Contains(frame);
return frames.Contains(frame);
}

/// <summary>
Expand All @@ -127,7 +127,7 @@ public bool Contains(MqFrame frame) {
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="array_index" /> is less than 0.</exception>
/// <exception cref="T:System.ArgumentException">The number of elements in the source message is greater than the available space from <paramref name="array_index" /> to the end of the destination <paramref name="array" />.</exception>
public void CopyTo(MqFrame[] array, int array_index) {
Frames.CopyTo(array, array_index);
frames.CopyTo(array, array_index);
}

/// <summary>
Expand All @@ -136,7 +136,7 @@ public void CopyTo(MqFrame[] array, int array_index) {
/// <param name="frame">Frame to remove</param>
/// <returns></returns>
public bool Remove(MqFrame frame) {
return Frames.Remove(frame);
return frames.Remove(frame);
}

/// <summary>
Expand All @@ -145,7 +145,7 @@ public bool Remove(MqFrame frame) {
/// <returns>The zero-based index of the first occurrence of frame within the message, if found; otherwise, –1.</returns>
/// <param name="frame">The object to locate in the message.</param>
public int IndexOf(MqFrame frame) {
return Frames.IndexOf(frame);
return frames.IndexOf(frame);
}


Expand All @@ -156,7 +156,7 @@ public int IndexOf(MqFrame frame) {
/// <param name="frame">The object to insert. The value can be null for reference types.</param>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="index" /> is less than 0.-or-<paramref name="index" /> is greater than the total frames.</exception>
public void Insert(int index, MqFrame frame) {
Frames.Insert(index, frame);
frames.Insert(index, frame);
}

/// <summary>
Expand All @@ -166,16 +166,16 @@ public void Insert(int index, MqFrame frame) {
/// <exception cref="T:System.ArgumentOutOfRangeException">
/// <paramref name="index" /> is less than 0.-or-<paramref name="index" /> is equal to or greater than the total frames.</exception>
public void RemoveAt(int index) {
Frames.RemoveAt(index);
frames.RemoveAt(index);
}

/// <summary>
/// Displays total frames and payload size.
/// </summary>
/// <returns>string representation of this message.</returns>
public override string ToString() {
var size = Frames.Sum(frame => frame.DataLength);
return $"MqMessage with {Frames.Count} frames totaling {size:N0} bytes.";
var size = frames.Sum(frame => frame.DataLength);
return $"MqMessage with {frames.Count} frames totaling {size:N0} bytes.";
}

}
Expand Down
4 changes: 2 additions & 2 deletions src/DtronixMessageQueue/MqMessageReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public MqMessage Message {
/// <summary>
/// Total length of the bytes in this message.
/// </summary>
public int Length => message.Frames.Sum(frm => frm.DataLength);
public int Length => message.Sum(frm => frm.DataLength);

/// <summary>
/// Unused. Stream.Null
Expand All @@ -101,7 +101,7 @@ public MqMessage Message {
/// </summary>
public bool IsAtEnd {
get {
var last_frame = message.Frames[message.Frames.Count - 1];
var last_frame = message[message.Count - 1];
return current_frame == last_frame && last_frame.DataLength == frame_position;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue/MqMessageWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public override void Write(decimal value) {
/// <param name="value">Value to write to the message.</param>
public void Write(MqMessage value) {
InternalFinalizeFrame();
frames.AddRange(value.Frames);
frames.AddRange(value);
}

/// <summary>
Expand Down
8 changes: 4 additions & 4 deletions src/DtronixMessageQueue/MqSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ private void SendBufferQueue(Queue<byte[]> buffer_queue, int length) {
/// </summary>
/// <returns>True if messages were sent. False if nothing was sent.</returns>
internal bool ProcessOutbox() {
MqMessage result;
MqMessage message;
var length = 0;
var buffer_queue = new Queue<byte[]>();

while (outbox.TryDequeue(out result)) {
result.PrepareSend();
foreach (var frame in result.Frames) {
while (outbox.TryDequeue(out message)) {
message.PrepareSend();
foreach (var frame in message) {
var frame_size = frame.FrameSize;

// If this would overflow the max client buffer size, send the full buffer queue.
Expand Down
15 changes: 15 additions & 0 deletions src/DtronixMessageQueue/Socket/ISetupSocketSession.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace DtronixMessageQueue.Socket {
public interface ISetupSocketSession<TConfig>
where TConfig : SocketConfig {

/// <summary>
/// Sets up this socket with the specified configurations.
/// </summary>
/// <param name="session_socket">Socket this session is to use.</param>
/// <param name="socket_args_pool">Argument pool for this session to use. Pulls two asyncevents for reading and writing and returns them at the end of this socket's life.</param>
/// <param name="session_config">Socket configurations this session is to use.</param>
void Setup(System.Net.Sockets.Socket session_socket, SocketAsyncEventArgsPool socket_args_pool, TConfig session_config);

void Start();
}
}
4 changes: 3 additions & 1 deletion src/DtronixMessageQueue/Socket/SocketBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ protected void Setup() {
/// <returns>New session instance.</returns>
protected virtual TSession CreateSession(System.Net.Sockets.Socket socket) {
var session = new TSession();
SocketSession<TConfig>.Setup(session, socket, AsyncPool, Config);

((ISetupSocketSession<TConfig>) session).Setup(socket, AsyncPool, Config);

SessionSetup?.Invoke(this, new SessionEventArgs<TSession, TConfig>(session));
session.Closed += (sender, args) => OnClose(session, args.CloseReason);
return session;
Expand Down
5 changes: 3 additions & 2 deletions src/DtronixMessageQueue/Socket/SocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ public void Connect(IPEndPoint end_point) {
event_arg.Completed += (sender, args) => {
if (args.LastOperation == SocketAsyncOperation.Connect) {
Session = CreateSession(MainSocket);

Session.Start();

((ISetupSocketSession<TConfig>)Session).Start();

OnConnect(Session);
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue/Socket/SocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void AcceptCompleted(SocketAsyncEventArgs e) {
ConnectedSessions.TryAdd(session.Id, session);

// Start the session.
session.Start();
((ISetupSocketSession<TConfig>)session).Start();

// Invoke the events.
OnConnect(session);
Expand Down
Loading

0 comments on commit 0108201

Please sign in to comment.