diff --git a/QuickFIXn/SocketInitiatorThread.cs b/QuickFIXn/SocketInitiatorThread.cs index f337eb5f4..ee4f2e780 100755 --- a/QuickFIXn/SocketInitiatorThread.cs +++ b/QuickFIXn/SocketInitiatorThread.cs @@ -1,9 +1,10 @@ -using System.Net.Sockets; -using System.Net; -using System.Threading; -using System.IO; +#nullable enable using System; using System.Diagnostics; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Threading; using System.Threading.Tasks; namespace QuickFix @@ -13,58 +14,58 @@ namespace QuickFix /// public class SocketInitiatorThread : IResponder { - public Session Session { get { return session_; } } - public Transport.SocketInitiator Initiator { get { return initiator_; } } + public Session Session { get; } + + public Transport.SocketInitiator Initiator { get; } public const int BUF_SIZE = 512; - private Thread thread_ = null; - private byte[] readBuffer_ = new byte[BUF_SIZE]; - private Parser parser_; - protected Stream stream_; - protected CancellationTokenSource _readCancellationTokenSource; - private Transport.SocketInitiator initiator_; - private Session session_; - private IPEndPoint socketEndPoint_; - protected SocketSettings socketSettings_; - private bool isDisconnectRequested_ = false; + private Thread? _thread; + private readonly byte[] _readBuffer = new byte[BUF_SIZE]; + private readonly Parser _parser = new(); + private Stream? _stream; + private CancellationTokenSource _readCancellationTokenSource = new(); + private readonly IPEndPoint _socketEndPoint; + private readonly SocketSettings _socketSettings; + private bool _isDisconnectRequested = false; + + /// + /// Keep a task for handling async read + /// + private Task? _currentReadTask; public SocketInitiatorThread(Transport.SocketInitiator initiator, Session session, IPEndPoint socketEndPoint, SocketSettings socketSettings) { - isDisconnectRequested_ = false; - initiator_ = initiator; - session_ = session; - socketEndPoint_ = socketEndPoint; - parser_ = new Parser(); - session_ = session; - socketSettings_ = socketSettings; - _readCancellationTokenSource = new CancellationTokenSource(); + Initiator = initiator; + Session = session; + _socketEndPoint = socketEndPoint; + _socketSettings = socketSettings; } public void Start() { - isDisconnectRequested_ = false; - thread_ = new Thread(new ParameterizedThreadStart(Transport.SocketInitiator.SocketInitiatorThreadStart)); - thread_.Start(this); + _isDisconnectRequested = false; + _thread = new Thread(Transport.SocketInitiator.SocketInitiatorThreadStart); + _thread.Start(this); } public void Join() { - if (null == thread_) + if (_thread is null) return; Disconnect(); // Make sure session's socket reader thread doesn't try to do a Join on itself! - if (Thread.CurrentThread.ManagedThreadId != thread_.ManagedThreadId) - thread_.Join(2000); - thread_ = null; + if (Environment.CurrentManagedThreadId != _thread.ManagedThreadId) + _thread.Join(2000); + _thread = null; } public void Connect() { - Debug.Assert(stream_ == null); + Debug.Assert(_stream == null); - stream_ = SetupStream(); - session_.SetResponder(this); + _stream = SetupStream(); + Session.SetResponder(this); } /// @@ -74,55 +75,36 @@ public void Connect() /// Stream representing the (network)connection to the other party protected virtual Stream SetupStream() { - return QuickFix.Transport.StreamFactory.CreateClientStream(socketEndPoint_, socketSettings_, session_.Log); + return Transport.StreamFactory.CreateClientStream(_socketEndPoint, _socketSettings, Session.Log); } public bool Read() { try { - int bytesRead = ReadSome(readBuffer_, 1000); + int bytesRead = ReadSome(_readBuffer, 1000); if (bytesRead > 0) - parser_.AddToStream(readBuffer_, bytesRead); - else if (null != session_) - { - session_.Next(); - } + _parser.AddToStream(_readBuffer, bytesRead); else - { - throw new QuickFIXException("Initiator timed out while reading socket"); - } + Session.Next(); ProcessStream(); return true; } - catch (System.ObjectDisposedException e) + catch (ObjectDisposedException) { - // this exception means socket_ is already closed when poll() is called - if (isDisconnectRequested_ == false) - { - // for lack of a better idea, do what the general exception does - if (null != session_) - session_.Disconnect(e.ToString()); - else - Disconnect(); - } - return false; + // this exception means _socket is already closed when poll() is called + if (_isDisconnectRequested == false) + Disconnect(); } - catch (System.Exception e) + catch (Exception e) { - if (null != session_) - session_.Disconnect(e.ToString()); - else - Disconnect(); + Session.Log.OnEvent(e.ToString()); + Disconnect(); } return false; } - /// - /// Keep a task for handling async read - /// - private Task currentReadTask; /// /// Reads data from the network into the specified buffer. /// It will wait up to the specified number of milliseconds for data to arrive, @@ -132,61 +114,57 @@ public bool Read() /// The timeout milliseconds. /// The number of bytes read into the buffer /// On connection reset - protected int ReadSome(byte[] buffer, int timeoutMilliseconds) + protected virtual int ReadSome(byte[] buffer, int timeoutMilliseconds) { - // NOTE: THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketReader. + if (_stream is null) { + throw new ApplicationException("Initiator is not connected (uninitialized stream)"); + } + + // NOTE: FROM HERE, THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketReader. // Any changes made here should also be performed there. try { // Begin read if it is not already started - if (currentReadTask == null) - currentReadTask = stream_.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token); - - // Wait for it to complete (given timeout) - currentReadTask.Wait(timeoutMilliseconds); - - - if (currentReadTask.IsCompleted) - { - // Make sure to set currentReadRequest_ to before retreiving result - // so a new read can be started next time even if an exception is thrown - var request = currentReadTask; - currentReadTask.Dispose(); - currentReadTask = null; - - int bytesRead = request.Result; + _currentReadTask ??= _stream.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token); + + if (_currentReadTask.Wait(timeoutMilliseconds)) { + // Dispose/nullify currentReadTask *before* retreiving .Result. + // Accessting .Result can throw an exception, so we need to reset currentReadTask + // first, to set us up for the next read even if an exception is thrown. + Task? request = _currentReadTask; + _currentReadTask = null; + + int bytesRead = request.Result; // (As mentioned above, this can throw an exception!) if (0 == bytesRead) - throw new SocketException(System.Convert.ToInt32(SocketError.Shutdown)); + throw new SocketException(Convert.ToInt32(SocketError.Shutdown)); return bytesRead; } - else - return 0; + + return 0; } catch (AggregateException ex) // Timeout { var ioException = ex.InnerException as IOException; var inner = ioException?.InnerException as SocketException; - if (inner != null && inner.SocketErrorCode == SocketError.TimedOut) - { + if (inner is not null && inner.SocketErrorCode == SocketError.TimedOut) { // Nothing read return 0; } - else if (inner != null) - { + + if (inner is not null) { throw inner; //rethrow SocketException part (which we have exception logic for) } - else - throw; //rethrow original exception + + throw; //rethrow original exception } } private void ProcessStream() { - string msg; - while (parser_.ReadFixMessage(out msg)) + while (_parser.ReadFixMessage(out var msg)) { - session_.Next(msg); + Session.Next(msg); } } @@ -194,23 +172,26 @@ private void ProcessStream() public bool Send(string data) { + if (_stream is null) { + throw new ApplicationException("Initiator is not connected (uninitialized stream)"); + } + byte[] rawData = CharEncoding.DefaultEncoding.GetBytes(data); - stream_.Write(rawData, 0, rawData.Length); + _stream.Write(rawData, 0, rawData.Length); return true; } public void Disconnect() { - isDisconnectRequested_ = true; - _readCancellationTokenSource?.Cancel(); - _readCancellationTokenSource?.Dispose(); - _readCancellationTokenSource = null; + _isDisconnectRequested = true; + _readCancellationTokenSource.Cancel(); + _readCancellationTokenSource.Dispose(); // just wait when read task will be cancelled - currentReadTask?.ContinueWith(t => { }).Wait(1000); - currentReadTask?.Dispose(); - currentReadTask = null; - stream_?.Close(); + _currentReadTask?.ContinueWith(_ => { }).Wait(1000); + _currentReadTask?.Dispose(); + _currentReadTask = null; + _stream?.Close(); } #endregion diff --git a/QuickFIXn/SocketReader.cs b/QuickFIXn/SocketReader.cs index b2327fc17..d12b87b3e 100755 --- a/QuickFIXn/SocketReader.cs +++ b/QuickFIXn/SocketReader.cs @@ -8,9 +8,6 @@ namespace QuickFix { - /// - /// TODO merge with SocketInitiatorThread - /// public class SocketReader : IDisposable { public const int BUF_SIZE = 4096; @@ -18,7 +15,7 @@ public class SocketReader : IDisposable private readonly Parser _parser = new(); private Session? _qfSession; private readonly Stream _stream; - private readonly CancellationTokenSource _readCancellationTokenSource; + private readonly CancellationTokenSource _readCancellationTokenSource = new(); private readonly TcpClient _tcpClient; private readonly ClientHandlerThread _responder; private readonly AcceptorSocketDescriptor? _acceptorDescriptor; @@ -38,7 +35,6 @@ internal SocketReader( _responder = responder; _acceptorDescriptor = acceptorDescriptor; _stream = Transport.StreamFactory.CreateServerStream(tcpClient, settings, responder.GetLog()); - _readCancellationTokenSource = new CancellationTokenSource(); } public void Read() @@ -75,6 +71,8 @@ public void Read() /// On connection reset protected virtual int ReadSome(byte[] buffer, int timeoutMilliseconds) { + // NOTE: THIS FUNCTION IS (nearly) EXACTLY THE SAME AS THE ONE IN SocketInitiatorThread. + // Any changes made here should also be performed there. try { // Begin read if it is not already started _currentReadTask ??= _stream.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token); diff --git a/QuickFIXn/Transport/SocketInitiator.cs b/QuickFIXn/Transport/SocketInitiator.cs index f35638492..99659d863 100644 --- a/QuickFIXn/Transport/SocketInitiator.cs +++ b/QuickFIXn/Transport/SocketInitiator.cs @@ -21,8 +21,6 @@ public class SocketInitiator : AbstractInitiator public const string SOCKET_CONNECT_PORT = "SocketConnectPort"; public const string RECONNECT_INTERVAL = "ReconnectInterval"; - #region Private Members - private volatile bool _shutdownRequested = false; private DateTime _lastConnectTimeDt = DateTime.MinValue; private int _reconnectInterval = 30; @@ -31,8 +29,6 @@ public class SocketInitiator : AbstractInitiator private readonly Dictionary _sessionToHostNum = new(); private readonly object _sync = new(); - #endregion - public SocketInitiator( IApplication application, IMessageStoreFactory storeFactory, @@ -42,7 +38,7 @@ public SocketInitiator( : base(application, storeFactory, settings, logFactoryNullable, messageFactoryNullable) { } - public static void SocketInitiatorThreadStart(object socketInitiatorThread) + public static void SocketInitiatorThreadStart(object? socketInitiatorThread) { SocketInitiatorThread? t = socketInitiatorThread as SocketInitiatorThread; if (t == null) return;