Skip to content

Commit

Permalink
Use async cosmos library and upgrade to 2.6.3 (#1321)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankagrawal authored and lightningrob committed Dec 6, 2019
1 parent 6e43fda commit 1411228
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 404 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@
import com.github.ambry.cloud.CloudStorageException;
import com.github.ambry.commons.BlobId;
import com.github.ambry.config.CloudConfig;
import com.microsoft.azure.documentdb.ConnectionMode;
import com.microsoft.azure.documentdb.ConnectionPolicy;
import com.microsoft.azure.documentdb.ConsistencyLevel;
import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.ResourceResponse;
import com.microsoft.azure.documentdb.SqlParameter;
import com.microsoft.azure.documentdb.SqlParameterCollection;
import com.microsoft.azure.documentdb.SqlQuerySpec;
import com.microsoft.azure.cosmosdb.ConnectionMode;
import com.microsoft.azure.cosmosdb.ConnectionPolicy;
import com.microsoft.azure.cosmosdb.ConsistencyLevel;
import com.microsoft.azure.cosmosdb.Document;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.ResourceResponse;
import com.microsoft.azure.cosmosdb.RetryOptions;
import com.microsoft.azure.cosmosdb.SqlParameter;
import com.microsoft.azure.cosmosdb.SqlParameterCollection;
import com.microsoft.azure.cosmosdb.SqlQuerySpec;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
Expand All @@ -51,7 +52,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
Expand All @@ -61,7 +61,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -77,22 +76,22 @@ class AzureCloudDestination implements CloudDestination {
private static final String TIME_SINCE_PARAM = "@timesince";
private static final String BATCH_ID_QUERY_TEMPLATE = "SELECT * FROM c WHERE c.id IN (%s)";
static final int ID_QUERY_BATCH_SIZE = 1000;
static final String DEAD_BLOBS_QUERY_TEMPLATE =
private static final String DEAD_BLOBS_QUERY_TEMPLATE =
"SELECT TOP " + LIMIT_PARAM + " * FROM c WHERE (c." + CloudBlobMetadata.FIELD_DELETION_TIME + " BETWEEN 1 AND "
+ THRESHOLD_PARAM + ")" + " OR (c." + CloudBlobMetadata.FIELD_EXPIRATION_TIME + " BETWEEN 1 AND "
+ THRESHOLD_PARAM + ")" + " ORDER BY c." + CloudBlobMetadata.FIELD_UPLOAD_TIME + " ASC";
// Note: ideally would like to order by uploadTime and id, but Cosmos doesn't allow without composite index.
// It is unlikely (but not impossible) for two blobs in same partition to have the same uploadTime (would have to
// be multiple VCR's uploading same partition). We track the lastBlobId in the CloudFindToken and skip it if
// is returned in successive queries.
static final String ENTRIES_SINCE_QUERY_TEMPLATE =
private static final String ENTRIES_SINCE_QUERY_TEMPLATE =
"SELECT TOP " + LIMIT_PARAM + " * FROM c WHERE c." + CosmosDataAccessor.COSMOS_LAST_UPDATED_COLUMN + " >= "
+ TIME_SINCE_PARAM + " ORDER BY c." + CosmosDataAccessor.COSMOS_LAST_UPDATED_COLUMN + " ASC";
private static final String SEPARATOR = "-";
private static final int findSinceQueryLimit = 1000;
private final CloudStorageAccount azureAccount;
private final CloudBlobClient azureBlobClient;
private final DocumentClient documentClient;
private final AsyncDocumentClient asyncDocumentClient;
private final CosmosDataAccessor cosmosDataAccessor;
private final OperationContext blobOpContext = new OperationContext();
private final AzureMetrics azureMetrics;
Expand Down Expand Up @@ -123,20 +122,25 @@ class AzureCloudDestination implements CloudDestination {
OperationContext.setDefaultProxy(
new Proxy(Proxy.Type.HTTP, new InetSocketAddress(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort)));
}
// Set up CosmosDB connection, including any proxy setting
// Set up CosmosDB connection, including retry options and any proxy setting
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
RetryOptions retryOptions = new RetryOptions();
retryOptions.setMaxRetryAttemptsOnThrottledRequests(azureCloudConfig.cosmosMaxRetries);
connectionPolicy.setRetryOptions(retryOptions);
if (azureCloudConfig.cosmosDirectHttps) {
logger.info("Using CosmosDB DirectHttps connection mode");
connectionPolicy.setConnectionMode(ConnectionMode.DirectHttps);
connectionPolicy.setConnectionMode(ConnectionMode.Direct);
}
if (cloudConfig.vcrProxyHost != null) {
connectionPolicy.setProxy(new HttpHost(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort));
connectionPolicy.setHandleServiceUnavailableFromProxy(true);
connectionPolicy.setProxy(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort);
}
// TODO: test option to set connectionPolicy.setEnableEndpointDiscovery(false);
documentClient = new DocumentClient(azureCloudConfig.cosmosEndpoint, azureCloudConfig.cosmosKey, connectionPolicy,
ConsistencyLevel.Session);
cosmosDataAccessor = new CosmosDataAccessor(documentClient, azureCloudConfig, azureMetrics);
asyncDocumentClient = new AsyncDocumentClient.Builder().withServiceEndpoint(azureCloudConfig.cosmosEndpoint)
.withMasterKeyOrResourceToken(azureCloudConfig.cosmosKey)
.withConnectionPolicy(connectionPolicy)
.withConsistencyLevel(ConsistencyLevel.Session)
.build();
cosmosDataAccessor = new CosmosDataAccessor(asyncDocumentClient, azureCloudConfig, azureMetrics);
this.retentionPeriodMs = TimeUnit.DAYS.toMillis(cloudConfig.cloudDeletedBlobRetentionDays);
this.deadBlobsQueryLimit = cloudConfig.cloudBlobCompactionQueryLimit;
logger.info("Created Azure destination");
Expand All @@ -145,26 +149,23 @@ class AzureCloudDestination implements CloudDestination {
/**
* Test constructor.
* @param azureAccount the {@link CloudStorageAccount} to use.
* @param documentClient the {@link DocumentClient} to use.
* @param asyncDocumentClient the {@link AsyncDocumentClient} to use.
* @param cosmosCollectionLink the CosmosDB collection link to use.
* @param clusterName the name of the Ambry cluster.
* @param azureMetrics the {@link AzureMetrics} to use.
* @throws CloudStorageException if the destination could not be created.
*/
AzureCloudDestination(CloudStorageAccount azureAccount, DocumentClient documentClient, String cosmosCollectionLink,
String clusterName, AzureMetrics azureMetrics) {
AzureCloudDestination(CloudStorageAccount azureAccount, AsyncDocumentClient asyncDocumentClient,
String cosmosCollectionLink, String clusterName, AzureMetrics azureMetrics) {
this.azureAccount = azureAccount;
this.documentClient = documentClient;
this.asyncDocumentClient = asyncDocumentClient;
this.azureMetrics = azureMetrics;
this.clusterName = clusterName;
this.retentionPeriodMs = TimeUnit.DAYS.toMillis(CloudConfig.DEFAULT_RETENTION_DAYS);
this.deadBlobsQueryLimit = CloudConfig.DEFAULT_COMPACTION_QUERY_LIMIT;

// Create a blob client to interact with Blob storage
azureBlobClient = azureAccount.createCloudBlobClient();
cosmosDataAccessor =
new CosmosDataAccessor(documentClient, cosmosCollectionLink, AzureCloudConfig.DEFAULT_COSMOS_MAX_RETRIES,
azureMetrics);
cosmosDataAccessor = new CosmosDataAccessor(asyncDocumentClient, cosmosCollectionLink, azureMetrics);
}

/**
Expand Down Expand Up @@ -309,15 +310,14 @@ public Map<String, CloudBlobMetadata> getBlobMetadata(List<BlobId> blobIds) thro
metadataList = getBlobMetadataChunked(blobIds);
}

return metadataList.stream().collect(Collectors.toMap(m -> m.getId(), Function.identity()));
return metadataList.stream().collect(Collectors.toMap(CloudBlobMetadata::getId, Function.identity()));
}

private List<CloudBlobMetadata> getBlobMetadataChunked(List<BlobId> blobIds) throws CloudStorageException {
if (blobIds.isEmpty() || blobIds.size() > ID_QUERY_BATCH_SIZE) {
throw new IllegalArgumentException("Invalid input list size: " + blobIds.size());
}
String quotedBlobIds =
String.join(",", blobIds.stream().map(s -> '"' + s.getID() + '"').collect(Collectors.toList()));
String quotedBlobIds = blobIds.stream().map(s -> '"' + s.getID() + '"').collect(Collectors.joining(","));
String query = String.format(BATCH_ID_QUERY_TEMPLATE, quotedBlobIds);
String partitionPath = blobIds.get(0).getPartition().toPathString();
try {
Expand Down Expand Up @@ -363,6 +363,14 @@ public List<CloudBlobMetadata> findEntriesSince(String partitionPath, CloudFindT
}
}

/**
 * Getter for the {@link AsyncDocumentClient} this destination uses for CosmosDB metadata operations.
 * Package-private; presumably a test-only accessor, consistent with the other "Visible for test"
 * getters in this class — TODO confirm no production callers.
 * @return the {@link AsyncDocumentClient} instance held by this destination (never replaced after construction).
 */
AsyncDocumentClient getAsyncDocumentClient() {
return asyncDocumentClient;
}

/**
* Filter out {@link CloudBlobMetadata} objects from lastUpdateTime ordered {@code cloudBlobMetadataList} whose
* lastUpdateTime is {@code lastUpdateTime} and id is in {@code lastReadBlobIds}.
Expand All @@ -374,9 +382,9 @@ private void filterOutLastReadBlobs(List<CloudBlobMetadata> cloudBlobMetadataLis
long lastUpdateTime) {
ListIterator<CloudBlobMetadata> iterator = cloudBlobMetadataList.listIterator();
int numRemovedBlobs = 0;
while(iterator.hasNext()) {
while (iterator.hasNext()) {
CloudBlobMetadata cloudBlobMetadata = iterator.next();
if(numRemovedBlobs == lastReadBlobIds.size() || cloudBlobMetadata.getLastUpdateTime() > lastUpdateTime) {
if (numRemovedBlobs == lastReadBlobIds.size() || cloudBlobMetadata.getLastUpdateTime() > lastUpdateTime) {
break;
}
if (lastReadBlobIds.contains(cloudBlobMetadata.getId())) {
Expand All @@ -392,7 +400,7 @@ private void filterOutLastReadBlobs(List<CloudBlobMetadata> cloudBlobMetadataLis
* @param fieldName The metadata field to modify.
* @param value The new value.
* @return {@code true} if the update succeeded, {@code false} if the metadata record was not found.
* @throws DocumentClientException
* @throws CloudStorageException if the update fails.
*/
private boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value) throws CloudStorageException {
Objects.requireNonNull(blobId, "BlobId cannot be null");
Expand Down Expand Up @@ -424,7 +432,6 @@ private boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value
}

ResourceResponse<Document> response = cosmosDataAccessor.readMetadata(blobId);
//CloudBlobMetadata blobMetadata = response.getResource().toObject(CloudBlobMetadata.class);
Document doc = response.getResource();
if (doc == null) {
logger.warn("Blob metadata record not found: {}", blobId.getID());
Expand Down Expand Up @@ -510,7 +517,8 @@ public boolean doesBlobExist(BlobId blobId) throws CloudStorageException {
* @param blobId the {@link BlobId} that needs a container.
* @param autoCreate flag indicating whether to create the container if it does not exist.
* @return the created {@link CloudBlobContainer}.
* @throws Exception
* @throws URISyntaxException
* @throws StorageException
*/
private CloudBlobContainer getContainer(BlobId blobId, boolean autoCreate)
throws URISyntaxException, StorageException {
Expand Down Expand Up @@ -594,7 +602,7 @@ private CloudBlockBlob getAzureBlobReference(BlobId blobId, boolean autoCreateCo
* @return the name of the Azure storage container where blobs in the specified partition are stored.
* @param partitionPath the lexical path of the Ambry partition.
*/
String getAzureContainerName(String partitionPath) {
private String getAzureContainerName(String partitionPath) {
// Include Ambry cluster name in case the same storage account is used to backup multiple clusters.
// Azure requires container names to be all lower case
String rawContainerName = clusterName + SEPARATOR + partitionPath;
Expand All @@ -612,14 +620,6 @@ String getAzureBlobName(BlobId blobId) {
return blobIdStr.substring(blobIdStr.length() - 4) + SEPARATOR + blobIdStr;
}

/**
* Visible for test.
* @return the CosmosDB DocumentClient
*/
DocumentClient getDocumentClient() {
return documentClient;
}

/**
* Visible for test.
* @return the {@link CosmosDataAccessor}
Expand All @@ -628,14 +628,6 @@ CosmosDataAccessor getCosmosDataAccessor() {
return cosmosDataAccessor;
}

/**
* Visible for test.
* @return the blob storage operation context.
*/
OperationContext getBlobOpContext() {
return blobOpContext;
}

/**
* Update the appropriate error metrics corresponding to the thrown exception.
* @param e the exception thrown.
Expand Down
Loading

0 comments on commit 1411228

Please sign in to comment.