Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing partition-level histogram into adaptive tracker #1174

Merged
merged 5 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.config;

import com.github.ambry.router.OperationTrackerScope;
import com.github.ambry.utils.Utils;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -247,6 +248,39 @@ public class RouterConfig {
@Default("")
public final List<Double> routerOperationTrackerCustomPercentiles;

/**
 * The metric scope that is applied to the operation tracker. This config specifies at which granularity the router
 * should track the latency distribution. For example, Datacenter or Partition. The valid scopes are defined in
 * {@link OperationTrackerScope}.
 */
@Config("router.operation.tracker.metric.scope")
@Default("Datacenter")
public final OperationTrackerScope routerOperationTrackerMetricScope;

/**
 * The maximum size of the histogram reservoir in the operation tracker. This config specifies the max number of
 * data points that can be kept by the histogram reservoir.
 */
@Config("router.operation.tracker.reservoir.size")
@Default("1028")
public final int routerOperationTrackerReservoirSize;

/**
 * The decay factor of the histogram reservoir in the operation tracker. This config specifies how biased the
 * histogram should be towards new data.
 */
@Config("router.operation.tracker.reservoir.decay.factor")
@Default("0.015")
public final double routerOperationTrackerReservoirDecayFactor;

/**
 * The minimum number of data points required to populate the histogram in the operation tracker. If the number of
 * data points is less than this threshold, the tracker ignores statistics from the histogram.
 */
@Config("router.operation.tracker.min.data.points.required")
@Default("1000")
public final long routerOperationTrackerMinDataPointsRequired;

/**
* Create a RouterConfig instance.
* @param verifiableProperties the properties map to refer to.
Expand Down Expand Up @@ -308,5 +342,13 @@ public RouterConfig(VerifiableProperties verifiableProperties) {
Utils.splitString(verifiableProperties.getString("router.operation.tracker.custom.percentiles", ""), ",");
routerOperationTrackerCustomPercentiles =
Collections.unmodifiableList(customPercentiles.stream().map(Double::valueOf).collect(Collectors.toList()));
String scopeStr = verifiableProperties.getString("router.operation.tracker.metric.scope", "Datacenter");
routerOperationTrackerMetricScope = OperationTrackerScope.valueOf(scopeStr);
routerOperationTrackerReservoirSize =
verifiableProperties.getIntInRange("router.operation.tracker.reservoir.size", 1028, 0, Integer.MAX_VALUE);
routerOperationTrackerReservoirDecayFactor =
verifiableProperties.getDouble("router.operation.tracker.reservoir.decay.factor", 0.015);
routerOperationTrackerMinDataPointsRequired =
verifiableProperties.getLong("router.operation.tracker.min.data.points.required", 1000L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.router;

/**
 * Granularity at which the operation tracker maintains its latency histograms. The scope defines how request
 * latencies are grouped for tracking (e.g. {@code Datacenter} means the latencies of all requests in the local
 * datacenter are kept in a single {@code Histogram}).
 */
public enum OperationTrackerScope {
  /**
   * Keep one latency histogram per datacenter class (local vs. cross datacenter).
   */
  Datacenter,

  /**
   * Keep one latency histogram per partition.
   */
  Partition
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.json.JSONArray;
import org.json.JSONObject;
Expand Down Expand Up @@ -153,6 +155,29 @@ public MockClusterMap(boolean enableSSLPorts, int numNodes, int numMountPointsPe
partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName);
}

/**
 * Creates a mock cluster map from an explicit list of data nodes and partitions.
 * @param enableSSLPorts whether to enable the SSL port.
 * @param datanodes the list of data nodes present in this mock cluster map.
 * @param numMountPointsPerNode number of mount points (mocking disks) per data node.
 * @param partitionIdList the list of partitions present in this cluster map.
 * @param localDatacenterName the name of the local datacenter.
 */
public MockClusterMap(boolean enableSSLPorts, List<MockDataNodeId> datanodes, int numMountPointsPerNode,
    List<PartitionId> partitionIdList, String localDatacenterName) {
  this.enableSSLPorts = enableSSLPorts;
  this.dataNodes = datanodes;
  this.numMountPointsPerNode = numMountPointsPerNode;
  this.localDatacenterName = localDatacenterName;
  // index the supplied partitions by their numeric id (parsed from the path string)
  partitions = new HashMap<>();
  for (PartitionId partitionId : partitionIdList) {
    partitions.put(Long.valueOf(partitionId.toPathString()), partitionId);
  }
  partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName);
  // collect the distinct datacenter names before registering them in the cluster map
  Set<String> datacenterNames = new HashSet<>();
  for (MockDataNodeId dataNode : datanodes) {
    datacenterNames.add(dataNode.getDatacenterName());
  }
  dataCentersInClusterMap.addAll(datacenterNames);
  specialPartition = null;
}

