Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor operation tracker ctor and introduce custom percentiles #1159

Merged
merged 3 commits into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
*/
package com.github.ambry.config;

import com.github.ambry.utils.Utils;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;


/**
* Configuration parameters required by a {@link com.github.ambry.router.Router}.
* <p/>
Expand Down Expand Up @@ -232,6 +238,15 @@ public class RouterConfig {
@Default("false")
public final boolean routerUseGetBlobOperationForBlobInfo;

/**
* The custom percentiles of Histogram in operation tracker to be reported. This allows router to emit metrics of
* arbitrary percentiles (e.g. 97th, 93rd, etc.). An example of this config is "0.91,0.93,0.97" (comma separated); each
* value should fall in {@code [0..1]}.
*/
@Config("router.operation.tracker.custom.percentiles")
@Default("")
public final List<Double> routerOperationTrackerCustomPercentiles;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how will these custom percentiles be used by operation tracker in the future? Right now, they are only registered in the MetricRegistry for viewing.


/**
* Create a RouterConfig instance.
* @param verifiableProperties the properties map to refer to.
Expand Down Expand Up @@ -289,5 +304,9 @@ public RouterConfig(VerifiableProperties verifiableProperties) {
verifiableProperties.getIntInRange("router.ttl.update.success.target", 2, 1, Integer.MAX_VALUE);
routerUseGetBlobOperationForBlobInfo =
verifiableProperties.getBoolean("router.use.get.blob.operation.for.blob.info", false);
List<String> customPercentiles =
Utils.splitString(verifiableProperties.getString("router.operation.tracker.custom.percentiles", ""), ",");
routerOperationTrackerCustomPercentiles =
Collections.unmodifiableList(customPercentiles.stream().map(Double::valueOf).collect(Collectors.toList()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we convert string to list when it's needed? Previous example: storeCompactionTriggers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got you. It's OK.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.codahale.metrics.Histogram;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.config.RouterConfig;
import com.github.ambry.utils.Pair;
import com.github.ambry.utils.Time;
import java.util.HashMap;
Expand All @@ -39,7 +40,6 @@ class AdaptiveOperationTracker extends SimpleOperationTracker {
static final long MIN_DATA_POINTS_REQUIRED = 1000;

private final Time time;
private final String datacenterName;
private final double quantile;
private final Histogram localColoTracker;
private final Histogram crossColoTracker;
Expand All @@ -55,34 +55,30 @@ class AdaptiveOperationTracker extends SimpleOperationTracker {

/**
* Constructs an {@link AdaptiveOperationTracker}
* @param datacenterName The datacenter where the router is located.
* @param operationClass The operation class in which operation tracker is used.
* @param partitionId The partition on which the operation is performed.
* @param crossColoEnabled {@code true} if requests can be sent to remote replicas, {@code false}
* otherwise.
* @param originatingDcName name of the cross colo DC whose replicas should be tried first.
* @param includeNonOriginatingDcReplicas if take the option to include remote non originating DC replicas.
* @param replicasRequired The number of replicas required for the operation.
* @param successTarget The number of successful responses required to succeed the operation.
* @param parallelism The maximum number of inflight requests at any point of time.
* @param time the {@link Time} instance to use.
* @param localColoTracker the {@link Histogram} that tracks intra datacenter latencies for this class of requests.
* @param crossColoTracker the {@link Histogram} that tracks inter datacenter latencies for this class of requests.
* @param pastDueCounter the {@link Counter} that tracks the number of times a request is past due.
* @param quantile the quantile cutoff to use for when evaluating requests against the trackers.
* @param time the {@link Time} instance to use.
*/
AdaptiveOperationTracker(String datacenterName, PartitionId partitionId, boolean crossColoEnabled,
String originatingDcName, boolean includeNonOriginatingDcReplicas, int replicasRequired, int successTarget,
int parallelism, Time time, Histogram localColoTracker, Histogram crossColoTracker, Counter pastDueCounter,
double quantile) {
super(datacenterName, partitionId, crossColoEnabled, originatingDcName, includeNonOriginatingDcReplicas,
replicasRequired, successTarget, parallelism, true);
this.datacenterName = datacenterName;
AdaptiveOperationTracker(RouterConfig routerConfig, Class operationClass, PartitionId partitionId,
String originatingDcName, Histogram localColoTracker, Histogram crossColoTracker, Counter pastDueCounter,
NonBlockingRouterMetrics routerMetrics, Time time) {
super(routerConfig, operationClass, partitionId, originatingDcName, true);
this.time = time;
this.localColoTracker = localColoTracker;
this.crossColoTracker = crossColoTracker;
this.pastDueCounter = pastDueCounter;
this.quantile = quantile;
this.quantile = routerConfig.routerLatencyToleranceQuantile;
this.otIterator = new OpTrackerIterator();
routerMetrics.registerCustomPercentiles(operationClass, "LocalColoLatencyMs", localColoTracker,
routerConfig.routerOperationTrackerCustomPercentiles);
if (crossColoTracker != null) {
routerMetrics.registerCustomPercentiles(operationClass, "CrossColoLatencyMs", crossColoTracker,
routerConfig.routerOperationTrackerCustomPercentiles);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ class DeleteOperation {
byte blobDcId = blobId.getDatacenterId();
String originatingDcName = clusterMap.getDatacenterName(blobDcId);
this.operationTracker =
new SimpleOperationTracker(routerConfig.routerDatacenterName, blobId.getPartition(), true, originatingDcName,
true, Integer.MAX_VALUE, routerConfig.routerDeleteSuccessTarget,
routerConfig.routerDeleteRequestParallelism, false);
new SimpleOperationTracker(routerConfig, DeleteOperation.class, blobId.getPartition(), originatingDcName,
false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class GetBlobInfoOperation extends GetOperation {
routerMetrics.getBlobInfoLocalColoLatencyMs, routerMetrics.getBlobInfoCrossColoLatencyMs,
routerMetrics.getBlobInfoPastDueCount, kms, cryptoService, cryptoJobHandler, time, isEncrypted);
this.routerCallback = routerCallback;
operationTracker = getOperationTracker(blobId.getPartition(), blobId.getDatacenterId());
operationTracker = getOperationTracker(blobId.getPartition(), blobId.getDatacenterId(), GetBlobInfoOperation.class);
progressTracker = new ProgressTracker(operationTracker);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,8 @@ void reset() {
void initialize(int index, BlobId id) {
chunkIndex = index;
chunkBlobId = id;
chunkOperationTracker = getOperationTracker(chunkBlobId.getPartition(), chunkBlobId.getDatacenterId());
chunkOperationTracker =
getOperationTracker(chunkBlobId.getPartition(), chunkBlobId.getDatacenterId(), GetBlobOperation.class);
progressTracker = new ProgressTracker(chunkOperationTracker);
state = ChunkState.Ready;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ abstract class GetOperation {

/**
* Construct a GetOperation
* @param routerConfig the {@link RouterConfig} containing the configs for put operations.
* @param routerConfig the {@link RouterConfig} containing the configs for get operations.
* @param routerMetrics The {@link NonBlockingRouterMetrics} to be used for reporting metrics.
* @param clusterMap the {@link ClusterMap} of the cluster
* @param responseHandler the {@link ResponseHandler} responsible for failure detection.
Expand Down Expand Up @@ -237,23 +237,19 @@ protected GetRequest createGetRequest(BlobId blobId, MessageFormatFlags flag, Ge
/**
* Gets an {@link OperationTracker} based on the config and {@code partitionId}.
* @param partitionId the {@link PartitionId} for which a tracker is required.
* @param operationClass the operation class in which the operation tracker is used.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment

* @return an {@link OperationTracker} based on the config and {@code partitionId}.
*/
protected OperationTracker getOperationTracker(PartitionId partitionId, byte datacenterId) {
protected OperationTracker getOperationTracker(PartitionId partitionId, byte datacenterId, Class operationClass) {
OperationTracker operationTracker;
String trackerType = routerConfig.routerGetOperationTrackerType;
String originatingDcName = clusterMap.getDatacenterName(datacenterId);
if (trackerType.equals(SimpleOperationTracker.class.getSimpleName())) {
operationTracker = new SimpleOperationTracker(routerConfig.routerDatacenterName, partitionId,
routerConfig.routerGetCrossDcEnabled, originatingDcName,
routerConfig.routerGetIncludeNonOriginatingDcReplicas, routerConfig.routerGetReplicasRequired,
routerConfig.routerGetSuccessTarget, routerConfig.routerGetRequestParallelism);
operationTracker = new SimpleOperationTracker(routerConfig, operationClass, partitionId, originatingDcName, true);
} else if (trackerType.equals(AdaptiveOperationTracker.class.getSimpleName())) {
operationTracker = new AdaptiveOperationTracker(routerConfig.routerDatacenterName, partitionId,
routerConfig.routerGetCrossDcEnabled, originatingDcName,
routerConfig.routerGetIncludeNonOriginatingDcReplicas, routerConfig.routerGetReplicasRequired,
routerConfig.routerGetSuccessTarget, routerConfig.routerGetRequestParallelism, time, localColoTracker,
crossColoTracker, pastDueCounter, routerConfig.routerLatencyToleranceQuantile);
operationTracker =
new AdaptiveOperationTracker(routerConfig, operationClass, partitionId, originatingDcName, localColoTracker,
crossColoTracker, pastDueCounter, routerMetrics, time);
} else {
throw new IllegalArgumentException("Unrecognized tracker type: " + trackerType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.utils.SystemTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -407,6 +408,21 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap) {
decryptJobMetrics = new CryptoJobMetrics(GetOperation.class, "Decrypt", metricRegistry);
}

/**
 * Register a {@link Gauge} metric for each custom percentile of the given {@link Histogram} with given class and name.
 * @param ownerClass the {@link Class} that is supposed to own the metrics.
 * @param name the name of metric that all percentiles belong to.
 * @param histogram the {@link Histogram} whose snapshot supplies the percentile values.
 * @param percentiles a list of interested percentiles (double values, each expected to fall in {@code [0..1]}).
 */
public void registerCustomPercentiles(Class ownerClass, String name, Histogram histogram, List<Double> percentiles) {
  percentiles.forEach(p -> {
    Gauge<Double> customPercentile = () -> histogram.getSnapshot().getValue(p);
    // Render the percentile without floating-point artifacts: String.valueOf(p * 100) would yield metric names
    // like "91.0thPercentile" or even "97.00000000000001thPercentile" (0.97 * 100 is not exact in binary floating
    // point). BigDecimal.valueOf uses the double's canonical decimal form, so 0.91 -> "91" and 0.915 -> "91.5".
    String percentileName =
        java.math.BigDecimal.valueOf(p).movePointRight(2).stripTrailingZeros().toPlainString();
    metricRegistry.register(MetricRegistry.name(ownerClass, name, percentileName, "thPercentile"),
        customPercentile);
  });
}

/**
* Initializes a {@link Gauge} metric for the status of {@code RequestResponseHandlerThread} of an
* {@link com.github.ambry.router.NonBlockingRouter.OperationController}, to indicate if it is running
Expand All @@ -415,12 +431,7 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap) {
* to be monitored.
*/
public void initializeOperationControllerMetrics(final Thread requestResponseHandlerThread) {
requestResponseHandlerThreadRunning = new Gauge<Long>() {
@Override
public Long getValue() {
return requestResponseHandlerThread.isAlive() ? 1L : 0L;
}
};
requestResponseHandlerThreadRunning = () -> requestResponseHandlerThread.isAlive() ? 1L : 0L;
metricRegistry.register(
MetricRegistry.name(NonBlockingRouter.class, requestResponseHandlerThread.getName() + "Running"),
requestResponseHandlerThreadRunning);
Expand All @@ -432,12 +443,7 @@ public Long getValue() {
* @param chunkFillerThread The {@code ChunkFillerThread} of which the status is to be monitored.
*/
public void initializePutManagerMetrics(final Thread chunkFillerThread) {
chunkFillerThreadRunning = new Gauge<Long>() {
@Override
public Long getValue() {
return chunkFillerThread.isAlive() ? 1L : 0L;
}
};
chunkFillerThreadRunning = () -> chunkFillerThread.isAlive() ? 1L : 0L;
metricRegistry.register(MetricRegistry.name(PutManager.class, chunkFillerThread.getName() + "Running"),
chunkFillerThreadRunning);
}
Expand All @@ -464,6 +470,13 @@ public Integer getValue() {
});
}

/**
 * Package-private accessor for the registry backing all router metrics.
 * NOTE(review): presumably exposed so tests and sibling router components can register or inspect metrics
 * directly — confirm against callers.
 * @return the {@link MetricRegistry} being used in {@link NonBlockingRouterMetrics}.
 */
MetricRegistry getMetricRegistry() {
return metricRegistry;
}

/**
* Increment error metrics based on error type.
* @param exception The exception associated with this error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,8 +1097,7 @@ private void prepareForSending() {
passedInBlobProperties.getCreationTimeInMs(), passedInBlobProperties.getAccountId(),
passedInBlobProperties.getContainerId(), passedInBlobProperties.isEncrypted(),
passedInBlobProperties.getExternalAssetTag());
operationTracker = new SimpleOperationTracker(routerConfig.routerDatacenterName, partitionId, false, null, true,
Integer.MAX_VALUE, routerConfig.routerPutSuccessTarget, routerConfig.routerPutRequestParallelism);
operationTracker = new SimpleOperationTracker(routerConfig, PutOperation.class, partitionId, null, true);
correlationIdToChunkPutRequestInfo.clear();
state = ChunkState.Ready;
} catch (RouterException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.config.RouterConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -57,9 +58,10 @@
*
*/
class SimpleOperationTracker implements OperationTracker {
protected final String datacenterName;
protected final int successTarget;
protected final int parallelism;
protected final LinkedList<ReplicaId> replicaPool = new LinkedList<ReplicaId>();
protected final LinkedList<ReplicaId> replicaPool = new LinkedList<>();

protected int totalReplicaCount = 0;
protected int inflightCount = 0;
Expand All @@ -72,25 +74,43 @@ class SimpleOperationTracker implements OperationTracker {
/**
* Constructor for an {@code SimpleOperationTracker}.
*
* @param datacenterName The datacenter where the router is located.
* @param routerConfig The {@link RouterConfig} containing the configs for operation tracker.
* @param operationClass The operation class in which operation tracker is used.
* @param partitionId The partition on which the operation is performed.
* @param crossColoEnabled {@code true} if requests can be sent to remote replicas, {@code false}
* otherwise.
* @param originatingDcName The original DC where blob was put.
* @param includeNonOriginatingDcReplicas if take the option to include remote non originating DC replicas.
* @param replicasRequired The number of replicas required for the operation.
* @param successTarget The number of successful responses required to succeed the operation.
* @param parallelism The maximum number of inflight requests at any point of time.
* @param shuffleReplicas Indicates if the replicas need to be shuffled.
*/
SimpleOperationTracker(String datacenterName, PartitionId partitionId, boolean crossColoEnabled,
String originatingDcName, boolean includeNonOriginatingDcReplicas, int replicasRequired, int successTarget,
int parallelism, boolean shuffleReplicas) {
SimpleOperationTracker(RouterConfig routerConfig, Class operationClass, PartitionId partitionId,
String originatingDcName, boolean shuffleReplicas) {
// populate tracker parameters based on operation type
boolean crossColoEnabled = false;
boolean includeNonOriginatingDcReplicas = true;
int replicasRequired = Integer.MAX_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is this local variable used?

if (GetOperation.class.isAssignableFrom(operationClass)) {
successTarget = routerConfig.routerGetSuccessTarget;
parallelism = routerConfig.routerGetRequestParallelism;
crossColoEnabled = routerConfig.routerGetCrossDcEnabled;
includeNonOriginatingDcReplicas = routerConfig.routerGetIncludeNonOriginatingDcReplicas;
replicasRequired = routerConfig.routerGetReplicasRequired;
} else if (operationClass == PutOperation.class) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One other option is to make an enum to represent the operation tracker types (i.e. GET, PUT, DELETE, TTL_UPDATE). This is a little more explicit and resilient to new classes/name changes.

successTarget = routerConfig.routerPutSuccessTarget;
parallelism = routerConfig.routerPutRequestParallelism;
} else if (operationClass == DeleteOperation.class) {
successTarget = routerConfig.routerDeleteSuccessTarget;
parallelism = routerConfig.routerDeleteRequestParallelism;
crossColoEnabled = true;
} else if (operationClass == TtlUpdateOperation.class) {
successTarget = routerConfig.routerTtlUpdateSuccessTarget;
parallelism = routerConfig.routerTtlUpdateRequestParallelism;
crossColoEnabled = true;
} else {
throw new IllegalArgumentException("Unrecognized operation: " + operationClass.getSimpleName());
}
if (parallelism < 1) {
throw new IllegalArgumentException("Parallelism has to be > 0. Configured to be " + parallelism);
}
this.successTarget = successTarget;
this.parallelism = parallelism;
datacenterName = routerConfig.routerDatacenterName;

// Order the replicas so that local healthy replicas are ordered and returned first,
// then the remote healthy ones, and finally the possibly down ones.
List<? extends ReplicaId> replicas = partitionId.getReplicaIds();
Expand Down Expand Up @@ -149,25 +169,6 @@ class SimpleOperationTracker implements OperationTracker {
this.otIterator = new OpTrackerIterator();
}

/**
* Constructor for an {@code SimpleOperationTracker}, which shuffles replicas.
*
* @param datacenterName The datacenter where the router is located.
* @param partitionId The partition on which the operation is performed.
* @param crossColoEnabled {@code true} if requests can be sent to remote replicas, {@code false} otherwise.
* @param originatingDcName The original DC where blob was put. null if DC unknown.
* @param includeNonOriginatingDcReplicas if take the option to include remote non originating DC replicas.
* @param replicasRequired The number of replicas required for the operation.
* @param successTarget The number of successful responses required to succeed the operation.
* @param parallelism The maximum number of inflight requests at any point of time.
*/
SimpleOperationTracker(String datacenterName, PartitionId partitionId, boolean crossColoEnabled,
String originatingDcName, boolean includeNonOriginatingDcReplicas, int replicasRequired, int successTarget,
int parallelism) {
this(datacenterName, partitionId, crossColoEnabled, originatingDcName, includeNonOriginatingDcReplicas,
replicasRequired, successTarget, parallelism, true);
}

@Override
public boolean hasSucceeded() {
return succeededCount >= successTarget;
Expand Down Expand Up @@ -220,7 +221,14 @@ private boolean hasFailed() {
}

/**
* Helper function to catch a potential race condition in {@link SimpleOperationTracker#SimpleOperationTracker(String, PartitionId, boolean, String, boolean, int, int, int, boolean)}.
* @return the success target number of this operation tracker.
*/
public int getSuccessTarget() {
return successTarget;
}

/**
* Helper function to catch a potential race condition in {@link SimpleOperationTracker#SimpleOperationTracker(RouterConfig, Class, PartitionId, String, boolean)}.
*
* @param partitionId The partition on which the operation is performed.
* @param examinedReplicas All replicas examined.
Expand Down
Loading