diff --git a/DtronixMessageQueue.Tests.Performance/Program.cs b/DtronixMessageQueue.Tests.Performance/Program.cs
index 5664ac4..8d402cf 100644
--- a/DtronixMessageQueue.Tests.Performance/Program.cs
+++ b/DtronixMessageQueue.Tests.Performance/Program.cs
@@ -25,8 +25,8 @@ static void Main(string[] args){
if (args == null || args.Length == 0) {
mode = "single-process";
- total_loops = 5;
- total_messages = 10000;
+ total_loops = 1;
+ total_messages = 1000000;
total_frames = 4;
frame_size = 50;
total_clients = 1;
@@ -114,7 +114,9 @@ private static void StartClient(int total_loops, int total_messages, int total_f
cl.IncomingMessage += (sender, args) => {
MqMessage msg;
- while(args.Mailbox.Inbox.TryDequeue(out msg)) {
+ while (args.Messages.Count > 0) {
+ msg = args.Messages.Dequeue();
+
message_reader.Message = msg;
var result = message_reader.ReadString();
@@ -146,12 +148,12 @@ private static void StartClient(int total_loops, int total_messages, int total_f
cl.Close();
}else if (result == "START") {
- stopwatch.Restart();
- for (var i = 0; i < total_messages; i++) {
- cl.Send(message);
+ if (total_loops > 0) {
+ stopwatch.Restart();
+ for (var i = 0; i < total_messages; i++) {
+ cl.Send(message);
+ }
}
-
- Thread.Sleep(500);
}
}
};
@@ -197,12 +199,10 @@ private static void StartServer(int total_messages, int total_clients) {
};
server.IncomingMessage += (sender, args) => {
- MqMessage message_out;
-
var client_info = client_infos[args.Session];
- while (args.Mailbox.Inbox.TryDequeue(out message_out)) {
- client_info.Runs++;
- }
+
+ // Count the total messages.
+ client_info.Runs += args.Messages.Count;
if (client_info.Runs == total_messages) {
args.Session.Send(complete_message);
diff --git a/DtronixMessageQueue.Tests/MqClientTests.cs b/DtronixMessageQueue.Tests/MqClientTests.cs
index 82c24cb..d8586fa 100644
--- a/DtronixMessageQueue.Tests/MqClientTests.cs
+++ b/DtronixMessageQueue.Tests/MqClientTests.cs
@@ -19,7 +19,6 @@ public MqClientTests(ITestOutputHelper output) : base(output) {
[Theory]
[InlineData(1, false)]
[InlineData(1, true)]
- //[InlineData(50, true)]
public void Client_should_send_data_to_server(int number, bool validate) {
var message_source = GenerateRandomMessage(4, 50);
int received_messages = 0;
@@ -32,8 +31,9 @@ public void Client_should_send_data_to_server(int number, bool validate) {
Server.IncomingMessage += (sender, args) => {
MqMessage message;
- while (args.Mailbox.Inbox.TryDequeue(out message)) {
- received_messages++;
+ while (args.Messages.Count > 0) {
+ message = args.Messages.Dequeue();
+ Interlocked.Increment(ref received_messages);
if (validate) {
CompareMessages(message_source, message);
}
@@ -62,7 +62,8 @@ public void Client_does_not_send_empty_message() {
Server.IncomingMessage += (sender, args) => {
MqMessage message;
- while (args.Mailbox.Inbox.TryDequeue(out message)) {
+ while (args.Messages.Count > 0) {
+ message = args.Messages.Dequeue();
if (message.Count != 2) {
LastException = new Exception("Server received an empty message.");
}
diff --git a/DtronixMessageQueue.Tests/MqServerTests.cs b/DtronixMessageQueue.Tests/MqServerTests.cs
index 457dba7..25f1817 100644
--- a/DtronixMessageQueue.Tests/MqServerTests.cs
+++ b/DtronixMessageQueue.Tests/MqServerTests.cs
@@ -20,7 +20,6 @@ public MqServerTests(ITestOutputHelper output) : base(output) {
[Theory]
[InlineData(1, false)]
[InlineData(1, true)]
- [InlineData(50, true)]
public void Server_should_send_data_to_client(int number, bool validate) {
var message_source = GenerateRandomMessage(4, 50);
@@ -31,15 +30,23 @@ public void Server_should_send_data_to_client(int number, bool validate) {
};
+ int client_message_count = 0;
Client.IncomingMessage += (sender, args) => {
MqMessage message;
- args.Mailbox.Inbox.TryDequeue(out message);
- if (validate) {
- CompareMessages(message_source, message);
+ client_message_count += args.Messages.Count;
+
+ while (args.Messages.Count > 0) {
+ message = args.Messages.Dequeue();
+
+ if (validate) {
+ CompareMessages(message_source, message);
+ }
}
- TestStatus.Set();
+ if (client_message_count == number) {
+ TestStatus.Set();
+ }
};
StartAndWait();
diff --git a/DtronixMessageQueue/DtronixMessageQueue.csproj b/DtronixMessageQueue/DtronixMessageQueue.csproj
index fb35d30..c54029c 100644
--- a/DtronixMessageQueue/DtronixMessageQueue.csproj
+++ b/DtronixMessageQueue/DtronixMessageQueue.csproj
@@ -47,7 +47,6 @@
-
@@ -68,7 +67,6 @@
-
diff --git a/DtronixMessageQueue/IncomingMessageEventArgs.cs b/DtronixMessageQueue/IncomingMessageEventArgs.cs
index b37d2ef..562c15c 100644
--- a/DtronixMessageQueue/IncomingMessageEventArgs.cs
+++ b/DtronixMessageQueue/IncomingMessageEventArgs.cs
@@ -12,23 +12,22 @@ namespace DtronixMessageQueue {
public class IncomingMessageEventArgs : EventArgs {
///
- /// Reference to the mailbox with the new message.
+ /// Messages ready to be read.
///
- public MqMailbox Mailbox { get; set; }
+ public Queue Messages { get; }
///
/// If this message is on the server, this will contain the reference to the connected session of the client.
///
- public MqSession Session { get; set; }
+ public MqSession Session { get; }
///
/// Creates an instance of the event args.
///
- /// Mailbox with the new message
+ /// Messages read and ready to be used.
/// Server session. Null if this is on the client.
- /// Client. Null if this is on the server.
- public IncomingMessageEventArgs(MqMailbox mailbox, MqSession session) {
- Mailbox = mailbox;
+ public IncomingMessageEventArgs(Queue messages, MqSession session) {
+ Messages = messages;
Session = session;
}
}
diff --git a/DtronixMessageQueue/MqClient.cs b/DtronixMessageQueue/MqClient.cs
index f603cd1..6e2525b 100644
--- a/DtronixMessageQueue/MqClient.cs
+++ b/DtronixMessageQueue/MqClient.cs
@@ -51,8 +51,8 @@ private void OnIncomingMessage(object sender, IncomingMessageEventArgs e) {
protected override MqSession CreateSession(System.Net.Sockets.Socket socket) {
var session = base.CreateSession(socket);
- session.Mailbox = new MqMailbox(postmaster, session);
- session.Mailbox.IncomingMessage += OnIncomingMessage;
+ session.Postmaster = postmaster;
+ session.IncomingMessage += OnIncomingMessage;
return session;
}
@@ -72,14 +72,14 @@ public void Send(MqMessage message) {
}
// Enqueue the outgoing message to be processed by the postmaster.
- Session.Mailbox.EnqueueOutgoingMessage(message);
+ Session.EnqueueOutgoingMessage(message);
}
public void Close() {
- Session.Mailbox.IncomingMessage -= OnIncomingMessage;
+ Session.IncomingMessage -= OnIncomingMessage;
Session.CloseConnection(SocketCloseReason.ClientClosing);
- Session.Mailbox.Dispose();
+ Session.Dispose();
}
///
diff --git a/DtronixMessageQueue/MqMailbox.cs b/DtronixMessageQueue/MqMailbox.cs
deleted file mode 100644
index 6607879..0000000
--- a/DtronixMessageQueue/MqMailbox.cs
+++ /dev/null
@@ -1,260 +0,0 @@
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using DtronixMessageQueue.Socket;
-
-namespace DtronixMessageQueue {
-
-
- ///
- /// Mailbox containing inbox, outbox and the logic to process both.
- ///
- public class MqMailbox : IDisposable {
-
- private bool is_running = true;
-
- ///
- /// The postmaster for this client/server.
- ///
- private readonly MqPostmaster postmaster;
-
-
- private readonly MqSession session;
-
- ///
- /// Session reference. If this mailbox is run as a client mailbox, this is then null.
- ///
- public MqSession Session => session;
-
- ///
- /// Internal framebuilder for this instance.
- ///
- private readonly MqFrameBuilder frame_builder;
-
- //private static readonly Logger logger = LogManager.GetCurrentClassLogger();
-
- ///
- /// Total bytes the inbox has remaining to process.
- ///
- private int inbox_byte_count;
-
- ///
- /// Reference to the current message being processed by the inbox.
- ///
- private MqMessage message;
-
- ///
- /// Outbox message queue. Internally used to store Messages before being sent to the wire by the postmaster.
- ///
- private readonly ConcurrentQueue outbox = new ConcurrentQueue();
-
- ///
- /// Inbox byte queue. Internally used to store the raw frame bytes before while waiting to be processed by the postmaster.
- ///
- private readonly ConcurrentQueue inbox_bytes = new ConcurrentQueue();
-
- ///
- /// Event fired when a new message has been processed by the postmaster and ready to be read.
- ///
- public event EventHandler IncomingMessage;
-
- ///
- /// Inbox to containing new messages received.
- ///
- public ConcurrentQueue Inbox { get; } = new ConcurrentQueue();
-
- ///
- /// Initializes a new instance of the MqMailbox class.
- ///
- /// Reference to the postmaster for this instance.
- /// Session from the server for this instance.
- public MqMailbox(MqPostmaster postmaster, MqSession session) {
- this.postmaster = postmaster;
- this.session = session;
- frame_builder = new MqFrameBuilder();
- }
-
- ///
- /// Adds bytes from the client/server reading methods to be processed by the postmaster.
- ///
- /// Buffer of bytes to read. Does not copy the bytes to the buffer.
- internal void EnqueueIncomingBuffer(byte[] buffer) {
- if (is_running == false) {
- return;
- }
-
- inbox_bytes.Enqueue(buffer);
-
- postmaster.SignalRead(this);
-
- Interlocked.Add(ref inbox_byte_count, buffer.Length);
- }
-
-
- ///
- /// Adds a message to the outbox to be processed by the postmaster.
- ///
- /// Message to send.
- internal void EnqueueOutgoingMessage(MqMessage out_message) {
- if (is_running == false) {
- return;
- }
-
- outbox.Enqueue(out_message);
-
- // Signal the workers that work is to be done.
- postmaster.SignalWrite(this);
- }
-
- ///
- /// Stops the mailbox from sending and receiving.
- ///
- /// Last frame to pass over the connection.
- public void Stop(MqFrame frame) {
- if (is_running == false) {
- return;
- }
-
- is_running = false;
-
- // If we are passed a closing frame, then send it to the other connection.
- if (frame != null) {
- MqMessage msg;
- if (outbox.IsEmpty == false) {
- while (outbox.TryDequeue(out msg)) {
- }
- }
-
- msg = new MqMessage(frame);
- outbox.Enqueue(msg);
-
- ProcessOutbox();
- }
- }
-
-
- ///
- /// Sends a queue of bytes to the connected client/server.
- ///
- /// Queue of bytes to send to the wire.
- /// Total length of the bytes in the queue to send.
- private void SendBufferQueue(Queue buffer_queue, int length) {
- var buffer = new byte[length];
- var offset = 0;
-
- while (buffer_queue.Count > 0) {
- var bytes = buffer_queue.Dequeue();
- Buffer.BlockCopy(bytes, 0, buffer, offset, bytes.Length);
-
- // Increment the offset.
- offset += bytes.Length;
- }
-
-
- // This will block
- session.Send(buffer, 0, buffer.Length);
- }
-
-
- ///
- /// Internally called method by the postmaster on a different thread to send all messages in the outbox.
- ///
- internal void ProcessOutbox() {
- MqMessage result;
- var length = 0;
- var buffer_queue = new Queue();
-
- while (outbox.TryDequeue(out result)) {
- result.PrepareSend();
- foreach (var frame in result.Frames) {
- var frame_size = frame.FrameSize;
- // If this would overflow the max client buffer size, send the full buffer queue.
- if (length + frame_size > MqFrame.MaxFrameSize + MqFrame.HeaderLength) {
- SendBufferQueue(buffer_queue, length);
-
- // Reset the length to 0;
- length = 0;
- }
- buffer_queue.Enqueue(frame.RawFrame());
-
- // Increment the total buffer length.
- length += frame_size;
- }
- }
-
- if (buffer_queue.Count > 0) {
- // Send the last of the buffer queue.
- SendBufferQueue(buffer_queue, length);
- }
- //postmaster.SignalWriteComplete(this);
- }
-
- ///
- /// Internal method called by the postmaster on a different thread to process all bytes in the inbox.
- ///
- internal void ProcessIncomingQueue() {
- if (message == null) {
- message = new MqMessage();
- }
-
- bool new_message = false;
- byte[] buffer;
- while (inbox_bytes.TryDequeue(out buffer)) {
- // Update the total bytes this
- Interlocked.Add(ref inbox_byte_count, -buffer.Length);
-
- try {
- frame_builder.Write(buffer, 0, buffer.Length);
- } catch (InvalidDataException) {
- //logger.Error(ex, "Connector {0}: Client send invalid data.", Connection.Id);
-
- session.CloseConnection(SocketCloseReason.ProtocolError);
- break;
- }
-
- var frame_count = frame_builder.Frames.Count;
- //logger.Debug("Connector {0}: Parsed {1} frames.", Connection.Id, frame_count);
-
- for (var i = 0; i < frame_count; i++) {
- var frame = frame_builder.Frames.Dequeue();
-
- // Determine if this frame is a command type. If it is, process it and don't add it to the message.
- if (frame.FrameType == MqFrameType.Command) {
- session.ProcessCommand(frame);
- continue;
- }
-
- message.Add(frame);
-
- if (frame.FrameType != MqFrameType.EmptyLast && frame.FrameType != MqFrameType.Last) {
- continue;
- }
- Inbox.Enqueue(message);
- message = new MqMessage();
- new_message = true;
- }
- }
- //postmaster.SignalReadComplete(this);
-
- if (new_message) {
- IncomingMessage?.Invoke(this, new IncomingMessageEventArgs(this, session));
- }
- }
-
- public override string ToString() {
- return $"Inbox: {Inbox.Count} ({inbox_byte_count} bytes); Outbox: {outbox.Count}";
- }
-
- ///
- /// Releases all resources held by this object.
- ///
- public void Dispose() {
- IncomingMessage = null;
- }
- }
-}
\ No newline at end of file
diff --git a/DtronixMessageQueue/MqMessage.cs b/DtronixMessageQueue/MqMessage.cs
index e8a254b..33ce33a 100644
--- a/DtronixMessageQueue/MqMessage.cs
+++ b/DtronixMessageQueue/MqMessage.cs
@@ -29,7 +29,6 @@ public class MqMessage : IList {
///
public bool IsReadOnly => false;
-
///
/// Gets or sets the frame at the specified index.
///
diff --git a/DtronixMessageQueue/MqPostmaster.cs b/DtronixMessageQueue/MqPostmaster.cs
index 0609a4e..b0cdba2 100644
--- a/DtronixMessageQueue/MqPostmaster.cs
+++ b/DtronixMessageQueue/MqPostmaster.cs
@@ -13,8 +13,8 @@ public class MqPostmaster : IDisposable {
//public int MaxFrameSize { get; }
private readonly MqWorker supervisor;
- private readonly ConcurrentDictionary ongoing_write_operations = new ConcurrentDictionary();
- private readonly BlockingCollection write_operations = new BlockingCollection();
+ private readonly ConcurrentDictionary ongoing_write_operations = new ConcurrentDictionary();
+ private readonly BlockingCollection write_operations = new BlockingCollection();
private readonly List write_workers = new List();
public int TotalWriters => write_workers.Count;
@@ -23,8 +23,8 @@ public class MqPostmaster : IDisposable {
public int MaxWriters { get; set; } = 0;
public int MaxReaders { get; set; } = 0;
- private readonly ConcurrentDictionary ongoing_read_operations = new ConcurrentDictionary();
- private readonly BlockingCollection read_operations = new BlockingCollection();
+ private readonly ConcurrentDictionary ongoing_read_operations = new ConcurrentDictionary();
+ private readonly BlockingCollection read_operations = new BlockingCollection();
private readonly List read_workers = new List();
public MqPostmaster() {
@@ -56,12 +56,12 @@ public MqPostmaster(MqServer server) {
}
- public bool SignalWrite(MqMailbox mailbox) {
- return ongoing_write_operations.TryAdd(mailbox, true) && write_operations.TryAdd(mailbox);
+ public bool SignalWrite(MqSession session) {
+ return ongoing_write_operations.TryAdd(session, true) && write_operations.TryAdd(session);
}
- public bool SignalRead(MqMailbox mailbox) {
- return ongoing_read_operations.TryAdd(mailbox, true) && read_operations.TryAdd(mailbox);
+ public bool SignalRead(MqSession session) {
+ return ongoing_read_operations.TryAdd(session, true) && read_operations.TryAdd(session);
}
@@ -93,25 +93,25 @@ private bool ProcessWorkers(List worker_list, int max_workers) {
///
public void CreateWriteWorker() {
var writer_worker = new MqWorker(worker => {
- MqMailbox mailbox = null;
- bool out_mailbox;
+ MqSession session = null;
+ bool out_session;
try {
worker.StartIdle();
- while (write_operations.TryTake(out mailbox, 60000, worker.Token)) {
+ while (write_operations.TryTake(out session, 60000, worker.Token)) {
worker.StartWork();
- mailbox.ProcessOutbox();
+ session.ProcessOutbox();
worker.StartIdle();
- ongoing_write_operations.TryRemove(mailbox, out out_mailbox);
+ ongoing_write_operations.TryRemove(session, out out_session);
}
} catch (ThreadAbortException) {
} catch (Exception) {
- if (mailbox != null) {
+ if (session != null) {
/*logger.Error(e,
is_writer
? "MqConnection {0}: Exception occurred while when writing."
- : "MqConnection {0}: Exception occurred while when reading.", mailbox.Connection.Id);*/
+ : "MqConnection {0}: Exception occurred while when reading.", session.Connection.Id);*/
}
}
}, "mq_write_worker_" + write_workers.Count);
@@ -126,24 +126,24 @@ public void CreateWriteWorker() {
///
public void CreateReadWorker() {
var reader_worker = new MqWorker(worker => {
- MqMailbox mailbox = null;
- bool out_mailbox;
+ MqSession session = null;
+ bool out_session;
try {
worker.StartIdle();
- while (read_operations.TryTake(out mailbox, 60000, worker.Token)) {
+ while (read_operations.TryTake(out session, 60000, worker.Token)) {
worker.StartWork();
- mailbox.ProcessIncomingQueue();
+ session.ProcessIncomingQueue();
worker.StartIdle();
- ongoing_read_operations.TryRemove(mailbox, out out_mailbox);
+ ongoing_read_operations.TryRemove(session, out out_session);
}
} catch (ThreadAbortException) {
} catch (Exception) {
- if (mailbox != null) {
+ if (session != null) {
/*logger.Error(e,
is_writer
? "MqConnection {0}: Exception occurred while when writing."
- : "MqConnection {0}: Exception occurred while when reading.", mailbox.Connection.Id);*/
+ : "MqConnection {0}: Exception occurred while when reading.", session.Connection.Id);*/
}
}
}, "mq_read_worker_" + read_workers.Count);
diff --git a/DtronixMessageQueue/MqServer.cs b/DtronixMessageQueue/MqServer.cs
index 54781cf..d32ec05 100644
--- a/DtronixMessageQueue/MqServer.cs
+++ b/DtronixMessageQueue/MqServer.cs
@@ -44,16 +44,16 @@ private void OnIncomingMessage(object sender, IncomingMessageEventArgs e) {
protected override MqSession CreateSession(System.Net.Sockets.Socket socket) {
var session = base.CreateSession(socket);
- session.Mailbox = new MqMailbox(postmaster, session);
+ session.Postmaster = postmaster;
- session.Mailbox.IncomingMessage += OnIncomingMessage;
+ session.IncomingMessage += OnIncomingMessage;
return session;
}
protected override void OnClose(MqSession session, SocketCloseReason reason) {
- session.Mailbox.IncomingMessage -= OnIncomingMessage;
- session.Mailbox.Dispose();
+ session.IncomingMessage -= OnIncomingMessage;
+ session.Dispose();
base.OnClose(session, reason);
}
diff --git a/DtronixMessageQueue/MqSession.cs b/DtronixMessageQueue/MqSession.cs
index 64fda00..4ce14fb 100644
--- a/DtronixMessageQueue/MqSession.cs
+++ b/DtronixMessageQueue/MqSession.cs
@@ -1,6 +1,8 @@
using System;
using System.CodeDom;
+using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
@@ -13,17 +15,193 @@ namespace DtronixMessageQueue {
public class MqSession : SocketSession {
- public MqMailbox Mailbox { get; set; }
+
+ private bool is_running = true;
+
+ ///
+ /// The Postmaster for this client/server.
+ ///
+ public MqPostmaster Postmaster { get; set; }
+
+ ///
+ /// Internal framebuilder for this instance.
+ ///
+ private readonly MqFrameBuilder frame_builder = new MqFrameBuilder();
+
+ ///
+ /// Total bytes the inbox has remaining to process.
+ ///
+ private int inbox_byte_count;
+
+ ///
+ /// Reference to the current message being processed by the inbox.
+ ///
+ private MqMessage message;
+
+ ///
+ /// Outbox message queue. Internally used to store Messages before being sent to the wire by the Postmaster.
+ ///
+ private readonly ConcurrentQueue outbox = new ConcurrentQueue();
+
+ ///
+ /// Inbox byte queue. Internally used to store the raw frame bytes before while waiting to be processed by the Postmaster.
+ ///
+ private readonly ConcurrentQueue inbox_bytes = new ConcurrentQueue();
+
+ ///
+ /// Event fired when a new message has been processed by the Postmaster and ready to be read.
+ ///
+ public event EventHandler IncomingMessage;
+
///
/// User supplied token used to pass a related object around with this session.
///
public object Token { get; set; }
+ ///
+ /// Adds bytes from the client/server reading methods to be processed by the Postmaster.
+ ///
+ /// Buffer of bytes to read. Does not copy the bytes to the buffer.
protected override void HandleIncomingBytes(byte[] buffer) {
- Mailbox.EnqueueIncomingBuffer(buffer);
+ if (is_running == false) {
+ return;
+ }
+
+ inbox_bytes.Enqueue(buffer);
+ Interlocked.Add(ref inbox_byte_count, buffer.Length);
+
+ Postmaster.SignalRead(this);
}
+ ///
+ /// Adds a message to the outbox to be processed by the Postmaster.
+ ///
+ /// Message to send.
+ public void EnqueueOutgoingMessage(MqMessage out_message) {
+ if (is_running == false) {
+ return;
+ }
+
+ outbox.Enqueue(out_message);
+
+ // Signal the workers that work is to be done.
+ Postmaster.SignalWrite(this);
+ }
+
+ ///
+ /// Sends a queue of bytes to the connected client/server.
+ ///
+ /// Queue of bytes to send to the wire.
+ /// Total length of the bytes in the queue to send.
+ private void SendBufferQueue(Queue buffer_queue, int length) {
+ var buffer = new byte[length];
+ var offset = 0;
+
+ while (buffer_queue.Count > 0) {
+ var bytes = buffer_queue.Dequeue();
+ Buffer.BlockCopy(bytes, 0, buffer, offset, bytes.Length);
+
+ // Increment the offset.
+ offset += bytes.Length;
+ }
+
+
+ // This will block
+ Send(buffer, 0, buffer.Length);
+ }
+
+
+ ///
+ /// Internally called method by the Postmaster on a different thread to send all messages in the outbox.
+ ///
+ internal void ProcessOutbox() {
+ MqMessage result;
+ var length = 0;
+ var buffer_queue = new Queue();
+
+ while (outbox.TryDequeue(out result)) {
+ result.PrepareSend();
+ foreach (var frame in result.Frames) {
+ var frame_size = frame.FrameSize;
+ // If this would overflow the max client buffer size, send the full buffer queue.
+ if (length + frame_size > MqFrame.MaxFrameSize + MqFrame.HeaderLength) {
+ SendBufferQueue(buffer_queue, length);
+
+ // Reset the length to 0;
+ length = 0;
+ }
+ buffer_queue.Enqueue(frame.RawFrame());
+
+ // Increment the total buffer length.
+ length += frame_size;
+ }
+ }
+
+ if (buffer_queue.Count > 0) {
+ // Send the last of the buffer queue.
+ SendBufferQueue(buffer_queue, length);
+ }
+ //Postmaster.SignalWriteComplete(this);
+ }
+
+ ///
+ /// Internal method called by the Postmaster on a different thread to process all bytes in the inbox.
+ ///
+ internal void ProcessIncomingQueue() {
+ if (message == null) {
+ message = new MqMessage();
+ }
+
+ Queue messages = null;
+ byte[] buffer;
+ while (inbox_bytes.TryDequeue(out buffer)) {
+ // Update the total bytes this
+ Interlocked.Add(ref inbox_byte_count, -buffer.Length);
+
+ try {
+ frame_builder.Write(buffer, 0, buffer.Length);
+ } catch (InvalidDataException) {
+ //logger.Error(ex, "Connector {0}: Client send invalid data.", Connection.Id);
+
+ CloseConnection(SocketCloseReason.ProtocolError);
+ break;
+ }
+
+ var frame_count = frame_builder.Frames.Count;
+ //logger.Debug("Connector {0}: Parsed {1} frames.", Connection.Id, frame_count);
+
+ for (var i = 0; i < frame_count; i++) {
+ var frame = frame_builder.Frames.Dequeue();
+
+ // Determine if this frame is a command type. If it is, process it and don't add it to the message.
+ if (frame.FrameType == MqFrameType.Command) {
+ ProcessCommand(frame);
+ continue;
+ }
+
+ message.Add(frame);
+
+ if (frame.FrameType != MqFrameType.EmptyLast && frame.FrameType != MqFrameType.Last) {
+ continue;
+ }
+
+ if (messages == null) {
+ messages = new Queue();
+ }
+
+ messages.Enqueue(message);
+ message = new MqMessage();
+ }
+ }
+ //Postmaster.SignalReadComplete(this);
+
+ if (messages != null) {
+ IncomingMessage?.Invoke(this, new IncomingMessageEventArgs(messages, this));
+ }
+ }
+
+
public override void CloseConnection(SocketCloseReason reason) {
if (CurrentState == State.Closed) {
return;
@@ -37,11 +215,24 @@ public override void CloseConnection(SocketCloseReason reason) {
close_frame.Write(1, (byte) reason);
}
- Mailbox.Stop(close_frame);
+ // If we are passed a closing frame, then send it to the other connection.
+ if (close_frame != null) {
+ MqMessage msg;
+ if (outbox.IsEmpty == false) {
+ while (outbox.TryDequeue(out msg)) {
+ }
+ }
+
+ msg = new MqMessage(close_frame);
+ outbox.Enqueue(msg);
+
+ ProcessOutbox();
+ }
base.CloseConnection(reason);
}
+
///
/// Sends a message to the session's client.
///
@@ -55,7 +246,7 @@ public void Send(MqMessage message) {
return;
}
- Mailbox.EnqueueOutgoingMessage(message);
+ EnqueueOutgoingMessage(message);
}
@@ -74,5 +265,9 @@ internal void ProcessCommand(MqFrame frame) {
}
}
+
+ public override string ToString() {
+ return $"MqSession; Reading {inbox_byte_count} bytes; Sending {outbox.Count} messages.";
+ }
}
}
\ No newline at end of file
diff --git a/DtronixMessageQueue/Socket/SocketSession.cs b/DtronixMessageQueue/Socket/SocketSession.cs
index 52082f7..4ba26ab 100644
--- a/DtronixMessageQueue/Socket/SocketSession.cs
+++ b/DtronixMessageQueue/Socket/SocketSession.cs
@@ -32,27 +32,25 @@ public enum State : byte {
///
public State CurrentState { get; protected set; }
+
+ protected System.Net.Sockets.Socket socket;
+
///
/// Raw socket for this session.
///
- public System.Net.Sockets.Socket Socket {
- get { return socket; }
- }
+ public System.Net.Sockets.Socket Socket => socket;
+
+ private DateTime last_received;
///
/// Last time the session received anything from the socket.
///
public DateTime LastReceived => last_received;
-
private SocketAsyncEventArgs send_args;
private SocketAsyncEventArgs receive_args;
- private DateTime last_received;
-
-
- private System.Net.Sockets.Socket socket;
///
/// This event fires when a connection has been established.
@@ -64,14 +62,12 @@ public System.Net.Sockets.Socket Socket {
///
public event EventHandler> Closed;
-
-
protected SocketSession() {
Id = Guid.NewGuid();
CurrentState = State.Connecting;
}
- internal static void Setup(SocketSession session, System.Net.Sockets.Socket socket, SocketAsyncEventArgsPool args_pool, SocketConfig configs) {
+ public static void Setup(SocketSession session, System.Net.Sockets.Socket socket, SocketAsyncEventArgsPool args_pool, SocketConfig configs) {
session.args_pool = args_pool;
session.send_args = args_pool.Pop();
session.send_args.Completed += session.IoCompleted;
@@ -108,6 +104,8 @@ protected void OnDisconnected(SocketCloseReason reason) {
Closed?.Invoke(this, new SessionClosedEventArgs(this, reason));
}
+ protected abstract void HandleIncomingBytes(byte[] buffer);
+
///
/// This method is called whenever a receive or send operation is completed on a socket
///
@@ -139,8 +137,6 @@ protected virtual void IoCompleted(object sender, SocketAsyncEventArgs e) {
}
-
-
internal void Send(byte[] buffer, int offset, int length) {
if (Socket == null || Socket.Connected == false) {
return;
@@ -222,8 +218,6 @@ protected void RecieveComplete(SocketAsyncEventArgs e) {
}
}
- protected abstract void HandleIncomingBytes(byte[] buffer);
-
public virtual void CloseConnection(SocketCloseReason reason) {
// If this session has already been closed, nothing more to do.
if (CurrentState == State.Closed) {