Retry Cosmos requests after delay when they return 429 #1205

Merged · 3 commits · Jun 26, 2019
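This change adds a cosmos.max.retries setting and RetryCount/RetryWaitTime metrics, and moves the Cosmos DB calls behind a new CosmosDataAccessor class so that requests rejected with HTTP 429 (request rate too large) can be retried after a delay. CosmosDataAccessor itself is not included in this file view, so the sketch below is only an illustration of that pattern; the CosmosRetrySketch, CosmosCall, and executeWithRetries names are hypothetical and the real implementation may differ.

import com.microsoft.azure.documentdb.DocumentClientException;

/**
 * Illustration only, not part of this diff: retry a Cosmos request up to
 * maxRetries times when it fails with status 429, sleeping for the
 * retry-after interval reported by the exception before trying again.
 */
class CosmosRetrySketch {
  private static final int STATUS_TOO_MANY_REQUESTS = 429;

  interface CosmosCall<T> {
    T execute() throws DocumentClientException;
  }

  static <T> T executeWithRetries(CosmosCall<T> call, int maxRetries) throws DocumentClientException {
    int attempts = 0;
    while (true) {
      try {
        return call.execute();
      } catch (DocumentClientException dex) {
        if (dex.getStatusCode() != STATUS_TOO_MANY_REQUESTS || attempts >= maxRetries) {
          throw dex;
        }
        attempts++;
        // A real implementation would also bump a retry counter metric here
        // and time the wait (see RetryCount and RetryWaitTime below).
        long waitTimeMs = dex.getRetryAfterInMilliseconds();
        try {
          Thread.sleep(waitTimeMs);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw dex;
        }
      }
    }
  }
}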
@@ -68,7 +68,7 @@ protected void persist(String mountPath, List<ReplicaTokenInfo> tokenInfoList)

InputStream inputStream = new ByteArrayInputStream(tokenOutputStream.toByteArray());
cloudDestination.persistTokens(mountPath, replicaTokenFileName, inputStream);
logger.debug("Completed writing replica tokens to cloud destination.");
logger.debug("Persisted replica tokens for {} to cloud destination.", mountPath);
} catch (CloudStorageException e) {
throw new ReplicationException("IO error persisting replica tokens at mount path " + mountPath, e);
}
@@ -14,6 +14,7 @@
package com.github.ambry.cloud.azure;

import com.github.ambry.config.Config;
import com.github.ambry.config.Default;
import com.github.ambry.config.VerifiableProperties;


@@ -26,6 +27,8 @@ public class AzureCloudConfig {
public static final String COSMOS_ENDPOINT = "cosmos.endpoint";
public static final String COSMOS_COLLECTION_LINK = "cosmos.collection.link";
public static final String COSMOS_KEY = "cosmos.key";
public static final String COSMOS_MAX_RETRIES = "cosmos.max.retries";
public static final int DEFAULT_COSMOS_MAX_RETRIES = 5;

/**
* The Azure Blob Storage connection string.
@@ -51,10 +54,18 @@ public class AzureCloudConfig {
@Config(COSMOS_KEY)
public final String cosmosKey;

/**
* The maximum number of retries for Cosmos DB requests.
*/
@Config(COSMOS_MAX_RETRIES)
@Default("5")
public final int cosmosMaxRetries;

public AzureCloudConfig(VerifiableProperties verifiableProperties) {
azureStorageConnectionString = verifiableProperties.getString(AZURE_STORAGE_CONNECTION_STRING);
cosmosEndpoint = verifiableProperties.getString(COSMOS_ENDPOINT);
cosmosCollectionLink = verifiableProperties.getString(COSMOS_COLLECTION_LINK);
cosmosKey = verifiableProperties.getString(COSMOS_KEY);
cosmosMaxRetries = verifiableProperties.getInt(COSMOS_MAX_RETRIES, DEFAULT_COSMOS_MAX_RETRIES);
}
}
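As a usage note, the new limit is read like any other Ambry config property and falls back to DEFAULT_COSMOS_MAX_RETRIES (5) when unset. A minimal sketch with placeholder values, assuming AZURE_STORAGE_CONNECTION_STRING is a constant on the class like the Cosmos keys above:

import com.github.ambry.config.VerifiableProperties;
import java.util.Properties;

class AzureCloudConfigSketch {
  static AzureCloudConfig buildConfig() {
    Properties props = new Properties();
    props.setProperty(AzureCloudConfig.AZURE_STORAGE_CONNECTION_STRING, "<connection-string>");
    props.setProperty(AzureCloudConfig.COSMOS_ENDPOINT, "https://<account>.documents.azure.com:443/");
    props.setProperty(AzureCloudConfig.COSMOS_COLLECTION_LINK, "<collection-link>");
    props.setProperty(AzureCloudConfig.COSMOS_KEY, "<key>");
    // New in this PR: cap 429 retries at 3 instead of the default 5.
    props.setProperty(AzureCloudConfig.COSMOS_MAX_RETRIES, "3");
    return new AzureCloudConfig(new VerifiableProperties(props));
  }
}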
@@ -24,11 +24,6 @@
import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.FeedOptions;
import com.microsoft.azure.documentdb.FeedResponse;
import com.microsoft.azure.documentdb.PartitionKey;
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.azure.documentdb.ResourceResponse;
import com.microsoft.azure.documentdb.SqlParameter;
import com.microsoft.azure.documentdb.SqlParameterCollection;
@@ -50,7 +45,6 @@
import java.net.Proxy;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -81,8 +75,7 @@ class AzureCloudDestination implements CloudDestination {
private final CloudStorageAccount azureAccount;
private final CloudBlobClient azureBlobClient;
private final DocumentClient documentClient;
private final String cosmosCollectionLink;
private final RequestOptions defaultRequestOptions = new RequestOptions();
private final CosmosDataAccessor cosmosDataAccessor;
private final OperationContext blobOpContext = new OperationContext();
private final AzureMetrics azureMetrics;
private final String clusterName;
@@ -112,14 +105,14 @@ class AzureCloudDestination implements CloudDestination {
new Proxy(Proxy.Type.HTTP, new InetSocketAddress(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort)));
}
// Set up CosmosDB connection, including any proxy setting
cosmosCollectionLink = azureCloudConfig.cosmosCollectionLink;
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
if (cloudConfig.vcrProxyHost != null) {
connectionPolicy.setProxy(new HttpHost(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort));
connectionPolicy.setHandleServiceUnavailableFromProxy(true);
}
documentClient = new DocumentClient(azureCloudConfig.cosmosEndpoint, azureCloudConfig.cosmosKey, connectionPolicy,
ConsistencyLevel.Session);
cosmosDataAccessor = new CosmosDataAccessor(documentClient, azureCloudConfig, azureMetrics);
this.retentionPeriodMs = TimeUnit.DAYS.toMillis(cloudConfig.cloudDeletedBlobRetentionDays);
logger.info("Created Azure destination");
}
@@ -137,21 +130,23 @@ class AzureCloudDestination implements CloudDestination {
String clusterName, AzureMetrics azureMetrics) {
this.azureAccount = azureAccount;
this.documentClient = documentClient;
this.cosmosCollectionLink = cosmosCollectionLink;
this.azureMetrics = azureMetrics;
this.clusterName = clusterName;
this.retentionPeriodMs = TimeUnit.DAYS.toMillis(CloudConfig.DEFAULT_RETENTION_DAYS);

// Create a blob client to interact with Blob storage
azureBlobClient = azureAccount.createCloudBlobClient();
cosmosDataAccessor =
new CosmosDataAccessor(documentClient, cosmosCollectionLink, AzureCloudConfig.DEFAULT_COSMOS_MAX_RETRIES,
azureMetrics);
}

/**
* Test connectivity to Azure endpoints
*/
void testAzureConnectivity() {
testStorageConnectivity();
testCosmosConnectivity();
cosmosDataAccessor.testConnectivity();
}

/**
@@ -174,27 +169,19 @@ void testStorageConnectivity() {
}

/**
* Test connectivity to Azure CosmosDB
* Visible for test.
* @return the CosmosDB DocumentClient
*/
void testCosmosConnectivity() {
try {
ResourceResponse<DocumentCollection> response =
documentClient.readCollection(cosmosCollectionLink, defaultRequestOptions);
if (response.getResource() == null) {
throw new IllegalStateException("CosmosDB collection not found: " + cosmosCollectionLink);
}
logger.info("CosmosDB connection test succeeded.");
} catch (DocumentClientException ex) {
throw new IllegalStateException("CosmosDB connection test failed", ex);
}
DocumentClient getDocumentClient() {
return documentClient;
}

/**
* Visible for test.
* @return the CosmosDB DocumentClient.
* @return the {@link CosmosDataAccessor}
*/
DocumentClient getDocumentClient() {
return documentClient;
CosmosDataAccessor getCosmosDataAccessor() {
return cosmosDataAccessor;
}

/**
@@ -217,14 +204,7 @@ public boolean uploadBlob(BlobId blobId, long inputLength, CloudBlobMetadata clo
// Note: if uploaded is false, still attempt to insert the metadata document
// since it is possible that a previous attempt failed.

Timer.Context docTimer = azureMetrics.documentCreateTime.time();
try {
RequestOptions options = new RequestOptions();
options.setPartitionKey(new PartitionKey(blobId.getPartition().toPathString()));
documentClient.upsertDocument(cosmosCollectionLink, cloudBlobMetadata, options, true);
} finally {
docTimer.stop();
}
cosmosDataAccessor.upsertMetadata(cloudBlobMetadata);
backupTimer.stop();
if (uploaded) {
azureMetrics.backupSuccessByteRate.mark(inputLength);
@@ -294,59 +274,29 @@ public Map<String, CloudBlobMetadata> getBlobMetadata(List<BlobId> blobIds) thro
if (blobIds.isEmpty()) {
return Collections.emptyMap();
}
Timer.Context queryTimer = azureMetrics.missingKeysQueryTime.time();
String quotedBlobIds =
String.join(",", blobIds.stream().map(s -> '"' + s.getID() + '"').collect(Collectors.toList()));
String query = String.format(BATCH_ID_QUERY_TEMPLATE, quotedBlobIds);
String partitionPath = blobIds.get(0).getPartition().toPathString();
try {
String quotedBlobIds =
String.join(",", blobIds.stream().map(s -> '"' + s.getID() + '"').collect(Collectors.toList()));
String query = String.format(BATCH_ID_QUERY_TEMPLATE, quotedBlobIds);
List<CloudBlobMetadata> metadataList =
runMetadataQuery(blobIds.get(0).getPartition().toPathString(), new SqlQuerySpec(query));
cosmosDataAccessor.queryMetadata(partitionPath, new SqlQuerySpec(query), azureMetrics.missingKeysQueryTime);
return metadataList.stream().collect(Collectors.toMap(m -> m.getId(), Function.identity()));
} finally {
queryTimer.stop();
} catch (DocumentClientException dex) {
throw new CloudStorageException("Failed to query blob metadata for partition " + partitionPath, dex);
}
}

@Override
public List<CloudBlobMetadata> getDeadBlobs(String partitionPath) throws CloudStorageException {
long now = System.currentTimeMillis();
long retentionThreshold = now - retentionPeriodMs;
Timer.Context queryTimer = azureMetrics.deadBlobsQueryTime.time();
try {
SqlQuerySpec deadBlobsQuery = new SqlQuerySpec(DEAD_BLOBS_QUERY_TEMPLATE,
new SqlParameterCollection(new SqlParameter(THRESHOLD_PARAM, retentionThreshold)));
return runMetadataQuery(partitionPath, deadBlobsQuery);
} finally {
queryTimer.stop();
}
}

/**
* Get the list of blobs in the specified partition matching the specified DocumentDB query.
* @param partitionPath the partition to query.
* @param querySpec the DocumentDB query to execute.
* @return a List of {@link CloudBlobMetadata} referencing the matching blobs.
* @throws CloudStorageException
*/
List<CloudBlobMetadata> runMetadataQuery(String partitionPath, SqlQuerySpec querySpec) throws CloudStorageException {
azureMetrics.documentQueryCount.inc();
FeedOptions feedOptions = new FeedOptions();
feedOptions.setPartitionKey(new PartitionKey(partitionPath));
FeedResponse<Document> response = documentClient.queryDocuments(cosmosCollectionLink, querySpec, feedOptions);
SqlQuerySpec deadBlobsQuery = new SqlQuerySpec(DEAD_BLOBS_QUERY_TEMPLATE,
new SqlParameterCollection(new SqlParameter(THRESHOLD_PARAM, retentionThreshold)));
try {
// Note: internal query iterator wraps DocumentClientException in IllegalStateException!
List<CloudBlobMetadata> metadataList = new ArrayList<>();
response.getQueryIterable()
.iterator()
.forEachRemaining(doc -> metadataList.add(doc.toObject(CloudBlobMetadata.class)));
return metadataList;
} catch (RuntimeException rex) {
if (rex.getCause() instanceof DocumentClientException) {
azureMetrics.documentErrorCount.inc();
throw new CloudStorageException("Failed to query blob metadata", rex.getCause());
} else {
throw rex;
}
return cosmosDataAccessor.queryMetadata(partitionPath, deadBlobsQuery, azureMetrics.deadBlobsQueryTime);
} catch (DocumentClientException dex) {
throw new CloudStorageException("Failed to query dead blobs for partition " + partitionPath, dex);
}
}

@@ -389,25 +339,17 @@ private boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value
storageTimer.stop();
}

Timer.Context docTimer = azureMetrics.documentUpdateTime.time();
try {
String docLink = cosmosCollectionLink + "/docs/" + blobId.getID();
RequestOptions options = new RequestOptions();
options.setPartitionKey(new PartitionKey(blobId.getPartition().toPathString()));
ResourceResponse<Document> response = documentClient.readDocument(docLink, options);
//CloudBlobMetadata blobMetadata = response.getResource().toObject(CloudBlobMetadata.class);
Document doc = response.getResource();
if (doc == null) {
logger.warn("Blob metadata record not found: " + docLink);
return false;
}
// Update only if value has changed
if (!value.equals(doc.get(fieldName))) {
doc.set(fieldName, value);
documentClient.replaceDocument(doc, options);
}
} finally {
docTimer.stop();
ResourceResponse<Document> response = cosmosDataAccessor.readMetadata(blobId);
//CloudBlobMetadata blobMetadata = response.getResource().toObject(CloudBlobMetadata.class);
Document doc = response.getResource();
if (doc == null) {
logger.warn("Blob metadata record not found: {}", blobId.getID());
return false;
}
// Update only if value has changed
if (!value.equals(doc.get(fieldName))) {
doc.set(fieldName, value);
cosmosDataAccessor.replaceMetadata(blobId, doc);
}
logger.debug("Updated blob {} metadata set {} to {}.", blobId, fieldName, value);
azureMetrics.blobUpdatedCount.inc();
@@ -425,8 +367,6 @@ public boolean purgeBlob(CloudBlobMetadata blobMetadata) throws CloudStorageExce
String blobFileName = blobMetadata.getCloudBlobName();
String partitionPath = blobMetadata.getPartitionId();
String containerName = getAzureContainerName(partitionPath);
RequestOptions options = new RequestOptions();
options.setPartitionKey(new PartitionKey(partitionPath));
azureMetrics.blobDeleteRequestCount.inc();
Timer.Context deleteTimer = azureMetrics.blobDeletionTime.time();
try {
@@ -436,9 +376,8 @@ public boolean purgeBlob(CloudBlobMetadata blobMetadata) throws CloudStorageExce
boolean deletionDone = azureBlob.deleteIfExists();

// Delete the document too
String docLink = cosmosCollectionLink + "/docs/" + blobId;
try {
documentClient.deleteDocument(docLink, options);
cosmosDataAccessor.deleteMetadata(blobMetadata);
deletionDone = true;
logger.debug("Purged blob {} from partition {}.", blobId, partitionPath);
} catch (DocumentClientException dex) {
@@ -29,7 +29,9 @@ public class AzureMetrics {
public static final String BLOB_UPDATE_TIME = "BlobUpdateTime";
public static final String BLOB_UPDATED_COUNT = "BlobUpdatedCount";
public static final String DOCUMENT_CREATE_TIME = "DocumentCreateTime";
public static final String DOCUMENT_READ_TIME = "DocumentReadTime";
public static final String DOCUMENT_UPDATE_TIME = "DocumentUpdateTime";
public static final String DOCUMENT_DELETE_TIME = "DocumentDeleteTime";
public static final String DOCUMENT_QUERY_COUNT = "DocumentQueryCount";
public static final String MISSING_KEYS_QUERY_TIME = "MissingKeysQueryTime";
public static final String DEAD_BLOBS_QUERY_TIME = "DeadBlobsQueryTime";
@@ -44,6 +46,8 @@ public class AzureMetrics {
public static final String BACKUP_SUCCESS_LATENCY = "BackupSuccessLatency";
public static final String BACKUP_SUCCESS_BYTE_RATE = "BackupSuccessByteRate";
public static final String BACKUP_ERROR_COUNT = "BackupErrorCount";
public static final String RETRY_COUNT = "RetryCount";
public static final String RETRY_WAIT_TIME = "RetryWaitTime";

// Metrics
public final Counter blobUploadRequestCount;
@@ -53,7 +57,9 @@ public class AzureMetrics {
public final Timer blobUploadTime;
public final Timer blobUpdateTime;
public final Timer documentCreateTime;
public final Timer documentReadTime;
public final Timer documentUpdateTime;
public final Timer documentDeleteTime;
public final Timer missingKeysQueryTime;
public final Counter documentQueryCount;
public final Timer deadBlobsQueryTime;
@@ -68,6 +74,8 @@ public class AzureMetrics {
public final Timer backupSuccessLatency;
public final Meter backupSuccessByteRate;
public final Counter backupErrorCount;
public final Counter retryCount;
public final Timer retryWaitTime;

public AzureMetrics(MetricRegistry registry) {
blobUploadRequestCount =
@@ -80,7 +88,9 @@ public AzureMetrics(MetricRegistry registry) {
blobUploadTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, BLOB_UPLOAD_TIME));
blobUpdateTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, BLOB_UPDATE_TIME));
documentCreateTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_CREATE_TIME));
documentReadTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_READ_TIME));
documentUpdateTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_UPDATE_TIME));
documentDeleteTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_DELETE_TIME));
documentQueryCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, DOCUMENT_QUERY_COUNT));
missingKeysQueryTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, MISSING_KEYS_QUERY_TIME));
deadBlobsQueryTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, DEAD_BLOBS_QUERY_TIME));
@@ -96,5 +106,7 @@ public AzureMetrics(MetricRegistry registry) {
backupSuccessByteRate = registry.meter(MetricRegistry.name(AzureCloudDestination.class, BACKUP_SUCCESS_BYTE_RATE));
backupSuccessLatency = registry.timer(MetricRegistry.name(AzureCloudDestination.class, BACKUP_SUCCESS_LATENCY));
backupErrorCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, BACKUP_ERROR_COUNT));
retryCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, RETRY_COUNT));
retryWaitTime = registry.timer(MetricRegistry.name(AzureCloudDestination.class, RETRY_WAIT_TIME));
}
}