Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Separated BufferManager to not be dependent upon SocketAsyncEventArgs.
Browse files Browse the repository at this point in the history
Changed AsyncManager, OutboxProcessor & InboxProcessor to private fields.
Sessions now use a pre-made buffer for encryption transformations on receiving data.
  • Loading branch information
DJGosnell committed Jun 15, 2018
1 parent 75e14ec commit 7dcbc3a
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 52 deletions.
33 changes: 13 additions & 20 deletions src/DtronixMessageQueue/TcpSocket/BufferManager.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Net.Sockets;

namespace DtronixMessageQueue.TcpSocket
Expand Down Expand Up @@ -48,36 +49,28 @@ public BufferManager(int totalBytes, int bufferSize)
}

/// <summary>
/// Assigns a buffer from the buffer pool to the specified SocketAsyncEventArgs object
/// Returns a buffer from the buffer pool.
/// </summary>
/// <param name="args"></param>
/// <returns>true if the buffer was successfully set, else false</returns>
public bool SetBuffer(SocketAsyncEventArgs args)
/// <returns>ArraySegment with the buffer section</returns>
public ArraySegment<byte> GetBuffer()
{
if (_freeIndexPool.Count > 0)
{
args.SetBuffer(_buffer, _freeIndexPool.Pop(), _bufferSize);
}
else
{
if (_numBytes - _bufferSize < _currentIndex)
{
return false;
}
args.SetBuffer(_buffer, _currentIndex, _bufferSize);
_currentIndex += _bufferSize;
}
return true;
return new ArraySegment<byte>(_buffer, _freeIndexPool.Pop(), _bufferSize);

if (_numBytes - _bufferSize < _currentIndex)
throw new Exception("Buffer manager exhausted.");

_currentIndex += _bufferSize;
return new ArraySegment<byte>(_buffer, _currentIndex, _bufferSize);
}

/// <summary>
/// Removes the buffer from a SocketAsyncEventArg object. This frees the buffer back to the buffer pool.
/// </summary>
/// <param name="args"></param>
public void FreeBuffer(SocketAsyncEventArgs args)
public void FreeBuffer(ArraySegment<byte> args)
{
_freeIndexPool.Push(args.Offset);
//args.SetBuffer(null, 0, 0);
}
}
}
25 changes: 18 additions & 7 deletions src/DtronixMessageQueue/TcpSocket/SocketAsyncEventArgsManager.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net.Sockets;
using System;
using System.Net.Sockets;
using System.Threading;

namespace DtronixMessageQueue.TcpSocket
Expand Down Expand Up @@ -34,12 +35,12 @@ public SocketAsyncEventArgsManager(int totalBytes, int bufferSize)
/// <summary>
/// Remove the instance from the buffer
/// </summary>
/// <param name="item">The "item" parameter is the SocketAsyncEventArgs instance to add to the pool</param>
public void Free(SocketAsyncEventArgs item)
/// <param name="eventArgs">SocketAsyncEventArgs containing a buffer user token.</param>
public void Free(SocketAsyncEventArgs eventArgs)
{
Interlocked.Decrement(ref _count);
_bufferManager.FreeBuffer(item);
item.Dispose();
_bufferManager.FreeBuffer((ArraySegment<byte>)eventArgs.UserToken);
eventArgs.Dispose();
}

/// <summary>
Expand All @@ -51,10 +52,20 @@ public SocketAsyncEventArgs Create()
Interlocked.Increment(ref _count);
var eventArg = new SocketAsyncEventArgs();

if (_bufferManager.SetBuffer(eventArg) == false)
try
{

var buffer = _bufferManager.GetBuffer();

// Set the buffer to the event args for freeing later.
eventArg.UserToken = buffer;

eventArg.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);
}
catch
{
throw new Exception("Attempted to create more than the max number of sessions.");
}


return eventArg;
}
Expand Down
36 changes: 22 additions & 14 deletions src/DtronixMessageQueue/TcpSocket/TcpSocketHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public abstract class TcpSocketHandler<TSession, TConfig>
/// <summary>
/// Pool of async args for sessions to use.
/// </summary>
protected SocketAsyncEventArgsManager AsyncManager;
private SocketAsyncEventArgsManager _asyncManager;

/// <summary>
/// True if the timeout timer is running. False otherwise.
Expand All @@ -69,12 +69,12 @@ public abstract class TcpSocketHandler<TSession, TConfig>
/// <summary>
/// Processor to handle all inbound and outbound message handling.
/// </summary>
protected ActionProcessor<Guid> OutboxProcessor;
private readonly ActionProcessor<Guid> _outboxProcessor;

/// <summary>
/// Processor to handle all inbound and outbound message handling.
/// </summary>
protected ActionProcessor<Guid> InboxProcessor;
private readonly ActionProcessor<Guid> _inboxProcessor;

