Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Added MqFrame.ToMessage() to convert a frame into a message.
Browse files Browse the repository at this point in the history
Implementing methods to handle transportation of bytes.
  • Loading branch information
DJGosnell committed Oct 17, 2016
1 parent cbdb53f commit 0b3827d
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 71 deletions.
1 change: 1 addition & 0 deletions src/DtronixMessageQueue/DtronixMessageQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
<Compile Include="Rpc\IRemoteService.cs" />
<Compile Include="Rpc\MessageHandler.cs" />
<Compile Include="Rpc\MessageHandlers\ByteTransportReceiveEventArgs.cs" />
<Compile Include="Rpc\MessageHandlers\ByteTransportWaitHandle.cs" />
<Compile Include="Rpc\ResponseWait.cs" />
<Compile Include="Rpc\RpcAuthEventArgs.cs" />
<Compile Include="Rpc\MessageHandlers\ByteTransportMessageHandler.cs" />
Expand Down
12 changes: 12 additions & 0 deletions src/DtronixMessageQueue/MqFrame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public MqFrameType FrameType {
/// </summary>
public const int HeaderLength = 3;

/// <summary>
/// Contains the configuration information about the current client/server.
/// Used to determine how large the frames are to be.
/// </summary>
private MqConfig config;

/// <summary>
Expand Down Expand Up @@ -577,6 +581,14 @@ public override string ToString() {
return $"MqFrame totaling {buffer.Length:N0} bytes; Type: {FrameType}";
}

/// <summary>
/// Puts this frame inside a message.
/// </summary>
/// <returns>Message with this frame as the first frame.</returns>
public MqMessage ToMessage() {
return new MqMessage(this);
}

/// <summary>
/// Disposes of this object and its resources.
/// </summary>
Expand Down
6 changes: 5 additions & 1 deletion src/DtronixMessageQueue/Rpc/MessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public abstract class MessageHandler<TSession, TConfig>
/// </summary>
public abstract byte Id { get; }

protected TSession Session;
public TSession Session;

protected Dictionary<byte, ActionHandler> Handlers = new Dictionary<byte, ActionHandler>();

Expand All @@ -45,6 +45,10 @@ public bool HandleMessage(MqMessage message) {
return false;
}

public void SendHandlerMessage(byte action_id) {
SendHandlerMessage(action_id, null);
}

public void SendHandlerMessage(byte action_id, MqMessage message) {
var header_frame = Session.CreateFrame(new byte[2], MqFrameType.More);
header_frame.Write(0, Id);
Expand Down
85 changes: 49 additions & 36 deletions src/DtronixMessageQueue/Rpc/MessageHandlers/ByteTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,59 @@
using System.Threading.Tasks;

namespace DtronixMessageQueue.Rpc.MessageHandlers {
public class ByteTransport<TSession, TConfig>
public class ByteTransport<TSession, TConfig> : IByteTransport
where TSession : RpcSession<TSession, TConfig>, new()
where TConfig : RpcConfig {
public long Length { get; set; }

private readonly TSession session;
public enum Mode {
Send,
Receive
}

private readonly ByteTransportMessageHandler<TSession, TConfig> handler;
private readonly ushort id;

private readonly Mode mode;

private readonly MqMessageReader message_reader;
private readonly MqMessageWriter message_writer;

private ConcurrentQueue<MqMessage> read_buffer;
public MqMessage ReadBuffer { get; set; }

private readonly SemaphoreSlim read_semaphore;
private readonly SemaphoreSlim semaphore;

private readonly object read_lock = new object();

public event EventHandler<ByteTransportReceiveEventArgs> Receive;

public ByteTransport(TSession session, ushort id, long length) {
Length = length;
this.session = session;
public ByteTransport(ByteTransportMessageHandler<TSession, TConfig> handler, ushort id, Mode mode) {
this.handler = handler;
this.id = id;
message_reader = new MqMessageReader();
read_semaphore = new SemaphoreSlim(0, 1);
}
this.mode = mode;

public ByteTransport(TSession session) {
this.session = session;
message_writer = new MqMessageWriter(session.Config);
if (mode == Mode.Receive) {
message_reader = new MqMessageReader();
} else {
message_writer = new MqMessageWriter(handler.Session.Config);
}

semaphore = new SemaphoreSlim(0, 1);
}

public void OnReceive(MqMessage message) {
read_buffer.Enqueue(message);

lock (read_lock) {
if (read_semaphore.CurrentCount == 0) {
read_semaphore.Release();
}
bool IByteTransport.OnReady() {
if (semaphore.CurrentCount == 0) {
semaphore.Release();
return true;
}


return false;
}



void IByteTransport.OnReceive(MqMessage message) {
ReadBuffer = message;
}

/// <summary>
Expand All @@ -57,24 +69,18 @@ public void OnReceive(MqMessage message) {
/// <param name="offset">Offset in the buffer to start copying to.</param>
/// <param name="count">Number of bytes to try to read into the buffer.</param>
public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellation_token) {
if (read_semaphore == null) {
if (semaphore == null) {
throw new InvalidOperationException("Byte transport is set to write mode. Can not read when in write more.");
}

if (read_buffer.IsEmpty) {
// ReSharper disable once InconsistentlySynchronizedField
await read_semaphore.WaitAsync(cancellation_token);
if (ReadBuffer == null) {
await semaphore.WaitAsync(cancellation_token);
}


if (message_reader.Message == null) {
MqMessage message;
if (read_buffer.TryDequeue(out message) == false) {
return 0;
}

message_reader.Message = message;
message_reader.Skip(3);
message_reader.Message = ReadBuffer;
message_reader.Skip(2);
}

var total_read = message_reader.Read(buffer, offset, count);
Expand All @@ -93,14 +99,21 @@ public async Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancellat
/// <param name="buffer">Buffer to write to the message.</param>
/// <param name="index">Offset in the buffer to write from</param>
/// <param name="count">Number of bytes to write to the message from the buffer.</param>
public void Write(byte[] buffer, int index, int count) {
message_writer.Write((byte) 2);
message_writer.Write((byte) ByteTransportMessageAction.Write);
public async void WriteAsync(byte[] buffer, int index, int count, CancellationToken cancellation_token) {
// Wait for the server response that we are ready to send the next packet.
await semaphore.WaitAsync(cancellation_token);

message_writer.Write(id);
message_writer.Write(buffer, index, count);

handler.SendHandlerMessage((byte)ByteTransportMessageAction.Write, message_writer.ToMessage(true));

session.Send(message_writer.ToMessage(true));

}
}

public interface IByteTransport {
bool OnReady();
void OnReceive(MqMessage message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,28 @@ public enum ByteTransportMessageAction : byte {
/// <summary>
/// Sends a request to the client/server session for a stream handle to be created to write to.
/// </summary>
RequestTransportHandle = 1,
Request = 1,

/// <summary>
/// Sends a request to the client/server session for a stream handle to be created to write to.
/// Sent when the transport handle is in an error state.
/// </summary>
Error = 2,

/// <summary>
/// Lets the recipient session know it is ok to send the next packet.
/// </summary>
ResponseTransportHandle = 2,
Ready = 3,

/// <summary>
/// Sends a request to the client/server session for a stream handle be closed.
/// </summary>
CloseTransportHandle = 3,
Close = 4,

/// <summary>
/// Sends data to the client/server.
/// This message contains the byte buffer.
/// </summary>
Write = 4,
Write = 5

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,77 @@ public class ByteTransportMessageHandler<TSession, TConfig> : MessageHandler<TSe
/// </summary>
public sealed override byte Id => 2;

private readonly ResponseWait<ResponseWaitHandle> send_operation = new ResponseWait<ResponseWaitHandle>();
private readonly ResponseWait<ByteTransportWaitHandle<TSession, TConfig>> send_operation = new ResponseWait<ByteTransportWaitHandle<TSession, TConfig>>();

private readonly ResponseWait<ResponseWaitHandle> receive_operation = new ResponseWait<ResponseWaitHandle>();

/// <summary>
/// Contains all local transports.
/// </summary>
public readonly ConcurrentDictionary<ushort, ByteTransport<TSession, TConfig>> RemoteTransports =
new ConcurrentDictionary<ushort, ByteTransport<TSession, TConfig>>();
private readonly ResponseWait<ByteTransportWaitHandle<TSession, TConfig>> receive_operation = new ResponseWait<ByteTransportWaitHandle<TSession, TConfig>>();


public ByteTransportMessageHandler(TSession session) : base(session) {
Handlers.Add((byte)ByteTransportMessageAction.RequestTransportHandle, RequestTransportHandle);
Handlers.Add((byte)ByteTransportMessageAction.Request, Request);
Handlers.Add((byte)ByteTransportMessageAction.Error, Error);
Handlers.Add((byte)ByteTransportMessageAction.Ready, Ready);
Handlers.Add((byte)ByteTransportMessageAction.Write, Write);
}

private void RequestTransportHandle(byte action_id, MqMessage message) {




private void Error(byte action_handler, MqMessage message) {
throw new NotImplementedException();
}

public void ReadyReceive(ushort transport_id) {
var ready_frame = Session.CreateFrame(new byte[2], MqFrameType.Last);
ready_frame.Write(0, transport_id);

SendHandlerMessage((byte)ByteTransportMessageAction.Ready, ready_frame.ToMessage());
}

public ByteTransport<TSession, TConfig> CreateTransport() {
var handle = send_operation.CreateWaitHandle(null);
handle.ByteTransport = new ByteTransport<TSession, TConfig>(this, handle.Id, ByteTransport<TSession, TConfig>.Mode.Send);

var request_frame = Session.CreateFrame(new byte[2], MqFrameType.Last);
request_frame.Write(0, handle.Id);

SendHandlerMessage((byte)ByteTransportMessageAction.Request, request_frame.ToMessage());

return handle.ByteTransport;
}

private void Request(byte action_id, MqMessage message) {
ushort handle_id = message[0].ReadUInt16(0);
long size = message[0].ReadInt64(2);



if (size > Session.Config.MaxByteTransportLength) {
var error_frame = Session.CreateFrame(new byte[2], MqFrameType.Last);
error_frame.Write(0, Id);
error_frame.Write(1, (byte)ByteTransportMessageAction.ResponseTransportHandle);
}

ByteTransport<TSession, TConfig> transport = new ByteTransport<TSession, TConfig>(Session, handle_id, size);
var transport = new ByteTransport<TSession, TConfig>(this, handle_id, ByteTransport<TSession, TConfig>.Mode.Receive);

var handle = receive_operation.CreateWaitHandle(handle_id);
handle.ByteTransport = transport;

SendHandlerMessage((byte)ByteTransportMessageAction.Ready, message);
}

private void Ready(byte action_handler, MqMessage message) {
ushort handle_id = message[0].ReadUInt16(0);

var transport = (IByteTransport) send_operation[handle_id].ByteTransport;

receive_operation.CreateWaitHandle(handle_id);
if (transport.OnReady() == false) {
Session.Close(SocketCloseReason.ApplicationError);
}
}

//RemoteTransports.TryAdd(handle_id, remote_transport);
private void Write(byte action_handler, MqMessage message) {
ushort handle_id = message[0].ReadUInt16(0);

// Create response.
var response_frame = Session.CreateFrame(new byte[4], MqFrameType.Last);
response_frame.Write(0, Id);
response_frame.Write(1, (byte)ByteTransportMessageAction.ResponseTransportHandle);
response_frame.Write(2, handle_id);
var transport = (IByteTransport)send_operation[handle_id].ByteTransport;

transport.OnReceive(message);

if (transport.OnReady() == false) {
Session.Close(SocketCloseReason.ApplicationError);
}

Session.Send(response_frame);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DtronixMessageQueue.Rpc.MessageHandlers {
public class ByteTransportWaitHandle<TSession, TConfig> : ResponseWaitHandle
where TSession : RpcSession<TSession, TConfig>, new()
where TConfig : RpcConfig {

public ByteTransport<TSession, TConfig> ByteTransport { get; set; }
}
}

0 comments on commit 0b3827d

Please sign in to comment.