diff --git a/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java b/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java index 7f32402b72..7d1fd01571 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java @@ -13,6 +13,12 @@ */ package com.github.ambry.config; +import com.github.ambry.utils.Utils; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + + /** * Configuration parameters required by a {@link com.github.ambry.router.Router}. *

@@ -232,6 +238,15 @@ public class RouterConfig { @Default("false") public final boolean routerUseGetBlobOperationForBlobInfo; + /** + * The custom percentiles of Histogram in operation tracker to be reported. This allows router to emit metrics of + * arbitrary percentiles (e.g. 97th, 93rd, etc.). An example of this config is "0.91,0.93,0.97" (comma separated), each + * value should fall in {@code [0..1]}. + */ + @Config("router.operation.tracker.custom.percentiles") + @Default("") + public final List routerOperationTrackerCustomPercentiles; + /** * Create a RouterConfig instance. * @param verifiableProperties the properties map to refer to. @@ -289,5 +304,9 @@ public RouterConfig(VerifiableProperties verifiableProperties) { verifiableProperties.getIntInRange("router.ttl.update.success.target", 2, 1, Integer.MAX_VALUE); routerUseGetBlobOperationForBlobInfo = verifiableProperties.getBoolean("router.use.get.blob.operation.for.blob.info", false); + List customPercentiles = + Utils.splitString(verifiableProperties.getString("router.operation.tracker.custom.percentiles", ""), ","); + routerOperationTrackerCustomPercentiles = + Collections.unmodifiableList(customPercentiles.stream().map(Double::valueOf).collect(Collectors.toList())); } } diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapMetrics.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapMetrics.java index 177ece80bd..9f2a9c4237 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapMetrics.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapMetrics.java @@ -67,79 +67,24 @@ public ClusterMapMetrics(HardwareLayout hardwareLayout, PartitionLayout partitio // Metrics based on HardwareLayout - this.hardwareLayoutVersion = new Gauge() { - @Override - public Long getValue() { - return getHardwareLayoutVersion(); - } - }; - this.partitionLayoutVersion = new Gauge() { - @Override - public Long getValue() { - return 
getPartitionLayoutVersion(); - } - }; + this.hardwareLayoutVersion = this::getHardwareLayoutVersion; + this.partitionLayoutVersion = this::getPartitionLayoutVersion; registry.register(MetricRegistry.name(ClusterMap.class, "hardwareLayoutVersion"), hardwareLayoutVersion); registry.register(MetricRegistry.name(ClusterMap.class, "partitionLayoutVersion"), partitionLayoutVersion); - this.datacenterCount = new Gauge() { - @Override - public Long getValue() { - return countDatacenters(); - } - }; - this.dataNodeCount = new Gauge() { - @Override - public Long getValue() { - return countDataNodes(); - } - }; - this.diskCount = new Gauge() { - @Override - public Long getValue() { - return countDisks(); - } - }; + this.datacenterCount = this::countDatacenters; + this.dataNodeCount = this::countDataNodes; + this.diskCount = this::countDisks; registry.register(MetricRegistry.name(ClusterMap.class, "datacenterCount"), datacenterCount); registry.register(MetricRegistry.name(ClusterMap.class, "dataNodeCount"), dataNodeCount); registry.register(MetricRegistry.name(ClusterMap.class, "diskCount"), diskCount); - this.dataNodesHardUpCount = new Gauge() { - @Override - public Long getValue() { - return countDataNodesInHardState(HardwareState.AVAILABLE); - } - }; - this.dataNodesHardDownCount = new Gauge() { - @Override - public Long getValue() { - return countDataNodesInHardState(HardwareState.UNAVAILABLE); - } - }; - this.dataNodesUnavailableCount = new Gauge() { - @Override - public Long getValue() { - return countUnavailableDataNodes(); - } - }; - this.disksHardUpCount = new Gauge() { - @Override - public Long getValue() { - return countDisksInHardState(HardwareState.AVAILABLE); - } - }; - this.disksHardDownCount = new Gauge() { - @Override - public Long getValue() { - return countDisksInHardState(HardwareState.UNAVAILABLE); - } - }; - this.disksUnavailableCount = new Gauge() { - @Override - public Long getValue() { - return countUnavailableDisks(); - } - }; + 
this.dataNodesHardUpCount = () -> countDataNodesInHardState(HardwareState.AVAILABLE); + this.dataNodesHardDownCount = () -> countDataNodesInHardState(HardwareState.UNAVAILABLE); + this.dataNodesUnavailableCount = this::countUnavailableDataNodes; + this.disksHardUpCount = () -> countDisksInHardState(HardwareState.AVAILABLE); + this.disksHardDownCount = () -> countDisksInHardState(HardwareState.UNAVAILABLE); + this.disksUnavailableCount = this::countUnavailableDisks; registry.register(MetricRegistry.name(ClusterMap.class, "dataNodesHardUpCount"), dataNodesHardUpCount); registry.register(MetricRegistry.name(ClusterMap.class, "dataNodesHardDownCount"), dataNodesHardDownCount); registry.register(MetricRegistry.name(ClusterMap.class, "dataNodesUnavailableCount"), dataNodesUnavailableCount); @@ -149,54 +94,19 @@ public Long getValue() { // Metrics based on PartitionLayout - this.partitionCount = new Gauge() { - @Override - public Long getValue() { - return countPartitions(); - } - }; - this.partitionsReadWrite = new Gauge() { - @Override - public Long getValue() { - return countPartitionsInState(PartitionState.READ_WRITE); - } - }; - this.partitionsReadOnly = new Gauge() { - @Override - public Long getValue() { - return countPartitionsInState(PartitionState.READ_ONLY); - } - }; + this.partitionCount = this::countPartitions; + this.partitionsReadWrite = () -> countPartitionsInState(PartitionState.READ_WRITE); + this.partitionsReadOnly = () -> countPartitionsInState(PartitionState.READ_ONLY); registry.register(MetricRegistry.name(ClusterMap.class, "numberOfPartitions"), partitionCount); registry.register(MetricRegistry.name(ClusterMap.class, "numberOfReadWritePartitions"), partitionsReadWrite); registry.register(MetricRegistry.name(ClusterMap.class, "numberOfReadOnlyPartitions"), partitionsReadOnly); - this.isMajorityReplicasDown = new Gauge() { - @Override - public Boolean getValue() { - return isMajorityOfReplicasDown(); - } - }; + this.isMajorityReplicasDown = 
this::isMajorityOfReplicasDown; registry.register(MetricRegistry.name(ClusterMap.class, "isMajorityReplicasDown"), isMajorityReplicasDown); - this.rawCapacityInBytes = new Gauge() { - @Override - public Long getValue() { - return getRawCapacity(); - } - }; - this.allocatedRawCapacityInBytes = new Gauge() { - @Override - public Long getValue() { - return getAllocatedRawCapacity(); - } - }; - this.allocatedUsableCapacityInBytes = new Gauge() { - @Override - public Long getValue() { - return getAllocatedUsableCapacity(); - } - }; + this.rawCapacityInBytes = this::getRawCapacity; + this.allocatedRawCapacityInBytes = this::getAllocatedRawCapacity; + this.allocatedUsableCapacityInBytes = this::getAllocatedUsableCapacity; registry.register(MetricRegistry.name(ClusterMap.class, "rawCapacityInBytes"), rawCapacityInBytes); registry.register(MetricRegistry.name(ClusterMap.class, "allocatedRawCapacityInBytes"), allocatedRawCapacityInBytes); @@ -218,12 +128,7 @@ public Long getValue() { private void addDataNodeToStateMetrics(final DataNode dataNode) { final String metricName = dataNode.getHostname() + "-" + dataNode.getPort() + "-ResourceState"; - Gauge dataNodeState = new Gauge() { - @Override - public Long getValue() { - return dataNode.getState() == HardwareState.AVAILABLE ? 1L : 0L; - } - }; + Gauge dataNodeState = () -> dataNode.getState() == HardwareState.AVAILABLE ? 1L : 0L; registry.register(MetricRegistry.name(ClusterMap.class, metricName), dataNodeState); dataNodeStateList.add(dataNodeState); } @@ -232,12 +137,7 @@ private void addDiskToStateMetrics(final Disk disk) { final String metricName = disk.getDataNode().getHostname() + "-" + disk.getDataNode().getPort() + "-" + disk.getMountPath() + "-ResourceState"; - Gauge diskState = new Gauge() { - @Override - public Long getValue() { - return disk.getState() == HardwareState.AVAILABLE ? 1L : 0L; - } - }; + Gauge diskState = () -> disk.getState() == HardwareState.AVAILABLE ? 
1L : 0L; registry.register(MetricRegistry.name(ClusterMap.class, metricName), diskState); dataNodeStateList.add(diskState); } diff --git a/ambry-network/src/main/java/com.github.ambry.network/BlockingChannelConnectionPool.java b/ambry-network/src/main/java/com.github.ambry.network/BlockingChannelConnectionPool.java index 11065d7a9a..a3eeb24d85 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/BlockingChannelConnectionPool.java +++ b/ambry-network/src/main/java/com.github.ambry.network/BlockingChannelConnectionPool.java @@ -73,32 +73,17 @@ public BlockingChannelInfo(ConnectionPoolConfig config, String host, Port port, this.sslSocketFactory = sslSocketFactory; this.sslConfig = sslConfig; - availableConnections = new Gauge() { - @Override - public Integer getValue() { - return blockingChannelAvailableConnections.size(); - } - }; + availableConnections = blockingChannelAvailableConnections::size; registry.register( MetricRegistry.name(BlockingChannelInfo.class, host + "-" + port.getPort() + "-availableConnections"), availableConnections); - activeConnections = new Gauge() { - @Override - public Integer getValue() { - return blockingChannelActiveConnections.size(); - } - }; + activeConnections = blockingChannelActiveConnections::size; registry.register( MetricRegistry.name(BlockingChannelInfo.class, host + "-" + port.getPort() + "-activeConnections"), activeConnections); - totalNumberOfConnections = new Gauge() { - @Override - public Integer getValue() { - return numberOfConnections.intValue(); - } - }; + totalNumberOfConnections = numberOfConnections::intValue; registry.register( MetricRegistry.name(BlockingChannelInfo.class, host + "-" + port.getPort() + "-totalNumberOfConnections"), totalNumberOfConnections); @@ -240,8 +225,7 @@ public void destroyBlockingChannel(BlockingChannel blockingChannel) { } /** - * Returns the number of connections with this BlockingChannelInfo - * @return + * @return the number of connections with this BlockingChannelInfo */ 
public int getNumberOfConnections() { return this.numberOfConnections.intValue(); @@ -310,40 +294,29 @@ public BlockingChannelConnectionPool(ConnectionPoolConfig config, SSLConfig sslC connectionDestroyTime = registry.timer(MetricRegistry.name(BlockingChannelConnectionPool.class, "connectionDestroyTime")); - totalNumberOfNodesConnectedTo = new Gauge() { - @Override - public Integer getValue() { - int noOfNodesConnectedTo = 0; - for (BlockingChannelInfo blockingChannelInfo : connections.values()) { - if (blockingChannelInfo.getNumberOfConnections() > 0) { - noOfNodesConnectedTo++; - } + totalNumberOfNodesConnectedTo = () -> { + int noOfNodesConnectedTo = 0; + for (BlockingChannelInfo blockingChannelInfo : connections.values()) { + if (blockingChannelInfo.getNumberOfConnections() > 0) { + noOfNodesConnectedTo++; } - return noOfNodesConnectedTo; } + return noOfNodesConnectedTo; }; registry.register(MetricRegistry.name(BlockingChannelConnectionPool.class, "totalNumberOfNodesConnectedTo"), totalNumberOfNodesConnectedTo); - totalNumberOfConnections = new Gauge() { - @Override - public Integer getValue() { - int noOfConnections = 0; - for (BlockingChannelInfo blockingChannelInfo : connections.values()) { - noOfConnections += blockingChannelInfo.getNumberOfConnections(); - } - return noOfConnections; + totalNumberOfConnections = () -> { + int noOfConnections = 0; + for (BlockingChannelInfo blockingChannelInfo : connections.values()) { + noOfConnections += blockingChannelInfo.getNumberOfConnections(); } + return noOfConnections; }; registry.register(MetricRegistry.name(BlockingChannelConnectionPool.class, "totalNumberOfConnections"), totalNumberOfConnections); requestsWaitingToCheckoutConnectionCount = new AtomicInteger(0); - requestsWaitingToCheckoutConnection = new Gauge() { - @Override - public Integer getValue() { - return requestsWaitingToCheckoutConnectionCount.get(); - } - }; + requestsWaitingToCheckoutConnection = requestsWaitingToCheckoutConnectionCount::get; 
registry.register(MetricRegistry.name(BlockingChannelConnectionPool.class, "requestsWaitingToCheckoutConnection"), requestsWaitingToCheckoutConnection); sslSocketFactoryClientInitializationCount = registry.counter( diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 639cf627a4..ce24127211 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -156,41 +156,32 @@ public NetworkMetrics(MetricRegistry registry) { selectorUnreadyConnectionsList = new ArrayList<>(); networkClientPendingRequestList = new ArrayList<>(); - final Gauge selectorActiveConnectionsCount = new Gauge() { - @Override - public Long getValue() { - long activeConnectionsCount = 0; - for (AtomicLong activeConnection : selectorActiveConnectionsList) { - activeConnectionsCount += activeConnection.get(); - } - return activeConnectionsCount; + final Gauge selectorActiveConnectionsCount = () -> { + long activeConnectionsCount = 0; + for (AtomicLong activeConnection : selectorActiveConnectionsList) { + activeConnectionsCount += activeConnection.get(); } + return activeConnectionsCount; }; registry.register(MetricRegistry.name(Selector.class, "SelectorActiveConnectionsCount"), selectorActiveConnectionsCount); - final Gauge selectorUnreadyConnectionsCount = new Gauge() { - @Override - public Long getValue() { - long unreadyConnectionCount = 0; - for (Set unreadyConnection : selectorUnreadyConnectionsList) { - unreadyConnectionCount += unreadyConnection.size(); - } - return unreadyConnectionCount; + final Gauge selectorUnreadyConnectionsCount = () -> { + long unreadyConnectionCount = 0; + for (Set unreadyConnection : selectorUnreadyConnectionsList) { + unreadyConnectionCount += unreadyConnection.size(); } + return unreadyConnectionCount; }; 
registry.register(MetricRegistry.name(Selector.class, "SelectorUnreadyConnectionsCount"), selectorUnreadyConnectionsCount); - final Gauge networkClientPendingRequestsCount = new Gauge() { - @Override - public Long getValue() { - long pendingRequestsCount = 0; - for (AtomicLong pendingRequest : networkClientPendingRequestList) { - pendingRequestsCount += pendingRequest.get(); - } - return pendingRequestsCount; + final Gauge networkClientPendingRequestsCount = () -> { + long pendingRequestsCount = 0; + for (AtomicLong pendingRequest : networkClientPendingRequestList) { + pendingRequestsCount += pendingRequest.get(); } + return pendingRequestsCount; }; registry.register(MetricRegistry.name(NetworkClient.class, "NetworkClientPendingConnectionsCount"), networkClientPendingRequestsCount); @@ -236,32 +227,17 @@ class ServerNetworkMetrics extends NetworkMetrics { public ServerNetworkMetrics(final SocketRequestResponseChannel channel, MetricRegistry registry, final List processorThreads) { super(registry); - requestQueueSize = new Gauge() { - @Override - public Integer getValue() { - return channel.getRequestQueueSize(); - } - }; + requestQueueSize = channel::getRequestQueueSize; registry.register(MetricRegistry.name(SocketRequestResponseChannel.class, "RequestQueueSize"), requestQueueSize); responseQueueSize = new ArrayList>(channel.getNumberOfProcessors()); for (int i = 0; i < channel.getNumberOfProcessors(); i++) { final int index = i; - responseQueueSize.add(i, new Gauge() { - @Override - public Integer getValue() { - return channel.getResponseQueueSize(index); - } - }); + responseQueueSize.add(i, () -> channel.getResponseQueueSize(index)); registry.register(MetricRegistry.name(SocketRequestResponseChannel.class, i + "-ResponseQueueSize"), responseQueueSize.get(i)); } - numberOfProcessorThreads = new Gauge() { - @Override - public Integer getValue() { - return getLiveThreads(processorThreads); - } - }; + numberOfProcessorThreads = () -> getLiveThreads(processorThreads); 
registry.register(MetricRegistry.name(SocketServer.class, "NumberOfProcessorThreads"), numberOfProcessorThreads); acceptConnectionErrorCount = diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationMetrics.java b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationMetrics.java index bf6856e9c2..1e6a898058 100644 --- a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationMetrics.java +++ b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationMetrics.java @@ -351,12 +351,7 @@ public void populatePerColoMetrics(Set datacenters) { */ void trackLiveThreadsCount(final Map> replicaThreadPools, String localDatacenter) { for (final String datacenter : replicaThreadPools.keySet()) { - Gauge liveThreadsPerDatacenter = new Gauge() { - @Override - public Integer getValue() { - return getLiveThreads(replicaThreadPools.get(datacenter)); - } - }; + Gauge liveThreadsPerDatacenter = () -> getLiveThreads(replicaThreadPools.get(datacenter)); if (localDatacenter.equals(datacenter)) { registry.register(MetricRegistry.name(ReplicaThread.class, "NumberOfIntra-Colo-ReplicaThreads"), liveThreadsPerDatacenter); @@ -382,12 +377,7 @@ public void addRemoteReplicaToLagMetrics(final RemoteReplicaInfo remoteReplicaIn DataNodeId dataNodeId = replicaId.getDataNodeId(); final String metricName = dataNodeId.getHostname() + "-" + dataNodeId.getPort() + "-" + replicaId.getPartitionId() + "-replicaLagInBytes"; - Gauge replicaLag = new Gauge() { - @Override - public Long getValue() { - return remoteReplicaInfo.getRemoteLagFromLocalInBytes(); - } - }; + Gauge replicaLag = remoteReplicaInfo::getRemoteLagFromLocalInBytes; registry.register(MetricRegistry.name(ReplicationMetrics.class, metricName), replicaLag); replicaLagInBytes.add(replicaLag); } diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/AsyncRequestResponseHandlerFactory.java 
b/ambry-rest/src/main/java/com.github.ambry.rest/AsyncRequestResponseHandlerFactory.java index 3764013a9e..fd1493c34e 100644 --- a/ambry-rest/src/main/java/com.github.ambry.rest/AsyncRequestResponseHandlerFactory.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/AsyncRequestResponseHandlerFactory.java @@ -251,12 +251,7 @@ public RequestResponseHandlerMetrics(MetricRegistry metricRegistry) { */ public void registerRequestWorker(final AsyncRequestWorker asyncRequestWorker) { int pos = asyncRequestWorkerIndex.getAndIncrement(); - Gauge gauge = new Gauge() { - @Override - public Integer getValue() { - return asyncRequestWorker.getRequestQueueSize(); - } - }; + Gauge gauge = asyncRequestWorker::getRequestQueueSize; metricRegistry.register(MetricRegistry.name(AsyncRequestWorker.class, pos + "-RequestQueueSize"), gauge); } @@ -265,30 +260,15 @@ public Integer getValue() { * @param asyncRequestResponseHandler the {@link AsyncRequestResponseHandler} whose key metrics have to be tracked. */ public void trackAsyncRequestResponseHandler(final AsyncRequestResponseHandler asyncRequestResponseHandler) { - Gauge totalRequestQueueSize = new Gauge() { - @Override - public Integer getValue() { - return asyncRequestResponseHandler.getRequestQueueSize(); - } - }; + Gauge totalRequestQueueSize = asyncRequestResponseHandler::getRequestQueueSize; metricRegistry.register(MetricRegistry.name(AsyncRequestResponseHandler.class, "TotalRequestQueueSize"), totalRequestQueueSize); - Gauge totalResponseSetSize = new Gauge() { - @Override - public Integer getValue() { - return asyncRequestResponseHandler.getResponseSetSize(); - } - }; + Gauge totalResponseSetSize = asyncRequestResponseHandler::getResponseSetSize; metricRegistry.register(MetricRegistry.name(AsyncRequestResponseHandler.class, "TotalResponseSetSize"), totalResponseSetSize); - Gauge asyncHandlerWorkersAlive = new Gauge() { - @Override - public Integer getValue() { - return asyncRequestResponseHandler.getWorkersAlive(); - } - }; 
+ Gauge asyncHandlerWorkersAlive = asyncRequestResponseHandler::getWorkersAlive; metricRegistry.register(MetricRegistry.name(AsyncRequestResponseHandler.class, "AsyncHandlerWorkersAlive"), asyncHandlerWorkersAlive); } diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java b/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java index 6f880b768c..c2382e71d0 100644 --- a/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java @@ -136,12 +136,7 @@ public RestServerMetrics(MetricRegistry metricRegistry, final RestServerState re accountServiceCloseTimeInMs = metricRegistry.histogram(MetricRegistry.name(RestServer.class, "AccountServiceCloseTimeInMs")); - Gauge restServerStatus = new Gauge() { - @Override - public Integer getValue() { - return restServerState.isServiceUp() ? 1 : 0; - } - }; + Gauge restServerStatus = () -> restServerState.isServiceUp() ? 1 : 0; metricRegistry.register(MetricRegistry.name(RestServer.class, "RestServerState"), restServerStatus); } } diff --git a/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java b/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java index 5830e7ae56..39d6b58bf4 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java +++ b/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java @@ -17,6 +17,7 @@ import com.codahale.metrics.Histogram; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.config.RouterConfig; import com.github.ambry.utils.Pair; import com.github.ambry.utils.Time; import java.util.HashMap; @@ -39,7 +40,6 @@ class AdaptiveOperationTracker extends SimpleOperationTracker { static final long MIN_DATA_POINTS_REQUIRED = 1000; private final Time time; - private final String datacenterName; private final double quantile; private 
final Histogram localColoTracker; private final Histogram crossColoTracker; @@ -55,34 +55,42 @@ class AdaptiveOperationTracker extends SimpleOperationTracker { /** * Constructs an {@link AdaptiveOperationTracker} - * @param datacenterName The datacenter where the router is located. + * @param routerConfig The {@link RouterConfig} containing the configs for operation tracker. + * @param routerOperation The {@link RouterOperation} which {@link AdaptiveOperationTracker} is associated with. * @param partitionId The partition on which the operation is performed. - * @param crossColoEnabled {@code true} if requests can be sent to remote replicas, {@code false} - * otherwise. * @param originatingDcName name of the cross colo DC whose replicas should be tried first. - * @param includeNonOriginatingDcReplicas if take the option to include remote non originating DC replicas. - * @param replicasRequired The number of replicas required for the operation. - * @param successTarget The number of successful responses required to succeed the operation. - * @param parallelism The maximum number of inflight requests at any point of time. - * @param time the {@link Time} instance to use. * @param localColoTracker the {@link Histogram} that tracks intra datacenter latencies for this class of requests. * @param crossColoTracker the {@link Histogram} that tracks inter datacenter latencies for this class of requests. * @param pastDueCounter the {@link Counter} that tracks the number of times a request is past due. - * @param quantile the quantile cutoff to use for when evaluating requests against the trackers. + * @param time the {@link Time} instance to use. 
*/ - AdaptiveOperationTracker(String datacenterName, PartitionId partitionId, boolean crossColoEnabled, - String originatingDcName, boolean includeNonOriginatingDcReplicas, int replicasRequired, int successTarget, - int parallelism, Time time, Histogram localColoTracker, Histogram crossColoTracker, Counter pastDueCounter, - double quantile) { - super(datacenterName, partitionId, crossColoEnabled, originatingDcName, includeNonOriginatingDcReplicas, - replicasRequired, successTarget, parallelism, true); - this.datacenterName = datacenterName; + AdaptiveOperationTracker(RouterConfig routerConfig, RouterOperation routerOperation, PartitionId partitionId, + String originatingDcName, Histogram localColoTracker, Histogram crossColoTracker, Counter pastDueCounter, + NonBlockingRouterMetrics routerMetrics, Time time) { + super(routerConfig, routerOperation, partitionId, originatingDcName, true); this.time = time; this.localColoTracker = localColoTracker; this.crossColoTracker = crossColoTracker; this.pastDueCounter = pastDueCounter; - this.quantile = quantile; + this.quantile = routerConfig.routerLatencyToleranceQuantile; this.otIterator = new OpTrackerIterator(); + Class operationClass = null; + switch (routerOperation) { + case GetBlobOperation: + operationClass = GetBlobOperation.class; + break; + case GetBlobInfoOperation: + operationClass = GetBlobInfoOperation.class; + break; + default: + throw new IllegalArgumentException(routerOperation + " is not supported in AdaptiveOperationTracker"); + } + routerMetrics.registerCustomPercentiles(operationClass, "LocalColoLatencyMs", localColoTracker, + routerConfig.routerOperationTrackerCustomPercentiles); + if (crossColoTracker != null) { + routerMetrics.registerCustomPercentiles(operationClass, "CrossColoLatencyMs", crossColoTracker, + routerConfig.routerOperationTrackerCustomPercentiles); + } } @Override diff --git a/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java 
b/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java index 762859133e..c7eb17fc58 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java @@ -99,9 +99,8 @@ class DeleteOperation { byte blobDcId = blobId.getDatacenterId(); String originatingDcName = clusterMap.getDatacenterName(blobDcId); this.operationTracker = - new SimpleOperationTracker(routerConfig.routerDatacenterName, blobId.getPartition(), true, originatingDcName, - true, Integer.MAX_VALUE, routerConfig.routerDeleteSuccessTarget, - routerConfig.routerDeleteRequestParallelism, false); + new SimpleOperationTracker(routerConfig, RouterOperation.DeleteOperation, blobId.getPartition(), + originatingDcName, false); } /** diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java index 49c0bc96e5..9856f66c2b 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java @@ -91,7 +91,8 @@ class GetBlobInfoOperation extends GetOperation { routerMetrics.getBlobInfoLocalColoLatencyMs, routerMetrics.getBlobInfoCrossColoLatencyMs, routerMetrics.getBlobInfoPastDueCount, kms, cryptoService, cryptoJobHandler, time, isEncrypted); this.routerCallback = routerCallback; - operationTracker = getOperationTracker(blobId.getPartition(), blobId.getDatacenterId()); + operationTracker = + getOperationTracker(blobId.getPartition(), blobId.getDatacenterId(), RouterOperation.GetBlobInfoOperation); progressTracker = new ProgressTracker(operationTracker); } diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java index 4296258bd4..8932c7d907 100644 --- 
a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java @@ -557,7 +557,8 @@ void reset() { void initialize(int index, BlobId id) { chunkIndex = index; chunkBlobId = id; - chunkOperationTracker = getOperationTracker(chunkBlobId.getPartition(), chunkBlobId.getDatacenterId()); + chunkOperationTracker = getOperationTracker(chunkBlobId.getPartition(), chunkBlobId.getDatacenterId(), + RouterOperation.GetBlobOperation); progressTracker = new ProgressTracker(chunkOperationTracker); state = ChunkState.Ready; } diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java index 040598d1b5..7ad71163ec 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java @@ -66,7 +66,7 @@ abstract class GetOperation { /** * Construct a GetOperation - * @param routerConfig the {@link RouterConfig} containing the configs for put operations. + * @param routerConfig the {@link RouterConfig} containing the configs for get operations. * @param routerMetrics The {@link NonBlockingRouterMetrics} to be used for reporting metrics. * @param clusterMap the {@link ClusterMap} of the cluster * @param responseHandler the {@link ResponseHandler} responsible for failure detection. @@ -237,23 +237,21 @@ protected GetRequest createGetRequest(BlobId blobId, MessageFormatFlags flag, Ge /** * Gets an {@link OperationTracker} based on the config and {@code partitionId}. * @param partitionId the {@link PartitionId} for which a tracker is required. + * @param routerOperation The type of router operation used by tracker. * @return an {@link OperationTracker} based on the config and {@code partitionId}. 
*/ - protected OperationTracker getOperationTracker(PartitionId partitionId, byte datacenterId) { + protected OperationTracker getOperationTracker(PartitionId partitionId, byte datacenterId, + RouterOperation routerOperation) { OperationTracker operationTracker; String trackerType = routerConfig.routerGetOperationTrackerType; String originatingDcName = clusterMap.getDatacenterName(datacenterId); if (trackerType.equals(SimpleOperationTracker.class.getSimpleName())) { - operationTracker = new SimpleOperationTracker(routerConfig.routerDatacenterName, partitionId, - routerConfig.routerGetCrossDcEnabled, originatingDcName, - routerConfig.routerGetIncludeNonOriginatingDcReplicas, routerConfig.routerGetReplicasRequired, - routerConfig.routerGetSuccessTarget, routerConfig.routerGetRequestParallelism); + operationTracker = + new SimpleOperationTracker(routerConfig, routerOperation, partitionId, originatingDcName, true); } else if (trackerType.equals(AdaptiveOperationTracker.class.getSimpleName())) { - operationTracker = new AdaptiveOperationTracker(routerConfig.routerDatacenterName, partitionId, - routerConfig.routerGetCrossDcEnabled, originatingDcName, - routerConfig.routerGetIncludeNonOriginatingDcReplicas, routerConfig.routerGetReplicasRequired, - routerConfig.routerGetSuccessTarget, routerConfig.routerGetRequestParallelism, time, localColoTracker, - crossColoTracker, pastDueCounter, routerConfig.routerLatencyToleranceQuantile); + operationTracker = + new AdaptiveOperationTracker(routerConfig, routerOperation, partitionId, originatingDcName, localColoTracker, + crossColoTracker, pastDueCounter, routerMetrics, time); } else { throw new IllegalArgumentException("Unrecognized tracker type: " + trackerType); } diff --git a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java index 7406256444..64438f79c2 100644 --- 
a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java +++ b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java @@ -22,6 +22,7 @@ import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.utils.SystemTime; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -407,6 +408,21 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap) { decryptJobMetrics = new CryptoJobMetrics(GetOperation.class, "Decrypt", metricRegistry); } + /** + * Registers a {@link Gauge} metric for each custom percentile of the given {@link Histogram} under the given class and name. + * @param ownerClass the {@link Class} that is supposed to own the metrics. + * @param name the name of the metric that all percentiles belong to. + * @param histogram the {@link Histogram} to use. + * @param percentiles the list of percentiles of interest (double values). + */ + public void registerCustomPercentiles(Class ownerClass, String name, Histogram histogram, List percentiles) { + percentiles.forEach(p -> { + Gauge customPercentile = () -> histogram.getSnapshot().getValue(p); + metricRegistry.register(MetricRegistry.name(ownerClass, name, String.valueOf(p * 100), "thPercentile"), + customPercentile); + }); + } + /** * Initializes a {@link Gauge} metric for the status of {@code RequestResponseHandlerThread} of an * {@link com.github.ambry.router.NonBlockingRouter.OperationController}, to indicate if it is running @@ -415,12 +431,7 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap) { * to be monitored. */ public void initializeOperationControllerMetrics(final Thread requestResponseHandlerThread) { - requestResponseHandlerThreadRunning = new Gauge() { - @Override - public Long getValue() { - return requestResponseHandlerThread.isAlive() ? 1L : 0L; - } - }; + requestResponseHandlerThreadRunning = () -> requestResponseHandlerThread.isAlive() ? 
1L : 0L; metricRegistry.register( MetricRegistry.name(NonBlockingRouter.class, requestResponseHandlerThread.getName() + "Running"), requestResponseHandlerThreadRunning); @@ -432,12 +443,7 @@ public Long getValue() { * @param chunkFillerThread The {@code ChunkFillerThread} of which the status is to be monitored. */ public void initializePutManagerMetrics(final Thread chunkFillerThread) { - chunkFillerThreadRunning = new Gauge() { - @Override - public Long getValue() { - return chunkFillerThread.isAlive() ? 1L : 0L; - } - }; + chunkFillerThreadRunning = () -> chunkFillerThread.isAlive() ? 1L : 0L; metricRegistry.register(MetricRegistry.name(PutManager.class, chunkFillerThread.getName() + "Running"), chunkFillerThreadRunning); } @@ -446,22 +452,22 @@ public Long getValue() { * Initializes a {@link Gauge} metric to monitor the number of running * {@link com.github.ambry.router.NonBlockingRouter.OperationController} of a {@link NonBlockingRouter}. * @param currentOperationsCount The counter of {@link com.github.ambry.router.NonBlockingRouter.OperationController}. + * @param currentBackgroundOperationsCount The counter of background operations submitted to the router that are not + * yet completed. 
*/ public void initializeNumActiveOperationsMetrics(final AtomicInteger currentOperationsCount, final AtomicInteger currentBackgroundOperationsCount) { - metricRegistry.register(MetricRegistry.name(NonBlockingRouter.class, "NumActiveOperations"), new Gauge() { - @Override - public Integer getValue() { - return currentOperationsCount.get(); - } - }); + metricRegistry.register(MetricRegistry.name(NonBlockingRouter.class, "NumActiveOperations"), + (Gauge) currentOperationsCount::get); metricRegistry.register(MetricRegistry.name(NonBlockingRouter.class, "NumActiveBackgroundOperations"), - new Gauge() { - @Override - public Integer getValue() { - return currentBackgroundOperationsCount.get(); - } - }); + (Gauge) currentBackgroundOperationsCount::get); + } + + /** + * @return the MetricRegistry being used in {@link NonBlockingRouterMetrics} + */ + MetricRegistry getMetricRegistry() { + return metricRegistry; } /** diff --git a/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java b/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java index 1bb2c46919..e613379e48 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java @@ -1097,8 +1097,8 @@ private void prepareForSending() { passedInBlobProperties.getCreationTimeInMs(), passedInBlobProperties.getAccountId(), passedInBlobProperties.getContainerId(), passedInBlobProperties.isEncrypted(), passedInBlobProperties.getExternalAssetTag()); - operationTracker = new SimpleOperationTracker(routerConfig.routerDatacenterName, partitionId, false, null, true, - Integer.MAX_VALUE, routerConfig.routerPutSuccessTarget, routerConfig.routerPutRequestParallelism); + operationTracker = + new SimpleOperationTracker(routerConfig, RouterOperation.PutOperation, partitionId, null, true); correlationIdToChunkPutRequestInfo.clear(); state = ChunkState.Ready; } catch (RouterException e) { diff --git 
a/ambry-router/src/main/java/com.github.ambry.router/RouterOperation.java b/ambry-router/src/main/java/com.github.ambry.router/RouterOperation.java new file mode 100644 index 0000000000..10f49a3f1d --- /dev/null +++ b/ambry-router/src/main/java/com.github.ambry.router/RouterOperation.java @@ -0,0 +1,18 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.router; + +public enum RouterOperation { + GetBlobOperation, GetBlobInfoOperation, PutOperation, DeleteOperation, TtlUpdateOperation +} diff --git a/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java b/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java index a5f92e18d4..9bbc94a5b9 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java +++ b/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java @@ -15,6 +15,7 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.config.RouterConfig; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -57,9 +58,10 @@ * */ class SimpleOperationTracker implements OperationTracker { + protected final String datacenterName; protected final int successTarget; protected final int parallelism; - protected final LinkedList replicaPool = new LinkedList(); + protected final LinkedList replicaPool = new LinkedList<>(); protected int totalReplicaCount = 0; protected int inflightCount = 
0; @@ -71,26 +73,49 @@ class SimpleOperationTracker implements OperationTracker { /** * Constructor for an {@code SimpleOperationTracker}. - * - * @param datacenterName The datacenter where the router is located. + * @param routerConfig The {@link RouterConfig} containing the configs for operation tracker. + * @param routerOperation The {@link RouterOperation} which {@link SimpleOperationTracker} is associated with. * @param partitionId The partition on which the operation is performed. - * @param crossColoEnabled {@code true} if requests can be sent to remote replicas, {@code false} - * otherwise. * @param originatingDcName The original DC where blob was put. - * @param includeNonOriginatingDcReplicas if take the option to include remote non originating DC replicas. - * @param replicasRequired The number of replicas required for the operation. - * @param successTarget The number of successful responses required to succeed the operation. - * @param parallelism The maximum number of inflight requests at any point of time. * @param shuffleReplicas Indicates if the replicas need to be shuffled. 
*/ - SimpleOperationTracker(String datacenterName, PartitionId partitionId, boolean crossColoEnabled, - String originatingDcName, boolean includeNonOriginatingDcReplicas, int replicasRequired, int successTarget, - int parallelism, boolean shuffleReplicas) { + SimpleOperationTracker(RouterConfig routerConfig, RouterOperation routerOperation, PartitionId partitionId, + String originatingDcName, boolean shuffleReplicas) { + // populate tracker parameters based on operation type + boolean crossColoEnabled = false; + boolean includeNonOriginatingDcReplicas = true; + int numOfReplicasRequired = Integer.MAX_VALUE; + switch (routerOperation) { + case GetBlobOperation: + case GetBlobInfoOperation: + successTarget = routerConfig.routerGetSuccessTarget; + parallelism = routerConfig.routerGetRequestParallelism; + crossColoEnabled = routerConfig.routerGetCrossDcEnabled; + includeNonOriginatingDcReplicas = routerConfig.routerGetIncludeNonOriginatingDcReplicas; + numOfReplicasRequired = routerConfig.routerGetReplicasRequired; + break; + case PutOperation: + successTarget = routerConfig.routerPutSuccessTarget; + parallelism = routerConfig.routerPutRequestParallelism; + break; + case DeleteOperation: + successTarget = routerConfig.routerDeleteSuccessTarget; + parallelism = routerConfig.routerDeleteRequestParallelism; + crossColoEnabled = true; + break; + case TtlUpdateOperation: + successTarget = routerConfig.routerTtlUpdateSuccessTarget; + parallelism = routerConfig.routerTtlUpdateRequestParallelism; + crossColoEnabled = true; + break; + default: + throw new IllegalArgumentException("Unsupported operation: " + routerOperation); + } if (parallelism < 1) { throw new IllegalArgumentException("Parallelism has to be > 0. 
Configured to be " + parallelism); } - this.successTarget = successTarget; - this.parallelism = parallelism; + datacenterName = routerConfig.routerDatacenterName; + // Order the replicas so that local healthy replicas are ordered and returned first, // then the remote healthy ones, and finally the possibly down ones. List replicas = partitionId.getReplicaIds(); @@ -132,10 +157,10 @@ class SimpleOperationTracker implements OperationTracker { // Please note replicasRequired is 6 because total number of local and originating replicas is always <= 6. // This may no longer be true with partition classes and flexible replication. // Don't do this if originatingDcName is unknown. - while (replicaPool.size() < replicasRequired && backupReplicas.size() > 0) { + while (replicaPool.size() < numOfReplicasRequired && backupReplicas.size() > 0) { replicaPool.add(backupReplicas.pollFirst()); } - while (replicaPool.size() < replicasRequired && downReplicas.size() > 0) { + while (replicaPool.size() < numOfReplicasRequired && downReplicas.size() > 0) { replicaPool.add(downReplicas.pollFirst()); } } @@ -149,25 +174,6 @@ class SimpleOperationTracker implements OperationTracker { this.otIterator = new OpTrackerIterator(); } - /** - * Constructor for an {@code SimpleOperationTracker}, which shuffles replicas. - * - * @param datacenterName The datacenter where the router is located. - * @param partitionId The partition on which the operation is performed. - * @param crossColoEnabled {@code true} if requests can be sent to remote replicas, {@code false} otherwise. - * @param originatingDcName The original DC where blob was put. null if DC unknown. - * @param includeNonOriginatingDcReplicas if take the option to include remote non originating DC replicas. - * @param replicasRequired The number of replicas required for the operation. - * @param successTarget The number of successful responses required to succeed the operation. 
- * @param parallelism The maximum number of inflight requests at any point of time. - */ - SimpleOperationTracker(String datacenterName, PartitionId partitionId, boolean crossColoEnabled, - String originatingDcName, boolean includeNonOriginatingDcReplicas, int replicasRequired, int successTarget, - int parallelism) { - this(datacenterName, partitionId, crossColoEnabled, originatingDcName, includeNonOriginatingDcReplicas, - replicasRequired, successTarget, parallelism, true); - } - @Override public boolean hasSucceeded() { return succeededCount >= successTarget; @@ -220,7 +226,14 @@ private boolean hasFailed() { } /** - * Helper function to catch a potential race condition in {@link SimpleOperationTracker#SimpleOperationTracker(String, PartitionId, boolean, String, boolean, int, int, int, boolean)}. + * @return the success target number of this operation tracker. + */ + public int getSuccessTarget() { + return successTarget; + } + + /** + * Helper function to catch a potential race condition in {@link SimpleOperationTracker#SimpleOperationTracker(RouterConfig, RouterOperation, PartitionId, String, boolean)}. * * @param partitionId The partition on which the operation is performed. * @param examinedReplicas All replicas examined. 
diff --git a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java index 7c8e3fcc35..1b3fa1144b 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java @@ -90,9 +90,8 @@ class TtlUpdateOperation { byte blobDcId = blobId.getDatacenterId(); String originatingDcName = clusterMap.getDatacenterName(blobDcId); this.operationTracker = - new SimpleOperationTracker(routerConfig.routerDatacenterName, blobId.getPartition(), true, originatingDcName, - true, Integer.MAX_VALUE, routerConfig.routerTtlUpdateSuccessTarget, - routerConfig.routerTtlUpdateRequestParallelism, false); + new SimpleOperationTracker(routerConfig, RouterOperation.TtlUpdateOperation, blobId.getPartition(), + originatingDcName, false); } /** diff --git a/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java b/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java index ea74775b40..fea0c27598 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java @@ -14,12 +14,18 @@ package com.github.ambry.router; import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.clustermap.MockDataNodeId; import com.github.ambry.clustermap.MockPartitionId; import com.github.ambry.clustermap.MockReplicaId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.config.RouterConfig; +import com.github.ambry.config.VerifiableProperties; import com.github.ambry.network.Port; 
import com.github.ambry.network.PortType; import com.github.ambry.utils.MockTime; @@ -34,7 +40,10 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Properties; import java.util.Set; +import java.util.SortedMap; +import java.util.stream.Collectors; import org.junit.Test; import static org.junit.Assert.*; @@ -68,21 +77,22 @@ public class AdaptiveOperationTrackerTest { private final Histogram localColoTracker = registry.histogram("LocalColoTracker"); private final Histogram crossColoTracker = registry.histogram("CrossColoTracker"); private final Counter pastDueCounter = registry.counter("PastDueCounter"); + private NonBlockingRouterMetrics routerMetrics; /** * Constructor that sets up state. */ - public AdaptiveOperationTrackerTest() { + public AdaptiveOperationTrackerTest() throws Exception { List portList = Collections.singletonList(new Port(PORT, PortType.PLAINTEXT)); List mountPaths = Collections.singletonList("mockMountPath"); - datanodes = new ArrayList<>(Arrays.asList( - new MockDataNodeId[]{new MockDataNodeId(portList, mountPaths, "dc-0"), new MockDataNodeId(portList, mountPaths, - "dc-1")})); + datanodes = new ArrayList<>(Arrays.asList(new MockDataNodeId(portList, mountPaths, "dc-0"), + new MockDataNodeId(portList, mountPaths, "dc-1"))); localDcName = datanodes.get(0).getDatacenterName(); mockPartition = new MockPartitionId(); for (int i = 0; i < REPLICA_COUNT; i++) { mockPartition.replicaIds.add(new MockReplicaId(PORT, mockPartition, datanodes.get(i % datanodes.size()), 0)); } + routerMetrics = new NonBlockingRouterMetrics(new MockClusterMap()); } /** @@ -96,7 +106,7 @@ public void adaptationTest() throws InterruptedException { double localColoCutoff = localColoTracker.getSnapshot().getValue(QUANTILE); double crossColoCutoff = crossColoTracker.getSnapshot().getValue(QUANTILE); - OperationTracker ot = getOperationTracker(true, REPLICA_COUNT, 2); + OperationTracker ot = getOperationTracker(true, REPLICA_COUNT, 
2, null); // 3-0-0-0; 3-0-0-0 sendRequests(ot, 2); // 1-2-0-0; 3-0-0-0 @@ -167,7 +177,7 @@ public void noUnexpiredRequestsTest() throws InterruptedException { primeTracker(crossColoTracker, AdaptiveOperationTracker.MIN_DATA_POINTS_REQUIRED, CROSS_COLO_LATENCY_RANGE); double localColoCutoff = localColoTracker.getSnapshot().getValue(QUANTILE); - OperationTracker ot = getOperationTracker(false, 1, 1); + OperationTracker ot = getOperationTracker(false, 1, 1, null); // 3-0-0-0 sendRequests(ot, 1); // 2-1-0-0 @@ -199,9 +209,7 @@ public void trackerUpdateBetweenHasNextAndNextTest() throws InterruptedException primeTracker(crossColoTracker, AdaptiveOperationTracker.MIN_DATA_POINTS_REQUIRED, CROSS_COLO_LATENCY_RANGE); double localColoCutoff = localColoTracker.getSnapshot().getValue(1); - OperationTracker ot = - new AdaptiveOperationTracker(localDcName, mockPartition, false, null, true, Integer.MAX_VALUE, 1, 1, time, - localColoTracker, null, pastDueCounter, 1); + OperationTracker ot = getOperationTracker(false, 1, 1, null); // 3-0-0-0 sendRequests(ot, 1); // 2-1-0-0 @@ -229,6 +237,49 @@ public void trackerUpdateBetweenHasNextAndNextTest() throws InterruptedException assertEquals("Past due counter is inconsistent", 1, pastDueCounter.getCount()); } + /** + * Test that adaptive operation tracker can correctly register custom percentiles. An example of metric name is: + * "com.github.ambry.router.GetOperation.LocalColoLatencyMs.91.0.thPercentile" + * @throws Exception + */ + @Test + public void customPercentilesMetricsRegistryTest() throws Exception { + // test that if custom percentile is not set, no corresponding metrics would be generated. 
+ getOperationTracker(true, 1, 1, null); + MetricRegistry metricRegistry = routerMetrics.getMetricRegistry(); + MetricFilter filter = new MetricFilter() { + @Override + public boolean matches(String name, Metric metric) { + return name.endsWith("thPercentile"); + } + }; + SortedMap gauges = metricRegistry.getGauges(filter); + assertTrue("No gauges should be created because custom percentile is not set", gauges.isEmpty()); + // test that dedicated gauges are correctly created for custom percentiles. + String customPercentiles = "0.91,0.97"; + String[] percentileArray = customPercentiles.split(","); + Arrays.sort(percentileArray); + List sortedPercentiles = + Arrays.stream(percentileArray).map(p -> String.valueOf(Double.valueOf(p) * 100)).collect(Collectors.toList()); + getOperationTracker(false, 1, 1, customPercentiles); + gauges = metricRegistry.getGauges(filter); + // Note that crossColoEnabled is false, the number of gauges should equal to number of given percentiles + assertEquals("The number of custom percentile gauge doesn't match", sortedPercentiles.size(), gauges.size()); + Iterator mapItor = gauges.keySet().iterator(); + Iterator listItor = sortedPercentiles.iterator(); + while (listItor.hasNext()) { + String gaugeName = (String) mapItor.next(); + String percentileStr = listItor.next(); + assertTrue("The gauge name doesn't match", gaugeName.endsWith(percentileStr + ".thPercentile")); + } + // reset router metrics to clean up registered metrics + routerMetrics = new NonBlockingRouterMetrics(new MockClusterMap()); + metricRegistry = routerMetrics.getMetricRegistry(); + getOperationTracker(true, 1, 1, customPercentiles); + gauges = metricRegistry.getGauges(filter); + assertEquals("The number of custom percentile gauge doesn't match", sortedPercentiles.size() * 2, gauges.size()); + } + // helpers // general @@ -238,12 +289,27 @@ public void trackerUpdateBetweenHasNextAndNextTest() throws InterruptedException * @param crossColoEnabled {@code true} if cross colo 
needs to be enabled. {@code false} otherwise. * @param successTarget the number of successful responses required for the operation to succeed. * @param parallelism the number of parallel requests that can be in flight. + * @param customPercentiles the custom percentiles to be reported. Percentiles are specified in a comma-separated + * string, i.e "0.94,0.96,0.97". * @return an instance of {@link AdaptiveOperationTracker} with the given parameters. */ - private OperationTracker getOperationTracker(boolean crossColoEnabled, int successTarget, int parallelism) { - return new AdaptiveOperationTracker(localDcName, mockPartition, crossColoEnabled, null, true, Integer.MAX_VALUE, - successTarget, parallelism, time, localColoTracker, crossColoEnabled ? crossColoTracker : null, pastDueCounter, - QUANTILE); + private OperationTracker getOperationTracker(boolean crossColoEnabled, int successTarget, int parallelism, + String customPercentiles) { + Properties props = new Properties(); + props.setProperty("router.hostname", "localhost"); + props.setProperty("router.datacenter.name", localDcName); + props.setProperty("router.get.cross.dc.enabled", Boolean.toString(crossColoEnabled)); + props.setProperty("router.get.success.target", Integer.toString(successTarget)); + props.setProperty("router.get.request.parallelism", Integer.toString(parallelism)); + props.setProperty("router.get.include.non.originating.dc.replicas", "true"); + props.setProperty("router.get.replicas.required", Integer.toString(Integer.MAX_VALUE)); + props.setProperty("router.latency.tolerance.quantile", Double.toString(QUANTILE)); + if (customPercentiles != null) { + props.setProperty("router.operation.tracker.custom.percentiles", customPercentiles); + } + RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(props)); + return new AdaptiveOperationTracker(routerConfig, RouterOperation.GetBlobOperation, mockPartition, null, + localColoTracker, crossColoEnabled ? 
crossColoTracker : null, pastDueCounter, routerMetrics, time); } /** @@ -289,7 +355,7 @@ private void sendRequests(OperationTracker operationTracker, int numRequestsExpe */ private void doTrackerUpdateTest(boolean succeedRequests) throws InterruptedException { long timeIncrement = 10; - OperationTracker ot = getOperationTracker(true, REPLICA_COUNT, REPLICA_COUNT); + OperationTracker ot = getOperationTracker(true, REPLICA_COUNT, REPLICA_COUNT, null); // 3-0-0-0; 3-0-0-0 sendRequests(ot, REPLICA_COUNT); // 0-3-0-0; 0-3-0-0 diff --git a/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java b/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java index ed366ead64..6a79c63126 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java @@ -120,8 +120,8 @@ public void registerRequestToSend(GetOperation getOperation, RequestInfo request */ @Parameterized.Parameters public static List data() { - return Arrays.asList( - new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false}, {SimpleOperationTracker.class.getSimpleName(), true}, {AdaptiveOperationTracker.class.getSimpleName(), false}}); + return Arrays.asList(new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false}, + {SimpleOperationTracker.class.getSimpleName(), true}, {AdaptiveOperationTracker.class.getSimpleName(), false}}); } /** @@ -212,7 +212,6 @@ public void testInstantiation() { GetBlobInfoOperation op = new GetBlobInfoOperation(routerConfig, routerMetrics, mockClusterMap, responseHandler, blobId, options, getOperationCallback, routerCallback, kms, cryptoService, cryptoJobHandler, time, false); - Assert.assertEquals("Callback must match", getOperationCallback, op.getCallback()); Assert.assertEquals("Blob ids must match", blobId.getID(), op.getBlobIdStr()); diff --git 
a/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java b/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java index 88ba95c34d..c7ea1d0f85 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java @@ -16,10 +16,13 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.clustermap.MockDataNodeId; import com.github.ambry.clustermap.MockPartitionId; import com.github.ambry.clustermap.MockReplicaId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.config.RouterConfig; +import com.github.ambry.config.VerifiableProperties; import com.github.ambry.network.Port; import com.github.ambry.network.PortType; import com.github.ambry.utils.MockTime; @@ -27,10 +30,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.Set; import org.junit.Test; import org.junit.runner.RunWith; @@ -70,6 +76,7 @@ public class OperationTrackerTest { private final Set repetitionTracker = new HashSet<>(); // for AdaptiveOperationTracker + private final NonBlockingRouterMetrics routerMetrics; private final Time time = new MockTime(); private final MetricRegistry registry = new MetricRegistry(); private final Histogram localColoTracker = registry.histogram("LocalColoTracker"); @@ -88,8 +95,13 @@ public static List data() { /** * @param operationTrackerType the type of {@link OperationTracker} that needs to be used in tests */ - public OperationTrackerTest(String operationTrackerType) { + public OperationTrackerTest(String operationTrackerType) throws 
Exception { this.operationTrackerType = operationTrackerType; + if (operationTrackerType.equals(ADAPTIVE_OP_TRACKER)) { + routerMetrics = new NonBlockingRouterMetrics(new MockClusterMap()); + } else { + routerMetrics = null; + } } /** @@ -154,7 +166,6 @@ public void localFailTest() { /** * crossColoEnabled = true, successTarget = 1, parallelism = 2. - *

* 1. Get 2 local replicas to send request (and send requests); * 2. 1 fails, 1 pending. @@ -185,7 +196,6 @@ public void localSucceedWithDifferentParameterTest() { /** * crossColoEnabled = true, successTarget = 1, parallelism = 2. - *

* 1. Get 2 local replicas to send request (and send requests); * 2. 1 local replica fails, 1 pending. @@ -458,6 +468,53 @@ public void incorrectParallelismTest() { } } + /** + * Test that operation tracker can correctly populate parameters(i.e. successTarget) based on input {@link RouterOperation}. + */ + @Test + public void operationClassTest() { + Properties props = new Properties(); + props.setProperty("router.hostname", "localhost"); + props.setProperty("router.datacenter.name", "dc-0"); + props.setProperty("router.get.success.target", "1"); + props.setProperty("router.put.success.target", "2"); + props.setProperty("router.delete.success.target", "3"); + props.setProperty("router.ttl.update.success.target", "4"); + RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(props)); + initialize(); + Map operationAndSuccessTarget = new HashMap<>(); + operationAndSuccessTarget.put(RouterOperation.GetBlobOperation, 1); + operationAndSuccessTarget.put(RouterOperation.GetBlobInfoOperation, 1); + operationAndSuccessTarget.put(RouterOperation.PutOperation, 2); + operationAndSuccessTarget.put(RouterOperation.DeleteOperation, 3); + operationAndSuccessTarget.put(RouterOperation.TtlUpdateOperation, 4); + for (Map.Entry entry : operationAndSuccessTarget.entrySet()) { + SimpleOperationTracker operationTracker = null; + switch (operationTrackerType) { + case SIMPLE_OP_TRACKER: + operationTracker = + new SimpleOperationTracker(routerConfig, entry.getKey(), mockPartition, originatingDcName, true); + break; + case ADAPTIVE_OP_TRACKER: + try { + operationTracker = + new AdaptiveOperationTracker(routerConfig, entry.getKey(), mockPartition, originatingDcName, + localColoTracker, crossColoTracker, pastDueCounter, routerMetrics, time); + } catch (IllegalArgumentException e) { + assertTrue("Get operation shouldn't throw any exception in adaptive tracker", + entry.getKey() != RouterOperation.GetBlobOperation + && entry.getKey() != RouterOperation.GetBlobInfoOperation); + } 
+ break; + } + // ensure the success target matches the number specified for each type of operation + if (operationTracker != null) { + assertEquals("The suggest target doesn't match", (long) entry.getValue(), + (operationTracker).getSuccessTarget()); + } + } + } + /** * Initialize 4 DCs, each DC has 1 data node, which has 3 replicas. */ @@ -465,10 +522,9 @@ private void initialize() { int replicaCount = 12; List portList = Collections.singletonList(new Port(PORT, PortType.PLAINTEXT)); List mountPaths = Collections.singletonList("mockMountPath"); - datanodes = new ArrayList<>(Arrays.asList( - new MockDataNodeId[]{new MockDataNodeId(portList, mountPaths, "dc-0"), new MockDataNodeId(portList, mountPaths, - "dc-1"), new MockDataNodeId(portList, mountPaths, "dc-2"), new MockDataNodeId(portList, mountPaths, - "dc-3")})); + datanodes = new ArrayList<>(Arrays.asList(new MockDataNodeId(portList, mountPaths, "dc-0"), + new MockDataNodeId(portList, mountPaths, "dc-1"), new MockDataNodeId(portList, mountPaths, "dc-2"), + new MockDataNodeId(portList, mountPaths, "dc-3"))); mockPartition = new MockPartitionId(); populateReplicaList(replicaCount); localDcName = datanodes.get(0).getDatacenterName(); @@ -495,16 +551,28 @@ private void populateReplicaList(int replicaCount) { */ private OperationTracker getOperationTracker(boolean crossColoEnabled, int successTarget, int parallelism, boolean includeNonOriginatingDcReplicas, int replicasRequired) { + Properties props = new Properties(); + props.setProperty("router.hostname", "localhost"); + props.setProperty("router.datacenter.name", localDcName); + props.setProperty("router.get.cross.dc.enabled", Boolean.toString(crossColoEnabled)); + props.setProperty("router.get.success.target", Integer.toString(successTarget)); + props.setProperty("router.get.request.parallelism", Integer.toString(parallelism)); + props.setProperty("router.get.include.non.originating.dc.replicas", + Boolean.toString(includeNonOriginatingDcReplicas)); + 
props.setProperty("router.get.replicas.required", Integer.toString(replicasRequired)); + props.setProperty("router.latency.tolerance.quantile", Double.toString(QUANTILE)); + RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(props)); OperationTracker tracker; switch (operationTrackerType) { case SIMPLE_OP_TRACKER: - tracker = new SimpleOperationTracker(localDcName, mockPartition, crossColoEnabled, originatingDcName, - includeNonOriginatingDcReplicas, replicasRequired, successTarget, parallelism); + tracker = + new SimpleOperationTracker(routerConfig, RouterOperation.GetBlobOperation, mockPartition, originatingDcName, + true); break; case ADAPTIVE_OP_TRACKER: - tracker = new AdaptiveOperationTracker(localDcName, mockPartition, crossColoEnabled, originatingDcName, - includeNonOriginatingDcReplicas, replicasRequired, successTarget, parallelism, time, localColoTracker, - crossColoEnabled ? crossColoTracker : null, pastDueCounter, QUANTILE); + tracker = new AdaptiveOperationTracker(routerConfig, RouterOperation.GetBlobOperation, mockPartition, + originatingDcName, localColoTracker, crossColoEnabled ? crossColoTracker : null, pastDueCounter, + routerMetrics, time); break; default: throw new IllegalArgumentException("Unrecognized operation tracker type - " + operationTrackerType);