FEAT-#4419: Extend virtual partitioning API to pandas on Dask #4420
Changes from 10 commits
```diff
@@ -39,4 +39,5 @@ Contributors
 @mvashishtha
 @NickCrews
 @prutskov
 @vnlitvinov
+@RehanSD
```
```diff
@@ -15,6 +15,7 @@

 from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
 from ..partitioning.partition_manager import PandasOnDaskDataframePartitionManager
+from modin.core.execution.dask.common.engine_wrapper import DaskWrapper


 class PandasOnDaskDataframe(PandasDataframe):
```
```diff
@@ -41,22 +42,63 @@ class PandasOnDaskDataframe(PandasDataframe):

     _partition_mgr_cls = PandasOnDaskDataframePartitionManager

+    def _get_partition_size_along_axis(self, partition, axis=0):
+        """
+        Compute the length along the specified axis of the specified partition.
+
+        Parameters
+        ----------
+        partition : ``PandasOnDaskDataframeVirtualPartition`` or ``PandasOnDaskDataframePartition``
+            The partition whose size to compute.
+        axis : int, default: 0
+            The axis along which to compute size.
+
+        Returns
+        -------
+        list
+            A list of lengths along the specified axis that sum to the overall
+            length of the partition along the specified axis.
+
+        Notes
+        -----
+        This utility function is used to ensure that computation occurs
+        asynchronously across all partitions, whether the partitions are
+        virtual or physical.
+        """
+        if isinstance(partition, self._partition_mgr_cls._partition_class):
+            return [
+                partition.apply(
+                    lambda df: len(df) if not axis else len(df.columns)
+                )._data
+            ]
+        elif partition.axis == axis:
+            return [
+                ptn.apply(lambda df: len(df) if not axis else len(df.columns))._data
+                for ptn in partition.list_of_partitions_to_combine
+            ]
+        return [
+            partition.list_of_partitions_to_combine[0]
+            .apply(lambda df: len(df) if not axis else len(df.columns))
+            ._data
+        ]
```
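To make the three branches above concrete, here is a small self-contained analogue with stand-in classes (a sketch only, not Modin's real `PandasOnDaskDataframePartition`/`PandasOnDaskDataframeVirtualPartition`): a block partition contributes one length; a virtual partition stacked along the measured axis contributes one length per piece; a virtual partition stacked along the other axis only needs its first piece measured. The toy measures eagerly, whereas the real method returns the `._data` futures that are materialized later in a batch.

```python
import pandas as pd

class BlockPartition:
    """Stand-in for a physical (block) partition holding one frame."""
    def __init__(self, df):
        self.df = df

class VirtualPartition:
    """Stand-in for a virtual partition: blocks stacked along `axis`."""
    def __init__(self, blocks, axis):
        self.axis = axis
        self.list_of_partitions_to_combine = blocks

def sizes_along_axis(partition, axis=0):
    def measure(df):
        return len(df) if not axis else len(df.columns)
    if isinstance(partition, BlockPartition):
        # Physical partition: a single length.
        return [measure(partition.df)]
    if partition.axis == axis:
        # Stacked along the measured axis: the pieces' sizes sum to the total.
        return [measure(b.df) for b in partition.list_of_partitions_to_combine]
    # Stacked along the other axis: every piece has the same size along
    # the measured axis, so the first piece suffices.
    return [measure(partition.list_of_partitions_to_combine[0].df)]

rows = [BlockPartition(pd.DataFrame({"a": range(n)})) for n in (2, 3)]
assert sizes_along_axis(VirtualPartition(rows, axis=0), axis=0) == [2, 3]

cols = [BlockPartition(pd.DataFrame({c: range(4)})) for c in ("a", "b")]
assert sizes_along_axis(VirtualPartition(cols, axis=1), axis=0) == [4]
```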
```diff
     @property
     def _row_lengths(self):
         """
-        Compute the row partitions lengths if they are not cached.
+        Compute ther row partitions lengths if they are not cached.

         Returns
         -------
         list
             A list of row partitions lengths.
         """
         if self._row_lengths_cache is None:
-            self._row_lengths_cache = (
-                self._partition_mgr_cls.get_objects_from_partitions(
-                    [obj.apply(lambda df: len(df)) for obj in self._partitions.T[0]]
-                )
+            row_lengths_list = DaskWrapper.materialize(
+                [
+                    self._get_partition_size_along_axis(obj, axis=0)
+                    for obj in self._partitions.T[0]
+                ]
             )
+            self._row_lengths_cache = [sum(len_list) for len_list in row_lengths_list]
         return self._row_lengths_cache

     @property
```

> **Review comment** (on the docstring line): a suggested change restores "the" where the diff introduces the typo "ther". Follow-up from the reviewer: "not fixed".
```diff
@@ -70,12 +112,13 @@ def _column_widths(self):
             A list of column partitions widths.
         """
         if self._column_widths_cache is None:
-            self._column_widths_cache = (
-                self._partition_mgr_cls.get_objects_from_partitions(
-                    [
-                        obj.apply(lambda df: len(df.columns))
-                        for obj in self._partitions[0]
-                    ]
-                )
+            col_widths_list = DaskWrapper.materialize(
+                [
+                    self._get_partition_size_along_axis(obj, axis=1)
+                    for obj in self._partitions[0]
+                ]
             )
+            self._column_widths_cache = [
+                sum(width_list) for width_list in col_widths_list
+            ]
         return self._column_widths_cache
```
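One detail worth noting in both properties: `_row_lengths` iterates `self._partitions.T[0]` while `_column_widths` iterates `self._partitions[0]`. Assuming the partition grid is a 2-D array indexed as [row][column], a quick NumPy sketch shows why those select one partition per row and one partition per column respectively:

```python
import numpy as np

# A 2x3 grid of placeholder "partitions", labeled (row, col).
grid = np.array([[f"p{r}{c}" for c in range(3)] for r in range(2)])

# grid[0]: the first row of the grid -- one partition per column,
# enough to measure every column partition's width.
assert list(grid[0]) == ["p00", "p01", "p02"]

# grid.T[0]: the first column of the grid -- one partition per row,
# enough to measure every row partition's length.
assert list(grid.T[0]) == ["p00", "p10"]
```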
```diff
@@ -112,36 +112,77 @@ def func_to_apply(partition, row_internal_indices, col_internal_indices, item):
     df_equals(md_df, pd_df)


+small_dfs = [
+    pd.DataFrame(
+        [[i + j for j in range(0, 1000)]],
+        columns=[f"col{j}" for j in range(1, 1001)],
+        index=pd.Index([i - 1]),
+    )
+    for i in range(1, 100001, 1000)
+]
+
+large_df = pd.DataFrame(
+    [[i + j for j in range(1, 1000)] for i in range(0, 100000, 1000)],
+    columns=[f"col{j}" for j in range(1, 1000)],
+    index=pd.Index(list(range(0, 100000, 1000))),
+)
```

> **Review comment** (on `small_dfs`, with a suggested change): I prefer to have the values start from 0. Then we can create the dataframe like this. Let me know if you prefer the current way.

> **Review comment** (on `large_df`, with a suggested change): Unless you see a good reason for the 1-indexing and a custom index, I prefer this. (RehanSD marked this conversation as resolved.)
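The bodies of both suggested changes are not shown above, so the exact replacements are unknown; as a purely hypothetical illustration of the reviewer's stated preference (values starting from 0, no custom index), the frames might be built along these lines:

```python
import modin.pandas as pd

# Hypothetical sketch only -- not the reviewer's literal suggestion.
small_dfs = [
    pd.DataFrame(
        [[i + j for j in range(1000)]],
        columns=[f"col{j}" for j in range(1000)],
        index=[i // 1000],
    )
    for i in range(0, 100000, 1000)
]
large_df = pd.DataFrame(
    [[i + j for j in range(1000)] for i in range(0, 100000, 1000)],
    columns=[f"col{j}" for j in range(1000)],
)  # default RangeIndex instead of a custom index
```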
```diff
 @pytest.mark.skipif(
-    Engine.get() != "Dask" and Engine.get() != "Ray",
+    Engine.get() not in ("Dask", "Ray"),
     reason="Rebalancing partitions is only supported for Dask and Ray engines",
 )
-def test_rebalance_partitions():
-    small_dfs = [
-        pd.DataFrame(
-            [[i + j for j in range(0, 100)]],
-            columns=[f"col{j}" for j in range(1, 101)],
-            index=pd.Index([i - 1]),
-        )
-        for i in range(1, 10001, 100)
-    ]
-    large_df = pd.concat(small_dfs)
+@pytest.mark.parametrize(
+    "large_df,col_length",
+    [
+        (pd.concat(small_dfs), 100),
+        (pd.concat([large_df] + small_dfs[:3]), 103),
+    ],
+)
```

> **Review comment** (on an earlier revision of the second parametrize entry, `(pd.concat([pd.concat(small_dfs)] + small_dfs[:3]), 103)`): Does this really trigger two rebalances? (RehanSD marked this conversation as resolved.)
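Worth noting about this refactor: `small_dfs` and `large_df` moved to module scope because `@pytest.mark.parametrize` arguments are evaluated once, at import/collection time, before any test body runs. A tiny standalone sketch of that behavior (hypothetical `test_example`, not part of the PR):

```python
import pytest

built = []

def make_value(n):
    built.append(n)          # runs at collection time, when the module is imported
    return n * 2

@pytest.mark.parametrize("value", [make_value(1), make_value(2)])
def test_example(value):
    assert value in (2, 4)
    assert built == [1, 2]   # both parameters were built before any test ran
```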
```diff
+def test_rebalance_partitions(large_df, col_length):
     large_modin_frame = large_df._query_compiler._modin_frame
     assert large_modin_frame._partitions.shape == (
-        4,
-        4,
+        NPartitions.get(),
+        NPartitions.get(),
     ), "Partitions were not rebalanced after concat."
     assert all(
         isinstance(ptn, large_modin_frame._partition_mgr_cls._column_partitions_class)
         for ptn in large_modin_frame._partitions.flatten()
     )
-    large_df = large_df.apply(lambda x: x + 1)
-    large_modin_frame = large_df._query_compiler._modin_frame
-    assert large_modin_frame._partitions.shape == (
+    # The following check tests that we can correctly form full-axis virtual partitions
+    # over the orthogonal axis from non-full-axis virtual partitions.
+
+    def col_apply_func(col):
+        assert len(col) == col_length, "Partial axis partition detected."
+        return col + 1
+
+    large_df = large_df.apply(col_apply_func)
+    new_large_modin_frame = large_df._query_compiler._modin_frame
+    assert new_large_modin_frame._partitions.shape == (
+        NPartitions.get(),
+        NPartitions.get(),
+    ), "Partitions list shape is incorrect."
+    assert all(
+        isinstance(ptn, new_large_modin_frame._partition_mgr_cls._partition_class)
+        for ptn in new_large_modin_frame._partitions.flatten()
+    ), "Partitions are not block partitioned after apply."
+    large_df = pd.DataFrame(
+        query_compiler=large_df._query_compiler.__constructor__(large_modin_frame)
+    )
+    # The following check tests that we can correctly form full-axis virtual partitions
+    # over the same axis from non-full-axis virtual partitions.
+
+    def row_apply_func(row):
+        assert len(row) == 1000, "Partial axis partition detected."
+        return row + 1
+
+    large_df = large_df.apply(row_apply_func, axis=1)
+    new_large_modin_frame = large_df._query_compiler._modin_frame
+    assert new_large_modin_frame._partitions.shape == (
         4,
         4,
-    ), "Partitions are not block partitioned after apply."
+    ), "Partitions list shape is incorrect."
     assert all(
-        isinstance(ptn, large_modin_frame._partition_mgr_cls._partition_class)
-        for ptn in large_modin_frame._partitions.flatten()
-    )
+        isinstance(ptn, new_large_modin_frame._partition_mgr_cls._partition_class)
+        for ptn in new_large_modin_frame._partitions.flatten()
+    ), "Partitions are not block partitioned after apply."
```

Review thread on the `assert len(col) == col_length` line:

> **Reviewer:** Why do we need this assert?
>
> **Author:** We want to ensure that this is a full-axis partition, and not just a partial axis partition. The function as I've written it will run correctly even if it is a partial axis partition, so I thought this assert would be a good way to keep the function simple, but still test that it is full axis.
>
> **Reviewer:** I think this test should check for functional correctness. We already check that we get the […]
>
> **Author:** Hmm, I'm not sure that I agree with that; this file is called […]
>
> **Reviewer:** I don't think it's appropriate to test any internal detail here just because it's […]
>
> **Author:** I could do that, but then I would have to go into each partition to determine what the last row is and then double check the output, so this feels "less intrusive". Open to other suggestions though! Perhaps the solution may be to add unit testing for partition layer stuff into the codebase like @pyrito suggested, and move this code there […]

Review comment on the `assert len(row) == 1000` line:

> **Reviewer:** ditto about the assert
Review thread on the shape computation:

> **Reviewer:** Per #4494, we are currently calculating partition shapes serially in other places than the one you changed here, e.g. here for Ray, in the Ray virtual partitioning `length`, and in the same place in `length` for the new Dask virtual partition class. The Dask blocking means that this function actually blocks on inner partitions if the Modin frame consists of virtual partitions that are themselves made of virtual partitions. In my opinion, we should parallelize getting all the partition shapes correctly in a separate fix for #4494.
>
> **Reviewer:** Ah, I'm wrong about this function blocking on inner partitions, because it doesn't call `length` or `width`. So I think it's correct. Still, this part seems outside the scope of this PR. I'd rather worry about this fix in a separate PR for #4494.
>
> **Author:** I think it makes sense to include here, since the original code to get length and width in parallel breaks when applied to axis partitions, so I'll need to fix that code anyway; and if I'm doing that, I may as well fix the code to just do it all in parallel, right?
>
> **Reviewer:** If the original code was broken, I think it's okay to include the partial fix for #4494 here.
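To illustrate the serial-versus-parallel point in this thread, here is a standalone sketch using a plain `dask.distributed` client (a hypothetical example, not Modin code; `DaskWrapper.materialize` in the diff plays the role of the single `gather` below):

```python
import pandas as pd
from distributed import Client

client = Client(processes=False)  # small in-process cluster for illustration

# Three remote "partitions".
parts = [client.submit(pd.DataFrame, {"a": list(range(n))}) for n in (3, 5, 7)]

# Serial: calling .result() per partition costs one round trip each,
# analogous to computing each partition's shape with a blocking length().
serial = [client.submit(len, p).result() for p in parts]

# Parallel: submit all the length tasks first, then materialize them in one
# batched gather -- the pattern the PR moves _row_lengths/_column_widths to.
futures = [client.submit(len, p) for p in parts]
parallel = client.gather(futures)

assert serial == parallel == [3, 5, 7]
client.close()
```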