Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintain connection low watermark in NetworkClient #1186

Merged
merged 3 commits into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion HEADER
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2016 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
52 changes: 0 additions & 52 deletions ambry-api/src/main/java/com.github.ambry/config/MetricsConfig.java

This file was deleted.

7 changes: 6 additions & 1 deletion ambry-api/src/main/java/com.github.ambry/network/Port.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package com.github.ambry.network;

/**
* Represents a port containing port number and {@PortType}
* Represents a port containing port number and {@link PortType}
*/
public class Port {
private final int port;
Expand Down Expand Up @@ -51,4 +51,9 @@ public boolean equals(Object o) {
Port p = (Port) o;
return p.port == port && p.type.equals(type);
}

@Override
public int hashCode() {
return Integer.hashCode(port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*/
package com.github.ambry.cloud;

import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import com.github.ambry.clustermap.ClusterAgentsFactory;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.DataNodeId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@
package com.github.ambry.network;

import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.utils.Pair;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -26,28 +32,27 @@
*/

class ConnectionTracker {
private final HashMap<String, HostPortPoolManager> hostPortToPoolManager;
private final HashMap<String, HostPortPoolManager> connectionIdToPoolManager;
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionTracker.class);
private final HashMap<Pair<String, Port>, HostPortPoolManager> hostPortToPoolManager = new HashMap<>();
private final HashMap<String, HostPortPoolManager> connectionIdToPoolManager = new HashMap<>();
private final HashSet<HostPortPoolManager> poolManagersBelowMinActiveConnections = new HashSet<>();
private int totalManagedConnectionsCount = 0;
private final int maxConnectionsPerPortPlainText;
private final int maxConnectionsPerPortSsl;
private int totalManagedConnectionsCount;

/**
* Instantiates a ConnectionTracker
* @param maxConnectionsPerPortPlainText the connection pool limit for plain text connections to a (host, port)
* @param maxConnectionsPerPortSsl the connection pool limit for ssl connections to a (host, port)
*/
ConnectionTracker(int maxConnectionsPerPortPlainText, int maxConnectionsPerPortSsl) {
hostPortToPoolManager = new HashMap<String, HostPortPoolManager>();
connectionIdToPoolManager = new HashMap<String, HostPortPoolManager>();
totalManagedConnectionsCount = 0;
this.maxConnectionsPerPortPlainText = maxConnectionsPerPortPlainText;
this.maxConnectionsPerPortSsl = maxConnectionsPerPortSsl;
}

/**
* Returns true if a new connection may be created for the given hostPort, that is if the number of connections for
* the given hostPort has not reached the pool limit.
* the given (host, port) has not reached the pool limit.
* @param host the host associated with this check.
* @param port the port associated with this check.
* @param dataNodeId the {@link DataNodeId} associated with this check.
Expand All @@ -59,22 +64,74 @@ boolean mayCreateNewConnection(String host, Port port, DataNodeId dataNodeId) {
}

/**
* Start tracking a new connection id associated with the given host and port. Note that this connection will not
* be made available for checking out until a {@link #checkInConnection(String)} is called on it.
* Configure the connection tracker to keep a specified percentage of connections to this data node ready for use.
* @param dataNodeId the {@link DataNodeId} to configure the connection pool for.
* @param minActiveConnectionsPercentage percentage of max connections to this data node that should be kept ready
* for use. The minimum connection number will be rounded down to the nearest
* whole number.
*/
void setMinimumActiveConnectionsPercentage(DataNodeId dataNodeId, int minActiveConnectionsPercentage) {
HostPortPoolManager hostPortPoolManager =
getHostPortPoolManager(dataNodeId.getHostname(), dataNodeId.getPortToConnectTo(), dataNodeId);
hostPortPoolManager.setMinActiveConnections(minActiveConnectionsPercentage * hostPortPoolManager.poolLimit / 100);
if (!hostPortPoolManager.hasMinActiveConnections()) {
poolManagersBelowMinActiveConnections.add(hostPortPoolManager);
}
}

/**
* For (host, port) pools that are below the minimum number of active connections, initiate new connections to each
* host until they meet it.
* @param connectionFactory the {@link ConnectionFactory} for interfacing with the networking layer.
* @return the number of connections initiated.
*/
int replenishConnections(ConnectionFactory connectionFactory) {
int connectionsInitiated = 0;
Iterator<HostPortPoolManager> iter = poolManagersBelowMinActiveConnections.iterator();
while (iter.hasNext()) {
HostPortPoolManager poolManager = iter.next();
try {
while (!poolManager.hasMinActiveConnections()) {
String connId = connectionFactory.connect(poolManager.host, poolManager.port);
poolManager.incrementPoolCount();
connectionIdToPoolManager.put(connId, poolManager);
totalManagedConnectionsCount++;
connectionsInitiated++;
}
iter.remove();
} catch (IOException e) {
LOGGER.warn("Encountered exception while replenishing connections to {}:{}.", poolManager.host,
poolManager.port.getPort(), e);
}
}
return connectionsInitiated;
}

/**
* Initiate a new connection using the provided {@link ConnectionFactory} and start tracking a new connection id
* associated with the given host and port. Note that this connection will not be made available for checking out
* until a {@link #checkInConnection} is called on it.
* @param connectionFactory the {@link ConnectionFactory} for interfacing with the networking layer.
* @param host the host to which this connection belongs.
* @param port the port on the host to which this connection belongs.
* @param connId the connection id of the connection.
* @return the connection id of the connection returned by {@link ConnectionFactory#connect}.
* @param dataNodeId the {@link DataNodeId} associated with this connection
*/
void startTrackingInitiatedConnection(String host, Port port, String connId, DataNodeId dataNodeId) {
String connectAndTrack(ConnectionFactory connectionFactory, String host, Port port, DataNodeId dataNodeId)
throws IOException {
String connId = connectionFactory.connect(host, port);
HostPortPoolManager hostPortPoolManager = getHostPortPoolManager(host, port, dataNodeId);
hostPortPoolManager.incrementPoolCount();
connectionIdToPoolManager.put(connId, hostPortPoolManager);
totalManagedConnectionsCount++;
if (hostPortPoolManager.hasMinActiveConnections()) {
poolManagersBelowMinActiveConnections.remove(hostPortPoolManager);
}
return connId;
}

/**
* Attempts to check out an existing connection to the hostPort provided, or returns null if none available.
* Attempts to check out an existing connection to the (host, port) provided, or returns null if none available.
* @param host The host to connect to.
* @param port The port on the host to connect to.
* @param dataNodeId The {@link DataNodeId} to connect to.
Expand Down Expand Up @@ -110,6 +167,9 @@ DataNodeId removeConnection(String connectionId) {
}
DataNodeId dataNodeId = hostPortPoolManager.removeConnection(connectionId);
totalManagedConnectionsCount--;
if (!hostPortPoolManager.hasMinActiveConnections()) {
poolManagersBelowMinActiveConnections.add(hostPortPoolManager);
}
return dataNodeId;
}

Expand All @@ -122,7 +182,7 @@ int getTotalConnectionsCount() {
}

/**
* Return the total available connections across all hostPortPoolManagers.
* Return the total available connections across all {@link HostPortPoolManager}s.
* @return total established and available connections.
*/
int getAvailableConnectionsCount() {
Expand All @@ -142,58 +202,56 @@ int getAvailableConnectionsCount() {
* @return the HostPortPoolManager for the associated (host, port) pair.
*/
private HostPortPoolManager getHostPortPoolManager(String host, Port port, DataNodeId dataNodeId) {
String lookupStr = host + ":" + port.getPort();
HostPortPoolManager poolManager = hostPortToPoolManager.get(lookupStr);
if (poolManager == null) {
poolManager = new HostPortPoolManager(
port.getPortType() == PortType.SSL ? maxConnectionsPerPortSsl : maxConnectionsPerPortPlainText, dataNodeId);
hostPortToPoolManager.put(lookupStr, poolManager);
}
return poolManager;
}

/**
* Returns max number of connections allowed for a plain text port.
*/
int getMaxConnectionsPerPortPlainText() {
return maxConnectionsPerPortPlainText;
}

/**
* Returns max number of connections allowed for a ssl port.
*/
int getMaxConnectionsPerPortSsl() {
return maxConnectionsPerPortSsl;
return hostPortToPoolManager.computeIfAbsent(new Pair<>(host, port), k -> new HostPortPoolManager(host, port,
port.getPortType() == PortType.SSL ? maxConnectionsPerPortSsl : maxConnectionsPerPortPlainText, dataNodeId));
}

/**
* HostPortPoolManager manages all the connections to a specific (host,
* port) pair. The {@link ConnectionTracker} creates one for every (host, port) pair it knows of.
* {@link HostPortPoolManager} manages all the connections to a specific (host, port) pair. The
* {@link ConnectionTracker} creates one for every (host, port) pair it knows of.
*/
private class HostPortPoolManager {
private final int maxConnectionsToHostPort;
private final LinkedList<String> availableConnections;
private static class HostPortPoolManager {
private final LinkedList<String> availableConnections = new LinkedList<>();
private final DataNodeId dataNodeId;
private int poolCount;
private int minActiveConnections = 0;
private int poolCount = 0;
final String host;
final Port port;
final int poolLimit;

/**
* Instantiate a HostPortPoolManager
* @param poolLimit the max connections allowed for this hostPort.
* @param host the destination host for this pool.
* @param port the destination port for this pool.
* @param poolLimit the max connections allowed for this (host, port).
* @param dataNodeId the {@link DataNodeId} associated with this {@link HostPortPoolManager}.
*/
HostPortPoolManager(int poolLimit, DataNodeId dataNodeId) {
poolCount = 0;
maxConnectionsToHostPort = poolLimit;
availableConnections = new LinkedList<String>();
HostPortPoolManager(String host, Port port, int poolLimit, DataNodeId dataNodeId) {
this.host = host;
this.port = port;
this.poolLimit = poolLimit;
this.dataNodeId = dataNodeId;
}

/**
* Return true if this manager has reached the pool limit.
* @return true if this manager has at least {@link #minActiveConnections}.
*/
boolean hasMinActiveConnections() {
return poolCount >= minActiveConnections;
}

/**
* @return true if this manager has reached the pool limit
*/
boolean hasReachedPoolLimit() {
return poolCount == maxConnectionsToHostPort;
return poolCount == poolLimit;
}

/**
* @param minActiveConnections the minimum number of connections to this (host, port) to keep ready for use.
*/
void setMinActiveConnections(int minActiveConnections) {
this.minActiveConnections = Math.min(poolLimit, minActiveConnections);
}

/**
Expand Down Expand Up @@ -232,11 +290,26 @@ DataNodeId removeConnection(String connectionId) {
}

/**
* Return the number of available connections to this hostPort
* Return the number of available connections to this (host, port)
* @return number of available connections
*/
int getAvailableConnectionsCount() {
return availableConnections.size();
}
}

/**
* Used to signal to the networking layer to initiate a new connection.
*/
interface ConnectionFactory {
/**
* Initiate a new connection to the given (host, port). This method can return before the connection is ready for
* sending requests. Once it is ready, {@link #checkInConnection} should be called.
* @param host the hostname to connect to.
* @param port the port to connect to.
* @return a unique connection ID to represent the (future) connection.
* @throws IOException if the connection could not be initiated.
*/
String connect(String host, Port port) throws IOException;
}
}
Loading