protected ArrayList<Port> getListOfPorts(int port) {
ArrayList<Port> ports = new ArrayList<Port>();
ports.add(new Port(port, PortType.PLAINTEXT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,18 @@
* perceived latencies.
*/
class AdaptiveOperationTracker extends SimpleOperationTracker {
static final long MIN_DATA_POINTS_REQUIRED = 1000;

private final RouterConfig routerConfig;
private final NonBlockingRouterMetrics routerMetrics;
private final Time time;
private final double quantile;
private final Histogram localColoTracker;
private final Histogram crossColoTracker;
private final Histogram localDcHistogram;
private final Histogram crossDcHistogram;
private final Counter pastDueCounter;
private final OpTrackerIterator otIterator;
private Iterator<ReplicaId> replicaIterator;
private Map<PartitionId, Histogram> localDcPartitionToHistogram;
private Map<PartitionId, Histogram> crossDcPartitionToHistogram;

// The value contains a pair - the boolean indicates whether the request to the corresponding replicaId has been
// determined as expired (but not yet removed). The long is the time at which the request was sent.
private final LinkedHashMap<ReplicaId, Pair<Boolean, Long>> unexpiredRequestSendTimes = new LinkedHashMap<>();
Expand All @@ -56,24 +59,27 @@ class AdaptiveOperationTracker extends SimpleOperationTracker {
/**
* Constructs an {@link AdaptiveOperationTracker}
* @param routerConfig The {@link RouterConfig} containing the configs for operation tracker.
* @param routerMetrics The {@link NonBlockingRouterMetrics} that contains histograms used by this operation tracker.
* @param routerOperation The {@link RouterOperation} which {@link AdaptiveOperationTracker} is associated with.
* @param partitionId The partition on which the operation is performed.
* @param originatingDcName name of the cross colo DC whose replicas should be tried first.
* @param localColoTracker the {@link Histogram} that tracks intra datacenter latencies for this class of requests.
* @param crossColoTracker the {@link Histogram} that tracks inter datacenter latencies for this class of requests.
* @param pastDueCounter the {@link Counter} that tracks the number of times a request is past due.
* @param originatingDcName name of originating DC whose replicas should be tried first.
* @param time the {@link Time} instance to use.
*/
AdaptiveOperationTracker(RouterConfig routerConfig, RouterOperation routerOperation, PartitionId partitionId,
String originatingDcName, Histogram localColoTracker, Histogram crossColoTracker, Counter pastDueCounter,
Time time) {
AdaptiveOperationTracker(RouterConfig routerConfig, NonBlockingRouterMetrics routerMetrics,
    RouterOperation routerOperation, PartitionId partitionId, String originatingDcName, Time time) {
  super(routerConfig, routerOperation, partitionId, originatingDcName, true);
  this.routerConfig = routerConfig;
  // NOTE: routerMetrics must be assigned before getWholeDcTracker/getWholeDcPastDueCounter are
  // called below, since those helpers read the routerMetrics field.
  this.routerMetrics = routerMetrics;
  this.time = time;
  // whole-DC histograms and past-due counter are always tracked for monitoring purposes,
  // regardless of the configured metric scope
  this.localDcHistogram = getWholeDcTracker(routerOperation, true);
  this.crossDcHistogram = getWholeDcTracker(routerOperation, false);
  this.pastDueCounter = getWholeDcPastDueCounter(routerOperation);
  this.quantile = routerConfig.routerLatencyToleranceQuantile;
  this.otIterator = new OpTrackerIterator();
  // partition-level maps are only populated when the scope requires that granularity;
  // otherwise they stay null (getLatencyHistogram never touches them in Datacenter scope)
  if (routerConfig.routerOperationTrackerMetricScope == OperationTrackerScope.Partition) {
    localDcPartitionToHistogram = getPartitionToLatencyMap(routerOperation, true);
    crossDcPartitionToHistogram = getPartitionToLatencyMap(routerOperation, false);
  }
}

@Override
Expand All @@ -87,6 +93,12 @@ public void onResponse(ReplicaId replicaId, TrackedRequestFinalState trackedRequ
}
if (trackedRequestFinalState != TrackedRequestFinalState.TIMED_OUT) {
getLatencyHistogram(replicaId).update(elapsedTime);
if (routerConfig.routerOperationTrackerMetricScope != OperationTrackerScope.Datacenter) {
// This is only used to report whole datacenter histogram for monitoring purpose
Histogram histogram =
replicaId.getDataNodeId().getDatacenterName().equals(datacenterName) ? localDcHistogram : crossDcHistogram;
histogram.update(elapsedTime);
}
}
}

Expand All @@ -100,14 +112,90 @@ public Iterator<ReplicaId> getReplicaIterator() {
* Gets the {@link Histogram} that tracks request latencies to the class of replicas (intra or inter DC) that
* {@code replicaId} belongs to.
* @param replicaId the {@link ReplicaId} whose request latency is going to be tracked.
* @return the {@link Histogram} that tracks requests to the class of replicas (intra or inter DC) that
* {@code replicaId} belongs to.
* @return the {@link Histogram} associated with this replica.
*/
Histogram getLatencyHistogram(ReplicaId replicaId) {
if (replicaId.getDataNodeId().getDatacenterName().equals(datacenterName)) {
return localColoTracker;
boolean isLocalReplica = replicaId.getDataNodeId().getDatacenterName().equals(datacenterName);
Histogram histogramToReturn;
// TODO add support for replica-level/disk-level/node-level histogram based on the config
switch (routerConfig.routerOperationTrackerMetricScope) {
case Datacenter:
histogramToReturn = isLocalReplica ? localDcHistogram : crossDcHistogram;
break;
case Partition:
PartitionId partitionId = replicaId.getPartitionId();
histogramToReturn = isLocalReplica ? localDcPartitionToHistogram.get(partitionId)
: crossDcPartitionToHistogram.get(partitionId);
break;
default:
throw new IllegalArgumentException("Unsupported operation tracker metric scope.");
}
return histogramToReturn;
}

/**
 * Looks up the datacenter-wide latency histogram for the given operation type.
 * @param routerOperation the {@link RouterOperation} that uses this tracker.
 * @param isLocal {@code true} for the local DC histogram, {@code false} for the cross DC one.
 * @return the matching whole-DC latency histogram.
 * @throws IllegalArgumentException if the operation type has no whole-DC latency tracker.
 */
private Histogram getWholeDcTracker(RouterOperation routerOperation, boolean isLocal) {
  if (routerOperation == RouterOperation.GetBlobInfoOperation) {
    return isLocal ? routerMetrics.getBlobInfoLocalDcLatencyMs : routerMetrics.getBlobInfoCrossDcLatencyMs;
  }
  if (routerOperation == RouterOperation.GetBlobOperation) {
    return isLocal ? routerMetrics.getBlobLocalDcLatencyMs : routerMetrics.getBlobCrossDcLatencyMs;
  }
  throw new IllegalArgumentException("Unsupported router operation when getting whole DC latency tracker.");
}

/**
 * Looks up the datacenter-wide past-due counter for the given operation type.
 * @param routerOperation the {@link RouterOperation} that uses this tracker.
 * @return the matching whole-DC past-due counter.
 * @throws IllegalArgumentException if the operation type has no whole-DC past-due counter.
 */
private Counter getWholeDcPastDueCounter(RouterOperation routerOperation) {
  // return directly per case; this also avoids shadowing the pastDueCounter field with a local
  switch (routerOperation) {
    case GetBlobInfoOperation:
      return routerMetrics.getBlobInfoPastDueCount;
    case GetBlobOperation:
      return routerMetrics.getBlobPastDueCount;
    default:
      throw new IllegalArgumentException("Unsupported router operation when getting whole DC past due counter.");
  }
}

/**
 * Get the partition-level latency histograms for the given operation type.
 * This span contained a leftover pre-merge line ("return crossColoTracker;") from the diff
 * rendering that made the method malformed; this is the clean merged implementation.
 * @param routerOperation the {@link RouterOperation} that uses this tracker.
 * @param isLocal {@code true} if the local partition-level histograms should be returned. {@code false} otherwise.
 * @return the partition-to-histogram map.
 * @throws IllegalArgumentException if the operation type has no partition-level histograms.
 */
Map<PartitionId, Histogram> getPartitionToLatencyMap(RouterOperation routerOperation, boolean isLocal) {
  Map<PartitionId, Histogram> partitionToHistogramMap;
  switch (routerOperation) {
    case GetBlobInfoOperation:
      partitionToHistogramMap = isLocal ? routerMetrics.getBlobInfoLocalDcPartitionToLatency
          : routerMetrics.getBlobInfoCrossDcPartitionToLatency;
      break;
    case GetBlobOperation:
      partitionToHistogramMap =
          isLocal ? routerMetrics.getBlobLocalDcPartitionToLatency : routerMetrics.getBlobCrossDcPartitionToLatency;
      break;
    default:
      throw new IllegalArgumentException("Unsupported router operation when getting partition-to-latency map");
  }
  return partitionToHistogramMap;
}

/**
Expand Down Expand Up @@ -153,7 +241,7 @@ private boolean isOldestRequestPastDue() {
Map.Entry<ReplicaId, Pair<Boolean, Long>> oldestEntry = unexpiredRequestSendTimes.entrySet().iterator().next();
if (!oldestEntry.getValue().getFirst()) {
Histogram latencyTracker = getLatencyHistogram(oldestEntry.getKey());
isPastDue = (latencyTracker.getCount() >= MIN_DATA_POINTS_REQUIRED) && (
isPastDue = (latencyTracker.getCount() >= routerConfig.routerOperationTrackerMinDataPointsRequired) && (
time.milliseconds() - oldestEntry.getValue().getSecond() >= latencyTracker.getSnapshot()
.getValue(quantile));
if (isPastDue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ class GetBlobInfoOperation extends GetOperation {
ResponseHandler responseHandler, BlobId blobId, GetBlobOptionsInternal options,
Callback<GetBlobResultInternal> callback, RouterCallback routerCallback, KeyManagementService kms,
CryptoService cryptoService, CryptoJobHandler cryptoJobHandler, Time time, boolean isEncrypted) {
super(routerConfig, routerMetrics, clusterMap, responseHandler, blobId, options, callback,
routerMetrics.getBlobInfoLocalColoLatencyMs, routerMetrics.getBlobInfoCrossColoLatencyMs,
routerMetrics.getBlobInfoPastDueCount, kms, cryptoService, cryptoJobHandler, time, isEncrypted);
super(routerConfig, routerMetrics, clusterMap, responseHandler, blobId, options, callback, kms, cryptoService,
cryptoJobHandler, time, isEncrypted);
this.routerCallback = routerCallback;
operationTracker =
getOperationTracker(blobId.getPartition(), blobId.getDatacenterId(), RouterOperation.GetBlobInfoOperation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,8 @@ class GetBlobOperation extends GetOperation {
Callback<GetBlobResultInternal> callback, RouterCallback routerCallback, BlobIdFactory blobIdFactory,
KeyManagementService kms, CryptoService cryptoService, CryptoJobHandler cryptoJobHandler, Time time,
boolean isEncrypted) {
super(routerConfig, routerMetrics, clusterMap, responseHandler, blobId, options, callback,
routerMetrics.getBlobLocalColoLatencyMs, routerMetrics.getBlobCrossColoLatencyMs,
routerMetrics.getBlobPastDueCount, kms, cryptoService, cryptoJobHandler, time, isEncrypted);
super(routerConfig, routerMetrics, clusterMap, responseHandler, blobId, options, callback, kms, cryptoService,
cryptoJobHandler, time, isEncrypted);
this.routerCallback = routerCallback;
this.blobIdFactory = blobIdFactory;
firstChunk = new FirstGetChunk();
Expand Down
Loading