Skip to content

Commit

Permalink
cleanup/nullable-ize SocketInitiatorThread
Browse files Browse the repository at this point in the history
  • Loading branch information
gbirchmeier committed Feb 16, 2024
1 parent fc9156e commit 6b5b278
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 113 deletions.
187 changes: 84 additions & 103 deletions QuickFIXn/SocketInitiatorThread.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using System.Net.Sockets;
using System.Net;
using System.Threading;
using System.IO;
#nullable enable
using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace QuickFix
Expand All @@ -13,58 +14,58 @@ namespace QuickFix
/// </summary>
public class SocketInitiatorThread : IResponder
{
public Session Session { get { return session_; } }
public Transport.SocketInitiator Initiator { get { return initiator_; } }
public Session Session { get; }

public Transport.SocketInitiator Initiator { get; }

public const int BUF_SIZE = 512;

private Thread thread_ = null;
private byte[] readBuffer_ = new byte[BUF_SIZE];
private Parser parser_;
protected Stream stream_;
protected CancellationTokenSource _readCancellationTokenSource;
private Transport.SocketInitiator initiator_;
private Session session_;
private IPEndPoint socketEndPoint_;
protected SocketSettings socketSettings_;
private bool isDisconnectRequested_ = false;
private Thread? _thread;
private readonly byte[] _readBuffer = new byte[BUF_SIZE];
private readonly Parser _parser = new();
private Stream? _stream;
private CancellationTokenSource _readCancellationTokenSource = new();
private readonly IPEndPoint _socketEndPoint;
private readonly SocketSettings _socketSettings;
private bool _isDisconnectRequested = false;

/// <summary>
/// Keep a task for handling async read
/// </summary>
private Task<int>? _currentReadTask;

public SocketInitiatorThread(Transport.SocketInitiator initiator, Session session, IPEndPoint socketEndPoint, SocketSettings socketSettings)
{
isDisconnectRequested_ = false;
initiator_ = initiator;
session_ = session;
socketEndPoint_ = socketEndPoint;
parser_ = new Parser();
session_ = session;
socketSettings_ = socketSettings;
_readCancellationTokenSource = new CancellationTokenSource();
Initiator = initiator;
Session = session;
_socketEndPoint = socketEndPoint;
_socketSettings = socketSettings;
}

public void Start()
{
isDisconnectRequested_ = false;
thread_ = new Thread(new ParameterizedThreadStart(Transport.SocketInitiator.SocketInitiatorThreadStart));
thread_.Start(this);
_isDisconnectRequested = false;
_thread = new Thread(Transport.SocketInitiator.SocketInitiatorThreadStart);
_thread.Start(this);
}

public void Join()
{
if (null == thread_)
if (_thread is null)
return;
Disconnect();
// Make sure session's socket reader thread doesn't try to do a Join on itself!
if (Thread.CurrentThread.ManagedThreadId != thread_.ManagedThreadId)
thread_.Join(2000);
thread_ = null;
if (Environment.CurrentManagedThreadId != _thread.ManagedThreadId)
_thread.Join(2000);
_thread = null;
}

public void Connect()
{
Debug.Assert(stream_ == null);
Debug.Assert(_stream == null);

stream_ = SetupStream();
session_.SetResponder(this);
_stream = SetupStream();
Session.SetResponder(this);
}

/// <summary>
Expand All @@ -74,55 +75,36 @@ public void Connect()
/// <returns>Stream representing the (network)connection to the other party</returns>
protected virtual Stream SetupStream()
{
return QuickFix.Transport.StreamFactory.CreateClientStream(socketEndPoint_, socketSettings_, session_.Log);
return Transport.StreamFactory.CreateClientStream(_socketEndPoint, _socketSettings, Session.Log);
}

public bool Read()
{
try
{
int bytesRead = ReadSome(readBuffer_, 1000);
int bytesRead = ReadSome(_readBuffer, 1000);
if (bytesRead > 0)
parser_.AddToStream(readBuffer_, bytesRead);
else if (null != session_)
{
session_.Next();
}
_parser.AddToStream(_readBuffer, bytesRead);
else
{
throw new QuickFIXException("Initiator timed out while reading socket");
}
Session.Next();

ProcessStream();
return true;
}
catch (System.ObjectDisposedException e)
catch (ObjectDisposedException)
{
// this exception means socket_ is already closed when poll() is called
if (isDisconnectRequested_ == false)
{
// for lack of a better idea, do what the general exception does
if (null != session_)
session_.Disconnect(e.ToString());
else
Disconnect();
}
return false;
// this exception means _socket is already closed when poll() is called
if (_isDisconnectRequested == false)
Disconnect();
}
catch (System.Exception e)
catch (Exception e)
{
if (null != session_)
session_.Disconnect(e.ToString());
else
Disconnect();
Session.Log.OnEvent(e.ToString());
Disconnect();
}
return false;
}

/// <summary>
/// Keep a task for handling async read
/// </summary>
private Task<int> currentReadTask;
/// <summary>
/// Reads data from the network into the specified buffer.
/// It will wait up to the specified number of milliseconds for data to arrive,
Expand All @@ -132,85 +114,84 @@ public bool Read()
/// <param name="timeoutMilliseconds">The timeout milliseconds.</param>
/// <returns>The number of bytes read into the buffer</returns>
/// <exception cref="System.Net.Sockets.SocketException">On connection reset</exception>
protected int ReadSome(byte[] buffer, int timeoutMilliseconds)
protected virtual int ReadSome(byte[] buffer, int timeoutMilliseconds)
{
// NOTE: THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketReader.
if (_stream is null) {
throw new ApplicationException("Initiator is not connected (uninitialized stream)");
}

// NOTE: FROM HERE, THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketReader.
// Any changes made here should also be performed there.
try
{
// Begin read if it is not already started
if (currentReadTask == null)
currentReadTask = stream_.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token);

// Wait for it to complete (given timeout)
currentReadTask.Wait(timeoutMilliseconds);


if (currentReadTask.IsCompleted)
{
// Make sure to set currentReadRequest_ to before retreiving result
// so a new read can be started next time even if an exception is thrown
var request = currentReadTask;
currentReadTask.Dispose();
currentReadTask = null;

int bytesRead = request.Result;
_currentReadTask ??= _stream.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token);

if (_currentReadTask.Wait(timeoutMilliseconds)) {
// Dispose/nullify currentReadTask *before* retreiving .Result.
// Accessting .Result can throw an exception, so we need to reset currentReadTask
// first, to set us up for the next read even if an exception is thrown.
Task<int>? request = _currentReadTask;
_currentReadTask = null;

int bytesRead = request.Result; // (As mentioned above, this can throw an exception!)
if (0 == bytesRead)
throw new SocketException(System.Convert.ToInt32(SocketError.Shutdown));
throw new SocketException(Convert.ToInt32(SocketError.Shutdown));

return bytesRead;
}
else
return 0;

return 0;
}
catch (AggregateException ex) // Timeout
{
var ioException = ex.InnerException as IOException;
var inner = ioException?.InnerException as SocketException;
if (inner != null && inner.SocketErrorCode == SocketError.TimedOut)
{
if (inner is not null && inner.SocketErrorCode == SocketError.TimedOut) {
// Nothing read
return 0;
}
else if (inner != null)
{

if (inner is not null) {
throw inner; //rethrow SocketException part (which we have exception logic for)
}
else
throw; //rethrow original exception

throw; //rethrow original exception
}
}

private void ProcessStream()
{
string msg;
while (parser_.ReadFixMessage(out msg))
while (_parser.ReadFixMessage(out var msg))
{
session_.Next(msg);
Session.Next(msg);
}
}

#region Responder Members

public bool Send(string data)
{
if (_stream is null) {
throw new ApplicationException("Initiator is not connected (uninitialized stream)");
}

byte[] rawData = CharEncoding.DefaultEncoding.GetBytes(data);
stream_.Write(rawData, 0, rawData.Length);
_stream.Write(rawData, 0, rawData.Length);
return true;
}

public void Disconnect()
{
isDisconnectRequested_ = true;
_readCancellationTokenSource?.Cancel();
_readCancellationTokenSource?.Dispose();
_readCancellationTokenSource = null;
_isDisconnectRequested = true;
_readCancellationTokenSource.Cancel();
_readCancellationTokenSource.Dispose();

// just wait when read task will be cancelled
currentReadTask?.ContinueWith(t => { }).Wait(1000);
currentReadTask?.Dispose();
currentReadTask = null;
stream_?.Close();
_currentReadTask?.ContinueWith(_ => { }).Wait(1000);
_currentReadTask?.Dispose();
_currentReadTask = null;
_stream?.Close();
}

#endregion
Expand Down
8 changes: 3 additions & 5 deletions QuickFIXn/SocketReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@

namespace QuickFix
{
/// <summary>
/// TODO merge with SocketInitiatorThread
/// </summary>
public class SocketReader : IDisposable
{
public const int BUF_SIZE = 4096;
private readonly byte[] _readBuffer = new byte[BUF_SIZE];
private readonly Parser _parser = new();
private Session? _qfSession;
private readonly Stream _stream;
private readonly CancellationTokenSource _readCancellationTokenSource;
private readonly CancellationTokenSource _readCancellationTokenSource = new();
private readonly TcpClient _tcpClient;
private readonly ClientHandlerThread _responder;
private readonly AcceptorSocketDescriptor? _acceptorDescriptor;
Expand All @@ -38,7 +35,6 @@ internal SocketReader(
_responder = responder;
_acceptorDescriptor = acceptorDescriptor;
_stream = Transport.StreamFactory.CreateServerStream(tcpClient, settings, responder.GetLog());
_readCancellationTokenSource = new CancellationTokenSource();
}

public void Read()
Expand Down Expand Up @@ -75,6 +71,8 @@ public void Read()
/// <exception cref="System.Net.Sockets.SocketException">On connection reset</exception>
protected virtual int ReadSome(byte[] buffer, int timeoutMilliseconds)
{
// NOTE: THIS FUNCTION IS (nearly) EXACTLY THE SAME AS THE ONE IN SocketInitiatorThread.
// Any changes made here should also be performed there.
try {
// Begin read if it is not already started
_currentReadTask ??= _stream.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token);
Expand Down
6 changes: 1 addition & 5 deletions QuickFIXn/Transport/SocketInitiator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ public class SocketInitiator : AbstractInitiator
public const string SOCKET_CONNECT_PORT = "SocketConnectPort";
public const string RECONNECT_INTERVAL = "ReconnectInterval";

#region Private Members

private volatile bool _shutdownRequested = false;
private DateTime _lastConnectTimeDt = DateTime.MinValue;
private int _reconnectInterval = 30;
Expand All @@ -31,8 +29,6 @@ public class SocketInitiator : AbstractInitiator
private readonly Dictionary<SessionID, int> _sessionToHostNum = new();
private readonly object _sync = new();

#endregion

public SocketInitiator(
IApplication application,
IMessageStoreFactory storeFactory,
Expand All @@ -42,7 +38,7 @@ public SocketInitiator(
: base(application, storeFactory, settings, logFactoryNullable, messageFactoryNullable)
{ }

public static void SocketInitiatorThreadStart(object socketInitiatorThread)
public static void SocketInitiatorThreadStart(object? socketInitiatorThread)
{
SocketInitiatorThread? t = socketInitiatorThread as SocketInitiatorThread;
if (t == null) return;
Expand Down

0 comments on commit 6b5b278

Please sign in to comment.