Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor fixes in IOException catch and metrics registry #1170

Merged
merged 3 commits into from
May 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions ambry-api/src/main/java/com.github.ambry/store/StoreException.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
*/
package com.github.ambry.store;

import java.util.Objects;


public class StoreException extends Exception {
public static final String INTERNAL_ERROR_STR =
static final String INTERNAL_ERROR_STR =
"a fault occurred in a recent unsafe memory access operation in compiled Java code";
public static final String IO_ERROR_STR = "Input/output error";
static final String IO_ERROR_STR = "Input/output error";
private static final long serialVersionUID = 1;
private final StoreErrorCodes error;

Expand All @@ -38,4 +41,14 @@ public StoreException(Throwable e, StoreErrorCodes error) {
public StoreErrorCodes getErrorCode() {
return this.error;
}

/**
 * Map a {@link Throwable} to a {@link StoreErrorCodes} value by inspecting its message. This lets callers tell
 * whether an exception/error is related to a real disk I/O issue.
 * @param t the {@link Throwable} to check
 * @return {@link StoreErrorCodes#IOError} if the message equals {@link #IO_ERROR_STR} or
 * {@link #INTERNAL_ERROR_STR}; {@link StoreErrorCodes#Unknown_Error} otherwise (including a {@code null} message).
 */
public static StoreErrorCodes resolveErrorCode(Throwable t) {
  String message = t.getMessage();
  // Both constants are non-null literals, so calling equals() on them is safe even when the message is null.
  if (StoreException.IO_ERROR_STR.equals(message) || StoreException.INTERNAL_ERROR_STR.equals(message)) {
    return StoreErrorCodes.IOError;
  }
  return StoreErrorCodes.Unknown_Error;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;


/**
Expand Down Expand Up @@ -54,8 +53,7 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreExcep
} catch (IOException e) {
// The IOException message may vary in different java versions. As code evolves, we may need to update IO_ERROR_STR
// in StoreException (based on java version that is being employed) to correctly capture disk I/O related errors.
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = StoreException.resolveErrorCode(e);
throw new StoreException(errorCode.toString() + " while writing into store", e, errorCode);
}
buf.limit(savedLimit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import static com.github.ambry.replication.ReplicationTest.*;
Expand Down Expand Up @@ -124,9 +123,7 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreExcep
sizeRead += channel.read(buf);
}
} catch (IOException e) {
StoreErrorCodes errorCode =
Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = StoreException.resolveErrorCode(e);
throw new StoreException(errorCode.toString() + " while writing into dummy log", e, errorCode);
}
buf.flip();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,31 +66,14 @@ class AdaptiveOperationTracker extends SimpleOperationTracker {
*/
AdaptiveOperationTracker(RouterConfig routerConfig, RouterOperation routerOperation, PartitionId partitionId,
String originatingDcName, Histogram localColoTracker, Histogram crossColoTracker, Counter pastDueCounter,
NonBlockingRouterMetrics routerMetrics, Time time) {
Time time) {
super(routerConfig, routerOperation, partitionId, originatingDcName, true);
this.time = time;
this.localColoTracker = localColoTracker;
this.crossColoTracker = crossColoTracker;
this.pastDueCounter = pastDueCounter;
this.quantile = routerConfig.routerLatencyToleranceQuantile;
this.otIterator = new OpTrackerIterator();
Class operationClass = null;
switch (routerOperation) {
case GetBlobOperation:
operationClass = GetBlobOperation.class;
break;
case GetBlobInfoOperation:
operationClass = GetBlobInfoOperation.class;
break;
default:
throw new IllegalArgumentException(routerOperation + " is not supported in AdaptiveOperationTracker");
}
routerMetrics.registerCustomPercentiles(operationClass, "LocalColoLatencyMs", localColoTracker,
routerConfig.routerOperationTrackerCustomPercentiles);
if (crossColoTracker != null) {
routerMetrics.registerCustomPercentiles(operationClass, "CrossColoLatencyMs", crossColoTracker,
routerConfig.routerOperationTrackerCustomPercentiles);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ protected OperationTracker getOperationTracker(PartitionId partitionId, byte dat
} else if (trackerType.equals(AdaptiveOperationTracker.class.getSimpleName())) {
operationTracker =
new AdaptiveOperationTracker(routerConfig, routerOperation, partitionId, originatingDcName, localColoTracker,
crossColoTracker, pastDueCounter, routerMetrics, time);
crossColoTracker, pastDueCounter, time);
} else {
throw new IllegalArgumentException("Unrecognized tracker type: " + trackerType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public NonBlockingRouterFactory(VerifiableProperties verifiableProperties, Clust
this.notificationSystem = notificationSystem;
this.accountService = accountService;
MetricRegistry registry = clusterMap.getMetricRegistry();
routerMetrics = new NonBlockingRouterMetrics(clusterMap);
routerMetrics = new NonBlockingRouterMetrics(clusterMap, routerConfig);
networkConfig = new NetworkConfig(verifiableProperties);
networkMetrics = new NetworkMetrics(registry);
time = SystemTime.getInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.codahale.metrics.MetricRegistry;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.config.RouterConfig;
import com.github.ambry.utils.SystemTime;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -182,7 +183,7 @@ public class NonBlockingRouterMetrics {
// Map that stores dataNode-level metrics.
private final Map<DataNodeId, NodeLevelMetrics> dataNodeToMetrics;

public NonBlockingRouterMetrics(ClusterMap clusterMap) {
public NonBlockingRouterMetrics(ClusterMap clusterMap, RouterConfig routerConfig) {
metricRegistry = clusterMap.getMetricRegistry();

// Operation Rate.
Expand Down Expand Up @@ -406,6 +407,18 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap) {
// Encrypt/Decrypt job metrics
encryptJobMetrics = new CryptoJobMetrics(PutOperation.class, "Encrypt", metricRegistry);
decryptJobMetrics = new CryptoJobMetrics(GetOperation.class, "Decrypt", metricRegistry);

// Custom percentiles
if (routerConfig != null) {
registerCustomPercentiles(GetBlobOperation.class, "LocalColoLatencyMs", getBlobLocalColoLatencyMs,
routerConfig.routerOperationTrackerCustomPercentiles);
registerCustomPercentiles(GetBlobOperation.class, "CrossColoLatencyMs", getBlobCrossColoLatencyMs,
routerConfig.routerOperationTrackerCustomPercentiles);
registerCustomPercentiles(GetBlobInfoOperation.class, "LocalColoLatencyMs", getBlobInfoLocalColoLatencyMs,
routerConfig.routerOperationTrackerCustomPercentiles);
registerCustomPercentiles(GetBlobInfoOperation.class, "CrossColoLatencyMs", getBlobInfoCrossColoLatencyMs,
routerConfig.routerOperationTrackerCustomPercentiles);
}
}

/**
Expand All @@ -415,7 +428,7 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap) {
* @param histogram the {@link Histogram} to use.
* @param percentiles a list of interested percentiles (double value).
*/
public void registerCustomPercentiles(Class ownerClass, String name, Histogram histogram, List<Double> percentiles) {
private void registerCustomPercentiles(Class ownerClass, String name, Histogram histogram, List<Double> percentiles) {
percentiles.forEach(p -> {
Gauge<Double> customPercentile = () -> histogram.getSnapshot().getValue(p);
metricRegistry.register(MetricRegistry.name(ownerClass, name, String.valueOf(p * 100), "thPercentile"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class AdaptiveOperationTrackerTest {
private final Histogram crossColoTracker = registry.histogram("CrossColoTracker");
private final Counter pastDueCounter = registry.counter("PastDueCounter");
private NonBlockingRouterMetrics routerMetrics;
private RouterConfig defaultRouterConfig;

/**
* Constructor that sets up state.
Expand All @@ -92,7 +93,11 @@ public AdaptiveOperationTrackerTest() throws Exception {
for (int i = 0; i < REPLICA_COUNT; i++) {
mockPartition.replicaIds.add(new MockReplicaId(PORT, mockPartition, datanodes.get(i % datanodes.size()), 0));
}
routerMetrics = new NonBlockingRouterMetrics(new MockClusterMap());
Properties props = new Properties();
props.setProperty("router.hostname", "localhost");
props.setProperty("router.datacenter.name", localDcName);
defaultRouterConfig = new RouterConfig(new VerifiableProperties(props));
routerMetrics = new NonBlockingRouterMetrics(new MockClusterMap(), defaultRouterConfig);
}

/**
Expand All @@ -106,7 +111,7 @@ public void adaptationTest() throws InterruptedException {
double localColoCutoff = localColoTracker.getSnapshot().getValue(QUANTILE);
double crossColoCutoff = crossColoTracker.getSnapshot().getValue(QUANTILE);

OperationTracker ot = getOperationTracker(true, REPLICA_COUNT, 2, null);
OperationTracker ot = getOperationTracker(createRouterConfig(true, REPLICA_COUNT, 2, null));
// 3-0-0-0; 3-0-0-0
sendRequests(ot, 2);
// 1-2-0-0; 3-0-0-0
Expand Down Expand Up @@ -177,7 +182,7 @@ public void noUnexpiredRequestsTest() throws InterruptedException {
primeTracker(crossColoTracker, AdaptiveOperationTracker.MIN_DATA_POINTS_REQUIRED, CROSS_COLO_LATENCY_RANGE);
double localColoCutoff = localColoTracker.getSnapshot().getValue(QUANTILE);

OperationTracker ot = getOperationTracker(false, 1, 1, null);
OperationTracker ot = getOperationTracker(createRouterConfig(false, 1, 1, null));
// 3-0-0-0
sendRequests(ot, 1);
// 2-1-0-0
Expand Down Expand Up @@ -209,7 +214,7 @@ public void trackerUpdateBetweenHasNextAndNextTest() throws InterruptedException
primeTracker(crossColoTracker, AdaptiveOperationTracker.MIN_DATA_POINTS_REQUIRED, CROSS_COLO_LATENCY_RANGE);
double localColoCutoff = localColoTracker.getSnapshot().getValue(1);

OperationTracker ot = getOperationTracker(false, 1, 1, null);
OperationTracker ot = getOperationTracker(createRouterConfig(false, 1, 1, null));
// 3-0-0-0
sendRequests(ot, 1);
// 2-1-0-0
Expand Down Expand Up @@ -238,14 +243,13 @@ public void trackerUpdateBetweenHasNextAndNextTest() throws InterruptedException
}

/**
* Test that adaptive operation track can correctly register custom percentiles. An example of metric name is:
* Test that {@link NonBlockingRouterMetrics} can correctly register custom percentiles. An example of metric name is:
* "com.github.ambry.router.GetOperation.LocalColoLatencyMs.91.0.thPercentile"
* @throws Exception
*/
@Test
public void customPercentilesMetricsRegistryTest() throws Exception {
// test that if custom percentile is not set, no corresponding metrics would be generated.
getOperationTracker(true, 1, 1, null);
MetricRegistry metricRegistry = routerMetrics.getMetricRegistry();
MetricFilter filter = new MetricFilter() {
@Override
Expand All @@ -257,43 +261,51 @@ public boolean matches(String name, Metric metric) {
assertTrue("No gauges should be created because custom percentile is not set", gauges.isEmpty());
// test that dedicated gauges are correctly created for custom percentiles.
String customPercentiles = "0.91,0.97";
RouterConfig routerConfig = createRouterConfig(false, 1, 1, customPercentiles);
String[] percentileArray = customPercentiles.split(",");
Arrays.sort(percentileArray);
List<String> sortedPercentiles =
Arrays.stream(percentileArray).map(p -> String.valueOf(Double.valueOf(p) * 100)).collect(Collectors.toList());
getOperationTracker(false, 1, 1, customPercentiles);
gauges = metricRegistry.getGauges(filter);
// Note that crossColoEnabled is false, the number of gauges should equal to number of given percentiles
assertEquals("The number of custom percentile gauge doesn't match", sortedPercentiles.size(), gauges.size());
routerMetrics = new NonBlockingRouterMetrics(new MockClusterMap(), routerConfig);
gauges = routerMetrics.getMetricRegistry().getGauges(filter);
// Note that each percentile creates 4 metrics (one for each combination of GetBlobInfo/GetBlob and
// LocalColo/CrossColo). So, the total number of metrics should be equal to 4 * (# of given custom percentiles).
assertEquals("The number of custom percentile gauge doesn't match", sortedPercentiles.size() * 4, gauges.size());
Iterator mapItor = gauges.keySet().iterator();
Iterator<String> listItor = sortedPercentiles.iterator();
while (listItor.hasNext()) {
String gaugeName = (String) mapItor.next();
String percentileStr = listItor.next();
assertTrue("The gauge name doesn't match", gaugeName.endsWith(percentileStr + ".thPercentile"));
}
// reset router metrics to clean up registered metrics
routerMetrics = new NonBlockingRouterMetrics(new MockClusterMap());
metricRegistry = routerMetrics.getMetricRegistry();
getOperationTracker(true, 1, 1, customPercentiles);
gauges = metricRegistry.getGauges(filter);
assertEquals("The number of custom percentile gauge doesn't match", sortedPercentiles.size() * 2, gauges.size());
// reset router metrics to clean up registered custom percentile metrics
routerMetrics = new NonBlockingRouterMetrics(new MockClusterMap(), defaultRouterConfig);
}

// helpers

// general

/**
* Returns an instance of {@link AdaptiveOperationTracker}.
* Instantiate an adaptive operation tracker.
* @param routerConfig the {@link RouterConfig} to use in adaptive tracker.
* @return an instance of {@link AdaptiveOperationTracker} with the given parameters.
*/
private OperationTracker getOperationTracker(RouterConfig routerConfig) {
  // The cross colo histogram is only handed to the tracker when cross DC gets are enabled in the config.
  if (routerConfig.routerGetCrossDcEnabled) {
    return new AdaptiveOperationTracker(routerConfig, RouterOperation.GetBlobOperation, mockPartition, null,
        localColoTracker, crossColoTracker, pastDueCounter, time);
  }
  return new AdaptiveOperationTracker(routerConfig, RouterOperation.GetBlobOperation, mockPartition, null,
      localColoTracker, null, pastDueCounter, time);
}

/**
* Generate an instance of {@link RouterConfig} based on input parameters.
* @param crossColoEnabled {@code true} if cross colo needs to be enabled. {@code false} otherwise.
* @param successTarget the number of successful responses required for the operation to succeed.
* @param parallelism the number of parallel requests that can be in flight.
* @param customPercentiles the custom percentiles to be reported. Percentiles are specified in a comma-separated
* string, i.e "0.94,0.96,0.97".
* @return an instance of {@link AdaptiveOperationTracker} with the given parameters.
* @return an instance of {@link RouterConfig}
*/
private OperationTracker getOperationTracker(boolean crossColoEnabled, int successTarget, int parallelism,
private RouterConfig createRouterConfig(boolean crossColoEnabled, int successTarget, int parallelism,
String customPercentiles) {
Properties props = new Properties();
props.setProperty("router.hostname", "localhost");
Expand All @@ -307,9 +319,7 @@ private OperationTracker getOperationTracker(boolean crossColoEnabled, int succe
if (customPercentiles != null) {
props.setProperty("router.operation.tracker.custom.percentiles", customPercentiles);
}
RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(props));
return new AdaptiveOperationTracker(routerConfig, RouterOperation.GetBlobOperation, mockPartition, null,
localColoTracker, crossColoEnabled ? crossColoTracker : null, pastDueCounter, routerMetrics, time);
return new RouterConfig(new VerifiableProperties(props));
}

/**
Expand Down Expand Up @@ -355,7 +365,7 @@ private void sendRequests(OperationTracker operationTracker, int numRequestsExpe
*/
private void doTrackerUpdateTest(boolean succeedRequests) throws InterruptedException {
long timeIncrement = 10;
OperationTracker ot = getOperationTracker(true, REPLICA_COUNT, REPLICA_COUNT, null);
OperationTracker ot = getOperationTracker(createRouterConfig(true, REPLICA_COUNT, REPLICA_COUNT, null));
// 3-0-0-0; 3-0-0-0
sendRequests(ot, REPLICA_COUNT);
// 0-3-0-0; 0-3-0-0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testChunkNumAndSizeCalculations() throws Exception {
VerifiableProperties vProps = getNonBlockingRouterProperties();
MockClusterMap mockClusterMap = new MockClusterMap();
RouterConfig routerConfig = new RouterConfig(vProps);
NonBlockingRouterMetrics routerMetrics = new NonBlockingRouterMetrics(mockClusterMap);
NonBlockingRouterMetrics routerMetrics = new NonBlockingRouterMetrics(mockClusterMap, routerConfig);
short accountId = Utils.getRandomShort(random);
short containerId = Utils.getRandomShort(random);
BlobProperties putBlobProperties =
Expand Down Expand Up @@ -234,7 +234,7 @@ private void fillChunksAndAssertSuccess() throws Exception {
VerifiableProperties vProps = getNonBlockingRouterProperties();
MockClusterMap mockClusterMap = new MockClusterMap();
RouterConfig routerConfig = new RouterConfig(vProps);
routerMetrics = new NonBlockingRouterMetrics(mockClusterMap);
routerMetrics = new NonBlockingRouterMetrics(mockClusterMap, routerConfig);
ResponseHandler responseHandler = new ResponseHandler(mockClusterMap);
short accountId = Utils.getRandomShort(random);
short containerId = Utils.getRandomShort(random);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public CryptoJobHandlerTest() throws IOException, GeneralSecurityException {
cryptoService = new GCMCryptoServiceFactory(verifiableProperties, REGISTRY).getCryptoService();
cryptoJobHandler = new CryptoJobHandler(DEFAULT_THREAD_COUNT);
referenceClusterMap = new MockClusterMap();
routerMetrics = new NonBlockingRouterMetrics(referenceClusterMap);
routerMetrics = new NonBlockingRouterMetrics(referenceClusterMap, null);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I purposely set RouterConfig = null when creating NonBlockingRouterMetrics in several tests. This is to verify that even if router config is mistakenly set to null in the ctor of NonBlockingRouterMetrics, there would be no failure during instantiation.

}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void init() throws Exception {
clusterMap = new MockClusterMap();
serverLayout = new MockServerLayout(clusterMap);
RouterConfig routerConfig = new RouterConfig(vProps);
router = new NonBlockingRouter(routerConfig, new NonBlockingRouterMetrics(clusterMap),
router = new NonBlockingRouter(routerConfig, new NonBlockingRouterMetrics(clusterMap, routerConfig),
new MockNetworkClientFactory(vProps, mockSelectorState, MAX_PORTS_PLAIN_TEXT, MAX_PORTS_SSL,
CHECKOUT_TIMEOUT_MS, serverLayout, mockTime), new LoggingNotificationSystem(), clusterMap, null, null, null,
new InMemAccountService(false, true), mockTime, MockClusterMap.DEFAULT_PARTITION_CLASS);
Expand Down Expand Up @@ -360,7 +360,8 @@ public void testVariousServerErrorCodesForThreeParallelism() throws Exception {
Properties props = getNonBlockingRouterProperties();
props.setProperty("router.delete.request.parallelism", "3");
VerifiableProperties vProps = new VerifiableProperties(props);
router = new NonBlockingRouter(new RouterConfig(vProps), new NonBlockingRouterMetrics(clusterMap),
RouterConfig routerConfig = new RouterConfig(vProps);
router = new NonBlockingRouter(routerConfig, new NonBlockingRouterMetrics(clusterMap, routerConfig),
new MockNetworkClientFactory(vProps, mockSelectorState, MAX_PORTS_PLAIN_TEXT, MAX_PORTS_SSL,
CHECKOUT_TIMEOUT_MS, serverLayout, mockTime), new LoggingNotificationSystem(), clusterMap, null, null, null,
new InMemAccountService(false, true), mockTime, MockClusterMap.DEFAULT_PARTITION_CLASS);
Expand Down
Loading