Support exchange spooling on Azure Blob Storage #12211
...-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/ExchangeAzureConfig.java
.../src/main/java/io/trino/plugin/exchange/filesystem/local/LocalFileSystemExchangeStorage.java
...system/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java
.../main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java
}

@Override
public ListenableFuture<Void> finish()
Would it make sense to mark the Writer as finishing, so that we throw an error if finish is called twice (or return the previous future on the second call)?
(This can be a follow-up; if it is done, the same applies to the S3 implementation.)
Actually, ExchangeSink's protocol already states for finish(): "This method is guaranteed not be called more than once."
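For reference, a minimal sketch of the "return the previous future for the second call" option raised above. It is not the PR's code: the class name IdempotentWriter is hypothetical, the JDK's CompletableFuture stands in for Guava's ListenableFuture so the example is self-contained, and the actual upload/commit step is left as a placeholder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical writer; only the finish() bookkeeping is illustrated.
public class IdempotentWriter
{
    // Holds the future created by the first finish() call, or null before that.
    private final AtomicReference<CompletableFuture<Void>> finishFuture = new AtomicReference<>();

    public CompletableFuture<Void> finish()
    {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (!finishFuture.compareAndSet(null, future)) {
            // finish() was already called: hand back the earlier future
            // instead of committing a second time.
            return finishFuture.get();
        }
        // Placeholder for the real commit work (assumption: it would
        // complete the future asynchronously once the upload is done).
        future.complete(null);
        return future;
    }
}
```

Given the ExchangeSink guarantee quoted above, this guard would be purely defensive.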
@Override
public long getRetainedSize()
{
    return INSTANCE_SIZE;
nit: there are also two lists, but I guess it is fine to skip those as their size should be negligible.
Good catch. Added accounting for blockIds, but I'm not sure how to get the estimated size of a ListenableFuture<Void>.
if (secretKey.isPresent()) {
    blockBlobAsyncClient = blockBlobAsyncClient.getCustomerProvidedKeyAsyncClient(new CustomerProvidedKey(secretKey.get().getEncoded()));
}
for (int i = 0; i < readableBlocks && fileOffset < fileSize; ++i) {
Do we need && fileOffset < fileSize here? Isn't the way readableBlocks is computed enough to keep us from going past the file size?
We do need it. readableBlocks is calculated from the remaining space in the buffer, but the remaining file content may be even shorter than that.
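A small sketch of why both loop conditions matter. This is an illustration only: BlockReadPlanner, planReads, and the bufferRemaining / blockSize formula are assumptions made for the example, not the PR's actual computation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that lists the {offset, length} reads the loop would issue.
public class BlockReadPlanner
{
    public static List<long[]> planReads(long fileOffset, long fileSize, int bufferRemaining, int blockSize)
    {
        // Assumption: the block count is derived from free buffer space only,
        // so it knows nothing about how much of the file is left.
        int readableBlocks = bufferRemaining / blockSize;
        List<long[]> reads = new ArrayList<>();
        for (int i = 0; i < readableBlocks && fileOffset < fileSize; ++i) {
            // The last block may be shorter than blockSize.
            long length = Math.min(blockSize, fileSize - fileOffset);
            reads.add(new long[] {fileOffset, length});
            fileOffset += length;
        }
        return reads;
    }
}
```

With a 10-byte file, 16 bytes of free buffer, and 4-byte blocks, the buffer has room for four blocks but only three reads are planned; without the fileOffset < fileSize check a fourth read would start past the end of the file.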
if (sliceSize < 0) {
    sliceSize = sliceInput.readInt();
}
Slice data = sliceInput.readSlice(sliceSize);
It is not obvious that sliceInput will have at least sliceSize bytes at this point (after a single call to fillBuffer); that is worth a comment.
Edit: well, maybe it is OK, as fillBuffer literally ensures the buffer is full AND we know a single row will always fit in the buffer. Still, a sentence that states just that would be helpful IMO.
I agree, but I find it a little awkward to add a comment here, as the code seems to explain itself better.
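To make the invariant under discussion concrete, here is a rough sketch of length-prefixed record reading over a plain java.nio.ByteBuffer. It is an assumption-laden stand-in, not the PR's reader: LengthPrefixedReader and readAll are hypothetical names, and where the real code relies on fillBuffer having filled the buffer, this sketch checks the remaining bytes explicitly and simply stops.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader for records laid out as [int length][payload bytes].
public class LengthPrefixedReader
{
    public static List<byte[]> readAll(ByteBuffer buffer)
    {
        List<byte[]> records = new ArrayList<>();
        int sliceSize = -1; // -1 means the length prefix has not been read yet
        while (true) {
            if (sliceSize < 0) {
                if (buffer.remaining() < Integer.BYTES) {
                    break; // not even a full length prefix left
                }
                sliceSize = buffer.getInt();
            }
            if (buffer.remaining() < sliceSize) {
                // The payload is incomplete. A real reader would keep
                // sliceSize and refill the buffer; this sketch just stops.
                break;
            }
            byte[] data = new byte[sliceSize];
            buffer.get(data);
            records.add(data);
            sliceSize = -1;
        }
        return records;
    }
}
```

The explicit remaining() checks are exactly what becomes unnecessary once the buffer is guaranteed to be full and every row is guaranteed to fit in it.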
LGTM - some nitpicks.
public boolean exists(URI file)
        throws IOException
{
    return blobServiceClient.getBlobContainerClient(getContainerName(file)).getBlobClient(getPath(file)).exists();
Is blobServiceClient.getBlobContainerClient cheap? Do we need to cache it?
It's purely a class wrapper, so I'm not sure caching is worth it (the cache would need to be accessed concurrently). Please advise.
If it's just a matter of creating a wrapper object, then it's fine.
.transformAsync(response -> toListenableFuture(response.getValue().collectList().toFuture()), directExecutor())
.transform(byteBuffers -> {
    int offset = finalBufferFill;
    for (ByteBuffer byteBuffer : byteBuffers) {
Is it possible to have the client write bytes directly into a destination buffer (to avoid double copying)?
I don't see an easy way, as the Azure SDK doesn't provide any kind of callback/transformer interface to do so.
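For context, the copy step in question looks roughly like the following sketch: each downloaded chunk arrives as a ByteBuffer and is copied into the destination array at a running offset. BufferCopy and copyInto are hypothetical names used for illustration; only the shape of the loop is taken from the snippet above.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical helper mirroring the second copy discussed in this thread.
public class BufferCopy
{
    // Copies every chunk into destination starting at offset and
    // returns the offset just past the last byte written.
    public static int copyInto(List<ByteBuffer> byteBuffers, byte[] destination, int offset)
    {
        for (ByteBuffer byteBuffer : byteBuffers) {
            int length = byteBuffer.remaining();
            byteBuffer.get(destination, offset, length);
            offset += length;
        }
        return offset;
    }
}
```

Avoiding this copy would require the SDK to write into a caller-supplied buffer, which is the hook the reply above says is not available.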
6beb139 to 7e82ec4
CI: #12298
CI: #12300
Description
New feature
trino-exchange-plugin
This enables users to set up exchange spooling on Azure Blob Storage.
Related issues, pull requests, and links
Documentation
#12467
( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
(x) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
(x) Release notes entries required with the following suggested text: