Implement parallel read from S3 for exchange storage #11174
Great job! Some comments
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeConfig.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeConfig.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeConfig.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSink.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java
...in/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java
Nice :)
I left some comments, but all of them are editorial.
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeConfig.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExchangeSourceFile.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/ExchangeSourceFile.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java
...-exchange/src/main/java/io/trino/plugin/exchange/s3/BufferWriteAsyncResponseTransformer.java
...in/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java
if (oldBuffer != null) {
    verify(bufferPosition <= bufferFill,
            "bufferPosition (%s) expected to be smaller or equal to bufferFill (%s)", bufferPosition, bufferFill);
    int rest = bufferFill - bufferPosition;
remainderSize?
}

// Make sure S3 read request byte ranges align with part sizes for best performance
int readableParts = toIntExact(buffer.length - bufferFill) / partSize;
readableParts was a bit confusing to me, as it felt like it characterizes the file, while it actually characterizes the buffer.
Maybe rename it to bufferPartsCapacity or something similar?
It actually characterizes the file, so I feel readableParts is quite readable here.
Why the file? It says how many parts we can fit in the buffer, right?
It's both. Imagine a 12MB file with a part size of 5MB: it will be stored in three parts, 5MB-5MB-2MB.
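The arithmetic in this thread can be sketched as follows. This is only an illustration under stated assumptions, not code from the PR: the class PartRanges, the type ByteRange, and the methods splitIntoParts and partsThatFit are hypothetical names. It shows how a 12MB file with a 5MB part size splits into 5MB-5MB-2MB, and how the readableParts expression counts the whole parts that fit into the free buffer space.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the PR
final class PartRanges
{
    // Inclusive byte range [start, end] of a single part
    static final class ByteRange
    {
        final long start;
        final long end;

        ByteRange(long start, long end)
        {
            this.start = start;
            this.end = end;
        }

        long length()
        {
            return end - start + 1;
        }
    }

    private PartRanges() {}

    // Splits a file into consecutive parts of partSize bytes; the last part may be shorter
    static List<ByteRange> splitIntoParts(long fileSize, long partSize)
    {
        if (fileSize < 0 || partSize <= 0) {
            throw new IllegalArgumentException("fileSize must be >= 0 and partSize must be > 0");
        }
        List<ByteRange> ranges = new ArrayList<>();
        for (long start = 0; start < fileSize; start += partSize) {
            long end = Math.min(start + partSize, fileSize) - 1;
            ranges.add(new ByteRange(start, end));
        }
        return ranges;
    }

    // Number of whole parts that fit into the unused tail of the buffer,
    // mirroring (buffer.length - bufferFill) / partSize from the snippet under review
    static int partsThatFit(int bufferLength, int bufferFill, int partSize)
    {
        return (bufferLength - bufferFill) / partSize;
    }
}
```

Read this way, the count depends on both sides: the free buffer space bounds how many parts can be requested at once, while the part boundaries themselves come from how the file was written.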
...in/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java
2741267 to f164809
11fa33e to f2d6621
CI: raptor #11252
f2d6621 to 7844aad
LGTM % nits
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeSource.java
plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeStorage.java
...in/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java
...in/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java
7844aad to ecab4d2
7dbe584 to faf4a1c
88c99b0 to be60aec
Test failure due to #11368
be60aec to 2542c8b
2542c8b to 7cd36fa
b4b1ed5 to f266de4
Description
This is an improvement to the read path for exchange sources, in the trino-exchange plugin.
This PR implements parallel reads for S3-based exchange storage, with both inter-file and intra-file parallelism, and improves the overall throughput of fault-tolerant execution.
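To illustrate the two levels of parallelism named above, here is a minimal sketch. It is not the PR's implementation: the class ParallelReadSketch, the RangeReader interface (a stand-in for a ranged read against object storage), and the methods readFile and readFiles are all hypothetical, and the sketch uses plain CompletableFuture tasks rather than any S3 client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Hypothetical sketch, not code from the PR
final class ParallelReadSketch
{
    // Assumed stand-in for a ranged read of `length` bytes starting at `start`
    interface RangeReader
    {
        byte[] read(String file, long start, int length);
    }

    private ParallelReadSketch() {}

    // Intra-file parallelism: one task per part, reassembled in part order
    static CompletableFuture<byte[]> readFile(RangeReader reader, String file, int fileSize, int partSize, ExecutorService executor)
    {
        List<CompletableFuture<byte[]>> parts = new ArrayList<>();
        for (int start = 0; start < fileSize; start += partSize) {
            int offset = start;
            int length = Math.min(partSize, fileSize - start);
            parts.add(CompletableFuture.supplyAsync(() -> reader.read(file, offset, length), executor));
        }
        return CompletableFuture.allOf(parts.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    // All parts are complete here, so join() does not block
                    byte[] result = new byte[fileSize];
                    int position = 0;
                    for (CompletableFuture<byte[]> part : parts) {
                        byte[] bytes = part.join();
                        System.arraycopy(bytes, 0, result, position, bytes.length);
                        position += bytes.length;
                    }
                    return result;
                });
    }

    // Inter-file parallelism: start every file read before waiting on any of them
    static List<byte[]> readFiles(RangeReader reader, List<String> files, List<Integer> sizes, int partSize, ExecutorService executor)
    {
        List<CompletableFuture<byte[]>> futures = new ArrayList<>();
        for (int i = 0; i < files.size(); i++) {
            futures.add(readFile(reader, files.get(i), sizes.get(i), partSize, executor));
        }
        List<byte[]> results = new ArrayList<>();
        for (CompletableFuture<byte[]> future : futures) {
            results.add(future.join());
        }
        return results;
    }
}
```

In this sketch the caller owns the executor, so the degree of parallelism is bounded by the pool size it chooses; the actual PR may bound concurrency differently.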
Related issues, pull requests, and links
#11050
Documentation
(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
After the implementation and configs stabilize and are well tested, we will add documentation for fault-tolerant execution.
Release notes
( ) No release notes entries required.
(x) Release notes entries required with the following suggested text: