Skip to content

Commit

Permalink
Maintain connection low watermark in NetworkClient (#1186)
Browse files Browse the repository at this point in the history
Maintain connection low watermark in NetworkClient
  • Loading branch information
cgtz authored and dharju committed Jun 20, 2019
1 parent cf53025 commit 65eef32
Show file tree
Hide file tree
Showing 25 changed files with 467 additions and 384 deletions.
2 changes: 1 addition & 1 deletion HEADER
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2016 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
52 changes: 0 additions & 52 deletions ambry-api/src/main/java/com.github.ambry/config/MetricsConfig.java

This file was deleted.

7 changes: 6 additions & 1 deletion ambry-api/src/main/java/com.github.ambry/network/Port.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package com.github.ambry.network;

/**
* Represents a port containing port number and {@PortType}
* Represents a port containing port number and {@link PortType}
*/
public class Port {
private final int port;
Expand Down Expand Up @@ -51,4 +51,9 @@ public boolean equals(Object o) {
Port p = (Port) o;
return p.port == port && p.type.equals(type);
}

@Override
public int hashCode() {
return Integer.hashCode(port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*/
package com.github.ambry.cloud;

import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import com.github.ambry.clustermap.ClusterAgentsFactory;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.DataNodeId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@
package com.github.ambry.network;

import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.utils.Pair;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -26,28 +32,27 @@
*/

class ConnectionTracker {
private final HashMap<String, HostPortPoolManager> hostPortToPoolManager;
private final HashMap<String, HostPortPoolManager> connectionIdToPoolManager;
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionTracker.class);
private final HashMap<Pair<String, Port>, HostPortPoolManager> hostPortToPoolManager = new HashMap<>();
private final HashMap<String, HostPortPoolManager> connectionIdToPoolManager = new HashMap<>();
private final HashSet<HostPortPoolManager> poolManagersBelowMinActiveConnections = new HashSet<>();
private int totalManagedConnectionsCount = 0;
private final int maxConnectionsPerPortPlainText;
private final int maxConnectionsPerPortSsl;
private int totalManagedConnectionsCount;

/**
* Instantiates a ConnectionTracker
* @param maxConnectionsPerPortPlainText the connection pool limit for plain text connections to a (host, port)
* @param maxConnectionsPerPortSsl the connection pool limit for ssl connections to a (host, port)
*/
ConnectionTracker(int maxConnectionsPerPortPlainText, int maxConnectionsPerPortSsl) {
hostPortToPoolManager = new HashMap<String, HostPortPoolManager>();
connectionIdToPoolManager = new HashMap<String, HostPortPoolManager>();
totalManagedConnectionsCount = 0;
this.maxConnectionsPerPortPlainText = maxConnectionsPerPortPlainText;
this.maxConnectionsPerPortSsl = maxConnectionsPerPortSsl;
}

/**
* Returns true if a new connection may be created for the given hostPort, that is if the number of connections for
* the given hostPort has not reached the pool limit.
* the given (host, port) has not reached the pool limit.
* @param host the host associated with this check.
* @param port the port associated with this check.
* @param dataNodeId the {@link DataNodeId} associated with this check.
Expand All @@ -59,22 +64,74 @@ boolean mayCreateNewConnection(String host, Port port, DataNodeId dataNodeId) {
}

/**
* Start tracking a new connection id associated with the given host and port. Note that this connection will not
* be made available for checking out until a {@link #checkInConnection(String)} is called on it.
* Configure the connection tracker to keep a specified percentage of connections to this data node ready for use.
* @param dataNodeId the {@link DataNodeId} to configure the connection pool for.
* @param minActiveConnectionsPercentage percentage of max connections to this data node that should be kept ready
* for use. The minimum connection number will be rounded down to the nearest
* whole number.
*/
void setMinimumActiveConnectionsPercentage(DataNodeId dataNodeId, int minActiveConnectionsPercentage) {
HostPortPoolManager hostPortPoolManager =
getHostPortPoolManager(dataNodeId.getHostname(), dataNodeId.getPortToConnectTo(), dataNodeId);
hostPortPoolManager.setMinActiveConnections(minActiveConnectionsPercentage * hostPortPoolManager.poolLimit / 100);
if (!hostPortPoolManager.hasMinActiveConnections()) {
poolManagersBelowMinActiveConnections.add(hostPortPoolManager);
}
}

/**
* For (host, port) pools that are below the minimum number of active connections, initiate new connections to each
* host until they meet it.
* @param connectionFactory the {@link ConnectionFactory} for interfacing with the networking layer.
* @return the number of connections initiated.
*/
int replenishConnections(ConnectionFactory connectionFactory) {
int connectionsInitiated = 0;
Iterator<HostPortPoolManager> iter = poolManagersBelowMinActiveConnections.iterator();
while (iter.hasNext()) {
HostPortPoolManager poolManager = iter.next();
try {
while (!poolManager.hasMinActiveConnections()) {
String connId = connectionFactory.connect(poolManager.host, poolManager.port);
poolManager.incrementPoolCount();
connectionIdToPoolManager.put(connId, poolManager);
totalManagedConnectionsCount++;
connectionsInitiated++;
}
iter.remove();
} catch (IOException e) {
LOGGER.warn("Encountered exception while replenishing connections to {}:{}.", poolManager.host,
poolManager.port.getPort(), e);
}
}
return connectionsInitiated;
}

/**
* Initiate a new connection using the provided {@link ConnectionFactory} and start tracking a new connection id
* associated with the given host and port. Note that this connection will not be made available for checking out
* until a {@link #checkInConnection} is called on it.
* @param connectionFactory the {@link ConnectionFactory} for interfacing with the networking layer.
* @param host the host to which this connection belongs.
* @param port the port on the host to which this connection belongs.
* @param connId the connection id of the connection.
* @return the connection id of the connection returned by {@link ConnectionFactory#connect}.
* @param dataNodeId the {@link DataNodeId} associated with this connection
*/
void startTrackingInitiatedConnection(String host, Port port, String connId, DataNodeId dataNodeId) {
String connectAndTrack(ConnectionFactory connectionFactory, String host, Port port, DataNodeId dataNodeId)
throws IOException {
String connId = connectionFactory.connect(host, port);
HostPortPoolManager hostPortPoolManager = getHostPortPoolManager(host, port, dataNodeId);
hostPortPoolManager.incrementPoolCount();
connectionIdToPoolManager.put(connId, hostPortPoolManager);
totalManagedConnectionsCount++;
if (hostPortPoolManager.hasMinActiveConnections()) {
poolManagersBelowMinActiveConnections.remove(hostPortPoolManager);
}
return connId;
}

/**
* Attempts to check out an existing connection to the hostPort provided, or returns null if none available.
* Attempts to check out an existing connection to the (host, port) provided, or returns null if none available.
* @param host The host to connect to.
* @param port The port on the host to connect to.
* @param dataNodeId The {@link DataNodeId} to connect to.
Expand Down Expand Up @@ -110,6 +167,9 @@ DataNodeId removeConnection(String connectionId) {
}
DataNodeId dataNodeId = hostPortPoolManager.removeConnection(connectionId);
totalManagedConnectionsCount--;
if (!hostPortPoolManager.hasMinActiveConnections()) {
poolManagersBelowMinActiveConnections.add(hostPortPoolManager);
}
return dataNodeId;
}

Expand All @@ -122,7 +182,7 @@ int getTotalConnectionsCount() {
}

/**
* Return the total available connections across all hostPortPoolManagers.
* Return the total available connections across all {@link HostPortPoolManager}s.
* @return total established and available connections.
*/
int getAvailableConnectionsCount() {
Expand All @@ -142,58 +202,56 @@ int getAvailableConnectionsCount() {
* @return the HostPortPoolManager for the associated (host, port) pair.
*/
private HostPortPoolManager getHostPortPoolManager(String host, Port port, DataNodeId dataNodeId) {
String lookupStr = host + ":" + port.getPort();
HostPortPoolManager poolManager = hostPortToPoolManager.get(lookupStr);
if (poolManager == null) {
poolManager = new HostPortPoolManager(
port.getPortType() == PortType.SSL ? maxConnectionsPerPortSsl : maxConnectionsPerPortPlainText, dataNodeId);
hostPortToPoolManager.put(lookupStr, poolManager);
}
return poolManager;
}

/**
* Returns max number of connections allowed for a plain text port.
*/
int getMaxConnectionsPerPortPlainText() {
return maxConnectionsPerPortPlainText;
}

/**
* Returns max number of connections allowed for a ssl port.
*/
int getMaxConnectionsPerPortSsl() {
return maxConnectionsPerPortSsl;
return hostPortToPoolManager.computeIfAbsent(new Pair<>(host, port), k -> new HostPortPoolManager(host, port,
port.getPortType() == PortType.SSL ? maxConnectionsPerPortSsl : maxConnectionsPerPortPlainText, dataNodeId));
}

/**
* HostPortPoolManager manages all the connections to a specific (host,
* port) pair. The {@link ConnectionTracker} creates one for every (host, port) pair it knows of.
* {@link HostPortPoolManager} manages all the connections to a specific (host, port) pair. The
* {@link ConnectionTracker} creates one for every (host, port) pair it knows of.
*/
private class HostPortPoolManager {
private final int maxConnectionsToHostPort;
private final LinkedList<String> availableConnections;
private static class HostPortPoolManager {
private final LinkedList<String> availableConnections = new LinkedList<>();
private final DataNodeId dataNodeId;
private int poolCount;
private int minActiveConnections = 0;
private int poolCount = 0;
final String host;
final Port port;
final int poolLimit;

/**
* Instantiate a HostPortPoolManager
* @param poolLimit the max connections allowed for this hostPort.
* @param host the destination host for this pool.
* @param port the destination port for this pool.
* @param poolLimit the max connections allowed for this (host, port).
* @param dataNodeId the {@link DataNodeId} associated with this {@link HostPortPoolManager}.
*/
HostPortPoolManager(int poolLimit, DataNodeId dataNodeId) {
poolCount = 0;
maxConnectionsToHostPort = poolLimit;
availableConnections = new LinkedList<String>();
HostPortPoolManager(String host, Port port, int poolLimit, DataNodeId dataNodeId) {
this.host = host;
this.port = port;
this.poolLimit = poolLimit;
this.dataNodeId = dataNodeId;
}

/**
* Return true if this manager has reached the pool limit.
* @return true if this manager has at least {@link #minActiveConnections}.
*/
boolean hasMinActiveConnections() {
return poolCount >= minActiveConnections;
}

/**
* @return true if this manager has reached the pool limit
*/
boolean hasReachedPoolLimit() {
return poolCount == maxConnectionsToHostPort;
return poolCount == poolLimit;
}

/**
* @param minActiveConnections the minimum number of connections to this (host, port) to keep ready for use.
*/
void setMinActiveConnections(int minActiveConnections) {
this.minActiveConnections = Math.min(poolLimit, minActiveConnections);
}

/**
Expand Down Expand Up @@ -232,11 +290,26 @@ DataNodeId removeConnection(String connectionId) {
}

/**
* Return the number of available connections to this hostPort
* Return the number of available connections to this (host, port)
* @return number of available connections
*/
int getAvailableConnectionsCount() {
return availableConnections.size();
}
}

/**
* Used to signal to the networking layer to initiate a new connection.
*/
interface ConnectionFactory {
/**
* Initiate a new connection to the given (host, port). This method can return before the connection is ready for
* sending requests. Once it is ready, {@link #checkInConnection} should be called.
* @param host the hostname to connect to.
* @param port the port to connect to.
* @return a unique connection ID to represent the (future) connection.
* @throws IOException if the connection could not be initiated.
*/
String connect(String host, Port port) throws IOException;
}
}
Loading

0 comments on commit 65eef32

Please sign in to comment.