diff --git a/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java b/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java index 78f194579e..fd15102893 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java @@ -298,6 +298,41 @@ public class RouterConfig { @Default("1000") public final long routerOperationTrackerMinDataPointsRequired; + /** + * The maximum number of inflight requests that allowed for adaptive tracker. If current number of inflight requests + * is larger than or equal to this threshold, tracker shouldn't send out any request even though the oldest is past due. + * {@link RouterConfig#routerGetRequestParallelism} is a suggestive number that operation tracker uses to determine how + * many requests can be outstanding in parallel (assuming request gets response in time). Adaptive tracker is allowed + * to issue more requests (total inflight requests may exceed #routerGetRequestParallelism) if old request is past due. + * {@link RouterConfig#routerOperationTrackerMaxInflightRequests} is the strict upper bound that at any point of time, + * number of inflight requests issued by adaptive tracker should not exceed this number. Hence, for adaptive tracker, + * inflight requests number should always be within [0, #routerOperationTrackerMaxInflightRequests] + */ + @Config("router.operation.tracker.max.inflight.requests") + @Default("2") + public final int routerOperationTrackerMaxInflightRequests; + + /** + * Indicates whether to enable excluding timed out requests in Histogram reservoir. + */ + @Config("router.operation.tracker.exclude.timeout.enabled") + @Default("false") + public final boolean routerOperationTrackerExcludeTimeoutEnabled; + + /** + * Indicates whether to dump resource-level histogram to log file. + */ + @Config("router.operation.tracker.histogram.dump.enabled") + @Default("false") + public final boolean routerOperationTrackerHistogramDumpEnabled; + + /** + * The period of dumping resource-level histogram (if enabled). + */ + @Config("router.operation.tracker.histogram.dump.period") + @Default("600") + public final long routerOperationTrackerHistogramDumpPeriod; + /** * Create a RouterConfig instance. * @param verifiableProperties the properties map to refer to. @@ -371,5 +406,17 @@ public RouterConfig(VerifiableProperties verifiableProperties) { verifiableProperties.getDouble("router.operation.tracker.reservoir.decay.factor", 0.015); routerOperationTrackerMinDataPointsRequired = verifiableProperties.getLong("router.operation.tracker.min.data.points.required", 1000L); + routerOperationTrackerMaxInflightRequests = + verifiableProperties.getIntInRange("router.operation.tracker.max.inflight.requests", 2, 1, Integer.MAX_VALUE); + routerOperationTrackerExcludeTimeoutEnabled = + verifiableProperties.getBoolean("router.operation.tracker.exclude.timeout.enabled", false); + routerOperationTrackerHistogramDumpEnabled = + verifiableProperties.getBoolean("router.operation.tracker.histogram.dump.enabled", false); + routerOperationTrackerHistogramDumpPeriod = + verifiableProperties.getLongInRange("router.operation.tracker.histogram.dump.period", 600L, 1L, Long.MAX_VALUE); + if (routerGetRequestParallelism > routerOperationTrackerMaxInflightRequests) { + throw new IllegalArgumentException( + "Operation tracker parallelism is larger than operation tracker max inflight number"); + } } } diff --git a/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java b/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java index 67001c94a2..f8b960661c 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java +++ b/ambry-router/src/main/java/com.github.ambry.router/AdaptiveOperationTracker.java @@ -84,6 +84,10 @@ class AdaptiveOperationTracker extends SimpleOperationTracker { localDcResourceToHistogram = getResourceToLatencyMap(routerOperation, true); crossDcResourceToHistogram = getResourceToLatencyMap(routerOperation, false); } + if (parallelism > routerConfig.routerOperationTrackerMaxInflightRequests) { + throw new IllegalArgumentException( + "Operation tracker parallelism is larger than adaptive tracker max inflight number"); + } } @Override @@ -95,7 +99,8 @@ public void onResponse(ReplicaId replicaId, TrackedRequestFinalState trackedRequ } else { elapsedTime = time.milliseconds() - expiredRequestSendTimes.remove(replicaId); } - if (trackedRequestFinalState != TrackedRequestFinalState.TIMED_OUT) { + if (trackedRequestFinalState != TrackedRequestFinalState.TIMED_OUT + || !routerConfig.routerOperationTrackerExcludeTimeoutEnabled) { getLatencyHistogram(replicaId).update(elapsedTime); if (routerConfig.routerOperationTrackerMetricScope != OperationTrackerScope.Datacenter) { // This is only used to report whole datacenter histogram for monitoring purpose @@ -219,7 +224,15 @@ private class OpTrackerIterator implements Iterator { @Override public boolean hasNext() { - return replicaIterator.hasNext() && (inflightCount < parallelism || isOldestRequestPastDue()); + if (replicaIterator.hasNext()) { + if (inflightCount < parallelism) { + return true; + } + if (inflightCount < routerConfig.routerOperationTrackerMaxInflightRequests && isOldestRequestPastDue()) { + return true; + } + } + return false; } @Override diff --git a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java index fed074b866..6af5645c55 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java +++ b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java @@ -410,6 +410,8 @@ public void close() { if (cryptoJobHandler != null) { cryptoJobHandler.close(); } + // close router metrics + routerMetrics.close(); } /** diff --git a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java index 505a54c46d..34f5711181 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java +++ b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouterMetrics.java @@ -27,11 +27,17 @@ import com.github.ambry.clustermap.Resource; import com.github.ambry.config.RouterConfig; import com.github.ambry.utils.SystemTime; +import com.github.ambry.utils.Utils; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.github.ambry.utils.Utils.*; /** @@ -41,6 +47,7 @@ */ public class NonBlockingRouterMetrics { private final MetricRegistry metricRegistry; + private static final Logger logger = LoggerFactory.getLogger(NonBlockingRouterMetrics.class); // Operation rate. public final Meter putBlobOperationRate; @@ -192,17 +199,21 @@ public class NonBlockingRouterMetrics { public final CryptoJobMetrics decryptJobMetrics; // Resource to latency histogram map. Here resource can be DataNode, Partition, Disk, Replica etc. - Map getBlobLocalDcResourceToLatency; - Map getBlobCrossDcResourceToLatency; + Map getBlobLocalDcResourceToLatency = new HashMap<>(); + Map getBlobCrossDcResourceToLatency = new HashMap<>(); - Map getBlobInfoLocalDcResourceToLatency; - Map getBlobInfoCrossDcResourceToLatency; + Map getBlobInfoLocalDcResourceToLatency = new HashMap<>(); + Map getBlobInfoCrossDcResourceToLatency = new HashMap<>(); // Map that stores dataNode-level metrics. private final Map dataNodeToMetrics; + private final RouterConfig routerConfig; + private final HistogramDumper histogramDumper; + private ScheduledExecutorService scheduler = null; public NonBlockingRouterMetrics(ClusterMap clusterMap, RouterConfig routerConfig) { metricRegistry = clusterMap.getMetricRegistry(); + this.routerConfig = routerConfig; // Operation Rate. putBlobOperationRate = metricRegistry.meter(MetricRegistry.name(PutOperation.class, "PutBlobOperationRate")); @@ -436,8 +447,9 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap, RouterConfig routerConfig encryptJobMetrics = new CryptoJobMetrics(PutOperation.class, "Encrypt", metricRegistry); decryptJobMetrics = new CryptoJobMetrics(GetOperation.class, "Decrypt", metricRegistry); - // Custom percentiles + // Record type of adaptive tracker and configure custom percentiles if (routerConfig != null) { + logger.info("The metric scope of adaptive tracker is {}", routerConfig.routerOperationTrackerMetricScope); registerCustomPercentiles(GetBlobOperation.class, "LocalDcLatencyMs", getBlobLocalDcLatencyMs, routerConfig.routerOperationTrackerCustomPercentiles); registerCustomPercentiles(GetBlobOperation.class, "CrossDcLatencyMs", getBlobCrossDcLatencyMs, @@ -452,6 +464,17 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap, RouterConfig routerConfig // pre-populate all resource-to-histogram maps here to allow lock-free hashmap in adaptive operation tracker initializeResourceToHistogramMap(clusterMap, routerConfig); } + + if (routerConfig != null && routerConfig.routerOperationTrackerHistogramDumpEnabled) { + histogramDumper = new HistogramDumper(); + scheduler = Utils.newScheduler(1, false); + logger.info("Scheduling histogram dumper with a period of {} secs", + routerConfig.routerOperationTrackerHistogramDumpPeriod); + scheduler.scheduleAtFixedRate(histogramDumper, 0, routerConfig.routerOperationTrackerHistogramDumpPeriod, + TimeUnit.SECONDS); + } else { + histogramDumper = null; + } } /** @@ -464,10 +487,6 @@ private void initializeResourceToHistogramMap(ClusterMap clusterMap, RouterConfi int reservoirSize = routerConfig.routerOperationTrackerReservoirSize; double decayFactor = routerConfig.routerOperationTrackerReservoirDecayFactor; String localDatacenterName = clusterMap.getDatacenterName(clusterMap.getLocalDatacenterId()); - getBlobLocalDcResourceToLatency = new HashMap<>(); - getBlobInfoLocalDcResourceToLatency = new HashMap<>(); - getBlobCrossDcResourceToLatency = new HashMap<>(); - getBlobInfoCrossDcResourceToLatency = new HashMap<>(); switch (routerConfig.routerOperationTrackerMetricScope) { case Partition: for (PartitionId partitionId : clusterMap.getAllPartitionIds(null)) { @@ -892,6 +911,50 @@ void trackAgeAtAccess(long creationTimeMs) { } } } + + /** + * Close {@link NonBlockingRouterMetrics} by shutting down scheduler (if present) in this class. + */ + public void close() { + if (scheduler != null) { + shutDownExecutorService(scheduler, 5, TimeUnit.SECONDS); + } + } + + /** + * A thread that helps periodically dump resource-level histogram (with given percentile) into log file. + */ + private class HistogramDumper implements Runnable { + + @Override + public void run() { + double quantile = routerConfig.routerLatencyToleranceQuantile; + for (Map.Entry resourceToHistogram : getBlobLocalDcResourceToLatency.entrySet()) { + Resource resource = resourceToHistogram.getKey(); + Histogram histogram = resourceToHistogram.getValue(); + logger.info("{} GetBlob local DC latency histogram {}th percentile in ms: {}", resource.toString(), + quantile * 100, histogram.getSnapshot().getValue(quantile)); + } + for (Map.Entry resourceToHistogram : getBlobCrossDcResourceToLatency.entrySet()) { + Resource resource = resourceToHistogram.getKey(); + Histogram histogram = resourceToHistogram.getValue(); + logger.info("{} GetBlob cross DC latency histogram {}th percentile in ms: {}", resource.toString(), + quantile * 100, histogram.getSnapshot().getValue(quantile)); + } + for (Map.Entry resourceToHistogram : getBlobInfoLocalDcResourceToLatency.entrySet()) { + Resource resource = resourceToHistogram.getKey(); + Histogram histogram = resourceToHistogram.getValue(); + logger.info("{} GetBlobInfo local DC latency histogram {}th percentile in ms: {}", resource.toString(), + quantile * 100, histogram.getSnapshot().getValue(quantile)); + } + for (Map.Entry resourceToHistogram : getBlobInfoCrossDcResourceToLatency.entrySet()) { + Resource resource = resourceToHistogram.getKey(); + Histogram histogram = resourceToHistogram.getValue(); + logger.info("{} GetBlobInfo cross DC latency histogram {}th percentile in ms: {}", resource.toString(), + quantile * 100, histogram.getSnapshot().getValue(quantile)); + } + } + } } /** diff --git a/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java b/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java index 3cca2cccc0..ba1d3fd10a 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/AdaptiveOperationTrackerTest.java @@ -123,7 +123,7 @@ public void adaptationTest() throws InterruptedException { double localColoCutoff = localColoTracker.getSnapshot().getValue(QUANTILE); double crossColoCutoff = crossColoTracker.getSnapshot().getValue(QUANTILE); - OperationTracker ot = getOperationTracker(createRouterConfig(true, REPLICA_COUNT, 2, null), mockPartition); + OperationTracker ot = getOperationTracker(createRouterConfig(true, REPLICA_COUNT, 2, 6, null, true), mockPartition); // 3-0-0-0; 3-0-0-0 sendRequests(ot, 2); // 1-2-0-0; 3-0-0-0 @@ -164,6 +164,53 @@ public void adaptationTest() throws InterruptedException { assertEquals("Past due counter is inconsistent", REPLICA_COUNT - 2, pastDueCounter.getCount()); } + /** + * Test that the max number of inflight requests should not exceed configured number. + * @throws InterruptedException + */ + @Test + public void clampMaxInflightRequestsTest() throws InterruptedException { + primeTracker(localColoTracker, MIN_DATA_POINTS_REQUIRED, LOCAL_COLO_LATENCY_RANGE); + primeTracker(crossColoTracker, MIN_DATA_POINTS_REQUIRED, CROSS_COLO_LATENCY_RANGE); + double localColoCutoff = localColoTracker.getSnapshot().getValue(QUANTILE); + double crossColoCutoff = crossColoTracker.getSnapshot().getValue(QUANTILE); + // set max inflight number = 2 and excludeTimeout = false in this test + OperationTracker ot = getOperationTracker(createRouterConfig(true, 3, 2, 3, null, false), mockPartition); + // 3-0-0-0; 3-0-0-0 + sendRequests(ot, 2); + // 1-2-0-0; 3-0-0-0 + // sleep for less than the cutoff + time.sleep((long) localColoCutoff - 2); + sendRequests(ot, 0); + // push it over the edge + time.sleep(5); + // should only send one request because (inflight num + 1) == routerConfig.routerOperationTrackerMaxInflightRequests + sendRequests(ot, 1); + // 0-3-0-0; 3-0-0-0 + // mark one request TIMED_OUT + ot.onResponse(partitionAndInflightReplicas.get(mockPartition).poll(), TrackedRequestFinalState.TIMED_OUT); + // should send out 1 request + sendRequests(ot, 1); + // 0-2-0-1; 2-1-0-0 + + // mark one request FAILURE + ot.onResponse(partitionAndInflightReplicas.get(mockPartition).poll(), TrackedRequestFinalState.FAILURE); + time.sleep((long) (crossColoCutoff - localColoCutoff) + 2); + + // should send out 1 request + sendRequests(ot, 1); + // 0-1-0-2; 1-2-0-0 + + // mark 3 inflight requests SUCCESS + while (!partitionAndInflightReplicas.get(mockPartition).isEmpty()) { + assertFalse("Operation should not be done", ot.isDone()); + ot.onResponse(partitionAndInflightReplicas.get(mockPartition).poll(), TrackedRequestFinalState.SUCCESS); + } + assertTrue("Operation should have succeeded", ot.hasSucceeded()); + // past due counter should be 3 (note that pastDueCounter is updated only when Iterator.remove() is called) + assertEquals("Past due counter is not expected", 3, pastDueCounter.getCount()); + } + /** * Tests that adaptive tracker uses separate partition-level histogram to determine if inflight requests are past due. * @throws Exception @@ -179,7 +226,7 @@ public void partitionLevelAdaptiveTrackerTest() throws Exception { MockClusterMap clusterMap = new MockClusterMap(false, datanodes, 3, Arrays.asList(mockPartition1, mockPartition2), localDcName); trackerScope = OperationTrackerScope.Partition; - RouterConfig routerConfig = createRouterConfig(true, 2, 1, null); + RouterConfig routerConfig = createRouterConfig(true, 2, 1, 6, null, true); NonBlockingRouterMetrics originalMetrics = routerMetrics; routerMetrics = new NonBlockingRouterMetrics(clusterMap, routerConfig); Counter pastDueCount = routerMetrics.getBlobPastDueCount; @@ -279,7 +326,7 @@ public void nodeLevelAdaptiveTrackerTest() throws Exception { MockClusterMap clusterMap = new MockClusterMap(false, datanodes, 3, Arrays.asList(mockPartition1, mockPartition2), localDcName); trackerScope = OperationTrackerScope.DataNode; - RouterConfig routerConfig = createRouterConfig(true, 1, 1, null); + RouterConfig routerConfig = createRouterConfig(true, 1, 1, 6, null, true); NonBlockingRouterMetrics originalMetrics = routerMetrics; routerMetrics = new NonBlockingRouterMetrics(clusterMap, routerConfig); Counter pastDueCount = routerMetrics.getBlobPastDueCount; @@ -371,7 +418,7 @@ public void diskLevelAdaptiveTrackerTest() throws Exception { MockClusterMap clusterMap = new MockClusterMap(false, datanodes, 2, Arrays.asList(mockPartition1, mockPartition2), localDcName); trackerScope = OperationTrackerScope.Disk; - RouterConfig routerConfig = createRouterConfig(true, 1, 1, null); + RouterConfig routerConfig = createRouterConfig(true, 1, 1, 6, null, true); NonBlockingRouterMetrics originalMetrics = routerMetrics; routerMetrics = new NonBlockingRouterMetrics(clusterMap, routerConfig); Counter pastDueCount = routerMetrics.getBlobPastDueCount; @@ -471,7 +518,7 @@ public void noUnexpiredRequestsTest() throws InterruptedException { primeTracker(crossColoTracker, MIN_DATA_POINTS_REQUIRED, CROSS_COLO_LATENCY_RANGE); double localColoCutoff = localColoTracker.getSnapshot().getValue(QUANTILE); - OperationTracker ot = getOperationTracker(createRouterConfig(false, 1, 1, null), mockPartition); + OperationTracker ot = getOperationTracker(createRouterConfig(false, 1, 1, 6, null, true), mockPartition); // 3-0-0-0 sendRequests(ot, 1); // 2-1-0-0 @@ -503,7 +550,7 @@ public void trackerUpdateBetweenHasNextAndNextTest() throws InterruptedException primeTracker(crossColoTracker, MIN_DATA_POINTS_REQUIRED, CROSS_COLO_LATENCY_RANGE); double localColoCutoff = localColoTracker.getSnapshot().getValue(1); - OperationTracker ot = getOperationTracker(createRouterConfig(false, 1, 1, null), mockPartition); + OperationTracker ot = getOperationTracker(createRouterConfig(false, 1, 1, 6, null, true), mockPartition); // 3-0-0-0 sendRequests(ot, 1); // 2-1-0-0 @@ -549,7 +596,7 @@ public boolean matches(String name, Metric metric) { assertTrue("No gauges should be created because custom percentile is not set", gauges.isEmpty()); // test that dedicated gauges are correctly created for custom percentiles. String customPercentiles = "0.91,0.97"; - RouterConfig routerConfig = createRouterConfig(false, 1, 1, customPercentiles); + RouterConfig routerConfig = createRouterConfig(false, 1, 1, 6, customPercentiles, true); String[] percentileArray = customPercentiles.split(","); Arrays.sort(percentileArray); List sortedPercentiles = @@ -599,14 +646,14 @@ public void invalidOperationTrackerScopeTest() { tracker.onResponse(partitionAndInflightReplicas.get(mockPartition).poll(), TrackedRequestFinalState.SUCCESS); assertTrue("Operation should have succeeded", tracker.hasSucceeded()); // test that no other resource-level metrics have been instantiated. - assertNull("Partition histogram in RouterMetrics should be null", - routerMetrics.getBlobInfoLocalDcResourceToLatency); - assertNull("Partition histogram in RouterMetrics should be null", - routerMetrics.getBlobInfoCrossDcResourceToLatency); - assertNull("Partition histogram in OperationTracker should be null", - tracker.getResourceToLatencyMap(RouterOperation.GetBlobInfoOperation, true)); - assertNull("Partition histogram in OperationTracker should be null", - tracker.getResourceToLatencyMap(RouterOperation.GetBlobInfoOperation, false)); + assertTrue("Partition histogram in RouterMetrics should be empty", + routerMetrics.getBlobInfoLocalDcResourceToLatency.isEmpty()); + assertTrue("Partition histogram in RouterMetrics should be empty", + routerMetrics.getBlobInfoCrossDcResourceToLatency.isEmpty()); + assertTrue("Partition histogram in OperationTracker should be empty", + tracker.getResourceToLatencyMap(RouterOperation.GetBlobInfoOperation, true).isEmpty()); + assertTrue("Partition histogram in OperationTracker should be empty", + tracker.getResourceToLatencyMap(RouterOperation.GetBlobInfoOperation, false).isEmpty()); // extra test: invalid router operation try { tracker.getResourceToLatencyMap(RouterOperation.PutOperation, true); @@ -636,12 +683,14 @@ private OperationTracker getOperationTracker(RouterConfig routerConfig, Partitio * @param crossColoEnabled {@code true} if cross colo needs to be enabled. {@code false} otherwise. * @param successTarget the number of successful responses required for the operation to succeed. * @param parallelism the number of parallel requests that can be in flight. + * @param maxInflightNum the maximum number of inflight requests for adaptive tracker. * @param customPercentiles the custom percentiles to be reported. Percentiles are specified in a comma-separated * string, i.e "0.94,0.96,0.97". + * @param excludeTimeout whether to exclude timed out requests in Histogram. * @return an instance of {@link RouterConfig} */ private RouterConfig createRouterConfig(boolean crossColoEnabled, int successTarget, int parallelism, - String customPercentiles) { + int maxInflightNum, String customPercentiles, boolean excludeTimeout) { Properties props = new Properties(); props.setProperty("router.hostname", "localhost"); props.setProperty("router.datacenter.name", localDcName); @@ -652,6 +701,8 @@ private RouterConfig createRouterConfig(boolean crossColoEnabled, int successTar props.setProperty("router.get.replicas.required", Integer.toString(Integer.MAX_VALUE)); props.setProperty("router.latency.tolerance.quantile", Double.toString(QUANTILE)); props.setProperty("router.operation.tracker.metric.scope", trackerScope.toString()); + props.setProperty("router.operation.tracker.max.inflight.requests", Integer.toString(maxInflightNum)); + props.setProperty("router.operation.tracker.exclude.timeout.enabled", Boolean.toString(excludeTimeout)); if (customPercentiles != null) { props.setProperty("router.operation.tracker.custom.percentiles", customPercentiles); } @@ -708,7 +759,7 @@ private void sendRequests(OperationTracker operationTracker, int numRequestsExpe private void doTrackerUpdateTest(boolean succeedRequests) throws InterruptedException { long timeIncrement = 10; OperationTracker ot = - getOperationTracker(createRouterConfig(true, REPLICA_COUNT, REPLICA_COUNT, null), mockPartition); + getOperationTracker(createRouterConfig(true, REPLICA_COUNT, REPLICA_COUNT, 6, null, true), mockPartition); // 3-0-0-0; 3-0-0-0 sendRequests(ot, REPLICA_COUNT); // 0-3-0-0; 0-3-0-0 diff --git a/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java b/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java index be28cd179e..0242950cbe 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java @@ -137,7 +137,7 @@ public static List data() { public GetBlobInfoOperationTest(String operationTrackerType, boolean testEncryption) throws Exception { this.operationTrackerType = operationTrackerType; this.testEncryption = testEncryption; - VerifiableProperties vprops = new VerifiableProperties(getNonBlockingRouterProperties()); + VerifiableProperties vprops = new VerifiableProperties(getNonBlockingRouterProperties(true)); routerConfig = new RouterConfig(vprops); mockClusterMap = new MockClusterMap(); localDcName = mockClusterMap.getDatacenterName(mockClusterMap.getLocalDatacenterId()); @@ -224,7 +224,7 @@ public void testInstantiation() { Assert.assertEquals("Blob ids must match", blobId.getID(), op.getBlobIdStr()); // test the case where the tracker type is bad - Properties properties = getNonBlockingRouterProperties(); + Properties properties = getNonBlockingRouterProperties(true); properties.setProperty("router.get.operation.tracker.type", "NonExistentTracker"); RouterConfig badConfig = new RouterConfig(new VerifiableProperties(properties)); try { @@ -299,7 +299,7 @@ public void testRouterRequestTimeoutAllFailure() { Assert.assertTrue("Operation should be complete at this time", op.isOperationComplete()); RouterException routerException = (RouterException) op.getOperationException(); Assert.assertEquals(RouterErrorCode.OperationTimedOut, routerException.getErrorCode()); - // test that time out response shouldn't update the Histogram + // test that time out response shouldn't update the Histogram if exclude timeout is enabled assumeTrue(operationTrackerType.equals(AdaptiveOperationTracker.class.getSimpleName())); AdaptiveOperationTracker tracker = (AdaptiveOperationTracker) op.getOperationTrackerInUse(); Assert.assertEquals("Timed-out response shouldn't be counted into local colo latency histogram", 0, @@ -308,6 +308,35 @@ public void testRouterRequestTimeoutAllFailure() { tracker.getLatencyHistogram(remoteReplica).getCount()); } + /** + * Test that timed out requests are allowed to update Histogram by default. + */ + @Test + public void testTimeoutRequestUpdateHistogramByDefault() { + NonBlockingRouter.currentOperationsCount.incrementAndGet(); + VerifiableProperties vprops = new VerifiableProperties(getNonBlockingRouterProperties(false)); + RouterConfig routerConfig = new RouterConfig(vprops); + GetBlobInfoOperation op = + new GetBlobInfoOperation(routerConfig, routerMetrics, mockClusterMap, responseHandler, blobId, options, null, + routerCallback, kms, cryptoService, cryptoJobHandler, time, false); + requestRegistrationCallback.requestListToFill = new ArrayList<>(); + op.poll(requestRegistrationCallback); + while (!op.isOperationComplete()) { + time.sleep(routerConfig.routerRequestTimeoutMs + 1); + op.poll(requestRegistrationCallback); + } + Assert.assertEquals("Must have attempted sending requests to all replicas", replicasCount, + correlationIdToGetOperation.size()); + Assert.assertEquals(RouterErrorCode.OperationTimedOut, + ((RouterException) op.getOperationException()).getErrorCode()); + assumeTrue(operationTrackerType.equals(AdaptiveOperationTracker.class.getSimpleName())); + AdaptiveOperationTracker tracker = (AdaptiveOperationTracker) op.getOperationTrackerInUse(); + Assert.assertEquals("Number of data points in local colo latency histogram is not expected", 3, + tracker.getLatencyHistogram(localReplica).getCount()); + Assert.assertEquals("Number of data points in cross colo latency histogram is not expected", 6, + tracker.getLatencyHistogram(remoteReplica).getCount()); + } + /** * Test the case where all local replicas and one remote replica timed out. The rest remote replicas respond * with Blob_Not_Found. @@ -435,7 +464,7 @@ public void testErrorPrecedenceWithSpecialCase() throws Exception { @Test public void testBlobAuthorizationFailureOverrideAll() throws Exception { successTarget = 2; // set it to 2 for more coverage - Properties props = getNonBlockingRouterProperties(); + Properties props = getNonBlockingRouterProperties(true); routerConfig = new RouterConfig(new VerifiableProperties(props)); ServerErrorCode[] serverErrorCodes = new ServerErrorCode[9]; @@ -519,17 +548,17 @@ public void testSuccessInThePresenceOfVariousErrors() throws Exception { String dcWherePutHappened = routerConfig.routerDatacenterName; // test requests coming in from local dc as well as cross-colo. - Properties props = getNonBlockingRouterProperties(); + Properties props = getNonBlockingRouterProperties(true); props.setProperty("router.datacenter.name", "DC1"); routerConfig = new RouterConfig(new VerifiableProperties(props)); testVariousErrors(dcWherePutHappened); - props = getNonBlockingRouterProperties(); + props = getNonBlockingRouterProperties(true); props.setProperty("router.datacenter.name", "DC2"); routerConfig = new RouterConfig(new VerifiableProperties(props)); testVariousErrors(dcWherePutHappened); - props = getNonBlockingRouterProperties(); + props = getNonBlockingRouterProperties(true); props.setProperty("router.datacenter.name", "DC3"); routerConfig = new RouterConfig(new VerifiableProperties(props)); testVariousErrors(dcWherePutHappened); @@ -642,9 +671,10 @@ private void assertSuccess(GetBlobInfoOperation op) { /** * Get the properties for the {@link NonBlockingRouter}. + * @param excludeTimeout whether to exclude timed out request data point in Histogram. * @return the constructed properties. */ - private Properties getNonBlockingRouterProperties() { + private Properties getNonBlockingRouterProperties(boolean excludeTimeout) { Properties properties = new Properties(); properties.setProperty("router.hostname", "localhost"); properties.setProperty("router.datacenter.name", "DC3"); @@ -652,6 +682,7 @@ private Properties getNonBlockingRouterProperties() { properties.setProperty("router.get.success.target", Integer.toString(successTarget)); properties.setProperty("router.get.operation.tracker.type", operationTrackerType); properties.setProperty("router.request.timeout.ms", Integer.toString(20)); + properties.setProperty("router.operation.tracker.exclude.timeout.enabled", Boolean.toString(excludeTimeout)); return properties; } } diff --git a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java index cc7a56bef9..ce22c2fdd4 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java @@ -81,7 +81,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static com.github.ambry.router.GetBlobOptions.*; import static com.github.ambry.router.PutManagerTest.*; import static com.github.ambry.router.RouterTestHelpers.*; import static org.junit.Assume.*; @@ -201,7 +200,7 @@ public GetBlobOperationTest(String operationTrackerType, boolean testEncryption) // a blob size that is greater than the maxChunkSize and is not a multiple of it. Will result in a composite blob. blobSize = maxChunkSize * random.nextInt(10) + random.nextInt(maxChunkSize - 1) + 1; mockSelectorState.set(MockSelectorState.Good); - VerifiableProperties vprops = new VerifiableProperties(getDefaultNonBlockingRouterProperties()); + VerifiableProperties vprops = new VerifiableProperties(getDefaultNonBlockingRouterProperties(true)); routerConfig = new RouterConfig(vprops); mockClusterMap = new MockClusterMap(); localDcName = mockClusterMap.getDatacenterName(mockClusterMap.getLocalDatacenterId()); @@ -316,7 +315,7 @@ public void onCompletion(GetBlobResultInternal result, Exception exception) { Assert.assertEquals("Blob ids must match", blobIdStr, op.getBlobIdStr()); // test the case where the tracker type is bad - Properties properties = getDefaultNonBlockingRouterProperties(); + Properties properties = getDefaultNonBlockingRouterProperties(true); properties.setProperty("router.get.operation.tracker.type", "NonExistentTracker"); RouterConfig badConfig = new RouterConfig(new VerifiableProperties(properties)); try { @@ -461,7 +460,7 @@ public void testCompositeBlobNotChunkSizeMultipleGetSuccess() throws Exception { @Test public void testRouterRequestTimeoutAllFailure() throws Exception { doPut(); - GetBlobOperation op = createOperation(null); + GetBlobOperation op = createOperation(routerConfig, null); op.poll(requestRegistrationCallback); while (!op.isOperationComplete()) { time.sleep(routerConfig.routerRequestTimeoutMs + 1); @@ -473,7 +472,7 @@ public void testRouterRequestTimeoutAllFailure() throws Exception { correlationIdToGetOperation.size()); assertFailureAndCheckErrorCode(op, RouterErrorCode.OperationTimedOut); - // test that timed out response won't update latency histogram + // test that timed out response won't update latency histogram if exclude timeout is enabled. assumeTrue(operationTrackerType.equals(AdaptiveOperationTracker.class.getSimpleName())); AdaptiveOperationTracker tracker = (AdaptiveOperationTracker) op.getFirstChunkOperationTrackerInUse(); Histogram localColoTracker = @@ -486,6 +485,34 @@ public void testRouterRequestTimeoutAllFailure() throws Exception { crossColoTracker.getCount()); } + /** + * Test that timed out requests are allowed to update Histogram by default. + * @throws Exception + */ + @Test + public void testTimeoutRequestUpdateHistogramByDefault() throws Exception { + doPut(); + VerifiableProperties vprops = new VerifiableProperties(getDefaultNonBlockingRouterProperties(false)); + RouterConfig routerConfig = new RouterConfig(vprops); + GetBlobOperation op = createOperation(routerConfig, null); + op.poll(requestRegistrationCallback); + while (!op.isOperationComplete()) { + time.sleep(routerConfig.routerRequestTimeoutMs + 1); + op.poll(requestRegistrationCallback); + } + Assert.assertEquals("Must have attempted sending requests to all replicas", replicasCount, + correlationIdToGetOperation.size()); + Assert.assertEquals(RouterErrorCode.OperationTimedOut, + ((RouterException) op.getOperationException()).getErrorCode()); + assumeTrue(operationTrackerType.equals(AdaptiveOperationTracker.class.getSimpleName())); + AdaptiveOperationTracker tracker = (AdaptiveOperationTracker) op.getFirstChunkOperationTrackerInUse(); + + Assert.assertEquals("Number of data points in local colo latency histogram is not expected", 3, + tracker.getLatencyHistogram(RouterTestHelpers.getAnyReplica(blobId, true, localDcName)).getCount()); + Assert.assertEquals("Number of data points in cross colo latency histogram is not expected", 6, + tracker.getLatencyHistogram(RouterTestHelpers.getAnyReplica(blobId, false, localDcName)).getCount()); + } + /** * Test the case where 2 local replicas timed out. The remaining one local replica and rest remote replicas respond * with Blob_Not_Found. @@ -495,7 +522,7 @@ public void testRouterRequestTimeoutAllFailure() throws Exception { public void testRequestTimeoutAndBlobNotFound() throws Exception { assumeTrue(operationTrackerType.equals(AdaptiveOperationTracker.class.getSimpleName())); doPut(); - GetBlobOperation op = createOperation(null); + GetBlobOperation op = createOperation(routerConfig, null); AdaptiveOperationTracker tracker = (AdaptiveOperationTracker) op.getFirstChunkOperationTrackerInUse(); correlationIdToGetOperation.clear(); for (MockServer server : mockServerLayout.getMockServers()) { @@ -538,7 +565,7 @@ public void testRequestTimeoutAndBlobNotFound() throws Exception { @Test public void testNetworkClientTimeoutAllFailure() throws Exception { doPut(); - GetBlobOperation op = createOperation(null); + GetBlobOperation op = createOperation(routerConfig, null); while (!op.isOperationComplete()) { op.poll(requestRegistrationCallback); for (RequestInfo requestInfo : requestRegistrationCallback.requestListToFill) { @@ -639,17 +666,17 @@ public void testSuccessInThePresenceOfVariousErrors() throws Exception { String dcWherePutHappened = routerConfig.routerDatacenterName; // test requests coming in from local dc as well as cross-colo. - Properties props = getDefaultNonBlockingRouterProperties(); + Properties props = getDefaultNonBlockingRouterProperties(true); props.setProperty("router.datacenter.name", "DC1"); routerConfig = new RouterConfig(new VerifiableProperties(props)); doTestSuccessInThePresenceOfVariousErrors(dcWherePutHappened); - props = getDefaultNonBlockingRouterProperties(); + props = getDefaultNonBlockingRouterProperties(true); props.setProperty("router.datacenter.name", "DC2"); routerConfig = new RouterConfig(new VerifiableProperties(props)); doTestSuccessInThePresenceOfVariousErrors(dcWherePutHappened); - props = getDefaultNonBlockingRouterProperties(); + props = getDefaultNonBlockingRouterProperties(true); props.setProperty("router.datacenter.name", "DC3"); routerConfig = new RouterConfig(new VerifiableProperties(props)); doTestSuccessInThePresenceOfVariousErrors(dcWherePutHappened); @@ -960,7 +987,7 @@ public void testEarlyReadableStreamChannelClose() throws Exception { @Test public void testSetOperationException() throws Exception { doPut(); - GetBlobOperation op = createOperation(null); + GetBlobOperation op = createOperation(routerConfig, null); RouterErrorCode[] routerErrorCodes = new RouterErrorCode[8]; routerErrorCodes[0] = RouterErrorCode.BlobDoesNotExist; routerErrorCodes[1] = RouterErrorCode.OperationTimedOut; @@ -1297,7 +1324,7 @@ private void getAndAssertSuccess(final boolean getChunksBeforeRead, final boolea * @throws Exception */ private GetBlobOperation createOperationAndComplete(Callback callback) throws Exception { - GetBlobOperation op = createOperation(callback); + GetBlobOperation op = createOperation(routerConfig, callback); while (!op.isOperationComplete()) { op.poll(requestRegistrationCallback); List responses = sendAndWaitForResponses(requestRegistrationCallback.requestListToFill); @@ -1312,11 +1339,11 @@ private GetBlobOperation createOperationAndComplete(Callback callback) throws Exception { + private GetBlobOperation createOperation(RouterConfig routerConfig, Callback callback) { NonBlockingRouter.currentOperationsCount.incrementAndGet(); GetBlobOperation op = new GetBlobOperation(routerConfig, routerMetrics, mockClusterMap, responseHandler, blobId, options, callback, @@ -1472,9 +1499,10 @@ private List sendAndWaitForResponses(List requestList /** * Get the default {@link Properties} for the {@link NonBlockingRouter}. + * @param excludeTimeout whether to exclude timed out request in Histogram. * @return the constructed {@link Properties} */ - private Properties getDefaultNonBlockingRouterProperties() { + private Properties getDefaultNonBlockingRouterProperties(boolean excludeTimeout) { Properties properties = new Properties(); properties.setProperty("router.hostname", "localhost"); properties.setProperty("router.datacenter.name", "DC3"); @@ -1485,6 +1513,7 @@ private Properties getDefaultNonBlockingRouterProperties() { properties.setProperty("router.get.success.target", Integer.toString(1)); properties.setProperty("router.get.operation.tracker.type", operationTrackerType); properties.setProperty("router.request.timeout.ms", Integer.toString(20)); + properties.setProperty("router.operation.tracker.exclude.timeout.enabled", Boolean.toString(excludeTimeout)); return properties; } } diff --git a/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java b/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java index 606bed25ce..836ce898d9 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/OperationTrackerTest.java @@ -555,6 +555,7 @@ private OperationTracker getOperationTracker(boolean crossColoEnabled, int succe Boolean.toString(includeNonOriginatingDcReplicas)); props.setProperty("router.get.replicas.required", Integer.toString(replicasRequired)); props.setProperty("router.latency.tolerance.quantile", Double.toString(QUANTILE)); + props.setProperty("router.operation.tracker.max.inflight.requests", Integer.toString(parallelism)); RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(props)); NonBlockingRouterMetrics routerMetrics = new NonBlockingRouterMetrics(mockClusterMap, routerConfig); OperationTracker tracker;