Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Started work on streaming RPC data.
Browse files Browse the repository at this point in the history
  • Loading branch information
DJGosnell committed Sep 23, 2016
1 parent 72bcabf commit a6ba5dd
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/DtronixMessageQueue/DtronixMessageQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
<Compile Include="Rpc\RpcOperationWait.cs" />
<Compile Include="Rpc\RpcServer.cs" />
<Compile Include="Rpc\RpcSession.cs" />
<Compile Include="Rpc\RpcStream.cs" />
<Compile Include="Rpc\SerializationCache.cs" />
<Compile Include="Socket\BufferManager.cs" />
<Compile Include="Socket\ISetupSocketSession.cs" />
Expand Down
12 changes: 12 additions & 0 deletions src/DtronixMessageQueue/Rpc/RpcCommandType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,17 @@ public enum RpcCommandType : byte {
/// Server is sending the result of the authentication request.
/// </summary>
AuthenticationResult = 2,



/// <summary>
/// Sends a request to the client/server session for a stream handle to be created to write to.
/// </summary>
RequestStreamHandle = 10,

/// <summary>
/// Sends a request to the client/server session for a stream handle be closed.
/// </summary>
CloseStreamHandle = 11,
}
}
21 changes: 17 additions & 4 deletions src/DtronixMessageQueue/Rpc/RpcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public abstract class RpcSession<TSession, TConfig> : MqSession<TSession, TConfi
/// </summary>
private readonly Dictionary<Type, RealProxy> remote_service_realproxy = new Dictionary<Type, RealProxy>();

/// <summary>
/// Contains all active stream handles for this session.
/// </summary>
private readonly ConcurrentDictionary<ushort, RpcOperationWait> stream_handles =
new ConcurrentDictionary<ushort, RpcOperationWait>();

/// <summary>
/// Server base socket for this session.
/// Null if the BaseSocket is not running in server mode.
Expand Down Expand Up @@ -193,7 +199,7 @@ protected override void ProcessCommand(MqFrame frame) {

// Alert the server that this session is ready for usage.
worker_thread_pool.QueueWorkItem(() => {
Ready?.Invoke(this, new SessionEventArgs<TSession, TConfig>((TSession)this));
Ready?.Invoke(this, new SessionEventArgs<TSession, TConfig>((TSession) this));
});

SerializationCache.Put(serializer);
Expand All @@ -212,7 +218,7 @@ protected override void ProcessCommand(MqFrame frame) {
Close(SocketCloseReason.ProtocolError);
return;
}

byte[] auth_bytes = new byte[frame.DataLength - 2];
frame.Read(2, auth_bytes, 0, auth_bytes.Length);

Expand Down Expand Up @@ -240,7 +246,7 @@ protected override void ProcessCommand(MqFrame frame) {
} else {
// Alert the server that this session is ready for usage.
worker_thread_pool.QueueWorkItem(() => {
Ready?.Invoke(this, new SessionEventArgs<TSession, TConfig>((TSession)this));
Ready?.Invoke(this, new SessionEventArgs<TSession, TConfig>((TSession) this));
});
}

Expand Down Expand Up @@ -269,9 +275,16 @@ protected override void ProcessCommand(MqFrame frame) {

// Alert the client that this session is ready for usage.
worker_thread_pool.QueueWorkItem(() => {
Ready?.Invoke(this, new SessionEventArgs<TSession, TConfig>((TSession)this));
Ready?.Invoke(this, new SessionEventArgs<TSession, TConfig>((TSession) this));
});

} else if (rpc_command_type == RpcCommandType.RequestStreamHandle) {
// RpcCommand:byte; RpcCommandType:byte; AuthResult:bool;
Send(new MqFrame());
var sream_handle_id = frame.ReadUInt16(2);

} else if (rpc_command_type == RpcCommandType.RequestStreamHandle) {

} else {
Close(SocketCloseReason.ProtocolError);
}
Expand Down
74 changes: 74 additions & 0 deletions src/DtronixMessageQueue/Rpc/RpcStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DtronixMessageQueue.Rpc {
public class RpcStream<TSession, TConfig> : Stream
where TSession : RpcSession<TSession, TConfig>, new()
where TConfig : RpcConfig {

private readonly RpcSession<TSession, TConfig> session;

private readonly ushort? read_id;


private MqMessageReader reader;
private MqMessageWriter writer;

public override bool CanRead { get; }
public override bool CanSeek { get; }
public override bool CanWrite { get; }
public override long Length { get; }
public override long Position { get; set; }

/// <summary>
/// Create a RpcStream in writer mode.
/// </summary>
/// <param name="session">Session to write to.</param>
public RpcStream(RpcSession<TSession, TConfig> session) {
this.session = session;

session.
}


/// <summary>
/// Create a RpcStream in reader mode.
/// </summary>
/// <param name="session">Session to read from.</param>
/// <param name="read_id">Id used to read from the session with.</param>
public RpcStream(RpcSession<TSession, TConfig> session, ushort read_id) {
this.session = session;
this.read_id = read_id;
}

/// <summary>
/// All data is immediately flushed automatically. Calling flush does nothing.
/// </summary>
public override void Flush() {
}

public override long Seek(long offset, SeekOrigin origin) {
throw new NotImplementedException();
}

public override void SetLength(long value) {
throw new NotImplementedException();
}

public override int Read(byte[] buffer, int offset, int count) {
throw new NotImplementedException();
}

public override void Write(byte[] buffer, int offset, int count) {
throw new NotImplementedException();
}




}
}

0 comments on commit a6ba5dd

Please sign in to comment.