Skip to content

Commit

Permalink
Move connection profile into connection manager (#32858) (#32973)
Browse files Browse the repository at this point in the history
This is related to #31835. It moves the default connection profile into
the ConnectionManager class. The will allow us to have different
connection managers with different profiles.
  • Loading branch information
Tim-Brooks committed Aug 19, 2018
1 parent 62dee86 commit 5921353
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions;

import java.io.IOException;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -150,7 +149,6 @@ private Bootstrap createBootstrap() {

bootstrap.handler(getClientChannelInitializer());

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(defaultConnectionProfile.getConnectTimeout().millis()));
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
bootstrap.option(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));

Expand Down Expand Up @@ -178,14 +176,8 @@ private void createServerBootstrap(ProfileSettings profileSettings) {
String name = profileSettings.profileName;
if (logger.isDebugEnabled()) {
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
+ "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
+ "receive_predictor[{}->{}]",
name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress,
defaultConnectionProfile.getConnectTimeout(),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.REG),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.PING),
receivePredictorMin, receivePredictorMax);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,13 @@ public void apply(Settings value, Settings current, Settings previous) {
TcpTransport.TCP_REUSE_ADDRESS_PROFILE,
TcpTransport.TCP_SEND_BUFFER_SIZE_PROFILE,
TcpTransport.TCP_RECEIVE_BUFFER_SIZE_PROFILE,
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
TcpTransport.CONNECTIONS_PER_NODE_BULK,
TcpTransport.CONNECTIONS_PER_NODE_REG,
TcpTransport.CONNECTIONS_PER_NODE_STATE,
TcpTransport.CONNECTIONS_PER_NODE_PING,
TransportService.CONNECTIONS_PER_NODE_RECOVERY,
TransportService.CONNECTIONS_PER_NODE_BULK,
TransportService.CONNECTIONS_PER_NODE_REG,
TransportService.CONNECTIONS_PER_NODE_STATE,
TransportService.CONNECTIONS_PER_NODE_PING,
TransportService.TCP_CONNECT_TIMEOUT,
TcpTransport.PING_SCHEDULE,
TcpTransport.TCP_CONNECT_TIMEOUT,
NetworkService.NETWORK_SERVER,
TcpTransport.TCP_NO_DELAY,
TcpTransport.TCP_KEEP_ALIVE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,21 @@ public class ConnectionManager implements Closeable {
private final Transport transport;
private final ThreadPool threadPool;
private final TimeValue pingSchedule;
private final ConnectionProfile defaultProfile;
private final Lifecycle lifecycle = new Lifecycle();
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this(settings, transport, threadPool, buildDefaultConnectionProfile(settings));
}

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
this.logger = Loggers.getLogger(getClass(), settings);
this.transport = transport;
this.threadPool = threadPool;
this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings);
this.defaultProfile = defaultProfile;
this.lifecycle.moveToStarted();

if (pingSchedule.millis() > 0) {
Expand All @@ -84,13 +90,18 @@ public void removeListener(TransportConnectionListener listener) {
this.connectionListener.listeners.remove(listener);
}

public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile));
}

/**
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
* Once a successful is established, it can be validated before being exposed.
*/
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Transport.Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
if (node == null) {
throw new ConnectTransportException(null, "can't connect to a null node");
}
Expand All @@ -104,8 +115,8 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
}
boolean success = false;
try {
connection = transport.openConnection(node, connectionProfile);
connectionValidator.accept(connection, connectionProfile);
connection = transport.openConnection(node, resolvedProfile);
connectionValidator.accept(connection, resolvedProfile);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, connection);
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -279,4 +290,23 @@ public void onNodeConnected(DiscoveryNode node) {
}
}
}

static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -61,14 +62,35 @@ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOption
private final TimeValue connectTimeout;
private final TimeValue handshakeTimeout;

private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout, TimeValue handshakeTimeout)
{
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout,
TimeValue handshakeTimeout) {
this.handles = handles;
this.numConnections = numConnections;
this.connectTimeout = connectTimeout;
this.handshakeTimeout = handshakeTimeout;
}

