diff --git a/HEADER b/HEADER index 0bffdb759a..66a6b15da3 100644 --- a/HEADER +++ b/HEADER @@ -1,4 +1,4 @@ -/** +/* * Copyright 2016 LinkedIn Corp. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/ambry-api/src/main/java/com.github.ambry/config/MetricsConfig.java b/ambry-api/src/main/java/com.github.ambry/config/MetricsConfig.java deleted file mode 100644 index 20320c553c..0000000000 --- a/ambry-api/src/main/java/com.github.ambry/config/MetricsConfig.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright 2016 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.config; - -import java.util.ArrayList; -import java.util.List; - - -/** - * The configs for metrics - */ -public class MetricsConfig { - - /** - * The metrics reporter factory classes. This would be a comma separated list of metrics reporter factory - * classes - */ - @Config("metrics.reporters.factory.classes") - @Default("com.github.ambry.metrics.JmxReporterFactory") - public final String metricsFactoryClasses; - - /** - * Returns a list of all metrics names from the config file. Useful for - * getting individual metrics. - */ - public List getMetricsReporterFactoryClassNames() { - List trimmedFactoryClasses = new ArrayList(); - if (!metricsFactoryClasses.equals("")) { - String[] factoryClasses = metricsFactoryClasses.split(","); - for (String factoryClass : factoryClasses) { - trimmedFactoryClasses.add(factoryClass.trim()); - } - } - return trimmedFactoryClasses; - } - - public MetricsConfig(VerifiableProperties verifiableProperties) { - metricsFactoryClasses = verifiableProperties.getString("metrics.reporters.factory.classes", - "com.github.ambry.metrics.JmxReporterFactory"); - } -} diff --git a/ambry-api/src/main/java/com.github.ambry/network/Port.java b/ambry-api/src/main/java/com.github.ambry/network/Port.java index 7d7879cad8..bc58896122 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/Port.java +++ b/ambry-api/src/main/java/com.github.ambry/network/Port.java @@ -14,7 +14,7 @@ package com.github.ambry.network; /** - * Represents a port containing port number and {@PortType} + * Represents a port containing port number and {@link PortType} */ public class Port { private final int port; @@ -51,4 +51,9 @@ public boolean equals(Object o) { Port p = (Port) o; return p.port == port && p.type.equals(type); } + + @Override + public int hashCode() { + return Integer.hashCode(port); + } } diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrServer.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrServer.java index 0c4c1670d6..8adc1af18e 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrServer.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrServer.java @@ -13,8 +13,8 @@ */ package com.github.ambry.cloud; -import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.jmx.JmxReporter; import com.github.ambry.clustermap.ClusterAgentsFactory; import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.DataNodeId; diff --git a/ambry-network/src/main/java/com.github.ambry.network/ConnectionTracker.java b/ambry-network/src/main/java/com.github.ambry.network/ConnectionTracker.java index 614cef278d..b18a63dbde 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/ConnectionTracker.java +++ b/ambry-network/src/main/java/com.github.ambry.network/ConnectionTracker.java @@ -14,8 +14,14 @@ package com.github.ambry.network; import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.utils.Pair; +import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -26,11 +32,13 @@ */ class ConnectionTracker { - private final HashMap hostPortToPoolManager; - private final HashMap connectionIdToPoolManager; + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionTracker.class); + private final HashMap, HostPortPoolManager> hostPortToPoolManager = new HashMap<>(); + private final HashMap connectionIdToPoolManager = new HashMap<>(); + private final HashSet poolManagersBelowMinActiveConnections = new HashSet<>(); + private int totalManagedConnectionsCount = 0; private final int maxConnectionsPerPortPlainText; private final int maxConnectionsPerPortSsl; - private int totalManagedConnectionsCount; /** * Instantiates a ConnectionTracker @@ -38,16 +46,13 @@ class ConnectionTracker { * @param maxConnectionsPerPortSsl the connection pool limit for ssl connections to a (host, port) */ ConnectionTracker(int maxConnectionsPerPortPlainText, int maxConnectionsPerPortSsl) { - hostPortToPoolManager = new HashMap(); - connectionIdToPoolManager = new HashMap(); - totalManagedConnectionsCount = 0; this.maxConnectionsPerPortPlainText = maxConnectionsPerPortPlainText; this.maxConnectionsPerPortSsl = maxConnectionsPerPortSsl; } /** * Returns true if a new connection may be created for the given hostPort, that is if the number of connections for - * the given hostPort has not reached the pool limit. + * the given (host, port) has not reached the pool limit. * @param host the host associated with this check. * @param port the port associated with this check. * @param dataNodeId the {@link DataNodeId} associated with this check. @@ -59,22 +64,74 @@ boolean mayCreateNewConnection(String host, Port port, DataNodeId dataNodeId) { } /** - * Start tracking a new connection id associated with the given host and port. Note that this connection will not - * be made available for checking out until a {@link #checkInConnection(String)} is called on it. + * Configure the connection tracker to keep a specified percentage of connections to this data node ready for use. + * @param dataNodeId the {@link DataNodeId} to configure the connection pool for. + * @param minActiveConnectionsPercentage percentage of max connections to this data node that should be kept ready + * for use. The minimum connection number will be rounded down to the nearest + * whole number. + */ + void setMinimumActiveConnectionsPercentage(DataNodeId dataNodeId, int minActiveConnectionsPercentage) { + HostPortPoolManager hostPortPoolManager = + getHostPortPoolManager(dataNodeId.getHostname(), dataNodeId.getPortToConnectTo(), dataNodeId); + hostPortPoolManager.setMinActiveConnections(minActiveConnectionsPercentage * hostPortPoolManager.poolLimit / 100); + if (!hostPortPoolManager.hasMinActiveConnections()) { + poolManagersBelowMinActiveConnections.add(hostPortPoolManager); + } + } + + /** + * For (host, port) pools that are below the minimum number of active connections, initiate new connections to each + * host until they meet it. + * @param connectionFactory the {@link ConnectionFactory} for interfacing with the networking layer. + * @return the number of connections initiated. + */ + int replenishConnections(ConnectionFactory connectionFactory) { + int connectionsInitiated = 0; + Iterator iter = poolManagersBelowMinActiveConnections.iterator(); + while (iter.hasNext()) { + HostPortPoolManager poolManager = iter.next(); + try { + while (!poolManager.hasMinActiveConnections()) { + String connId = connectionFactory.connect(poolManager.host, poolManager.port); + poolManager.incrementPoolCount(); + connectionIdToPoolManager.put(connId, poolManager); + totalManagedConnectionsCount++; + connectionsInitiated++; + } + iter.remove(); + } catch (IOException e) { + LOGGER.warn("Encountered exception while replenishing connections to {}:{}.", poolManager.host, + poolManager.port.getPort(), e); + } + } + return connectionsInitiated; + } + + /** + * Initiate a new connection using the provided {@link ConnectionFactory} and start tracking a new connection id + * associated with the given host and port. Note that this connection will not be made available for checking out + * until a {@link #checkInConnection} is called on it. + * @param connectionFactory the {@link ConnectionFactory} for interfacing with the networking layer. * @param host the host to which this connection belongs. * @param port the port on the host to which this connection belongs. - * @param connId the connection id of the connection. + * @return the connection id of the connection returned by {@link ConnectionFactory#connect}. * @param dataNodeId the {@link DataNodeId} associated with this connection */ - void startTrackingInitiatedConnection(String host, Port port, String connId, DataNodeId dataNodeId) { + String connectAndTrack(ConnectionFactory connectionFactory, String host, Port port, DataNodeId dataNodeId) + throws IOException { + String connId = connectionFactory.connect(host, port); HostPortPoolManager hostPortPoolManager = getHostPortPoolManager(host, port, dataNodeId); hostPortPoolManager.incrementPoolCount(); connectionIdToPoolManager.put(connId, hostPortPoolManager); totalManagedConnectionsCount++; + if (hostPortPoolManager.hasMinActiveConnections()) { + poolManagersBelowMinActiveConnections.remove(hostPortPoolManager); + } + return connId; } /** - * Attempts to check out an existing connection to the hostPort provided, or returns null if none available. + * Attempts to check out an existing connection to the (host, port) provided, or returns null if none available. * @param host The host to connect to. * @param port The port on the host to connect to. * @param dataNodeId The {@link DataNodeId} to connect to. @@ -110,6 +167,9 @@ DataNodeId removeConnection(String connectionId) { } DataNodeId dataNodeId = hostPortPoolManager.removeConnection(connectionId); totalManagedConnectionsCount--; + if (!hostPortPoolManager.hasMinActiveConnections()) { + poolManagersBelowMinActiveConnections.add(hostPortPoolManager); + } return dataNodeId; } @@ -122,7 +182,7 @@ int getTotalConnectionsCount() { } /** - * Return the total available connections across all hostPortPoolManagers. + * Return the total available connections across all {@link HostPortPoolManager}s. * @return total established and available connections. */ int getAvailableConnectionsCount() { @@ -142,58 +202,56 @@ int getAvailableConnectionsCount() { * @return the HostPortPoolManager for the associated (host, port) pair. */ private HostPortPoolManager getHostPortPoolManager(String host, Port port, DataNodeId dataNodeId) { - String lookupStr = host + ":" + port.getPort(); - HostPortPoolManager poolManager = hostPortToPoolManager.get(lookupStr); - if (poolManager == null) { - poolManager = new HostPortPoolManager( - port.getPortType() == PortType.SSL ? maxConnectionsPerPortSsl : maxConnectionsPerPortPlainText, dataNodeId); - hostPortToPoolManager.put(lookupStr, poolManager); - } - return poolManager; - } - - /** - * Returns max number of connections allowed for a plain text port. - */ - int getMaxConnectionsPerPortPlainText() { - return maxConnectionsPerPortPlainText; - } - - /** - * Returns max number of connections allowed for a ssl port. - */ - int getMaxConnectionsPerPortSsl() { - return maxConnectionsPerPortSsl; + return hostPortToPoolManager.computeIfAbsent(new Pair<>(host, port), k -> new HostPortPoolManager(host, port, + port.getPortType() == PortType.SSL ? maxConnectionsPerPortSsl : maxConnectionsPerPortPlainText, dataNodeId)); } /** - * HostPortPoolManager manages all the connections to a specific (host, - * port) pair. The {@link ConnectionTracker} creates one for every (host, port) pair it knows of. + * {@link HostPortPoolManager} manages all the connections to a specific (host, port) pair. The + * {@link ConnectionTracker} creates one for every (host, port) pair it knows of. */ - private class HostPortPoolManager { - private final int maxConnectionsToHostPort; - private final LinkedList availableConnections; + private static class HostPortPoolManager { + private final LinkedList availableConnections = new LinkedList<>(); private final DataNodeId dataNodeId; - private int poolCount; + private int minActiveConnections = 0; + private int poolCount = 0; + final String host; + final Port port; + final int poolLimit; /** * Instantiate a HostPortPoolManager - * @param poolLimit the max connections allowed for this hostPort. + * @param host the destination host for this pool. + * @param port the destination port for this pool. + * @param poolLimit the max connections allowed for this (host, port). * @param dataNodeId the {@link DataNodeId} associated with this {@link HostPortPoolManager}. */ - HostPortPoolManager(int poolLimit, DataNodeId dataNodeId) { - poolCount = 0; - maxConnectionsToHostPort = poolLimit; - availableConnections = new LinkedList(); + HostPortPoolManager(String host, Port port, int poolLimit, DataNodeId dataNodeId) { + this.host = host; + this.port = port; + this.poolLimit = poolLimit; this.dataNodeId = dataNodeId; } /** - * Return true if this manager has reached the pool limit. + * @return true if this manager has at least {@link #minActiveConnections}. + */ + boolean hasMinActiveConnections() { + return poolCount >= minActiveConnections; + } + + /** * @return true if this manager has reached the pool limit */ boolean hasReachedPoolLimit() { - return poolCount == maxConnectionsToHostPort; + return poolCount == poolLimit; + } + + /** + * @param minActiveConnections the minimum number of connections to this (host, port) to keep ready for use. + */ + void setMinActiveConnections(int minActiveConnections) { + this.minActiveConnections = Math.min(poolLimit, minActiveConnections); } /** @@ -232,11 +290,26 @@ DataNodeId removeConnection(String connectionId) { } /** - * Return the number of available connections to this hostPort + * Return the number of available connections to this (host, port) * @return number of available connections */ int getAvailableConnectionsCount() { return availableConnections.size(); } } + + /** + * Used to signal to the networking layer to initiate a new connection. + */ + interface ConnectionFactory { + /** + * Initiate a new connection to the given (host, port). This method can return before the connection is ready for + * sending requests. Once it is ready, {@link #checkInConnection} should be called. + * @param host the hostname to connect to. + * @param port the port to connect to. + * @return a unique connection ID to represent the (future) connection. + * @throws IOException if the connection could not be initiated. + */ + String connect(String host, Port port) throws IOException; + } } diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkClient.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkClient.java index afa3a47f2a..84334239e4 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkClient.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkClient.java @@ -106,6 +106,8 @@ public List sendAndPoll(List requestInfos, int pollTi pendingRequests.add(new RequestMetadata(time.milliseconds(), requestInfo, clientNetworkRequestMetrics)); } List sends = prepareSends(responseInfoList); + int connectionsInitiated = connectionTracker.replenishConnections(this::connect); + networkMetrics.connectionReplenished.inc(connectionsInitiated); selector.poll(pollTimeoutMs, sends); handleSelectorEvents(responseInfoList); } catch (Exception e) { @@ -167,10 +169,7 @@ private List prepareSends(List responseInfoList) { networkMetrics.connectionNotAvailable.inc(); if (requestMetadata.pendingConnectionId == null) { if (connectionTracker.mayCreateNewConnection(host, port, replicaId.getDataNodeId())) { - connId = - selector.connect(new InetSocketAddress(host, port.getPort()), networkConfig.socketSendBufferBytes, - networkConfig.socketReceiveBufferBytes, port.getPortType()); - connectionTracker.startTrackingInitiatedConnection(host, port, connId, replicaId.getDataNodeId()); + connId = connectionTracker.connectAndTrack(this::connect, host, port, replicaId.getDataNodeId()); requestMetadata.pendingConnectionId = connId; pendingConnectionsToAssociatedRequests.put(connId, requestMetadata); logger.trace("Initiated a connection to host {} port {} ", host, port); @@ -205,39 +204,26 @@ private List prepareSends(List responseInfoList) { * If a connection breaks after successfully established, it's counted as a failedConnection. This impact the * counting in the method, but this is tolerable as other good/bad connections can be handled in next selector.poll(). * If a connection established after this time window of timeForWarmUp, it can be handled in next selector.poll(). + *

+ * This will also set the minimum number of active connections for each of the data nodes. This means that the + * NetworkClient will attempt to keep a percentage of connections ready for use at all times by initiating extra + * connections in {@link NetworkClient#sendAndPoll} when a pool drops below this number. * @param dataNodeIds warm up target nodes. * @param connectionWarmUpPercentagePerDataNode percentage of max connections would like to establish in the warmup. - * @param timeForWarmUp max time to wait for connections' establish. + * @param timeForWarmUp max time to wait for connections' establish in milliseconds. * @param responseInfoList records responses from disconnected connections. * @return number of connections established successfully. */ public int warmUpConnections(List dataNodeIds, int connectionWarmUpPercentagePerDataNode, long timeForWarmUp, List responseInfoList) { - int expectedConnections = 0; logger.info("Connection warm up start."); if (dataNodeIds.size() == 0) { return 0; } - int numberOfConnections = connectionWarmUpPercentagePerDataNode == 0 ? 0 : Math.max(1, - connectionWarmUpPercentagePerDataNode * ( - dataNodeIds.get(0).getPortToConnectTo().getPortType() == PortType.PLAINTEXT - ? connectionTracker.getMaxConnectionsPerPortPlainText() - : connectionTracker.getMaxConnectionsPerPortSsl()) / 100); - for (DataNodeId dataNodeId : dataNodeIds) { - for (int i = 0; i < numberOfConnections; i++) { - try { - String connId = selector.connect( - new InetSocketAddress(dataNodeId.getHostname(), dataNodeId.getPortToConnectTo().getPort()), - networkConfig.socketSendBufferBytes, networkConfig.socketReceiveBufferBytes, - dataNodeId.getPortToConnectTo().getPortType()); - connectionTracker.startTrackingInitiatedConnection(dataNodeId.getHostname(), dataNodeId.getPortToConnectTo(), - connId, dataNodeId); - expectedConnections++; - } catch (IOException e) { - logger.error("Received exception while warming up connection: ", e); - } - } - } + dataNodeIds.forEach(dataNodeId -> connectionTracker.setMinimumActiveConnectionsPercentage(dataNodeId, + connectionWarmUpPercentagePerDataNode)); + int expectedConnections = connectionTracker.replenishConnections(this::connect); + long startTime = System.currentTimeMillis(); int successfulConnections = 0; int failedConnections = 0; @@ -260,6 +246,18 @@ public int warmUpConnections(List dataNodeIds, int connectionWarmUpP return successfulConnections; } + /** + * Start creating a connection to a host using {@link Selector#connect}. + * @param host the hostname to connect to. + * @param port the port to connect to. + * @return the connection ID. + * @throws IOException if DNS resolution fails on the hostname or if the server is down + */ + private String connect(String host, Port port) throws IOException { + return selector.connect(new InetSocketAddress(host, port.getPort()), networkConfig.socketSendBufferBytes, + networkConfig.socketReceiveBufferBytes, port.getPortType()); + } + /** * Handle Selector events after a poll: newly established connections, new disconnections, newly completed sends and * receives. diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 16fe9571b6..316fb6e8b8 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -92,6 +92,7 @@ public class NetworkMetrics { public final Counter connectionNotAvailable; public final Counter connectionReachLimit; public final Counter connectionDisconnected; + public final Counter connectionReplenished; public final Counter networkClientIOError; public final Counter networkClientException; private List networkClientPendingRequestList; @@ -152,6 +153,7 @@ public NetworkMetrics(MetricRegistry registry) { connectionNotAvailable = registry.counter(MetricRegistry.name(NetworkClient.class, "ConnectionNotAvailable")); connectionReachLimit = registry.counter(MetricRegistry.name(NetworkClient.class, "ConnectionReachLimit")); connectionDisconnected = registry.counter(MetricRegistry.name(NetworkClient.class, "ConnectionDisconnected")); + connectionReplenished = registry.counter(MetricRegistry.name(NetworkClient.class, "ConnectionReplenished")); networkClientIOError = registry.counter(MetricRegistry.name(NetworkClient.class, "NetworkClientIOError")); networkClientException = registry.counter(MetricRegistry.name(NetworkClient.class, "NetworkClientException")); diff --git a/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java b/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java index cd8ad2d277..d4867133d0 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java +++ b/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java @@ -109,7 +109,6 @@ public boolean isOpen() { /** * Sends a SSL close message and closes socketChannel. - * @throws IOException if an I/O error occurs */ @Override public void close() { @@ -138,7 +137,7 @@ public void close() { socketChannel.close(); } catch (IOException ie) { metrics.selectorCloseSocketErrorCount.inc(); - logger.warn("Failed to send SSL close message ", ie); + logger.debug("Failed to send SSL close message ", ie); } key.attach(null); key.cancel(); diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index 0ea7f06f68..46a740e9b0 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -78,7 +78,7 @@ public class Selector implements Selectable { private final Set unreadyConnections; private final Time time; private final NetworkMetrics metrics; - private final AtomicLong IdGenerator; + private final AtomicLong idGenerator; private final AtomicLong numActiveConnections; private final SSLFactory sslFactory; @@ -88,19 +88,19 @@ public class Selector implements Selectable { public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) throws IOException { this.nioSelector = java.nio.channels.Selector.open(); this.time = time; - this.keyMap = new HashMap(); - this.completedSends = new ArrayList(); - this.completedReceives = new ArrayList(); - this.connected = new ArrayList(); - this.disconnected = new ArrayList(); + this.keyMap = new HashMap<>(); + this.completedSends = new ArrayList<>(); + this.completedReceives = new ArrayList<>(); + this.connected = new ArrayList<>(); + this.disconnected = new ArrayList<>(); this.closedConnections = new ArrayList<>(); this.metrics = metrics; - this.IdGenerator = new AtomicLong(0); + this.sslFactory = sslFactory; + idGenerator = new AtomicLong(0); numActiveConnections = new AtomicLong(0); unreadyConnections = new HashSet<>(); metrics.registerSelectorActiveConnections(numActiveConnections); metrics.registerSelectorUnreadyConnections(unreadyConnections); - this.sslFactory = sslFactory; } /** @@ -114,18 +114,8 @@ private String generateConnectionId(SocketChannel channel) { int localPort = socket.getLocalPort(); String remoteHost = socket.getInetAddress().getHostAddress(); int remotePort = socket.getPort(); - long connectionIdSuffix = IdGenerator.getAndIncrement(); - StringBuilder connectionIdBuilder = new StringBuilder(); - connectionIdBuilder.append(localHost) - .append(":") - .append(localPort) - .append("-") - .append(remoteHost) - .append(":") - .append(remotePort) - .append("_") - .append(connectionIdSuffix); - return connectionIdBuilder.toString(); + long connectionIdSuffix = idGenerator.getAndIncrement(); + return localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + "_" + connectionIdSuffix; } /** @@ -194,7 +184,7 @@ public String register(SocketChannel channel, PortType portType) throws IOExcept createTransmission(connectionId, key, socket.getInetAddress().getHostName(), socket.getPort(), portType, SSLFactory.Mode.SERVER); } catch (IOException e) { - logger.error("IOException on transmission creation " + e); + logger.error("IOException on transmission creation ", e); socket.close(); channel.close(); throw e; diff --git a/ambry-network/src/test/java/com.github.ambry.network/ConnectionTrackerTest.java b/ambry-network/src/test/java/com.github.ambry.network/ConnectionTrackerTest.java index f294672391..ab930e23b0 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/ConnectionTrackerTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/ConnectionTrackerTest.java @@ -19,6 +19,7 @@ import com.github.ambry.config.VerifiableProperties; import com.github.ambry.utils.MockTime; import com.github.ambry.utils.Time; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -74,7 +75,7 @@ public void testConnectionTrackerInstantiation() { * Tests honoring of pool totalConnectionsCounts. */ @Test - public void testConnectionTracker() { + public void testConnectionTracker() throws IOException { connectionTracker = new ConnectionTracker(routerConfig.routerScalingUnitMaxConnectionsPerPortPlainText, routerConfig.routerScalingUnitMaxConnectionsPerPortSsl); // When no connections were ever made to a host:port, connectionTracker should return null, but @@ -89,8 +90,7 @@ public void testConnectionTracker() { do { String connId = connectionTracker.checkOutConnection("host1", plainTextPort, dataNodeId1); if (connId == null && connectionTracker.mayCreateNewConnection("host1", plainTextPort, dataNodeId1)) { - connId = mockNewConnection("host1", plainTextPort); - connectionTracker.startTrackingInitiatedConnection("host1", plainTextPort, connId, dataNodeId1); + connId = connectionTracker.connectAndTrack(this::mockNewConnection, "host1", plainTextPort, dataNodeId1); } else { done = true; } @@ -103,8 +103,7 @@ public void testConnectionTracker() { do { String connId = connectionTracker.checkOutConnection("host2", sslPort, dataNodeId2); if (connId == null && connectionTracker.mayCreateNewConnection("host2", sslPort, dataNodeId2)) { - connId = mockNewConnection("host2", sslPort); - connectionTracker.startTrackingInitiatedConnection("host2", sslPort, connId, dataNodeId2); + connId = connectionTracker.connectAndTrack(this::mockNewConnection, "host2", sslPort, dataNodeId1); } else { done = true; } @@ -194,8 +193,7 @@ public void testConnectionTracker() { connectionTracker.checkOutConnection("host1", plainTextPort, dataNodeId1)); Assert.assertTrue("It should be okay to initiate a new connection", connectionTracker.mayCreateNewConnection("host1", plainTextPort, dataNodeId1)); - connectionTracker.startTrackingInitiatedConnection("host1", plainTextPort, - mockNewConnection("host1", plainTextPort), dataNodeId1); + connectionTracker.connectAndTrack(this::mockNewConnection, "host1", plainTextPort, dataNodeId1); totalConnectionsCount++; } Assert.assertNull("There should not be any available connections to check out", @@ -205,6 +203,77 @@ public void testConnectionTracker() { assertCounts(totalConnectionsCount, availableCount); } + /** + * Test behavior of {@link ConnectionTracker#setMinimumActiveConnectionsPercentage} and + * {@link ConnectionTracker#replenishConnections}. + * @throws IOException + */ + @Test + public void testReplenishConnections() { + connectionTracker = new ConnectionTracker(routerConfig.routerScalingUnitMaxConnectionsPerPortPlainText, + routerConfig.routerScalingUnitMaxConnectionsPerPortSsl); + // When no connections were ever made to a host:port, connectionTracker should return null, but + // initiate connections. + int minActiveConnectionsCount = 0; + int totalConnectionsCount = 0; + int availableCount = 0; + + MockDataNodeId dataNodeId1 = + new MockDataNodeId("host1", Collections.singletonList(plainTextPort), Collections.emptyList(), "DC1"); + MockDataNodeId dataNodeId2 = + new MockDataNodeId("host2", Arrays.asList(plainTextPort, sslPort), Collections.emptyList(), "DC1"); + dataNodeId2.setSslEnabledDataCenters(Collections.singletonList("DC1")); + connectionTracker.setMinimumActiveConnectionsPercentage(dataNodeId1, 50); + minActiveConnectionsCount += 50 * routerConfig.routerScalingUnitMaxConnectionsPerPortPlainText / 100; + connectionTracker.setMinimumActiveConnectionsPercentage(dataNodeId2, 200); + minActiveConnectionsCount += routerConfig.routerScalingUnitMaxConnectionsPerPortSsl; + + // call replenishConnections to warm up connections + assertCounts(totalConnectionsCount, availableCount); + connectionTracker.replenishConnections(this::mockNewConnection); + totalConnectionsCount += minActiveConnectionsCount; + assertCounts(totalConnectionsCount, availableCount); + List newConnections = getNewlyEstablishedConnections(); + newConnections.forEach(connectionTracker::checkInConnection); + availableCount += minActiveConnectionsCount; + assertCounts(totalConnectionsCount, availableCount); + Assert.assertTrue(connectionTracker.mayCreateNewConnection("host1", plainTextPort, dataNodeId1)); + Assert.assertFalse(connectionTracker.mayCreateNewConnection("host2", sslPort, dataNodeId2)); + + // remove 2 connections + newConnections.stream().limit(2).forEach(connectionTracker::removeConnection); + totalConnectionsCount -= 2; + availableCount -= 2; + assertCounts(totalConnectionsCount, availableCount); + + // replenish connections again + connectionTracker.replenishConnections(this::mockNewConnection); + totalConnectionsCount += 2; + assertCounts(totalConnectionsCount, availableCount); + newConnections = getNewlyEstablishedConnections(); + newConnections.forEach(connectionTracker::checkInConnection); + availableCount += 2; + assertCounts(totalConnectionsCount, availableCount); + + // check out connections + String conn1 = connectionTracker.checkOutConnection("host1", plainTextPort, dataNodeId1); + Assert.assertNotNull(conn1); + String conn2 = connectionTracker.checkOutConnection("host2", sslPort, dataNodeId2); + Assert.assertNotNull(conn2); + availableCount -= 2; + assertCounts(totalConnectionsCount, availableCount); + + // destroy one and return the other and then replenish + connectionTracker.removeConnection(conn1); + connectionTracker.checkInConnection(conn2); + totalConnectionsCount -= 1; + availableCount += 1; + assertCounts(totalConnectionsCount, availableCount); + connectionTracker.replenishConnections(this::mockNewConnection); + totalConnectionsCount += 1; + assertCounts(totalConnectionsCount, availableCount); + } + private void assertCounts(int totalConnectionsCount, int availableCount) { Assert.assertEquals("total connections should match", totalConnectionsCount, connectionTracker.getTotalConnectionsCount()); @@ -220,7 +289,7 @@ private void assertCounts(int totalConnectionsCount, int availableCount) { */ private String mockNewConnection(String host, Port port) { // mocks selector connect. - String connId = host + Integer.toString(port.getPort()) + connStringIndex++; + String connId = host + port.getPort() + connStringIndex++; connIds.add(connId); return connId; } @@ -231,7 +300,7 @@ private String mockNewConnection(String host, Port port) { */ private List getNewlyEstablishedConnections() { ArrayList toReturnList = connIds; - connIds = new ArrayList(); + connIds = new ArrayList<>(); return toReturnList; } } diff --git a/ambry-network/src/test/java/com.github.ambry.network/NetworkClientTest.java b/ambry-network/src/test/java/com.github.ambry.network/NetworkClientTest.java index 4e546f3a30..230e3bc968 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/NetworkClientTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/NetworkClientTest.java @@ -32,7 +32,10 @@ import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.Assert; import org.junit.Test; @@ -41,9 +44,12 @@ * Test the {@link NetworkClient} */ public class NetworkClientTest { - private final int CHECKOUT_TIMEOUT_MS = 1000; - private final int MAX_PORTS_PLAIN_TEXT = 3; - private final int MAX_PORTS_SSL = 3; + private static final int CHECKOUT_TIMEOUT_MS = 1000; + private static final int MAX_PORTS_PLAIN_TEXT = 3; + private static final int MAX_PORTS_SSL = 4; + public static final int POLL_TIMEOUT_MS = 100; + public static final int TIME_FOR_WARM_UP_MS = 2000; + private final Time time; private MockSelector selector; @@ -106,7 +112,7 @@ public NetworkClientTest() throws IOException { @Test public void testWarmUpConnectionsSslAndPlainText() { // warm up plain-text connections with SSL enabled nodes. - doTestWarmUpConnections(localSslDataNodes, MAX_PORTS_SSL, PortType.PLAINTEXT); + doTestWarmUpConnections(localSslDataNodes, MAX_PORTS_PLAIN_TEXT, PortType.PLAINTEXT); // enable SSL to local DC. for (DataNodeId dataNodeId : localSslDataNodes) { ((MockDataNodeId) dataNodeId).setSslEnabledDataCenters(Collections.singletonList( @@ -114,8 +120,6 @@ public void testWarmUpConnectionsSslAndPlainText() { } // warm up SSL connections.` doTestWarmUpConnections(localSslDataNodes, MAX_PORTS_SSL, PortType.SSL); - // warm up plain-text connections with plain-text nodes. - doTestWarmUpConnections(localPlainTextDataNodes, MAX_PORTS_PLAIN_TEXT, PortType.PLAINTEXT); } /** @@ -126,9 +130,10 @@ public void testWarmUpConnectionsFailedAll() { selector.setState(MockSelectorState.FailConnectionInitiationOnPoll); List responseInfos = new ArrayList<>(); Assert.assertEquals("Connection count is not expected", 0, - networkClient.warmUpConnections(localPlainTextDataNodes, 2, 2000, responseInfos)); + networkClient.warmUpConnections(localPlainTextDataNodes, 100, TIME_FOR_WARM_UP_MS, responseInfos)); // verify that the connections to all local nodes get disconnected - Assert.assertEquals("Mismatch in timeout responses", 3, responseInfos.size()); + Assert.assertEquals("Mismatch in timeout responses", localPlainTextDataNodes.size() * MAX_PORTS_PLAIN_TEXT, + responseInfos.size()); selector.setState(MockSelectorState.Good); } @@ -149,7 +154,7 @@ public void testBasicSendAndPoll() { int responseCount = 0; do { - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); requestInfoList.clear(); for (ResponseInfo responseInfo : responseInfoList) { MockSend send = (MockSend) responseInfo.getRequestInfo().getRequest(); @@ -165,7 +170,7 @@ public void testBasicSendAndPoll() { } while (requestCount > responseCount); Assert.assertEquals("Should receive only as many responses as there were requests", requestCount, responseCount); - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); requestInfoList.clear(); Assert.assertEquals("No responses are expected at this time", 0, responseInfoList.size()); } @@ -182,7 +187,7 @@ public void testConnectionUnavailable() throws InterruptedException { int requestCount = requestInfoList.size(); int responseCount = 0; - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); requestInfoList.clear(); // The first sendAndPoll() initiates the connections. So, after the selector poll, new connections // would have been established, but no new responses or disconnects, so the NetworkClient should not have been @@ -192,7 +197,7 @@ public void testConnectionUnavailable() throws InterruptedException { time.sleep(CHECKOUT_TIMEOUT_MS + 1); do { - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); requestInfoList.clear(); for (ResponseInfo responseInfo : responseInfoList) { NetworkClientErrorCode error = responseInfo.getError(); @@ -204,7 +209,7 @@ public void testConnectionUnavailable() throws InterruptedException { responseCount++; } } while (requestCount > responseCount); - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); requestInfoList.clear(); Assert.assertEquals("No responses are expected at this time", 0, responseInfoList.size()); } @@ -224,7 +229,7 @@ public void testNetworkError() { // set beBad so that requests end up failing due to "network error". selector.setState(MockSelectorState.DisconnectOnSend); do { - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); requestInfoList.clear(); for (ResponseInfo responseInfo : responseInfoList) { NetworkClientErrorCode error = responseInfo.getError(); @@ -236,21 +241,21 @@ public void testNetworkError() { responseCount++; } } while (requestCount > responseCount); - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); requestInfoList.clear(); Assert.assertEquals("No responses are expected at this time", 0, responseInfoList.size()); selector.setState(MockSelectorState.Good); } /** - * Test exceptions in sendAndPoll(). + * Test exceptions thrown in sendAndPoll(). */ @Test public void testExceptionOnConnect() { List requestInfoList = new ArrayList<>(); // test that IllegalStateException would be thrown if replica is not specified in RequestInfo requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(-1), null)); - networkClient.sendAndPoll(requestInfoList, 100); + networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); Assert.assertEquals("NetworkClientException should increase because replica is null in request", 1, networkMetrics.networkClientException.getCount()); requestInfoList.clear(); @@ -259,7 +264,7 @@ public void testExceptionOnConnect() { requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(4), replicaOnSslNode)); selector.setState(MockSelectorState.ThrowExceptionOnConnect); try { - networkClient.sendAndPoll(requestInfoList, 100); + networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); } catch (Exception e) { Assert.fail("If selector throws on connect, sendAndPoll() should not throw"); } @@ -280,32 +285,32 @@ public void testConnectionInitializationFailures() { selector.setState(MockSelectorState.IdlePoll); Assert.assertEquals(0, selector.connectCallCount()); // this sendAndPoll() should initiate a connect(). - List responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + List responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); // At this time a single connection would have been initiated for the above request. Assert.assertEquals(1, selector.connectCallCount()); Assert.assertEquals(0, responseInfoList.size()); requestInfoList.clear(); // Subsequent calls to sendAndPoll() should not initiate any connections. - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); Assert.assertEquals(1, selector.connectCallCount()); Assert.assertEquals(0, responseInfoList.size()); // Another connection should get initialized if a new request comes in for the same destination. requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(1), replicaOnSslNode)); - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); Assert.assertEquals(2, selector.connectCallCount()); Assert.assertEquals(0, responseInfoList.size()); requestInfoList.clear(); // Subsequent calls to sendAndPoll() should not initiate any more connections. - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); Assert.assertEquals(2, selector.connectCallCount()); Assert.assertEquals(0, responseInfoList.size()); // Once connect failure kicks in, the pending requests should be failed immediately. selector.setState(MockSelectorState.FailConnectionInitiationOnPoll); - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); Assert.assertEquals(2, selector.connectCallCount()); Assert.assertEquals(2, responseInfoList.size()); Assert.assertEquals(NetworkClientErrorCode.NetworkError, responseInfoList.get(0).getError()); @@ -313,6 +318,52 @@ public void testConnectionInitializationFailures() { responseInfoList.clear(); } + /** + * Test that connections get replenished in {@link NetworkClient#sendAndPoll(List, int)} to maintain the minimum + * number of active connections. + */ + @Test + public void testConnectionReplenishment() { + AtomicInteger nextCorrelationId = new AtomicInteger(1); + Function> requestGen = numRequests -> IntStream.range(0, numRequests) + .mapToObj( + i -> new RequestInfo(sslHost, sslPort, new MockSend(nextCorrelationId.getAndIncrement()), replicaOnSslNode)) + .collect(Collectors.toList()); + // 1 host x 1 port x 3 connections x 100% + int warmUpPercentage = 100; + AtomicInteger expectedConnectCalls = new AtomicInteger(warmUpPercentage * 3 /100); + Runnable checkConnectCalls = () -> Assert.assertEquals(expectedConnectCalls.get(), selector.connectCallCount()); + + networkClient.warmUpConnections(Collections.singletonList(replicaOnSslNode.getDataNodeId()), warmUpPercentage, + TIME_FOR_WARM_UP_MS, new ArrayList<>()); + checkConnectCalls.run(); + + selector.setState(MockSelectorState.Good); + // this sendAndPoll() should use one of the pre-warmed connections + List responseInfoList = networkClient.sendAndPoll(requestGen.apply(3), POLL_TIMEOUT_MS); + checkConnectCalls.run(); + Assert.assertEquals(3, responseInfoList.size()); + + // this sendAndPoll() should disconnect two of the pre-warmed connections + selector.setState(MockSelectorState.DisconnectOnSend); + responseInfoList = networkClient.sendAndPoll(requestGen.apply(2), POLL_TIMEOUT_MS); + checkConnectCalls.run(); + Assert.assertEquals(2, responseInfoList.size()); + + // the two connections lost in the previous sendAndPoll should be replenished + selector.setState(MockSelectorState.Good); + responseInfoList = networkClient.sendAndPoll(requestGen.apply(1), POLL_TIMEOUT_MS); + expectedConnectCalls.addAndGet(2); + checkConnectCalls.run(); + Assert.assertEquals(1, responseInfoList.size()); + + // this call should use the existing connections in the pool + selector.setState(MockSelectorState.Good); + responseInfoList = networkClient.sendAndPoll(requestGen.apply(3), POLL_TIMEOUT_MS); + checkConnectCalls.run(); + Assert.assertEquals(3, responseInfoList.size()); + } + /** * Test the following case: * Connection C1 gets initiated in the context of Request R1 @@ -331,13 +382,13 @@ public void testOutOfOrderConnectionEstablishment() { List requestInfoList = new ArrayList<>(); requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(2), replicaOnSslNode)); requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(3), replicaOnSslNode)); - List responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + List responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); requestInfoList.clear(); Assert.assertEquals(2, selector.connectCallCount()); Assert.assertEquals(0, responseInfoList.size()); // Invoke sendAndPoll() again, the Connection C1 will get disconnected - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); // Verify that no more connection is initiated in network client Assert.assertEquals(2, selector.connectCallCount()); // There should be 2 responses, one is success from Request R1, another is from Connection C1 timeout. @@ -352,7 +403,7 @@ public void testOutOfOrderConnectionEstablishment() { responseInfoList.clear(); // Invoke sendAndPoll() again, Request R2 will get sent via Connection C2 - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); Assert.assertEquals(2, selector.connectCallCount()); Assert.assertEquals(1, responseInfoList.size()); Assert.assertNull(responseInfoList.get(0).getError()); @@ -372,14 +423,14 @@ public void testPendingRequestTimeOutWithDisconnection() throws Exception { List requestInfoList = new ArrayList<>(); selector.setState(MockSelectorState.IdlePoll); requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(4), replicaOnSslNode)); - List responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + List responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); Assert.assertEquals(0, responseInfoList.size()); requestInfoList.clear(); // now make the selector return any attempted connections as disconnections. selector.setState(MockSelectorState.FailConnectionInitiationOnPoll); // increment the time so that the request times out in the next cycle. time.sleep(2000); - responseInfoList = networkClient.sendAndPoll(requestInfoList, 100); + responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); // the size of responseInfoList should be 2 because first response comes from dropping request in the queue that // waits too long. (This response would be handled by corresponding manager, i.e PutManager, GetManager, etc); second // response comes from underlying connection timeout in nioSelector (usually due to remote node is down). This response @@ -403,7 +454,7 @@ public void testExceptionOnPoll() { requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(4), replicaOnSslNode)); selector.setState(MockSelectorState.ThrowExceptionOnPoll); try { - networkClient.sendAndPoll(requestInfoList, 100); + networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); } catch (Exception e) { Assert.fail("If selector throws on poll, sendAndPoll() should not throw."); } @@ -428,28 +479,31 @@ public void testClose() { List requestInfoList = new ArrayList(); networkClient.close(); try { - networkClient.sendAndPoll(requestInfoList, 100); + networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS); Assert.fail("Polling after close should throw"); } catch (IllegalStateException e) { } } /** - * Helper function to test {@link NetworkClient#warmUpConnections(List, int, long, List)} + * Helper function to test {@link NetworkClient#warmUpConnections(List, int, long, List)}. This will build up to 100% + * pre-warmed connections. */ private void doTestWarmUpConnections(List localDataNodeIds, int maxPort, PortType expectedPortType) { Assert.assertEquals("Port type is not expected.", expectedPortType, localDataNodeIds.get(0).getPortToConnectTo().getPortType()); - Assert.assertEquals("Connection count is not expected", maxPort * localDataNodeIds.size(), - networkClient.warmUpConnections(localDataNodeIds, 100, 2000, new ArrayList<>())); - Assert.assertEquals("Connection count is not expected", 50 * maxPort / 100 * localDataNodeIds.size(), - networkClient.warmUpConnections(localDataNodeIds, 50, 2000, new ArrayList<>())); Assert.assertEquals("Connection count is not expected", 0, - networkClient.warmUpConnections(localDataNodeIds, 0, 2000, new ArrayList<>())); + networkClient.warmUpConnections(localDataNodeIds, 0, TIME_FOR_WARM_UP_MS, new ArrayList<>())); selector.setState(MockSelectorState.FailConnectionInitiationOnPoll); Assert.assertEquals("Connection count is not expected", 0, - networkClient.warmUpConnections(localDataNodeIds, 100, 2000, new ArrayList<>())); + networkClient.warmUpConnections(localDataNodeIds, 100, TIME_FOR_WARM_UP_MS, new ArrayList<>())); selector.setState(MockSelectorState.Good); + int halfConnections = 50 * maxPort / 100 * localDataNodeIds.size(); + int allConnections = maxPort * localDataNodeIds.size(); + Assert.assertEquals("Connection count is not expected", halfConnections, + networkClient.warmUpConnections(localDataNodeIds, 50, TIME_FOR_WARM_UP_MS, new ArrayList<>())); + Assert.assertEquals("Connection count is not expected", allConnections - halfConnections, + networkClient.warmUpConnections(localDataNodeIds, 100, TIME_FOR_WARM_UP_MS, new ArrayList<>())); } } @@ -574,13 +628,14 @@ enum MockSelectorState { */ class MockSelector extends Selector { private int index; - private Set connectionIds = new HashSet(); - private List connected = new ArrayList(); - private List disconnected = new ArrayList(); + private Set connectionIds = new HashSet<>(); + private List connected = new ArrayList<>(); + private List nextConnected = new ArrayList<>(); + private List disconnected = new ArrayList<>(); private final List delayedFailFreshList = new ArrayList<>(); private final List delayedFailPassedList = new ArrayList<>(); - private List sends = new ArrayList(); - private List receives = new ArrayList(); + private List sends = new ArrayList<>(); + private List receives = new ArrayList<>(); private MockSelectorState state = MockSelectorState.Good; private boolean wakeUpCalled = false; private int connectCallCount = 0; @@ -625,7 +680,7 @@ public String connect(InetSocketAddress address, int sendBufferSize, int receive // next poll (when it is fresh), but the subsequent poll. delayedFailFreshList.add(hostPortString); } else { - connected.add(hostPortString); + nextConnected.add(hostPortString); connectionIds.add(hostPortString); } return hostPortString; @@ -648,23 +703,21 @@ int connectCallCount() { */ @Override public void poll(long timeoutMs, List sends) throws IOException { - disconnected.clear(); if (state == MockSelectorState.ThrowExceptionOnPoll) { throw new IOException("Mock exception on poll"); } + disconnected = new ArrayList<>(); if (state == MockSelectorState.FailConnectionInitiationOnPoll) { - for (String connId : connected) { - disconnected.add(connId); - } - connected.clear(); - } - for (String connId : delayedFailPassedList) { - disconnected.add(connId); + disconnected.addAll(nextConnected); + connected = new ArrayList<>(); + nextConnected = new ArrayList<>(); + } else if (state != MockSelectorState.IdlePoll) { + connected = nextConnected; + nextConnected = new ArrayList<>(); } + disconnected.addAll(delayedFailPassedList); delayedFailPassedList.clear(); - for (String connId : delayedFailFreshList) { - delayedFailPassedList.add(connId); - } + delayedFailPassedList.addAll(delayedFailFreshList); delayedFailFreshList.clear(); this.sends = sends; if (sends != null) { @@ -691,9 +744,7 @@ public List connected() { if (state == MockSelectorState.IdlePoll) { return new ArrayList<>(); } - List toReturn = connected; - connected = new ArrayList(); - return toReturn; + return connected; } /** @@ -705,10 +756,7 @@ public List disconnected() { if (state == MockSelectorState.IdlePoll) { return new ArrayList<>(); } - return this.disconnected; -// List toReturn = disconnected; -// disconnected = new ArrayList(); -// return toReturn; + return disconnected; } /** @@ -721,7 +769,7 @@ public List completedSends() { return new ArrayList<>(); } List toReturn = sends; - sends = new ArrayList(); + sends = new ArrayList<>(); return toReturn; } @@ -735,7 +783,7 @@ public List completedReceives() { return new ArrayList<>(); } List toReturn = receives; - receives = new ArrayList(); + receives = new ArrayList<>(); return toReturn; } diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java b/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java index c2382e71d0..262da9f057 100644 --- a/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java @@ -16,8 +16,8 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; -import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.jmx.JmxReporter; import com.github.ambry.account.AccountService; import com.github.ambry.account.AccountServiceFactory; import com.github.ambry.clustermap.ClusterMap; diff --git a/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java b/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java index 4cdaa710a4..1f21710650 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java @@ -18,22 +18,17 @@ import com.github.ambry.account.Container; import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.ClusterMapUtils; -import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.commons.BlobId; import com.github.ambry.commons.ResponseHandler; import com.github.ambry.config.RouterConfig; -import com.github.ambry.network.NetworkClientErrorCode; import com.github.ambry.network.RequestInfo; import com.github.ambry.network.ResponseInfo; import com.github.ambry.notification.NotificationSystem; import com.github.ambry.protocol.DeleteRequest; import com.github.ambry.protocol.DeleteResponse; import com.github.ambry.protocol.RequestOrResponse; -import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Pair; import com.github.ambry.utils.Time; -import java.io.DataInputStream; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Set; @@ -99,8 +94,8 @@ public void registerRequestToSend(DeleteOperation deleteOperation, RequestInfo r this.routerMetrics = routerMetrics; this.routerCallback = routerCallback; this.time = time; - deleteOperations = Collections.newSetFromMap(new ConcurrentHashMap()); - correlationIdToDeleteOperation = new HashMap(); + deleteOperations = ConcurrentHashMap.newKeySet(); + correlationIdToDeleteOperation = new HashMap<>(); } /** @@ -158,7 +153,9 @@ public void poll(List requestListToFill) { */ void handleResponse(ResponseInfo responseInfo) { long startTime = time.milliseconds(); - DeleteResponse deleteReponse = extractDeleteResponseAndNotifyResponseHandler(responseInfo); + DeleteResponse deleteResponse = + RouterUtils.extractResponseAndNotifyResponseHandler(responseHandler, routerMetrics, responseInfo, + DeleteResponse::readFrom, DeleteResponse::getError); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); int correlationId = ((DeleteRequest) routerRequestInfo.getRequest()).getCorrelationId(); DeleteOperation deleteOperation = correlationIdToDeleteOperation.remove(correlationId); @@ -166,7 +163,7 @@ void handleResponse(ResponseInfo responseInfo) { if (deleteOperations.contains(deleteOperation)) { boolean exceptionEncountered = false; try { - deleteOperation.handleResponse(responseInfo, deleteReponse); + deleteOperation.handleResponse(responseInfo, deleteResponse); } catch (Exception e) { exceptionEncountered = true; deleteOperation.setOperationException( @@ -184,31 +181,6 @@ void handleResponse(ResponseInfo responseInfo) { } } - /** - * Extract the {@link DeleteResponse} from the given {@link ResponseInfo} - * @param responseInfo the {@link ResponseInfo} from which the {@link DeleteResponse} is to be extracted. - * @return the extracted {@link DeleteResponse} if there is one; null otherwise. - */ - private DeleteResponse extractDeleteResponseAndNotifyResponseHandler(ResponseInfo responseInfo) { - DeleteResponse deleteResponse = null; - ReplicaId replicaId = responseInfo.getRequestInfo().getReplicaId(); - NetworkClientErrorCode networkClientErrorCode = responseInfo.getError(); - if (networkClientErrorCode == null) { - try { - deleteResponse = - DeleteResponse.readFrom(new DataInputStream(new ByteBufferInputStream(responseInfo.getResponse()))); - responseHandler.onEvent(replicaId, deleteResponse.getError()); - } catch (Exception e) { - // Ignore. There is no value in notifying the response handler. - logger.error("Response deserialization received unexpected error", e); - routerMetrics.responseDeserializationErrorCount.inc(); - } - } else { - responseHandler.onEvent(replicaId, networkClientErrorCode); - } - return deleteResponse; - } - /** * Called when the delete operation is completed. The {@code DeleteManager} also finishes the delete operation * by performing the callback and notification. diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetManager.java b/ambry-router/src/main/java/com.github.ambry.router/GetManager.java index 458ca134b8..ba0a1020be 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetManager.java @@ -16,24 +16,19 @@ import com.codahale.metrics.Meter; import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.ClusterMapUtils; -import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.commons.BlobId; import com.github.ambry.commons.BlobIdFactory; import com.github.ambry.commons.ResponseHandler; import com.github.ambry.commons.ServerErrorCode; import com.github.ambry.config.RouterConfig; -import com.github.ambry.network.NetworkClientErrorCode; import com.github.ambry.network.RequestInfo; import com.github.ambry.network.ResponseInfo; import com.github.ambry.protocol.GetRequest; import com.github.ambry.protocol.GetResponse; import com.github.ambry.protocol.RequestOrResponse; import com.github.ambry.store.StoreKey; -import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Time; -import java.io.DataInputStream; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -109,7 +104,7 @@ public void registerRequestToSend(GetOperation getOperation, RequestInfo request this.cryptoService = cryptoService; this.cryptoJobHandler = cryptoJobHandler; this.time = time; - getOperations = Collections.newSetFromMap(new ConcurrentHashMap()); + getOperations = ConcurrentHashMap.newKeySet(); } /** @@ -222,7 +217,15 @@ void poll(List requestListToFill) { */ void handleResponse(ResponseInfo responseInfo) { long startTime = time.milliseconds(); - GetResponse getResponse = extractGetResponseAndNotifyResponseHandler(responseInfo); + GetResponse getResponse = + RouterUtils.extractResponseAndNotifyResponseHandler(responseHandler, routerMetrics, responseInfo, + stream -> GetResponse.readFrom(stream, clusterMap), response -> { + ServerErrorCode serverError = response.getError(); + if (serverError == ServerErrorCode.No_Error) { + serverError = response.getPartitionResponseInfoList().get(0).getErrorCode(); + } + return serverError; + }); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); GetRequest getRequest = (GetRequest) routerRequestInfo.getRequest(); GetOperation getOperation = correlationIdToGetOperation.remove(getRequest.getCorrelationId()); @@ -242,35 +245,6 @@ void handleResponse(ResponseInfo responseInfo) { } } - /** - * Extract the {@link GetResponse} from the given {@link ResponseInfo} - * @param responseInfo the {@link ResponseInfo} from which the {@link GetResponse} is to be extracted. - * @return the extracted {@link GetResponse} if there is one; null otherwise. - */ - private GetResponse extractGetResponseAndNotifyResponseHandler(ResponseInfo responseInfo) { - GetResponse getResponse = null; - ReplicaId replicaId = responseInfo.getRequestInfo().getReplicaId(); - NetworkClientErrorCode networkClientErrorCode = responseInfo.getError(); - if (networkClientErrorCode == null) { - try { - getResponse = GetResponse.readFrom(new DataInputStream(new ByteBufferInputStream(responseInfo.getResponse())), - clusterMap); - ServerErrorCode serverError = getResponse.getError(); - if (serverError == ServerErrorCode.No_Error) { - serverError = getResponse.getPartitionResponseInfoList().get(0).getErrorCode(); - } - responseHandler.onEvent(replicaId, serverError); - } catch (Exception e) { - // Ignore. There is no value in notifying the response handler. - logger.error("Response deserialization received unexpected error", e); - routerMetrics.responseDeserializationErrorCount.inc(); - } - } else { - responseHandler.onEvent(replicaId, networkClientErrorCode); - } - return getResponse; - } - /** * Close the GetManager. * Complete all existing get operations. diff --git a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java index 8db4177109..fed074b866 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java +++ b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2016 LinkedIn Corp. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -148,14 +148,11 @@ public Future getBlob(String blobIdStr, GetBlobOptions options, routerMetrics.operationQueuingRate.mark(); try { if (isOpen.get()) { - getOperationController().getBlob(blobIdStr, internalOptions, new Callback() { - @Override - public void onCompletion(GetBlobResultInternal internalResult, Exception exception) { - GetBlobResult getBlobResult = internalResult == null ? null : internalResult.getBlobResult; - futureResult.done(getBlobResult, exception); - if (callback != null) { - callback.onCompletion(getBlobResult, exception); - } + getOperationController().getBlob(blobIdStr, internalOptions, (internalResult, exception) -> { + GetBlobResult getBlobResult = internalResult == null ? null : internalResult.getBlobResult; + futureResult.done(getBlobResult, exception); + if (callback != null) { + callback.onCompletion(getBlobResult, exception); } }); } else { diff --git a/ambry-router/src/main/java/com.github.ambry.router/PutManager.java b/ambry-router/src/main/java/com.github.ambry.router/PutManager.java index dccce61c0e..5ab7efb7ab 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/PutManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/PutManager.java @@ -17,22 +17,18 @@ import com.github.ambry.account.AccountService; import com.github.ambry.account.Container; import com.github.ambry.clustermap.ClusterMap; -import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.commons.ByteBufferAsyncWritableChannel; import com.github.ambry.commons.ResponseHandler; import com.github.ambry.config.RouterConfig; import com.github.ambry.messageformat.BlobProperties; -import com.github.ambry.network.NetworkClientErrorCode; import com.github.ambry.network.RequestInfo; import com.github.ambry.network.ResponseInfo; import com.github.ambry.notification.NotificationSystem; import com.github.ambry.protocol.PutRequest; import com.github.ambry.protocol.PutResponse; import com.github.ambry.protocol.RequestOrResponse; -import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; -import java.io.DataInputStream; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -231,7 +227,9 @@ void poll(List requestListToFill) { */ void handleResponse(ResponseInfo responseInfo) { long startTime = time.milliseconds(); - PutResponse putResponse = extractPutResponseAndNotifyResponseHandler(responseInfo); + PutResponse putResponse = + RouterUtils.extractResponseAndNotifyResponseHandler(responseHandler, routerMetrics, responseInfo, + PutResponse::readFrom, PutResponse::getError); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); int correlationId = ((PutRequest) routerRequestInfo.getRequest()).getCorrelationId(); // Get the PutOperation that generated the request. @@ -254,30 +252,6 @@ void handleResponse(ResponseInfo responseInfo) { } } - /** - * Extract the {@link PutResponse} from the given {@link ResponseInfo} - * @param responseInfo the {@link ResponseInfo} from which the {@link PutResponse} is to be extracted. - * @return the extracted {@link PutResponse} if there is one; null otherwise. - */ - private PutResponse extractPutResponseAndNotifyResponseHandler(ResponseInfo responseInfo) { - PutResponse putResponse = null; - ReplicaId replicaId = responseInfo.getRequestInfo().getReplicaId(); - NetworkClientErrorCode networkClientErrorCode = responseInfo.getError(); - if (networkClientErrorCode == null) { - try { - putResponse = PutResponse.readFrom(new DataInputStream(new ByteBufferInputStream(responseInfo.getResponse()))); - responseHandler.onEvent(replicaId, putResponse.getError()); - } catch (Exception e) { - // Ignore. There is no value in notifying the response handler. - logger.error("Response deserialization received unexpected error", e); - routerMetrics.responseDeserializationErrorCount.inc(); - } - } else { - responseHandler.onEvent(replicaId, networkClientErrorCode); - } - return putResponse; - } - /** * Called for a {@link PutOperation} when the operation is complete. Any cleanup that the PutManager needs to do * with respect to this operation will have to be done here. The PutManager also finishes the operation by diff --git a/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java b/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java index 85c5262f43..1fdf6479e4 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java +++ b/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java @@ -19,10 +19,19 @@ import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.commons.BlobId; +import com.github.ambry.commons.ResponseHandler; +import com.github.ambry.commons.ServerErrorCode; import com.github.ambry.config.RouterConfig; +import com.github.ambry.network.NetworkClientErrorCode; +import com.github.ambry.network.ResponseInfo; +import com.github.ambry.protocol.Response; +import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Pair; import com.github.ambry.utils.Utils; +import java.io.DataInputStream; +import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.ToIntFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,4 +160,49 @@ static void replaceOperationException(AtomicReference operationExcept return newException; }); } + + /** + * Extract the {@link Response} from the given {@link ResponseInfo} + * @param the {@link Response} type. + * @param responseHandler the {@link ResponseHandler} instance to use. + * @param routerMetrics the {@link NonBlockingRouterMetrics} instance to use. + * @param responseInfo the {@link ResponseInfo} from which the {@link Response} is to be extracted. + * @param deserializer the {@link Deserializer} to use. + * @param errorExtractor extract the {@link ServerErrorCode} to send to {@link ResponseHandler#onEvent}. + * @return the extracted {@link Response} if there is one; null otherwise. + */ + static R extractResponseAndNotifyResponseHandler(ResponseHandler responseHandler, + NonBlockingRouterMetrics routerMetrics, ResponseInfo responseInfo, Deserializer deserializer, + Function errorExtractor) { + R response = null; + ReplicaId replicaId = responseInfo.getRequestInfo().getReplicaId(); + NetworkClientErrorCode networkClientErrorCode = responseInfo.getError(); + if (networkClientErrorCode == null) { + try { + response = deserializer.readFrom(new DataInputStream(new ByteBufferInputStream(responseInfo.getResponse()))); + responseHandler.onEvent(replicaId, errorExtractor.apply(response)); + } catch (Exception e) { + // Ignore. There is no value in notifying the response handler. + logger.error("Response deserialization received unexpected error", e); + routerMetrics.responseDeserializationErrorCount.inc(); + } + } else { + responseHandler.onEvent(replicaId, networkClientErrorCode); + } + return response; + } + + /** + * Used to deserialize an object from a {@link DataInputStream}. + * @param the type of the deserialized object. + */ + @FunctionalInterface + interface Deserializer { + /** + * @param stream the {@link DataInputStream} to read from. + * @return the deserialized object. + * @throws IOException on deserialization errors. + */ + T readFrom(DataInputStream stream) throws IOException; + } } diff --git a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java index 04f622dfa0..f8f4e38996 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java @@ -19,21 +19,17 @@ import com.github.ambry.account.Container; import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.ClusterMapUtils; -import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.commons.BlobId; import com.github.ambry.commons.ResponseHandler; import com.github.ambry.config.RouterConfig; -import com.github.ambry.network.NetworkClientErrorCode; import com.github.ambry.network.RequestInfo; import com.github.ambry.network.ResponseInfo; import com.github.ambry.notification.NotificationSystem; import com.github.ambry.protocol.RequestOrResponse; import com.github.ambry.protocol.TtlUpdateRequest; import com.github.ambry.protocol.TtlUpdateResponse; -import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Pair; import com.github.ambry.utils.Time; -import java.io.DataInputStream; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -174,7 +170,9 @@ void poll(List requestListToFill) { */ void handleResponse(ResponseInfo responseInfo) { long startTime = time.milliseconds(); - TtlUpdateResponse ttlUpdateResponse = extractTtlUpdateResponseAndNotifyResponseHandler(responseInfo); + TtlUpdateResponse ttlUpdateResponse = + RouterUtils.extractResponseAndNotifyResponseHandler(responseHandler, routerMetrics, responseInfo, + TtlUpdateResponse::readFrom, TtlUpdateResponse::getError); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); int correlationId = ((TtlUpdateRequest) routerRequestInfo.getRequest()).getCorrelationId(); TtlUpdateOperation ttlUpdateOperation = correlationIdToTtlUpdateOperation.remove(correlationId); @@ -200,31 +198,6 @@ void handleResponse(ResponseInfo responseInfo) { } } - /** - * Extract the {@link TtlUpdateResponse} from the given {@link ResponseInfo} - * @param responseInfo the {@link ResponseInfo} from which the {@link TtlUpdateResponse} is to be extracted. - * @return the extracted {@link TtlUpdateResponse} if there is one; null otherwise. - */ - private TtlUpdateResponse extractTtlUpdateResponseAndNotifyResponseHandler(ResponseInfo responseInfo) { - TtlUpdateResponse ttlUpdateResponse = null; - ReplicaId replicaId = responseInfo.getRequestInfo().getReplicaId(); - NetworkClientErrorCode networkClientErrorCode = responseInfo.getError(); - if (networkClientErrorCode == null) { - try { - ttlUpdateResponse = - TtlUpdateResponse.readFrom(new DataInputStream(new ByteBufferInputStream(responseInfo.getResponse()))); - responseHandler.onEvent(replicaId, ttlUpdateResponse.getError()); - } catch (Exception e) { - // Ignore. There is no value in notifying the response handler. - logger.error("Response deserialization received unexpected error", e); - routerMetrics.responseDeserializationErrorCount.inc(); - } - } else { - responseHandler.onEvent(replicaId, networkClientErrorCode); - } - return ttlUpdateResponse; - } - /** * Called when the ttl update operation is completed. The {@link TtlUpdateManager} also finishes the ttl update * operation by performing the callback and notification. diff --git a/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java b/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java index 0cc1cf117f..4c10fedb94 100644 --- a/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java +++ b/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java @@ -13,8 +13,8 @@ */ package com.github.ambry.server; -import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.jmx.JmxReporter; import com.github.ambry.clustermap.ClusterAgentsFactory; import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.ClusterParticipant; diff --git a/ambry-tools/src/main/java/com.github.ambry/store/DumpDataTool.java b/ambry-tools/src/main/java/com.github.ambry/store/DumpDataTool.java index 657f922a62..ba1cf7ca89 100644 --- a/ambry-tools/src/main/java/com.github.ambry/store/DumpDataTool.java +++ b/ambry-tools/src/main/java/com.github.ambry/store/DumpDataTool.java @@ -13,9 +13,9 @@ */ package com.github.ambry.store; -import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.codahale.metrics.jmx.JmxReporter; import com.github.ambry.clustermap.ClusterAgentsFactory; import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.commons.BlobIdFactory; diff --git a/ambry-tools/src/main/java/com.github.ambry/store/DumpLogTool.java b/ambry-tools/src/main/java/com.github.ambry/store/DumpLogTool.java index 9aa54039a9..7265eb639a 100644 --- a/ambry-tools/src/main/java/com.github.ambry/store/DumpLogTool.java +++ b/ambry-tools/src/main/java/com.github.ambry/store/DumpLogTool.java @@ -13,9 +13,9 @@ */ package com.github.ambry.store; -import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.codahale.metrics.jmx.JmxReporter; import com.github.ambry.clustermap.ClusterAgentsFactory; import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.config.ClusterMapConfig; diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java b/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java index 45f48b886b..75d15d393b 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/perf/rest/NettyPerfClient.java @@ -15,10 +15,10 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; -import com.codahale.metrics.JmxReporter; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Snapshot; +import com.codahale.metrics.jmx.JmxReporter; import com.github.ambry.commons.SSLFactory; import com.github.ambry.config.SSLConfig; import com.github.ambry.config.VerifiableProperties; diff --git a/ambry-utils/src/main/java/com.github.ambry.utils/Pair.java b/ambry-utils/src/main/java/com.github.ambry.utils/Pair.java index 35370b87aa..a7fab2b31f 100644 --- a/ambry-utils/src/main/java/com.github.ambry.utils/Pair.java +++ b/ambry-utils/src/main/java/com.github.ambry.utils/Pair.java @@ -14,6 +14,9 @@ package com.github.ambry.utils; +import java.util.Objects; + + /** * Represents a pair of two objects * @param The type of the first object in the pair. @@ -61,10 +64,10 @@ public boolean equals(Object o) { return false; } Pair pair = (Pair) o; - if (first != null ? !first.equals(pair.first) : pair.first != null) { + if (!Objects.equals(first, pair.first)) { return false; } - return second != null ? second.equals(pair.second) : pair.second == null; + return Objects.equals(second, pair.second); } @Override diff --git a/build.gradle b/build.gradle index e21d7e9a03..6793bc40ef 100644 --- a/build.gradle +++ b/build.gradle @@ -93,7 +93,7 @@ subprojects { project(':ambry-utils') { dependencies { - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" compile "commons-codec:commons-codec:$commonsVersion" compile "org.json:json:$jsonVersion" compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" @@ -105,7 +105,7 @@ project(':ambry-api') { dependencies { compile project(':ambry-utils') compile "org.apache.helix:helix-core:$helixVersion" - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" compile "org.codehaus.jackson:jackson-core-asl:$jacksonVersion" compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion" testCompile project(':ambry-clustermap') @@ -119,7 +119,7 @@ project(':ambry-account') { project(':ambry-utils'), project(':ambry-commons') compile "org.apache.helix:helix-core:$helixVersion" - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" compile "org.json:json:$jsonVersion" testCompile project(':ambry-utils').sourceSets.test.output testCompile project(':ambry-commons').sourceSets.test.output @@ -132,7 +132,7 @@ project(':ambry-clustermap') { compile project(':ambry-api'), project(':ambry-utils') compile "org.apache.helix:helix-core:$helixVersion" - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" compile "org.json:json:$jsonVersion" testCompile project(':ambry-commons') testCompile project(':ambry-utils').sourceSets.test.output @@ -158,7 +158,7 @@ project(':ambry-network') { project(':ambry-utils'), project(':ambry-commons'), project(':ambry-clustermap') - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" testCompile project(':ambry-utils').sourceSets.test.output testCompile project(':ambry-commons').sourceSets.test.output testCompile project(':ambry-clustermap').sourceSets.test.output @@ -176,7 +176,8 @@ project(':ambry-server') { project(':ambry-store'), project(':ambry-utils'), project(':ambry-replication') - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-jmx:$metricsVersion" testCompile project(':ambry-router') testCompile project(':ambry-cloud') testCompile project(':ambry-cloud').sourceSets.test.output @@ -195,7 +196,7 @@ project(':ambry-store') { dependencies { compile project(':ambry-api'), project(':ambry-utils') - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" compile "net.smacke:jaydio:$jaydioVersion" testCompile project(':ambry-clustermap') testCompile project(':ambry-clustermap').sourceSets.test.output @@ -208,7 +209,7 @@ project(':ambry-messageformat') { dependencies { compile project(':ambry-api'), project(':ambry-utils') - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" testCompile project(':ambry-api').sourceSets.test.output testCompile project(':ambry-utils').sourceSets.test.output } @@ -222,7 +223,7 @@ project(':ambry-replication') { project(':ambry-commons'), project(':ambry-protocol'), project(':ambry-network') - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" testCompile project(':ambry-clustermap').sourceSets.test.output testCompile project(':ambry-api').sourceSets.test.output testCompile project(':ambry-utils').sourceSets.test.output @@ -239,6 +240,7 @@ project(':ambry-tools') { project(':ambry-server'), project(':ambry-rest'), project(':ambry-frontend') + compile "io.dropwizard.metrics:metrics-jmx:$metricsVersion" compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" testCompile project(':ambry-clustermap').sourceSets.test.output testCompile project(':ambry-account').sourceSets.test.output @@ -269,7 +271,8 @@ project(':ambry-rest') { compile project(':ambry-api'), project(':ambry-utils'), project(':ambry-commons') - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-jmx:$metricsVersion" compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" compile "io.netty:netty-all:$nettyVersion" compile "io.netty:netty-tcnative-boringssl-static:$nettyTcnativeVersion" @@ -295,7 +298,7 @@ project(':ambry-router') { project(':ambry-messageformat'), project(':ambry-protocol'), project(':ambry-network') - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" compile "org.bouncycastle:bcpkix-jdk15on:$bouncycastleVersion" testCompile project(':ambry-api').sourceSets.test.output testCompile project(':ambry-clustermap').sourceSets.test.output @@ -315,7 +318,7 @@ project(':ambry-frontend') { project(':ambry-commons'), project(':ambry-rest'), project(':ambry-router') - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" testCompile project(':ambry-api').sourceSets.test.output testCompile project(':ambry-account').sourceSets.test.output testCompile project(':ambry-clustermap').sourceSets.test.output @@ -335,7 +338,8 @@ project(':ambry-cloud') { project(':ambry-commons'), project(':ambry-replication'), project(':ambry-router') - compile "com.codahale.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-core:$metricsVersion" + compile "io.dropwizard.metrics:metrics-jmx:$metricsVersion" compile "com.microsoft.azure:azure-storage:$azureStorageVersion" compile "com.microsoft.azure:azure-documentdb:$azureDocumentDbVersion" testCompile project(':ambry-api').sourceSets.test.output diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index a283bbc9d5..59d9357f39 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -13,7 +13,7 @@ ext { joptSimpleVersion = "4.9" log4jVersion = "1.2.17" jsonVersion = "20170516" - metricsVersion = "3.0.1" + metricsVersion = "4.0.5" commonsVersion = "1.9" bouncycastleVersion = "1.52" javaxVersion = "3.0.1"