FEAT-#7337: Using dynamic partitioning in broadcast_apply #7338

Merged: 11 commits, Aug 26, 2024
modin/core/dataframe/algebra/groupby.py (5 additions & 3 deletions)
@@ -655,9 +655,11 @@ def aggregate_on_dict(grp_obj, *args, **kwargs):
             )

             native_res_part = [] if native_agg_res is None else [native_agg_res]
-            result = pandas.concat(
-                [*native_res_part, *custom_results], axis=1, copy=False
-            )
+            parts = [*native_res_part, *custom_results]
+            if parts:
+                result = pandas.concat(parts, axis=1, copy=False)
+            else:
+                result = pandas.DataFrame(columns=result_columns)
Collaborator:
Is it important to keep the column names in the case of empty partitions? What led you to this?

Collaborator (Author):
I do this to get the expected result, since the result columns are already known. Would it cause any problems or slowdowns?

Collaborator:
I just want to make sure that Modin's behavior matches that of pandas. Do we have a test for this new code?
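As a side note, here is a minimal standalone sketch of the failure mode the new guard avoids (column names below are hypothetical, not from the PR): pandas.concat raises when given no objects, so the all-empty case has to build the result frame explicitly from the already-known columns.

```python
import pandas

# No native results and no custom results for this group: concat would raise.
parts = []
try:
    pandas.concat(parts, axis=1, copy=False)
except ValueError as err:
    print(err)  # "No objects to concatenate"

# The guarded branch instead returns an empty frame that still carries the
# expected column names ("result_columns" stands in for the known columns).
result_columns = ["col_a", "col_b"]
result = pandas.DataFrame(columns=result_columns)
print(list(result.columns))  # ['col_a', 'col_b']
```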


             # The order is naturally preserved if there's no custom aggregations
             if preserve_aggregation_order and len(custom_aggs):
modin/core/dataframe/pandas/partitioning/partition_manager.py (57 additions & 2 deletions)
@@ -440,7 +440,7 @@ def get_partitions(index):

     @classmethod
     @wait_computations_if_benchmark_mode
-    def broadcast_apply(cls, axis, apply_func, left, right):
+    def base_broadcast_apply(cls, axis, apply_func, left, right):
         """
         Broadcast the `right` partitions to `left` and apply `apply_func` function.

@@ -504,6 +504,7 @@ def broadcast_axis_partitions(
         keep_partitioning=False,
         num_splits=None,
         apply_indices=None,
+        broadcast_all=True,
         enumerate_partitions=False,
         lengths=None,
         apply_func_args=None,
@@ -532,6 +533,8 @@
             then the number of splits is preserved.
         apply_indices : list of ints, default: None
             Indices of `axis ^ 1` to apply function over.
+        broadcast_all : bool, default: True
+            Whether or not to pass all right axis partitions to each of the left axis partitions.
         enumerate_partitions : bool, default: False
             Whether or not to pass partition index into `apply_func`.
             Note that `apply_func` must be able to accept `partition_idx` kwarg.
@@ -578,7 +581,6 @@
         # load-balance the data as well.
         kw = {
             "num_splits": num_splits,
-            "other_axis_partition": right_partitions,
             "maintain_partitioning": keep_partitioning,
         }
         if lengths:
@@ -593,6 +595,9 @@
                 left_partitions[i].apply(
                     preprocessed_map_func,
                     *(apply_func_args if apply_func_args else []),
+                    other_axis_partition=(
+                        right_partitions if broadcast_all else right_partitions[i]
+                    ),
                     **kw,
                     **({"partition_idx": idx} if enumerate_partitions else {}),
                     **kwargs,
@@ -648,6 +653,56 @@ def base_map_partitions(
             ]
         )

+    @classmethod
+    @wait_computations_if_benchmark_mode
+    def broadcast_apply(
+        cls,
+        axis,
+        apply_func,
+        left,
+        right,
+    ):
+        """
+        Broadcast the `right` partitions to `left` and apply `apply_func` function using different approaches to achieve the best performance.
+
+        Parameters
+        ----------
+        axis : {0, 1}
+            Axis to apply and broadcast over.
+        apply_func : callable
+            Function to apply.
+        left : np.ndarray
+            NumPy array of left partitions.
+        right : np.ndarray
+            NumPy array of right partitions.
+
+        Returns
+        -------
+        np.ndarray
+            NumPy array of result partition objects.
+        """
+        if not DynamicPartitioning.get():
+            # block-wise broadcast
+            new_partitions = cls.base_broadcast_apply(
+                axis,
+                apply_func,
+                left,
+                right,
+            )
+        else:
+            # The dynamic partitioning behavior of `broadcast_apply` differs from
+            # that of `map_partitions`, since the columnar approach for
+            # `broadcast_apply` results in a slowdown.
+            # axis-wise broadcast
+            new_partitions = cls.broadcast_axis_partitions(
+                axis=axis ^ 1,
+                left=left,
+                right=right,
+                apply_func=apply_func,
+                broadcast_all=False,
+                keep_partitioning=True,
+            )
+        return new_partitions
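For context, a small usage sketch of how this dispatch is driven by the `DynamicPartitioning` config (the `fillna` call is just an illustrative broadcast-style operation, not taken from this PR). With the flag enabled, `broadcast_apply` takes the axis-wise path above, and `broadcast_all=False` means each left axis partition receives only its positionally matching right partition rather than all of them.

```python
import modin.pandas as pd
from modin.config import context

# Two frames where one is broadcast onto the other.
left = pd.DataFrame({"a": [1.0, None, 3.0] * 1000})
right = pd.DataFrame({"a": [0.0] * 3000})

with context(DynamicPartitioning=True):
    # While the flag is enabled, broadcast-style internals dispatch through
    # broadcast_axis_partitions(axis ^ 1, ..., broadcast_all=False,
    # keep_partitioning=True) instead of the block-wise base_broadcast_apply.
    result = left.fillna(right)

print(result.head())
```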

     @classmethod
     @wait_computations_if_benchmark_mode
     def map_partitions(
modin/core/storage_formats/pandas/query_compiler.py (1 addition & 3 deletions)
@@ -3157,9 +3157,7 @@ def dropna(self, **kwargs):
                 lib.no_default,
                 None,
             )
-        # FIXME: this is a naive workaround for this problem: https://github.com/modin-project/modin/issues/5394
-        # if there are too many partitions then all non-full-axis implementations start acting very badly.
-        # The here threshold is pretty random though it works fine on simple scenarios
+        # The map reduce approach works well for frames with few columnar partitions
Collaborator:

Suggested change:
-        # The map reduce approach works well for frames with few columnar partitions
+        # The map reduce approach works well for frames with few row or column partitions

Collaborator (Author):

That is not correct. I got the following results:
If the DataFrame has few rows (fewer than 10^7), the old approach shows better performance regardless of the number of columns.
If the DataFrame has many rows (more than 10^7) and few columns, the map-reduce approach is better.
If the DataFrame has many rows (more than 10^7) and many columns, the old approach is better again.

But I didn't add any new conditions here, because this is beyond the scope of the current task.
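A rough sketch of how one might reproduce this kind of comparison (the shapes, the NaN injection, and the `to_numpy` materialization call are illustrative assumptions, not taken from the PR): time `dropna` on frames sitting on either side of the reported ~10^7-row crossover.

```python
import time

import numpy as np
import modin.pandas as pd


def time_dropna(rows, cols):
    # Inject some NaNs so dropna has real work to do.
    data = np.random.rand(rows, cols)
    data[data < 0.01] = np.nan
    df = pd.DataFrame(data)
    start = time.perf_counter()
    df.dropna().to_numpy()  # to_numpy forces the lazy result to materialize
    return time.perf_counter() - start


# Shapes chosen to sit on either side of the reported ~10**7-row crossover;
# scale them down if memory is tight.
for rows, cols in [(10**6, 16), (2 * 10**7, 4)]:
    print(rows, cols, f"{time_dropna(rows, cols):.2f}s")
```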

         processable_amount_of_partitions = (
             self._modin_frame.num_parts < CpuCount.get() * 32
         )
modin/tests/pandas/test_groupby.py (32 additions & 1 deletion)
@@ -21,7 +21,13 @@
import pytest

import modin.pandas as pd
-from modin.config import IsRayCluster, NPartitions, RangePartitioning, StorageFormat
+from modin.config import (
+    IsRayCluster,
+    NPartitions,
+    RangePartitioning,
+    StorageFormat,
+    context,
+)
 from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy
 from modin.core.dataframe.pandas.partitioning.axis_partition import (
     PandasDataframeAxisPartition,
@@ -2431,6 +2437,31 @@ def test_multi_column_groupby_different_partitions(
)


+def test_empty_partitions_after_groupby():
+    def func_to_apply(grp):
+        return grp.agg(
+            {
+                list(test_data_values[0].keys())[1]: "sum",
+                list(test_data_values[0].keys())[-1]: "sum",
+            }
+        )
+
+    data = test_data_values[0]
+    md_df, pd_df = create_test_dfs(data)
+    by = pd_df.columns[0]
+
+    with context(DynamicPartitioning=True):
+        md_grp, pd_grp = (
+            md_df.groupby(by),
+            pd_df.groupby(by),
+        )
+        eval_general(
+            md_grp,
+            pd_grp,
+            func_to_apply,
+        )


 @pytest.mark.parametrize(
     "by",
     [