FEAT-#4419: Extend virtual partitioning API to pandas on Dask #4420
Conversation
… Dask Signed-off-by: Rehan Durrani <rehan@ponder.io>
Signed-off-by: Rehan Durrani <rehan@ponder.io>
Codecov Report
@@ Coverage Diff @@
## master #4420 +/- ##
==========================================
+ Coverage 86.49% 89.64% +3.15%
==========================================
Files 230 231 +1
Lines 18473 18820 +347
==========================================
+ Hits 15978 16871 +893
+ Misses 2495 1949 -546
Signed-off-by: Rehan Durrani <rehan@ponder.io>
Should we add tests for this?
Signed-off-by: Rehan Durrani <rehan@ponder.io>
I have some minor comments.
We can't test much of this code until we add something like repeated partition rebalancing, which can result in creating virtual partitions out of other virtual partitions.
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
…tioning/virtual_partition.py Co-authored-by: Mahesh Vashishtha <mvashishtha@users.noreply.github.com>
@RehanSD we need to implement modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py, line 148 (at commit 830e8dc).
LGTM overall!
if not any(
    isinstance(o, PandasOnDaskDataframeVirtualPartition) for o in list_of_blocks
):
    self.list_of_partitions_to_combine = list_of_blocks
    return

assert (
    len(
        set(
            o.axis
            for o in list_of_blocks
            if isinstance(o, PandasOnDaskDataframeVirtualPartition)
        )
Nit: Instead of having two different loops here, can we combine them into one if/else case?
@RehanSD please rebase this on current master
cc @RehanSD
…titioning_dask Signed-off-by: Rehan Durrani <rehan@ponder.io>
…titioning_dask Signed-off-by: Rehan Durrani <rehan@ponder.io>
…ept num_splits argument Signed-off-by: Rehan Durrani <rehan@ponder.io>
Signed-off-by: Rehan Durrani <rehan@ponder.io>
…al partitioning API Signed-off-by: Rehan Durrani <rehan@ponder.io>
Signed-off-by: Rehan Durrani <rehan@ponder.io>
Engine.get() != "Dask" and Engine.get() != "Ray", | ||
reason="Rebalancing partitions is only supported for Dask and Ray engines", | ||
) | ||
def test_rebalance_partitions(): |
Thanks so much for adding this test!
    )
    for i in range(1, 10001, 100)
]
large_df = pd.concat(small_dfs)
Could you also add a case that forces us to split a partition? e.g. concat a DF with 10_000 rows to 5 dfs with 1 row each. For example, this produces splitting with NPartitions of 16:
pd.concat([pd.DataFrame([[i] for i in range(1000)])] + [pd.DataFrame([[i]]) for i in range(16)])
You need enough rows in the first dataframe for the partitioning to be imbalanced enough to trigger repartitioning.
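A rough sketch of that case (sizes and names are illustrative, not the exact test added in this PR): one tall frame concatenated with several single-row frames should leave the result imbalanced enough that rebalancing has to split the large partition.

```python
import modin.pandas as pd

# Hypothetical shapes: one 1000-row frame plus 16 single-row frames.
tall_df = pd.DataFrame([[i] for i in range(1000)])
tiny_dfs = [pd.DataFrame([[i]]) for i in range(16)]

# The concat result should force a partition split during rebalancing
# when NPartitions is around 16, per the comment above.
result = pd.concat([tall_df] + tiny_dfs)
assert len(result) == 1016
```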
That makes a lot of sense - will go ahead and add!
    isinstance(ptn, large_modin_frame._partition_mgr_cls._column_partitions_class)
    for ptn in large_modin_frame._partitions.flatten()
)
large_df = large_df.apply(lambda x: x + 1)
Could you please add a comment about why you're testing what the apply does? One reason I'm thinking of is that it requires building full-axis virtual partitions out of non-full-axis virtual partitions.
Actually since this is a cell-wise apply, it requires us to convert virtual partitions back to block partitions - I wanted to ensure that that worked correctly. I think it makes sense to add a test like what you suggest as well though, so I'll go ahead and do that!
Isn't this a full-axis apply? I thought we would create full-axis virtual partitions out of the virtual partitions, apply the UDF to the full-axis partitions, then split the virtual partitions into block partitions?
You are correct - my bad!
Co-authored-by: Mahesh Vashishtha <mvashishtha@users.noreply.github.com>
Signed-off-by: Rehan Durrani <rehan@ponder.io>
…RehanSD/modin into rehan/virtual_partitioning_dask Signed-off-by: Rehan Durrani <rehan@ponder.io>
Signed-off-by: Rehan Durrani <rehan@ponder.io>
Signed-off-by: Rehan Durrani <rehan@ponder.io>
…titioning_dask Signed-off-by: Rehan Durrani <rehan@ponder.io>
Signed-off-by: Rehan Durrani <rehan@ponder.io>
Signed-off-by: Rehan Durrani <rehan@ponder.io>
@@ -41,22 +42,63 @@ class PandasOnDaskDataframe(PandasDataframe):

    _partition_mgr_cls = PandasOnDaskDataframePartitionManager

    def _get_partition_size_along_axis(self, partition, axis=0):
Per #4494, we are currently calculating partition shapes serially in other places than the one you changed here, e.g. here for Ray, in the Ray virtual partitioning `length`, and in the same place in `length` for the new dask virtual partition class. The dask blocking means that this function actually blocks on inner partitions if the Modin frame consists of virtual partitions that are themselves made of virtual partitions.
In my opinion, we should parallelize getting all the partition shapes correctly in a separate fix for #4494.
Ah, I'm wrong about this function blocking on inner partitions, because it doesn't call `length` or `width`. So I think it's correct. Still, this part seems outside the scope of this PR. I'd rather worry about this fix in a separate PR for #4494.
I think it makes sense to include here, since the original code to get length + width in parallel breaks when applied to axis partitions, so I'll need to fix that code anyway - and if I'm doing that, I may as well fix the code to just do it all in parallel, right?
If the original code was broken, I think it's okay to include the partial fix for #4494 here.
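For reference, the "do it all in parallel" idea can look roughly like the sketch below. This is a plain dask.distributed illustration with assumed names (a local client and scattered partition futures), not the helper added in this PR.

```python
from distributed import Client
import pandas

client = Client(processes=False)  # assumption: local in-process Dask client for the sketch

# Pretend these futures are the row partitions of a Modin frame.
partition_futures = [
    client.scatter(pandas.DataFrame({"a": range(n)})) for n in (100, 250, 50)
]

# Submit one tiny task per partition that returns only the row count, then
# gather every length in a single blocking call instead of blocking once
# per partition.
length_futures = [client.submit(len, fut) for fut in partition_futures]
row_lengths = client.gather(length_futures)
print(row_lengths)  # [100, 250, 50]
```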
# over the orthogonal axis from non-full-axis virtual partitions.

def col_apply_func(col):
    assert len(col) == col_length, "Partial axis partition detected."
Why do we need this `assert`? Why isn't it enough to check that the column-wise apply does what pandas would do?
We want to ensure that this is a full-axis partition, and not just a partial axis partition. The function as I've written it will run correctly even if it is a partial axis partition, so I thought this assert would be a good way to keep the function simple but still test that it is full axis.
I think this test should check for functional correctness. We already check that we get the `apply` result correct when the starting partitions are non-full-axis virtual partitions. I don't think it's right to check for internal implementation details here.
Hmm - I'm not sure that I agree with that - this file is called `test_internals`, which would imply it's testing internal details, and it makes sense to ensure that the internal partitioning code didn't incorrectly make a full axis partition.
I don't think it's appropriate to test any internal detail here just because it's `test_internals`. Why not do something like have this function return `col[::-1]`? That way we can be pretty confident that modin is really applying the function across the full axis. I think testing the intermediate representations during the function call is outside the scope of this particular test case.
I could do that, but then I would have to go into each partition to determine what the last row is and then double-check the output, so this feels "less intrusive" - open to other suggestions though! Perhaps the solution may be to add unit testing for partition-layer stuff into the codebase like @pyrito suggested, and move this code there, so that `test_internals` just tests the overall internals as it's supposed to, and implementation details can be tested at that level?
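For reference, the `col[::-1]` approach might look roughly like this (a sketch with made-up data and a plain value comparison, not code from the PR). Reversing each column only matches the pandas answer if the function really sees the whole column, so no internal assert is needed:

```python
import pandas
import modin.pandas as pd

data = {f"col{j}": list(range(1000)) for j in range(3)}
modin_df, pandas_df = pd.DataFrame(data), pandas.DataFrame(data)

# Apply a column reversal with both libraries and compare the raw values;
# chunk-wise reversal over partial columns would not reproduce pandas.
modin_result = modin_df.apply(lambda col: col[::-1])
pandas_result = pandas_df.apply(lambda col: col[::-1])
assert modin_result.values.tolist() == pandas_result.values.tolist()
```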
# over the same axis from non-full-axis virtual partitions.

def row_apply_func(row):
    assert len(row) == 1000, "Partial axis partition detected."
ditto about the assert
"large_df,col_length", | ||
[ | ||
(pd.concat(small_dfs), 100), | ||
(pd.concat([pd.concat(small_dfs)] + small_dfs[:3]), 103), |
Does this really trigger two rebalances?
small_dfs = [
    pd.DataFrame(
        [[i + j for j in range(0, 1000)]],
        columns=[f"col{j}" for j in range(1, 1001)],
        index=pd.Index([i - 1]),
    )
    for i in range(1, 100001, 1000)
]
Suggested change:
small_dfs = [
    pd.DataFrame([[i + j for j in range(0, 1000)]]).add_prefix('col')
    for i in range(0, 100 * 1000, 1000)
]
I prefer to have the values start from 0. Then we can create the dataframe like this. Let me know if you prefer the current way.
large_df = pd.DataFrame(
    [[i + j for j in range(1, 1000)] for i in range(0, 100000, 1000)],
    columns=[f"col{j}" for j in range(1, 1000)],
    index=pd.Index(list(range(0, 100000, 1000))),
)
Suggested change:
large_df = pd.DataFrame(
    [[i + j for j in range(1000)] for i in range(0, 100 * 1000, 1000)]
).add_prefix('col')
Unless you see a good reason for the 1-indexing and a custom index, I prefer this.
Looks pretty good to me - mainly just style nits and questions to clarify my understanding
@property
def _row_lengths(self):
    """
-   Compute the row partitions lengths if they are not cached.
+   Compute ther row partitions lengths if they are not cached.
Suggested change:
Compute the row partition lengths if they are not cached.
not fixed
# threshold is a heuristic that may need to be tuned for performance.
max_excess_of_num_partitions = 1.5
num_existing_partitions = partitions.shape[0]
ideal_num_new_partitions = NPartitions.get()
qq: `ideal_num_new_partitions` doesn't account for the total number of vCPUs in a cluster, right?
You can override `NPartitions.get()` to do so. It just asks Ray how many cores there are and returns that, so as long as your cluster is fully initialized when modin is initialized, and you set `IsRayCluster` correctly, it should.
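For example (a hedged illustration with a made-up value), the target can also be set explicitly through Modin's config when the engine's default core count isn't what you want:

```python
import modin.config as cfg

# Assumption: 64 is the total number of vCPUs across the whole cluster.
cfg.NPartitions.put(64)

# Rebalancing and repartitioning will now target this value.
assert cfg.NPartitions.get() == 64
```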
# `stop` is the index of the last existing partition so far that we
# want to put into the current new partition.
stop = start
partition_size = partitions[start][0].length()
Is `length` a blocking call? Do we have to materialize things in memory to get this value, or is it stored as metadata?
We try to cache it, but it will block if it is not cached. If it is blocking, it submits a remote task that gets the length of the object, and only materializes the length in memory on the main node.
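A simplified sketch of that pattern (illustrative only, using an assumed dask.distributed client rather than Modin's actual partition class):

```python
from distributed import Client
import pandas

client = Client(processes=False)  # assumption: local client just for the sketch

class PartitionSketch:
    """Toy stand-in for a block partition holding a Future of a pandas DataFrame."""

    def __init__(self, future):
        self._data = future
        self._length_cache = None

    def length(self):
        # Only block on a cache miss; even then, the remote task ships back
        # just the integer row count, not the whole partition.
        if self._length_cache is None:
            self._length_cache = client.submit(len, self._data).result()
        return self._length_cache

part = PartitionSketch(client.scatter(pandas.DataFrame({"a": range(128)})))
print(part.length())  # 128 (blocks once, then served from the cache)
```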
@property
def list_of_blocks(self):
    """
    Get the list of physical partition objects that compose this partition.
Just to confirm, but this means that our perspective of a partition might contain N dask partitions, right?
Yup - a virtual partition is just one or more block partitions (physical partitions) and an axis that they are aligned along.
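Put as a data structure, the idea is roughly this (a conceptual sketch, not the class added in this PR):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class VirtualPartitionSketch:
    # 0 means the blocks are stacked along rows, 1 along columns.
    axis: int
    # One or more physical (block) partitions; nothing is concatenated
    # until a function actually needs the combined frame.
    blocks: List[object] = field(default_factory=list)
```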
if not self.full_axis:
    # If this is not a full axis partition, it already contains a subset of
    # the full axis, so we shouldn't split the result further.
When would this happen?
After a rebalance for example. If we have 20 very small partitions, we may chunk up some of them into non-full axis virtual partitions, so maybe we have 4 virtual partitions of 5 block partitions each. After we apply a function to the virtual partitions, we want to materialize the result as 1 block partition, instead of as 5 small block partitions (as it was before) in order to complete the rebalance, which would trigger this if statement.
    _length_cache = None

    def length(self):
Do we have a test that covers this function and `width`?
We don't really have any unit testing at the partition level. This PR is actually the first to add any testing that's supposed to cover that level. We could look into adding more testing, but I think that would belong in a separate PR.
    o
    for o in list_of_blocks
    if isinstance(o, PandasOnDaskDataframeVirtualPartition)
nit: maybe a better variable name than `o`?
if len(self.call_queue) > 0:
    self.drain_call_queue()
Why not just have the if check for this encapsulated within `drain_call_queue`?
Good question - tbh I don't think there's really any huge reason to do so. It does save an extra call on the stack, but that's an extremely minuscule effect (so definitely not why). I think it just boils down to the idea that the `drain_call_queue` method (conceptually) should only be called when we have a call queue?
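The alternative would look roughly like this (names assumed; a sketch, not Modin's implementation): make the empty-queue case a no-op inside the method so call sites never need the guard.

```python
class CallQueueSketch:
    def __init__(self):
        self.call_queue = []  # pending (func, args, kwargs) tuples, applied lazily

    def drain_call_queue(self):
        # Safe to call unconditionally: an empty queue is simply a no-op.
        if not self.call_queue:
            return
        for func, args, kwargs in self.call_queue:
            func(*args, **kwargs)
        self.call_queue = []
```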
if self.full_axis:
    return result
else:
    # If this is not a full axis partition, just take out the single split in the result.
    return result[0]
Suggested change:
return result if self.full_axis else result[0]
Maybe something like this?
It is more condensed, but I think the comment is helpful from a code clarity perspective.
if start >= len(partitions):
    break
I don't understand when this would happen?
The comment above kind of explains, but basically, if we have very, very small partitions, we may want to coalesce more than we actually have - e.g. if we have rebalanced all but the last three partitions, and those three alone are not enough to make a new partition, we would run off the end of the list before we've satisfied the min constraint for the length of the new partition, and hit this case.
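To make that edge case concrete, here is an illustrative greedy coalescing pass (names and structure are assumptions, not the PR's actual rebalancing code): the inner loop can run past the last partition before the new group reaches the ideal size, which is exactly when the break above fires.

```python
def coalesce_groups(partition_lengths, ideal_size):
    """Group consecutive partitions until each group holds at least ideal_size rows."""
    groups = []
    start = 0
    while start < len(partition_lengths):
        stop = start
        size = partition_lengths[start]
        while size < ideal_size:
            stop += 1
            if stop >= len(partition_lengths):
                # Ran off the end: the leftover partitions form a final,
                # smaller-than-ideal group (the case discussed above).
                break
            size += partition_lengths[stop]
        groups.append(list(range(start, min(stop, len(partition_lengths) - 1) + 1)))
        start = stop + 1
    return groups

# Ten tiny partitions with ideal_size=25: the last group stays undersized.
print(coalesce_groups([10] * 10, 25))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```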
Signed-off-by: Rehan Durrani <rehan@ponder.io>
...low/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/axis_partition.rst
modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py
Signed-off-by: Rehan Durrani <rehan@ponder.io>
…titioning_dask Signed-off-by: Rehan Durrani <rehan@ponder.io>
LGTM!
partition_type = PandasOnDaskDataframePartition
instance_type = Future
This should be located near `axis = None`.
Good point - I can open a refactor PR for that if you think it warrants it, or I can just roll it in with another PR?
Either of these is fine with me.
Signed-off-by: Rehan Durrani rehan@ponder.io
What do these changes do?
flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
git commit -s
docs/development/architecture.rst is up-to-date