/**
* takes a {@link ConnectionProfile} resolves it to a fully specified (i.e., no nulls) profile
*/
public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile profile, ConnectionProfile fallbackProfile) {
Objects.requireNonNull(fallbackProfile);
if (profile == null) {
return fallbackProfile;
} else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null) {
return profile;
} else {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile);
if (profile.getConnectTimeout() == null) {
builder.setConnectTimeout(fallbackProfile.getConnectTimeout());
}
if (profile.getHandshakeTimeout() == null) {
builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout());
}
return builder.build();
}
}

/**
* A builder to build a new {@link ConnectionProfile}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
this.nodePredicate = nodePredicate;
this.clusterAlias = clusterAlias;
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING); // TODO make this configurable?
builder.addConnections(0, // we don't want this to be used for anything else but search
TransportRequestOptions.Type.BULK,
Expand Down
72 changes: 8 additions & 64 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
Expand Down Expand Up @@ -134,30 +133,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// the scheduled internal ping interval setting, defaults to disabled (-1)
public static final Setting<TimeValue> PING_SCHEDULE =
timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY =
intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK =
intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_REG =
intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE =
intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
timeSetting("transport.tcp.connect_timeout", NetworkService.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);
public static final Setting<Boolean> TCP_NO_DELAY =
boolSetting("transport.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);
public static final Setting<Boolean> TCP_KEEP_ALIVE =
boolSetting("transport.tcp.keep_alive", NetworkService.TCP_KEEP_ALIVE, Setting.Property.NodeScope);
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
boolSetting("transport.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =
Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE,
Setting.Property.NodeScope);
Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope);
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE =
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE,
Setting.Property.NodeScope);
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope);


public static final Setting.AffixSetting<Boolean> TCP_NO_DELAY_PROFILE = affixKeySetting("transport.profiles.", "tcp_no_delay",
Expand Down Expand Up @@ -211,7 +196,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final boolean compress;
private volatile BoundTransportAddress boundAddress;
private final String transportName;
protected final ConnectionProfile defaultConnectionProfile;

private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
private final CounterMetric numHandshakes = new CounterMetric();
Expand All @@ -235,7 +219,6 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.networkService = networkService;
this.transportName = transportName;
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings);
if (defaultFeatures == null) {
this.features = new String[0];
Expand All @@ -259,25 +242,6 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
}
}

static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
return builder.build();
}

@Override
protected void doStart() {
}
Expand Down Expand Up @@ -454,41 +418,21 @@ public void sendRequest(long requestId, String action, TransportRequest request,
}
}

/**
* takes a {@link ConnectionProfile} that have been passed as a parameter to the public methods
* and resolves it to a fully specified (i.e., no nulls) profile
*/
static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile connectionProfile,
ConnectionProfile defaultConnectionProfile) {
Objects.requireNonNull(defaultConnectionProfile);
if (connectionProfile == null) {
return defaultConnectionProfile;
} else if (connectionProfile.getConnectTimeout() != null && connectionProfile.getHandshakeTimeout() != null) {
return connectionProfile;
} else {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(connectionProfile);
if (connectionProfile.getConnectTimeout() == null) {
builder.setConnectTimeout(defaultConnectionProfile.getConnectTimeout());
}
if (connectionProfile.getHandshakeTimeout() == null) {
builder.setHandshakeTimeout(defaultConnectionProfile.getHandshakeTimeout());
}
return builder.build();
}
}

protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
return resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
// This allows transport implementations to potentially override specific connection profiles. This
// primarily exists for the test implementations.
protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) {
return connectionProfile;
}

@Override
public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Objects.requireNonNull(connectionProfile, "connection profile cannot be null");
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
boolean success = false;
NodeChannels nodeChannels = null;
connectionProfile = resolveConnectionProfile(connectionProfile);
connectionProfile = maybeOverrideConnectionProfile(connectionProfile);
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
ensureOpen();
Expand Down
Loading

0 comments on commit 5921353

Please sign in to comment.