From b31fb03e383dffc2b21888142abdda066cda2fb6 Mon Sep 17 00:00:00 2001 From: Rob Block Date: Fri, 21 Jun 2019 17:19:56 -0700 Subject: [PATCH 1/2] Retry Cosmos requests after delay when they return 429 Refactor Cosmos logic into separate class --- .../CloudTokenPersistor.java | 3 +- .../azure/AzureCloudConfig.java | 11 + .../azure/AzureCloudDestination.java | 90 ++------ .../azure/AzureMetrics.java | 12 ++ .../azure/CosmosDataAccessor.java | 197 ++++++++++++++++++ .../azure/AzureCloudDestinationTest.java | 3 + 6 files changed, 248 insertions(+), 68 deletions(-) create mode 100644 ambry-cloud/src/main/java/com.github.ambry.cloud/azure/CosmosDataAccessor.java diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java index 43641cf5d6..34cf0d94f9 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java @@ -19,6 +19,7 @@ import com.github.ambry.replication.ReplicationException; import com.github.ambry.replication.ReplicationMetrics; import com.github.ambry.store.FindTokenFactory; +import com.github.ambry.store.StoreFindToken; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -68,7 +69,7 @@ protected void persist(String mountPath, List tokenInfoList) InputStream inputStream = new ByteArrayInputStream(tokenOutputStream.toByteArray()); cloudDestination.persistTokens(mountPath, replicaTokenFileName, inputStream); - logger.debug("Completed writing replica tokens to cloud destination."); + logger.debug("Persisted replica tokens for {} to cloud destination.", mountPath); } catch (CloudStorageException e) { throw new ReplicationException("IO error persisting replica tokens at mount path " + mountPath, e); } diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudConfig.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudConfig.java index 4db0dee6ae..9c0a99b149 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudConfig.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudConfig.java @@ -14,6 +14,7 @@ package com.github.ambry.cloud.azure; import com.github.ambry.config.Config; +import com.github.ambry.config.Default; import com.github.ambry.config.VerifiableProperties; @@ -26,6 +27,8 @@ public class AzureCloudConfig { public static final String COSMOS_ENDPOINT = "cosmos.endpoint"; public static final String COSMOS_COLLECTION_LINK = "cosmos.collection.link"; public static final String COSMOS_KEY = "cosmos.key"; + public static final String COSMOS_MAX_RETRIES = "cosmos.max.retries"; + public static final int DEFAULT_COSMOS_MAX_RETRIES = 5; /** * The Azure Blob Storage connection string. @@ -51,10 +54,18 @@ public class AzureCloudConfig { @Config(COSMOS_KEY) public final String cosmosKey; + /** + * The maximum number of retries for Cosmos DB requests. + */ + @Config(COSMOS_MAX_RETRIES) + @Default("5") + public final int cosmosMaxRetries; + public AzureCloudConfig(VerifiableProperties verifiableProperties) { azureStorageConnectionString = verifiableProperties.getString(AZURE_STORAGE_CONNECTION_STRING); cosmosEndpoint = verifiableProperties.getString(COSMOS_ENDPOINT); cosmosCollectionLink = verifiableProperties.getString(COSMOS_COLLECTION_LINK); cosmosKey = verifiableProperties.getString(COSMOS_KEY); + cosmosMaxRetries = verifiableProperties.getInt(COSMOS_MAX_RETRIES, DEFAULT_COSMOS_MAX_RETRIES); } } diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java index e38e81ea95..67354e952b 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java @@ -24,11 +24,6 @@ import com.microsoft.azure.documentdb.Document; import com.microsoft.azure.documentdb.DocumentClient; import com.microsoft.azure.documentdb.DocumentClientException; -import com.microsoft.azure.documentdb.DocumentCollection; -import com.microsoft.azure.documentdb.FeedOptions; -import com.microsoft.azure.documentdb.FeedResponse; -import com.microsoft.azure.documentdb.PartitionKey; -import com.microsoft.azure.documentdb.RequestOptions; import com.microsoft.azure.documentdb.ResourceResponse; import com.microsoft.azure.documentdb.SqlParameter; import com.microsoft.azure.documentdb.SqlParameterCollection; @@ -50,7 +45,6 @@ import java.net.Proxy; import java.net.URISyntaxException; import java.security.InvalidKeyException; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -81,8 +75,7 @@ class AzureCloudDestination implements CloudDestination { private final CloudStorageAccount azureAccount; private final CloudBlobClient azureBlobClient; private final DocumentClient documentClient; - private final String cosmosCollectionLink; - private final RequestOptions defaultRequestOptions = new RequestOptions(); + private final CosmosDataAccessor cosmosDataAccessor; private final OperationContext blobOpContext = new OperationContext(); private final AzureMetrics azureMetrics; private final String clusterName; @@ -112,7 +105,6 @@ class AzureCloudDestination implements CloudDestination { new Proxy(Proxy.Type.HTTP, new InetSocketAddress(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort))); } // Set up CosmosDB connection, including any proxy setting - cosmosCollectionLink = azureCloudConfig.cosmosCollectionLink; ConnectionPolicy connectionPolicy = new ConnectionPolicy(); if (cloudConfig.vcrProxyHost != null) { connectionPolicy.setProxy(new HttpHost(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort)); @@ -120,6 +112,7 @@ class AzureCloudDestination implements CloudDestination { } documentClient = new DocumentClient(azureCloudConfig.cosmosEndpoint, azureCloudConfig.cosmosKey, connectionPolicy, ConsistencyLevel.Session); + cosmosDataAccessor = new CosmosDataAccessor(documentClient, azureCloudConfig, azureMetrics); this.retentionPeriodMs = TimeUnit.DAYS.toMillis(cloudConfig.cloudDeletedBlobRetentionDays); logger.info("Created Azure destination"); } @@ -137,13 +130,15 @@ class AzureCloudDestination implements CloudDestination { String clusterName, AzureMetrics azureMetrics) { this.azureAccount = azureAccount; this.documentClient = documentClient; - this.cosmosCollectionLink = cosmosCollectionLink; this.azureMetrics = azureMetrics; this.clusterName = clusterName; this.retentionPeriodMs = TimeUnit.DAYS.toMillis(CloudConfig.DEFAULT_RETENTION_DAYS); // Create a blob client to interact with Blob storage azureBlobClient = azureAccount.createCloudBlobClient(); + cosmosDataAccessor = + new CosmosDataAccessor(documentClient, cosmosCollectionLink, AzureCloudConfig.DEFAULT_COSMOS_MAX_RETRIES, + azureMetrics); } /** @@ -177,21 +172,12 @@ void testStorageConnectivity() { * Test connectivity to Azure CosmosDB */ void testCosmosConnectivity() { - try { - ResourceResponse response = - documentClient.readCollection(cosmosCollectionLink, defaultRequestOptions); - if (response.getResource() == null) { - throw new IllegalStateException("CosmosDB collection not found: " + cosmosCollectionLink); - } - logger.info("CosmosDB connection test succeeded."); - } catch (DocumentClientException ex) { - throw new IllegalStateException("CosmosDB connection test failed", ex); - } + cosmosDataAccessor.testConnectivity(); } /** * Visible for test. - * @return the CosmosDB DocumentClient. + * @return the CosmosDB DocumentClient */ DocumentClient getDocumentClient() { return documentClient; @@ -218,12 +204,7 @@ public boolean uploadBlob(BlobId blobId, long inputLength, CloudBlobMetadata clo return false; } - Timer.Context docTimer = azureMetrics.documentCreateTime.time(); - try { - documentClient.upsertDocument(cosmosCollectionLink, cloudBlobMetadata, defaultRequestOptions, true); - } finally { - docTimer.stop(); - } + cosmosDataAccessor.upsertMetadata(cloudBlobMetadata); backupTimer.stop(); azureMetrics.backupSuccessByteRate.mark(inputLength); return true; @@ -326,24 +307,10 @@ public List getDeadBlobs(String partitionPath) throws CloudSt * @throws CloudStorageException */ List runMetadataQuery(String partitionPath, SqlQuerySpec querySpec) throws CloudStorageException { - azureMetrics.documentQueryCount.inc(); - FeedOptions feedOptions = new FeedOptions(); - feedOptions.setPartitionKey(new PartitionKey(partitionPath)); - FeedResponse response = documentClient.queryDocuments(cosmosCollectionLink, querySpec, feedOptions); try { - // Note: internal query iterator wraps DocumentClientException in IllegalStateException! - List metadataList = new ArrayList<>(); - response.getQueryIterable() - .iterator() - .forEachRemaining(doc -> metadataList.add(doc.toObject(CloudBlobMetadata.class))); - return metadataList; - } catch (RuntimeException rex) { - if (rex.getCause() instanceof DocumentClientException) { - azureMetrics.documentErrorCount.inc(); - throw new CloudStorageException("Failed to query blob metadata", rex.getCause()); - } else { - throw rex; - } + return cosmosDataAccessor.queryMetadata(partitionPath, querySpec); + } catch (DocumentClientException dex) { + throw new CloudStorageException("Failed to query blob metadata", dex); } } @@ -386,25 +353,17 @@ private boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value storageTimer.stop(); } - Timer.Context docTimer = azureMetrics.documentUpdateTime.time(); - try { - String docLink = cosmosCollectionLink + "/docs/" + blobId.getID(); - RequestOptions options = new RequestOptions(); - options.setPartitionKey(new PartitionKey(blobId.getPartition().toPathString())); - ResourceResponse response = documentClient.readDocument(docLink, options); - //CloudBlobMetadata blobMetadata = response.getResource().toObject(CloudBlobMetadata.class); - Document doc = response.getResource(); - if (doc == null) { - logger.warn("Blob metadata record not found: " + docLink); - return false; - } - // Update only if value has changed - if (!value.equals(doc.get(fieldName))) { - doc.set(fieldName, value); - documentClient.replaceDocument(doc, options); - } - } finally { - docTimer.stop(); + ResourceResponse response = cosmosDataAccessor.readMetadata(blobId); + //CloudBlobMetadata blobMetadata = response.getResource().toObject(CloudBlobMetadata.class); + Document doc = response.getResource(); + if (doc == null) { + logger.warn("Blob metadata record not found: " + blobId.getID()); + return false; + } + // Update only if value has changed + if (!value.equals(doc.get(fieldName))) { + doc.set(fieldName, value); + cosmosDataAccessor.replaceMetadata(blobId, doc); } logger.debug("Updated blob {} metadata set {} to {}.", blobId, fieldName, value); azureMetrics.blobUpdatedCount.inc(); @@ -422,8 +381,6 @@ public boolean purgeBlob(CloudBlobMetadata blobMetadata) throws CloudStorageExce String blobFileName = blobMetadata.getCloudBlobName(); String partitionPath = blobMetadata.getPartitionId(); String containerName = getAzureContainerName(partitionPath); - RequestOptions options = new RequestOptions(); - options.setPartitionKey(new PartitionKey(partitionPath)); azureMetrics.blobDeleteRequestCount.inc(); Timer.Context deleteTimer = azureMetrics.blobDeletionTime.time(); try { @@ -433,9 +390,8 @@ public boolean purgeBlob(CloudBlobMetadata blobMetadata) throws CloudStorageExce boolean deletionDone = azureBlob.deleteIfExists(); // Delete the document too - String docLink = cosmosCollectionLink + "/docs/" + blobId; try { - documentClient.deleteDocument(docLink, options); + cosmosDataAccessor.deleteMetadata(blobMetadata); deletionDone = true; logger.debug("Purged blob {} from partition {}.", blobId, partitionPath); } catch (DocumentClientException dex) { diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java index 69a3dddf2a..29cf0f7596 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java @@ -29,7 +29,9 @@ public class AzureMetrics { public static final String BLOB_UPDATE_TIME = "BlobUpdateTime"; public static final String BLOB_UPDATED_COUNT = "BlobUpdatedCount"; public static final String DOCUMENT_CREATE_TIME = "DocumentCreateTime"; + public static final String DOCUMENT_READ_TIME = "DocumentReadTime"; public static final String DOCUMENT_UPDATE_TIME = "DocumentUpdateTime"; + public static final String DOCUMENT_DELETE_TIME = "DocumentDeleteTime"; public static final String DOCUMENT_QUERY_COUNT = "DocumentQueryCount"; public static final String MISSING_KEYS_QUERY_TIME = "MissingKeysQueryTime"; public static final String DEAD_BLOBS_QUERY_TIME = "DeadBlobsQueryTime"; @@ -44,6 +46,8 @@ public class AzureMetrics { public static final String BACKUP_SUCCESS_LATENCY = "BackupSuccessLatency"; public static final String BACKUP_SUCCESS_BYTE_RATE = "BackupSuccessByteRate"; public static final String BACKUP_ERROR_COUNT = "BackupErrorCount"; + public static final String RETRY_COUNT = "RetryCount"; + public static final String RETRY_WAIT_TIME = "RetryWaitTime"; // Metrics public final Counter blobUploadRequestCount; @@ -53,7 +57,9 @@ public class AzureMetrics { public final Timer blobUploadTime; public final Timer blobUpdateTime; public final Timer documentCreateTime; + public final Timer documentReadTime; public final Timer documentUpdateTime; + public final Timer documentDeleteTime; public final Timer missingKeysQueryTime; public final Counter documentQueryCount; public final Timer deadBlobsQueryTime; @@ -68,6 +74,8 @@ public class AzureMetrics { public final Timer backupSuccessLatency; public final Meter backupSuccessByteRate; public final Counter backupErrorCount; + public final Counter retryCount; + public final Timer retryWaitTime; public AzureMetrics(MetricRegistry registry) { blobUploadRequestCount = @@ -80,7 +88,9 @@ public AzureMetrics(MetricRegistry registry) { blobUploadTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, BLOB_UPLOAD_TIME)); blobUpdateTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, BLOB_UPDATE_TIME)); documentCreateTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_CREATE_TIME)); + documentReadTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_READ_TIME)); documentUpdateTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_UPDATE_TIME)); + documentDeleteTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_DELETE_TIME)); documentQueryCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_QUERY_COUNT)); missingKeysQueryTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, MISSING_KEYS_QUERY_TIME)); deadBlobsQueryTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DEAD_BLOBS_QUERY_TIME)); @@ -96,5 +106,7 @@ public AzureMetrics(MetricRegistry registry) { backupSuccessByteRate = registry.meter(MetricRegistry.name(AzureCloudDestination.class, BACKUP_SUCCESS_BYTE_RATE)); backupSuccessLatency = registry.timer(MetricRegistry.name(AzureCloudDestination.class, BACKUP_SUCCESS_LATENCY)); backupErrorCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, BACKUP_ERROR_COUNT)); + retryCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, RETRY_COUNT)); + retryWaitTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, RETRY_WAIT_TIME)); } } diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/CosmosDataAccessor.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/CosmosDataAccessor.java new file mode 100644 index 0000000000..f86a0a14a8 --- /dev/null +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/CosmosDataAccessor.java @@ -0,0 +1,197 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud.azure; + +import com.codahale.metrics.Timer; +import com.github.ambry.cloud.CloudBlobMetadata; +import com.github.ambry.commons.BlobId; +import com.microsoft.azure.documentdb.Document; +import com.microsoft.azure.documentdb.DocumentClient; +import com.microsoft.azure.documentdb.DocumentClientException; +import com.microsoft.azure.documentdb.DocumentCollection; +import com.microsoft.azure.documentdb.FeedOptions; +import com.microsoft.azure.documentdb.FeedResponse; +import com.microsoft.azure.documentdb.PartitionKey; +import com.microsoft.azure.documentdb.RequestOptions; +import com.microsoft.azure.documentdb.ResourceResponse; +import com.microsoft.azure.documentdb.SqlQuerySpec; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class CosmosDataAccessor { + private static final Logger logger = LoggerFactory.getLogger(CosmosDataAccessor.class); + private static final int HTTP_TOO_MANY_REQUESTS = 429; + private static final String DOCS = "/docs/"; + private final DocumentClient documentClient; + private final String cosmosCollectionLink; + private final AzureMetrics azureMetrics; + private final int maxRetries; + + /** Production constructor */ + public CosmosDataAccessor(DocumentClient documentClient, AzureCloudConfig azureCloudConfig, + AzureMetrics azureMetrics) { + this.documentClient = documentClient; + this.cosmosCollectionLink = azureCloudConfig.cosmosCollectionLink; + this.azureMetrics = azureMetrics; + this.maxRetries = azureCloudConfig.cosmosMaxRetries; + } + + /** Test constructor */ + public CosmosDataAccessor(DocumentClient documentClient, String cosmosCollectionLink, int maxRetries, + AzureMetrics azureMetrics) { + this.documentClient = documentClient; + this.cosmosCollectionLink = cosmosCollectionLink; + this.azureMetrics = azureMetrics; + this.maxRetries = maxRetries; + } + + /** + * Test connectivity to Azure CosmosDB + */ + void testConnectivity() { + try { + ResourceResponse response = + documentClient.readCollection(cosmosCollectionLink, new RequestOptions()); + if (response.getResource() == null) { + throw new IllegalStateException("CosmosDB collection not found: " + cosmosCollectionLink); + } + logger.info("CosmosDB connection test succeeded."); + } catch (DocumentClientException ex) { + throw new IllegalStateException("CosmosDB connection test failed", ex); + } + } + + public ResourceResponse upsertMetadata(CloudBlobMetadata blobMetadata) throws DocumentClientException { + RequestOptions options = getRequestOptions(blobMetadata.getPartitionId()); + return retryWithThrottling(() -> documentClient.upsertDocument(cosmosCollectionLink, blobMetadata, options, true), + azureMetrics.documentCreateTime); + } + + public ResourceResponse deleteMetadata(CloudBlobMetadata blobMetadata) throws DocumentClientException { + String docLink = getDocumentLink(blobMetadata.getId()); + RequestOptions options = getRequestOptions(blobMetadata.getPartitionId()); + options.setPartitionKey(new PartitionKey(blobMetadata.getPartitionId())); + return retryWithThrottling(() -> documentClient.deleteDocument(docLink, options), azureMetrics.documentDeleteTime); + } + + public ResourceResponse readMetadata(BlobId blobId) throws DocumentClientException { + String docLink = getDocumentLink(blobId.getID()); + RequestOptions options = getRequestOptions(blobId.getPartition().toPathString()); + return retryWithThrottling(() -> documentClient.readDocument(docLink, options), azureMetrics.documentReadTime); + } + + public ResourceResponse replaceMetadata(BlobId blobId, Document doc) throws DocumentClientException { + RequestOptions options = getRequestOptions(blobId.getPartition().toPathString()); + return retryWithThrottling(() -> documentClient.replaceDocument(doc, options), azureMetrics.documentUpdateTime); + } + + /** + * Get the list of blobs in the specified partition matching the specified DocumentDB query. + * @param partitionPath the partition to query. + * @param querySpec the DocumentDB query to execute. + * @return a List of {@link CloudBlobMetadata} referencing the matching blobs. + * @throws DocumentClientException + */ + List queryMetadata(String partitionPath, SqlQuerySpec querySpec) throws DocumentClientException { + azureMetrics.documentQueryCount.inc(); + FeedOptions feedOptions = new FeedOptions(); + feedOptions.setPartitionKey(new PartitionKey(partitionPath)); + FeedResponse response = documentClient.queryDocuments(cosmosCollectionLink, querySpec, feedOptions); + try { + // Note: internal query iterator wraps DocumentClientException in IllegalStateException! + List metadataList = new ArrayList<>(); + response.getQueryIterable() + .iterator() + .forEachRemaining(doc -> metadataList.add(doc.toObject(CloudBlobMetadata.class))); + return metadataList; + } catch (RuntimeException rex) { + if (rex.getCause() instanceof DocumentClientException) { + azureMetrics.documentErrorCount.inc(); + throw (DocumentClientException) rex.getCause(); + } else { + throw rex; + } + } + } + + private String getDocumentLink(String documentId) { + return cosmosCollectionLink + DOCS + documentId; + } + + private RequestOptions getRequestOptions(String partitionPath) { + RequestOptions options = new RequestOptions(); + options.setPartitionKey(new PartitionKey(partitionPath)); + return options; + } + + /** + * Run the supplied DocumentClient action. If CosmosDB returns status 429 (TOO_MANY_REQUESTS), + * retry after the requested wait period, up to the configured retry limit. + * @param action the DocumentClient action to execute, wrapped in a {@link Callable}. + * @param timer the {@link Timer} to use to record execution time (excluding waiting). + * @return the {@link ResourceResponse} returned by the action, if successful. + * @throws DocumentClientException if Cosmos returns a different error status, or if the retry limit is reached. + */ + private ResourceResponse retryWithThrottling(Callable> action, Timer timer) + throws DocumentClientException { + int count = 0; + long waitTime = 0; + do { + try { + Timer.Context waitTimer = azureMetrics.retryWaitTime.time(); + waitForMs(waitTime); + waitTimer.stop(); + Timer.Context docTimer = timer.time(); + ResourceResponse response = action.call(); + docTimer.stop(); + return response; + } catch (DocumentClientException dex) { + azureMetrics.retryCount.inc(); + // Azure tells us how long to wait before retrying. + if (dex.getStatusCode() == HTTP_TOO_MANY_REQUESTS) { + waitTime = dex.getRetryAfterInMilliseconds(); + logger.debug("Got {} from Cosmos, will wait {} ms before retrying.", HTTP_TOO_MANY_REQUESTS, waitTime); + } else { + // Something else, not retryable. + throw dex; + } + } catch (Exception e) { + azureMetrics.documentErrorCount.inc(); + throw new RuntimeException("Hit an Exception while retrying with action " + action, e); + } + count++; + } while (count < maxRetries); + + azureMetrics.documentErrorCount.inc(); + String message = "Max number of retries reached while retrying with action " + action; + throw new RuntimeException(message); + } + + /** + * Wait for the specified time, or until interrupted. + * @param waitTimeInMillis the time to wait. + */ + void waitForMs(long waitTimeInMillis) { + try { + TimeUnit.MILLISECONDS.sleep(waitTimeInMillis); + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting for retry"); + } + } +} diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java index 4a658e8243..0f4893492e 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java @@ -146,6 +146,7 @@ public void testDelete() throws Exception { assertEquals(1, azureMetrics.blobUpdatedCount.getCount()); assertEquals(0, azureMetrics.blobUpdateErrorCount.getCount()); assertEquals(1, azureMetrics.blobUpdateTime.getCount()); + assertEquals(1, azureMetrics.documentReadTime.getCount()); assertEquals(1, azureMetrics.documentUpdateTime.getCount()); } @@ -157,6 +158,7 @@ public void testExpire() throws Exception { assertEquals(1, azureMetrics.blobUpdatedCount.getCount()); assertEquals(0, azureMetrics.blobUpdateErrorCount.getCount()); assertEquals(1, azureMetrics.blobUpdateTime.getCount()); + assertEquals(1, azureMetrics.documentReadTime.getCount()); assertEquals(1, azureMetrics.documentUpdateTime.getCount()); } @@ -170,6 +172,7 @@ public void testPurge() throws Exception { assertTrue("Expected success", azureDest.purgeBlob(cloudBlobMetadata)); assertEquals(1, azureMetrics.blobDeletedCount.getCount()); assertEquals(0, azureMetrics.blobDeleteErrorCount.getCount()); + assertEquals(1, azureMetrics.documentDeleteTime.getCount()); } /** Test purge not found. */ From 5c0ed6224df89cc1b44ab5a9ceaf64fa5b8080a7 Mon Sep 17 00:00:00 2001 From: Rob Block Date: Tue, 25 Jun 2019 17:21:44 -0700 Subject: [PATCH 2/2] Address review comments --- .../CloudTokenPersistor.java | 1 - .../azure/AzureCloudDestination.java | 55 ++--- .../azure/CosmosDataAccessor.java | 137 ++++++++--- .../azure/AzureCloudDestinationTest.java | 2 +- .../azure/AzureIntegrationTest.java | 6 +- .../azure/CosmosDataAccessorTest.java | 223 ++++++++++++++++++ 6 files changed, 354 insertions(+), 70 deletions(-) create mode 100644 ambry-cloud/src/test/java/com.github.ambry.cloud/azure/CosmosDataAccessorTest.java diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java index 34cf0d94f9..4aeeaefa12 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java @@ -19,7 +19,6 @@ import com.github.ambry.replication.ReplicationException; import com.github.ambry.replication.ReplicationMetrics; import com.github.ambry.store.FindTokenFactory; -import com.github.ambry.store.StoreFindToken; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java index 46455cd891..2f52e2670a 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java @@ -146,7 +146,7 @@ class AzureCloudDestination implements CloudDestination { */ void testAzureConnectivity() { testStorageConnectivity(); - testCosmosConnectivity(); + cosmosDataAccessor.testConnectivity(); } /** @@ -169,18 +169,19 @@ void testStorageConnectivity() { } /** - * Test connectivity to Azure CosmosDB + * Visible for test. + * @return the CosmosDB DocumentClient */ - void testCosmosConnectivity() { - cosmosDataAccessor.testConnectivity(); + DocumentClient getDocumentClient() { + return documentClient; } /** * Visible for test. - * @return the CosmosDB DocumentClient + * @return the {@link CosmosDataAccessor} */ - DocumentClient getDocumentClient() { - return documentClient; + CosmosDataAccessor getCosmosDataAccessor() { + return cosmosDataAccessor; } /** @@ -273,16 +274,16 @@ public Map getBlobMetadata(List blobIds) thro if (blobIds.isEmpty()) { return Collections.emptyMap(); } - Timer.Context queryTimer = azureMetrics.missingKeysQueryTime.time(); + String quotedBlobIds = + String.join(",", blobIds.stream().map(s -> '"' + s.getID() + '"').collect(Collectors.toList())); + String query = String.format(BATCH_ID_QUERY_TEMPLATE, quotedBlobIds); + String partitionPath = blobIds.get(0).getPartition().toPathString(); try { - String quotedBlobIds = - String.join(",", blobIds.stream().map(s -> '"' + s.getID() + '"').collect(Collectors.toList())); - String query = String.format(BATCH_ID_QUERY_TEMPLATE, quotedBlobIds); List metadataList = - runMetadataQuery(blobIds.get(0).getPartition().toPathString(), new SqlQuerySpec(query)); + cosmosDataAccessor.queryMetadata(partitionPath, new SqlQuerySpec(query), azureMetrics.missingKeysQueryTime); return metadataList.stream().collect(Collectors.toMap(m -> m.getId(), Function.identity())); - } finally { - queryTimer.stop(); + } catch (DocumentClientException dex) { + throw new CloudStorageException("Failed to query blob metadata for partition " + partitionPath, dex); } } @@ -290,28 +291,12 @@ public Map getBlobMetadata(List blobIds) thro public List getDeadBlobs(String partitionPath) throws CloudStorageException { long now = System.currentTimeMillis(); long retentionThreshold = now - retentionPeriodMs; - Timer.Context queryTimer = azureMetrics.deadBlobsQueryTime.time(); - try { - SqlQuerySpec deadBlobsQuery = new SqlQuerySpec(DEAD_BLOBS_QUERY_TEMPLATE, - new SqlParameterCollection(new SqlParameter(THRESHOLD_PARAM, retentionThreshold))); - return runMetadataQuery(partitionPath, deadBlobsQuery); - } finally { - queryTimer.stop(); - } - } - - /** - * Get the list of blobs in the specified partition matching the specified DocumentDB query. - * @param partitionPath the partition to query. - * @param querySpec the DocumentDB query to execute. - * @return a List of {@link CloudBlobMetadata} referencing the matching blobs. - * @throws CloudStorageException - */ - List runMetadataQuery(String partitionPath, SqlQuerySpec querySpec) throws CloudStorageException { + SqlQuerySpec deadBlobsQuery = new SqlQuerySpec(DEAD_BLOBS_QUERY_TEMPLATE, + new SqlParameterCollection(new SqlParameter(THRESHOLD_PARAM, retentionThreshold))); try { - return cosmosDataAccessor.queryMetadata(partitionPath, querySpec); + return cosmosDataAccessor.queryMetadata(partitionPath, deadBlobsQuery, azureMetrics.deadBlobsQueryTime); } catch (DocumentClientException dex) { - throw new CloudStorageException("Failed to query blob metadata", dex); + throw new CloudStorageException("Failed to query dead blobs for partition " + partitionPath, dex); } } @@ -358,7 +343,7 @@ private boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value //CloudBlobMetadata blobMetadata = response.getResource().toObject(CloudBlobMetadata.class); Document doc = response.getResource(); if (doc == null) { - logger.warn("Blob metadata record not found: " + blobId.getID()); + logger.warn("Blob metadata record not found: {}", blobId.getID()); return false; } // Update only if value has changed diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/CosmosDataAccessor.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/CosmosDataAccessor.java index f86a0a14a8..99f52815d2 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/CosmosDataAccessor.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/CosmosDataAccessor.java @@ -46,10 +46,7 @@ public class CosmosDataAccessor { /** Production constructor */ public CosmosDataAccessor(DocumentClient documentClient, AzureCloudConfig azureCloudConfig, AzureMetrics azureMetrics) { - this.documentClient = documentClient; - this.cosmosCollectionLink = azureCloudConfig.cosmosCollectionLink; - this.azureMetrics = azureMetrics; - this.maxRetries = azureCloudConfig.cosmosMaxRetries; + this(documentClient, azureCloudConfig.cosmosCollectionLink, azureCloudConfig.cosmosMaxRetries, azureMetrics); } /** Test constructor */ @@ -77,42 +74,75 @@ void testConnectivity() { } } + /** + * Upsert the blob metadata document in the CosmosDB collection, retrying as necessary. + * @param blobMetadata the blob metadata document. + * @return the {@link ResourceResponse} returned by the operation, if successful. + * @throws DocumentClientException if the operation failed, or retry limit is exhausted. + */ public ResourceResponse upsertMetadata(CloudBlobMetadata blobMetadata) throws DocumentClientException { RequestOptions options = getRequestOptions(blobMetadata.getPartitionId()); - return retryWithThrottling(() -> documentClient.upsertDocument(cosmosCollectionLink, blobMetadata, options, true), + return retryOperationWithThrottling( + () -> documentClient.upsertDocument(cosmosCollectionLink, blobMetadata, options, true), azureMetrics.documentCreateTime); } + /** + * Delete the blob metadata document in the CosmosDB collection, retrying as necessary. + * @param blobMetadata the blob metadata document. + * @return the {@link ResourceResponse} returned by the operation, if successful. + * @throws DocumentClientException if the operation failed, or retry limit is exhausted. + */ public ResourceResponse deleteMetadata(CloudBlobMetadata blobMetadata) throws DocumentClientException { String docLink = getDocumentLink(blobMetadata.getId()); RequestOptions options = getRequestOptions(blobMetadata.getPartitionId()); options.setPartitionKey(new PartitionKey(blobMetadata.getPartitionId())); - return retryWithThrottling(() -> documentClient.deleteDocument(docLink, options), azureMetrics.documentDeleteTime); + return retryOperationWithThrottling(() -> documentClient.deleteDocument(docLink, options), + azureMetrics.documentDeleteTime); } + /** + * Read the blob metadata document in the CosmosDB collection, retrying as necessary. + * @param blobId the {@link BlobId} for which metadata is requested. + * @return the {@link ResourceResponse} containing the metadata document. + * @throws DocumentClientException if the operation failed, or retry limit is exhausted. + */ public ResourceResponse readMetadata(BlobId blobId) throws DocumentClientException { String docLink = getDocumentLink(blobId.getID()); RequestOptions options = getRequestOptions(blobId.getPartition().toPathString()); - return retryWithThrottling(() -> documentClient.readDocument(docLink, options), azureMetrics.documentReadTime); + return retryOperationWithThrottling(() -> documentClient.readDocument(docLink, options), + azureMetrics.documentReadTime); } + /** + * Replace the blob metadata document in the CosmosDB collection, retrying as necessary. + * @param blobId the {@link BlobId} for which metadata is replaced. + * @param doc the blob metadata document. + * @return the {@link ResourceResponse} returned by the operation, if successful. + * @throws DocumentClientException if the operation failed, or retry limit is exhausted. + */ public ResourceResponse replaceMetadata(BlobId blobId, Document doc) throws DocumentClientException { RequestOptions options = getRequestOptions(blobId.getPartition().toPathString()); - return retryWithThrottling(() -> documentClient.replaceDocument(doc, options), azureMetrics.documentUpdateTime); + return retryOperationWithThrottling(() -> documentClient.replaceDocument(doc, options), + azureMetrics.documentUpdateTime); } /** * Get the list of blobs in the specified partition matching the specified DocumentDB query. * @param partitionPath the partition to query. * @param querySpec the DocumentDB query to execute. + * @param timer the {@link Timer} to use to record query time (excluding waiting). * @return a List of {@link CloudBlobMetadata} referencing the matching blobs. * @throws DocumentClientException */ - List queryMetadata(String partitionPath, SqlQuerySpec querySpec) throws DocumentClientException { + List queryMetadata(String partitionPath, SqlQuerySpec querySpec, Timer timer) + throws DocumentClientException { azureMetrics.documentQueryCount.inc(); FeedOptions feedOptions = new FeedOptions(); feedOptions.setPartitionKey(new PartitionKey(partitionPath)); - FeedResponse response = documentClient.queryDocuments(cosmosCollectionLink, querySpec, feedOptions); + FeedResponse response = + retryQueryWithThrottling(() -> documentClient.queryDocuments(cosmosCollectionLink, querySpec, feedOptions), + timer); try { // Note: internal query iterator wraps DocumentClientException in IllegalStateException! List metadataList = new ArrayList<>(); @@ -130,14 +160,30 @@ List queryMetadata(String partitionPath, SqlQuerySpec querySp } } - private String getDocumentLink(String documentId) { - return cosmosCollectionLink + DOCS + documentId; + /** + * Run the supplied DocumentClient action. If CosmosDB returns status 429 (TOO_MANY_REQUESTS), + * retry after the requested wait period, up to the configured retry limit. + * @param operation the DocumentClient resource operation to execute, wrapped in a {@link Callable}. + * @param timer the {@link Timer} to use to record execution time (excluding waiting). + * @return the {@link ResourceResponse} returned by the operation, if successful. + * @throws DocumentClientException if Cosmos returns a different error status, or if the retry limit is reached. + */ + private ResourceResponse retryOperationWithThrottling(Callable> operation, + Timer timer) throws DocumentClientException { + return (ResourceResponse) retryWithThrottling(operation, timer); } - private RequestOptions getRequestOptions(String partitionPath) { - RequestOptions options = new RequestOptions(); - options.setPartitionKey(new PartitionKey(partitionPath)); - return options; + /** + * Run the supplied DocumentClient query. If CosmosDB returns status 429 (TOO_MANY_REQUESTS), + * retry after the requested wait period, up to the configured retry limit. + * @param query the DocumentClient query to execute, wrapped in a {@link Callable}. + * @param timer the {@link Timer} to use to record execution time (excluding waiting). + * @return the {@link FeedResponse} returned by the query, if successful. + * @throws DocumentClientException if Cosmos returns a different error status, or if the retry limit is reached. + */ + private FeedResponse retryQueryWithThrottling(Callable> query, Timer timer) + throws DocumentClientException { + return (FeedResponse) retryWithThrottling(query, timer); } /** @@ -145,27 +191,24 @@ private RequestOptions getRequestOptions(String partitionPath) { * retry after the requested wait period, up to the configured retry limit. * @param action the DocumentClient action to execute, wrapped in a {@link Callable}. * @param timer the {@link Timer} to use to record execution time (excluding waiting). - * @return the {@link ResourceResponse} returned by the action, if successful. + * @return the {@link Object} returned by the action, if successful. * @throws DocumentClientException if Cosmos returns a different error status, or if the retry limit is reached. */ - private ResourceResponse retryWithThrottling(Callable> action, Timer timer) - throws DocumentClientException { + private Object retryWithThrottling(Callable action, Timer timer) throws DocumentClientException { int count = 0; long waitTime = 0; do { try { - Timer.Context waitTimer = azureMetrics.retryWaitTime.time(); waitForMs(waitTime); - waitTimer.stop(); Timer.Context docTimer = timer.time(); - ResourceResponse response = action.call(); + Object response = executeCosmosAction(action); docTimer.stop(); return response; } catch (DocumentClientException dex) { - azureMetrics.retryCount.inc(); // Azure tells us how long to wait before retrying. if (dex.getStatusCode() == HTTP_TOO_MANY_REQUESTS) { waitTime = dex.getRetryAfterInMilliseconds(); + azureMetrics.retryCount.inc(); logger.debug("Got {} from Cosmos, will wait {} ms before retrying.", HTTP_TOO_MANY_REQUESTS, waitTime); } else { // Something else, not retryable. @@ -173,25 +216,59 @@ private ResourceResponse retryWithThrottling(Callable action) throws Exception { + try { + return action.call(); + } catch (DocumentClientException dex) { + throw dex; + } catch (IllegalStateException ex) { + if (ex.getCause() instanceof DocumentClientException) { + throw (DocumentClientException) ex.getCause(); + } else { + throw ex; + } + } + } + + private String getDocumentLink(String documentId) { + return cosmosCollectionLink + DOCS + documentId; + } + + private RequestOptions getRequestOptions(String partitionPath) { + RequestOptions options = new RequestOptions(); + options.setPartitionKey(new PartitionKey(partitionPath)); + return options; + } + /** * Wait for the specified time, or until interrupted. * @param waitTimeInMillis the time to wait. */ void waitForMs(long waitTimeInMillis) { - try { - TimeUnit.MILLISECONDS.sleep(waitTimeInMillis); - } catch (InterruptedException e) { - logger.warn("Interrupted while waiting for retry"); + if (waitTimeInMillis > 0) { + Timer.Context waitTimer = azureMetrics.retryWaitTime.time(); + try { + TimeUnit.MILLISECONDS.sleep(waitTimeInMillis); + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting for retry"); + } + waitTimer.stop(); } } } diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java index 31c765887c..b43ff77431 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java @@ -286,7 +286,7 @@ public void testAzureConnection() throws Exception { } catch (IllegalStateException expected) { } try { - dest.testCosmosConnectivity(); + dest.getCosmosDataAccessor().testConnectivity(); fail("Expected exception"); } catch (IllegalStateException expected) { } diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java index cb13b49fff..8efda35699 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java @@ -14,6 +14,7 @@ package com.github.ambry.cloud.azure; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; import com.github.ambry.cloud.CloudBlobMetadata; import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.clustermap.MockPartitionId; @@ -66,8 +67,6 @@ public class AzureIntegrationTest { private long testPartition = 666; // one day retention private int retentionPeriodDays = 1; - - private String cosmosCollectionLink; private String propFileName = "azure-test.properties"; private String tokenFileName = "replicaTokens"; @@ -252,8 +251,9 @@ public void testPurgeDeadBlobs() throws Exception { private void cleanup() throws Exception { String partitionPath = String.valueOf(testPartition); + Timer dummyTimer = new Timer(); List allBlobsInPartition = - azureDest.runMetadataQuery(partitionPath, new SqlQuerySpec("SELECT * FROM c")); + azureDest.getCosmosDataAccessor().queryMetadata(partitionPath, new SqlQuerySpec("SELECT * FROM c"), dummyTimer); azureDest.purgeBlobs(allBlobsInPartition); } diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/CosmosDataAccessorTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/CosmosDataAccessorTest.java new file mode 100644 index 0000000000..8154151565 --- /dev/null +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/CosmosDataAccessorTest.java @@ -0,0 +1,223 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud.azure; + +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.ambry.cloud.CloudBlobMetadata; +import com.github.ambry.clustermap.MockPartitionId; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.commons.BlobId; +import com.github.ambry.utils.Utils; +import com.microsoft.azure.documentdb.Document; +import com.microsoft.azure.documentdb.DocumentClient; +import com.microsoft.azure.documentdb.DocumentClientException; +import com.microsoft.azure.documentdb.Error; +import com.microsoft.azure.documentdb.FeedOptions; +import com.microsoft.azure.documentdb.FeedResponse; +import com.microsoft.azure.documentdb.QueryIterable; +import com.microsoft.azure.documentdb.RequestOptions; +import com.microsoft.azure.documentdb.ResourceResponse; +import com.microsoft.azure.documentdb.SqlQuerySpec; +import com.microsoft.azure.documentdb.internal.HttpConstants; +import java.util.Collections; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import static com.github.ambry.commons.BlobId.*; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + + +/** Test cases for {@link CosmosDataAccessor} */ +@RunWith(MockitoJUnitRunner.class) +public class CosmosDataAccessorTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + private CosmosDataAccessor cosmosAccessor; + private DocumentClient mockumentClient; + private ResourceResponse mockResponse; + private DocumentClientException retryException; + private AzureMetrics azureMetrics; + private BlobId blobId; + private int blobSize = 1024; + private CloudBlobMetadata blobMetadata; + int maxRetries = 3; + + @Before + public void setup() throws Exception { + mockumentClient = mock(DocumentClient.class); + mockResponse = mock(ResourceResponse.class); + retryException = new DocumentClientException(HttpConstants.StatusCodes.TOO_MANY_REQUESTS, new Error(), + Collections.singletonMap(HttpConstants.HttpHeaders.RETRY_AFTER_IN_MILLISECONDS, "1")); + + byte dataCenterId = 66; + short accountId = 101; + short containerId = 5; + PartitionId partitionId = new MockPartitionId(); + blobId = new BlobId(BLOB_ID_V6, BlobIdType.NATIVE, dataCenterId, accountId, containerId, partitionId, false, + BlobDataType.DATACHUNK); + blobMetadata = new CloudBlobMetadata(blobId, System.currentTimeMillis(), Utils.Infinite_Time, blobSize, + CloudBlobMetadata.EncryptionOrigin.NONE, null, null); + azureMetrics = new AzureMetrics(new MetricRegistry()); + cosmosAccessor = new CosmosDataAccessor(mockumentClient, "ambry/metadata", maxRetries, azureMetrics); + } + + /** + * Test normal upsert. + * @throws Exception + */ + @Test + public void testUpsertNormal() throws Exception { + // Request succeeds first time + when(mockumentClient.upsertDocument(anyString(), any(), any(RequestOptions.class), anyBoolean())).thenReturn( + mockResponse); + cosmosAccessor.upsertMetadata(blobMetadata); + assertEquals(1, azureMetrics.documentCreateTime.getCount()); + assertEquals(0, azureMetrics.retryCount.getCount()); + assertEquals(0, azureMetrics.retryWaitTime.getCount()); + } + + /** + * Test upsert with one retry. + * @throws Exception + */ + @Test + public void testUpsertRetry() throws Exception { + // Request gets 420 on first try, succeeds on retry + when(mockumentClient.upsertDocument(anyString(), any(), any(RequestOptions.class), anyBoolean())).thenThrow( + retryException).thenReturn(mockResponse); + cosmosAccessor.upsertMetadata(blobMetadata); + assertEquals(1, azureMetrics.documentCreateTime.getCount()); + assertEquals(1, azureMetrics.retryCount.getCount()); + assertEquals(1, azureMetrics.retryWaitTime.getCount()); + } + + /** + * Test upsert exhausts retries. + * @throws Exception + */ + @Test + public void testUpsertExhaustRetries() throws Exception { + // Request keeps getting 420 until we give up + when(mockumentClient.upsertDocument(anyString(), any(), any(RequestOptions.class), anyBoolean())).thenThrow( + retryException); + try { + cosmosAccessor.upsertMetadata(blobMetadata); + fail("Expected operation to fail after too many retries"); + } catch (RuntimeException expected) { + } + assertEquals(0, azureMetrics.documentCreateTime.getCount()); + assertEquals(maxRetries, azureMetrics.retryCount.getCount()); + assertEquals(maxRetries, azureMetrics.retryWaitTime.getCount()); + } + + /** Test read. */ + @Test + public void testReadNormal() throws Exception { + // Request succeeds first time + when(mockumentClient.readDocument(anyString(), any(RequestOptions.class))).thenReturn(mockResponse); + cosmosAccessor.readMetadata(blobId); + assertEquals(1, azureMetrics.documentReadTime.getCount()); + assertEquals(0, azureMetrics.retryCount.getCount()); + assertEquals(0, azureMetrics.retryWaitTime.getCount()); + } + + /** + * Test read with one retry. + * @throws Exception + */ + @Test + public void testReadRetry() throws Exception { + // Request gets 420 on first try, succeeds on retry + when(mockumentClient.readDocument(anyString(), any(RequestOptions.class))).thenThrow(retryException) + .thenReturn(mockResponse); + cosmosAccessor.readMetadata(blobId); + assertEquals(1, azureMetrics.documentReadTime.getCount()); + assertEquals(1, azureMetrics.retryCount.getCount()); + assertEquals(1, azureMetrics.retryWaitTime.getCount()); + } + + /** + * Test read exhausts retries. + * @throws Exception + */ + @Test + public void testReadExhaustRetries() throws Exception { + // Request keeps getting 420 until we give up + when(mockumentClient.readDocument(anyString(), any(RequestOptions.class))).thenThrow(retryException); + try { + cosmosAccessor.readMetadata(blobId); + fail("Expected operation to fail after too many retries"); + } catch (RuntimeException expected) { + } + assertEquals(0, azureMetrics.documentReadTime.getCount()); + assertEquals(maxRetries, azureMetrics.retryCount.getCount()); + assertEquals(maxRetries, azureMetrics.retryWaitTime.getCount()); + } + + /** Test query metadata. */ + @Test + public void testQueryNormal() throws Exception { + FeedResponse feedResponse = getFeedResponse(); + when(mockumentClient.queryDocuments(anyString(), any(SqlQuerySpec.class), any(FeedOptions.class))).thenReturn( + feedResponse); + List metadataList = doQueryMetadata(); + assertEquals("Expected single entry", 1, metadataList.size()); + CloudBlobMetadata outputMetadata = metadataList.get(0); + assertEquals("Returned metadata does not match original", blobMetadata, outputMetadata); + assertEquals(1, azureMetrics.documentQueryCount.getCount()); + assertEquals(1, azureMetrics.missingKeysQueryTime.getCount()); + assertEquals(0, azureMetrics.retryCount.getCount()); + assertEquals(0, azureMetrics.retryWaitTime.getCount()); + } + + /** Test query one retry. */ + @Test + public void testQueryRetry() throws Exception { + FeedResponse feedResponse = getFeedResponse(); + when(mockumentClient.queryDocuments(anyString(), any(SqlQuerySpec.class), any(FeedOptions.class))).thenThrow( + new IllegalStateException(retryException)).thenReturn(feedResponse); + List metadataList = doQueryMetadata(); + assertEquals("Expected single entry", 1, metadataList.size()); + CloudBlobMetadata outputMetadata = metadataList.get(0); + assertEquals("Returned metadata does not match original", blobMetadata, outputMetadata); + assertEquals(1, azureMetrics.documentQueryCount.getCount()); + assertEquals(1, azureMetrics.missingKeysQueryTime.getCount()); + assertEquals(1, azureMetrics.retryCount.getCount()); + assertEquals(1, azureMetrics.retryWaitTime.getCount()); + } + + /** + * @return a FeedResponse with a single document. + */ + private FeedResponse getFeedResponse() throws Exception { + QueryIterable mockIterable = mock(QueryIterable.class); + List docList = Collections.singletonList(new Document(objectMapper.writeValueAsString(blobMetadata))); + when(mockIterable.iterator()).thenReturn(docList.iterator()); + FeedResponse feedResponse = mock(FeedResponse.class); + when(feedResponse.getQueryIterable()).thenReturn(mockIterable); + return feedResponse; + } + + /** Utility method to run metadata query with default parameters. */ + private List doQueryMetadata() throws Exception { + return cosmosAccessor.queryMetadata(blobId.getPartition().toPathString(), new SqlQuerySpec("select * from c"), + azureMetrics.missingKeysQueryTime); + } +}