diff --git a/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransport.cs b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransport.cs index f38834c..b42704f 100644 --- a/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransport.cs +++ b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransport.cs @@ -30,7 +30,7 @@ public enum Mode { private readonly object read_lock = new object(); - public event EventHandler Receive; + private MqMessage ready_message; public ByteTransport(ByteTransportMessageHandler handler, ushort id, Mode mode) { this.handler = handler; @@ -39,6 +39,10 @@ public ByteTransport(ByteTransportMessageHandler handler, ush if (mode == Mode.Receive) { message_reader = new MqMessageReader(); + var ready_frame = handler.Session.CreateFrame(new byte[2], MqFrameType.Last); + ready_frame.Write(0, id); + + ready_message = ready_frame.ToMessage(); } else { message_writer = new MqMessageWriter(handler.Session.Config); } @@ -87,8 +91,13 @@ public async Task ReadAsync(byte[] buffer, int offset, int count, Cancellat if (message_reader.IsAtEnd) { message_reader.Message = null; + + handler.SendHandlerMessage((byte)ByteTransportMessageAction.Ready, ready_message); + } + + return total_read; } diff --git a/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageHandler.cs b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageHandler.cs index 58d84b7..86159b9 100644 --- a/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageHandler.cs +++ b/src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransportMessageHandler.cs @@ -22,12 +22,11 @@ public ByteTransportMessageHandler(TSession session) : base(session) { Handlers.Add((byte)ByteTransportMessageAction.Error, Error); Handlers.Add((byte)ByteTransportMessageAction.Ready, Ready); Handlers.Add((byte)ByteTransportMessageAction.Write, Write); + Handlers.Add((byte)ByteTransportMessageAction.Close, Close); } - - private void Error(byte action_handler, MqMessage message) { throw new NotImplementedException(); } @@ -86,6 +85,11 @@ private void Write(byte action_handler, MqMessage message) { } + private void Close(byte action_handler, MqMessage message) { + throw new NotImplementedException(); + } + + } }