/// <summary>
/// Dictionary of all connected clients.
Expand All @@ -87,6 +87,11 @@ public abstract class TcpSocketHandler<TSession, TConfig>
/// </summary>
protected readonly ServiceMethodCache ServiceMethodCache;

/// <summary>
/// Contains the buffer manager for all the encryption transformations.
/// </summary>
private BufferManager _receiveBufferManager;

/// <summary>
/// Base constructor to all socket classes.
/// </summary>
Expand All @@ -102,12 +107,12 @@ protected TcpSocketHandler(TConfig config, TcpSocketMode mode)

if (mode == TcpSocketMode.Client)
{
OutboxProcessor = new ActionProcessor<Guid>(new ActionProcessor<Guid>.Config
_outboxProcessor = new ActionProcessor<Guid>(new ActionProcessor<Guid>.Config
{
ThreadName = $"{modeLower}-outbox",
StartThreads = 1
});
InboxProcessor = new ActionProcessor<Guid>(new ActionProcessor<Guid>.Config
_inboxProcessor = new ActionProcessor<Guid>(new ActionProcessor<Guid>.Config
{
ThreadName = $"{modeLower}-inbox",
StartThreads = 1
Expand All @@ -119,20 +124,20 @@ protected TcpSocketHandler(TConfig config, TcpSocketMode mode)
? Environment.ProcessorCount
: config.ProcessorThreads;

OutboxProcessor = new ActionProcessor<Guid>(new ActionProcessor<Guid>.Config
_outboxProcessor = new ActionProcessor<Guid>(new ActionProcessor<Guid>.Config
{
ThreadName = $"{modeLower}-outbox",
StartThreads = processorThreads
});
InboxProcessor = new ActionProcessor<Guid>(new ActionProcessor<Guid>.Config
_inboxProcessor = new ActionProcessor<Guid>(new ActionProcessor<Guid>.Config
{
ThreadName = $"{modeLower}-inbox",
StartThreads = processorThreads
});
}

OutboxProcessor.Start();
InboxProcessor.Start();
_outboxProcessor.Start();
_inboxProcessor.Start();
}


Expand Down Expand Up @@ -201,8 +206,10 @@ protected void Setup()
// preallocate pool of SocketAsyncEventArgs objects
var bufferSize = Config.SendAndReceiveBufferSize / 16 * 16 + 16;

AsyncManager = new SocketAsyncEventArgsManager(bufferSize * 2 * maxConnections,
_asyncManager = new SocketAsyncEventArgsManager(bufferSize * 2 * maxConnections,
bufferSize);

_receiveBufferManager = new BufferManager(bufferSize * maxConnections, bufferSize);
}

/// <summary>
Expand All @@ -215,12 +222,13 @@ protected virtual TSession CreateSession(System.Net.Sockets.Socket socket)
new TlsSocketSessionCreateArguments<TSession, TConfig>
{
SessionSocket = socket,
SocketArgsManager = AsyncManager,
SocketArgsManager = _asyncManager,
SessionConfig = Config,
TlsSocketHandler = this,
InboxProcessor = InboxProcessor,
OutboxProcessor = OutboxProcessor,
ServiceMethodCache = ServiceMethodCache
InboxProcessor = _inboxProcessor,
OutboxProcessor = _outboxProcessor,
ServiceMethodCache = ServiceMethodCache,
ReceiveBufferManager = _receiveBufferManager
});

SessionSetup?.Invoke(this, new SessionEventArgs<TSession, TConfig>(session));
Expand Down
27 changes: 16 additions & 11 deletions src/DtronixMessageQueue/TcpSocket/TcpSocketSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public enum State : byte
private byte[] _sendBuffer = new byte[16];
private int _sendBufferLength = 0;

private byte[] _receiveTransformedBuffer;
private ArraySegment<byte> _receiveTransformedBuffer;
private BufferManager _receiveTransformedBufferManager;
private byte[] _receivePartialBuffer = new byte[16];
private int _receivePartialBufferLength = 0;

Expand Down Expand Up @@ -214,10 +215,11 @@ public static TSession Create(TlsSocketSessionCreateArguments<TSession, TConfig>
_receiveArgs = args.SocketArgsManager.Create(),
InboxProcessor = args.InboxProcessor,
OutboxProcessor = args.OutboxProcessor,
ServiceMethodCache = args.ServiceMethodCache
ServiceMethodCache = args.ServiceMethodCache,
_receiveTransformedBufferManager = args.ReceiveBufferManager,
_receiveTransformedBuffer = args.ReceiveBufferManager.GetBuffer()
};

session._receiveTransformedBuffer = new byte[args.SessionConfig.SendAndReceiveBufferSize / 16 * 16 + 16];
session._sendArgs.Completed += session.IoCompleted;
session._receiveArgs.Completed += session.IoCompleted;

Expand Down Expand Up @@ -317,7 +319,7 @@ private void SecureConnectionReceive(byte[] buffer)

if (SocketHandler.Mode == TcpSocketMode.Server)
{
var sendPacket = new byte[1 + 140];
var sendPacket = new byte[1 + 140]; // ProtocolVersion + PublicKey
sendPacket[0] = ProtocolVersion;
var publicKey = dh.PublicKey.ToByteArray();
Buffer.BlockCopy(publicKey, 0, sendPacket, 1, 140);
Expand Down Expand Up @@ -711,7 +713,7 @@ private void ReceiveCompleteInternal(byte[] buffer, int offset, int count)
var position = 0;

int receiveLength = TransformDataBuffer(buffer, offset, count,
_receiveTransformedBuffer, 0, _receivePartialBuffer,
_receiveTransformedBuffer.Array, _receiveTransformedBuffer.Offset, _receivePartialBuffer,
ref _receivePartialBufferLength, _decryptor, false, false);

while (position < receiveLength)
Expand All @@ -722,7 +724,7 @@ private void ReceiveCompleteInternal(byte[] buffer, int offset, int count)
// See if we are ready for a new header.
if (receiveHeader.HeaderReceiveState == ReceiveHeader.State.Empty)
{
receiveHeader.HeaderType = (ReceiveHeader.Type)_receiveTransformedBuffer[position];
receiveHeader.HeaderType = (ReceiveHeader.Type)_receiveTransformedBuffer.Array[_receiveTransformedBuffer.Offset + position];

switch (receiveHeader.HeaderType)
{
Expand All @@ -748,7 +750,7 @@ private void ReceiveCompleteInternal(byte[] buffer, int offset, int count)
{
if (position + 1 < count) // See if we can read the entire size at once.
{
receiveHeader.BodyLength = BitConverter.ToInt16(_receiveTransformedBuffer, position);
receiveHeader.BodyLength = BitConverter.ToInt16(_receiveTransformedBuffer.Array, _receiveTransformedBuffer.Offset + position);
position += 2;

// Body length complete.
Expand All @@ -757,7 +759,7 @@ private void ReceiveCompleteInternal(byte[] buffer, int offset, int count)
else
{
// Read the first byte of the body length.
receiveHeader.BodyLengthBuffer[0] = _receiveTransformedBuffer[position];
receiveHeader.BodyLengthBuffer[0] = _receiveTransformedBuffer.Array[_receiveTransformedBuffer.Offset + position];
receiveHeader.BodyLengthBufferLength = 1;
// Nothing more to read.
break;
Expand All @@ -766,7 +768,7 @@ private void ReceiveCompleteInternal(byte[] buffer, int offset, int count)
else
{
// The buffer already contains a byte.
receiveHeader.BodyLengthBuffer[1] = _receiveTransformedBuffer[position];
receiveHeader.BodyLengthBuffer[1] = _receiveTransformedBuffer.Array[_receiveTransformedBuffer.Offset + position];
position++;

receiveHeader.BodyLength = BitConverter.ToInt16(receiveHeader.BodyLengthBuffer, 0);
Expand All @@ -789,7 +791,7 @@ private void ReceiveCompleteInternal(byte[] buffer, int offset, int count)
break;

readBuffer = new byte[currentMessageReadLength];
Buffer.BlockCopy(_receiveTransformedBuffer, position, readBuffer, 0, currentMessageReadLength);
Buffer.BlockCopy(_receiveTransformedBuffer.Array, _receiveTransformedBuffer.Offset + position, readBuffer, 0, currentMessageReadLength);

receiveHeader.BodyPosition += currentMessageReadLength;
position += currentMessageReadLength;
Expand Down Expand Up @@ -856,10 +858,13 @@ public virtual void Close(CloseReason reason)
_sendArgs.Completed -= IoCompleted;
_receiveArgs.Completed -= IoCompleted;

// Free the SocketAsyncEventArg so they can be reused by another client
// Free the SocketAsyncEventArg so they can be reused by another client.
_argsPool.Free(_sendArgs);
_argsPool.Free(_receiveArgs);

// Free the transformed buffer.
_receiveTransformedBufferManager.FreeBuffer(_receiveTransformedBuffer);

InboxProcessor.Deregister(Id);
OutboxProcessor.Deregister(Id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,10 @@ public class TlsSocketSessionCreateArguments<TSession, TConfig>
/// Cache for commonly called methods used throughout the session.
/// </summary>
public ServiceMethodCache ServiceMethodCache;

/// <summary>
/// Receive buffer manager used to encryption trasformations on received data.
/// </summary>
public BufferManager ReceiveBufferManager;
}
}

0 comments on commit 7dcbc3a

Please sign in to comment.