Skip to content

Commit

Permalink
cleanup/nullableize ThreadedSocketAcceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
gbirchmeier committed Jan 19, 2024
1 parent 4b94811 commit 1a2cfe0
Showing 1 changed file with 57 additions and 64 deletions.
121 changes: 57 additions & 64 deletions QuickFIXn/ThreadedSocketAcceptor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
#nullable enable
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System;
Expand All @@ -11,13 +12,13 @@ namespace QuickFix
/// </summary>
public class ThreadedSocketAcceptor : IAcceptor
{
private Dictionary<SessionID, Session> sessions_ = new();
private SessionSettings settings_;
private Dictionary<IPEndPoint, AcceptorSocketDescriptor> socketDescriptorForAddress_ = new();
private SessionFactory sessionFactory_;
private bool isStarted_ = false;
private readonly Dictionary<SessionID, Session> _sessions = new();
private readonly SessionSettings _settings;
private readonly Dictionary<IPEndPoint, AcceptorSocketDescriptor> _socketDescriptorForAddress = new();
private readonly SessionFactory _sessionFactory;
private bool _isStarted = false;
private bool _disposed = false;
private object sync_ = new();
private readonly object _sync = new();

#region Constructors

Expand Down Expand Up @@ -59,11 +60,16 @@ public ThreadedSocketAcceptor(
{
ILogFactory lf = logFactory ?? new NullLogFactory();
IMessageFactory mf = messageFactory ?? new DefaultMessageFactory();
SessionFactory sf = new SessionFactory(application, storeFactory, lf, mf);
_settings = settings;
_sessionFactory = new SessionFactory(application, storeFactory, lf, mf);

try
{
CreateSessions(settings, sf);
foreach (SessionID sessionId in settings.GetSessions())
{
QuickFix.Dictionary dict = settings.Get(sessionId);
CreateSession(sessionId, dict);
}
}
catch (Exception e)
{
Expand All @@ -75,17 +81,6 @@ public ThreadedSocketAcceptor(

#region Private Methods

private void CreateSessions(SessionSettings settings, SessionFactory sessionFactory)
{
sessionFactory_ = sessionFactory;
settings_ = settings;
foreach (SessionID sessionID in settings.GetSessions())
{
QuickFix.Dictionary dict = settings.Get(sessionID);
CreateSession(sessionID, dict);
}
}

private AcceptorSocketDescriptor GetAcceptorSocketDescriptor(Dictionary dict)
{
int port = System.Convert.ToInt32(dict.GetLong(SessionSettings.SOCKET_ACCEPT_PORT));
Expand All @@ -106,13 +101,12 @@ private AcceptorSocketDescriptor GetAcceptorSocketDescriptor(Dictionary dict)
}

socketSettings.Configure(dict);


AcceptorSocketDescriptor descriptor;
if (!socketDescriptorForAddress_.TryGetValue(socketEndPoint, out descriptor))

if (!_socketDescriptorForAddress.TryGetValue(socketEndPoint, out var descriptor))
{
descriptor = new AcceptorSocketDescriptor(socketEndPoint, socketSettings, dict);
socketDescriptorForAddress_[socketEndPoint] = descriptor;
_socketDescriptorForAddress[socketEndPoint] = descriptor;
}

return descriptor;
Expand All @@ -121,24 +115,24 @@ private AcceptorSocketDescriptor GetAcceptorSocketDescriptor(Dictionary dict)
/// <summary>
/// Create session, either at start-up or as an ad-hoc operation
/// </summary>
/// <param name="sessionID">ID of new session</param>
/// <param name="sessionId">ID of new session</param>
/// <param name="dict">config settings for new session</param>
/// <returns>true if session added successfully, false if session already exists or is not an acceptor</returns>
private bool CreateSession(SessionID sessionID, Dictionary dict)
private bool CreateSession(SessionID sessionId, Dictionary dict)
{
if (!sessions_.ContainsKey(sessionID))
if (!_sessions.ContainsKey(sessionId))
{
string connectionType = dict.GetString(SessionSettings.CONNECTION_TYPE);
if ("acceptor" == connectionType)
{
AcceptorSocketDescriptor descriptor = GetAcceptorSocketDescriptor(dict);
Session session = sessionFactory_.Create(sessionID, dict);
Session session = _sessionFactory.Create(sessionId, dict);
descriptor.AcceptSession(session);
sessions_[sessionID] = session;
_sessions[sessionId] = session;

// start SocketReactor if it was created via AddSession call
// and if acceptor is already started
if (isStarted_ && !_disposed)
if (_isStarted && !_disposed)
{
descriptor.SocketReactor.Start();
}
Expand All @@ -151,10 +145,10 @@ private bool CreateSession(SessionID sessionID, Dictionary dict)

private void StartAcceptingConnections()
{
lock (sync_)
lock (_sync)
{
// FIXME StartSessionTimer();
foreach (AcceptorSocketDescriptor socketDescriptor in socketDescriptorForAddress_.Values)
foreach (AcceptorSocketDescriptor socketDescriptor in _socketDescriptorForAddress.Values)
{
socketDescriptor.SocketReactor.Start();
// FIXME log_.Info("Listening for connections on " + socketDescriptor.getAddress());
Expand All @@ -164,9 +158,9 @@ private void StartAcceptingConnections()

private void StopAcceptingConnections()
{
lock (sync_)
lock (_sync)
{
foreach (AcceptorSocketDescriptor socketDescriptor in socketDescriptorForAddress_.Values)
foreach (AcceptorSocketDescriptor socketDescriptor in _socketDescriptorForAddress.Values)
{
socketDescriptor.SocketReactor.Shutdown();
// FIXME log_.Info("No longer accepting connections on " + socketDescriptor.getAddress());
Expand All @@ -176,13 +170,13 @@ private void StopAcceptingConnections()

private void LogoutAllSessions(bool force)
{
foreach (Session session in sessions_.Values)
foreach (Session session in _sessions.Values)
{
try
{
session.Logout();
}
catch (System.Exception e)
catch (Exception e)
{
// FIXME logError(session.getSessionID(), "Error during logout", e);
System.Console.WriteLine("Error during logout of Session " + session.SessionID + ": " + e.Message);
Expand All @@ -191,14 +185,14 @@ private void LogoutAllSessions(bool force)

if (force && IsLoggedOn)
{
foreach (Session session in sessions_.Values)
foreach (Session session in _sessions.Values)
{
try
{
if (session.IsLoggedOn)
session.Disconnect("Forcibly disconnecting session");
}
catch (System.Exception e)
catch (Exception e)
{
// FIXME logError(session.getSessionID(), "Error during disconnect", e);
System.Console.WriteLine("Error during disconnect of Session " + session.SessionID + ": " + e.Message);
Expand All @@ -211,7 +205,7 @@ private void LogoutAllSessions(bool force)
}

/// <summary>
/// FIXME
/// FIXME implement WaitForLogout
/// </summary>
private void WaitForLogout()
{
Expand Down Expand Up @@ -246,7 +240,7 @@ private void WaitForLogout()

private void DisposeSessions()
{
foreach (var session in sessions_.Values)
foreach (var session in _sessions.Values)
{
session.Dispose();
}
Expand All @@ -261,12 +255,12 @@ public void Start()
if (_disposed)
throw new ObjectDisposedException(GetType().Name);

lock (sync_)
lock (_sync)
{
if (!isStarted_)
if (!_isStarted)
{
StartAcceptingConnections();
isStarted_ = true;
_isStarted = true;
}
}
}
Expand All @@ -284,7 +278,7 @@ public void Stop(bool force)
StopAcceptingConnections();
LogoutAllSessions(force);
DisposeSessions();
sessions_.Clear();
_sessions.Clear();

// FIXME StopSessionTimer();
// FIXME Session.UnregisterSessions(GetSessions());
Expand All @@ -298,7 +292,7 @@ public bool IsLoggedOn
{
get
{
return sessions_.Values.Any(session => session.IsLoggedOn);
return _sessions.Values.Any(session => session.IsLoggedOn);
}
}

Expand All @@ -308,7 +302,7 @@ public bool IsLoggedOn
/// <returns>the SessionIDs for the sessions managed by this acceptor</returns>
public HashSet<SessionID> GetSessionIDs()
{
return new HashSet<SessionID>(sessions_.Keys);
return new HashSet<SessionID>(_sessions.Keys);
}

/// <summary>
Expand All @@ -317,53 +311,52 @@ public HashSet<SessionID> GetSessionIDs()
/// <returns></returns>
public Dictionary<SessionID, IPEndPoint> GetAcceptorAddresses()
{
throw new System.NotImplementedException();
throw new NotImplementedException();
}

/// <summary>
/// Add new session as an ad-oc (dynamic) operation
/// </summary>
/// <param name="sessionID">ID of new session</param>
/// <param name="sessionId">ID of new session</param>
/// <param name="dict">config settings for new session</param>
/// <returns>true if session added successfully, false if session already exists or is not an acceptor</returns>
public bool AddSession(SessionID sessionID, Dictionary dict)
public bool AddSession(SessionID sessionId, Dictionary dict)
{
lock (settings_)
if (!settings_.Has(sessionID)) // session won't be in settings if ad-hoc creation after startup
settings_.Set(sessionID, dict); // need to to this here to merge in default config settings
lock (_settings)
if (!_settings.Has(sessionId)) // session won't be in settings if ad-hoc creation after startup
_settings.Set(sessionId, dict); // need to to this here to merge in default config settings
else
return false; // session already exists

if (CreateSession(sessionID, dict))
if (CreateSession(sessionId, dict))
return true;

lock (settings_) // failed to create session, so remove from settings
settings_.Remove(sessionID);
lock (_settings) // failed to create session, so remove from settings
_settings.Remove(sessionId);
return false;
}


/// <summary>
/// Ad-hoc removal of an existing session
/// </summary>
/// <param name="sessionID">ID of session to be removed</param>
/// <param name="sessionId">ID of session to be removed</param>
/// <param name="terminateActiveSession">if true, force disconnection and removal of session even if it has an active connection</param>
/// <returns>true if session removed or not already present; false if could not be removed due to an active connection</returns>
public bool RemoveSession(SessionID sessionID, bool terminateActiveSession)
public bool RemoveSession(SessionID sessionId, bool terminateActiveSession)
{
Session session = null;
if (sessions_.TryGetValue(sessionID, out session))
if (_sessions.TryGetValue(sessionId, out var session))
{
if (session.IsLoggedOn && !terminateActiveSession)
return false;
session.Disconnect("Dynamic session removal");
foreach (AcceptorSocketDescriptor descriptor in socketDescriptorForAddress_.Values)
if (descriptor.RemoveSession(sessionID))
foreach (AcceptorSocketDescriptor descriptor in _socketDescriptorForAddress.Values)
if (descriptor.RemoveSession(sessionId))
break;
sessions_.Remove(sessionID);
_sessions.Remove(sessionId);
session.Dispose();
lock (settings_)
settings_.Remove(sessionID);
lock (_settings)
_settings.Remove(sessionId);
}
return true;
}
Expand Down

0 comments on commit 1a2cfe0

Please sign in to comment.