FEAT-#2375: implementation of multi-column groupby aggregation (#2461)
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
dchigarev authored Dec 8, 2020
1 parent d710a16 commit 7c46bdd
Showing 12 changed files with 353 additions and 127 deletions.
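
The feature in one picture: after this change, grouping by several columns no longer falls back to a single-process pandas path. A minimal sketch of the newly supported pattern, assuming a standard Modin installation (data and column names here are hypothetical):

```python
# Sketch of the pattern this commit enables; data is hypothetical.
import modin.pandas as pd

df = pd.DataFrame(
    {
        "a": [1, 1, 2, 2],
        "b": ["x", "y", "x", "y"],
        "c": [10, 20, 30, 40],
    }
)

# Multi-column groupby aggregation now runs through the distributed
# backend instead of defaulting to pandas.
print(df.groupby(by=["a", "b"]).agg({"c": "sum"}))
```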
modin/backends/base/query_compiler.py (12 changes: 5 additions & 7 deletions)
@@ -1394,6 +1394,7 @@ def groupby_size(
             reduce_args=reduce_args,
             numeric_only=numeric_only,
             drop=drop,
+            method="size",
         )

     def groupby_agg(
@@ -1407,13 +1408,10 @@ def groupby_agg(
         groupby_kwargs,
         drop=False,
     ):
-        if is_multi_by:
-            if isinstance(by, type(self)) and len(by.columns) == 1:
-                by = by.columns[0] if drop else by.to_pandas().squeeze()
-            elif isinstance(by, type(self)):
-                by = list(by.columns)
-        else:
-            by = by.to_pandas().squeeze() if isinstance(by, type(self)) else by
+        if isinstance(by, type(self)) and len(by.columns) == 1:
+            by = by.columns[0] if drop else by.to_pandas().squeeze()
+        elif isinstance(by, type(self)):
+            by = list(by.columns)

         return GroupByDefault.register(pandas.core.groupby.DataFrameGroupBy.aggregate)(
             self,
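The base query compiler now applies one `by` normalization regardless of how many keys are involved. A rough plain-pandas analogue of the branch above (the helper name is hypothetical; the real code operates on query compilers rather than DataFrames):

```python
import pandas

def normalize_by(by, drop):
    # One-column frame: degrade to a bare label when the key comes from
    # the frame itself (drop=True), otherwise to a pandas Series.
    if isinstance(by, pandas.DataFrame) and len(by.columns) == 1:
        return by.columns[0] if drop else by.squeeze(axis=1)
    # Multi-column frame: group by the list of its column names.
    elif isinstance(by, pandas.DataFrame):
        return list(by.columns)
    return by

keys = pandas.DataFrame({"a": [1, 2], "b": [3, 4]})
print(normalize_by(keys, drop=True))         # ['a', 'b']
print(normalize_by(keys[["a"]], drop=True))  # 'a'
```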
modin/backends/pandas/query_compiler.py (163 changes: 135 additions & 28 deletions)
@@ -2525,34 +2525,74 @@ def groupby_agg(
         if callable(agg_func):
             agg_func = wrap_udf_function(agg_func)

-        if is_multi_by:
-            return super().groupby_agg(
-                by=by,
-                is_multi_by=is_multi_by,
-                axis=axis,
-                agg_func=agg_func,
-                agg_args=agg_args,
-                agg_kwargs=agg_kwargs,
-                groupby_kwargs=groupby_kwargs,
-                drop=drop,
-            )
-
-        by = by.to_pandas().squeeze() if isinstance(by, type(self)) else by
-
         # since we're going to modify `groupby_kwargs` dict in a `groupby_agg_builder`,
         # we want to copy it to not propagate these changes into source dict, in case
         # of unsuccessful end of function
         groupby_kwargs = groupby_kwargs.copy()

         as_index = groupby_kwargs.get("as_index", True)
+        if isinstance(by, type(self)):
+            # `drop` parameter indicates whether or not 'by' data came
+            # from the `self` frame:
+            # True: 'by' data came from the `self`
+            # False: external 'by' data
+            if drop:
+                internal_by = by.columns
+                by = [by]
+            else:
+                internal_by = []
+                by = [by]
+        else:
+            if not isinstance(by, list):
+                by = [by]
+            internal_by = [o for o in by if isinstance(o, str) and o in self.columns]
+            internal_qc = (
+                [self.getitem_column_array(internal_by)] if len(internal_by) else []
+            )
+
+            by = internal_qc + by[len(internal_by) :]
+
+        broadcastable_by = [o._modin_frame for o in by if isinstance(o, type(self))]
+        not_broadcastable_by = [o for o in by if not isinstance(o, type(self))]

-        def groupby_agg_builder(df):
+        def groupby_agg_builder(df, by=None, drop=False, partition_idx=None):
             # Set `as_index` to True to track the metadata of the grouping object
             # It is used to make sure that between phases we are constructing the
             # right index and placing columns in the correct order.
             groupby_kwargs["as_index"] = True

-            def compute_groupby(df):
+            internal_by_cols = pandas.Index([])
+            missmatched_cols = pandas.Index([])
+            if by is not None:
+                internal_by_df = by[internal_by]
+
+                if isinstance(internal_by_df, pandas.Series):
+                    internal_by_df = internal_by_df.to_frame()
+
+                missmatched_cols = internal_by_df.columns.difference(df.columns)
+                df = pandas.concat(
+                    [df, internal_by_df[missmatched_cols]],
+                    axis=1,
+                    copy=False,
+                )
+                internal_by_cols = internal_by_df.columns
+
+                external_by = by.columns.difference(internal_by)
+                external_by_df = by[external_by].squeeze(axis=1)
+
+                if isinstance(external_by_df, pandas.DataFrame):
+                    external_by_cols = [o for _, o in external_by_df.iteritems()]
+                else:
+                    external_by_cols = [external_by_df]
+
+                by = internal_by_cols.tolist() + external_by_cols
+
+            else:
+                by = []
+
+            by += not_broadcastable_by
+
+            def compute_groupby(df, drop=False, partition_idx=0):
                 grouped_df = df.groupby(by=by, axis=axis, **groupby_kwargs)
                 try:
                     if isinstance(agg_func, dict):
@@ -2569,17 +2609,91 @@ def compute_groupby(df):
                 # issues with extracting the index.
                 except (DataError, TypeError):
                     result = pandas.DataFrame(index=grouped_df.size().index)
+                if isinstance(result, pandas.Series):
+                    result = result.to_frame(
+                        result.name if result.name is not None else "__reduced__"
+                    )
+
+                result_cols = result.columns
+                result.drop(columns=missmatched_cols, inplace=True, errors="ignore")
+
+                if not as_index:
+                    keep_index_levels = len(by) > 1 and any(
+                        isinstance(x, pandas.CategoricalDtype)
+                        for x in df[internal_by_cols].dtypes
+                    )
+
+                    cols_to_insert = (
+                        internal_by_cols.intersection(result_cols)
+                        if keep_index_levels
+                        else internal_by_cols.difference(result_cols)
+                    )
+
+                    if keep_index_levels:
+                        result.drop(
+                            columns=cols_to_insert, inplace=True, errors="ignore"
+                        )
+
+                    drop = True
+                    if partition_idx == 0:
+                        drop = False
+                        if not keep_index_levels:
+                            lvls_to_drop = [
+                                i
+                                for i, name in enumerate(result.index.names)
+                                if name not in cols_to_insert
+                            ]
+                            if len(lvls_to_drop) == result.index.nlevels:
+                                drop = True
+                            else:
+                                result.index = result.index.droplevel(lvls_to_drop)

+                    if (
+                        not isinstance(result.index, pandas.MultiIndex)
+                        and result.index.name is None
+                    ):
+                        drop = True
+
+                    result.reset_index(drop=drop, inplace=True)
+
+                new_index_names = [
+                    None
+                    if isinstance(name, str) and name.startswith("__reduced__")
+                    else name
+                    for name in result.index.names
+                ]
+
+                cols_to_drop = (
+                    result.columns[result.columns.str.match(r"__reduced__.*", na=False)]
+                    if hasattr(result.columns, "str")
+                    else []
+                )
+
+                result.index.names = new_index_names
+
+                # Not dropping columns if result is Series
+                if len(result.columns) > 1:
+                    result.drop(columns=cols_to_drop, inplace=True)
+
                 return result

             try:
-                return compute_groupby(df)
+                return compute_groupby(df, drop, partition_idx)
             # This will happen with Arrow buffer read-only errors. We don't want to copy
             # all the time, so this will try to fast-path the code first.
             except (ValueError, KeyError):
-                return compute_groupby(df.copy())
+                return compute_groupby(df.copy(), drop, partition_idx)

-        new_modin_frame = self._modin_frame._apply_full_axis(
-            axis, lambda df: groupby_agg_builder(df)
+        apply_indices = list(agg_func.keys()) if isinstance(agg_func, dict) else None
+
+        new_modin_frame = self._modin_frame.broadcast_apply_full_axis(
+            axis=axis,
+            func=lambda df, by=None, partition_idx=None: groupby_agg_builder(
+                df, by, drop, partition_idx
+            ),
+            other=broadcastable_by,
+            apply_indices=apply_indices,
+            enumerate_partitions=True,
         )
         result = self.__constructor__(new_modin_frame)

@@ -2598,14 +2712,7 @@ def compute_groupby(df):
             except Exception as e:
                 raise type(e)("No numeric types to aggregate.")

-        # Reset `as_index` because it was edited inplace.
-        groupby_kwargs["as_index"] = as_index
-        if as_index:
-            return result
-        else:
-            if result.index.name is None or result.index.name in result.columns:
-                drop = False
-            return result.reset_index(drop=not drop)
+        return result

     # END Manual Partitioning methods

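The heart of the new pandas-backend path is the split of `by` into keys that live in `self` (broadcast alongside the frame's own partitions) and external keys (shipped to every partition as-is). A simplified pandas-level sketch of that classification (names here are hypothetical; the real code classifies query compilers and Modin frames):

```python
import pandas

df = pandas.DataFrame({"a": [1, 1, 2], "b": [3, 4, 5]})
external_key = pandas.Series(["x", "y", "x"], name="k")

by = ["a", external_key]

# Keys naming columns of `self` are grouped via broadcasting of the
# frame's own data; everything else travels with the task unchanged.
internal_by = [o for o in by if isinstance(o, str) and o in df.columns]
external_by = [o for o in by if not (isinstance(o, str) and o in df.columns)]

print(internal_by)                     # ['a']
print([type(o) for o in external_by])  # [<class 'pandas.core.series.Series'>]
```

The `partition_idx == 0` bookkeeping in `compute_groupby` then ensures that, with `as_index=False`, only the first partition materializes the group keys as columns, so they appear exactly once after the partition results are concatenated.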
modin/data_management/functions/default_methods/groupby_default.py (23 changes: 9 additions & 14 deletions)
@@ -27,6 +27,8 @@ class GroupBy:
    @classmethod
    def validate_by(cls, by):
        def try_cast_series(df):
+            if isinstance(df, pandas.DataFrame):
+                df = df.squeeze(axis=1)
            if not isinstance(df, pandas.Series):
                return df
            if df.name == "__reduced__":
@@ -73,11 +75,6 @@ def fn(
        ):
            by = cls.validate_by(by)

-            if not is_multi_by:
-                groupby_args = groupby_args.copy()
-                as_index = groupby_args.pop("as_index", True)
-                groupby_args["as_index"] = True
-
            grp = df.groupby(by, axis=axis, **groupby_args)
            agg_func = cls.get_func(grp, key, **kwargs)
            result = (
@@ -86,15 +83,7 @@ def fn(
                else agg_func(grp, **agg_args)
            )

-            if not is_multi_by:
-                if as_index:
-                    return result
-                else:
-                    if result.index.name is None or result.index.name in result.columns:
-                        drop = False
-                    return result.reset_index(drop=not drop)
-            else:
-                return result
+            return result

        return fn

@@ -111,6 +100,7 @@ def fn(
            **kwargs
        ):
            if not isinstance(by, (pandas.Series, pandas.DataFrame)):
+                by = cls.validate_by(by)
                return agg_func(
                    df.groupby(by=by, axis=axis, **groupby_args), **map_args
                )
@@ -137,11 +127,16 @@ def fn(
            grp = df.groupby(by, axis=axis, **groupby_args)
            result = agg_func(grp, **map_args)

+            if isinstance(result, pandas.Series):
+                result = result.to_frame()
+
            if not as_index:
                if (
                    len(result.index.names) == 1 and result.index.names[0] is None
                ) or all([name in result.columns for name in result.index.names]):
                    drop = False
+                elif kwargs.get("method") == "size":
+                    drop = True
                result = result.reset_index(drop=not drop)

            if result.index.name == "__reduced__":
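Two small behaviors are normalized above: a reduction like `size` yields a pandas Series rather than a DataFrame, and with `as_index=False` its group keys must come back as columns, which is what the new `method="size"` hint signals. A plain-pandas sketch of why the Series-to-frame step is needed (hypothetical data):

```python
import pandas

df = pandas.DataFrame({"a": [1, 1, 2], "v": [10, 20, 30]})

sized = df.groupby("a").size()
print(type(sized))           # <class 'pandas.core.series.Series'>

# Converting to a frame first gives the as_index=False handling a single
# shape to work with; resetting the index then restores 'a' as a column.
framed = sized.to_frame("size")
print(framed.reset_index())
```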
modin/data_management/functions/groupby_function.py (2 changes: 1 addition & 1 deletion)
@@ -31,7 +31,7 @@ def caller(
        drop=False,
    ):
        if not isinstance(by, (type(query_compiler), str)):
-            by = try_cast_to_pandas(by)
+            by = try_cast_to_pandas(by, squeeze=True)
        return query_compiler.default_to_pandas(
            lambda df: map_func(
                df.groupby(by=by, axis=axis, **groupby_args), **map_args
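`try_cast_to_pandas(by, squeeze=True)` makes a one-column Modin frame used as a grouping key behave like the Series pandas expects in the fallback path. A sketch of the pandas-level equivalence the squeeze relies on (hypothetical data):

```python
import pandas

df = pandas.DataFrame({"a": [1, 1, 2], "v": [10, 20, 30]})

key_frame = df[["a"]]                   # one-column DataFrame
key_series = key_frame.squeeze(axis=1)  # Series named 'a'

# A Series key groups exactly like the column label it was derived from,
# which is the shape the defaulted-to-pandas groupby call expects.
print(df.groupby(key_series)["v"].sum())
print(df.groupby("a")["v"].sum())       # identical result
```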
modin/engines/base/frame/axis_partition.py (18 changes: 10 additions & 8 deletions)
@@ -13,6 +13,7 @@

from abc import ABC
import pandas
+import numpy as np
from modin.data_management.utils import split_result_of_axis_func_pandas


@@ -146,12 +147,13 @@ def apply(
        if other_axis_partition is not None:
            if not isinstance(other_axis_partition, list):
                other_axis_partition = [other_axis_partition]
-            other_shape = (
-                len(other_axis_partition),
-                len(other_axis_partition[0].list_of_blocks),
+
+            # (other_shape[i-1], other_shape[i]) will indicate slice
+            # to restore i-1 axis partition
+            other_shape = np.cumsum(
+                [0] + [len(o.list_of_blocks) for o in other_axis_partition]
            )
-            if not self.axis:
-                other_shape = tuple(reversed(other_shape))
+
            return self._wrap_partitions(
                self.deploy_func_between_two_axis_partitions(
                    self.axis,
@@ -268,14 +270,14 @@ def deploy_func_between_two_axis_partitions(

        rt_parts = partitions[len_of_left:]

-        # reshaping flattened `rt_parts` array into with shape `other_shape`
+        # reshaping flattened `rt_parts` array into a frame with shape `other_shape`
        combined_axis = [
            pandas.concat(
-                [rt_parts[other_shape[axis] * i + j] for j in range(other_shape[axis])],
+                rt_parts[other_shape[i - 1] : other_shape[i]],
                axis=axis,
                copy=False,
            )
-            for i in range(other_shape[axis ^ 1])
+            for i in range(1, len(other_shape))
        ]
        rt_frame = pandas.concat(combined_axis, axis=axis ^ 1, copy=False)

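The new `other_shape` is a vector of cumulative block counts, so partition `i` of the broadcast operand is recovered as the slice `other_shape[i-1]:other_shape[i]` of the flattened block list; this works even when axis partitions own different numbers of blocks, which the old `(n_partitions, n_blocks_of_first)` tuple silently assumed away. A worked sketch of the bookkeeping (block labels are hypothetical):

```python
import numpy as np

# Three axis partitions owning 2, 3 and 1 blocks respectively.
block_counts = [2, 3, 1]
other_shape = np.cumsum([0] + block_counts)   # array([0, 2, 5, 6])

flat = ["b0", "b1", "b2", "b3", "b4", "b5"]   # flattened block list

# (other_shape[i-1], other_shape[i]) delimits the blocks of partition i-1.
restored = [
    flat[other_shape[i - 1] : other_shape[i]] for i in range(1, len(other_shape))
]
print(restored)  # [['b0', 'b1'], ['b2', 'b3', 'b4'], ['b5']]
```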