diff --git a/plugin/trino-exchange-filesystem/pom.xml b/plugin/trino-exchange-filesystem/pom.xml
index bfabc9dd23bf..173a41e031f8 100644
--- a/plugin/trino-exchange-filesystem/pom.xml
+++ b/plugin/trino-exchange-filesystem/pom.xml
@@ -15,10 +15,18 @@
${project.parent.basedir}
2.17.151
+ 1.1.1
+
+ com.azure
+ azure-sdk-bom
+ pom
+ ${azurejavasdk.version}
+ import
+
software.amazon.awssdk
bom
@@ -65,6 +73,44 @@
units
+
+ com.azure
+ azure-core
+
+
+
+ com.azure
+ azure-identity
+
+
+ com.github.stephenc.jcip
+ jcip-annotations
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.projectlombok
+ lombok
+
+
+ net.java.dev.jna
+ jna-platform
+
+
+
+
+
+ com.azure
+ azure-storage-blob
+
+
+
+ com.azure
+ azure-storage-blob-batch
+
+
com.google.code.findbugs
jsr305
@@ -80,6 +126,12 @@
guice
+
+ io.projectreactor
+ reactor-core
+ 3.4.13
+
+
javax.annotation
javax.annotation-api
diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java
index c4c0bcbc8939..31ff81f82a7c 100644
--- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java
+++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java
@@ -17,6 +17,8 @@
import com.google.inject.Binder;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.trino.plugin.exchange.filesystem.azure.AzureBlobFileSystemExchangeStorage;
+import io.trino.plugin.exchange.filesystem.azure.ExchangeAzureConfig;
import io.trino.plugin.exchange.filesystem.local.LocalFileSystemExchangeStorage;
import io.trino.plugin.exchange.filesystem.s3.ExchangeS3Config;
import io.trino.plugin.exchange.filesystem.s3.S3FileSystemExchangeStorage;
@@ -57,6 +59,10 @@ else if (ImmutableSet.of("s3", "s3a", "s3n").contains(scheme)) {
binder.bind(FileSystemExchangeStorage.class).to(S3FileSystemExchangeStorage.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(ExchangeS3Config.class);
}
+ else if (ImmutableSet.of("abfs", "abfss").contains(scheme)) {
+ binder.bind(FileSystemExchangeStorage.class).to(AzureBlobFileSystemExchangeStorage.class).in(Scopes.SINGLETON);
+ configBinder(binder).bindConfig(ExchangeAzureConfig.class);
+ }
else {
throw new TrinoException(NOT_SUPPORTED, format("Scheme %s is not supported as exchange spooling storage", scheme));
}
diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java
new file mode 100644
index 000000000000..e45471cb0cef
--- /dev/null
+++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.exchange.filesystem.azure;
+
+import com.azure.core.http.rest.PagedFlux;
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.core.http.rest.PagedResponse;
+import com.azure.core.util.BinaryData;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceAsyncClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.batch.BlobBatchAsyncClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobRange;
+import com.azure.storage.blob.models.CustomerProvidedKey;
+import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.airlift.slice.SizeOf;
+import io.airlift.slice.Slice;
+import io.airlift.slice.SliceInput;
+import io.airlift.slice.Slices;
+import io.trino.plugin.exchange.filesystem.ExchangeSourceFile;
+import io.trino.plugin.exchange.filesystem.ExchangeStorageReader;
+import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter;
+import io.trino.plugin.exchange.filesystem.FileStatus;
+import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage;
+import org.openjdk.jol.info.ClassLayout;
+import reactor.core.publisher.Flux;
+
+import javax.annotation.PreDestroy;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+import javax.crypto.SecretKey;
+import javax.inject.Inject;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.nullToEmpty;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static io.airlift.concurrent.MoreFutures.asVoid;
+import static io.airlift.concurrent.MoreFutures.getFutureValue;
+import static io.airlift.concurrent.MoreFutures.toListenableFuture;
+import static io.airlift.slice.SizeOf.estimatedSizeOf;
+import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR;
+import static java.lang.Math.min;
+import static java.lang.Math.toIntExact;
+import static java.lang.System.arraycopy;
+import static java.util.Objects.requireNonNull;
+import static java.util.Objects.requireNonNullElseGet;
+
+public class AzureBlobFileSystemExchangeStorage
+ implements FileSystemExchangeStorage
+{
+ private final int blockSize;
+ private final BlobServiceClient blobServiceClient;
+ private final BlobServiceAsyncClient blobServiceAsyncClient;
+
+ @Inject
+ public AzureBlobFileSystemExchangeStorage(ExchangeAzureConfig config)
+ {
+ requireNonNull(config, "config is null");
+ this.blockSize = toIntExact(config.getAzureStorageBlockSize().toBytes());
+ Optional connectionString = config.getAzureStorageConnectionString();
+
+ BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder();
+ if (connectionString.isPresent()) {
+ blobServiceClientBuilder.connectionString(connectionString.get());
+ }
+ else {
+ blobServiceClientBuilder.credential(new DefaultAzureCredentialBuilder().build());
+ }
+
+ this.blobServiceClient = blobServiceClientBuilder.buildClient();
+ this.blobServiceAsyncClient = blobServiceClientBuilder.buildAsyncClient();
+ }
+
+ @Override
+ public void createDirectories(URI dir)
+ throws IOException
+ {
+ // Nothing to do for Azure
+ }
+
+ @Override
+ public ExchangeStorageReader createExchangeStorageReader(Queue sourceFiles, int maxPageStorageSize)
+ {
+ return new AzureExchangeStorageReader(blobServiceAsyncClient, sourceFiles, blockSize, maxPageStorageSize);
+ }
+
+ @Override
+ public ExchangeStorageWriter createExchangeStorageWriter(URI file, Optional secretKey)
+ {
+ String containerName = getContainerName(file);
+ String blobName = getPath(file);
+ BlockBlobAsyncClient blockBlobAsyncClient = blobServiceAsyncClient
+ .getBlobContainerAsyncClient(containerName)
+ .getBlobAsyncClient(blobName)
+ .getBlockBlobAsyncClient();
+ if (secretKey.isPresent()) {
+ blockBlobAsyncClient = blockBlobAsyncClient.getCustomerProvidedKeyAsyncClient(new CustomerProvidedKey(secretKey.get().getEncoded()));
+ }
+ return new AzureExchangeStorageWriter(blockBlobAsyncClient, blockSize);
+ }
+
+ @Override
+ public boolean exists(URI file)
+ throws IOException
+ {
+ return blobServiceClient.getBlobContainerClient(getContainerName(file)).getBlobClient(getPath(file)).exists();
+ }
+
+ @Override
+ public ListenableFuture createEmptyFile(URI file)
+ {
+ String containerName = getContainerName(file);
+ String blobName = getPath(file);
+ return asVoid(toListenableFuture(blobServiceAsyncClient
+ .getBlobContainerAsyncClient(containerName)
+ .getBlobAsyncClient(blobName)
+ .upload(BinaryData.fromString(""))
+ .toFuture()));
+ }
+
+ @Override
+ public ListenableFuture deleteRecursively(URI dir)
+ {
+ return asVoid(Futures.transformAsync(
+ toListenableFuture(listObjectsRecursively(dir).byPage().collectList().toFuture()),
+ pagedResponseList -> deleteObjects(getContainerName(dir), pagedResponseList),
+ directExecutor()));
+ }
+
+ @Override
+ public List listFiles(URI dir)
+ throws IOException
+ {
+ ImmutableList.Builder builder = ImmutableList.builder();
+ try {
+ for (BlobItem blobItem : listObjects(dir)) {
+ if (blobItem.isPrefix() != Boolean.TRUE) {
+ builder.add(new FileStatus(
+ new URI(dir.getScheme(), dir.getUserInfo(), dir.getHost(), -1, PATH_SEPARATOR + blobItem.getName(), null, dir.getFragment()).toString(),
+ blobItem.getProperties().getContentLength()));
+ }
+ }
+ }
+ catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public List listDirectories(URI dir)
+ throws IOException
+ {
+ ImmutableList.Builder builder = ImmutableList.builder();
+ try {
+ for (BlobItem blobItem : listObjects(dir)) {
+ if (blobItem.isPrefix() == Boolean.TRUE) {
+ builder.add(new URI(dir.getScheme(), dir.getUserInfo(), dir.getHost(), -1, PATH_SEPARATOR + blobItem.getName(), null, dir.getFragment()));
+ }
+ }
+ }
+ catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public int getWriteBufferSize()
+ {
+ return blockSize;
+ }
+
+ @PreDestroy
+ @Override
+ public void close()
+ throws IOException
+ {
+ }
+
+ private PagedIterable listObjects(URI dir)
+ {
+ checkArgument(isDirectory(dir), "listObjects called on file uri %s", dir);
+
+ String containerName = getContainerName(dir);
+ String directoryPath = getPath(dir);
+ if (!directoryPath.isEmpty()) {
+ directoryPath += PATH_SEPARATOR;
+ }
+
+ return blobServiceClient.getBlobContainerClient(containerName).listBlobsByHierarchy(PATH_SEPARATOR, (new ListBlobsOptions()).setPrefix(directoryPath), null);
+ }
+
+ private PagedFlux listObjectsRecursively(URI dir)
+ {
+ checkArgument(isDirectory(dir), "listObjectsRecursively called on file uri %s", dir);
+
+ String containerName = getContainerName(dir);
+ String directoryPath = getPath(dir);
+
+ return blobServiceAsyncClient
+ .getBlobContainerAsyncClient(containerName)
+ // deleteBlobs can delete at most 256 blobs at a time
+ .listBlobsByHierarchy(null, (new ListBlobsOptions()).setPrefix(directoryPath).setMaxResultsPerPage(256));
+ }
+
+ private ListenableFuture> deleteObjects(String containerName, List> pageBlobItems)
+ {
+ BlobBatchAsyncClient blobBatchAsyncClient = new BlobBatchClientBuilder(blobServiceClient).buildAsyncClient();
+ BlobContainerClient blobContainerClient = blobServiceClient.getBlobContainerClient(containerName);
+
+ return Futures.allAsList(pageBlobItems.stream().map(pageBlobItem -> {
+ if (pageBlobItem.getValue().isEmpty()) {
+ return immediateVoidFuture();
+ }
+ ImmutableList.Builder builder = ImmutableList.builder();
+ pageBlobItem.getValue().forEach(blobItem -> builder.add(blobContainerClient.getBlobClient(blobItem.getName()).getBlobUrl()));
+ return toListenableFuture(blobBatchAsyncClient.deleteBlobs(builder.build(), DeleteSnapshotsOptionType.INCLUDE).then().toFuture());
+ }).collect(toImmutableList()));
+ }
+
+ // URI format: abfs[s]://@.dfs.core.windows.net//
+ private static String getContainerName(URI uri)
+ {
+ return uri.getUserInfo();
+ }
+
+ private static String getPath(URI uri)
+ {
+ checkArgument(uri.isAbsolute(), "Uri is not absolute: %s", uri);
+ String blobName = nullToEmpty(uri.getPath());
+ if (blobName.startsWith(PATH_SEPARATOR)) {
+ blobName = blobName.substring(PATH_SEPARATOR.length());
+ }
+ if (blobName.endsWith(PATH_SEPARATOR)) {
+ blobName = blobName.substring(0, blobName.length() - PATH_SEPARATOR.length());
+ }
+ return blobName;
+ }
+
+ private static boolean isDirectory(URI uri)
+ {
+ return uri.toString().endsWith(PATH_SEPARATOR);
+ }
+
+ @ThreadSafe
+ private static class AzureExchangeStorageReader
+ implements ExchangeStorageReader
+ {
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(AzureExchangeStorageReader.class).instanceSize();
+
+ private final BlobServiceAsyncClient blobServiceAsyncClient;
+ private final Queue sourceFiles;
+ private final int blockSize;
+ private final int bufferSize;
+
+ @GuardedBy("this")
+ private ExchangeSourceFile currentFile;
+ @GuardedBy("this")
+ private long fileOffset;
+ @GuardedBy("this")
+ private SliceInput sliceInput;
+ @GuardedBy("this")
+ private int sliceSize = -1;
+ private volatile boolean closed;
+ private volatile long bufferRetainedSize;
+ private volatile ListenableFuture inProgressReadFuture = immediateVoidFuture();
+
+ public AzureExchangeStorageReader(
+ BlobServiceAsyncClient blobServiceAsyncClient,
+ Queue sourceFiles,
+ int blockSize,
+ int maxPageStorageSize)
+ {
+ this.blobServiceAsyncClient = requireNonNull(blobServiceAsyncClient, "blobServiceAsyncClient is null");
+ this.sourceFiles = requireNonNull(sourceFiles, "sourceFiles is null");
+ this.blockSize = blockSize;
+ // Make sure buffer can accommodate at least one complete Slice, and keep reads aligned to block boundaries
+ this.bufferSize = maxPageStorageSize + blockSize;
+
+ // Safe publication of S3ExchangeStorageReader is required as it's a mutable class
+ fillBuffer();
+ }
+
+ @Override
+ public synchronized Slice read()
+ throws IOException
+ {
+ if (closed || !inProgressReadFuture.isDone()) {
+ return null;
+ }
+
+ try {
+ getFutureValue(inProgressReadFuture);
+ }
+ catch (RuntimeException e) {
+ throw new IOException(e);
+ }
+
+ if (sliceSize < 0) {
+ sliceSize = sliceInput.readInt();
+ }
+ Slice data = sliceInput.readSlice(sliceSize);
+
+ if (sliceInput.available() > Integer.BYTES) {
+ sliceSize = sliceInput.readInt();
+ if (sliceInput.available() < sliceSize) {
+ fillBuffer();
+ }
+ }
+ else {
+ sliceSize = -1;
+ fillBuffer();
+ }
+
+ return data;
+ }
+
+ @Override
+ public ListenableFuture isBlocked()
+ {
+ // rely on FileSystemExchangeSource implementation to wrap with nonCancellationPropagating
+ return inProgressReadFuture;
+ }
+
+ @Override
+ public synchronized long getRetainedSize()
+ {
+ return INSTANCE_SIZE + bufferRetainedSize;
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return closed;
+ }
+
+ @Override
+ public synchronized void close()
+ {
+ if (closed) {
+ return;
+ }
+ closed = true;
+
+ currentFile = null;
+ sliceInput = null;
+ bufferRetainedSize = 0;
+ inProgressReadFuture.cancel(true);
+ inProgressReadFuture = immediateVoidFuture(); // such that we don't retain reference to the buffer
+ }
+
+ private void fillBuffer()
+ {
+ if (currentFile == null || fileOffset == currentFile.getFileSize()) {
+ currentFile = sourceFiles.poll();
+ if (currentFile == null) {
+ close();
+ return;
+ }
+ fileOffset = 0;
+ }
+
+ byte[] buffer = new byte[bufferSize];
+ int bufferFill = 0;
+ if (sliceInput != null) {
+ int length = sliceInput.available();
+ sliceInput.readBytes(buffer, 0, length);
+ bufferFill += length;
+ }
+
+ ImmutableList.Builder> downloadFutures = ImmutableList.builder();
+ while (true) {
+ long fileSize = currentFile.getFileSize();
+ // Make sure Azure Blob Storage read request byte ranges align with block sizes for best performance
+ int readableBlocks = (buffer.length - bufferFill) / blockSize;
+ if (readableBlocks == 0) {
+ if (buffer.length - bufferFill >= fileSize - fileOffset) {
+ readableBlocks = 1;
+ }
+ else {
+ break;
+ }
+ }
+
+ BlockBlobAsyncClient blockBlobAsyncClient = blobServiceAsyncClient
+ .getBlobContainerAsyncClient(getContainerName(currentFile.getFileUri()))
+ .getBlobAsyncClient(getPath(currentFile.getFileUri()))
+ .getBlockBlobAsyncClient();
+ Optional secretKey = currentFile.getSecretKey();
+ if (secretKey.isPresent()) {
+ blockBlobAsyncClient = blockBlobAsyncClient.getCustomerProvidedKeyAsyncClient(new CustomerProvidedKey(secretKey.get().getEncoded()));
+ }
+ for (int i = 0; i < readableBlocks && fileOffset < fileSize; ++i) {
+ int length = (int) min(blockSize, fileSize - fileOffset);
+
+ int finalBufferFill = bufferFill;
+ FluentFuture downloadFuture = FluentFuture.from(toListenableFuture(blockBlobAsyncClient.downloadWithResponse(new BlobRange(fileOffset, (long) length), null, null, false).toFuture()))
+ .transformAsync(response -> toListenableFuture(response.getValue().collectList().toFuture()), directExecutor())
+ .transform(byteBuffers -> {
+ int offset = finalBufferFill;
+ for (ByteBuffer byteBuffer : byteBuffers) {
+ int readableBytes = byteBuffer.remaining();
+ if (byteBuffer.hasArray()) {
+ arraycopy(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), buffer, offset, readableBytes);
+ }
+ else {
+ byteBuffer.asReadOnlyBuffer().get(buffer, offset, readableBytes);
+ }
+ offset += readableBytes;
+ }
+ return null;
+ }, directExecutor());
+ downloadFutures.add(downloadFuture);
+ bufferFill += length;
+ fileOffset += length;
+ }
+
+ if (fileOffset == fileSize) {
+ currentFile = sourceFiles.poll();
+ if (currentFile == null) {
+ break;
+ }
+ fileOffset = 0;
+ }
+ }
+
+ inProgressReadFuture = asVoid(Futures.allAsList(downloadFutures.build()));
+ sliceInput = Slices.wrappedBuffer(buffer, 0, bufferFill).getInput();
+ bufferRetainedSize = sliceInput.getRetainedSize();
+ }
+ }
+
+ @NotThreadSafe
+ private static class AzureExchangeStorageWriter
+ implements ExchangeStorageWriter
+ {
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(AzureExchangeStorageWriter.class).instanceSize();
+
+ private final BlockBlobAsyncClient blockBlobAsyncClient;
+ private final int blockSize;
+
+ private ListenableFuture directUploadFuture;
+ private final List> multiPartUploadFutures = new ArrayList<>();
+ private final List blockIds = new ArrayList<>();
+ private volatile boolean closed;
+
+ public AzureExchangeStorageWriter(
+ BlockBlobAsyncClient blockBlobAsyncClient,
+ int blockSize)
+ {
+ this.blockBlobAsyncClient = requireNonNull(blockBlobAsyncClient, "blockBlobAsyncClient is null");
+ this.blockSize = blockSize;
+ }
+
+ @Override
+ public ListenableFuture write(Slice slice)
+ {
+ checkState(directUploadFuture == null, "Direct upload already started");
+ if (closed) {
+ // Ignore writes after writer is closed
+ return immediateVoidFuture();
+ }
+
+ // Skip multipart upload if there would only be one part
+ if (slice.length() < blockSize && multiPartUploadFutures.isEmpty()) {
+ directUploadFuture = asVoid(toListenableFuture(blockBlobAsyncClient.upload(Flux.just(slice.toByteBuffer()), slice.length()).toFuture()));
+ return directUploadFuture;
+ }
+
+ String blockId = Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
+ ListenableFuture uploadFuture = toListenableFuture(blockBlobAsyncClient.stageBlock(blockId, Flux.just(slice.toByteBuffer()), slice.length()).toFuture());
+ multiPartUploadFutures.add(uploadFuture);
+ blockIds.add(blockId);
+ return uploadFuture;
+ }
+
+ @Override
+ public ListenableFuture finish()
+ {
+ if (closed) {
+ return immediateVoidFuture();
+ }
+
+ if (multiPartUploadFutures.isEmpty()) {
+ return requireNonNullElseGet(directUploadFuture, Futures::immediateVoidFuture);
+ }
+
+ ListenableFuture finishFuture = Futures.transformAsync(
+ Futures.allAsList(multiPartUploadFutures),
+ ignored -> asVoid(toListenableFuture(blockBlobAsyncClient.commitBlockList(blockIds).toFuture())),
+ directExecutor());
+ Futures.addCallback(finishFuture, new FutureCallback<>() {
+ @Override
+ public void onSuccess(Void result)
+ {
+ closed = true;
+ }
+
+ @Override
+ public void onFailure(Throwable ignored)
+ {
+ // Rely on caller to abort in case of exceptions during finish
+ }
+ }, directExecutor());
+ return finishFuture;
+ }
+
+ @Override
+ public ListenableFuture abort()
+ {
+ if (closed) {
+ return immediateVoidFuture();
+ }
+ closed = true;
+
+ if (multiPartUploadFutures.isEmpty()) {
+ if (directUploadFuture != null) {
+ directUploadFuture.cancel(true);
+ }
+ return immediateVoidFuture();
+ }
+
+ verify(directUploadFuture == null);
+ multiPartUploadFutures.forEach(future -> future.cancel(true));
+
+ // No explicit way to delete staged blocks; uncommitted blocks are automatically deleted after 7 days
+ return immediateVoidFuture();
+ }
+
+ @Override
+ public long getRetainedSize()
+ {
+ return INSTANCE_SIZE + estimatedSizeOf(blockIds, SizeOf::estimatedSizeOf);
+ }
+ }
+}
diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/ExchangeAzureConfig.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/ExchangeAzureConfig.java
new file mode 100644
index 000000000000..a51334f5cc49
--- /dev/null
+++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/ExchangeAzureConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.exchange.filesystem.azure;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.configuration.ConfigSecuritySensitive;
+import io.airlift.units.DataSize;
+import io.airlift.units.MaxDataSize;
+import io.airlift.units.MinDataSize;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.Optional;
+
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+
+public class ExchangeAzureConfig
+{
+ private Optional azureStorageConnectionString = Optional.empty();
+ private DataSize azureStorageBlockSize = DataSize.of(4, MEGABYTE);
+
+ public Optional getAzureStorageConnectionString()
+ {
+ return azureStorageConnectionString;
+ }
+
+ @Config("exchange.azure.connection-string")
+ @ConfigSecuritySensitive
+ public ExchangeAzureConfig setAzureStorageConnectionString(String azureStorageConnectionString)
+ {
+ this.azureStorageConnectionString = Optional.ofNullable(azureStorageConnectionString);
+ return this;
+ }
+
+ @NotNull
+ @MinDataSize("4MB")
+ @MaxDataSize("256MB")
+ public DataSize getAzureStorageBlockSize()
+ {
+ return azureStorageBlockSize;
+ }
+
+ @Config("exchange.azure.block-size")
+ @ConfigDescription("Block size for Azure High-Throughput Block Blob")
+ public ExchangeAzureConfig setAzureStorageBlockSize(DataSize azureStorageBlockSize)
+ {
+ this.azureStorageBlockSize = azureStorageBlockSize;
+ return this;
+ }
+}
diff --git a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/azure/TestExchangeAzureConfig.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/azure/TestExchangeAzureConfig.java
new file mode 100644
index 000000000000..0a212002aa38
--- /dev/null
+++ b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/azure/TestExchangeAzureConfig.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.exchange.filesystem.azure;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.units.DataSize;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
+import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
+import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+
+public class TestExchangeAzureConfig
+{
+ @Test
+ public void testDefaults()
+ {
+ assertRecordedDefaults(recordDefaults(ExchangeAzureConfig.class)
+ .setAzureStorageConnectionString(null)
+ .setAzureStorageBlockSize(DataSize.of(4, MEGABYTE)));
+ }
+
+ @Test
+ public void testExplicitPropertyMappings()
+ {
+ Map properties = ImmutableMap.builder()
+ .put("exchange.azure.connection-string", "connection")
+ .put("exchange.azure.block-size", "8MB")
+ .buildOrThrow();
+
+ ExchangeAzureConfig expected = new ExchangeAzureConfig()
+ .setAzureStorageConnectionString("connection")
+ .setAzureStorageBlockSize(DataSize.of(8, MEGABYTE));
+
+ assertFullMapping(properties, expected);
+ }
+}