Azure batch deletes #1365

Merged: 10 commits, Feb 4, 2020
@@ -92,14 +92,6 @@ boolean uploadBlob(BlobId blobId, long inputLength, CloudBlobMetadata cloudBlobM
List<CloudBlobMetadata> findEntriesSince(String partitionPath, CloudFindToken findToken, long maxTotalSizeOfEntries)
throws CloudStorageException;

/**
* Permanently delete the specified blob in the cloud destination.
* @param blobMetadata the {@link CloudBlobMetadata} referencing the blob to purge.
* @return flag indicating whether the blob was successfully purged.
* @throws CloudStorageException if the purge operation fails.
*/
boolean purgeBlob(CloudBlobMetadata blobMetadata) throws CloudStorageException;

/**
* Permanently delete the specified blobs in the cloud destination.
* @param blobMetadataList the list of {@link CloudBlobMetadata} referencing the blobs to purge.
@@ -16,11 +16,15 @@
import com.azure.core.http.HttpClient;
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Configuration;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobRequestConditions;
@@ -31,13 +35,17 @@
import com.github.ambry.cloud.CloudBlobMetadata;
import com.github.ambry.commons.BlobId;
import com.github.ambry.config.CloudConfig;
import com.google.common.collect.Lists;
import com.github.ambry.utils.Utils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -55,12 +63,15 @@ public class AzureBlobDataAccessor {
private static final Logger logger = LoggerFactory.getLogger(AzureBlobDataAccessor.class);
private static final String SEPARATOR = "-";
private final BlobServiceClient storageClient;
private final BlobBatchClient blobBatchClient;
private final Configuration storageConfiguration;
private final AzureMetrics azureMetrics;
private final String clusterName;
// Containers known to exist in the storage account
private final Set<String> knownContainers = ConcurrentHashMap.newKeySet();
private ProxyOptions proxyOptions;
// TODO: add to AzureCloudConfig
private int purgeBatchSize = 100;

/**
* Production constructor
@@ -89,19 +100,23 @@ public class AzureBlobDataAccessor {
.retryOptions(requestRetryOptions)
.configuration(storageConfiguration)
.buildClient();
blobBatchClient = new BlobBatchClientBuilder(storageClient).buildClient();
}

/**
* Test constructor
* @param storageClient the {@link BlobServiceClient} to use.
* @param blobBatchClient the {@link BlobBatchClient} to use.
* @param clusterName the cluster name to use.
* @param azureMetrics the {@link AzureMetrics} to use.
*/
AzureBlobDataAccessor(BlobServiceClient storageClient, String clusterName, AzureMetrics azureMetrics) {
AzureBlobDataAccessor(BlobServiceClient storageClient, BlobBatchClient blobBatchClient, String clusterName,
AzureMetrics azureMetrics) {
this.storageClient = storageClient;
this.storageConfiguration = new Configuration();
this.clusterName = clusterName;
this.azureMetrics = azureMetrics;
this.blobBatchClient = blobBatchClient;
}

/**
@@ -302,13 +317,47 @@ public boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value)
/**
* Permanently delete the specified blobs in Azure storage.
* @param blobMetadataList the list of {@link CloudBlobMetadata} referencing the blobs to purge.
* @return the number of blobs successfully purged.
* @return list of {@link CloudBlobMetadata} referencing the blobs successfully purged.
* @throws BlobStorageException if the purge operation fails.
*/
public int purgeBlobs(List<CloudBlobMetadata> blobMetadataList) throws BlobStorageException {
// TODO: use batch api to delete all
// https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/storage/azure-storage-blob-batch
throw new UnsupportedOperationException("Not yet implemented");
public List<CloudBlobMetadata> purgeBlobs(List<CloudBlobMetadata> blobMetadataList) throws BlobStorageException {

// Per docs.microsoft.com/en-us/rest/api/storageservices/blob-batch, must use batch size <= 256
List<CloudBlobMetadata> deletedBlobs = new ArrayList<>();
List<List<CloudBlobMetadata>> partitionedLists = Lists.partition(blobMetadataList, purgeBatchSize);
for (List<CloudBlobMetadata> someBlobs : partitionedLists) {
BlobBatch blobBatch = blobBatchClient.getBlobBatch();
List<Response<Void>> responseList = new ArrayList<>();
for (CloudBlobMetadata blobMetadata : someBlobs) {
String containerName = getAzureContainerName(blobMetadata);
String blobName = getAzureBlobName(blobMetadata);
responseList.add(blobBatch.deleteBlob(containerName, blobName));
}
blobBatchClient.submitBatchWithResponse(blobBatch, false, Duration.ofHours(1), Context.NONE);
for (int j = 0; j < responseList.size(); j++) {
Response<Void> response = responseList.get(j);
CloudBlobMetadata blobMetadata = someBlobs.get(j);
// Note: Response.getStatusCode() throws an exception on any error.
int statusCode;
try {
statusCode = response.getStatusCode();
} catch (BlobStorageException bex) {
statusCode = bex.getStatusCode();
Collaborator: Can the statusCode returned by the exception be one of OK, ACCEPTED, NOT_FOUND, or GONE? If not, maybe we can move the try/catch block to include the switch as well.

Contributor (author): The SDK throws an exception if the HTTP status is not the "expected" value, which in this case is ACCEPTED.
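A minimal illustration of the behavior the author describes, assuming (per the reply) that 202 ACCEPTED is the only status the SDK accepts for a delete sub-request; container and blob names here are hypothetical:

    Response<Void> response = blobBatch.deleteBlob("containerName", "blobName");
    blobBatchClient.submitBatchWithResponse(blobBatch, false, Duration.ofMinutes(1), Context.NONE);
    int status;
    try {
      status = response.getStatusCode(); // returns only when the sub-request succeeded (202)
    } catch (BlobStorageException bex) {
      status = bex.getStatusCode(); // failures such as 404, 410, or 503 all surface here
    }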

}
switch (statusCode) {
case HttpURLConnection.HTTP_OK:
case HttpURLConnection.HTTP_ACCEPTED:
case HttpURLConnection.HTTP_NOT_FOUND:
case HttpURLConnection.HTTP_GONE:
// blob was deleted or already gone
deletedBlobs.add(blobMetadata);
break;
default:
logger.error("Deleting blob {} got status {}", blobMetadata.getId(), statusCode);
}
}
}
return deletedBlobs;
}
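For context, Guava's Lists.partition (imported above) splits the input into consecutive sublists of at most purgeBatchSize elements, so a hypothetical list of 250 blobs is deleted in batches of 100, 100, and 50. A minimal caller-side sketch, assuming an initialized dataAccessor and hypothetical blob ids:

    List<CloudBlobMetadata> toPurge = new ArrayList<>();
    toPurge.add(new CloudBlobMetadata().setId("blobOne"));
    toPurge.add(new CloudBlobMetadata().setId("blobTwo"));
    // Only entries whose delete sub-request returned 200/202/404/410 are reported back.
    List<CloudBlobMetadata> purged = dataAccessor.purgeBlobs(toPurge);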

/**
@@ -411,7 +460,7 @@ String getAzureBlobName(CloudBlobMetadata blobMetadata) {
* @param blobIdStr The blobId string.
* @return An Azure-friendly blob name.
*/
private String getAzureBlobName(String blobIdStr) {
String getAzureBlobName(String blobIdStr) {
// Use the last four chars as prefix to assist in Azure sharding, since beginning of blobId has little variation.
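// For example, a hypothetical blobId ending in "wxyz" maps to the Azure name "wxyz-<full blobId>".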
return blobIdStr.substring(blobIdStr.length() - 4) + SEPARATOR + blobIdStr;
}
@@ -14,6 +14,7 @@
package com.github.ambry.cloud.azure;

import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.models.BlobStorageException;
import com.codahale.metrics.Timer;
import com.github.ambry.cloud.CloudBlobMetadata;
@@ -132,9 +133,10 @@ class AzureCloudDestination implements CloudDestination {
* @param clusterName the name of the Ambry cluster.
* @param azureMetrics the {@link AzureMetrics} to use.
*/
AzureCloudDestination(BlobServiceClient storageClient, AsyncDocumentClient asyncDocumentClient,
String cosmosCollectionLink, String clusterName, AzureMetrics azureMetrics) {
this.azureBlobDataAccessor = new AzureBlobDataAccessor(storageClient, clusterName, azureMetrics);
AzureCloudDestination(BlobServiceClient storageClient, BlobBatchClient blobBatchClient,
AsyncDocumentClient asyncDocumentClient, String cosmosCollectionLink, String clusterName,
AzureMetrics azureMetrics) {
this.azureBlobDataAccessor = new AzureBlobDataAccessor(storageClient, blobBatchClient, clusterName, azureMetrics);
this.asyncDocumentClient = asyncDocumentClient;
this.azureMetrics = azureMetrics;
this.clusterName = clusterName;
@@ -204,6 +206,7 @@ public Map<String, CloudBlobMetadata> getBlobMetadata(List<BlobId> blobIds) thro
return Collections.emptyMap();
}

// TODO: For single blob GET request, get metadata from ABS
// CosmosDB has query size limit of 256k chars.
// Break list into chunks if necessary to avoid overflow.
List<CloudBlobMetadata> metadataList;
@@ -349,51 +352,48 @@ private boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value
}

@Override
public boolean purgeBlob(CloudBlobMetadata blobMetadata) throws CloudStorageException {
String blobId = blobMetadata.getId();
String blobFileName = azureBlobDataAccessor.getAzureBlobName(blobMetadata);
String containerName = azureBlobDataAccessor.getAzureContainerName(blobMetadata);
String partitionPath = blobMetadata.getPartitionId();
azureMetrics.blobDeleteRequestCount.inc();
public int purgeBlobs(List<CloudBlobMetadata> blobMetadataList) throws CloudStorageException {
if (blobMetadataList.isEmpty()) {
return 0;
}
azureMetrics.blobDeleteRequestCount.inc(blobMetadataList.size());
Timer.Context deleteTimer = azureMetrics.blobDeletionTime.time();
try {
// delete blob from storage
boolean deletionDone = azureBlobDataAccessor.deleteFile(containerName, blobFileName);

// Delete the document too
try {
cosmosDataAccessor.deleteMetadata(blobMetadata);
deletionDone = true;
logger.debug("Purged blob {} from partition {}.", blobId, partitionPath);
} catch (DocumentClientException dex) {
if (dex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
logger.warn("Could not find metadata for blob {} to delete", blobId);
} else {
throw dex;
}
List<CloudBlobMetadata> deletedBlobs = azureBlobDataAccessor.purgeBlobs(blobMetadataList);
azureMetrics.blobDeletedCount.inc(deletedBlobs.size());
azureMetrics.blobDeleteErrorCount.inc(blobMetadataList.size() - deletedBlobs.size());

// Remove them from Cosmos too
for (CloudBlobMetadata blobMetadata : deletedBlobs) {
deleteFromCosmos(blobMetadata);
}
azureMetrics.blobDeletedCount.inc(deletionDone ? 1 : 0);
return deletionDone;
} catch (Exception e) {
azureMetrics.blobDeleteErrorCount.inc();
String error = (e instanceof DocumentClientException) ? "Failed to delete metadata document for blob " + blobId
: "Failed to delete blob " + blobId + ", storage path: " + containerName + "/" + blobFileName;
throw toCloudStorageException(error, e);
return deletedBlobs.size();
} catch (Exception ex) {
azureMetrics.blobDeleteErrorCount.inc(blobMetadataList.size());
throw toCloudStorageException("Failed to purge all blobs", ex);
} finally {
deleteTimer.stop();
}
}

@Override
public int purgeBlobs(List<CloudBlobMetadata> blobMetadataList) throws CloudStorageException {
int numPurged = 0;
for (CloudBlobMetadata blobMetadata : blobMetadataList) {
if (purgeBlob(blobMetadata)) {
numPurged++;
/**
* Delete a blob metadata record from Cosmos.
* @param blobMetadata the record to delete.
* @return {@code true} if the record was deleted, {@code false} if it was not found.
* @throws DocumentClientException if the delete fails for any reason other than NOT_FOUND.
*/
private boolean deleteFromCosmos(CloudBlobMetadata blobMetadata) throws DocumentClientException {
try {
cosmosDataAccessor.deleteMetadata(blobMetadata);
return true;
} catch (DocumentClientException dex) {
if (dex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
logger.warn("Could not find metadata for blob {} to delete", blobMetadata.getId());
return false;
} else {
throw dex;
}
}
logger.info("Purged {} blobs", numPurged);
return numPurged;
}

@Override
@@ -159,11 +159,6 @@ public List<CloudBlobMetadata> findEntriesSince(String partitionPath, CloudFindT
return CloudBlobMetadata.capMetadataListBySize(entries, maxTotalSizeOfEntries);
}

@Override
public boolean purgeBlob(CloudBlobMetadata blobMetadata) {
return true;
}

@Override
public int purgeBlobs(List<CloudBlobMetadata> blobMetadataList) {
return 0;
@@ -13,9 +13,12 @@
*/
package com.github.ambry.cloud.azure;

import com.azure.core.http.rest.Response;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobStorageException;
@@ -34,7 +37,9 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.codec.binary.Base64;
@@ -60,6 +65,7 @@ public class AzureBlobDataAccessorTest {
private Properties configProps = new Properties();
private AzureBlobDataAccessor dataAccessor;
private BlockBlobClient mockBlockBlobClient;
private BlobBatchClient mockBatchClient;
private AzureMetrics azureMetrics;
private int blobSize = 1024;
byte dataCenterId = 66;
@@ -76,6 +82,7 @@ public void setup() throws Exception {

BlobServiceClient mockServiceClient = mock(BlobServiceClient.class);
mockBlockBlobClient = setupMockBlobClient(mockServiceClient);
mockBatchClient = mock(BlobBatchClient.class);

mockBlobExistence(false);

@@ -92,7 +99,7 @@ public void setup() throws Exception {
configProps.setProperty("clustermap.datacenter.name", "uswest");
configProps.setProperty("clustermap.host.name", "localhost");
azureMetrics = new AzureMetrics(new MetricRegistry());
dataAccessor = new AzureBlobDataAccessor(mockServiceClient, clusterName, azureMetrics);
dataAccessor = new AzureBlobDataAccessor(mockServiceClient, mockBatchClient, clusterName, azureMetrics);
}

static BlockBlobClient setupMockBlobClient(BlobServiceClient mockServiceClient) {
@@ -151,6 +158,40 @@ public void testExpire() throws Exception {
assertTrue("Expected success", dataAccessor.updateBlobMetadata(blobId, "expirationTime", expirationTime));
}

/** Test purge */
@Test
public void testPurge() throws Exception {
// purge 3 blobs, response status (202, 404, 503)
String blobNameOkStatus = "andromeda";
String blobNameNotFoundStatus = "sirius";
String blobNameErrorStatus = "mutant";
BlobBatch mockBatch = mock(BlobBatch.class);
when(mockBatchClient.getBlobBatch()).thenReturn(mockBatch);
Response<Void> okResponse = mock(Response.class);
when(okResponse.getStatusCode()).thenReturn(202);
when(mockBatch.deleteBlob(anyString(), eq(dataAccessor.getAzureBlobName(blobNameOkStatus)))).thenReturn(okResponse);
BlobStorageException notFoundException = mock(BlobStorageException.class);
when(notFoundException.getStatusCode()).thenReturn(404);
Response<Void> notFoundResponse = mock(Response.class);
when(notFoundResponse.getStatusCode()).thenThrow(notFoundException);
when(mockBatch.deleteBlob(anyString(), eq(dataAccessor.getAzureBlobName(blobNameNotFoundStatus)))).thenReturn(
notFoundResponse);
BlobStorageException badException = mock(BlobStorageException.class);
when(badException.getStatusCode()).thenReturn(503);
Response<Void> badResponse = mock(Response.class);
when(badResponse.getStatusCode()).thenThrow(badException);
when(mockBatch.deleteBlob(anyString(), eq(dataAccessor.getAzureBlobName(blobNameErrorStatus)))).thenReturn(
badResponse);
List<CloudBlobMetadata> purgeList = new ArrayList<>();
purgeList.add(new CloudBlobMetadata().setId(blobNameOkStatus));
purgeList.add(new CloudBlobMetadata().setId(blobNameNotFoundStatus));
purgeList.add(new CloudBlobMetadata().setId(blobNameErrorStatus));
List<CloudBlobMetadata> purgeResponseList = dataAccessor.purgeBlobs(purgeList);
assertEquals("Wrong response size", 2, purgeResponseList.size());
assertEquals("Wrong blob name", blobNameOkStatus, purgeResponseList.get(0).getId());
assertEquals("Wrong blob name", blobNameNotFoundStatus, purgeResponseList.get(1).getId());
}

/** Test initializing with a proxy */
@Test
public void testProxy() throws Exception {