Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Azure storage v12 into AzureCloudDestination #1324

Merged
merged 7 commits into from
Dec 7, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,6 @@ List<CloudBlobMetadata> findEntriesSince(String partitionPath, CloudFindToken fi
*/
int purgeBlobs(List<CloudBlobMetadata> blobMetadataList) throws CloudStorageException;

/**
* Checks whether the blob exists in the cloud destination.
* @param blobId id of the Ambry blob to check.
* @return {@code true} if the blob exists, otherwise {@code false}.
* @throws CloudStorageException if the existence check encounters an error.
*/
boolean doesBlobExist(BlobId blobId) throws CloudStorageException;

/**
* Upload and persist the replica tokens for the specified Ambry partition in cloud storage.
* @param partitionPath the string form of the partitionId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package com.github.ambry.cloud.azure;

import com.azure.core.http.HttpClient;
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.util.Configuration;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobContainerClient;
Expand All @@ -32,8 +35,8 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -52,10 +55,12 @@ public class AzureBlobDataAccessor {
private static final Logger logger = LoggerFactory.getLogger(AzureBlobDataAccessor.class);
private static final String SEPARATOR = "-";
private final BlobServiceClient storageClient;
private final Configuration storageConfiguration;
private final AzureMetrics azureMetrics;
private final String clusterName;
// Containers known to exist in the storage account
private final Set<String> knownContainers = ConcurrentHashMap.newKeySet();
private ProxyOptions proxyOptions;

/**
* Production constructor
Expand All @@ -68,23 +73,19 @@ public class AzureBlobDataAccessor {
AzureMetrics azureMetrics) {
this.clusterName = clusterName;
this.azureMetrics = azureMetrics;
Configuration storageConfiguration = new Configuration();
// Check for proxy
if (cloudConfig.vcrProxyHost != null) {
this.storageConfiguration = new Configuration();
// Check for network proxy
proxyOptions = (cloudConfig.vcrProxyHost == null) ? null : new ProxyOptions(ProxyOptions.Type.HTTP,
new InetSocketAddress(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort));
if (proxyOptions != null) {
logger.info("Using proxy: {}:{}", cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort);
try {
// TODO: could add vcrProxyScheme to CloudConfig
URL proxyUrl = new URL("http", cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort, "/");
storageConfiguration.put(Configuration.PROPERTY_HTTPS_PROXY, proxyUrl.toString());
storageConfiguration.put(Configuration.PROPERTY_HTTP_PROXY, proxyUrl.toString());
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Bad proxy info");
}
}
HttpClient client = new NettyAsyncHttpClientBuilder().proxy(proxyOptions).build();

// TODO: set retry options depending on whether we are serving live data or replicating
// TODO: may want to set different retry options depending on live serving or replication mode
RequestRetryOptions requestRetryOptions = new RequestRetryOptions();
storageClient = new BlobServiceClientBuilder().connectionString(azureCloudConfig.azureStorageConnectionString)
.httpClient(client)
.retryOptions(requestRetryOptions)
.configuration(storageConfiguration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

storageConfiguration appears to now be empty. does it still have to be passed into the builder?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally was setting something in it and removed it. Figured I'd keep it as a placeholder in case we need it later.

.buildClient();
Expand All @@ -98,10 +99,27 @@ public class AzureBlobDataAccessor {
*/
AzureBlobDataAccessor(BlobServiceClient storageClient, String clusterName, AzureMetrics azureMetrics) {
this.storageClient = storageClient;
this.storageConfiguration = new Configuration();
this.clusterName = clusterName;
this.azureMetrics = azureMetrics;
}

/**
* Test utility.
* @return the underlying {@link BlobServiceClient}.
*/
BlobServiceClient getStorageClient() {
lightningrob marked this conversation as resolved.
Show resolved Hide resolved
return storageClient;
}

/**
* Test utility.
* @return the network {@link ProxyOptions} used to connect to ABS.
*/
ProxyOptions getProxyOptions() {
return proxyOptions;
}

/**
* Upload the blob to Azure storage if it does not already exist in the designated container.
* @param blobId the blobId to upload
Expand All @@ -118,7 +136,7 @@ public boolean uploadIfNotExists(BlobId blobId, long inputLength, CloudBlobMetad
azureMetrics.blobUploadRequestCount.inc();
Timer.Context storageTimer = azureMetrics.blobUploadTime.time();
try {
BlockBlobClient blobClient = getAzureBlobReference(blobId, true);
BlockBlobClient blobClient = getBlockBlobClient(blobId, true);
cloudBlobMetadata.setCloudBlobName(getAzureBlobName(blobId));
Map<String, String> metadata = getMetadataMap(cloudBlobMetadata);
blobClient.uploadWithResponse(blobInputStream, inputLength, null, metadata, null, null, blobRequestConditions,
Expand All @@ -142,24 +160,105 @@ public boolean uploadIfNotExists(BlobId blobId, long inputLength, CloudBlobMetad
}
}

/**
* Upload a file to blob storage.
* @param containerName name of the container where blob is stored.
* @param fileName the blob filename.
* @param inputStream the input stream to use for upload.
* @throws BlobStorageException for any error on ABS side.
* @throws IOException for any error with supplied data stream.
*/
public void uploadFile(String containerName, String fileName, InputStream inputStream)
throws BlobStorageException, IOException {
try {
BlobContainerClient containerClient = getContainer(containerName, true);
BlockBlobClient blobClient = containerClient.getBlobClient(fileName).getBlockBlobClient();
lightningrob marked this conversation as resolved.
Show resolved Hide resolved
blobClient.uploadWithResponse(inputStream, inputStream.available(), null, null, null, null, null, null,
Context.NONE);
} catch (UncheckedIOException e) {
// error processing input stream
throw e.getCause();
}
}

/**
* Delete a file from blob storage.
* @param containerName name of the container containing blob to delete.
* @param fileName name of the blob.
* @return {@code true} if the deletion was successful, {@code false} if the blob was not found.
* @throws BlobStorageException for any error on ABS side.
*/
public boolean deleteFile(String containerName, String fileName) throws BlobStorageException {
try {
BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
lightningrob marked this conversation as resolved.
Show resolved Hide resolved
BlockBlobClient blobClient = containerClient.getBlobClient(fileName).getBlockBlobClient();
blobClient.delete();
return true;
} catch (BlobStorageException e) {
if (e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) {
return false;
} else {
throw e;
}
}
}

/**
* Download a file from blob storage.
* @param containerName name of the container containing blob to download.
* @param fileName name of the blob.
* @param outputStream the output stream to use for download.
* @param errorOnNotFound If {@code true}, throw BlobStorageException on blob not found, otherwise return false.
* @return {@code true} if the download was successful, {@code false} if the blob was not found.
* @throws BlobStorageException for any error on ABS side.
* @throws UncheckedIOException for any error with supplied data stream.
*/
public boolean downloadFile(String containerName, String fileName, OutputStream outputStream, boolean errorOnNotFound)
throws BlobStorageException {
try {
BlockBlobClient blobClient = getBlockBlobClient(containerName, fileName, false);
blobClient.download(outputStream);
return true;
} catch (BlobStorageException e) {
if (!errorOnNotFound && e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) {
return false;
} else {
throw e;
}
}
}

/**
* Perform basic connectivity test.
*/
void testConnectivity() {
try {
// TODO: Turn on verbose logging during this call (how to do in v12?)
storageClient.getBlobContainerClient("partition-0").existsWithResponse(Duration.ofSeconds(1), Context.NONE);
logger.info("Blob storage connection test succeeded.");
} catch (BlobStorageException ex) {
throw new IllegalStateException("Blob storage connection test failed", ex);
}
}

/**
* Download the blob from Azure storage.
* @param blobId id of the Ambry blob to be downloaded
* @param outputStream outputstream to populate the downloaded data with
* @throws BlobStorageException on Azure side error.
* @throws IOException on error writing to the output stream.
* @throws UncheckedIOException on error writing to the output stream.
*/
public void downloadBlob(BlobId blobId, OutputStream outputStream) throws BlobStorageException, IOException {
public void downloadBlob(BlobId blobId, OutputStream outputStream) throws BlobStorageException {
azureMetrics.blobDownloadRequestCount.inc();
Timer.Context storageTimer = azureMetrics.blobDownloadTime.time();
try {
BlockBlobClient blobClient = getAzureBlobReference(blobId, false);
blobClient.download(outputStream);
String containerName = getAzureContainerName(blobId.getPartition().toPathString());
String blobName = getAzureBlobName(blobId);
downloadFile(containerName, blobName, outputStream, true);
azureMetrics.blobDownloadSuccessCount.inc();
} catch (UncheckedIOException e) {
// error processing input stream
} catch (Exception e) {
azureMetrics.blobDownloadErrorCount.inc();
throw e.getCause();
throw e;
} finally {
storageTimer.stop();
}
Expand All @@ -178,11 +277,7 @@ public boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value)
Objects.requireNonNull(fieldName, "Field name cannot be null");

try {
BlockBlobClient blobClient = getAzureBlobReference(blobId, false);
if (!blobClient.exists()) {
logger.debug("Blob {} not found.", blobId);
return false;
}
BlockBlobClient blobClient = getBlockBlobClient(blobId, false);
Timer.Context storageTimer = azureMetrics.blobUpdateTime.time();
try {
BlobProperties blobProperties = blobClient.getProperties();
Expand All @@ -205,6 +300,9 @@ public boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value)
storageTimer.stop();
}
} catch (BlobStorageException e) {
if (e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) {
return false;
}
if (e.getErrorCode() == BlobErrorCode.CONDITION_NOT_MET) {
// TODO: blob was updated (race condition), retry the update
}
Expand All @@ -225,25 +323,36 @@ public int purgeBlobs(List<CloudBlobMetadata> blobMetadataList) throws BlobStora
}

/**
* Get the azure blob reference for blobid.
* @param blobId id of the blob for which {@code CloudBlockBlob} reference is asked for.
* Get the block blob client for the supplied blobid.
* @param blobId id of the blob for which {@code BlockBlobClient} is needed.
* @param autoCreateContainer flag indicating whether to create the container if it does not exist.
* @return {@code BlockBlobClient} reference.
*/
private BlockBlobClient getAzureBlobReference(BlobId blobId, boolean autoCreateContainer) {
BlobContainerClient containerClient = getContainer(blobId, autoCreateContainer);
String azureBlobName = getAzureBlobName(blobId);
return containerClient.getBlobClient(azureBlobName).getBlockBlobClient();
private BlockBlobClient getBlockBlobClient(BlobId blobId, boolean autoCreateContainer) {
String containerName = getAzureContainerName(blobId.getPartition().toPathString());
String blobName = getAzureBlobName(blobId);
return getBlockBlobClient(containerName, blobName, autoCreateContainer);
}

/**
* Get the block blob client for the supplied Azure container and blob name.
* @param containerName name of the Azure container where the blob lives.
* @param blobName name of the blob.
* @param autoCreateContainer flag indicating whether to create the container if it does not exist.
* @return {@code BlockBlobClient} reference.
*/
private BlockBlobClient getBlockBlobClient(String containerName, String blobName, boolean autoCreateContainer) {
BlobContainerClient containerClient = getContainer(containerName, autoCreateContainer);
return containerClient.getBlobClient(blobName).getBlockBlobClient();
}

/**
* Get an Azure container to place the specified {@link BlobId}.
* @param blobId the {@link BlobId} that needs a container.
* Get a reference to an Azure container, creating it if necessary.
* @param containerName the container name.
* @param autoCreate flag indicating whether to create the container if it does not exist.
* @return the created {@link BlobContainerClient}.
*/
private BlobContainerClient getContainer(BlobId blobId, boolean autoCreate) {
String containerName = getAzureContainerName(blobId.getPartition().toPathString());
private BlobContainerClient getContainer(String containerName, boolean autoCreate) {
BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
if (autoCreate) {
if (!knownContainers.contains(containerName)) {
Expand Down
Loading