diff --git a/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java b/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java index f5048cffbe..2b7e5faa23 100644 --- a/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java +++ b/quickfixj-core/src/main/java/quickfix/AbstractSessionConnectorBuilder.java @@ -66,7 +66,9 @@ public final Product build() throws ConfigError { if (logFactory == null) { logFactory = new ScreenLogFactory(settings); } - + if (messageFactory == null) { + messageFactory = new DefaultMessageFactory(); + } return doBuild(); } diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 897cc9bdf5..738a541be5 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -372,7 +372,11 @@ public class Session implements Closeable { // @GuardedBy(this) private final SessionState state; - private boolean enabled; + /* + * Controls whether it is possible to log on to this Session (if Acceptor) + * or if Logon is sent out respectively (if Initiator). + */ + private volatile boolean enabled; private final Object responderLock = new Object(); // unique instance // @GuardedBy(responderLock) @@ -729,7 +733,7 @@ public void logon() { setEnabled(true); } - private synchronized void setEnabled(boolean enabled) { + private void setEnabled(boolean enabled) { this.enabled = enabled; } @@ -786,7 +790,7 @@ public void logout(String reason) { * * @return true if session is enabled, false otherwise. */ - public synchronized boolean isEnabled() { + public boolean isEnabled() { return enabled; } diff --git a/quickfixj-core/src/main/java/quickfix/SessionSettings.java b/quickfixj-core/src/main/java/quickfix/SessionSettings.java index 8c83e87270..8814e4eba6 100644 --- a/quickfixj-core/src/main/java/quickfix/SessionSettings.java +++ b/quickfixj-core/src/main/java/quickfix/SessionSettings.java @@ -192,7 +192,6 @@ public Properties getSessionProperties(SessionID sessionID) throws ConfigError { * Returns the defaults for the session-level settings. * * @return the default properties - * @throws ConfigError */ public Properties getDefaultProperties() { try { @@ -250,7 +249,7 @@ public int getInt(String key) throws ConfigError, FieldConvertError { * @param sessionID the session ID * @param key the settings key * @return the long integer value for the setting - * @throws ConfigError configurion error, probably a missing setting. + * @throws ConfigError configuration error, probably a missing setting. * @throws FieldConvertError error during field type conversion. */ public int getInt(SessionID sessionID, String key) throws ConfigError, FieldConvertError { diff --git a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java index 3502ae1467..f7c7389cbd 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java @@ -33,7 +33,7 @@ public class SocketInitiator extends AbstractSocketInitiator { private SocketInitiator(Builder builder) throws ConfigError { super(builder.application, builder.messageStoreFactory, builder.settings, - builder.logFactory, builder.messageFactory); + builder.logFactory, builder.messageFactory, builder.numReconnectThreads); if (builder.queueCapacity >= 0) { eventHandlingStrategy @@ -49,9 +49,17 @@ public static Builder newBuilder() { } public static final class Builder extends AbstractSessionConnectorBuilder { + + int numReconnectThreads = 3; + private Builder() { super(Builder.class); } + + public Builder withReconnectThreads(int numReconnectThreads) throws ConfigError { + this.numReconnectThreads = numReconnectThreads; + return this; + } @Override protected SocketInitiator doBuild() throws ConfigError { diff --git a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java index abd9c082ce..8975996606 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java @@ -71,7 +71,7 @@ public abstract class SessionConnector implements Connector { private final Map sessions = new ConcurrentHashMap<>(); private final SessionSettings settings; private final SessionFactory sessionFactory; - private final static ScheduledExecutorService scheduledExecutorService = Executors + private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors .newSingleThreadScheduledExecutor(new QFTimerThreadFactory()); private ScheduledFuture sessionTimerFuture; private IoFilterChainBuilder ioFilterChainBuilder; @@ -318,7 +318,7 @@ protected void startSessionTimer() { if (shortLivedExecutor != null) { timerTask = new DelegatingTask(timerTask, shortLivedExecutor); } - sessionTimerFuture = scheduledExecutorService.scheduleAtFixedRate(timerTask, 0, 1000L, + sessionTimerFuture = SCHEDULED_EXECUTOR.scheduleAtFixedRate(timerTask, 0, 1000L, TimeUnit.MILLISECONDS); log.info("SessionTimer started"); } @@ -339,10 +339,11 @@ boolean checkSessionTimerRunning() { } protected ScheduledExecutorService getScheduledExecutorService() { - return scheduledExecutorService; + return SCHEDULED_EXECUTOR; } private class SessionTimerTask implements Runnable { + @Override public void run() { try { for (Session session : sessions.values()) { @@ -411,6 +412,7 @@ void await() throws InterruptedException { private static class QFTimerThreadFactory implements ThreadFactory { + @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, "QFJ Timer"); thread.setDaemon(true); diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java index fd3ea0aa20..0704c9f864 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java @@ -51,6 +51,11 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; /** * Abstract base class for socket initiators. @@ -59,6 +64,8 @@ public abstract class AbstractSocketInitiator extends SessionConnector implement protected final Logger log = LoggerFactory.getLogger(getClass()); private final Set initiators = new HashSet<>(); + private final ScheduledExecutorService scheduledReconnectExecutor; + public static final String QFJ_RECONNECT_THREAD_PREFIX = "QFJ Reconnect Thread-"; protected AbstractSocketInitiator(Application application, MessageStoreFactory messageStoreFactory, SessionSettings settings, @@ -69,9 +76,27 @@ protected AbstractSocketInitiator(Application application, protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessionFactory) throws ConfigError { + this(settings, sessionFactory, 0); + } + + protected AbstractSocketInitiator(Application application, + MessageStoreFactory messageStoreFactory, SessionSettings settings, + LogFactory logFactory, MessageFactory messageFactory, int numReconnectThreads) throws ConfigError { + this(settings, new DefaultSessionFactory(application, messageStoreFactory, logFactory, + messageFactory), numReconnectThreads); + } + + protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessionFactory, int numReconnectThreads) + throws ConfigError { super(settings, sessionFactory); IoBuffer.setAllocator(new SimpleBufferAllocator()); IoBuffer.setUseDirectBuffer(false); + if (numReconnectThreads > 0) { + scheduledReconnectExecutor = Executors.newScheduledThreadPool(numReconnectThreads, new QFScheduledReconnectThreadFactory()); + ((ThreadPoolExecutor) scheduledReconnectExecutor).setMaximumPoolSize(numReconnectThreads); + } else { + scheduledReconnectExecutor = null; + } } protected void createSessionInitiators() @@ -145,9 +170,10 @@ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) { proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT); } + ScheduledExecutorService scheduledExecutorService = (scheduledReconnectExecutor != null ? scheduledReconnectExecutor : getScheduledExecutorService()); final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session, socketAddresses, localAddress, reconnectingIntervals, - getScheduledExecutorService(), networkingOptions, + scheduledExecutorService, networkingOptions, getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig, proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation); @@ -309,4 +335,18 @@ public int getQueueSize() { } protected abstract EventHandlingStrategy getEventHandlingStrategy(); + + + private static class QFScheduledReconnectThreadFactory implements ThreadFactory { + + private static final AtomicInteger COUNTER = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, QFJ_RECONNECT_THREAD_PREFIX + COUNTER.getAndIncrement()); + thread.setDaemon(true); + return thread; + } + } + } diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java index e70e6325d8..e96f56a70f 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java @@ -196,7 +196,8 @@ private SSLFilter installSslFilter(CompositeIoFilterChainBuilder ioFilterChainBu return sslFilter; } - public synchronized void run() { + @Override + public void run() { resetIoConnector(); try { if (connectFuture == null) {