From f14cea7422b9ae8d130bade7422ec7cb21f16821 Mon Sep 17 00:00:00 2001 From: Yingyi Zhang Date: Thu, 25 Apr 2019 13:00:43 -0700 Subject: [PATCH 1/3] Refactor operation tracker ctor and introduce custom percentiles 1. refactor ctor of both simple and adaptive operation tracker by passing the router config and operation class. This allows operation tracker itself to populate its parameters based on type of operation class. 2. introduce custom percentiles of latency Histogram in adaptive operation tracker. The percentiles are configurable and on-demand. User can define arbitrary quantile like 91th percentile to better eveluate the latency distribution. --- .../com.github.ambry/config/RouterConfig.java | 19 ++++ .../AdaptiveOperationTracker.java | 32 +++---- .../DeleteOperation.java | 5 +- .../GetBlobInfoOperation.java | 2 +- .../GetBlobOperation.java | 3 +- .../com.github.ambry.router/GetOperation.java | 18 ++-- .../NonBlockingRouterMetrics.java | 37 +++++--- .../com.github.ambry.router/PutOperation.java | 3 +- .../SimpleOperationTracker.java | 74 ++++++++------- .../TtlUpdateOperation.java | 5 +- .../AdaptiveOperationTrackerTest.java | 94 ++++++++++++++++--- .../GetBlobInfoOperationTest.java | 5 +- .../OperationTrackerTest.java | 93 +++++++++++++++--- 13 files changed, 277 insertions(+), 113 deletions(-) diff --git a/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java b/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java index 7f32402b72..7d1fd01571 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java @@ -13,6 +13,12 @@ */ package com.github.ambry.config; +import com.github.ambry.utils.Utils; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + + /** * Configuration parameters required by a {@link com.github.ambry.router.Router}. *

@@ -232,6 +238,15 @@ public class RouterConfig { @Default("false") public final boolean routerUseGetBlobOperationForBlobInfo; + /** + * The custom percentiles of Histogram in operation tracker to be reported. This allows router to emit metrics of + * arbitrary percentiles (i.e. 97th, 93th etc). An example of this config is "0.91,0.93,0.97"(comma separated), each + * value should fall in {@code [0..1]}. + */ + @Config("router.operation.tracker.custom.percentiles") + @Default("") + public final List routerOperationTrackerCustomPercentiles; + /** * Create a RouterConfig instance. * @param verifiableProperties the properties map to refer to. @@ -289,5 +304,9 @@ public RouterConfig(VerifiableProperties verifiableProperties) { verifiableProperties.getIntInRange("router.ttl.update.success.target", 2, 1, Integer.MAX_VALUE); routerUseGetBlobOperationForBlobInfo = verifiableProperties.getBoolean("router.use.get.blob.operation.for.blob.info", false); + List customPercentiles = + Utils.splitString(verifiableProperties.getString("router.operation.tracker.custom.percentiles", ""), ","); + routerOperationTrackerCustomPercentiles = + Collections.unmodifiableList(customPercentiles.stream().map(Double::valueOf).collect(Collectors.toList())); } } diff --git a/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java b/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java index 5830e7ae56..6ca6312dc7 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java +++ b/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java @@ -17,6 +17,7 @@ import com.codahale.metrics.Histogram; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.config.RouterConfig; import com.github.ambry.utils.Pair; import com.github.ambry.utils.Time; import java.util.HashMap; @@ -39,7 +40,6 @@ class AdaptiveOperationTracker extends SimpleOperationTracker { static final long MIN_DATA_POINTS_REQUIRED = 1000; private final Time time; - private final String datacenterName; private final double quantile; private final Histogram localColoTracker; private final Histogram crossColoTracker; @@ -55,34 +55,30 @@ class AdaptiveOperationTracker extends SimpleOperationTracker { /** * Constructs an {@link AdaptiveOperationTracker} - * @param datacenterName The datacenter where the router is located. + * @param operationClass The operation class in which operation tracker is used. * @param partitionId The partition on which the operation is performed. - * @param crossColoEnabled {@code true} if requests can be sent to remote replicas, {@code false} - * otherwise. * @param originatingDcName name of the cross colo DC whose replicas should be tried first. - * @param includeNonOriginatingDcReplicas if take the option to include remote non originating DC replicas. - * @param replicasRequired The number of replicas required for the operation. - * @param successTarget The number of successful responses required to succeed the operation. - * @param parallelism The maximum number of inflight requests at any point of time. - * @param time the {@link Time} instance to use. * @param localColoTracker the {@link Histogram} that tracks intra datacenter latencies for this class of requests. * @param crossColoTracker the {@link Histogram} that tracks inter datacenter latencies for this class of requests. * @param pastDueCounter the {@link Counter} that tracks the number of times a request is past due. - * @param quantile the quantile cutoff to use for when evaluating requests against the trackers. + * @param time the {@link Time} instance to use. */ - AdaptiveOperationTracker(String datacenterName, PartitionId partitionId, boolean crossColoEnabled, - String originatingDcName, boolean includeNonOriginatingDcReplicas, int replicasRequired, int successTarget, - int parallelism, Time time, Histogram localColoTracker, Histogram crossColoTracker, Counter pastDueCounter, - double quantile) { - super(datacenterName, partitionId, crossColoEnabled, originatingDcName, includeNonOriginatingDcReplicas, - replicasRequired, successTarget, parallelism, true); - this.datacenterName = datacenterName; + AdaptiveOperationTracker(RouterConfig routerConfig, Class operationClass, PartitionId partitionId, + String originatingDcName, Histogram localColoTracker, Histogram crossColoTracker, Counter pastDueCounter, + NonBlockingRouterMetrics routerMetrics, Time time) { + super(routerConfig, operationClass, partitionId, originatingDcName, true); this.time = time; this.localColoTracker = localColoTracker; this.crossColoTracker = crossColoTracker; this.pastDueCounter = pastDueCounter; - this.quantile = quantile; + this.quantile = routerConfig.routerLatencyToleranceQuantile; this.otIterator = new OpTrackerIterator(); + routerMetrics.registerCustomPercentiles(operationClass, "LocalColoLatencyMs", localColoTracker, + routerConfig.routerOperationTrackerCustomPercentiles); + if (crossColoTracker != null) { + routerMetrics.registerCustomPercentiles(operationClass, "CrossColoLatencyMs", crossColoTracker, + routerConfig.routerOperationTrackerCustomPercentiles); + } } @Override diff --git a/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java b/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java index 762859133e..2a76439478 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java @@ -99,9 +99,8 @@ class DeleteOperation { byte blobDcId = blobId.getDatacenterId(); String originatingDcName = clusterMap.getDatacenterName(blobDcId); this.operationTracker = - new SimpleOperationTracker(routerConfig.routerDatacenterName, blobId.getPartition(), true, originatingDcName, - true, Integer.MAX_VALUE, routerConfig.routerDeleteSuccessTarget, - routerConfig.routerDeleteRequestParallelism, false); + new SimpleOperationTracker(routerConfig, DeleteOperation.class, blobId.getPartition(), originatingDcName, + false); } /** diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java index 49c0bc96e5..e7661a6c44 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java @@ -91,7 +91,7 @@ class GetBlobInfoOperation extends GetOperation { routerMetrics.getBlobInfoLocalColoLatencyMs, routerMetrics.getBlobInfoCrossColoLatencyMs, routerMetrics.getBlobInfoPastDueCount, kms, cryptoService, cryptoJobHandler, time, isEncrypted); this.routerCallback = routerCallback; - operationTracker = getOperationTracker(blobId.getPartition(), blobId.getDatacenterId()); + operationTracker = getOperationTracker(blobId.getPartition(), blobId.getDatacenterId(), GetBlobInfoOperation.class); progressTracker = new ProgressTracker(operationTracker); } diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java index 4296258bd4..01bcf8514a 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java @@ -557,7 +557,8 @@ void reset() { void initialize(int index, BlobId id) { chunkIndex = index; chunkBlobId = id; - chunkOperationTracker = getOperationTracker(chunkBlobId.getPartition(), chunkBlobId.getDatacenterId()); + chunkOperationTracker = + getOperationTracker(chunkBlobId.getPartition(), chunkBlobId.getDatacenterId(), GetBlobOperation.class); progressTracker = new ProgressTracker(chunkOperationTracker); state = ChunkState.Ready; } diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java index 040598d1b5..e8333e6afd 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java @@ -66,7 +66,7 @@ abstract class GetOperation { /** * Construct a GetOperation - * @param routerConfig the {@link RouterConfig} containing the configs for put operations. + * @param routerConfig the {@link RouterConfig} containing the configs for get operations. * @param routerMetrics The {@link NonBlockingRouterMetrics} to be used for reporting metrics. * @param clusterMap the {@link ClusterMap} of the cluster * @param responseHandler the {@link ResponseHandler} responsible for failure detection. @@ -237,23 +237,19 @@ protected GetRequest createGetRequest(BlobId blobId, MessageFormatFlags flag, Ge /** * Gets an {@link OperationTracker} based on the config and {@code partitionId}. * @param partitionId the {@link PartitionId} for which a tracker is required. + * @param operationClass * @return an {@link OperationTracker} based on the config and {@code partitionId}. */ - protected OperationTracker getOperationTracker(PartitionId partitionId, byte datacenterId) { + protected OperationTracker getOperationTracker(PartitionId partitionId, byte datacenterId, Class operationClass) { OperationTracker operationTracker; String trackerType = routerConfig.routerGetOperationTrackerType; String originatingDcName = clusterMap.getDatacenterName(datacenterId); if (trackerType.equals(SimpleOperationTracker.class.getSimpleName())) { - operationTracker = new SimpleOperationTracker(routerConfig.routerDatacenterName, partitionId, - routerConfig.routerGetCrossDcEnabled, originatingDcName, - routerConfig.routerGetIncludeNonOriginatingDcReplicas, routerConfig.routerGetReplicasRequired, - routerConfig.routerGetSuccessTarget, routerConfig.routerGetRequestParallelism); + operationTracker = new SimpleOperationTracker(routerConfig, operationClass, partitionId, originatingDcName, true); } else if (trackerType.equals(AdaptiveOperationTracker.class.getSimpleName())) { - operationTracker = new AdaptiveOperationTracker(routerConfig.routerDatacenterName, partitionId, - routerConfig.routerGetCrossDcEnabled, originatingDcName, - routerConfig.routerGetIncludeNonOriginatingDcReplicas, routerConfig.routerGetReplicasRequired, - routerConfig.routerGetSuccessTarget, routerConfig.routerGetRequestParallelism, time, localColoTracker, - crossColoTracker, pastDueCounter, routerConfig.routerLatencyToleranceQuantile); + operationTracker = + new AdaptiveOperationTracker(routerConfig, operationClass, partitionId, originatingDcName, localColoTracker, + crossColoTracker, pastDueCounter, routerMetrics, time); } else { throw new IllegalArgumentException("Unrecognized tracker type: " + trackerType); } diff --git a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java index 7406256444..3a1c705954 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java +++ b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java @@ -22,6 +22,7 @@ import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.utils.SystemTime; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -407,6 +408,21 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap) { decryptJobMetrics = new CryptoJobMetrics(GetOperation.class, "Decrypt", metricRegistry); } + /** + * Register {@link Gauge} metric for each custom percentile of given {@link Histogram} with given class and name. + * @param ownerClass the {@link Class} that is supposed to own the metrics. + * @param name the name of metric that all percentiles belong to. + * @param histogram the {@link Histogram} to use. + * @param percentiles a list of interested percentiles (double value). + */ + public void registerCustomPercentiles(Class ownerClass, String name, Histogram histogram, List percentiles) { + percentiles.forEach(p -> { + Gauge customPercentile = () -> histogram.getSnapshot().getValue(p); + metricRegistry.register(MetricRegistry.name(ownerClass, name, String.valueOf(p * 100), "thPercentile"), + customPercentile); + }); + } + /** * Initializes a {@link Gauge} metric for the status of {@code RequestResponseHandlerThread} of an * {@link com.github.ambry.router.NonBlockingRouter.OperationController}, to indicate if it is running @@ -415,12 +431,7 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap) { * to be monitored. */ public void initializeOperationControllerMetrics(final Thread requestResponseHandlerThread) { - requestResponseHandlerThreadRunning = new Gauge() { - @Override - public Long getValue() { - return requestResponseHandlerThread.isAlive() ? 1L : 0L; - } - }; + requestResponseHandlerThreadRunning = () -> requestResponseHandlerThread.isAlive() ? 1L : 0L; metricRegistry.register( MetricRegistry.name(NonBlockingRouter.class, requestResponseHandlerThread.getName() + "Running"), requestResponseHandlerThreadRunning); @@ -432,12 +443,7 @@ public Long getValue() { * @param chunkFillerThread The {@code ChunkFillerThread} of which the status is to be monitored. */ public void initializePutManagerMetrics(final Thread chunkFillerThread) { - chunkFillerThreadRunning = new Gauge() { - @Override - public Long getValue() { - return chunkFillerThread.isAlive() ? 1L : 0L; - } - }; + chunkFillerThreadRunning = () -> chunkFillerThread.isAlive() ? 1L : 0L; metricRegistry.register(MetricRegistry.name(PutManager.class, chunkFillerThread.getName() + "Running"), chunkFillerThreadRunning); } @@ -464,6 +470,13 @@ public Integer getValue() { }); } + /** + * @return the MetricRegistry being used in {@link NonBlockingRouterMetrics} + */ + MetricRegistry getMetricRegistry() { + return metricRegistry; + } + /** * Increment error metrics based on error type. * @param exception The exception associated with this error. diff --git a/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java b/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java index 1bb2c46919..8dc1fad9f9 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java @@ -1097,8 +1097,7 @@ private void prepareForSending() { passedInBlobProperties.getCreationTimeInMs(), passedInBlobProperties.getAccountId(), passedInBlobProperties.getContainerId(), passedInBlobProperties.isEncrypted(), passedInBlobProperties.getExternalAssetTag()); - operationTracker = new SimpleOperationTracker(routerConfig.routerDatacenterName, partitionId, false, null, true, - Integer.MAX_VALUE, routerConfig.routerPutSuccessTarget, routerConfig.routerPutRequestParallelism); + operationTracker = new SimpleOperationTracker(routerConfig, PutOperation.class, partitionId, null, true); correlationIdToChunkPutRequestInfo.clear(); state = ChunkState.Ready; } catch (RouterException e) { diff --git a/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java b/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java index a5f92e18d4..56d9afbbca 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java +++ b/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java @@ -15,6 +15,7 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.config.RouterConfig; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -57,9 +58,10 @@ * */ class SimpleOperationTracker implements OperationTracker { + protected final String datacenterName; protected final int successTarget; protected final int parallelism; - protected final LinkedList replicaPool = new LinkedList(); + protected final LinkedList replicaPool = new LinkedList<>(); protected int totalReplicaCount = 0; protected int inflightCount = 0; @@ -72,25 +74,43 @@ class SimpleOperationTracker implements OperationTracker { /** * Constructor for an {@code SimpleOperationTracker}. * - * @param datacenterName The datacenter where the router is located. + * @param routerConfig The {@link RouterConfig} containing the configs for operation tracker. + * @param operationClass The operation class in which operation tracker is used. * @param partitionId The partition on which the operation is performed. - * @param crossColoEnabled {@code true} if requests can be sent to remote replicas, {@code false} - * otherwise. * @param originatingDcName The original DC where blob was put. - * @param includeNonOriginatingDcReplicas if take the option to include remote non originating DC replicas. - * @param replicasRequired The number of replicas required for the operation. - * @param successTarget The number of successful responses required to succeed the operation. - * @param parallelism The maximum number of inflight requests at any point of time. * @param shuffleReplicas Indicates if the replicas need to be shuffled. */ - SimpleOperationTracker(String datacenterName, PartitionId partitionId, boolean crossColoEnabled, - String originatingDcName, boolean includeNonOriginatingDcReplicas, int replicasRequired, int successTarget, - int parallelism, boolean shuffleReplicas) { + SimpleOperationTracker(RouterConfig routerConfig, Class operationClass, PartitionId partitionId, + String originatingDcName, boolean shuffleReplicas) { + // populate tracker parameters based on operation type + boolean crossColoEnabled = false; + boolean includeNonOriginatingDcReplicas = true; + int replicasRequired = Integer.MAX_VALUE; + if (GetOperation.class.isAssignableFrom(operationClass)) { + successTarget = routerConfig.routerGetSuccessTarget; + parallelism = routerConfig.routerGetRequestParallelism; + crossColoEnabled = routerConfig.routerGetCrossDcEnabled; + includeNonOriginatingDcReplicas = routerConfig.routerGetIncludeNonOriginatingDcReplicas; + replicasRequired = routerConfig.routerGetReplicasRequired; + } else if (operationClass == PutOperation.class) { + successTarget = routerConfig.routerPutSuccessTarget; + parallelism = routerConfig.routerPutRequestParallelism; + } else if (operationClass == DeleteOperation.class) { + successTarget = routerConfig.routerDeleteSuccessTarget; + parallelism = routerConfig.routerDeleteRequestParallelism; + crossColoEnabled = true; + } else if (operationClass == TtlUpdateOperation.class) { + successTarget = routerConfig.routerTtlUpdateSuccessTarget; + parallelism = routerConfig.routerTtlUpdateRequestParallelism; + crossColoEnabled = true; + } else { + throw new IllegalArgumentException("Unrecognized operation: " + operationClass.getSimpleName()); + } if (parallelism < 1) { throw new IllegalArgumentException("Parallelism has to be > 0. Configured to be " + parallelism); } - this.successTarget = successTarget; - this.parallelism = parallelism; + datacenterName = routerConfig.routerDatacenterName; + // Order the replicas so that local healthy replicas are ordered and returned first, // then the remote healthy ones, and finally the possibly down ones. List replicas = partitionId.getReplicaIds(); @@ -149,25 +169,6 @@ class SimpleOperationTracker implements OperationTracker { this.otIterator = new OpTrackerIterator(); } - /** - * Constructor for an {@code SimpleOperationTracker}, which shuffles replicas. - * - * @param datacenterName The datacenter where the router is located. - * @param partitionId The partition on which the operation is performed. - * @param crossColoEnabled {@code true} if requests can be sent to remote replicas, {@code false} otherwise. - * @param originatingDcName The original DC where blob was put. null if DC unknown. - * @param includeNonOriginatingDcReplicas if take the option to include remote non originating DC replicas. - * @param replicasRequired The number of replicas required for the operation. - * @param successTarget The number of successful responses required to succeed the operation. - * @param parallelism The maximum number of inflight requests at any point of time. - */ - SimpleOperationTracker(String datacenterName, PartitionId partitionId, boolean crossColoEnabled, - String originatingDcName, boolean includeNonOriginatingDcReplicas, int replicasRequired, int successTarget, - int parallelism) { - this(datacenterName, partitionId, crossColoEnabled, originatingDcName, includeNonOriginatingDcReplicas, - replicasRequired, successTarget, parallelism, true); - } - @Override public boolean hasSucceeded() { return succeededCount >= successTarget; @@ -220,7 +221,14 @@ private boolean hasFailed() { } /** - * Helper function to catch a potential race condition in {@link SimpleOperationTracker#SimpleOperationTracker(String, PartitionId, boolean, String, boolean, int, int, int, boolean)}. + * @return the success target number of this operation tracker. + */ + public int getSuccessTarget() { + return successTarget; + } + + /** + * Helper function to catch a potential race condition in {@link SimpleOperationTracker#SimpleOperationTracker(RouterConfig, Class, PartitionId, String, boolean)}. * * @param partitionId The partition on which the operation is performed. * @param examinedReplicas All replicas examined. diff --git a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java index 7c8e3fcc35..f6a29ac33e 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java @@ -90,9 +90,8 @@ class TtlUpdateOperation { byte blobDcId = blobId.getDatacenterId(); String originatingDcName = clusterMap.getDatacenterName(blobDcId); this.operationTracker = - new SimpleOperationTracker(routerConfig.routerDatacenterName, blobId.getPartition(), true, originatingDcName, - true, Integer.MAX_VALUE, routerConfig.routerTtlUpdateSuccessTarget, - routerConfig.routerTtlUpdateRequestParallelism, false); + new SimpleOperationTracker(routerConfig, TtlUpdateOperation.class, blobId.getPartition(), originatingDcName, + false); } /** diff --git a/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java b/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java index ea74775b40..1a73f273be 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java @@ -14,12 +14,18 @@ package com.github.ambry.router; import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.clustermap.MockDataNodeId; import com.github.ambry.clustermap.MockPartitionId; import com.github.ambry.clustermap.MockReplicaId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.config.RouterConfig; +import com.github.ambry.config.VerifiableProperties; import com.github.ambry.network.Port; import com.github.ambry.network.PortType; import com.github.ambry.utils.MockTime; @@ -34,7 +40,10 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Properties; import java.util.Set; +import java.util.SortedMap; +import java.util.stream.Collectors; import org.junit.Test; import static org.junit.Assert.*; @@ -68,21 +77,22 @@ public class AdaptiveOperationTrackerTest { private final Histogram localColoTracker = registry.histogram("LocalColoTracker"); private final Histogram crossColoTracker = registry.histogram("CrossColoTracker"); private final Counter pastDueCounter = registry.counter("PastDueCounter"); + private NonBlockingRouterMetrics routerMetrics; /** * Constructor that sets up state. */ - public AdaptiveOperationTrackerTest() { + public AdaptiveOperationTrackerTest() throws Exception { List portList = Collections.singletonList(new Port(PORT, PortType.PLAINTEXT)); List mountPaths = Collections.singletonList("mockMountPath"); - datanodes = new ArrayList<>(Arrays.asList( - new MockDataNodeId[]{new MockDataNodeId(portList, mountPaths, "dc-0"), new MockDataNodeId(portList, mountPaths, - "dc-1")})); + datanodes = new ArrayList<>(Arrays.asList(new MockDataNodeId(portList, mountPaths, "dc-0"), + new MockDataNodeId(portList, mountPaths, "dc-1"))); localDcName = datanodes.get(0).getDatacenterName(); mockPartition = new MockPartitionId(); for (int i = 0; i < REPLICA_COUNT; i++) { mockPartition.replicaIds.add(new MockReplicaId(PORT, mockPartition, datanodes.get(i % datanodes.size()), 0)); } + routerMetrics = new NonBlockingRouterMetrics(new MockClusterMap()); } /** @@ -96,7 +106,7 @@ public void adaptationTest() throws InterruptedException { double localColoCutoff = localColoTracker.getSnapshot().getValue(QUANTILE); double crossColoCutoff = crossColoTracker.getSnapshot().getValue(QUANTILE); - OperationTracker ot = getOperationTracker(true, REPLICA_COUNT, 2); + OperationTracker ot = getOperationTracker(true, REPLICA_COUNT, 2, null); // 3-0-0-0; 3-0-0-0 sendRequests(ot, 2); // 1-2-0-0; 3-0-0-0 @@ -167,7 +177,7 @@ public void noUnexpiredRequestsTest() throws InterruptedException { primeTracker(crossColoTracker, AdaptiveOperationTracker.MIN_DATA_POINTS_REQUIRED, CROSS_COLO_LATENCY_RANGE); double localColoCutoff = localColoTracker.getSnapshot().getValue(QUANTILE); - OperationTracker ot = getOperationTracker(false, 1, 1); + OperationTracker ot = getOperationTracker(false, 1, 1, null); // 3-0-0-0 sendRequests(ot, 1); // 2-1-0-0 @@ -199,9 +209,7 @@ public void trackerUpdateBetweenHasNextAndNextTest() throws InterruptedException primeTracker(crossColoTracker, AdaptiveOperationTracker.MIN_DATA_POINTS_REQUIRED, CROSS_COLO_LATENCY_RANGE); double localColoCutoff = localColoTracker.getSnapshot().getValue(1); - OperationTracker ot = - new AdaptiveOperationTracker(localDcName, mockPartition, false, null, true, Integer.MAX_VALUE, 1, 1, time, - localColoTracker, null, pastDueCounter, 1); + OperationTracker ot = getOperationTracker(false, 1, 1, null); // 3-0-0-0 sendRequests(ot, 1); // 2-1-0-0 @@ -229,6 +237,49 @@ public void trackerUpdateBetweenHasNextAndNextTest() throws InterruptedException assertEquals("Past due counter is inconsistent", 1, pastDueCounter.getCount()); } + /** + * Test that adaptive operation track can correctly register custom percentiles. An example of metric name is: + * "com.github.ambry.router.GetOperation.LocalColoLatencyMs.91.0.thPercentile" + * @throws Exception + */ + @Test + public void customPercentilesMetricsRegistryTest() throws Exception { + // test that if custom percentile is not set, no corresponding metrics would be generated. + getOperationTracker(true, 1, 1, null); + MetricRegistry metricRegistry = routerMetrics.getMetricRegistry(); + MetricFilter filter = new MetricFilter() { + @Override + public boolean matches(String name, Metric metric) { + return name.endsWith("thPercentile"); + } + }; + SortedMap gauges = metricRegistry.getGauges(filter); + assertTrue("No gauges should be created because custom percentile is not set", gauges.isEmpty()); + // test that dedicated gauges are correctly created for custom percentiles. + String customPercentiles = "0.91,0.97"; + String[] percentileArray = customPercentiles.split(","); + Arrays.sort(percentileArray); + List sortedPercentiles = + Arrays.stream(percentileArray).map(p -> String.valueOf(Double.valueOf(p) * 100)).collect(Collectors.toList()); + getOperationTracker(false, 1, 1, customPercentiles); + gauges = metricRegistry.getGauges(filter); + // Note that crossColoEnabled is false, the number of gauges should equal to number of given percentiles + assertEquals("The number of custom percentile gauge doesn't match", sortedPercentiles.size(), gauges.size()); + Iterator mapItor = gauges.keySet().iterator(); + Iterator listItor = sortedPercentiles.iterator(); + while (listItor.hasNext()) { + String gaugeName = (String) mapItor.next(); + String percentileStr = listItor.next(); + assertTrue("The gauge name doesn't match", gaugeName.endsWith(percentileStr + ".thPercentile")); + } + // reset router metrics to clean up registered metrics + routerMetrics = new NonBlockingRouterMetrics(new MockClusterMap()); + metricRegistry = routerMetrics.getMetricRegistry(); + getOperationTracker(true, 1, 1, customPercentiles); + gauges = metricRegistry.getGauges(filter); + assertEquals("The number of custom percentile gauge doesn't match", sortedPercentiles.size() * 2, gauges.size()); + } + // helpers // general @@ -238,12 +289,27 @@ public void trackerUpdateBetweenHasNextAndNextTest() throws InterruptedException * @param crossColoEnabled {@code true} if cross colo needs to be enabled. {@code false} otherwise. * @param successTarget the number of successful responses required for the operation to succeed. * @param parallelism the number of parallel requests that can be in flight. + * @param customPercentiles the custom percentiles to be reported. Percentiles are specified in a comma-separated + * string, i.e "0.94,0.96,0.97". * @return an instance of {@link AdaptiveOperationTracker} with the given parameters. */ - private OperationTracker getOperationTracker(boolean crossColoEnabled, int successTarget, int parallelism) { - return new AdaptiveOperationTracker(localDcName, mockPartition, crossColoEnabled, null, true, Integer.MAX_VALUE, - successTarget, parallelism, time, localColoTracker, crossColoEnabled ? crossColoTracker : null, pastDueCounter, - QUANTILE); + private OperationTracker getOperationTracker(boolean crossColoEnabled, int successTarget, int parallelism, + String customPercentiles) { + Properties props = new Properties(); + props.setProperty("router.hostname", "localhost"); + props.setProperty("router.datacenter.name", localDcName); + props.setProperty("router.get.cross.dc.enabled", Boolean.toString(crossColoEnabled)); + props.setProperty("router.get.success.target", Integer.toString(successTarget)); + props.setProperty("router.get.request.parallelism", Integer.toString(parallelism)); + props.setProperty("router.get.include.non.originating.dc.replicas", "true"); + props.setProperty("router.get.replicas.required", Integer.toString(Integer.MAX_VALUE)); + props.setProperty("router.latency.tolerance.quantile", Double.toString(QUANTILE)); + if (customPercentiles != null) { + props.setProperty("router.operation.tracker.custom.percentiles", customPercentiles); + } + RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(props)); + return new AdaptiveOperationTracker(routerConfig, GetOperation.class, mockPartition, null, localColoTracker, + crossColoEnabled ? crossColoTracker : null, pastDueCounter, routerMetrics, time); } /** @@ -289,7 +355,7 @@ private void sendRequests(OperationTracker operationTracker, int numRequestsExpe */ private void doTrackerUpdateTest(boolean succeedRequests) throws InterruptedException { long timeIncrement = 10; - OperationTracker ot = getOperationTracker(true, REPLICA_COUNT, REPLICA_COUNT); + OperationTracker ot = getOperationTracker(true, REPLICA_COUNT, REPLICA_COUNT, null); // 3-0-0-0; 3-0-0-0 sendRequests(ot, REPLICA_COUNT); // 0-3-0-0; 0-3-0-0 diff --git a/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java b/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java index ed366ead64..6a79c63126 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java @@ -120,8 +120,8 @@ public void registerRequestToSend(GetOperation getOperation, RequestInfo request */ @Parameterized.Parameters public static List data() { - return Arrays.asList( - new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false}, {SimpleOperationTracker.class.getSimpleName(), true}, {AdaptiveOperationTracker.class.getSimpleName(), false}}); + return Arrays.asList(new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false}, + {SimpleOperationTracker.class.getSimpleName(), true}, {AdaptiveOperationTracker.class.getSimpleName(), false}}); } /** @@ -212,7 +212,6 @@ public void testInstantiation() { GetBlobInfoOperation op = new GetBlobInfoOperation(routerConfig, routerMetrics, mockClusterMap, responseHandler, blobId, options, getOperationCallback, routerCallback, kms, cryptoService, cryptoJobHandler, time, false); - Assert.assertEquals("Callback must match", getOperationCallback, op.getCallback()); Assert.assertEquals("Blob ids must match", blobId.getID(), op.getBlobIdStr()); diff --git a/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java b/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java index 88ba95c34d..533620f1af 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java @@ -16,10 +16,13 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.clustermap.MockDataNodeId; import com.github.ambry.clustermap.MockPartitionId; import com.github.ambry.clustermap.MockReplicaId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.config.RouterConfig; +import com.github.ambry.config.VerifiableProperties; import com.github.ambry.network.Port; import com.github.ambry.network.PortType; import com.github.ambry.utils.MockTime; @@ -27,10 +30,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.Set; import org.junit.Test; import org.junit.runner.RunWith; @@ -70,6 +76,7 @@ public class OperationTrackerTest { private final Set repetitionTracker = new HashSet<>(); // for AdaptiveOperationTracker + private final NonBlockingRouterMetrics routerMetrics; private final Time time = new MockTime(); private final MetricRegistry registry = new MetricRegistry(); private final Histogram localColoTracker = registry.histogram("LocalColoTracker"); @@ -88,8 +95,13 @@ public static List data() { /** * @param operationTrackerType the type of {@link OperationTracker} that needs to be used in tests */ - public OperationTrackerTest(String operationTrackerType) { + public OperationTrackerTest(String operationTrackerType) throws Exception { this.operationTrackerType = operationTrackerType; + if (operationTrackerType.equals(ADAPTIVE_OP_TRACKER)) { + routerMetrics = new NonBlockingRouterMetrics(new MockClusterMap()); + } else { + routerMetrics = null; + } } /** @@ -154,7 +166,6 @@ public void localFailTest() { /** * crossColoEnabled = true, successTarget = 1, parallelism = 2. - *

* 1. Get 2 local replicas to send request (and send requests); * 2. 1 fails, 1 pending. @@ -185,7 +196,6 @@ public void localSucceedWithDifferentParameterTest() { /** * crossColoEnabled = true, successTarget = 1, parallelism = 2. - *

* 1. Get 2 local replicas to send request (and send requests); * 2. 1 local replica fails, 1 pending. @@ -458,6 +468,57 @@ public void incorrectParallelismTest() { } } + /** + * Test that operation tracker correctly populate parameters(i.e. successTarget) according to passed in operation class. + */ + @Test + public void operationClassTest() { + Properties props = new Properties(); + props.setProperty("router.hostname", "localhost"); + props.setProperty("router.datacenter.name", "dc-0"); + props.setProperty("router.get.success.target", "1"); + props.setProperty("router.put.success.target", "2"); + props.setProperty("router.delete.success.target", "3"); + props.setProperty("router.ttl.update.success.target", "4"); + RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(props)); + initialize(); + try { + switch (operationTrackerType) { + case SIMPLE_OP_TRACKER: + new SimpleOperationTracker(routerConfig, getClass(), mockPartition, originatingDcName, true); + break; + case ADAPTIVE_OP_TRACKER: + new AdaptiveOperationTracker(routerConfig, getClass(), mockPartition, originatingDcName, localColoTracker, + crossColoTracker, pastDueCounter, routerMetrics, time); + break; + } + fail("Instantiation should fail because of unrecognized operation class"); + } catch (Exception e) { + //expected + } + Map operationAndSuccessTarget = new HashMap<>(); + operationAndSuccessTarget.put(GetOperation.class, 1); + operationAndSuccessTarget.put(PutOperation.class, 2); + operationAndSuccessTarget.put(DeleteOperation.class, 3); + operationAndSuccessTarget.put(TtlUpdateOperation.class, 4); + for (Map.Entry entry : operationAndSuccessTarget.entrySet()) { + SimpleOperationTracker operationTracker = null; + switch (operationTrackerType) { + case SIMPLE_OP_TRACKER: + operationTracker = + new SimpleOperationTracker(routerConfig, entry.getKey(), mockPartition, originatingDcName, true); + break; + case ADAPTIVE_OP_TRACKER: + operationTracker = + new AdaptiveOperationTracker(routerConfig, entry.getKey(), mockPartition, originatingDcName, + localColoTracker, crossColoTracker, pastDueCounter, routerMetrics, time); + break; + } + // ensure the success target matches the number specified for each type of operaiton + assertEquals("The suggest target doesn't match", (long) entry.getValue(), (operationTracker).getSuccessTarget()); + } + } + /** * Initialize 4 DCs, each DC has 1 data node, which has 3 replicas. */ @@ -465,10 +526,9 @@ private void initialize() { int replicaCount = 12; List portList = Collections.singletonList(new Port(PORT, PortType.PLAINTEXT)); List mountPaths = Collections.singletonList("mockMountPath"); - datanodes = new ArrayList<>(Arrays.asList( - new MockDataNodeId[]{new MockDataNodeId(portList, mountPaths, "dc-0"), new MockDataNodeId(portList, mountPaths, - "dc-1"), new MockDataNodeId(portList, mountPaths, "dc-2"), new MockDataNodeId(portList, mountPaths, - "dc-3")})); + datanodes = new ArrayList<>(Arrays.asList(new MockDataNodeId(portList, mountPaths, "dc-0"), + new MockDataNodeId(portList, mountPaths, "dc-1"), new MockDataNodeId(portList, mountPaths, "dc-2"), + new MockDataNodeId(portList, mountPaths, "dc-3"))); mockPartition = new MockPartitionId(); populateReplicaList(replicaCount); localDcName = datanodes.get(0).getDatacenterName(); @@ -495,16 +555,25 @@ private void populateReplicaList(int replicaCount) { */ private OperationTracker getOperationTracker(boolean crossColoEnabled, int successTarget, int parallelism, boolean includeNonOriginatingDcReplicas, int replicasRequired) { + Properties props = new Properties(); + props.setProperty("router.hostname", "localhost"); + props.setProperty("router.datacenter.name", localDcName); + props.setProperty("router.get.cross.dc.enabled", Boolean.toString(crossColoEnabled)); + props.setProperty("router.get.success.target", Integer.toString(successTarget)); + props.setProperty("router.get.request.parallelism", Integer.toString(parallelism)); + props.setProperty("router.get.include.non.originating.dc.replicas", + Boolean.toString(includeNonOriginatingDcReplicas)); + props.setProperty("router.get.replicas.required", Integer.toString(replicasRequired)); + props.setProperty("router.latency.tolerance.quantile", Double.toString(QUANTILE)); + RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(props)); OperationTracker tracker; switch (operationTrackerType) { case SIMPLE_OP_TRACKER: - tracker = new SimpleOperationTracker(localDcName, mockPartition, crossColoEnabled, originatingDcName, - includeNonOriginatingDcReplicas, replicasRequired, successTarget, parallelism); + tracker = new SimpleOperationTracker(routerConfig, GetOperation.class, mockPartition, originatingDcName, true); break; case ADAPTIVE_OP_TRACKER: - tracker = new AdaptiveOperationTracker(localDcName, mockPartition, crossColoEnabled, originatingDcName, - includeNonOriginatingDcReplicas, replicasRequired, successTarget, parallelism, time, localColoTracker, - crossColoEnabled ? crossColoTracker : null, pastDueCounter, QUANTILE); + tracker = new AdaptiveOperationTracker(routerConfig, GetOperation.class, mockPartition, originatingDcName, + localColoTracker, crossColoEnabled ? crossColoTracker : null, pastDueCounter, routerMetrics, time); break; default: throw new IllegalArgumentException("Unrecognized operation tracker type - " + operationTrackerType); From 2dc44b9d03b0caa2c4e4a76fb8aca1a8ad745909 Mon Sep 17 00:00:00 2001 From: Yingyi Zhang Date: Fri, 26 Apr 2019 14:21:21 -0700 Subject: [PATCH 2/3] address Ze's comment and clean the code --- .../ClusterMapMetrics.java | 140 +++--------------- .../BlockingChannelConnectionPool.java | 59 ++------ .../NetworkMetrics.java | 60 +++----- .../ReplicationMetrics.java | 14 +- .../AsyncRequestResponseHandlerFactory.java | 28 +--- .../com.github.ambry.rest/RestServer.java | 7 +- .../com.github.ambry.router/GetOperation.java | 2 +- .../NonBlockingRouterMetrics.java | 17 +-- 8 files changed, 67 insertions(+), 260 deletions(-) diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapMetrics.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapMetrics.java index 177ece80bd..9f2a9c4237 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapMetrics.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapMetrics.java @@ -67,79 +67,24 @@ public ClusterMapMetrics(HardwareLayout hardwareLayout, PartitionLayout partitio // Metrics based on HardwareLayout - this.hardwareLayoutVersion = new Gauge() { - @Override - public Long getValue() { - return getHardwareLayoutVersion(); - } - }; - this.partitionLayoutVersion = new Gauge() { - @Override - public Long getValue() { - return getPartitionLayoutVersion(); - } - }; + this.hardwareLayoutVersion = this::getHardwareLayoutVersion; + this.partitionLayoutVersion = this::getPartitionLayoutVersion; registry.register(MetricRegistry.name(ClusterMap.class, "hardwareLayoutVersion"), hardwareLayoutVersion); registry.register(MetricRegistry.name(ClusterMap.class, "partitionLayoutVersion"), partitionLayoutVersion); - this.datacenterCount = new Gauge() { - @Override - public Long getValue() { - return countDatacenters(); - } - }; - this.dataNodeCount = new Gauge() { - @Override - public Long getValue() { - return countDataNodes(); - } - }; - this.diskCount = new Gauge() { - @Override - public Long getValue() { - return countDisks(); - } - }; + this.datacenterCount = this::countDatacenters; + this.dataNodeCount = this::countDataNodes; + this.diskCount = this::countDisks; registry.register(MetricRegistry.name(ClusterMap.class, "datacenterCount"), datacenterCount); registry.register(MetricRegistry.name(ClusterMap.class, "dataNodeCount"), dataNodeCount); registry.register(MetricRegistry.name(ClusterMap.class, "diskCount"), diskCount); - this.dataNodesHardUpCount = new Gauge() { - @Override - public Long getValue() { - return countDataNodesInHardState(HardwareState.AVAILABLE); - } - }; - this.dataNodesHardDownCount = new Gauge() { - @Override - public Long getValue() { - return countDataNodesInHardState(HardwareState.UNAVAILABLE); - } - }; - this.dataNodesUnavailableCount = new Gauge() { - @Override - public Long getValue() { - return countUnavailableDataNodes(); - } - }; - this.disksHardUpCount = new Gauge() { - @Override - public Long getValue() { - return countDisksInHardState(HardwareState.AVAILABLE); - } - }; - this.disksHardDownCount = new Gauge() { - @Override - public Long getValue() { - return countDisksInHardState(HardwareState.UNAVAILABLE); - } - }; - this.disksUnavailableCount = new Gauge() { - @Override - public Long getValue() { - return countUnavailableDisks(); - } - }; + this.dataNodesHardUpCount = () -> countDataNodesInHardState(HardwareState.AVAILABLE); + this.dataNodesHardDownCount = () -> countDataNodesInHardState(HardwareState.UNAVAILABLE); + this.dataNodesUnavailableCount = this::countUnavailableDataNodes; + this.disksHardUpCount = () -> countDisksInHardState(HardwareState.AVAILABLE); + this.disksHardDownCount = () -> countDisksInHardState(HardwareState.UNAVAILABLE); + this.disksUnavailableCount = this::countUnavailableDisks; registry.register(MetricRegistry.name(ClusterMap.class, "dataNodesHardUpCount"), dataNodesHardUpCount); registry.register(MetricRegistry.name(ClusterMap.class, "dataNodesHardDownCount"), dataNodesHardDownCount); registry.register(MetricRegistry.name(ClusterMap.class, "dataNodesUnavailableCount"), dataNodesUnavailableCount); @@ -149,54 +94,19 @@ public Long getValue() { // Metrics based on PartitionLayout - this.partitionCount = new Gauge() { - @Override - public Long getValue() { - return countPartitions(); - } - }; - this.partitionsReadWrite = new Gauge() { - @Override - public Long getValue() { - return countPartitionsInState(PartitionState.READ_WRITE); - } - }; - this.partitionsReadOnly = new Gauge() { - @Override - public Long getValue() { - return countPartitionsInState(PartitionState.READ_ONLY); - } - }; + this.partitionCount = this::countPartitions; + this.partitionsReadWrite = () -> countPartitionsInState(PartitionState.READ_WRITE); + this.partitionsReadOnly = () -> countPartitionsInState(PartitionState.READ_ONLY); registry.register(MetricRegistry.name(ClusterMap.class, "numberOfPartitions"), partitionCount); registry.register(MetricRegistry.name(ClusterMap.class, "numberOfReadWritePartitions"), partitionsReadWrite); registry.register(MetricRegistry.name(ClusterMap.class, "numberOfReadOnlyPartitions"), partitionsReadOnly); - this.isMajorityReplicasDown = new Gauge() { - @Override - public Boolean getValue() { - return isMajorityOfReplicasDown(); - } - }; + this.isMajorityReplicasDown = this::isMajorityOfReplicasDown; registry.register(MetricRegistry.name(ClusterMap.class, "isMajorityReplicasDown"), isMajorityReplicasDown); - this.rawCapacityInBytes = new Gauge() { - @Override - public Long getValue() { - return getRawCapacity(); - } - }; - this.allocatedRawCapacityInBytes = new Gauge() { - @Override - public Long getValue() { - return getAllocatedRawCapacity(); - } - }; - this.allocatedUsableCapacityInBytes = new Gauge() { - @Override - public Long getValue() { - return getAllocatedUsableCapacity(); - } - }; + this.rawCapacityInBytes = this::getRawCapacity; + this.allocatedRawCapacityInBytes = this::getAllocatedRawCapacity; + this.allocatedUsableCapacityInBytes = this::getAllocatedUsableCapacity; registry.register(MetricRegistry.name(ClusterMap.class, "rawCapacityInBytes"), rawCapacityInBytes); registry.register(MetricRegistry.name(ClusterMap.class, "allocatedRawCapacityInBytes"), allocatedRawCapacityInBytes); @@ -218,12 +128,7 @@ public Long getValue() { private void addDataNodeToStateMetrics(final DataNode dataNode) { final String metricName = dataNode.getHostname() + "-" + dataNode.getPort() + "-ResourceState"; - Gauge dataNodeState = new Gauge() { - @Override - public Long getValue() { - return dataNode.getState() == HardwareState.AVAILABLE ? 1L : 0L; - } - }; + Gauge dataNodeState = () -> dataNode.getState() == HardwareState.AVAILABLE ? 1L : 0L; registry.register(MetricRegistry.name(ClusterMap.class, metricName), dataNodeState); dataNodeStateList.add(dataNodeState); } @@ -232,12 +137,7 @@ private void addDiskToStateMetrics(final Disk disk) { final String metricName = disk.getDataNode().getHostname() + "-" + disk.getDataNode().getPort() + "-" + disk.getMountPath() + "-ResourceState"; - Gauge diskState = new Gauge() { - @Override - public Long getValue() { - return disk.getState() == HardwareState.AVAILABLE ? 1L : 0L; - } - }; + Gauge diskState = () -> disk.getState() == HardwareState.AVAILABLE ? 1L : 0L; registry.register(MetricRegistry.name(ClusterMap.class, metricName), diskState); dataNodeStateList.add(diskState); } diff --git a/ambry-network/src/main/java/com.github.ambry.network/BlockingChannelConnectionPool.java b/ambry-network/src/main/java/com.github.ambry.network/BlockingChannelConnectionPool.java index 11065d7a9a..a3eeb24d85 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/BlockingChannelConnectionPool.java +++ b/ambry-network/src/main/java/com.github.ambry.network/BlockingChannelConnectionPool.java @@ -73,32 +73,17 @@ public BlockingChannelInfo(ConnectionPoolConfig config, String host, Port port, this.sslSocketFactory = sslSocketFactory; this.sslConfig = sslConfig; - availableConnections = new Gauge() { - @Override - public Integer getValue() { - return blockingChannelAvailableConnections.size(); - } - }; + availableConnections = blockingChannelAvailableConnections::size; registry.register( MetricRegistry.name(BlockingChannelInfo.class, host + "-" + port.getPort() + "-availableConnections"), availableConnections); - activeConnections = new Gauge() { - @Override - public Integer getValue() { - return blockingChannelActiveConnections.size(); - } - }; + activeConnections = blockingChannelActiveConnections::size; registry.register( MetricRegistry.name(BlockingChannelInfo.class, host + "-" + port.getPort() + "-activeConnections"), activeConnections); - totalNumberOfConnections = new Gauge() { - @Override - public Integer getValue() { - return numberOfConnections.intValue(); - } - }; + totalNumberOfConnections = numberOfConnections::intValue; registry.register( MetricRegistry.name(BlockingChannelInfo.class, host + "-" + port.getPort() + "-totalNumberOfConnections"), totalNumberOfConnections); @@ -240,8 +225,7 @@ public void destroyBlockingChannel(BlockingChannel blockingChannel) { } /** - * Returns the number of connections with this BlockingChannelInfo - * @return + * @return the number of connections with this BlockingChannelInfo */ public int getNumberOfConnections() { return this.numberOfConnections.intValue(); @@ -310,40 +294,29 @@ public BlockingChannelConnectionPool(ConnectionPoolConfig config, SSLConfig sslC connectionDestroyTime = registry.timer(MetricRegistry.name(BlockingChannelConnectionPool.class, "connectionDestroyTime")); - totalNumberOfNodesConnectedTo = new Gauge() { - @Override - public Integer getValue() { - int noOfNodesConnectedTo = 0; - for (BlockingChannelInfo blockingChannelInfo : connections.values()) { - if (blockingChannelInfo.getNumberOfConnections() > 0) { - noOfNodesConnectedTo++; - } + totalNumberOfNodesConnectedTo = () -> { + int noOfNodesConnectedTo = 0; + for (BlockingChannelInfo blockingChannelInfo : connections.values()) { + if (blockingChannelInfo.getNumberOfConnections() > 0) { + noOfNodesConnectedTo++; } - return noOfNodesConnectedTo; } + return noOfNodesConnectedTo; }; registry.register(MetricRegistry.name(BlockingChannelConnectionPool.class, "totalNumberOfNodesConnectedTo"), totalNumberOfNodesConnectedTo); - totalNumberOfConnections = new Gauge() { - @Override - public Integer getValue() { - int noOfConnections = 0; - for (BlockingChannelInfo blockingChannelInfo : connections.values()) { - noOfConnections += blockingChannelInfo.getNumberOfConnections(); - } - return noOfConnections; + totalNumberOfConnections = () -> { + int noOfConnections = 0; + for (BlockingChannelInfo blockingChannelInfo : connections.values()) { + noOfConnections += blockingChannelInfo.getNumberOfConnections(); } + return noOfConnections; }; registry.register(MetricRegistry.name(BlockingChannelConnectionPool.class, "totalNumberOfConnections"), totalNumberOfConnections); requestsWaitingToCheckoutConnectionCount = new AtomicInteger(0); - requestsWaitingToCheckoutConnection = new Gauge() { - @Override - public Integer getValue() { - return requestsWaitingToCheckoutConnectionCount.get(); - } - }; + requestsWaitingToCheckoutConnection = requestsWaitingToCheckoutConnectionCount::get; registry.register(MetricRegistry.name(BlockingChannelConnectionPool.class, "requestsWaitingToCheckoutConnection"), requestsWaitingToCheckoutConnection); sslSocketFactoryClientInitializationCount = registry.counter( diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 639cf627a4..ce24127211 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -156,41 +156,32 @@ public NetworkMetrics(MetricRegistry registry) { selectorUnreadyConnectionsList = new ArrayList<>(); networkClientPendingRequestList = new ArrayList<>(); - final Gauge selectorActiveConnectionsCount = new Gauge() { - @Override - public Long getValue() { - long activeConnectionsCount = 0; - for (AtomicLong activeConnection : selectorActiveConnectionsList) { - activeConnectionsCount += activeConnection.get(); - } - return activeConnectionsCount; + final Gauge selectorActiveConnectionsCount = () -> { + long activeConnectionsCount = 0; + for (AtomicLong activeConnection : selectorActiveConnectionsList) { + activeConnectionsCount += activeConnection.get(); } + return activeConnectionsCount; }; registry.register(MetricRegistry.name(Selector.class, "SelectorActiveConnectionsCount"), selectorActiveConnectionsCount); - final Gauge selectorUnreadyConnectionsCount = new Gauge() { - @Override - public Long getValue() { - long unreadyConnectionCount = 0; - for (Set unreadyConnection : selectorUnreadyConnectionsList) { - unreadyConnectionCount += unreadyConnection.size(); - } - return unreadyConnectionCount; + final Gauge selectorUnreadyConnectionsCount = () -> { + long unreadyConnectionCount = 0; + for (Set unreadyConnection : selectorUnreadyConnectionsList) { + unreadyConnectionCount += unreadyConnection.size(); } + return unreadyConnectionCount; }; registry.register(MetricRegistry.name(Selector.class, "SelectorUnreadyConnectionsCount"), selectorUnreadyConnectionsCount); - final Gauge networkClientPendingRequestsCount = new Gauge() { - @Override - public Long getValue() { - long pendingRequestsCount = 0; - for (AtomicLong pendingRequest : networkClientPendingRequestList) { - pendingRequestsCount += pendingRequest.get(); - } - return pendingRequestsCount; + final Gauge networkClientPendingRequestsCount = () -> { + long pendingRequestsCount = 0; + for (AtomicLong pendingRequest : networkClientPendingRequestList) { + pendingRequestsCount += pendingRequest.get(); } + return pendingRequestsCount; }; registry.register(MetricRegistry.name(NetworkClient.class, "NetworkClientPendingConnectionsCount"), networkClientPendingRequestsCount); @@ -236,32 +227,17 @@ class ServerNetworkMetrics extends NetworkMetrics { public ServerNetworkMetrics(final SocketRequestResponseChannel channel, MetricRegistry registry, final List processorThreads) { super(registry); - requestQueueSize = new Gauge() { - @Override - public Integer getValue() { - return channel.getRequestQueueSize(); - } - }; + requestQueueSize = channel::getRequestQueueSize; registry.register(MetricRegistry.name(SocketRequestResponseChannel.class, "RequestQueueSize"), requestQueueSize); responseQueueSize = new ArrayList>(channel.getNumberOfProcessors()); for (int i = 0; i < channel.getNumberOfProcessors(); i++) { final int index = i; - responseQueueSize.add(i, new Gauge() { - @Override - public Integer getValue() { - return channel.getResponseQueueSize(index); - } - }); + responseQueueSize.add(i, () -> channel.getResponseQueueSize(index)); registry.register(MetricRegistry.name(SocketRequestResponseChannel.class, i + "-ResponseQueueSize"), responseQueueSize.get(i)); } - numberOfProcessorThreads = new Gauge() { - @Override - public Integer getValue() { - return getLiveThreads(processorThreads); - } - }; + numberOfProcessorThreads = () -> getLiveThreads(processorThreads); registry.register(MetricRegistry.name(SocketServer.class, "NumberOfProcessorThreads"), numberOfProcessorThreads); acceptConnectionErrorCount = diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationMetrics.java b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationMetrics.java index bf6856e9c2..1e6a898058 100644 --- a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationMetrics.java +++ b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationMetrics.java @@ -351,12 +351,7 @@ public void populatePerColoMetrics(Set datacenters) { */ void trackLiveThreadsCount(final Map> replicaThreadPools, String localDatacenter) { for (final String datacenter : replicaThreadPools.keySet()) { - Gauge liveThreadsPerDatacenter = new Gauge() { - @Override - public Integer getValue() { - return getLiveThreads(replicaThreadPools.get(datacenter)); - } - }; + Gauge liveThreadsPerDatacenter = () -> getLiveThreads(replicaThreadPools.get(datacenter)); if (localDatacenter.equals(datacenter)) { registry.register(MetricRegistry.name(ReplicaThread.class, "NumberOfIntra-Colo-ReplicaThreads"), liveThreadsPerDatacenter); @@ -382,12 +377,7 @@ public void addRemoteReplicaToLagMetrics(final RemoteReplicaInfo remoteReplicaIn DataNodeId dataNodeId = replicaId.getDataNodeId(); final String metricName = dataNodeId.getHostname() + "-" + dataNodeId.getPort() + "-" + replicaId.getPartitionId() + "-replicaLagInBytes"; - Gauge replicaLag = new Gauge() { - @Override - public Long getValue() { - return remoteReplicaInfo.getRemoteLagFromLocalInBytes(); - } - }; + Gauge replicaLag = remoteReplicaInfo::getRemoteLagFromLocalInBytes; registry.register(MetricRegistry.name(ReplicationMetrics.class, metricName), replicaLag); replicaLagInBytes.add(replicaLag); } diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/AsyncRequestResponseHandlerFactory.java b/ambry-rest/src/main/java/com.github.ambry.rest/AsyncRequestResponseHandlerFactory.java index 3764013a9e..fd1493c34e 100644 --- a/ambry-rest/src/main/java/com.github.ambry.rest/AsyncRequestResponseHandlerFactory.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/AsyncRequestResponseHandlerFactory.java @@ -251,12 +251,7 @@ public RequestResponseHandlerMetrics(MetricRegistry metricRegistry) { */ public void registerRequestWorker(final AsyncRequestWorker asyncRequestWorker) { int pos = asyncRequestWorkerIndex.getAndIncrement(); - Gauge gauge = new Gauge() { - @Override - public Integer getValue() { - return asyncRequestWorker.getRequestQueueSize(); - } - }; + Gauge gauge = asyncRequestWorker::getRequestQueueSize; metricRegistry.register(MetricRegistry.name(AsyncRequestWorker.class, pos + "-RequestQueueSize"), gauge); } @@ -265,30 +260,15 @@ public Integer getValue() { * @param asyncRequestResponseHandler the {@link AsyncRequestResponseHandler} whose key metrics have to be tracked. */ public void trackAsyncRequestResponseHandler(final AsyncRequestResponseHandler asyncRequestResponseHandler) { - Gauge totalRequestQueueSize = new Gauge() { - @Override - public Integer getValue() { - return asyncRequestResponseHandler.getRequestQueueSize(); - } - }; + Gauge totalRequestQueueSize = asyncRequestResponseHandler::getRequestQueueSize; metricRegistry.register(MetricRegistry.name(AsyncRequestResponseHandler.class, "TotalRequestQueueSize"), totalRequestQueueSize); - Gauge totalResponseSetSize = new Gauge() { - @Override - public Integer getValue() { - return asyncRequestResponseHandler.getResponseSetSize(); - } - }; + Gauge totalResponseSetSize = asyncRequestResponseHandler::getResponseSetSize; metricRegistry.register(MetricRegistry.name(AsyncRequestResponseHandler.class, "TotalResponseSetSize"), totalResponseSetSize); - Gauge asyncHandlerWorkersAlive = new Gauge() { - @Override - public Integer getValue() { - return asyncRequestResponseHandler.getWorkersAlive(); - } - }; + Gauge asyncHandlerWorkersAlive = asyncRequestResponseHandler::getWorkersAlive; metricRegistry.register(MetricRegistry.name(AsyncRequestResponseHandler.class, "AsyncHandlerWorkersAlive"), asyncHandlerWorkersAlive); } diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java b/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java index 6f880b768c..c2382e71d0 100644 --- a/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java @@ -136,12 +136,7 @@ public RestServerMetrics(MetricRegistry metricRegistry, final RestServerState re accountServiceCloseTimeInMs = metricRegistry.histogram(MetricRegistry.name(RestServer.class, "AccountServiceCloseTimeInMs")); - Gauge restServerStatus = new Gauge() { - @Override - public Integer getValue() { - return restServerState.isServiceUp() ? 1 : 0; - } - }; + Gauge restServerStatus = () -> restServerState.isServiceUp() ? 1 : 0; metricRegistry.register(MetricRegistry.name(RestServer.class, "RestServerState"), restServerStatus); } } diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java index e8333e6afd..974cca619e 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java @@ -237,7 +237,7 @@ protected GetRequest createGetRequest(BlobId blobId, MessageFormatFlags flag, Ge /** * Gets an {@link OperationTracker} based on the config and {@code partitionId}. * @param partitionId the {@link PartitionId} for which a tracker is required. - * @param operationClass + * @param operationClass The operation class in which operation tracker is used. * @return an {@link OperationTracker} based on the config and {@code partitionId}. */ protected OperationTracker getOperationTracker(PartitionId partitionId, byte datacenterId, Class operationClass) { diff --git a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java index 3a1c705954..64438f79c2 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java +++ b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java @@ -452,22 +452,15 @@ public void initializePutManagerMetrics(final Thread chunkFillerThread) { * Initializes a {@link Gauge} metric to monitor the number of running * {@link com.github.ambry.router.NonBlockingRouter.OperationController} of a {@link NonBlockingRouter}. * @param currentOperationsCount The counter of {@link com.github.ambry.router.NonBlockingRouter.OperationController}. + * @param currentBackgroundOperationsCount The counter of background operations submitted to the router that are not + * yet completed. */ public void initializeNumActiveOperationsMetrics(final AtomicInteger currentOperationsCount, final AtomicInteger currentBackgroundOperationsCount) { - metricRegistry.register(MetricRegistry.name(NonBlockingRouter.class, "NumActiveOperations"), new Gauge() { - @Override - public Integer getValue() { - return currentOperationsCount.get(); - } - }); + metricRegistry.register(MetricRegistry.name(NonBlockingRouter.class, "NumActiveOperations"), + (Gauge) currentOperationsCount::get); metricRegistry.register(MetricRegistry.name(NonBlockingRouter.class, "NumActiveBackgroundOperations"), - new Gauge() { - @Override - public Integer getValue() { - return currentBackgroundOperationsCount.get(); - } - }); + (Gauge) currentBackgroundOperationsCount::get); } /** From 6347c9dd5bf8dff713b64c5140110aa151937048 Mon Sep 17 00:00:00 2001 From: Yingyi Zhang Date: Wed, 1 May 2019 22:34:40 -0700 Subject: [PATCH 3/3] introduce RouterOperation enum --- .../AdaptiveOperationTracker.java | 18 +++++- .../DeleteOperation.java | 4 +- .../GetBlobInfoOperation.java | 3 +- .../GetBlobOperation.java | 4 +- .../com.github.ambry.router/GetOperation.java | 10 ++-- .../com.github.ambry.router/PutOperation.java | 3 +- .../RouterOperation.java | 18 ++++++ .../SimpleOperationTracker.java | 57 ++++++++++--------- .../TtlUpdateOperation.java | 4 +- .../AdaptiveOperationTrackerTest.java | 4 +- .../OperationTrackerTest.java | 55 +++++++++--------- 11 files changed, 109 insertions(+), 71 deletions(-) create mode 100644 ambry-router/src/main/java/com.github.ambry.router/RouterOperation.java diff --git a/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java b/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java index 6ca6312dc7..39d6b58bf4 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java +++ b/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java @@ -55,7 +55,8 @@ class AdaptiveOperationTracker extends SimpleOperationTracker { /** * Constructs an {@link AdaptiveOperationTracker} - * @param operationClass The operation class in which operation tracker is used. + * @param routerConfig The {@link RouterConfig} containing the configs for operation tracker. + * @param routerOperation The {@link RouterOperation} which {@link AdaptiveOperationTracker} is associated with. * @param partitionId The partition on which the operation is performed. * @param originatingDcName name of the cross colo DC whose replicas should be tried first. * @param localColoTracker the {@link Histogram} that tracks intra datacenter latencies for this class of requests. @@ -63,16 +64,27 @@ class AdaptiveOperationTracker extends SimpleOperationTracker { * @param pastDueCounter the {@link Counter} that tracks the number of times a request is past due. * @param time the {@link Time} instance to use. */ - AdaptiveOperationTracker(RouterConfig routerConfig, Class operationClass, PartitionId partitionId, + AdaptiveOperationTracker(RouterConfig routerConfig, RouterOperation routerOperation, PartitionId partitionId, String originatingDcName, Histogram localColoTracker, Histogram crossColoTracker, Counter pastDueCounter, NonBlockingRouterMetrics routerMetrics, Time time) { - super(routerConfig, operationClass, partitionId, originatingDcName, true); + super(routerConfig, routerOperation, partitionId, originatingDcName, true); this.time = time; this.localColoTracker = localColoTracker; this.crossColoTracker = crossColoTracker; this.pastDueCounter = pastDueCounter; this.quantile = routerConfig.routerLatencyToleranceQuantile; this.otIterator = new OpTrackerIterator(); + Class operationClass = null; + switch (routerOperation) { + case GetBlobOperation: + operationClass = GetBlobOperation.class; + break; + case GetBlobInfoOperation: + operationClass = GetBlobInfoOperation.class; + break; + default: + throw new IllegalArgumentException(routerOperation + " is not supported in AdaptiveOperationTracker"); + } routerMetrics.registerCustomPercentiles(operationClass, "LocalColoLatencyMs", localColoTracker, routerConfig.routerOperationTrackerCustomPercentiles); if (crossColoTracker != null) { diff --git a/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java b/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java index 2a76439478..c7eb17fc58 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/DeleteOperation.java @@ -99,8 +99,8 @@ class DeleteOperation { byte blobDcId = blobId.getDatacenterId(); String originatingDcName = clusterMap.getDatacenterName(blobDcId); this.operationTracker = - new SimpleOperationTracker(routerConfig, DeleteOperation.class, blobId.getPartition(), originatingDcName, - false); + new SimpleOperationTracker(routerConfig, RouterOperation.DeleteOperation, blobId.getPartition(), + originatingDcName, false); } /** diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java index e7661a6c44..9856f66c2b 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobInfoOperation.java @@ -91,7 +91,8 @@ class GetBlobInfoOperation extends GetOperation { routerMetrics.getBlobInfoLocalColoLatencyMs, routerMetrics.getBlobInfoCrossColoLatencyMs, routerMetrics.getBlobInfoPastDueCount, kms, cryptoService, cryptoJobHandler, time, isEncrypted); this.routerCallback = routerCallback; - operationTracker = getOperationTracker(blobId.getPartition(), blobId.getDatacenterId(), GetBlobInfoOperation.class); + operationTracker = + getOperationTracker(blobId.getPartition(), blobId.getDatacenterId(), RouterOperation.GetBlobInfoOperation); progressTracker = new ProgressTracker(operationTracker); } diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java index 01bcf8514a..8932c7d907 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java @@ -557,8 +557,8 @@ void reset() { void initialize(int index, BlobId id) { chunkIndex = index; chunkBlobId = id; - chunkOperationTracker = - getOperationTracker(chunkBlobId.getPartition(), chunkBlobId.getDatacenterId(), GetBlobOperation.class); + chunkOperationTracker = getOperationTracker(chunkBlobId.getPartition(), chunkBlobId.getDatacenterId(), + RouterOperation.GetBlobOperation); progressTracker = new ProgressTracker(chunkOperationTracker); state = ChunkState.Ready; } diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java index 974cca619e..7ad71163ec 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetOperation.java @@ -237,18 +237,20 @@ protected GetRequest createGetRequest(BlobId blobId, MessageFormatFlags flag, Ge /** * Gets an {@link OperationTracker} based on the config and {@code partitionId}. * @param partitionId the {@link PartitionId} for which a tracker is required. - * @param operationClass The operation class in which operation tracker is used. + * @param routerOperation The type of router operation used by tracker. * @return an {@link OperationTracker} based on the config and {@code partitionId}. */ - protected OperationTracker getOperationTracker(PartitionId partitionId, byte datacenterId, Class operationClass) { + protected OperationTracker getOperationTracker(PartitionId partitionId, byte datacenterId, + RouterOperation routerOperation) { OperationTracker operationTracker; String trackerType = routerConfig.routerGetOperationTrackerType; String originatingDcName = clusterMap.getDatacenterName(datacenterId); if (trackerType.equals(SimpleOperationTracker.class.getSimpleName())) { - operationTracker = new SimpleOperationTracker(routerConfig, operationClass, partitionId, originatingDcName, true); + operationTracker = + new SimpleOperationTracker(routerConfig, routerOperation, partitionId, originatingDcName, true); } else if (trackerType.equals(AdaptiveOperationTracker.class.getSimpleName())) { operationTracker = - new AdaptiveOperationTracker(routerConfig, operationClass, partitionId, originatingDcName, localColoTracker, + new AdaptiveOperationTracker(routerConfig, routerOperation, partitionId, originatingDcName, localColoTracker, crossColoTracker, pastDueCounter, routerMetrics, time); } else { throw new IllegalArgumentException("Unrecognized tracker type: " + trackerType); diff --git a/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java b/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java index 8dc1fad9f9..e613379e48 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/PutOperation.java @@ -1097,7 +1097,8 @@ private void prepareForSending() { passedInBlobProperties.getCreationTimeInMs(), passedInBlobProperties.getAccountId(), passedInBlobProperties.getContainerId(), passedInBlobProperties.isEncrypted(), passedInBlobProperties.getExternalAssetTag()); - operationTracker = new SimpleOperationTracker(routerConfig, PutOperation.class, partitionId, null, true); + operationTracker = + new SimpleOperationTracker(routerConfig, RouterOperation.PutOperation, partitionId, null, true); correlationIdToChunkPutRequestInfo.clear(); state = ChunkState.Ready; } catch (RouterException e) { diff --git a/ambry-router/src/main/java/com.github.ambry.router/RouterOperation.java b/ambry-router/src/main/java/com.github.ambry.router/RouterOperation.java new file mode 100644 index 0000000000..10f49a3f1d --- /dev/null +++ b/ambry-router/src/main/java/com.github.ambry.router/RouterOperation.java @@ -0,0 +1,18 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.router; + +public enum RouterOperation { + GetBlobOperation, GetBlobInfoOperation, PutOperation, DeleteOperation, TtlUpdateOperation +} diff --git a/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java b/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java index 56d9afbbca..9bbc94a5b9 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java +++ b/ambry-router/src/main/java/com.github.ambry.router/SimpleOperationTracker.java @@ -73,38 +73,43 @@ class SimpleOperationTracker implements OperationTracker { /** * Constructor for an {@code SimpleOperationTracker}. - * * @param routerConfig The {@link RouterConfig} containing the configs for operation tracker. - * @param operationClass The operation class in which operation tracker is used. + * @param routerOperation The {@link RouterOperation} which {@link SimpleOperationTracker} is associated with. * @param partitionId The partition on which the operation is performed. * @param originatingDcName The original DC where blob was put. * @param shuffleReplicas Indicates if the replicas need to be shuffled. */ - SimpleOperationTracker(RouterConfig routerConfig, Class operationClass, PartitionId partitionId, + SimpleOperationTracker(RouterConfig routerConfig, RouterOperation routerOperation, PartitionId partitionId, String originatingDcName, boolean shuffleReplicas) { // populate tracker parameters based on operation type boolean crossColoEnabled = false; boolean includeNonOriginatingDcReplicas = true; - int replicasRequired = Integer.MAX_VALUE; - if (GetOperation.class.isAssignableFrom(operationClass)) { - successTarget = routerConfig.routerGetSuccessTarget; - parallelism = routerConfig.routerGetRequestParallelism; - crossColoEnabled = routerConfig.routerGetCrossDcEnabled; - includeNonOriginatingDcReplicas = routerConfig.routerGetIncludeNonOriginatingDcReplicas; - replicasRequired = routerConfig.routerGetReplicasRequired; - } else if (operationClass == PutOperation.class) { - successTarget = routerConfig.routerPutSuccessTarget; - parallelism = routerConfig.routerPutRequestParallelism; - } else if (operationClass == DeleteOperation.class) { - successTarget = routerConfig.routerDeleteSuccessTarget; - parallelism = routerConfig.routerDeleteRequestParallelism; - crossColoEnabled = true; - } else if (operationClass == TtlUpdateOperation.class) { - successTarget = routerConfig.routerTtlUpdateSuccessTarget; - parallelism = routerConfig.routerTtlUpdateRequestParallelism; - crossColoEnabled = true; - } else { - throw new IllegalArgumentException("Unrecognized operation: " + operationClass.getSimpleName()); + int numOfReplicasRequired = Integer.MAX_VALUE; + switch (routerOperation) { + case GetBlobOperation: + case GetBlobInfoOperation: + successTarget = routerConfig.routerGetSuccessTarget; + parallelism = routerConfig.routerGetRequestParallelism; + crossColoEnabled = routerConfig.routerGetCrossDcEnabled; + includeNonOriginatingDcReplicas = routerConfig.routerGetIncludeNonOriginatingDcReplicas; + numOfReplicasRequired = routerConfig.routerGetReplicasRequired; + break; + case PutOperation: + successTarget = routerConfig.routerPutSuccessTarget; + parallelism = routerConfig.routerPutRequestParallelism; + break; + case DeleteOperation: + successTarget = routerConfig.routerDeleteSuccessTarget; + parallelism = routerConfig.routerDeleteRequestParallelism; + crossColoEnabled = true; + break; + case TtlUpdateOperation: + successTarget = routerConfig.routerTtlUpdateSuccessTarget; + parallelism = routerConfig.routerTtlUpdateRequestParallelism; + crossColoEnabled = true; + break; + default: + throw new IllegalArgumentException("Unsupported operation: " + routerOperation); } if (parallelism < 1) { throw new IllegalArgumentException("Parallelism has to be > 0. Configured to be " + parallelism); @@ -152,10 +157,10 @@ class SimpleOperationTracker implements OperationTracker { // Please note replicasRequired is 6 because total number of local and originating replicas is always <= 6. // This may no longer be true with partition classes and flexible replication. // Don't do this if originatingDcName is unknown. - while (replicaPool.size() < replicasRequired && backupReplicas.size() > 0) { + while (replicaPool.size() < numOfReplicasRequired && backupReplicas.size() > 0) { replicaPool.add(backupReplicas.pollFirst()); } - while (replicaPool.size() < replicasRequired && downReplicas.size() > 0) { + while (replicaPool.size() < numOfReplicasRequired && downReplicas.size() > 0) { replicaPool.add(downReplicas.pollFirst()); } } @@ -228,7 +233,7 @@ public int getSuccessTarget() { } /** - * Helper function to catch a potential race condition in {@link SimpleOperationTracker#SimpleOperationTracker(RouterConfig, Class, PartitionId, String, boolean)}. + * Helper function to catch a potential race condition in {@link SimpleOperationTracker#SimpleOperationTracker(RouterConfig, RouterOperation, PartitionId, String, boolean)}. * * @param partitionId The partition on which the operation is performed. * @param examinedReplicas All replicas examined. diff --git a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java index f6a29ac33e..1b3fa1144b 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateOperation.java @@ -90,8 +90,8 @@ class TtlUpdateOperation { byte blobDcId = blobId.getDatacenterId(); String originatingDcName = clusterMap.getDatacenterName(blobDcId); this.operationTracker = - new SimpleOperationTracker(routerConfig, TtlUpdateOperation.class, blobId.getPartition(), originatingDcName, - false); + new SimpleOperationTracker(routerConfig, RouterOperation.TtlUpdateOperation, blobId.getPartition(), + originatingDcName, false); } /** diff --git a/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java b/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java index 1a73f273be..fea0c27598 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java @@ -308,8 +308,8 @@ private OperationTracker getOperationTracker(boolean crossColoEnabled, int succe props.setProperty("router.operation.tracker.custom.percentiles", customPercentiles); } RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(props)); - return new AdaptiveOperationTracker(routerConfig, GetOperation.class, mockPartition, null, localColoTracker, - crossColoEnabled ? crossColoTracker : null, pastDueCounter, routerMetrics, time); + return new AdaptiveOperationTracker(routerConfig, RouterOperation.GetBlobOperation, mockPartition, null, + localColoTracker, crossColoEnabled ? crossColoTracker : null, pastDueCounter, routerMetrics, time); } /** diff --git a/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java b/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java index 533620f1af..c7ea1d0f85 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java @@ -469,7 +469,7 @@ public void incorrectParallelismTest() { } /** - * Test that operation tracker correctly populate parameters(i.e. successTarget) according to passed in operation class. + * Test that operation tracker can correctly populate parameters(i.e. successTarget) based on input {@link RouterOperation}. */ @Test public void operationClassTest() { @@ -482,26 +482,13 @@ public void operationClassTest() { props.setProperty("router.ttl.update.success.target", "4"); RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(props)); initialize(); - try { - switch (operationTrackerType) { - case SIMPLE_OP_TRACKER: - new SimpleOperationTracker(routerConfig, getClass(), mockPartition, originatingDcName, true); - break; - case ADAPTIVE_OP_TRACKER: - new AdaptiveOperationTracker(routerConfig, getClass(), mockPartition, originatingDcName, localColoTracker, - crossColoTracker, pastDueCounter, routerMetrics, time); - break; - } - fail("Instantiation should fail because of unrecognized operation class"); - } catch (Exception e) { - //expected - } - Map operationAndSuccessTarget = new HashMap<>(); - operationAndSuccessTarget.put(GetOperation.class, 1); - operationAndSuccessTarget.put(PutOperation.class, 2); - operationAndSuccessTarget.put(DeleteOperation.class, 3); - operationAndSuccessTarget.put(TtlUpdateOperation.class, 4); - for (Map.Entry entry : operationAndSuccessTarget.entrySet()) { + Map operationAndSuccessTarget = new HashMap<>(); + operationAndSuccessTarget.put(RouterOperation.GetBlobOperation, 1); + operationAndSuccessTarget.put(RouterOperation.GetBlobInfoOperation, 1); + operationAndSuccessTarget.put(RouterOperation.PutOperation, 2); + operationAndSuccessTarget.put(RouterOperation.DeleteOperation, 3); + operationAndSuccessTarget.put(RouterOperation.TtlUpdateOperation, 4); + for (Map.Entry entry : operationAndSuccessTarget.entrySet()) { SimpleOperationTracker operationTracker = null; switch (operationTrackerType) { case SIMPLE_OP_TRACKER: @@ -509,13 +496,22 @@ public void operationClassTest() { new SimpleOperationTracker(routerConfig, entry.getKey(), mockPartition, originatingDcName, true); break; case ADAPTIVE_OP_TRACKER: - operationTracker = - new AdaptiveOperationTracker(routerConfig, entry.getKey(), mockPartition, originatingDcName, - localColoTracker, crossColoTracker, pastDueCounter, routerMetrics, time); + try { + operationTracker = + new AdaptiveOperationTracker(routerConfig, entry.getKey(), mockPartition, originatingDcName, + localColoTracker, crossColoTracker, pastDueCounter, routerMetrics, time); + } catch (IllegalArgumentException e) { + assertTrue("Get operation shouldn't throw any exception in adaptive tracker", + entry.getKey() != RouterOperation.GetBlobOperation + && entry.getKey() != RouterOperation.GetBlobInfoOperation); + } break; } // ensure the success target matches the number specified for each type of operaiton - assertEquals("The suggest target doesn't match", (long) entry.getValue(), (operationTracker).getSuccessTarget()); + if (operationTracker != null) { + assertEquals("The suggest target doesn't match", (long) entry.getValue(), + (operationTracker).getSuccessTarget()); + } } } @@ -569,11 +565,14 @@ private OperationTracker getOperationTracker(boolean crossColoEnabled, int succe OperationTracker tracker; switch (operationTrackerType) { case SIMPLE_OP_TRACKER: - tracker = new SimpleOperationTracker(routerConfig, GetOperation.class, mockPartition, originatingDcName, true); + tracker = + new SimpleOperationTracker(routerConfig, RouterOperation.GetBlobOperation, mockPartition, originatingDcName, + true); break; case ADAPTIVE_OP_TRACKER: - tracker = new AdaptiveOperationTracker(routerConfig, GetOperation.class, mockPartition, originatingDcName, - localColoTracker, crossColoEnabled ? crossColoTracker : null, pastDueCounter, routerMetrics, time); + tracker = new AdaptiveOperationTracker(routerConfig, RouterOperation.GetBlobOperation, mockPartition, + originatingDcName, localColoTracker, crossColoEnabled ? crossColoTracker : null, pastDueCounter, + routerMetrics, time); break; default: throw new IllegalArgumentException("Unrecognized operation tracker type - " + operationTrackerType);