Azure batch deletes (#1365)
Use the batch deletion API to delete multiple blobs from Azure Blob Storage.
lightningrob authored Feb 4, 2020
1 parent c40eaf2 commit db13bc3
Showing 11 changed files with 205 additions and 90 deletions.
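
At its core, the commit swaps per-blob delete calls for the `azure-storage-blob-batch` SDK: deletes are queued on a `BlobBatch` and submitted in a single request. A minimal standalone sketch of that pattern, assuming the connection string comes from an environment variable, with container and blob names invented for illustration:

```java
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import java.time.Duration;

public class BatchDeleteSketch {
  public static void main(String[] args) {
    BlobServiceClient storageClient = new BlobServiceClientBuilder()
        .connectionString(System.getenv("AZURE_STORAGE_CONNECTION_STRING"))  // assumed env var
        .buildClient();
    BlobBatchClient batchClient = new BlobBatchClientBuilder(storageClient).buildClient();
    BlobBatch batch = batchClient.getBlobBatch();
    // Each deleteBlob call only queues the operation and returns a lazy Response<Void>.
    batch.deleteBlob("example-container", "example-blob-1");
    batch.deleteBlob("example-container", "example-blob-2");
    // One HTTP request deletes both; throwOnAnyFailure=false defers per-blob errors
    // to the point where each queued response is read.
    batchClient.submitBatchWithResponse(batch, false, Duration.ofSeconds(60), Context.NONE);
  }
}
```
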
@@ -92,14 +92,6 @@ boolean uploadBlob(BlobId blobId, long inputLength, CloudBlobMetadata cloudBlobM
List<CloudBlobMetadata> findEntriesSince(String partitionPath, CloudFindToken findToken, long maxTotalSizeOfEntries)
throws CloudStorageException;

/**
* Permanently delete the specified blob in the cloud destination.
* @param blobMetadata the {@link CloudBlobMetadata} referencing the blob to purge.
* @return flag indicating whether the blob was successfully purged.
* @throws CloudStorageException if the purge operation fails.
*/
boolean purgeBlob(CloudBlobMetadata blobMetadata) throws CloudStorageException;

/**
* Permanently delete the specified blobs in the cloud destination.
* @param blobMetadataList the list of {@link CloudBlobMetadata} referencing the blobs to purge.
@@ -16,11 +16,15 @@
import com.azure.core.http.HttpClient;
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Configuration;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobRequestConditions;
@@ -36,8 +40,10 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -54,13 +60,16 @@ public class AzureBlobDataAccessor {

private static final Logger logger = LoggerFactory.getLogger(AzureBlobDataAccessor.class);
private static final String SEPARATOR = "-";
private static final int batchPurgeTimeoutSec = 60;
private final BlobServiceClient storageClient;
private final BlobBatchClient blobBatchClient;
private final Configuration storageConfiguration;
private final AzureMetrics azureMetrics;
private final String clusterName;
// Containers known to exist in the storage account
private final Set<String> knownContainers = ConcurrentHashMap.newKeySet();
private ProxyOptions proxyOptions;
private final int purgeBatchSize;

/**
* Production constructor
@@ -73,6 +82,7 @@ public class AzureBlobDataAccessor {
AzureMetrics azureMetrics) {
this.clusterName = clusterName;
this.azureMetrics = azureMetrics;
this.purgeBatchSize = azureCloudConfig.azurePurgeBatchSize;
this.storageConfiguration = new Configuration();
// Check for network proxy
proxyOptions = (cloudConfig.vcrProxyHost == null) ? null : new ProxyOptions(ProxyOptions.Type.HTTP,
@@ -89,19 +99,24 @@ public class AzureBlobDataAccessor {
.retryOptions(requestRetryOptions)
.configuration(storageConfiguration)
.buildClient();
blobBatchClient = new BlobBatchClientBuilder(storageClient).buildClient();
}

/**
* Test constructor
* @param storageClient the {@link BlobServiceClient} to use.
* @param blobBatchClient the {@link BlobBatchClient} to use.
* @param clusterName the cluster name to use.
* @param azureMetrics the {@link AzureMetrics} to use.
*/
AzureBlobDataAccessor(BlobServiceClient storageClient, String clusterName, AzureMetrics azureMetrics) {
AzureBlobDataAccessor(BlobServiceClient storageClient, BlobBatchClient blobBatchClient, String clusterName,
AzureMetrics azureMetrics) {
this.storageClient = storageClient;
this.storageConfiguration = new Configuration();
this.clusterName = clusterName;
this.azureMetrics = azureMetrics;
this.blobBatchClient = blobBatchClient;
this.purgeBatchSize = AzureCloudConfig.DEFAULT_PURGE_BATCH_SIZE;
}
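
The widened test constructor lets unit tests inject a mocked batch client alongside the storage client. A sketch of how a test might use it, assuming Mockito is available as a test dependency (cluster name invented):

```java
import static org.mockito.Mockito.mock;

import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatchClient;

class AccessorTestSketch {
  static AzureBlobDataAccessor newTestAccessor(AzureMetrics azureMetrics) {
    // Mockito is an assumed test dependency; "test-cluster" is invented.
    BlobServiceClient mockStorageClient = mock(BlobServiceClient.class);
    BlobBatchClient mockBatchClient = mock(BlobBatchClient.class);
    return new AzureBlobDataAccessor(mockStorageClient, mockBatchClient, "test-cluster", azureMetrics);
  }
}
```
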

/**
@@ -302,13 +317,47 @@ public boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value)
/**
* Permanently delete the specified blobs in Azure storage.
* @param blobMetadataList the list of {@link CloudBlobMetadata} referencing the blobs to purge.
* @return the number of blobs successfully purged.
* @return list of {@link CloudBlobMetadata} referencing the blobs successfully purged.
* @throws BlobStorageException if the purge operation fails.
* @throws RuntimeException if the request times out before a response is received.
*/
public int purgeBlobs(List<CloudBlobMetadata> blobMetadataList) throws BlobStorageException {
// TODO: use batch api to delete all
// https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/storage/azure-storage-blob-batch
throw new UnsupportedOperationException("Not yet implemented");
public List<CloudBlobMetadata> purgeBlobs(List<CloudBlobMetadata> blobMetadataList) throws BlobStorageException {

List<CloudBlobMetadata> deletedBlobs = new ArrayList<>();
List<List<CloudBlobMetadata>> partitionedLists = Utils.partitionList(blobMetadataList, purgeBatchSize);
for (List<CloudBlobMetadata> batchOfBlobs : partitionedLists) {
BlobBatch blobBatch = blobBatchClient.getBlobBatch();
List<Response<Void>> responseList = new ArrayList<>();
for (CloudBlobMetadata blobMetadata : batchOfBlobs) {
String containerName = getAzureContainerName(blobMetadata);
String blobName = getAzureBlobName(blobMetadata);
responseList.add(blobBatch.deleteBlob(containerName, blobName));
}
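// throwOnAnyFailure is false: per-blob outcomes are inspected individually below.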
blobBatchClient.submitBatchWithResponse(blobBatch, false, Duration.ofSeconds(batchPurgeTimeoutSec), Context.NONE);
for (int j = 0; j < responseList.size(); j++) {
Response<Void> response = responseList.get(j);
CloudBlobMetadata blobMetadata = batchOfBlobs.get(j);
// Note: Response.getStatusCode() throws exception on any error.
int statusCode;
try {
statusCode = response.getStatusCode();
} catch (BlobStorageException bex) {
statusCode = bex.getStatusCode();
}
switch (statusCode) {
case HttpURLConnection.HTTP_OK:
case HttpURLConnection.HTTP_ACCEPTED:
case HttpURLConnection.HTTP_NOT_FOUND:
case HttpURLConnection.HTTP_GONE:
// blob was deleted or already gone
deletedBlobs.add(blobMetadata);
break;
default:
logger.error("Deleting blob {} got status {}", blobMetadata.getId(), statusCode);
}
}
}
return deletedBlobs;
}
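
`Utils.partitionList` (imported from `com.github.ambry.utils`) drives the batching here and in the metadata lookup further down. A minimal sketch of its assumed semantics, splitting a list into consecutive sublist views of at most `batchSize` elements:

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionListSketch {
  // Assumed semantics of Utils.partitionList: consecutive sublist views,
  // each holding at most batchSize elements; the last may be shorter.
  static <T> List<List<T>> partitionList(List<T> inputList, int batchSize) {
    List<List<T>> partitions = new ArrayList<>();
    for (int start = 0; start < inputList.size(); start += batchSize) {
      partitions.add(inputList.subList(start, Math.min(start + batchSize, inputList.size())));
    }
    return partitions;
  }
}
```
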

/**
@@ -406,12 +455,13 @@ String getAzureBlobName(CloudBlobMetadata blobMetadata) {
}

// TODO: add AzureBlobNameConverter with versioning, this scheme being version 0

/**
* Get the blob name to use in Azure Blob Storage
* @param blobIdStr The blobId string.
* @return An Azure-friendly blob name.
*/
private String getAzureBlobName(String blobIdStr) {
String getAzureBlobName(String blobIdStr) {
// Use the last four chars as prefix to assist in Azure sharding, since beginning of blobId has little variation.
return blobIdStr.substring(blobIdStr.length() - 4) + SEPARATOR + blobIdStr;
}
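
For illustration, a hypothetical blobId (value invented) maps to its Azure name as follows:

```java
class BlobNameExample {
  public static void main(String[] args) {
    // Hypothetical blobId value, invented for illustration.
    String blobIdStr = "AAEAAQAAAAAAAAAAAAAAJmY5OWFmYTI5";
    String azureBlobName = blobIdStr.substring(blobIdStr.length() - 4) + "-" + blobIdStr;
    System.out.println(azureBlobName);  // prints: YTI5-AAEAAQAAAAAAAAAAAAAAJmY5OWFmYTI5
  }
}
```
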
@@ -29,7 +29,11 @@ public class AzureCloudConfig {
public static final String COSMOS_KEY = "cosmos.key";
public static final String COSMOS_MAX_RETRIES = "cosmos.max.retries";
public static final String COSMOS_DIRECT_HTTPS = "cosmos.direct.https";
public static final String AZURE_PURGE_BATCH_SIZE = "azure.purge.batch.size";
public static final int DEFAULT_COSMOS_MAX_RETRIES = 5;
// Per docs.microsoft.com/en-us/rest/api/storageservices/blob-batch
public static final int MAX_PURGE_BATCH_SIZE = 256;
public static final int DEFAULT_PURGE_BATCH_SIZE = 100;

/**
* The Azure Blob Storage connection string.
@@ -62,6 +66,9 @@ public class AzureCloudConfig {
@Default("5")
public final int cosmosMaxRetries;

@Config(AZURE_PURGE_BATCH_SIZE)
@Default("100")
public final int azurePurgeBatchSize;
// TODO: Add blobNamingSchemeVersion, containerNamingScheme

/**
@@ -78,6 +85,8 @@ public AzureCloudConfig(VerifiableProperties verifiableProperties) {
cosmosCollectionLink = verifiableProperties.getString(COSMOS_COLLECTION_LINK);
cosmosKey = verifiableProperties.getString(COSMOS_KEY);
cosmosMaxRetries = verifiableProperties.getInt(COSMOS_MAX_RETRIES, DEFAULT_COSMOS_MAX_RETRIES);
azurePurgeBatchSize =
verifiableProperties.getIntInRange(AZURE_PURGE_BATCH_SIZE, DEFAULT_PURGE_BATCH_SIZE, 1, MAX_PURGE_BATCH_SIZE);
cosmosDirectHttps = verifiableProperties.getBoolean(COSMOS_DIRECT_HTTPS, false);
}
}
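
A sketch of wiring the new setting through `VerifiableProperties`; the required Azure/Cosmos connection properties are assumed to already be present in `baseProps`:

```java
import com.github.ambry.config.VerifiableProperties;
import java.util.Properties;

class AzureConfigSketch {
  // Assumes this class sits in the same package as AzureCloudConfig.
  static AzureCloudConfig withPurgeBatchSize(Properties baseProps, int batchSize) {
    Properties props = new Properties();
    props.putAll(baseProps);  // baseProps assumed to carry the required connection settings
    // Values outside [1, MAX_PURGE_BATCH_SIZE] are rejected (assumed behavior of getIntInRange).
    props.setProperty(AzureCloudConfig.AZURE_PURGE_BATCH_SIZE, String.valueOf(batchSize));
    return new AzureCloudConfig(new VerifiableProperties(props));
  }
}
```
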
@@ -14,6 +14,7 @@
package com.github.ambry.cloud.azure;

import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.models.BlobStorageException;
import com.codahale.metrics.Timer;
import com.github.ambry.cloud.CloudBlobMetadata;
@@ -22,6 +23,7 @@
import com.github.ambry.cloud.CloudStorageException;
import com.github.ambry.commons.BlobId;
import com.github.ambry.config.CloudConfig;
import com.github.ambry.utils.Utils;
import com.microsoft.azure.cosmosdb.ConnectionMode;
import com.microsoft.azure.cosmosdb.ConnectionPolicy;
import com.microsoft.azure.cosmosdb.ConsistencyLevel;
@@ -132,9 +134,10 @@ class AzureCloudDestination implements CloudDestination {
* @param clusterName the name of the Ambry cluster.
* @param azureMetrics the {@link AzureMetrics} to use.
*/
AzureCloudDestination(BlobServiceClient storageClient, AsyncDocumentClient asyncDocumentClient,
String cosmosCollectionLink, String clusterName, AzureMetrics azureMetrics) {
this.azureBlobDataAccessor = new AzureBlobDataAccessor(storageClient, clusterName, azureMetrics);
AzureCloudDestination(BlobServiceClient storageClient, BlobBatchClient blobBatchClient,
AsyncDocumentClient asyncDocumentClient, String cosmosCollectionLink, String clusterName,
AzureMetrics azureMetrics) {
this.azureBlobDataAccessor = new AzureBlobDataAccessor(storageClient, blobBatchClient, clusterName, azureMetrics);
this.asyncDocumentClient = asyncDocumentClient;
this.azureMetrics = azureMetrics;
this.clusterName = clusterName;
@@ -204,24 +207,14 @@ public Map<String, CloudBlobMetadata> getBlobMetadata(List<BlobId> blobIds) thro
return Collections.emptyMap();
}

// TODO: For single blob GET request, get metadata from ABS
// CosmosDB has query size limit of 256k chars.
// Break list into chunks if necessary to avoid overflow.
List<CloudBlobMetadata> metadataList;
if (blobIds.size() > ID_QUERY_BATCH_SIZE) {
metadataList = new ArrayList<>();
for (int j = 0; j < blobIds.size() / ID_QUERY_BATCH_SIZE + 1; j++) {
int start = j * ID_QUERY_BATCH_SIZE;
if (start >= blobIds.size()) {
break;
}
int end = Math.min((j + 1) * ID_QUERY_BATCH_SIZE, blobIds.size());
List<BlobId> someBlobIds = blobIds.subList(start, end);
metadataList.addAll(getBlobMetadataChunked(someBlobIds));
}
} else {
metadataList = getBlobMetadataChunked(blobIds);
List<CloudBlobMetadata> metadataList = new ArrayList<>();
List<List<BlobId>> chunkedBlobIdList = Utils.partitionList(blobIds, ID_QUERY_BATCH_SIZE);
for (List<BlobId> batchOfBlobs : chunkedBlobIdList) {
metadataList.addAll(getBlobMetadataChunked(batchOfBlobs));
}

return metadataList.stream().collect(Collectors.toMap(CloudBlobMetadata::getId, Function.identity()));
}

@@ -349,51 +342,48 @@ private boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value
}

@Override
public boolean purgeBlob(CloudBlobMetadata blobMetadata) throws CloudStorageException {
String blobId = blobMetadata.getId();
String blobFileName = azureBlobDataAccessor.getAzureBlobName(blobMetadata);
String containerName = azureBlobDataAccessor.getAzureContainerName(blobMetadata);
String partitionPath = blobMetadata.getPartitionId();
azureMetrics.blobDeleteRequestCount.inc();
public int purgeBlobs(List<CloudBlobMetadata> blobMetadataList) throws CloudStorageException {
if (blobMetadataList.isEmpty()) {
return 0;
}
azureMetrics.blobDeleteRequestCount.inc(blobMetadataList.size());
Timer.Context deleteTimer = azureMetrics.blobDeletionTime.time();
try {
// delete blob from storage
boolean deletionDone = azureBlobDataAccessor.deleteFile(containerName, blobFileName);

// Delete the document too
try {
cosmosDataAccessor.deleteMetadata(blobMetadata);
deletionDone = true;
logger.debug("Purged blob {} from partition {}.", blobId, partitionPath);
} catch (DocumentClientException dex) {
if (dex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
logger.warn("Could not find metadata for blob {} to delete", blobId);
} else {
throw dex;
}
List<CloudBlobMetadata> deletedBlobs = azureBlobDataAccessor.purgeBlobs(blobMetadataList);
azureMetrics.blobDeletedCount.inc(deletedBlobs.size());
azureMetrics.blobDeleteErrorCount.inc(blobMetadataList.size() - deletedBlobs.size());

// Remove them from Cosmos too
for (CloudBlobMetadata blobMetadata : deletedBlobs) {
deleteFromCosmos(blobMetadata);
}
azureMetrics.blobDeletedCount.inc(deletionDone ? 1 : 0);
return deletionDone;
} catch (Exception e) {
azureMetrics.blobDeleteErrorCount.inc();
String error = (e instanceof DocumentClientException) ? "Failed to delete metadata document for blob " + blobId
: "Failed to delete blob " + blobId + ", storage path: " + containerName + "/" + blobFileName;
throw toCloudStorageException(error, e);
return deletedBlobs.size();
} catch (Exception ex) {
azureMetrics.blobDeleteErrorCount.inc(blobMetadataList.size());
throw toCloudStorageException("Failed to purge all blobs", ex);
} finally {
deleteTimer.stop();
}
}

@Override
public int purgeBlobs(List<CloudBlobMetadata> blobMetadataList) throws CloudStorageException {
int numPurged = 0;
for (CloudBlobMetadata blobMetadata : blobMetadataList) {
if (purgeBlob(blobMetadata)) {
numPurged++;
/**
* Delete a blob metadata record from Cosmos.
* @param blobMetadata the record to delete.
* @return {@code true} if the record was deleted, {@code false} if it was not found.
* @throws DocumentClientException if the delete fails for any reason other than the record not being found.
*/
private boolean deleteFromCosmos(CloudBlobMetadata blobMetadata) throws DocumentClientException {
try {
cosmosDataAccessor.deleteMetadata(blobMetadata);
return true;
} catch (DocumentClientException dex) {
if (dex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
logger.warn("Could not find metadata for blob {} to delete", blobMetadata.getId());
return false;
} else {
throw dex;
}
}
logger.info("Purged {} blobs", numPurged);
return numPurged;
}
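
Putting the destination-level flow together: batch-purge from Blob Storage first, then remove Cosmos records only for blobs that were actually deleted. A hypothetical caller sketch (helper and variable names invented):

```java
import com.github.ambry.cloud.CloudBlobMetadata;
import com.github.ambry.cloud.CloudDestination;
import com.github.ambry.cloud.CloudStorageException;
import java.util.List;

class PurgeCallerSketch {
  // Hypothetical caller: purge a batch of already-identified dead blobs and
  // report how many were actually removed from both stores.
  static int purgeDeadBlobs(CloudDestination destination, List<CloudBlobMetadata> deadBlobs)
      throws CloudStorageException {
    int purgedCount = destination.purgeBlobs(deadBlobs);
    System.out.printf("Purged %d of %d candidate blobs%n", purgedCount, deadBlobs.size());
    return purgedCount;
  }
}
```
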

@Override
@@ -159,11 +159,6 @@ public List<CloudBlobMetadata> findEntriesSince(String partitionPath, CloudFindT
return CloudBlobMetadata.capMetadataListBySize(entries, maxTotalSizeOfEntries);
}

@Override
public boolean purgeBlob(CloudBlobMetadata blobMetadata) {
return true;
}

@Override
public int purgeBlobs(List<CloudBlobMetadata> blobMetadataList) {
return 0;