diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java index 43641cf5d6..4aeeaefa12 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudTokenPersistor.java @@ -68,7 +68,7 @@ protected void persist(String mountPath, List tokenInfoList) InputStream inputStream = new ByteArrayInputStream(tokenOutputStream.toByteArray()); cloudDestination.persistTokens(mountPath, replicaTokenFileName, inputStream); - logger.debug("Completed writing replica tokens to cloud destination."); + logger.debug("Persisted replica tokens for {} to cloud destination.", mountPath); } catch (CloudStorageException e) { throw new ReplicationException("IO error persisting replica tokens at mount path " + mountPath, e); } diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudConfig.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudConfig.java index 4db0dee6ae..9c0a99b149 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudConfig.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudConfig.java @@ -14,6 +14,7 @@ package com.github.ambry.cloud.azure; import com.github.ambry.config.Config; +import com.github.ambry.config.Default; import com.github.ambry.config.VerifiableProperties; @@ -26,6 +27,8 @@ public class AzureCloudConfig { public static final String COSMOS_ENDPOINT = "cosmos.endpoint"; public static final String COSMOS_COLLECTION_LINK = "cosmos.collection.link"; public static final String COSMOS_KEY = "cosmos.key"; + public static final String COSMOS_MAX_RETRIES = "cosmos.max.retries"; + public static final int DEFAULT_COSMOS_MAX_RETRIES = 5; /** * The Azure Blob Storage connection string. @@ -51,10 +54,18 @@ public class AzureCloudConfig { @Config(COSMOS_KEY) public final String cosmosKey; + /** + * The maximum number of retries for Cosmos DB requests. + */ + @Config(COSMOS_MAX_RETRIES) + @Default("5") + public final int cosmosMaxRetries; + public AzureCloudConfig(VerifiableProperties verifiableProperties) { azureStorageConnectionString = verifiableProperties.getString(AZURE_STORAGE_CONNECTION_STRING); cosmosEndpoint = verifiableProperties.getString(COSMOS_ENDPOINT); cosmosCollectionLink = verifiableProperties.getString(COSMOS_COLLECTION_LINK); cosmosKey = verifiableProperties.getString(COSMOS_KEY); + cosmosMaxRetries = verifiableProperties.getInt(COSMOS_MAX_RETRIES, DEFAULT_COSMOS_MAX_RETRIES); } } diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java index 6929f0bbca..2f52e2670a 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureCloudDestination.java @@ -24,11 +24,6 @@ import com.microsoft.azure.documentdb.Document; import com.microsoft.azure.documentdb.DocumentClient; import com.microsoft.azure.documentdb.DocumentClientException; -import com.microsoft.azure.documentdb.DocumentCollection; -import com.microsoft.azure.documentdb.FeedOptions; -import com.microsoft.azure.documentdb.FeedResponse; -import com.microsoft.azure.documentdb.PartitionKey; -import com.microsoft.azure.documentdb.RequestOptions; import com.microsoft.azure.documentdb.ResourceResponse; import com.microsoft.azure.documentdb.SqlParameter; import com.microsoft.azure.documentdb.SqlParameterCollection; @@ -50,7 +45,6 @@ import java.net.Proxy; import java.net.URISyntaxException; import java.security.InvalidKeyException; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -81,8 +75,7 @@ class AzureCloudDestination implements CloudDestination { private final CloudStorageAccount azureAccount; private final CloudBlobClient azureBlobClient; private final DocumentClient documentClient; - private final String cosmosCollectionLink; - private final RequestOptions defaultRequestOptions = new RequestOptions(); + private final CosmosDataAccessor cosmosDataAccessor; private final OperationContext blobOpContext = new OperationContext(); private final AzureMetrics azureMetrics; private final String clusterName; @@ -112,7 +105,6 @@ class AzureCloudDestination implements CloudDestination { new Proxy(Proxy.Type.HTTP, new InetSocketAddress(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort))); } // Set up CosmosDB connection, including any proxy setting - cosmosCollectionLink = azureCloudConfig.cosmosCollectionLink; ConnectionPolicy connectionPolicy = new ConnectionPolicy(); if (cloudConfig.vcrProxyHost != null) { connectionPolicy.setProxy(new HttpHost(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort)); @@ -120,6 +112,7 @@ class AzureCloudDestination implements CloudDestination { } documentClient = new DocumentClient(azureCloudConfig.cosmosEndpoint, azureCloudConfig.cosmosKey, connectionPolicy, ConsistencyLevel.Session); + cosmosDataAccessor = new CosmosDataAccessor(documentClient, azureCloudConfig, azureMetrics); this.retentionPeriodMs = TimeUnit.DAYS.toMillis(cloudConfig.cloudDeletedBlobRetentionDays); logger.info("Created Azure destination"); } @@ -137,13 +130,15 @@ class AzureCloudDestination implements CloudDestination { String clusterName, AzureMetrics azureMetrics) { this.azureAccount = azureAccount; this.documentClient = documentClient; - this.cosmosCollectionLink = cosmosCollectionLink; this.azureMetrics = azureMetrics; this.clusterName = clusterName; this.retentionPeriodMs = TimeUnit.DAYS.toMillis(CloudConfig.DEFAULT_RETENTION_DAYS); // Create a blob client to interact with Blob storage azureBlobClient = azureAccount.createCloudBlobClient(); + cosmosDataAccessor = + new CosmosDataAccessor(documentClient, cosmosCollectionLink, AzureCloudConfig.DEFAULT_COSMOS_MAX_RETRIES, + azureMetrics); } /** @@ -151,7 +146,7 @@ class AzureCloudDestination implements CloudDestination { */ void testAzureConnectivity() { testStorageConnectivity(); - testCosmosConnectivity(); + cosmosDataAccessor.testConnectivity(); } /** @@ -174,27 +169,19 @@ void testStorageConnectivity() { } /** - * Test connectivity to Azure CosmosDB + * Visible for test. + * @return the CosmosDB DocumentClient */ - void testCosmosConnectivity() { - try { - ResourceResponse response = - documentClient.readCollection(cosmosCollectionLink, defaultRequestOptions); - if (response.getResource() == null) { - throw new IllegalStateException("CosmosDB collection not found: " + cosmosCollectionLink); - } - logger.info("CosmosDB connection test succeeded."); - } catch (DocumentClientException ex) { - throw new IllegalStateException("CosmosDB connection test failed", ex); - } + DocumentClient getDocumentClient() { + return documentClient; } /** * Visible for test. - * @return the CosmosDB DocumentClient. + * @return the {@link CosmosDataAccessor} */ - DocumentClient getDocumentClient() { - return documentClient; + CosmosDataAccessor getCosmosDataAccessor() { + return cosmosDataAccessor; } /** @@ -217,14 +204,7 @@ public boolean uploadBlob(BlobId blobId, long inputLength, CloudBlobMetadata clo // Note: if uploaded is false, still attempt to insert the metadata document // since it is possible that a previous attempt failed. - Timer.Context docTimer = azureMetrics.documentCreateTime.time(); - try { - RequestOptions options = new RequestOptions(); - options.setPartitionKey(new PartitionKey(blobId.getPartition().toPathString())); - documentClient.upsertDocument(cosmosCollectionLink, cloudBlobMetadata, options, true); - } finally { - docTimer.stop(); - } + cosmosDataAccessor.upsertMetadata(cloudBlobMetadata); backupTimer.stop(); if (uploaded) { azureMetrics.backupSuccessByteRate.mark(inputLength); @@ -294,16 +274,16 @@ public Map getBlobMetadata(List blobIds) thro if (blobIds.isEmpty()) { return Collections.emptyMap(); } - Timer.Context queryTimer = azureMetrics.missingKeysQueryTime.time(); + String quotedBlobIds = + String.join(",", blobIds.stream().map(s -> '"' + s.getID() + '"').collect(Collectors.toList())); + String query = String.format(BATCH_ID_QUERY_TEMPLATE, quotedBlobIds); + String partitionPath = blobIds.get(0).getPartition().toPathString(); try { - String quotedBlobIds = - String.join(",", blobIds.stream().map(s -> '"' + s.getID() + '"').collect(Collectors.toList())); - String query = String.format(BATCH_ID_QUERY_TEMPLATE, quotedBlobIds); List metadataList = - runMetadataQuery(blobIds.get(0).getPartition().toPathString(), new SqlQuerySpec(query)); + cosmosDataAccessor.queryMetadata(partitionPath, new SqlQuerySpec(query), azureMetrics.missingKeysQueryTime); return metadataList.stream().collect(Collectors.toMap(m -> m.getId(), Function.identity())); - } finally { - queryTimer.stop(); + } catch (DocumentClientException dex) { + throw new CloudStorageException("Failed to query blob metadata for partition " + partitionPath, dex); } } @@ -311,42 +291,12 @@ public Map getBlobMetadata(List blobIds) thro public List getDeadBlobs(String partitionPath) throws CloudStorageException { long now = System.currentTimeMillis(); long retentionThreshold = now - retentionPeriodMs; - Timer.Context queryTimer = azureMetrics.deadBlobsQueryTime.time(); - try { - SqlQuerySpec deadBlobsQuery = new SqlQuerySpec(DEAD_BLOBS_QUERY_TEMPLATE, - new SqlParameterCollection(new SqlParameter(THRESHOLD_PARAM, retentionThreshold))); - return runMetadataQuery(partitionPath, deadBlobsQuery); - } finally { - queryTimer.stop(); - } - } - - /** - * Get the list of blobs in the specified partition matching the specified DocumentDB query. - * @param partitionPath the partition to query. - * @param querySpec the DocumentDB query to execute. - * @return a List of {@link CloudBlobMetadata} referencing the matching blobs. - * @throws CloudStorageException - */ - List runMetadataQuery(String partitionPath, SqlQuerySpec querySpec) throws CloudStorageException { - azureMetrics.documentQueryCount.inc(); - FeedOptions feedOptions = new FeedOptions(); - feedOptions.setPartitionKey(new PartitionKey(partitionPath)); - FeedResponse response = documentClient.queryDocuments(cosmosCollectionLink, querySpec, feedOptions); + SqlQuerySpec deadBlobsQuery = new SqlQuerySpec(DEAD_BLOBS_QUERY_TEMPLATE, + new SqlParameterCollection(new SqlParameter(THRESHOLD_PARAM, retentionThreshold))); try { - // Note: internal query iterator wraps DocumentClientException in IllegalStateException! - List metadataList = new ArrayList<>(); - response.getQueryIterable() - .iterator() - .forEachRemaining(doc -> metadataList.add(doc.toObject(CloudBlobMetadata.class))); - return metadataList; - } catch (RuntimeException rex) { - if (rex.getCause() instanceof DocumentClientException) { - azureMetrics.documentErrorCount.inc(); - throw new CloudStorageException("Failed to query blob metadata", rex.getCause()); - } else { - throw rex; - } + return cosmosDataAccessor.queryMetadata(partitionPath, deadBlobsQuery, azureMetrics.deadBlobsQueryTime); + } catch (DocumentClientException dex) { + throw new CloudStorageException("Failed to query dead blobs for partition " + partitionPath, dex); } } @@ -389,25 +339,17 @@ private boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value storageTimer.stop(); } - Timer.Context docTimer = azureMetrics.documentUpdateTime.time(); - try { - String docLink = cosmosCollectionLink + "/docs/" + blobId.getID(); - RequestOptions options = new RequestOptions(); - options.setPartitionKey(new PartitionKey(blobId.getPartition().toPathString())); - ResourceResponse response = documentClient.readDocument(docLink, options); - //CloudBlobMetadata blobMetadata = response.getResource().toObject(CloudBlobMetadata.class); - Document doc = response.getResource(); - if (doc == null) { - logger.warn("Blob metadata record not found: " + docLink); - return false; - } - // Update only if value has changed - if (!value.equals(doc.get(fieldName))) { - doc.set(fieldName, value); - documentClient.replaceDocument(doc, options); - } - } finally { - docTimer.stop(); + ResourceResponse response = cosmosDataAccessor.readMetadata(blobId); + //CloudBlobMetadata blobMetadata = response.getResource().toObject(CloudBlobMetadata.class); + Document doc = response.getResource(); + if (doc == null) { + logger.warn("Blob metadata record not found: {}", blobId.getID()); + return false; + } + // Update only if value has changed + if (!value.equals(doc.get(fieldName))) { + doc.set(fieldName, value); + cosmosDataAccessor.replaceMetadata(blobId, doc); } logger.debug("Updated blob {} metadata set {} to {}.", blobId, fieldName, value); azureMetrics.blobUpdatedCount.inc(); @@ -425,8 +367,6 @@ public boolean purgeBlob(CloudBlobMetadata blobMetadata) throws CloudStorageExce String blobFileName = blobMetadata.getCloudBlobName(); String partitionPath = blobMetadata.getPartitionId(); String containerName = getAzureContainerName(partitionPath); - RequestOptions options = new RequestOptions(); - options.setPartitionKey(new PartitionKey(partitionPath)); azureMetrics.blobDeleteRequestCount.inc(); Timer.Context deleteTimer = azureMetrics.blobDeletionTime.time(); try { @@ -436,9 +376,8 @@ public boolean purgeBlob(CloudBlobMetadata blobMetadata) throws CloudStorageExce boolean deletionDone = azureBlob.deleteIfExists(); // Delete the document too - String docLink = cosmosCollectionLink + "/docs/" + blobId; try { - documentClient.deleteDocument(docLink, options); + cosmosDataAccessor.deleteMetadata(blobMetadata); deletionDone = true; logger.debug("Purged blob {} from partition {}.", blobId, partitionPath); } catch (DocumentClientException dex) { diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java index 69a3dddf2a..29cf0f7596 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/AzureMetrics.java @@ -29,7 +29,9 @@ public class AzureMetrics { public static final String BLOB_UPDATE_TIME = "BlobUpdateTime"; public static final String BLOB_UPDATED_COUNT = "BlobUpdatedCount"; public static final String DOCUMENT_CREATE_TIME = "DocumentCreateTime"; + public static final String DOCUMENT_READ_TIME = "DocumentReadTime"; public static final String DOCUMENT_UPDATE_TIME = "DocumentUpdateTime"; + public static final String DOCUMENT_DELETE_TIME = "DocumentDeleteTime"; public static final String DOCUMENT_QUERY_COUNT = "DocumentQueryCount"; public static final String MISSING_KEYS_QUERY_TIME = "MissingKeysQueryTime"; public static final String DEAD_BLOBS_QUERY_TIME = "DeadBlobsQueryTime"; @@ -44,6 +46,8 @@ public class AzureMetrics { public static final String BACKUP_SUCCESS_LATENCY = "BackupSuccessLatency"; public static final String BACKUP_SUCCESS_BYTE_RATE = "BackupSuccessByteRate"; public static final String BACKUP_ERROR_COUNT = "BackupErrorCount"; + public static final String RETRY_COUNT = "RetryCount"; + public static final String RETRY_WAIT_TIME = "RetryWaitTime"; // Metrics public final Counter blobUploadRequestCount; @@ -53,7 +57,9 @@ public class AzureMetrics { public final Timer blobUploadTime; public final Timer blobUpdateTime; public final Timer documentCreateTime; + public final Timer documentReadTime; public final Timer documentUpdateTime; + public final Timer documentDeleteTime; public final Timer missingKeysQueryTime; public final Counter documentQueryCount; public final Timer deadBlobsQueryTime; @@ -68,6 +74,8 @@ public class AzureMetrics { public final Timer backupSuccessLatency; public final Meter backupSuccessByteRate; public final Counter backupErrorCount; + public final Counter retryCount; + public final Timer retryWaitTime; public AzureMetrics(MetricRegistry registry) { blobUploadRequestCount = @@ -80,7 +88,9 @@ public AzureMetrics(MetricRegistry registry) { blobUploadTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, BLOB_UPLOAD_TIME)); blobUpdateTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, BLOB_UPDATE_TIME)); documentCreateTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_CREATE_TIME)); + documentReadTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_READ_TIME)); documentUpdateTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_UPDATE_TIME)); + documentDeleteTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_DELETE_TIME)); documentQueryCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_QUERY_COUNT)); missingKeysQueryTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, MISSING_KEYS_QUERY_TIME)); deadBlobsQueryTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DEAD_BLOBS_QUERY_TIME)); @@ -96,5 +106,7 @@ public AzureMetrics(MetricRegistry registry) { backupSuccessByteRate = registry.meter(MetricRegistry.name(AzureCloudDestination.class, BACKUP_SUCCESS_BYTE_RATE)); backupSuccessLatency = registry.timer(MetricRegistry.name(AzureCloudDestination.class, BACKUP_SUCCESS_LATENCY)); backupErrorCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, BACKUP_ERROR_COUNT)); + retryCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, RETRY_COUNT)); + retryWaitTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, RETRY_WAIT_TIME)); } } diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/CosmosDataAccessor.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/CosmosDataAccessor.java new file mode 100644 index 0000000000..99f52815d2 --- /dev/null +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/azure/CosmosDataAccessor.java @@ -0,0 +1,274 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud.azure; + +import com.codahale.metrics.Timer; +import com.github.ambry.cloud.CloudBlobMetadata; +import com.github.ambry.commons.BlobId; +import com.microsoft.azure.documentdb.Document; +import com.microsoft.azure.documentdb.DocumentClient; +import com.microsoft.azure.documentdb.DocumentClientException; +import com.microsoft.azure.documentdb.DocumentCollection; +import com.microsoft.azure.documentdb.FeedOptions; +import com.microsoft.azure.documentdb.FeedResponse; +import com.microsoft.azure.documentdb.PartitionKey; +import com.microsoft.azure.documentdb.RequestOptions; +import com.microsoft.azure.documentdb.ResourceResponse; +import com.microsoft.azure.documentdb.SqlQuerySpec; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class CosmosDataAccessor { + private static final Logger logger = LoggerFactory.getLogger(CosmosDataAccessor.class); + private static final int HTTP_TOO_MANY_REQUESTS = 429; + private static final String DOCS = "/docs/"; + private final DocumentClient documentClient; + private final String cosmosCollectionLink; + private final AzureMetrics azureMetrics; + private final int maxRetries; + + /** Production constructor */ + public CosmosDataAccessor(DocumentClient documentClient, AzureCloudConfig azureCloudConfig, + AzureMetrics azureMetrics) { + this(documentClient, azureCloudConfig.cosmosCollectionLink, azureCloudConfig.cosmosMaxRetries, azureMetrics); + } + + /** Test constructor */ + public CosmosDataAccessor(DocumentClient documentClient, String cosmosCollectionLink, int maxRetries, + AzureMetrics azureMetrics) { + this.documentClient = documentClient; + this.cosmosCollectionLink = cosmosCollectionLink; + this.azureMetrics = azureMetrics; + this.maxRetries = maxRetries; + } + + /** + * Test connectivity to Azure CosmosDB + */ + void testConnectivity() { + try { + ResourceResponse response = + documentClient.readCollection(cosmosCollectionLink, new RequestOptions()); + if (response.getResource() == null) { + throw new IllegalStateException("CosmosDB collection not found: " + cosmosCollectionLink); + } + logger.info("CosmosDB connection test succeeded."); + } catch (DocumentClientException ex) { + throw new IllegalStateException("CosmosDB connection test failed", ex); + } + } + + /** + * Upsert the blob metadata document in the CosmosDB collection, retrying as necessary. + * @param blobMetadata the blob metadata document. + * @return the {@link ResourceResponse} returned by the operation, if successful. + * @throws DocumentClientException if the operation failed, or retry limit is exhausted. + */ + public ResourceResponse upsertMetadata(CloudBlobMetadata blobMetadata) throws DocumentClientException { + RequestOptions options = getRequestOptions(blobMetadata.getPartitionId()); + return retryOperationWithThrottling( + () -> documentClient.upsertDocument(cosmosCollectionLink, blobMetadata, options, true), + azureMetrics.documentCreateTime); + } + + /** + * Delete the blob metadata document in the CosmosDB collection, retrying as necessary. + * @param blobMetadata the blob metadata document. + * @return the {@link ResourceResponse} returned by the operation, if successful. + * @throws DocumentClientException if the operation failed, or retry limit is exhausted. + */ + public ResourceResponse deleteMetadata(CloudBlobMetadata blobMetadata) throws DocumentClientException { + String docLink = getDocumentLink(blobMetadata.getId()); + RequestOptions options = getRequestOptions(blobMetadata.getPartitionId()); + options.setPartitionKey(new PartitionKey(blobMetadata.getPartitionId())); + return retryOperationWithThrottling(() -> documentClient.deleteDocument(docLink, options), + azureMetrics.documentDeleteTime); + } + + /** + * Read the blob metadata document in the CosmosDB collection, retrying as necessary. + * @param blobId the {@link BlobId} for which metadata is requested. + * @return the {@link ResourceResponse} containing the metadata document. + * @throws DocumentClientException if the operation failed, or retry limit is exhausted. + */ + public ResourceResponse readMetadata(BlobId blobId) throws DocumentClientException { + String docLink = getDocumentLink(blobId.getID()); + RequestOptions options = getRequestOptions(blobId.getPartition().toPathString()); + return retryOperationWithThrottling(() -> documentClient.readDocument(docLink, options), + azureMetrics.documentReadTime); + } + + /** + * Replace the blob metadata document in the CosmosDB collection, retrying as necessary. + * @param blobId the {@link BlobId} for which metadata is replaced. + * @param doc the blob metadata document. + * @return the {@link ResourceResponse} returned by the operation, if successful. + * @throws DocumentClientException if the operation failed, or retry limit is exhausted. + */ + public ResourceResponse replaceMetadata(BlobId blobId, Document doc) throws DocumentClientException { + RequestOptions options = getRequestOptions(blobId.getPartition().toPathString()); + return retryOperationWithThrottling(() -> documentClient.replaceDocument(doc, options), + azureMetrics.documentUpdateTime); + } + + /** + * Get the list of blobs in the specified partition matching the specified DocumentDB query. + * @param partitionPath the partition to query. + * @param querySpec the DocumentDB query to execute. + * @param timer the {@link Timer} to use to record query time (excluding waiting). + * @return a List of {@link CloudBlobMetadata} referencing the matching blobs. + * @throws DocumentClientException + */ + List queryMetadata(String partitionPath, SqlQuerySpec querySpec, Timer timer) + throws DocumentClientException { + azureMetrics.documentQueryCount.inc(); + FeedOptions feedOptions = new FeedOptions(); + feedOptions.setPartitionKey(new PartitionKey(partitionPath)); + FeedResponse response = + retryQueryWithThrottling(() -> documentClient.queryDocuments(cosmosCollectionLink, querySpec, feedOptions), + timer); + try { + // Note: internal query iterator wraps DocumentClientException in IllegalStateException! + List metadataList = new ArrayList<>(); + response.getQueryIterable() + .iterator() + .forEachRemaining(doc -> metadataList.add(doc.toObject(CloudBlobMetadata.class))); + return metadataList; + } catch (RuntimeException rex) { + if (rex.getCause() instanceof DocumentClientException) { + azureMetrics.documentErrorCount.inc(); + throw (DocumentClientException) rex.getCause(); + } else { + throw rex; + } + } + } + + /** + * Run the supplied DocumentClient action. If CosmosDB returns status 429 (TOO_MANY_REQUESTS), + * retry after the requested wait period, up to the configured retry limit. + * @param operation the DocumentClient resource operation to execute, wrapped in a {@link Callable}. + * @param timer the {@link Timer} to use to record execution time (excluding waiting). + * @return the {@link ResourceResponse} returned by the operation, if successful. + * @throws DocumentClientException if Cosmos returns a different error status, or if the retry limit is reached. + */ + private ResourceResponse retryOperationWithThrottling(Callable> operation, + Timer timer) throws DocumentClientException { + return (ResourceResponse) retryWithThrottling(operation, timer); + } + + /** + * Run the supplied DocumentClient query. If CosmosDB returns status 429 (TOO_MANY_REQUESTS), + * retry after the requested wait period, up to the configured retry limit. + * @param query the DocumentClient query to execute, wrapped in a {@link Callable}. + * @param timer the {@link Timer} to use to record execution time (excluding waiting). + * @return the {@link FeedResponse} returned by the query, if successful. + * @throws DocumentClientException if Cosmos returns a different error status, or if the retry limit is reached. + */ + private FeedResponse retryQueryWithThrottling(Callable> query, Timer timer) + throws DocumentClientException { + return (FeedResponse) retryWithThrottling(query, timer); + } + + /** + * Run the supplied DocumentClient action. If CosmosDB returns status 429 (TOO_MANY_REQUESTS), + * retry after the requested wait period, up to the configured retry limit. + * @param action the DocumentClient action to execute, wrapped in a {@link Callable}. + * @param timer the {@link Timer} to use to record execution time (excluding waiting). + * @return the {@link Object} returned by the action, if successful. + * @throws DocumentClientException if Cosmos returns a different error status, or if the retry limit is reached. + */ + private Object retryWithThrottling(Callable action, Timer timer) throws DocumentClientException { + int count = 0; + long waitTime = 0; + do { + try { + waitForMs(waitTime); + Timer.Context docTimer = timer.time(); + Object response = executeCosmosAction(action); + docTimer.stop(); + return response; + } catch (DocumentClientException dex) { + // Azure tells us how long to wait before retrying. + if (dex.getStatusCode() == HTTP_TOO_MANY_REQUESTS) { + waitTime = dex.getRetryAfterInMilliseconds(); + azureMetrics.retryCount.inc(); + logger.debug("Got {} from Cosmos, will wait {} ms before retrying.", HTTP_TOO_MANY_REQUESTS, waitTime); + } else { + // Something else, not retryable. + throw dex; + } + } catch (Exception e) { + azureMetrics.documentErrorCount.inc(); + throw new RuntimeException("Exception calling action " + action, e); + } + count++; + } while (count <= maxRetries); + azureMetrics.retryCount.dec(); // number of retries, not total tries + azureMetrics.documentErrorCount.inc(); + String message = "Max number of retries reached while retrying with action " + action; + throw new RuntimeException(message); + } + + /** + * Utility method to call a Cosmos method and extract any nested DocumentClientException. + * @param action the action to call. + * @return the result of the action. + * @throws Exception + */ + private Object executeCosmosAction(Callable action) throws Exception { + try { + return action.call(); + } catch (DocumentClientException dex) { + throw dex; + } catch (IllegalStateException ex) { + if (ex.getCause() instanceof DocumentClientException) { + throw (DocumentClientException) ex.getCause(); + } else { + throw ex; + } + } + } + + private String getDocumentLink(String documentId) { + return cosmosCollectionLink + DOCS + documentId; + } + + private RequestOptions getRequestOptions(String partitionPath) { + RequestOptions options = new RequestOptions(); + options.setPartitionKey(new PartitionKey(partitionPath)); + return options; + } + + /** + * Wait for the specified time, or until interrupted. + * @param waitTimeInMillis the time to wait. + */ + void waitForMs(long waitTimeInMillis) { + if (waitTimeInMillis > 0) { + Timer.Context waitTimer = azureMetrics.retryWaitTime.time(); + try { + TimeUnit.MILLISECONDS.sleep(waitTimeInMillis); + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting for retry"); + } + waitTimer.stop(); + } + } +} diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java index 26a88ea585..b43ff77431 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureCloudDestinationTest.java @@ -147,6 +147,7 @@ public void testDelete() throws Exception { assertEquals(1, azureMetrics.blobUpdatedCount.getCount()); assertEquals(0, azureMetrics.blobUpdateErrorCount.getCount()); assertEquals(1, azureMetrics.blobUpdateTime.getCount()); + assertEquals(1, azureMetrics.documentReadTime.getCount()); assertEquals(1, azureMetrics.documentUpdateTime.getCount()); } @@ -158,6 +159,7 @@ public void testExpire() throws Exception { assertEquals(1, azureMetrics.blobUpdatedCount.getCount()); assertEquals(0, azureMetrics.blobUpdateErrorCount.getCount()); assertEquals(1, azureMetrics.blobUpdateTime.getCount()); + assertEquals(1, azureMetrics.documentReadTime.getCount()); assertEquals(1, azureMetrics.documentUpdateTime.getCount()); } @@ -171,6 +173,7 @@ public void testPurge() throws Exception { assertTrue("Expected success", azureDest.purgeBlob(cloudBlobMetadata)); assertEquals(1, azureMetrics.blobDeletedCount.getCount()); assertEquals(0, azureMetrics.blobDeleteErrorCount.getCount()); + assertEquals(1, azureMetrics.documentDeleteTime.getCount()); } /** Test purge not found. */ @@ -283,7 +286,7 @@ public void testAzureConnection() throws Exception { } catch (IllegalStateException expected) { } try { - dest.testCosmosConnectivity(); + dest.getCosmosDataAccessor().testConnectivity(); fail("Expected exception"); } catch (IllegalStateException expected) { } diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java index cb13b49fff..8efda35699 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/AzureIntegrationTest.java @@ -14,6 +14,7 @@ package com.github.ambry.cloud.azure; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; import com.github.ambry.cloud.CloudBlobMetadata; import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.clustermap.MockPartitionId; @@ -66,8 +67,6 @@ public class AzureIntegrationTest { private long testPartition = 666; // one day retention private int retentionPeriodDays = 1; - - private String cosmosCollectionLink; private String propFileName = "azure-test.properties"; private String tokenFileName = "replicaTokens"; @@ -252,8 +251,9 @@ public void testPurgeDeadBlobs() throws Exception { private void cleanup() throws Exception { String partitionPath = String.valueOf(testPartition); + Timer dummyTimer = new Timer(); List allBlobsInPartition = - azureDest.runMetadataQuery(partitionPath, new SqlQuerySpec("SELECT * FROM c")); + azureDest.getCosmosDataAccessor().queryMetadata(partitionPath, new SqlQuerySpec("SELECT * FROM c"), dummyTimer); azureDest.purgeBlobs(allBlobsInPartition); } diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/CosmosDataAccessorTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/CosmosDataAccessorTest.java new file mode 100644 index 0000000000..8154151565 --- /dev/null +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/azure/CosmosDataAccessorTest.java @@ -0,0 +1,223 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud.azure; + +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.ambry.cloud.CloudBlobMetadata; +import com.github.ambry.clustermap.MockPartitionId; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.commons.BlobId; +import com.github.ambry.utils.Utils; +import com.microsoft.azure.documentdb.Document; +import com.microsoft.azure.documentdb.DocumentClient; +import com.microsoft.azure.documentdb.DocumentClientException; +import com.microsoft.azure.documentdb.Error; +import com.microsoft.azure.documentdb.FeedOptions; +import com.microsoft.azure.documentdb.FeedResponse; +import com.microsoft.azure.documentdb.QueryIterable; +import com.microsoft.azure.documentdb.RequestOptions; +import com.microsoft.azure.documentdb.ResourceResponse; +import com.microsoft.azure.documentdb.SqlQuerySpec; +import com.microsoft.azure.documentdb.internal.HttpConstants; +import java.util.Collections; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import static com.github.ambry.commons.BlobId.*; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + + +/** Test cases for {@link CosmosDataAccessor} */ +@RunWith(MockitoJUnitRunner.class) +public class CosmosDataAccessorTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + private CosmosDataAccessor cosmosAccessor; + private DocumentClient mockumentClient; + private ResourceResponse mockResponse; + private DocumentClientException retryException; + private AzureMetrics azureMetrics; + private BlobId blobId; + private int blobSize = 1024; + private CloudBlobMetadata blobMetadata; + int maxRetries = 3; + + @Before + public void setup() throws Exception { + mockumentClient = mock(DocumentClient.class); + mockResponse = mock(ResourceResponse.class); + retryException = new DocumentClientException(HttpConstants.StatusCodes.TOO_MANY_REQUESTS, new Error(), + Collections.singletonMap(HttpConstants.HttpHeaders.RETRY_AFTER_IN_MILLISECONDS, "1")); + + byte dataCenterId = 66; + short accountId = 101; + short containerId = 5; + PartitionId partitionId = new MockPartitionId(); + blobId = new BlobId(BLOB_ID_V6, BlobIdType.NATIVE, dataCenterId, accountId, containerId, partitionId, false, + BlobDataType.DATACHUNK); + blobMetadata = new CloudBlobMetadata(blobId, System.currentTimeMillis(), Utils.Infinite_Time, blobSize, + CloudBlobMetadata.EncryptionOrigin.NONE, null, null); + azureMetrics = new AzureMetrics(new MetricRegistry()); + cosmosAccessor = new CosmosDataAccessor(mockumentClient, "ambry/metadata", maxRetries, azureMetrics); + } + + /** + * Test normal upsert. + * @throws Exception + */ + @Test + public void testUpsertNormal() throws Exception { + // Request succeeds first time + when(mockumentClient.upsertDocument(anyString(), any(), any(RequestOptions.class), anyBoolean())).thenReturn( + mockResponse); + cosmosAccessor.upsertMetadata(blobMetadata); + assertEquals(1, azureMetrics.documentCreateTime.getCount()); + assertEquals(0, azureMetrics.retryCount.getCount()); + assertEquals(0, azureMetrics.retryWaitTime.getCount()); + } + + /** + * Test upsert with one retry. + * @throws Exception + */ + @Test + public void testUpsertRetry() throws Exception { + // Request gets 420 on first try, succeeds on retry + when(mockumentClient.upsertDocument(anyString(), any(), any(RequestOptions.class), anyBoolean())).thenThrow( + retryException).thenReturn(mockResponse); + cosmosAccessor.upsertMetadata(blobMetadata); + assertEquals(1, azureMetrics.documentCreateTime.getCount()); + assertEquals(1, azureMetrics.retryCount.getCount()); + assertEquals(1, azureMetrics.retryWaitTime.getCount()); + } + + /** + * Test upsert exhausts retries. + * @throws Exception + */ + @Test + public void testUpsertExhaustRetries() throws Exception { + // Request keeps getting 420 until we give up + when(mockumentClient.upsertDocument(anyString(), any(), any(RequestOptions.class), anyBoolean())).thenThrow( + retryException); + try { + cosmosAccessor.upsertMetadata(blobMetadata); + fail("Expected operation to fail after too many retries"); + } catch (RuntimeException expected) { + } + assertEquals(0, azureMetrics.documentCreateTime.getCount()); + assertEquals(maxRetries, azureMetrics.retryCount.getCount()); + assertEquals(maxRetries, azureMetrics.retryWaitTime.getCount()); + } + + /** Test read. */ + @Test + public void testReadNormal() throws Exception { + // Request succeeds first time + when(mockumentClient.readDocument(anyString(), any(RequestOptions.class))).thenReturn(mockResponse); + cosmosAccessor.readMetadata(blobId); + assertEquals(1, azureMetrics.documentReadTime.getCount()); + assertEquals(0, azureMetrics.retryCount.getCount()); + assertEquals(0, azureMetrics.retryWaitTime.getCount()); + } + + /** + * Test read with one retry. + * @throws Exception + */ + @Test + public void testReadRetry() throws Exception { + // Request gets 420 on first try, succeeds on retry + when(mockumentClient.readDocument(anyString(), any(RequestOptions.class))).thenThrow(retryException) + .thenReturn(mockResponse); + cosmosAccessor.readMetadata(blobId); + assertEquals(1, azureMetrics.documentReadTime.getCount()); + assertEquals(1, azureMetrics.retryCount.getCount()); + assertEquals(1, azureMetrics.retryWaitTime.getCount()); + } + + /** + * Test read exhausts retries. + * @throws Exception + */ + @Test + public void testReadExhaustRetries() throws Exception { + // Request keeps getting 420 until we give up + when(mockumentClient.readDocument(anyString(), any(RequestOptions.class))).thenThrow(retryException); + try { + cosmosAccessor.readMetadata(blobId); + fail("Expected operation to fail after too many retries"); + } catch (RuntimeException expected) { + } + assertEquals(0, azureMetrics.documentReadTime.getCount()); + assertEquals(maxRetries, azureMetrics.retryCount.getCount()); + assertEquals(maxRetries, azureMetrics.retryWaitTime.getCount()); + } + + /** Test query metadata. */ + @Test + public void testQueryNormal() throws Exception { + FeedResponse feedResponse = getFeedResponse(); + when(mockumentClient.queryDocuments(anyString(), any(SqlQuerySpec.class), any(FeedOptions.class))).thenReturn( + feedResponse); + List metadataList = doQueryMetadata(); + assertEquals("Expected single entry", 1, metadataList.size()); + CloudBlobMetadata outputMetadata = metadataList.get(0); + assertEquals("Returned metadata does not match original", blobMetadata, outputMetadata); + assertEquals(1, azureMetrics.documentQueryCount.getCount()); + assertEquals(1, azureMetrics.missingKeysQueryTime.getCount()); + assertEquals(0, azureMetrics.retryCount.getCount()); + assertEquals(0, azureMetrics.retryWaitTime.getCount()); + } + + /** Test query one retry. */ + @Test + public void testQueryRetry() throws Exception { + FeedResponse feedResponse = getFeedResponse(); + when(mockumentClient.queryDocuments(anyString(), any(SqlQuerySpec.class), any(FeedOptions.class))).thenThrow( + new IllegalStateException(retryException)).thenReturn(feedResponse); + List metadataList = doQueryMetadata(); + assertEquals("Expected single entry", 1, metadataList.size()); + CloudBlobMetadata outputMetadata = metadataList.get(0); + assertEquals("Returned metadata does not match original", blobMetadata, outputMetadata); + assertEquals(1, azureMetrics.documentQueryCount.getCount()); + assertEquals(1, azureMetrics.missingKeysQueryTime.getCount()); + assertEquals(1, azureMetrics.retryCount.getCount()); + assertEquals(1, azureMetrics.retryWaitTime.getCount()); + } + + /** + * @return a FeedResponse with a single document. + */ + private FeedResponse getFeedResponse() throws Exception { + QueryIterable mockIterable = mock(QueryIterable.class); + List docList = Collections.singletonList(new Document(objectMapper.writeValueAsString(blobMetadata))); + when(mockIterable.iterator()).thenReturn(docList.iterator()); + FeedResponse feedResponse = mock(FeedResponse.class); + when(feedResponse.getQueryIterable()).thenReturn(mockIterable); + return feedResponse; + } + + /** Utility method to run metadata query with default parameters. */ + private List doQueryMetadata() throws Exception { + return cosmosAccessor.queryMetadata(blobId.getPartition().toPathString(), new SqlQuerySpec("select * from c"), + azureMetrics.missingKeysQueryTime); + } +}