diff --git a/QuickFIXn/SocketInitiatorThread.cs b/QuickFIXn/SocketInitiatorThread.cs
index 9277cb1cf..117204574 100755
--- a/QuickFIXn/SocketInitiatorThread.cs
+++ b/QuickFIXn/SocketInitiatorThread.cs
@@ -1,9 +1,11 @@
-using System.Net.Sockets;
-using System.Net;
-using System.Threading;
-using System.IO;
+#nullable enable
using System;
using System.Diagnostics;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
namespace QuickFix
{
@@ -12,56 +14,57 @@ namespace QuickFix
///
public class SocketInitiatorThread : IResponder
{
- public Session Session { get { return session_; } }
- public Transport.SocketInitiator Initiator { get { return initiator_; } }
+ public Session Session { get; }
+ public Transport.SocketInitiator Initiator { get; }
public const int BUF_SIZE = 512;
- private Thread thread_ = null;
- private byte[] readBuffer_ = new byte[BUF_SIZE];
- private Parser parser_;
- protected Stream stream_;
- private Transport.SocketInitiator initiator_;
- private Session session_;
- private IPEndPoint socketEndPoint_;
- protected SocketSettings socketSettings_;
- private bool isDisconnectRequested_ = false;
+ private Thread? _thread;
+ private readonly byte[] _readBuffer = new byte[BUF_SIZE];
+ private readonly Parser _parser = new();
+ private Stream? _stream;
+ private CancellationTokenSource _readCancellationTokenSource = new();
+ private readonly IPEndPoint _socketEndPoint;
+ private readonly SocketSettings _socketSettings;
+ private bool _isDisconnectRequested = false;
+
+ ///
+ /// Keep a task for handling async read
+ ///
+ private Task? _currentReadTask;
public SocketInitiatorThread(Transport.SocketInitiator initiator, Session session, IPEndPoint socketEndPoint, SocketSettings socketSettings)
{
- isDisconnectRequested_ = false;
- initiator_ = initiator;
- session_ = session;
- socketEndPoint_ = socketEndPoint;
- parser_ = new Parser();
- session_ = session;
- socketSettings_ = socketSettings;
+ Initiator = initiator;
+ Session = session;
+ _socketEndPoint = socketEndPoint;
+ _socketSettings = socketSettings;
}
public void Start()
{
- isDisconnectRequested_ = false;
- thread_ = new Thread(new ParameterizedThreadStart(Transport.SocketInitiator.SocketInitiatorThreadStart));
- thread_.Start(this);
+ _isDisconnectRequested = false;
+ _thread = new Thread(Transport.SocketInitiator.SocketInitiatorThreadStart);
+ _thread.Start(this);
}
public void Join()
{
- if (null == thread_)
+ if (_thread is null)
return;
Disconnect();
// Make sure session's socket reader thread doesn't try to do a Join on itself!
- if (Thread.CurrentThread.ManagedThreadId != thread_.ManagedThreadId)
- thread_.Join(2000);
- thread_ = null;
+ if (Environment.CurrentManagedThreadId != _thread.ManagedThreadId)
+ _thread.Join(2000);
+ _thread = null;
}
public void Connect()
{
- Debug.Assert(stream_ == null);
+ Debug.Assert(_stream == null);
- stream_ = SetupStream();
- session_.SetResponder(this);
+ _stream = SetupStream();
+ Session.SetResponder(this);
}
///
@@ -71,55 +74,36 @@ public void Connect()
/// Stream representing the (network)connection to the other party
protected virtual Stream SetupStream()
{
- return QuickFix.Transport.StreamFactory.CreateClientStream(socketEndPoint_, socketSettings_, session_.Log);
+ return Transport.StreamFactory.CreateClientStream(_socketEndPoint, _socketSettings, Session.Log);
}
public bool Read()
{
try
{
- int bytesRead = ReadSome(readBuffer_, 1000);
+ int bytesRead = ReadSome(_readBuffer, 1000);
if (bytesRead > 0)
- parser_.AddToStream(readBuffer_, bytesRead);
- else if (null != session_)
- {
- session_.Next();
- }
+ _parser.AddToStream(_readBuffer, bytesRead);
else
- {
- throw new QuickFIXException("Initiator timed out while reading socket");
- }
+ Session.Next();
ProcessStream();
return true;
}
- catch (System.ObjectDisposedException e)
+ catch (ObjectDisposedException)
{
- // this exception means socket_ is already closed when poll() is called
- if (isDisconnectRequested_ == false)
- {
- // for lack of a better idea, do what the general exception does
- if (null != session_)
- session_.Disconnect(e.ToString());
- else
- Disconnect();
- }
- return false;
+ // this exception means _socket is already closed when poll() is called
+ if (_isDisconnectRequested == false)
+ Disconnect();
}
- catch (System.Exception e)
+ catch (Exception e)
{
- if (null != session_)
- session_.Disconnect(e.ToString());
- else
- Disconnect();
+ Session.Log.OnEvent(e.ToString());
+ Disconnect();
}
return false;
}
- ///
- /// Keep a handle to the current outstanding read request (if any)
- ///
- private IAsyncResult currentReadRequest_;
///
/// Reads data from the network into the specified buffer.
/// It will wait up to the specified number of milliseconds for data to arrive,
@@ -129,58 +113,58 @@ public bool Read()
/// The timeout milliseconds.
/// The number of bytes read into the buffer
/// On connection reset
- protected int ReadSome(byte[] buffer, int timeoutMilliseconds)
+ protected virtual int ReadSome(byte[] buffer, int timeoutMilliseconds)
{
- // NOTE: THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketReader.
+ if (_stream is null) {
+ throw new ApplicationException("Initiator is not connected (uninitialized stream)");
+ }
+
+ // NOTE: FROM HERE, THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketReader.
// Any changes made here should also be performed there.
try
{
// Begin read if it is not already started
- if (currentReadRequest_ == null)
- currentReadRequest_ = stream_.BeginRead(buffer, 0, buffer.Length, null, null);
-
- // Wait for it to complete (given timeout)
- currentReadRequest_.AsyncWaitHandle.WaitOne(timeoutMilliseconds);
+ _currentReadTask ??= _stream.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token);
- if (currentReadRequest_.IsCompleted)
- {
- // Make sure to set currentReadRequest_ to before retreiving result
- // so a new read can be started next time even if an exception is thrown
- var request = currentReadRequest_;
- currentReadRequest_ = null;
+ if (_currentReadTask.Wait(timeoutMilliseconds)) {
+ // Dispose/nullify currentReadTask *before* retreiving .Result.
+ // Accessting .Result can throw an exception, so we need to reset currentReadTask
+ // first, to set us up for the next read even if an exception is thrown.
+ Task? request = _currentReadTask;
+ _currentReadTask = null;
- int bytesRead = stream_.EndRead(request);
+ int bytesRead = request.Result; // (As mentioned above, this can throw an exception!)
if (0 == bytesRead)
- throw new SocketException(System.Convert.ToInt32(SocketError.Shutdown));
+ throw new SocketException(Convert.ToInt32(SocketError.Shutdown));
return bytesRead;
}
- else
- return 0;
+
+ return 0;
}
- catch (System.IO.IOException ex) // Timeout
+ catch (AggregateException ex) // Timeout
{
- var inner = ex.InnerException as SocketException;
- if (inner != null && inner.SocketErrorCode == SocketError.TimedOut)
- {
+ _currentReadTask = null;
+ var ioException = ex.InnerException as IOException;
+ var inner = ioException?.InnerException as SocketException;
+ if (inner is not null && inner.SocketErrorCode == SocketError.TimedOut) {
// Nothing read
return 0;
}
- else if (inner != null)
- {
+
+ if (inner is not null) {
throw inner; //rethrow SocketException part (which we have exception logic for)
}
- else
- throw; //rethrow original exception
+
+ throw; //rethrow original exception
}
}
private void ProcessStream()
{
- string msg;
- while (parser_.ReadFixMessage(out msg))
+ while (_parser.ReadFixMessage(out var msg))
{
- session_.Next(msg);
+ Session.Next(msg);
}
}
@@ -188,16 +172,26 @@ private void ProcessStream()
public bool Send(string data)
{
+ if (_stream is null) {
+ throw new ApplicationException("Initiator is not connected (uninitialized stream)");
+ }
+
byte[] rawData = CharEncoding.DefaultEncoding.GetBytes(data);
- stream_.Write(rawData, 0, rawData.Length);
+ _stream.Write(rawData, 0, rawData.Length);
return true;
}
public void Disconnect()
{
- isDisconnectRequested_ = true;
- if (stream_ != null)
- stream_.Close();
+ _isDisconnectRequested = true;
+ _readCancellationTokenSource.Cancel();
+ _readCancellationTokenSource.Dispose();
+
+ // just wait when read task will be cancelled
+ _currentReadTask?.ContinueWith(_ => { }).Wait(1000);
+ _currentReadTask?.Dispose();
+ _currentReadTask = null;
+ _stream?.Close();
}
#endregion
diff --git a/QuickFIXn/SocketReader.cs b/QuickFIXn/SocketReader.cs
index 4d8eb7e88..d12b87b3e 100755
--- a/QuickFIXn/SocketReader.cs
+++ b/QuickFIXn/SocketReader.cs
@@ -3,27 +3,27 @@
using System.IO;
using System;
using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
namespace QuickFix
{
- ///
- /// TODO merge with SocketInitiatorThread
- ///
public class SocketReader : IDisposable
{
public const int BUF_SIZE = 4096;
private readonly byte[] _readBuffer = new byte[BUF_SIZE];
private readonly Parser _parser = new();
- private Session? _qfSession; //will be null when initialized
+ private Session? _qfSession;
private readonly Stream _stream;
+ private readonly CancellationTokenSource _readCancellationTokenSource = new();
private readonly TcpClient _tcpClient;
private readonly ClientHandlerThread _responder;
private readonly AcceptorSocketDescriptor? _acceptorDescriptor;
///
- /// Keep a handle to the current outstanding read request (if any)
+ /// Keep a task for handling async read
///
- private IAsyncResult? _currentReadRequest;
+ private Task? _currentReadTask;
internal SocketReader(
TcpClient tcpClient,
@@ -71,46 +71,44 @@ public void Read()
/// On connection reset
protected virtual int ReadSome(byte[] buffer, int timeoutMilliseconds)
{
- // NOTE: THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketInitiatorThread.
- // Any changes made here should also be made there.
- try
- {
+ // NOTE: THIS FUNCTION IS (nearly) EXACTLY THE SAME AS THE ONE IN SocketInitiatorThread.
+ // Any changes made here should also be performed there.
+ try {
// Begin read if it is not already started
- _currentReadRequest ??= _stream.BeginRead(buffer, 0, buffer.Length, null, null);
-
- // Wait for it to complete (given timeout)
- _currentReadRequest.AsyncWaitHandle.WaitOne(timeoutMilliseconds);
+ _currentReadTask ??= _stream.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token);
- if (_currentReadRequest.IsCompleted)
- {
- // Make sure to set _currentReadRequest to before retreiving result
- // so a new read can be started next time even if an exception is thrown
- var request = _currentReadRequest;
- _currentReadRequest = null;
+ if (_currentReadTask.Wait(timeoutMilliseconds)) {
+ // Dispose/nullify currentReadTask *before* retreiving .Result.
+ // Accessting .Result can throw an exception, so we need to reset currentReadTask
+ // first, to set us up for the next read even if an exception is thrown.
+ Task? request = _currentReadTask;
+ _currentReadTask = null;
- int bytesRead = _stream.EndRead(request);
+ int bytesRead = request.Result; // (As mentioned above, this can throw an exception!)
if (0 == bytesRead)
- throw new SocketException(System.Convert.ToInt32(SocketError.Shutdown));
+ throw new SocketException(Convert.ToInt32(SocketError.Shutdown));
return bytesRead;
}
return 0;
}
- catch (IOException ex) // Timeout
+ catch (AggregateException ex) // Timeout
{
- var inner = ex.InnerException as SocketException;
- if (inner?.SocketErrorCode == SocketError.TimedOut)
- {
+ _currentReadTask = null;
+
+ IOException? ioException = ex.InnerException as IOException;
+ SocketException? inner = ioException?.InnerException as SocketException;
+ if (inner is not null && inner.SocketErrorCode == SocketError.TimedOut) {
// Nothing read
return 0;
}
- else if (inner is not null)
- {
+
+ if (inner is not null) {
throw inner; //rethrow SocketException part (which we have exception logic for)
}
- else
- throw; //rethrow original exception
+
+ throw; //rethrow original exception
}
}
@@ -200,8 +198,7 @@ protected void ProcessStream()
protected void DisconnectClient()
{
- _stream.Close();
- _tcpClient.Close();
+ Dispose();
}
private bool IsAssumedSession(SessionID sessionId)
@@ -275,13 +272,23 @@ public void Dispose()
GC.SuppressFinalize(this);
}
+ private bool _disposed = false;
protected virtual void Dispose(bool disposing)
{
+ if (_disposed) return;
if (disposing)
{
+ _readCancellationTokenSource.Cancel();
+ _readCancellationTokenSource.Dispose();
+
+ // just wait when read task will be cancelled
+ _currentReadTask?.ContinueWith(_ => { }).Wait(1000);
+ _currentReadTask?.Dispose();
+
_stream.Dispose();
_tcpClient.Close();
}
+ _disposed = true;
}
~SocketReader() => Dispose(false);
}
diff --git a/QuickFIXn/Transport/SocketInitiator.cs b/QuickFIXn/Transport/SocketInitiator.cs
index 086949126..d8e24a430 100644
--- a/QuickFIXn/Transport/SocketInitiator.cs
+++ b/QuickFIXn/Transport/SocketInitiator.cs
@@ -21,8 +21,6 @@ public class SocketInitiator : AbstractInitiator
public const string SOCKET_CONNECT_PORT = "SocketConnectPort";
public const string RECONNECT_INTERVAL = "ReconnectInterval";
- #region Private Members
-
private volatile bool _shutdownRequested = false;
private DateTime _lastConnectTimeDt = DateTime.MinValue;
private int _reconnectInterval = 30;
@@ -31,8 +29,6 @@ public class SocketInitiator : AbstractInitiator
private readonly Dictionary _sessionToHostNum = new();
private readonly object _sync = new();
- #endregion
-
public SocketInitiator(
IApplication application,
IMessageStoreFactory storeFactory,
@@ -42,7 +38,7 @@ public SocketInitiator(
: base(application, storeFactory, settings, logFactoryNullable, messageFactoryNullable)
{ }
- public static void SocketInitiatorThreadStart(object socketInitiatorThread)
+ public static void SocketInitiatorThreadStart(object? socketInitiatorThread)
{
SocketInitiatorThread? t = socketInitiatorThread as SocketInitiatorThread;
if (t == null) return;
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 70788fe49..e7ce46de5 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -73,6 +73,10 @@ What's New
* #740 - Capture inner exception messages when handling authentication exceptions (rars)
* #833 - Add Try/Catch logic to SocketInitiator.OnStart() (Falcz)
* #782 - proper handling of XmlData field (larsope)
+* #770 - fix unobserved SocketException
+ * Perform socket read operations according to Task-based asynchronous pattern (TAP) instead of Asynchronous
+ Programming Model (APM), in order to catch unobserved SocketExceptions (nmandzyk)
+ * Cleanup/nullable-ize SocketInitiatorThread (gbirchmeier)
### v1.11.2:
* same as v1.11.1, but I fixed the readme in the pushed nuget packages