[Datasets] [Operator Fusion - E2E Mono-PR] [DO-NOT-MERGE] Add zero-copy operator fusion. #32178
Conversation
Force-pushed from 91d0b62 to 1babbf6 (Compare).
# Build the logical plan, lower it to a physical plan, and run the
# physical optimizer, which is where operator fusion is applied.
logical_plan = LogicalPlan(map_op2)
physical_plan = planner.plan(logical_plan)
physical_plan = PhysicalOptimizer().optimize(physical_plan)
op = physical_plan.dag
Can we improve the testing/observability by emitting metrics on batch conversion operations and their overheads? Then we can just assert the expected conversions in the metrics, e.g. {"numpy_to_pandas_conversions": 5}, etc.
This could go with the other extra metrics emitted by operators.
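A minimal sketch of what such conversion metrics could look like; the counter names and the recording hook are illustrative assumptions, not Ray's actual metrics API:

```python
# Illustrative sketch only: the hook and metric names are assumptions,
# not Ray's actual metrics API.
from collections import defaultdict
from typing import DefaultDict

conversion_metrics: DefaultDict[str, int] = defaultdict(int)

def record_conversion(src_format: str, dst_format: str) -> None:
    # A batch adapter would call this whenever it converts between formats.
    conversion_metrics[f"{src_format}_to_{dst_format}_conversions"] += 1

# A test could then assert on the exact conversions performed, e.g.:
# assert conversion_metrics["numpy_to_pandas_conversions"] == 5
```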
Ah that's a great idea! 🙌 I'll look into adding that.
Should we do this?
Any microbenchmark results?
@clarkzinzow - yeah, we can start with the map_batches benchmark in the nightly test.
@c21 Just kicked off that nightly test, but then I realized that it won't use the new optimizer, since it isn't enabled by default on master. I'll try running it locally with and without the change.
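For reference, a rough sketch of the kind of local map_batches microbenchmark being discussed; the dataset size and the no-op pandas UDF are arbitrary illustrative choices, not the actual nightly-test configuration:

```python
# Rough local microbenchmark sketch; dataset size and the no-op UDF are
# arbitrary choices, not the nightly-test configuration.
import time

import ray

ds = ray.data.range(10_000_000)

start = time.perf_counter()
out = ds.map_batches(lambda df: df, batch_format="pandas")
# Consume the dataset so that lazy execution (if enabled) actually runs.
for _ in out.iter_batches():
    pass
print(f"map_batches took {time.perf_counter() - start:.2f}s")
```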
@clarkzinzow - can you add …
Force-pushed from 49c2941 to 05a4d4d (Compare).
Meta-comment: is it possible to extract an interface PR to review the high-level strategy first, followed by the others?
@@ -224,7 +228,7 @@ def schema(self) -> "pyarrow.lib.Schema":
     def to_pandas(self) -> "pandas.DataFrame":
         from ray.air.util.data_batch_conversion import _cast_tensor_columns_to_ndarrays

-        df = self._table.to_pandas()
+        df = self._table.to_pandas(use_threads=False)
Any particular reason for this? If so, can we document it? Naively, it seems like we'd want to use threads.
I didn't notice any positive or negative effect in my benchmarking after disabling multithreaded Pandas conversion, so I thought I'd set this to False, similar to how we disable multithreaded Arrow I/O, and only enable it if benchmarking showed speedups.
I'll probably set this back to the default when I split out the PRs, though.
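For context, use_threads is a real parameter of pyarrow.Table.to_pandas (it defaults to True); a quick way to compare the two settings on an arbitrary synthetic table:

```python
# Quick comparison of single- vs multi-threaded Arrow -> Pandas conversion;
# the table contents are an arbitrary synthetic example.
import time

import pyarrow as pa

table = pa.table({
    "a": list(range(1_000_000)),
    "b": [float(i) for i in range(1_000_000)],
})

for use_threads in (True, False):
    start = time.perf_counter()
    df = table.to_pandas(use_threads=use_threads)
    print(f"use_threads={use_threads}: {time.perf_counter() - start:.4f}s")
```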
@ericl As mentioned in standup and in the PR description, the plan is to decompose this PR into a stack; this PR is just the end-to-end mono-PR used to make sure that everything works and to benchmark the end state.
Force-pushed from 077ffb2 to a6d862a (Compare).
Force-pushed from a6d862a to 83fe52a (Compare).
Force-pushed from 83fe52a to b51ac54 (Compare).
…nd tweaks. (ray-project#32744) This PR contains some miscellaneous performance/bug fixes discovered while benchmarking the zero-copy adapters in ray-project#32178, along with some minor changes. Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.
Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message. Please feel free to reopen or open a new issue if you'd still like it to be addressed. Again, you can always ask for help on our discussion forum or Ray's public slack channel. Thanks again for opening the issue!
This PR adds support for zero-copy operator fusion, where we no longer materialize large blocks in between fused operator transformations. This is done by changing the fused transformation signature from Iterator[Block] -> Iterator[Block] to Iterator[DataT] -> Iterator[DataT], where DataT = Union[Block, DataBatch, Row].

Most of this PR diff is adding full adapter coverage (both in functionality and in tests) for block-, batch-, and row-based transforms, where each of the transform data types needs to be converted into the others without unnecessary copies/materializations. In addition, we have to handle the adapter between transform outputs and the eventual block outputs of the physical operator, which need to be buffered and split according to our dynamic block splitting policy.
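As a hedged sketch of the idea (not the PR's actual implementation): fused transforms consume and produce iterators of their preferred data type, and thin generator-based adapters convert between types lazily, so no intermediate block is ever materialized between fused stages. The Block and Row aliases and all helper names below are stand-ins for illustration:

```python
# Hedged sketch of the zero-copy fusion idea, not the PR's actual code:
# adapters are lazy generators, so a row-based transform fused between
# block boundaries never materializes an intermediate block.
from typing import Any, Callable, Dict, Iterator

import pyarrow as pa

Block = pa.Table          # stand-in for Ray Datasets' block type
Row = Dict[str, Any]      # stand-in for the row type

def blocks_to_rows(blocks: Iterator[Block]) -> Iterator[Row]:
    # Adapter: stream rows out of each incoming block.
    for block in blocks:
        yield from block.to_pylist()

def rows_to_blocks(rows: Iterator[Row], target_rows: int = 4096) -> Iterator[Block]:
    # Adapter: buffer rows and split them into output blocks (a stand-in
    # for the dynamic block splitting policy described above).
    buf: list = []
    for row in rows:
        buf.append(row)
        if len(buf) >= target_rows:
            yield pa.Table.from_pylist(buf)
            buf = []
    if buf:
        yield pa.Table.from_pylist(buf)

def fuse_row_transform(
    row_fn: Callable[[Row], Row],
) -> Callable[[Iterator[Block]], Iterator[Block]]:
    # The fused operator's transform: Iterator[Block] in, Iterator[Block]
    # out, with the row-based UDF applied in between via lazy adapters.
    def transform(blocks: Iterator[Block]) -> Iterator[Block]:
        return rows_to_blocks(map(row_fn, blocks_to_rows(blocks)))
    return transform

# Example: double a column without materializing intermediate blocks.
transform = fuse_row_transform(lambda row: {**row, "x": row["x"] * 2})
out_blocks = list(transform(iter([pa.table({"x": [1, 2, 3]})])))
print(out_blocks[0].to_pydict())  # {'x': [2, 4, 6]}
```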
TODOs

Checks

- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.