FEAT-#2375: implementation of multi-column groupby aggregation #2461

Merged (24 commits) on Dec 8, 2020
Commits
7b3ed3c
FEAT-#2375: draft implementation of multi-column groupby agg
dchigarev Nov 17, 2020
fe7cf95
FEAT-#2375: bug fixing
dchigarev Nov 19, 2020
2ec80ba
FEAT-#2375: code cleaning
dchigarev Nov 19, 2020
5caaa72
FEAT-#2375: removed unused code
dchigarev Nov 19, 2020
74644b5
FEAT-#2375: fix failing lookup for Series objects at OmniSci backend
dchigarev Nov 19, 2020
8ad50fb
FEAT-#2375: 'try_cast_to_pandas' squeeze parameter added
dchigarev Nov 19, 2020
7a6ac46
FEAT-#2375: fix BaseOnPython tests
dchigarev Nov 20, 2020
bc921c8
FEAT-#2375: code cleaning
dchigarev Nov 20, 2020
fad766d
FEAT-#2375: 'squeeze' warning fixed
dchigarev Nov 20, 2020
b030342
FEAT-#2375: revert some changes
dchigarev Nov 25, 2020
ceb61b0
FEAT-#2375: addressing comments
dchigarev Nov 26, 2020
4715782
FEAT-#2375: revert some changes
dchigarev Nov 26, 2020
62f5eb3
FEAT-#2375: fixed cases where aggregation returns Series
dchigarev Nov 26, 2020
70d1e4d
FEAT-#2375: optimized 'dict-agg' case by adding 'apply_indices' param
dchigarev Nov 27, 2020
9f2cade
FEAT-#2375: fixed cases when 'func' in 'apply' was built-in
dchigarev Nov 27, 2020
835732a
FEAT-#2375: addressing comments
dchigarev Nov 30, 2020
1681ea9
FEAT-#2375: doc fixing
dchigarev Dec 1, 2020
f233e90
FEAT-#2375: added failing tests
dchigarev Dec 2, 2020
2321186
FEAT-#2375: bug fixing
dchigarev Dec 2, 2020
39faca8
FEAT-#2375: typo fixes
dchigarev Dec 4, 2020
db5ab68
FEAT-#2375: fixed groupby.size at base backend
dchigarev Dec 7, 2020
047ec19
FEAT-#2375: removed unnecessary 'drop' in '_apply_agg_function'
dchigarev Dec 7, 2020
7474170
FEAT-#2375: removed unused code
dchigarev Dec 7, 2020
6558f86
FEAT-#2375: addressing comments
dchigarev Dec 8, 2020
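
For context, a minimal sketch of the user-facing behavior this PR implements (column names are illustrative; Modin mirrors the pandas API):

    import modin.pandas as pd

    df = pd.DataFrame({
        "a": [1, 1, 2, 2],
        "b": ["x", "y", "x", "x"],
        "c": [10, 20, 30, 40],
    })
    # Multi-column groupby aggregation now runs through the partitioned
    # backends instead of falling back to the default pandas implementation.
    result = df.groupby(["a", "b"]).agg({"c": "sum"})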
12 changes: 5 additions & 7 deletions modin/backends/base/query_compiler.py
@@ -1394,6 +1394,7 @@ def groupby_size(
reduce_args=reduce_args,
numeric_only=numeric_only,
drop=drop,
method="size",
)

def groupby_agg(
@@ -1407,13 +1408,10 @@ def groupby_agg(
groupby_kwargs,
drop=False,
):
if is_multi_by:
if isinstance(by, type(self)) and len(by.columns) == 1:
by = by.columns[0] if drop else by.to_pandas().squeeze()
elif isinstance(by, type(self)):
by = list(by.columns)
else:
by = by.to_pandas().squeeze() if isinstance(by, type(self)) else by
if isinstance(by, type(self)) and len(by.columns) == 1:
by = by.columns[0] if drop else by.to_pandas().squeeze()
elif isinstance(by, type(self)):
by = list(by.columns)

return GroupByDefault.register(pandas.core.groupby.DataFrameGroupBy.aggregate)(
self,
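The base backend now normalizes every `by` the same way, whether or not it is a multi-column grouping. A hedged sketch of that rule as a standalone helper (the helper name and the `qc_type` parameter are hypothetical):

    def normalize_by(by, qc_type, drop):
        # A one-column frame collapses to a label when its data came from
        # the grouped frame itself (drop=True), otherwise to a pandas Series.
        if isinstance(by, qc_type) and len(by.columns) == 1:
            return by.columns[0] if drop else by.to_pandas().squeeze()
        # A multi-column frame is grouped by the list of its column labels.
        if isinstance(by, qc_type):
            return list(by.columns)
        return by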
163 changes: 135 additions & 28 deletions modin/backends/pandas/query_compiler.py
@@ -2525,34 +2525,74 @@ def groupby_agg(
if callable(agg_func):
agg_func = wrap_udf_function(agg_func)

if is_multi_by:
return super().groupby_agg(
by=by,
is_multi_by=is_multi_by,
axis=axis,
agg_func=agg_func,
agg_args=agg_args,
agg_kwargs=agg_kwargs,
groupby_kwargs=groupby_kwargs,
drop=drop,
)

by = by.to_pandas().squeeze() if isinstance(by, type(self)) else by

# Since we're going to modify the `groupby_kwargs` dict inside
# `groupby_agg_builder`, copy it so the changes don't propagate
# to the source dict if the function fails partway through.
groupby_kwargs = groupby_kwargs.copy()

as_index = groupby_kwargs.get("as_index", True)
if isinstance(by, type(self)):
# The `drop` parameter indicates whether the 'by' data came
# from the `self` frame:
#   True: 'by' data came from `self`
#   False: 'by' data is external
if drop:
internal_by = by.columns
by = [by]
else:
internal_by = []
by = [by]
else:
if not isinstance(by, list):
by = [by]
internal_by = [o for o in by if isinstance(o, str) and o in self.columns]
internal_qc = (
[self.getitem_column_array(internal_by)] if len(internal_by) else []
)

by = internal_qc + by[len(internal_by) :]

broadcastable_by = [o._modin_frame for o in by if isinstance(o, type(self))]
not_broadcastable_by = [o for o in by if not isinstance(o, type(self))]

def groupby_agg_builder(df):
def groupby_agg_builder(df, by=None, drop=False, partition_idx=None):
# Set `as_index` to True to track the metadata of the grouping object
# It is used to make sure that between phases we are constructing the
# right index and placing columns in the correct order.
groupby_kwargs["as_index"] = True

def compute_groupby(df):
internal_by_cols = pandas.Index([])
mismatched_cols = pandas.Index([])
if by is not None:
internal_by_df = by[internal_by]

if isinstance(internal_by_df, pandas.Series):
internal_by_df = internal_by_df.to_frame()

mismatched_cols = internal_by_df.columns.difference(df.columns)
df = pandas.concat(
[df, internal_by_df[missmatched_cols]],
axis=1,
copy=False,
)
internal_by_cols = internal_by_df.columns

external_by = by.columns.difference(internal_by)
external_by_df = by[external_by].squeeze(axis=1)

if isinstance(external_by_df, pandas.DataFrame):
external_by_cols = [o for _, o in external_by_df.iteritems()]
else:
external_by_cols = [external_by_df]

by = internal_by_cols.tolist() + external_by_cols

else:
by = []

by += not_broadcastable_by

def compute_groupby(df, drop=False, partition_idx=0):
grouped_df = df.groupby(by=by, axis=axis, **groupby_kwargs)
try:
if isinstance(agg_func, dict):
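
A hedged, plain-pandas sketch of the internal/external split above: keys that name columns of the grouped frame are "internal" and can be broadcast as partition data, while everything else stays "external" (the helper is hypothetical):

    import pandas

    def split_by(by, frame_columns):
        by = by if isinstance(by, list) else [by]
        internal = [k for k in by if isinstance(k, str) and k in frame_columns]
        external = [k for k in by if not (isinstance(k, str) and k in frame_columns)]
        return internal, external

    internal, external = split_by(
        ["a", pandas.Series([0, 1, 0], name="key")],
        frame_columns=["a", "b"],
    )
    # internal == ["a"]; external == [the Series grouping key]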
@@ -2569,17 +2609,91 @@ def compute_groupby(df):
# issues with extracting the index.
except (DataError, TypeError):
result = pandas.DataFrame(index=grouped_df.size().index)
if isinstance(result, pandas.Series):
result = result.to_frame(
result.name if result.name is not None else "__reduced__"
)

result_cols = result.columns
result.drop(columns=mismatched_cols, inplace=True, errors="ignore")

if not as_index:
keep_index_levels = len(by) > 1 and any(
isinstance(x, pandas.CategoricalDtype)
for x in df[internal_by_cols].dtypes
)

cols_to_insert = (
internal_by_cols.intersection(result_cols)
if keep_index_levels
else internal_by_cols.difference(result_cols)
)

if keep_index_levels:
result.drop(
columns=cols_to_insert, inplace=True, errors="ignore"
)

drop = True
if partition_idx == 0:
drop = False
if not keep_index_levels:
lvls_to_drop = [
i
for i, name in enumerate(result.index.names)
if name not in cols_to_insert
]
if len(lvls_to_drop) == result.index.nlevels:
drop = True
else:
result.index = result.index.droplevel(lvls_to_drop)

if (
not isinstance(result.index, pandas.MultiIndex)
and result.index.name is None
):
drop = True

result.reset_index(drop=drop, inplace=True)

new_index_names = [
None
if isinstance(name, str) and name.startswith("__reduced__")
else name
for name in result.index.names
]

cols_to_drop = (
result.columns[result.columns.str.match(r"__reduced__.*", na=False)]
if hasattr(result.columns, "str")
else []
)

result.index.names = new_index_names

# Do not drop columns if the result was a Series (it has a single column)
if len(result.columns) > 1:
result.drop(columns=cols_to_drop, inplace=True)

return result

try:
return compute_groupby(df)
return compute_groupby(df, drop, partition_idx)
# This will happen with Arrow buffer read-only errors. We don't want to copy
# all the time, so this will try to fast-path the code first.
except (ValueError, KeyError):
return compute_groupby(df.copy())
return compute_groupby(df.copy(), drop, partition_idx)

new_modin_frame = self._modin_frame._apply_full_axis(
axis, lambda df: groupby_agg_builder(df)
apply_indices = list(agg_func.keys()) if isinstance(agg_func, dict) else None

new_modin_frame = self._modin_frame.broadcast_apply_full_axis(
axis=axis,
func=lambda df, by=None, partition_idx=None: groupby_agg_builder(
df, by, drop, partition_idx
),
other=broadcastable_by,
apply_indices=apply_indices,
enumerate_partitions=True,
)
result = self.__constructor__(new_modin_frame)
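
A hedged toy illustration of why `enumerate_partitions=True` is needed when `as_index=False`: the group keys must appear as columns exactly once in the concatenated result, so only the first column partition keeps them on reset (single-node pandas stand-in):

    import pandas

    part0 = pandas.DataFrame({"a": [1, 1, 2], "b": [10, 20, 30]})
    part1 = pandas.DataFrame({"c": [1.0, 2.0, 3.0]})

    # partition_idx == 0: materialize the key column 'a'
    g0 = part0.groupby("a").sum().reset_index(drop=False)
    # partition_idx > 0: drop the keys to avoid duplicating 'a'
    g1 = part1.set_index(part0["a"]).groupby(level=0).sum().reset_index(drop=True)

    result = pandas.concat([g0, g1], axis=1)  # columns: 'a', 'b', 'c'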

@@ -2598,14 +2712,7 @@ def compute_groupby(df):
except Exception as e:
raise type(e)("No numeric types to aggregate.")

# Reset `as_index` because it was edited inplace.
groupby_kwargs["as_index"] = as_index
if as_index:
return result
else:
if result.index.name is None or result.index.name in result.columns:
drop = False
return result.reset_index(drop=not drop)
return result

# END Manual Partitioning methods

@@ -27,6 +27,8 @@ class GroupBy:
@classmethod
def validate_by(cls, by):
def try_cast_series(df):
if isinstance(df, pandas.DataFrame):
df = df.squeeze(axis=1)
if not isinstance(df, pandas.Series):
return df
if df.name == "__reduced__":
@@ -73,11 +75,6 @@ def fn(
):
by = cls.validate_by(by)

if not is_multi_by:
groupby_args = groupby_args.copy()
as_index = groupby_args.pop("as_index", True)
groupby_args["as_index"] = True

grp = df.groupby(by, axis=axis, **groupby_args)
agg_func = cls.get_func(grp, key, **kwargs)
result = (
@@ -86,15 +83,7 @@ def fn(
else agg_func(grp, **agg_args)
)

if not is_multi_by:
if as_index:
return result
else:
if result.index.name is None or result.index.name in result.columns:
drop = False
return result.reset_index(drop=not drop)
else:
return result
return result

return fn

@@ -111,6 +100,7 @@ def fn(
**kwargs
):
if not isinstance(by, (pandas.Series, pandas.DataFrame)):
by = cls.validate_by(by)
return agg_func(
df.groupby(by=by, axis=axis, **groupby_args), **map_args
)
@@ -137,11 +127,16 @@ def fn(
grp = df.groupby(by, axis=axis, **groupby_args)
result = agg_func(grp, **map_args)

if isinstance(result, pandas.Series):
result = result.to_frame()

if not as_index:
if (
len(result.index.names) == 1 and result.index.names[0] is None
) or all([name in result.columns for name in result.index.names]):
drop = False
elif kwargs.get("method") == "size":
drop = True
result = result.reset_index(drop=not drop)

if result.index.name == "__reduced__":
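A hedged sketch of the `as_index=False` handling above with toy frames: an anonymous index, or one whose names already exist as columns, is discarded on reset, while a `size` result turns its keys into columns:

    import pandas

    # groupby(...).size() result: the keys live only in the index
    sized = pandas.Series([2, 1], index=pandas.Index([1, 2], name="a")).to_frame()
    sized = sized.reset_index(drop=False)   # 'a' becomes a regular column

    # aggregation whose index name duplicates an existing column
    agged = pandas.DataFrame({"a": [1, 2], "b": [30, 30]},
                             index=pandas.Index([1, 2], name="a"))
    agged = agged.reset_index(drop=True)    # discard the index, keep one 'a'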
2 changes: 1 addition & 1 deletion modin/data_management/functions/groupby_function.py
@@ -31,7 +31,7 @@ def caller(
drop=False,
):
if not isinstance(by, (type(query_compiler), str)):
by = try_cast_to_pandas(by)
by = try_cast_to_pandas(by, squeeze=True)
return query_compiler.default_to_pandas(
lambda df: map_func(
df.groupby(by=by, axis=axis, **groupby_args), **map_args
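A hedged stand-in showing what the new `squeeze` flag is for: when casting a `by` to pandas, a one-column frame is squeezed into a Series so that pandas `groupby` treats it as a single key (a simplified sketch, not the real `try_cast_to_pandas`):

    import pandas

    def cast_to_pandas_sketch(obj, squeeze=False):
        # The real helper also walks lists and query compilers; only the
        # squeeze step is shown here.
        if hasattr(obj, "to_pandas"):
            obj = obj.to_pandas()
        if squeeze and isinstance(obj, pandas.DataFrame) and len(obj.columns) == 1:
            obj = obj.squeeze(axis=1)
        return obj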
18 changes: 10 additions & 8 deletions modin/engines/base/frame/axis_partition.py
@@ -13,6 +13,7 @@

from abc import ABC
import pandas
import numpy as np
from modin.data_management.utils import split_result_of_axis_func_pandas


@@ -146,12 +147,13 @@ def apply(
if other_axis_partition is not None:
if not isinstance(other_axis_partition, list):
other_axis_partition = [other_axis_partition]
other_shape = (
len(other_axis_partition),
len(other_axis_partition[0].list_of_blocks),

# (other_shape[i-1], other_shape[i]) indicates the slice
# needed to restore the (i-1)-th axis partition
other_shape = np.cumsum(
[0] + [len(o.list_of_blocks) for o in other_axis_partition]
)
if not self.axis:
other_shape = tuple(reversed(other_shape))

return self._wrap_partitions(
self.deploy_func_between_two_axis_partitions(
self.axis,
@@ -268,14 +270,14 @@ def deploy_func_between_two_axis_partitions(

rt_parts = partitions[len_of_left:]

# reshaping flattened `rt_parts` array into with shape `other_shape`
# reshaping flattened `rt_parts` array into a frame with shape `other_shape`
combined_axis = [
pandas.concat(
[rt_parts[other_shape[axis] * i + j] for j in range(other_shape[axis])],
rt_parts[other_shape[i - 1] : other_shape[i]],
axis=axis,
copy=False,
)
for i in range(other_shape[axis ^ 1])
for i in range(1, len(other_shape))
]
rt_frame = pandas.concat(combined_axis, axis=axis ^ 1, copy=False)

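A hedged numeric illustration of the cumulative-offset scheme introduced in this file: `other_shape` holds running block counts, so consecutive entries delimit the slice of the flattened `rt_parts` list belonging to each axis partition (partition sizes are hypothetical):

    import numpy as np

    blocks_per_partition = [2, 3, 1]                      # len(o.list_of_blocks) per partition
    other_shape = np.cumsum([0] + blocks_per_partition)   # array([0, 2, 5, 6])

    # partition i is rebuilt from rt_parts[other_shape[i-1]:other_shape[i]]
    slices = [(other_shape[i - 1], other_shape[i]) for i in range(1, len(other_shape))]
    # slices == [(0, 2), (2, 5), (5, 6)]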