diff --git a/src/DtronixMessageQueue/DtronixMessageQueue.csproj b/src/DtronixMessageQueue/DtronixMessageQueue.csproj index db52c77..bd64020 100644 --- a/src/DtronixMessageQueue/DtronixMessageQueue.csproj +++ b/src/DtronixMessageQueue/DtronixMessageQueue.csproj @@ -70,6 +70,7 @@ + diff --git a/src/DtronixMessageQueue/Rpc/RpcCommandType.cs b/src/DtronixMessageQueue/Rpc/RpcCommandType.cs index 592c937..5e17386 100644 --- a/src/DtronixMessageQueue/Rpc/RpcCommandType.cs +++ b/src/DtronixMessageQueue/Rpc/RpcCommandType.cs @@ -24,5 +24,17 @@ public enum RpcCommandType : byte { /// Server is sending the result of the authentication request. /// AuthenticationResult = 2, + + + + /// + /// Sends a request to the client/server session for a stream handle to be created to write to. + /// + RequestStreamHandle = 10, + + /// + /// Sends a request to the client/server session for a stream handle be closed. + /// + CloseStreamHandle = 11, } } diff --git a/src/DtronixMessageQueue/Rpc/RpcSession.cs b/src/DtronixMessageQueue/Rpc/RpcSession.cs index a358f42..a6358e6 100644 --- a/src/DtronixMessageQueue/Rpc/RpcSession.cs +++ b/src/DtronixMessageQueue/Rpc/RpcSession.cs @@ -71,6 +71,12 @@ public abstract class RpcSession : MqSession private readonly Dictionary remote_service_realproxy = new Dictionary(); + /// + /// Contains all active stream handles for this session. + /// + private readonly ConcurrentDictionary stream_handles = + new ConcurrentDictionary(); + /// /// Server base socket for this session. /// Null if the BaseSocket is not running in server mode. @@ -193,7 +199,7 @@ protected override void ProcessCommand(MqFrame frame) { // Alert the server that this session is ready for usage. worker_thread_pool.QueueWorkItem(() => { - Ready?.Invoke(this, new SessionEventArgs((TSession)this)); + Ready?.Invoke(this, new SessionEventArgs((TSession) this)); }); SerializationCache.Put(serializer); @@ -212,7 +218,7 @@ protected override void ProcessCommand(MqFrame frame) { Close(SocketCloseReason.ProtocolError); return; } - + byte[] auth_bytes = new byte[frame.DataLength - 2]; frame.Read(2, auth_bytes, 0, auth_bytes.Length); @@ -240,7 +246,7 @@ protected override void ProcessCommand(MqFrame frame) { } else { // Alert the server that this session is ready for usage. worker_thread_pool.QueueWorkItem(() => { - Ready?.Invoke(this, new SessionEventArgs((TSession)this)); + Ready?.Invoke(this, new SessionEventArgs((TSession) this)); }); } @@ -269,9 +275,16 @@ protected override void ProcessCommand(MqFrame frame) { // Alert the client that this session is ready for usage. worker_thread_pool.QueueWorkItem(() => { - Ready?.Invoke(this, new SessionEventArgs((TSession)this)); + Ready?.Invoke(this, new SessionEventArgs((TSession) this)); }); + } else if (rpc_command_type == RpcCommandType.RequestStreamHandle) { + // RpcCommand:byte; RpcCommandType:byte; AuthResult:bool; + Send(new MqFrame()); + var sream_handle_id = frame.ReadUInt16(2); + + } else if (rpc_command_type == RpcCommandType.RequestStreamHandle) { + } else { Close(SocketCloseReason.ProtocolError); } diff --git a/src/DtronixMessageQueue/Rpc/RpcStream.cs b/src/DtronixMessageQueue/Rpc/RpcStream.cs new file mode 100644 index 0000000..718d3e8 --- /dev/null +++ b/src/DtronixMessageQueue/Rpc/RpcStream.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DtronixMessageQueue.Rpc { + public class RpcStream : Stream + where TSession : RpcSession, new() + where TConfig : RpcConfig { + + private readonly RpcSession session; + + private readonly ushort? read_id; + + + private MqMessageReader reader; + private MqMessageWriter writer; + + public override bool CanRead { get; } + public override bool CanSeek { get; } + public override bool CanWrite { get; } + public override long Length { get; } + public override long Position { get; set; } + + /// + /// Create a RpcStream in writer mode. + /// + /// Session to write to. + public RpcStream(RpcSession session) { + this.session = session; + + session. + } + + + /// + /// Create a RpcStream in reader mode. + /// + /// Session to read from. + /// Id used to read from the session with. + public RpcStream(RpcSession session, ushort read_id) { + this.session = session; + this.read_id = read_id; + } + + /// + /// All data is immediately flushed automatically. Calling flush does nothing. + /// + public override void Flush() { + } + + public override long Seek(long offset, SeekOrigin origin) { + throw new NotImplementedException(); + } + + public override void SetLength(long value) { + throw new NotImplementedException(); + } + + public override int Read(byte[] buffer, int offset, int count) { + throw new NotImplementedException(); + } + + public override void Write(byte[] buffer, int offset, int count) { + throw new NotImplementedException(); + } + + + + + } +}