Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce separate thread pool for establishing Initiator connections #255

Merged
merged 7 commits into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public final Product build() throws ConfigError {
if (logFactory == null) {
logFactory = new ScreenLogFactory(settings);
}

if (messageFactory == null) {
messageFactory = new DefaultMessageFactory();
}
return doBuild();
}

Expand Down
10 changes: 7 additions & 3 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,11 @@ public class Session implements Closeable {
// @GuardedBy(this)
private final SessionState state;

private boolean enabled;
/*
* Controls whether it is possible to log on to this Session (if Acceptor)
* or if Logon is sent out respectively (if Initiator).
*/
private volatile boolean enabled;

private final Object responderLock = new Object(); // unique instance
// @GuardedBy(responderLock)
Expand Down Expand Up @@ -729,7 +733,7 @@ public void logon() {
setEnabled(true);
}

private synchronized void setEnabled(boolean enabled) {
private void setEnabled(boolean enabled) {
this.enabled = enabled;
}

Expand Down Expand Up @@ -786,7 +790,7 @@ public void logout(String reason) {
*
* @return true if session is enabled, false otherwise.
*/
public synchronized boolean isEnabled() {
public boolean isEnabled() {
return enabled;
}

Expand Down
3 changes: 1 addition & 2 deletions quickfixj-core/src/main/java/quickfix/SessionSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ public Properties getSessionProperties(SessionID sessionID) throws ConfigError {
* Returns the defaults for the session-level settings.
*
* @return the default properties
* @throws ConfigError
*/
public Properties getDefaultProperties() {
try {
Expand Down Expand Up @@ -250,7 +249,7 @@ public int getInt(String key) throws ConfigError, FieldConvertError {
* @param sessionID the session ID
* @param key the settings key
* @return the long integer value for the setting
* @throws ConfigError configurion error, probably a missing setting.
* @throws ConfigError configuration error, probably a missing setting.
* @throws FieldConvertError error during field type conversion.
*/
public int getInt(SessionID sessionID, String key) throws ConfigError, FieldConvertError {
Expand Down
10 changes: 9 additions & 1 deletion quickfixj-core/src/main/java/quickfix/SocketInitiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SocketInitiator extends AbstractSocketInitiator {

private SocketInitiator(Builder builder) throws ConfigError {
super(builder.application, builder.messageStoreFactory, builder.settings,
builder.logFactory, builder.messageFactory);
builder.logFactory, builder.messageFactory, builder.numReconnectThreads);

if (builder.queueCapacity >= 0) {
eventHandlingStrategy
Expand All @@ -49,9 +49,17 @@ public static Builder newBuilder() {
}

public static final class Builder extends AbstractSessionConnectorBuilder<Builder, SocketInitiator> {

int numReconnectThreads = 3;

private Builder() {
super(Builder.class);
}

public Builder withReconnectThreads(int numReconnectThreads) throws ConfigError {
this.numReconnectThreads = numReconnectThreads;
return this;
}

@Override
protected SocketInitiator doBuild() throws ConfigError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public abstract class SessionConnector implements Connector {
private final Map<SessionID, Session> sessions = new ConcurrentHashMap<>();
private final SessionSettings settings;
private final SessionFactory sessionFactory;
private final static ScheduledExecutorService scheduledExecutorService = Executors
private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
.newSingleThreadScheduledExecutor(new QFTimerThreadFactory());
private ScheduledFuture<?> sessionTimerFuture;
private IoFilterChainBuilder ioFilterChainBuilder;
Expand Down Expand Up @@ -318,7 +318,7 @@ protected void startSessionTimer() {
if (shortLivedExecutor != null) {
timerTask = new DelegatingTask(timerTask, shortLivedExecutor);
}
sessionTimerFuture = scheduledExecutorService.scheduleAtFixedRate(timerTask, 0, 1000L,
sessionTimerFuture = SCHEDULED_EXECUTOR.scheduleAtFixedRate(timerTask, 0, 1000L,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly related to this change, but we probably don't want to queue up timer tasks if they take too long for some reason.

.scheduleWithFixedDelay(reconnectTask, 0, 1, TimeUnit.SECONDS);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reconnectTasks timed out after a maximum of 2000ms. That was the reason to have a bigger pool to service these requests, see #254 .
What do you suggest to prevent piling up these tasks?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to IoSessionInitiator. As a rule of thumb scheduleWithFixedDelay is better in majority of cases than scheduleAtFixedRate. It respects the delay, but it only reschedules when the tasks finishes - this does not allow bursts if for some reason task is slow.

TimeUnit.MILLISECONDS);
log.info("SessionTimer started");
}
Expand All @@ -339,10 +339,11 @@ boolean checkSessionTimerRunning() {
}

protected ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
return SCHEDULED_EXECUTOR;
}

private class SessionTimerTask implements Runnable {
@Override
public void run() {
try {
for (Session session : sessions.values()) {
Expand Down Expand Up @@ -411,6 +412,7 @@ void await() throws InterruptedException {

private static class QFTimerThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "QFJ Timer");
thread.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Abstract base class for socket initiators.
Expand All @@ -59,6 +64,8 @@ public abstract class AbstractSocketInitiator extends SessionConnector implement

protected final Logger log = LoggerFactory.getLogger(getClass());
private final Set<IoSessionInitiator> initiators = new HashSet<>();
private final ScheduledExecutorService scheduledReconnectExecutor;
public static final String QFJ_RECONNECT_THREAD_PREFIX = "QFJ Reconnect Thread-";

protected AbstractSocketInitiator(Application application,
MessageStoreFactory messageStoreFactory, SessionSettings settings,
Expand All @@ -69,9 +76,27 @@ protected AbstractSocketInitiator(Application application,

protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessionFactory)
throws ConfigError {
this(settings, sessionFactory, 0);
}

protected AbstractSocketInitiator(Application application,
MessageStoreFactory messageStoreFactory, SessionSettings settings,
LogFactory logFactory, MessageFactory messageFactory, int numReconnectThreads) throws ConfigError {
this(settings, new DefaultSessionFactory(application, messageStoreFactory, logFactory,
messageFactory), numReconnectThreads);
}

protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessionFactory, int numReconnectThreads)
throws ConfigError {
super(settings, sessionFactory);
IoBuffer.setAllocator(new SimpleBufferAllocator());
IoBuffer.setUseDirectBuffer(false);
if (numReconnectThreads > 0) {
scheduledReconnectExecutor = Executors.newScheduledThreadPool(numReconnectThreads, new QFScheduledReconnectThreadFactory());
((ThreadPoolExecutor) scheduledReconnectExecutor).setMaximumPoolSize(numReconnectThreads);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting maximumPoolSize to initialPoolSize ensures that numReconnectThreads threads are available to service tasks. Otherwise new threads only get created when the task queue is full.

} else {
scheduledReconnectExecutor = null;
}
}

protected void createSessionInitiators()
Expand Down Expand Up @@ -145,9 +170,10 @@ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) {
proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT);
}

ScheduledExecutorService scheduledExecutorService = (scheduledReconnectExecutor != null ? scheduledReconnectExecutor : getScheduledExecutorService());
final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
socketAddresses, localAddress, reconnectingIntervals,
getScheduledExecutorService(), networkingOptions,
scheduledExecutorService, networkingOptions,
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);

Expand Down Expand Up @@ -309,4 +335,18 @@ public int getQueueSize() {
}

protected abstract EventHandlingStrategy getEventHandlingStrategy();


private static class QFScheduledReconnectThreadFactory implements ThreadFactory {

private static final AtomicInteger COUNTER = new AtomicInteger(1);

@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, QFJ_RECONNECT_THREAD_PREFIX + COUNTER.getAndIncrement());
thread.setDaemon(true);
return thread;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ private SSLFilter installSslFilter(CompositeIoFilterChainBuilder ioFilterChainBu
return sslFilter;
}

public synchronized void run() {
@Override
public void run() {
resetIoConnector();
try {
if (connectFuture == null) {
Expand Down