[Datasets] [Operator Fusion - 2/N] Data layer performance/bug fixes and tweaks. #32744
Conversation
@@ -55,6 +55,11 @@ def add_block(self, block: Block):
             self._builder = accessor.builder()
         self._builder.add_block(block)
 
+    def will_build_yield_copy(self) -> bool:
+        if self._builder is None:
+            return True
Maybe False by default?
@ericl we'll technically create a new (empty) block in this case, which I think we should consider to be a "copy" in the sense that the returned block doesn't point to any old data buffers (this method is returning whether building will yield a new block, not whether building will copy data). The Batcher currently uses this method to determine whether we need to copy the built block in order to ensure that no old data buffers are still being referenced, so we can respect the zero_copy_batch=False, ensure_copy=True case.
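For illustration, here is a minimal sketch of how a batching consumer could combine will_build_yield_copy() with an ensure_copy requirement, copying only when the built block would still alias old buffers. This is not Ray Data's actual Batcher; ToyBlockBuilder and next_batch are made-up names for the sketch.

```python
class ToyBlockBuilder:
    """Toy builder over list blocks; returns a single added block as-is."""

    def __init__(self):
        self._blocks = []

    def add_block(self, block):
        self._blocks.append(block)

    def will_build_yield_copy(self) -> bool:
        # Zero or multiple blocks: build() creates a brand-new list, so the
        # result references no old buffers. Exactly one block: build() returns
        # it as-is (zero-copy), so the result still aliases the caller's data.
        return len(self._blocks) != 1

    def build(self):
        if len(self._blocks) == 1:
            return self._blocks[0]  # zero-copy: aliases the added block
        out = []
        for block in self._blocks:
            out.extend(block)       # concatenation yields a fresh block
        return out


def next_batch(builder: ToyBlockBuilder, ensure_copy: bool):
    """Return the built batch, copying only when needed to honor ensure_copy."""
    needs_copy = ensure_copy and not builder.will_build_yield_copy()
    batch = builder.build()
    if needs_copy:
        batch = list(batch)  # break aliasing with any previously added block
    return batch


# Usage: with exactly one block, build() would alias the input, so an
# ensure_copy=True caller gets a defensive copy; otherwise no copy is made.
builder = ToyBlockBuilder()
builder.add_block([1, 2, 3])
assert next_batch(builder, ensure_copy=True) == [1, 2, 3]
```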
I'm a bit confused why this one improved perf, actually. Isn't the new code just refactored, doing the same equivalent sampling?
Yep, it does the equivalent sampling! IIRC most of the overhead was going through the size estimation for every row in the block, rather than a single batched operation for the block. When adding a simple block containing 10k rows, this is the difference between doing 10k func calls (with probably a lot of branch mispredictions) and a total of 10 + 10 + 10 = 30 size estimations (pickle roundtrips), vs a single func call with 10 size estimations.
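As a rough illustration of the per-row vs. batched paths being described, here is a simplified estimator sketch. The ToySizeEstimator name, sampling interval, and plain running mean are assumptions for illustration, not the real SizeEstimator logic.

```python
import pickle


class ToySizeEstimator:
    """Simplified size estimator contrasting per-row add() with add_block()."""

    def __init__(self, sample_every: int = 1000):
        self._sample_every = sample_every
        self._num_rows = 0
        self._sampled_bytes = 0
        self._num_samples = 0

    def _sample(self, row) -> None:
        # Each sample costs a pickle round trip; this is the expensive part.
        self._sampled_bytes += len(pickle.dumps(row))
        self._num_samples += 1

    def add(self, row) -> None:
        # Per-row path: one Python call per row (10k calls for a 10k-row
        # block), even though only every N-th row is actually sampled.
        if self._num_rows % self._sample_every == 0:
            self._sample(row)
        self._num_rows += 1

    def add_block(self, rows) -> None:
        # Batched path: a single call per block that samples every N-th row,
        # i.e. ~10 pickle round trips for a 10k-row block.
        for i in range(0, len(rows), self._sample_every):
            self._sample(rows[i])
        self._num_rows += len(rows)

    def size_bytes(self) -> int:
        if self._num_samples == 0:
            return 0
        return int(self._sampled_bytes / self._num_samples * self._num_rows)


# One call, ~10 samples, instead of 10k calls for the same block.
est = ToySizeEstimator()
est.add_block(list(range(10_000)))
print(est.size_bytes())
```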
This PR contains some miscellaneous performance/bug fixes discovered while benchmarking the zero-copy adapters in #32178, along with some minor changes. These fixes/changes include:
- A bug fix related to the ds.lazy() call (this isn't a perf fix, just a regular ol' bug fix): 8591ac6
- DatasetContext is generically set via the cached_remote_fn wrapper, reducing redundant code. The DatasetContext may be modified after the remote function has been cached (e.g. when reading a CSV dataset twice), so we still need to pass through the DatasetContext at task submission time (a sketch of this pattern follows the list).
- Stop using OptionContext to ignore chained assignment warnings, since this is surprisingly expensive; use a normal warnings filter instead: 8bcb07e
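To illustrate the second point, here is a hedged sketch of the "cache the wrapper once, but capture the context at submission time" pattern. ToyContext, cached_task, and read_block are illustrative stand-ins, not Ray's actual cached_remote_fn or DatasetContext APIs.

```python
import functools
from dataclasses import dataclass, replace


@dataclass
class ToyContext:
    """Stand-in for a mutable, process-wide context such as DatasetContext."""
    target_max_block_size: int = 512 * 1024 * 1024


_current_ctx = ToyContext()


@functools.lru_cache(maxsize=None)
def cached_task(fn):
    """Stand-in for a cached remote-function wrapper.

    The wrapper is created and cached once per function, but it injects the
    *current* context on every submission rather than baking in the context
    that existed when the wrapper was first cached.
    """

    def submit(*args, **kwargs):
        # Snapshot the context now: it may have been modified after the
        # wrapper was cached (e.g. between two read operations).
        ctx = replace(_current_ctx)  # shallow copy of the dataclass
        return fn(ctx, *args, **kwargs)

    return submit


def read_block(ctx: ToyContext, path: str) -> str:
    return f"read {path} with max block size {ctx.target_max_block_size}"


# The first call caches the wrapper; mutating the context afterwards still
# takes effect because the context is captured per call, not per cache entry.
print(cached_task(read_block)("a.csv"))
_current_ctx.target_max_block_size = 128 * 1024 * 1024
print(cached_task(read_block)("b.csv"))  # same cached wrapper, new context
```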
The performance optimizations produce outsized improvements on the zero-copy adapters benchmark:
- By adding a SizeEstimator.add_block() that applies the same size estimation logic (add to the weighted running mean every N rows) to blocks instead of individual rows, I saw a 10x perf improvement for a simple blocks benchmark on 1k blocks and 10k rows per block.
- By deferring size estimation in BlockBuilders until it's actually asked for by a wrapping BlockOutputBuffer (using a calculated size cursor and a running sum that's updated on each get_estimated_memory_usage() call), I'm able to get a 2x perf improvement for a Pandas batching benchmark on 1k blocks, 10k rows per block, and 2k rows per batch (and this is a 3x improvement compared to the legacy operator fusion). A sketch of this running-sum idea follows the list.
- Changing from OptionContext to a normal warnings.simplefilter 2xed the performance of a benchmark that fuses 2 consecutive MapBatches operations.
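Below is a small sketch of the size-cursor-plus-running-sum idea from the second bullet. LazySizeListBuilder and its use of sys.getsizeof are simplifications for illustration, not the actual BlockBuilder implementation.

```python
import sys


class LazySizeListBuilder:
    """Toy builder that defers per-item size estimation until it is requested.

    Instead of estimating size on every add(), it keeps a cursor into the
    items list and a running byte total, and only measures the newly added
    items when get_estimated_memory_usage() is actually called (e.g. by an
    output buffer checking whether a block is full).
    """

    def __init__(self):
        self._items = []
        self._size_cursor = 0    # index of the first item not yet measured
        self._running_bytes = 0  # running sum of measured item sizes

    def add(self, item) -> None:
        # No size estimation here: adding stays cheap.
        self._items.append(item)

    def get_estimated_memory_usage(self) -> int:
        # Catch up on everything added since the last call, then advance the
        # cursor so previously measured items are never re-measured.
        for item in self._items[self._size_cursor:]:
            self._running_bytes += sys.getsizeof(item)
        self._size_cursor = len(self._items)
        return self._running_bytes


# The measurement work is amortized over however often the caller asks,
# rather than being paid on every single add().
builder = LazySizeListBuilder()
for i in range(10_000):
    builder.add(i)
print(builder.get_estimated_memory_usage())
```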
Checks

- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.