[Datasets] Improve performance of DefaultFileMetaProvider. #33117
Conversation
Force-pushed from 4a196be to 03e5ed1.
@ericl Feedback implemented, PTAL.
A few comments on testing.
    # Always launch at least 2 parallel fetch tasks.
    max(len(uris) // desired_uris_per_task, 2),
    # Oversubscribe cluster CPU by 2x since these tasks are I/O-bound.
    round(available_cpus / num_cpus),
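For context, here is a minimal sketch of how these two bounds might combine into the final task count; `_choose_num_fetch_tasks` and the exact clamping are assumptions for illustration, not the PR's actual code:

```python
def _choose_num_fetch_tasks(
    num_uris: int,
    desired_uris_per_task: int,
    available_cpus: int,
    num_cpus: float,
) -> int:
    # Take the smaller of the two bounds from the diff above: enough tasks to
    # keep chunks near desired_uris_per_task (but at least 2), capped by a 2x
    # CPU oversubscription limit (available_cpus / num_cpus with num_cpus=0.5).
    return min(
        max(num_uris // desired_uris_per_task, 2),
        round(available_cpus / num_cpus),
    )
```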
I think we can remove line 801; as long as the tasks are long enough, having more tasks beyond that doesn't matter.
Ah good point!
Btw, this ended up causing pretty big slowdowns in the 10k-file and 40k-file cases (~2x slower in both), so the task scheduling/dispatching overhead is still non-negligible once tasks start queueing. Would you still vote for removing this?
Interesting. I'd still vote to remove it, since we don't understand why it should be faster (40k/16 = 2500 tasks, which should be less than a second of overhead).
Could it be that using 0.5 CPUs is actually slower than 1 CPU due to worker startup time? I don't think the worker pool caches more than 1 worker per CPU.
It actually ended up being the 0.5 CPUs vs. 0.25 CPUs difference, with the latter being 2x faster. Keeping num_cpus fixed and removing this CPU oversubscription bound resulted in roughly the same performance.
I wouldn't expect worker startup time to be the issue here, since we're talking about queued tasks, either short or long (e.g. 40 seconds), with CPU oversubscription in both cases; so the comparison is between remaining stalled in the queue and starting up more workers to help drain it. In either the short-task or long-task case, more workers would allow us to drain the queue faster.
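For illustration, a minimal sketch (not the PR's code) of the fractional num_cpus knob being discussed; `fetch_metadata_chunk` is a hypothetical task name:

```python
import ray

# Requesting a fractional num_cpus lets Ray schedule several of these
# I/O-bound fetch tasks per core, e.g. 4 concurrent tasks per CPU at 0.25
# versus 2 per CPU at 0.5.
@ray.remote(num_cpus=0.25)
def fetch_metadata_chunk(uris):
    # Placeholder body; the real task would fetch file metadata for `uris`.
    return [(u, len(u)) for u in uris]

# Example dispatch over pre-chunked URI lists:
# results = ray.get([fetch_metadata_chunk.remote(chunk) for chunk in chunks])
```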
LGTM, thanks @clarkzinzow!
LGTM
def _fetch_metadata_parallel(
    uris: List[Uri],
    fetch_func: Callable[[List[Uri]], List[Meta]],
The _fetch_metadata_serialization_wrapper looks like it takes a _SerializedPiece-typed arg.
Yep, and the provided uris arg is a List[_SerializedPiece], so the Uri type is consistent across these two args, which is all that we're trying to express here (fetch_func should take the same type as uris as an argument). I couldn't think of a better type name than Uri, since both file paths and Parquet fragments can be thought of as identifiers for data to be read, so Uri seemed like an OK choice.
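For reference, a sketch of the generic signature being discussed, assuming Uri and Meta are TypeVars (the PR's actual definitions and parameter list may differ); the body here processes chunks serially purely for illustration, whereas the PR dispatches them as Ray tasks:

```python
from typing import Callable, List, TypeVar

Uri = TypeVar("Uri")    # e.g. a file path or a _SerializedPiece
Meta = TypeVar("Meta")  # e.g. a file size or a (path, metadata) pair

def _fetch_metadata_parallel(
    uris: List[Uri],
    fetch_func: Callable[[List[Uri]], List[Meta]],
    desired_uris_per_task: int = 16,
) -> List[Meta]:
    # The only typing constraint being expressed: fetch_func consumes chunks
    # of the same element type as uris and returns the corresponding metadata.
    out: List[Meta] = []
    for i in range(0, len(uris), desired_uris_per_task):
        out.extend(fetch_func(uris[i : i + desired_uris_per_task]))
    return out
```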
Failures appear to be unrelated, merging!
This PR improves the performance of the DefaultFileMetaProvider. Previously, DefaultFileMetaProvider would serially expand and fetch the file size for a large list of directories and files, respectively. This PR optimizes this by parallelizing directory expansion and file size fetching over Ray tasks. Also, in the common case that all file paths share the same parent directory (or base directory, if using partitioning), we do a single ListObjectsV2 call on the directory followed by a client-side filter, which reduces a 90 second parallel file size fetch to a 0.8 second request + client-side filter.
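As an illustration of the fast path described above, here is a hedged sketch; `fetch_file_sizes` and `list_dir` are hypothetical names, and the real implementation goes through the filesystem APIs used by Ray Datasets rather than this simplified interface:

```python
import posixpath
from typing import Callable, Dict, List, Optional

def _common_parent_dir(paths: List[str]) -> Optional[str]:
    # Returns the shared parent directory if every path sits directly under it.
    parents = {posixpath.dirname(p) for p in paths}
    return parents.pop() if len(parents) == 1 else None

def fetch_file_sizes(
    paths: List[str],
    list_dir: Callable[[str], List[Dict]],
) -> Dict[str, int]:
    parent = _common_parent_dir(paths)
    if parent is not None:
        # Fast path: one directory listing (a single ListObjectsV2 request on
        # S3) followed by a client-side filter down to the requested paths.
        listing = {f["path"]: f["size"] for f in list_dir(parent)}
        return {p: listing[p] for p in paths if p in listing}
    # Slow path: fall back to parallel per-file size fetches (not shown).
    raise NotImplementedError
```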
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.