diff --git a/src/DtronixMessageQueue/DtronixMessageQueue.csproj b/src/DtronixMessageQueue/DtronixMessageQueue.csproj index c193e14..373709a 100644 --- a/src/DtronixMessageQueue/DtronixMessageQueue.csproj +++ b/src/DtronixMessageQueue/DtronixMessageQueue.csproj @@ -58,6 +58,7 @@ + diff --git a/src/DtronixMessageQueue/MqFrame.cs b/src/DtronixMessageQueue/MqFrame.cs index f9b6580..318375f 100644 --- a/src/DtronixMessageQueue/MqFrame.cs +++ b/src/DtronixMessageQueue/MqFrame.cs @@ -31,6 +31,10 @@ public MqFrameType FrameType { /// public const int HeaderLength = 3; + /// + /// Contains the configuration information about the current client/server. + /// Used to determine how large the frames are to be. + /// private MqConfig config; /// @@ -577,6 +581,14 @@ public override string ToString() { return $"MqFrame totaling {buffer.Length:N0} bytes; Type: {FrameType}"; } + /// + /// Puts this frame inside a message. + /// + /// Message with this frame as the first frame. + public MqMessage ToMessage() { + return new MqMessage(this); + } + /// /// Disposes of this object and its resources. /// diff --git a/src/DtronixMessageQueue/Rpc/MessageHandler.cs b/src/DtronixMessageQueue/Rpc/MessageHandler.cs index d007557..80399b1 100644 --- a/src/DtronixMessageQueue/Rpc/MessageHandler.cs +++ b/src/DtronixMessageQueue/Rpc/MessageHandler.cs @@ -18,7 +18,7 @@ public abstract class MessageHandler /// public abstract byte Id { get; } - protected TSession Session; + public TSession Session; protected Dictionary Handlers = new Dictionary(); @@ -45,6 +45,10 @@ public bool HandleMessage(MqMessage message) { return false; } + public void SendHandlerMessage(byte action_id) { + SendHandlerMessage(action_id, null); + } + public void SendHandlerMessage(byte action_id, MqMessage message) { var header_frame = Session.CreateFrame(new byte[2], MqFrameType.More); header_frame.Write(0, Id); diff --git a/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransport.cs b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransport.cs index 7f7add3..f38834c 100644 --- a/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransport.cs +++ b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransport.cs @@ -7,47 +7,59 @@ using System.Threading.Tasks; namespace DtronixMessageQueue.Rpc.MessageHandlers { - public class ByteTransport + public class ByteTransport : IByteTransport where TSession : RpcSession, new() where TConfig : RpcConfig { - public long Length { get; set; } - private readonly TSession session; + public enum Mode { + Send, + Receive + } + + private readonly ByteTransportMessageHandler handler; private readonly ushort id; + private readonly Mode mode; + private readonly MqMessageReader message_reader; private readonly MqMessageWriter message_writer; - private ConcurrentQueue read_buffer; + public MqMessage ReadBuffer { get; set; } - private readonly SemaphoreSlim read_semaphore; + private readonly SemaphoreSlim semaphore; private readonly object read_lock = new object(); public event EventHandler Receive; - public ByteTransport(TSession session, ushort id, long length) { - Length = length; - this.session = session; + public ByteTransport(ByteTransportMessageHandler handler, ushort id, Mode mode) { + this.handler = handler; this.id = id; - message_reader = new MqMessageReader(); - read_semaphore = new SemaphoreSlim(0, 1); - } + this.mode = mode; - public ByteTransport(TSession session) { - this.session = session; - message_writer = new MqMessageWriter(session.Config); + if (mode == Mode.Receive) { + message_reader = new MqMessageReader(); + } else { + message_writer = new MqMessageWriter(handler.Session.Config); + } + + semaphore = new SemaphoreSlim(0, 1); } - public void OnReceive(MqMessage message) { - read_buffer.Enqueue(message); - lock (read_lock) { - if (read_semaphore.CurrentCount == 0) { - read_semaphore.Release(); - } + bool IByteTransport.OnReady() { + if (semaphore.CurrentCount == 0) { + semaphore.Release(); + return true; } - + + return false; + } + + + + void IByteTransport.OnReceive(MqMessage message) { + ReadBuffer = message; } /// @@ -57,24 +69,18 @@ public void OnReceive(MqMessage message) { /// Offset in the buffer to start copying to. /// Number of bytes to try to read into the buffer. public async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellation_token) { - if (read_semaphore == null) { + if (semaphore == null) { throw new InvalidOperationException("Byte transport is set to write mode. Can not read when in write more."); } - if (read_buffer.IsEmpty) { - // ReSharper disable once InconsistentlySynchronizedField - await read_semaphore.WaitAsync(cancellation_token); + if (ReadBuffer == null) { + await semaphore.WaitAsync(cancellation_token); } if (message_reader.Message == null) { - MqMessage message; - if (read_buffer.TryDequeue(out message) == false) { - return 0; - } - - message_reader.Message = message; - message_reader.Skip(3); + message_reader.Message = ReadBuffer; + message_reader.Skip(2); } var total_read = message_reader.Read(buffer, offset, count); @@ -93,14 +99,21 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat /// Buffer to write to the message. /// Offset in the buffer to write from /// Number of bytes to write to the message from the buffer. - public void Write(byte[] buffer, int index, int count) { - message_writer.Write((byte) 2); - message_writer.Write((byte) ByteTransportMessageAction.Write); + public async void WriteAsync(byte[] buffer, int index, int count, CancellationToken cancellation_token) { + // Wait for the server response that we are ready to send the next packet. + await semaphore.WaitAsync(cancellation_token); + message_writer.Write(id); message_writer.Write(buffer, index, count); + + handler.SendHandlerMessage((byte)ByteTransportMessageAction.Write, message_writer.ToMessage(true)); - session.Send(message_writer.ToMessage(true)); } } + + public interface IByteTransport { + bool OnReady(); + void OnReceive(MqMessage message); + } } diff --git a/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageAction.cs b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageAction.cs index be71278..4d31806 100644 --- a/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageAction.cs +++ b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageAction.cs @@ -14,22 +14,28 @@ public enum ByteTransportMessageAction : byte { /// /// Sends a request to the client/server session for a stream handle to be created to write to. /// - RequestTransportHandle = 1, + Request = 1, /// - /// Sends a request to the client/server session for a stream handle to be created to write to. + /// Sent when the transport handle is in an error state. + /// + Error = 2, + + /// + /// Lets the recipient session know it is ok to send the next packet. /// - ResponseTransportHandle = 2, + Ready = 3, /// /// Sends a request to the client/server session for a stream handle be closed. /// - CloseTransportHandle = 3, + Close = 4, /// - /// Sends data to the client/server. + /// This message contains the byte buffer. /// - Write = 4, + Write = 5 + } diff --git a/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageHandler.cs b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageHandler.cs index 56cb59a..58d84b7 100644 --- a/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageHandler.cs +++ b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageHandler.cs @@ -12,47 +12,77 @@ public class ByteTransportMessageHandler : MessageHandler public sealed override byte Id => 2; - private readonly ResponseWait send_operation = new ResponseWait(); + private readonly ResponseWait> send_operation = new ResponseWait>(); - private readonly ResponseWait receive_operation = new ResponseWait(); - - /// - /// Contains all local transports. - /// - public readonly ConcurrentDictionary> RemoteTransports = - new ConcurrentDictionary>(); + private readonly ResponseWait> receive_operation = new ResponseWait>(); public ByteTransportMessageHandler(TSession session) : base(session) { - Handlers.Add((byte)ByteTransportMessageAction.RequestTransportHandle, RequestTransportHandle); + Handlers.Add((byte)ByteTransportMessageAction.Request, Request); + Handlers.Add((byte)ByteTransportMessageAction.Error, Error); + Handlers.Add((byte)ByteTransportMessageAction.Ready, Ready); + Handlers.Add((byte)ByteTransportMessageAction.Write, Write); } - private void RequestTransportHandle(byte action_id, MqMessage message) { + + + + + private void Error(byte action_handler, MqMessage message) { + throw new NotImplementedException(); + } + + public void ReadyReceive(ushort transport_id) { + var ready_frame = Session.CreateFrame(new byte[2], MqFrameType.Last); + ready_frame.Write(0, transport_id); + + SendHandlerMessage((byte)ByteTransportMessageAction.Ready, ready_frame.ToMessage()); + } + + public ByteTransport CreateTransport() { + var handle = send_operation.CreateWaitHandle(null); + handle.ByteTransport = new ByteTransport(this, handle.Id, ByteTransport.Mode.Send); + + var request_frame = Session.CreateFrame(new byte[2], MqFrameType.Last); + request_frame.Write(0, handle.Id); + + SendHandlerMessage((byte)ByteTransportMessageAction.Request, request_frame.ToMessage()); + + return handle.ByteTransport; + } + + private void Request(byte action_id, MqMessage message) { ushort handle_id = message[0].ReadUInt16(0); - long size = message[0].ReadInt64(2); - - - - if (size > Session.Config.MaxByteTransportLength) { - var error_frame = Session.CreateFrame(new byte[2], MqFrameType.Last); - error_frame.Write(0, Id); - error_frame.Write(1, (byte)ByteTransportMessageAction.ResponseTransportHandle); - } - ByteTransport transport = new ByteTransport(Session, handle_id, size); + var transport = new ByteTransport(this, handle_id, ByteTransport.Mode.Receive); + + var handle = receive_operation.CreateWaitHandle(handle_id); + handle.ByteTransport = transport; + + SendHandlerMessage((byte)ByteTransportMessageAction.Ready, message); + } + + private void Ready(byte action_handler, MqMessage message) { + ushort handle_id = message[0].ReadUInt16(0); + var transport = (IByteTransport) send_operation[handle_id].ByteTransport; - receive_operation.CreateWaitHandle(handle_id); + if (transport.OnReady() == false) { + Session.Close(SocketCloseReason.ApplicationError); + } + } - //RemoteTransports.TryAdd(handle_id, remote_transport); + private void Write(byte action_handler, MqMessage message) { + ushort handle_id = message[0].ReadUInt16(0); - // Create response. - var response_frame = Session.CreateFrame(new byte[4], MqFrameType.Last); - response_frame.Write(0, Id); - response_frame.Write(1, (byte)ByteTransportMessageAction.ResponseTransportHandle); - response_frame.Write(2, handle_id); + var transport = (IByteTransport)send_operation[handle_id].ByteTransport; + + transport.OnReceive(message); + + if (transport.OnReady() == false) { + Session.Close(SocketCloseReason.ApplicationError); + } - Session.Send(response_frame); } diff --git a/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportWaitHandle.cs b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportWaitHandle.cs new file mode 100644 index 0000000..028a161 --- /dev/null +++ b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportWaitHandle.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DtronixMessageQueue.Rpc.MessageHandlers { + public class ByteTransportWaitHandle : ResponseWaitHandle + where TSession : RpcSession, new() + where TConfig : RpcConfig { + + public ByteTransport ByteTransport { get; set; } + } +}