Skip to content

Commit

Permalink
Fix unobserved SocketException
Browse files Browse the repository at this point in the history
resolves #770 (original issue)
resolves #771 (original PR, this is a rebase)

Perform socket read operations according to Task-based asynchronous pattern (TAP) instead of Asynchronous Programming Model (APM)

Also cleanup/nullable-ize SocketInitiatorThread (this part by @gbirchmeier)
  • Loading branch information
nmandzyk authored and gbirchmeier committed Feb 21, 2024
1 parent e317fdd commit 76cbf8f
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 131 deletions.
182 changes: 88 additions & 94 deletions QuickFIXn/SocketInitiatorThread.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System.Net.Sockets;
using System.Net;
using System.Threading;
using System.IO;
#nullable enable
using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace QuickFix
{
Expand All @@ -12,56 +14,57 @@ namespace QuickFix
/// </summary>
public class SocketInitiatorThread : IResponder
{
public Session Session { get { return session_; } }
public Transport.SocketInitiator Initiator { get { return initiator_; } }
public Session Session { get; }
public Transport.SocketInitiator Initiator { get; }

public const int BUF_SIZE = 512;

private Thread thread_ = null;
private byte[] readBuffer_ = new byte[BUF_SIZE];
private Parser parser_;
protected Stream stream_;
private Transport.SocketInitiator initiator_;
private Session session_;
private IPEndPoint socketEndPoint_;
protected SocketSettings socketSettings_;
private bool isDisconnectRequested_ = false;
private Thread? _thread;
private readonly byte[] _readBuffer = new byte[BUF_SIZE];
private readonly Parser _parser = new();
private Stream? _stream;
private CancellationTokenSource _readCancellationTokenSource = new();
private readonly IPEndPoint _socketEndPoint;
private readonly SocketSettings _socketSettings;
private bool _isDisconnectRequested = false;

/// <summary>
/// Keep a task for handling async read
/// </summary>
private Task<int>? _currentReadTask;

public SocketInitiatorThread(Transport.SocketInitiator initiator, Session session, IPEndPoint socketEndPoint, SocketSettings socketSettings)
{
isDisconnectRequested_ = false;
initiator_ = initiator;
session_ = session;
socketEndPoint_ = socketEndPoint;
parser_ = new Parser();
session_ = session;
socketSettings_ = socketSettings;
Initiator = initiator;
Session = session;
_socketEndPoint = socketEndPoint;
_socketSettings = socketSettings;
}

public void Start()
{
isDisconnectRequested_ = false;
thread_ = new Thread(new ParameterizedThreadStart(Transport.SocketInitiator.SocketInitiatorThreadStart));
thread_.Start(this);
_isDisconnectRequested = false;
_thread = new Thread(Transport.SocketInitiator.SocketInitiatorThreadStart);
_thread.Start(this);
}

public void Join()
{
if (null == thread_)
if (_thread is null)
return;
Disconnect();
// Make sure session's socket reader thread doesn't try to do a Join on itself!
if (Thread.CurrentThread.ManagedThreadId != thread_.ManagedThreadId)
thread_.Join(2000);
thread_ = null;
if (Environment.CurrentManagedThreadId != _thread.ManagedThreadId)
_thread.Join(2000);
_thread = null;
}

public void Connect()
{
Debug.Assert(stream_ == null);
Debug.Assert(_stream == null);

stream_ = SetupStream();
session_.SetResponder(this);
_stream = SetupStream();
Session.SetResponder(this);
}

/// <summary>
Expand All @@ -71,55 +74,36 @@ public void Connect()
/// <returns>Stream representing the (network)connection to the other party</returns>
protected virtual Stream SetupStream()
{
return QuickFix.Transport.StreamFactory.CreateClientStream(socketEndPoint_, socketSettings_, session_.Log);
return Transport.StreamFactory.CreateClientStream(_socketEndPoint, _socketSettings, Session.Log);
}

public bool Read()
{
try
{
int bytesRead = ReadSome(readBuffer_, 1000);
int bytesRead = ReadSome(_readBuffer, 1000);
if (bytesRead > 0)
parser_.AddToStream(readBuffer_, bytesRead);
else if (null != session_)
{
session_.Next();
}
_parser.AddToStream(_readBuffer, bytesRead);
else
{
throw new QuickFIXException("Initiator timed out while reading socket");
}
Session.Next();

ProcessStream();
return true;
}
catch (System.ObjectDisposedException e)
catch (ObjectDisposedException)
{
// this exception means socket_ is already closed when poll() is called
if (isDisconnectRequested_ == false)
{
// for lack of a better idea, do what the general exception does
if (null != session_)
session_.Disconnect(e.ToString());
else
Disconnect();
}
return false;
// this exception means _socket is already closed when poll() is called
if (_isDisconnectRequested == false)
Disconnect();
}
catch (System.Exception e)
catch (Exception e)
{
if (null != session_)
session_.Disconnect(e.ToString());
else
Disconnect();
Session.Log.OnEvent(e.ToString());
Disconnect();
}
return false;
}

/// <summary>
/// Keep a handle to the current outstanding read request (if any)
/// </summary>
private IAsyncResult currentReadRequest_;
/// <summary>
/// Reads data from the network into the specified buffer.
/// It will wait up to the specified number of milliseconds for data to arrive,
Expand All @@ -129,75 +113,85 @@ public bool Read()
/// <param name="timeoutMilliseconds">The timeout milliseconds.</param>
/// <returns>The number of bytes read into the buffer</returns>
/// <exception cref="System.Net.Sockets.SocketException">On connection reset</exception>
protected int ReadSome(byte[] buffer, int timeoutMilliseconds)
protected virtual int ReadSome(byte[] buffer, int timeoutMilliseconds)
{
// NOTE: THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketReader.
if (_stream is null) {
throw new ApplicationException("Initiator is not connected (uninitialized stream)");
}

// NOTE: FROM HERE, THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketReader.
// Any changes made here should also be performed there.
try
{
// Begin read if it is not already started
if (currentReadRequest_ == null)
currentReadRequest_ = stream_.BeginRead(buffer, 0, buffer.Length, null, null);

// Wait for it to complete (given timeout)
currentReadRequest_.AsyncWaitHandle.WaitOne(timeoutMilliseconds);
_currentReadTask ??= _stream.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token);

if (currentReadRequest_.IsCompleted)
{
// Make sure to set currentReadRequest_ to before retreiving result
// so a new read can be started next time even if an exception is thrown
var request = currentReadRequest_;
currentReadRequest_ = null;
if (_currentReadTask.Wait(timeoutMilliseconds)) {
// Dispose/nullify currentReadTask *before* retreiving .Result.
// Accessting .Result can throw an exception, so we need to reset currentReadTask
// first, to set us up for the next read even if an exception is thrown.
Task<int>? request = _currentReadTask;
_currentReadTask = null;

int bytesRead = stream_.EndRead(request);
int bytesRead = request.Result; // (As mentioned above, this can throw an exception!)
if (0 == bytesRead)
throw new SocketException(System.Convert.ToInt32(SocketError.Shutdown));
throw new SocketException(Convert.ToInt32(SocketError.Shutdown));

return bytesRead;
}
else
return 0;

return 0;
}
catch (System.IO.IOException ex) // Timeout
catch (AggregateException ex) // Timeout
{
var inner = ex.InnerException as SocketException;
if (inner != null && inner.SocketErrorCode == SocketError.TimedOut)
{
_currentReadTask = null;
var ioException = ex.InnerException as IOException;
var inner = ioException?.InnerException as SocketException;
if (inner is not null && inner.SocketErrorCode == SocketError.TimedOut) {
// Nothing read
return 0;
}
else if (inner != null)
{

if (inner is not null) {
throw inner; //rethrow SocketException part (which we have exception logic for)
}
else
throw; //rethrow original exception

throw; //rethrow original exception
}
}

private void ProcessStream()
{
string msg;
while (parser_.ReadFixMessage(out msg))
while (_parser.ReadFixMessage(out var msg))
{
session_.Next(msg);
Session.Next(msg);
}
}

#region Responder Members

public bool Send(string data)
{
if (_stream is null) {
throw new ApplicationException("Initiator is not connected (uninitialized stream)");
}

byte[] rawData = CharEncoding.DefaultEncoding.GetBytes(data);
stream_.Write(rawData, 0, rawData.Length);
_stream.Write(rawData, 0, rawData.Length);
return true;
}

public void Disconnect()
{
isDisconnectRequested_ = true;
if (stream_ != null)
stream_.Close();
_isDisconnectRequested = true;
_readCancellationTokenSource.Cancel();
_readCancellationTokenSource.Dispose();

// just wait when read task will be cancelled
_currentReadTask?.ContinueWith(_ => { }).Wait(1000);
_currentReadTask?.Dispose();
_currentReadTask = null;
_stream?.Close();
}

#endregion
Expand Down
Loading

0 comments on commit 76cbf8f

Please sign in to comment.