Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged groupby_agg and groupby_dict_agg to implement dictionary functions aggregations #2317

Merged
merged 11 commits into from
Nov 13, 2020
Merged
10 changes: 0 additions & 10 deletions modin/backends/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1429,16 +1429,6 @@ def groupby_agg(
drop=drop,
)

def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False):
return GroupByDefault.register(pandas.core.groupby.DataFrameGroupBy.aggregate)(
self,
by=by,
func_dict=func_dict,
groupby_args=groupby_args,
agg_args=agg_args,
drop=drop,
)

# END Manual Partitioning methods

def unstack(self, level, fill_value):
Expand Down
27 changes: 23 additions & 4 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2573,14 +2573,27 @@ def groupby_agg(
groupby_kwargs,
drop=False,
):
agg_func = wrap_udf_function(agg_func)
if callable(agg_func):
agg_func = wrap_udf_function(agg_func)

if is_multi_by:
# If function was kept as a dictionary until now, it is now necessary to repeat all steps
# that were skipped previously, that is, make it a lambda. This is necessary
# because default to pandas is unable to operate with dictionary aggregation function argument,
# it accepts only callable functions.
if isinstance(agg_func, dict):
callable_func = wrap_udf_function(
lambda df, *args, **kwargs: df.aggregate(
agg_func, *agg_args, **agg_kwargs
)
)
else:
callable_func = agg_func
return super().groupby_agg(
by=by,
is_multi_by=is_multi_by,
axis=axis,
agg_func=agg_func,
agg_func=callable_func,
agg_args=agg_args,
agg_kwargs=agg_kwargs,
groupby_kwargs=groupby_kwargs,
Expand All @@ -2605,7 +2618,11 @@ def groupby_agg_builder(df):
def compute_groupby(df):
grouped_df = df.groupby(by=by, axis=axis, **groupby_kwargs)
try:
result = agg_func(grouped_df, **agg_kwargs)
result = (
grouped_df.agg(agg_func)
if isinstance(agg_func, dict)
else agg_func(grouped_df, **agg_kwargs)
)
# This happens when the partition is filled with non-numeric data and a
# numeric operation is done. We need to build the index here to avoid
# issues with extracting the index.
Expand All @@ -2631,7 +2648,9 @@ def compute_groupby(df):
# determening type of raised exception by applying `aggfunc`
# to empty DataFrame
try:
agg_func(
pandas.DataFrame(index=[1], columns=[1]).agg(agg_func) if isinstance(
agg_func, dict
) else agg_func(
pandas.DataFrame(index=[1], columns=[1]).groupby(level=0),
**agg_kwargs,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ def fn(

grp = df.groupby(by, axis=axis, **groupby_args)
agg_func = cls.get_func(grp, key, **kwargs)
result = agg_func(grp, **agg_args)
result = (
grp.agg(agg_func, **agg_args)
if isinstance(agg_func, dict)
else agg_func(grp, **agg_args)
)

if not is_multi_by:
if as_index:
Expand Down
27 changes: 0 additions & 27 deletions modin/experimental/backends/omnisci/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,33 +279,6 @@ def groupby_agg(
)
return self.__constructor__(new_frame)

def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False):
"""Apply aggregation functions to a grouped dataframe per-column.

Parameters
----------
by : DFAlgQueryCompiler
The column to group by
func_dict : dict of str, callable/string
The dictionary mapping of column to function
groupby_args : dict
The dictionary of keyword arguments for the group by.
agg_args : dict
The dictionary of keyword arguments for the aggregation functions
drop : bool
Whether or not to drop the column from the data.

Returns
-------
DFAlgQueryCompiler
The result of the per-column aggregations on the grouped dataframe.
"""
# TODO: handle `drop` arg
new_frame = self._modin_frame.groupby_agg(
by, 0, func_dict, groupby_args, **agg_args
)
return self.__constructor__(new_frame)

def count(self, **kwargs):
return self._agg("count", **kwargs)

Expand Down
6 changes: 3 additions & 3 deletions modin/experimental/engines/omnisci_on_ray/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1251,11 +1251,11 @@ def set_index_name(self, name):
return self

names = self._mangle_index_names([name])
exprs = OrderedDict()
if self._index_cols is None:
exprs = OrderedDict()
exprs[name] = self.ref("__rowid__")
exprs[names[0]] = self.ref("__rowid__")
else:
exprs = self._index_exprs()
exprs[names[0]] = self.ref(self._index_cols[0])

for col in self.columns:
exprs[col] = self.ref(col)
Expand Down
52 changes: 19 additions & 33 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ def aggregate(self, func=None, *args, **kwargs):
# This is not implemented in pandas,
# so we throw a different message
raise NotImplementedError("axis other than 0 is not supported")

relabeling_required = False
if isinstance(func, dict) or func is None:

def _reconstruct_func(func, **kwargs):
Expand All @@ -380,50 +382,32 @@ def _reconstruct_func(func, **kwargs):
from pandas.core.base import SpecificationError

raise SpecificationError("nested renamer is not supported")
if isinstance(self._by, type(self._query_compiler)):
by = list(self._by.columns)
else:
by = self._by

subset_cols = list(func_dict.keys()) + (
list(self._by.columns)
if isinstance(self._by, type(self._query_compiler))
and all(c in self._df.columns for c in self._by.columns)
else []
)
result = type(self._df)(
query_compiler=self._df[subset_cols]._query_compiler.groupby_dict_agg(
by=by,
func_dict=func_dict,
groupby_args=self._kwargs,
agg_args=kwargs,
drop=self._drop,
)
)

if relabeling_required:
result = result.iloc[:, order]
result.columns = new_columns

return result

if is_list_like(func):
func = func_dict
elif is_list_like(func):
return self._default_to_pandas(
lambda df, *args, **kwargs: df.aggregate(func, *args, **kwargs),
*args,
**kwargs,
)
if isinstance(func, str):
agg_func = getattr(self, func, None)
elif isinstance(func, str):
# Using "getattr" here masks possible AttributeError which we throw
# in __getattr__, so we should call __getattr__ directly instead.
agg_func = self.__getattr__(func)
if callable(agg_func):
return agg_func(*args, **kwargs)
return self._apply_agg_function(
lambda df, *args, **kwargs: df.aggregate(func, *args, **kwargs),

result = self._apply_agg_function(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible case when _apply_agg_function returns Series and relabeling_required is True? If yes, lines are below result = result.iloc[:, order] and result.columns = new_columns will throw an exception.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide an example when DataFrameGroupBy.agg returns a Series? We could add this case to tests.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of an example right now. However, return value depends on type of member DataFrameGroupBy._df, which can be Series in some case, but I am not sure regarding this operation. It is not critical for me to merge the PR. We can wait till anyone (users) will be able to face with that case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

going by documentation pandas groupby aggregation always returns DataFrame, so there is two options:

  1. We can believe that every backend returns us DataFrame-like qc (it should do, but that backend responsibility is described nowhere), and then we're okay with the current code
  2. If we consider that it's the API layer responsibility to handle Series/DataFrame cases then we can add explicit to frame conversion of the aggregation result

Copy link
Collaborator

@dchigarev dchigarev Nov 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just found out that we're already converting aggregation result to the DataFrame explicitly, so our current code about reordering will work correct

result = type(self._df)(query_compiler=new_manager)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._df can be either DataFrame or Series and in depend of that different objects can be created after type(self._df)(query_compiler=new_manager). This is my concern.

Copy link
Collaborator

@YarShev YarShev Nov 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, currently I don't see a case when self._df is DataFrame, but it was some time ago. At the same time we can get Series from _apply_agg_function via this line. Since pandas is going to remove squeeze parameter in future then we can keep the code without converting to DataFrame.

Copy link
Collaborator

@dchigarev dchigarev Nov 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you're right, I just forgot about that aggregation for Series can also obtain dictionary function...

Then it seems that Modin has a lack of testing Series dictionary aggregation, because yes, it can happen that Series aggregation will return DataFrame:

>>> pandas.Series([1, 2, 3]).groupby(level=0).agg(max=("max"))
   max
0    1
1    2
2    3

Then I believe, that because of the line above we will get incorrect result of aggregation (Series instead of DataFrame), however it should be tested.

There is also another problem that we can face, pandas processes dictionary functions differently for
Series and DataFrame groupby aggregation, since we use general implementation for DataFrame, this also can cause some problems.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we definitely should add tests for Series groupby dict aggregation, and if them fails and it will be difficult to fix, create an issue and fix that in another PR, since this PR is just about merging dict aggregation with regular one

func,
drop=self._as_index,
*args,
**kwargs,
)

if relabeling_required:
result = result.iloc[:, order]
result.columns = new_columns
return result

agg = aggregate

def last(self, **kwargs):
Expand Down Expand Up @@ -888,7 +872,9 @@ def _apply_agg_function(self, f, drop=True, *args, **kwargs):
-------
A new combined DataFrame with the result of all groups.
"""
assert callable(f), "'{0}' object is not callable".format(type(f))
assert callable(f) or isinstance(
f, dict
), "'{0}' object is not callable and not a dict".format(type(f))

# For aggregations, pandas behavior does this for the result.
# For other operations it does not, so we wait until there is an aggregation to
Expand Down
16 changes: 13 additions & 3 deletions modin/pandas/test/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ def test_mixed_dtypes_groupby(as_index):
eval_var(modin_groupby, pandas_groupby)
eval_skew(modin_groupby, pandas_groupby)

agg_functions = ["min", "max"]
agg_functions = [
"min",
"max",
{"col2": "sum"},
{"col2": "max", "col4": "sum", "col5": "min"},
]
for func in agg_functions:
eval_agg(modin_groupby, pandas_groupby, func)
eval_aggregate(modin_groupby, pandas_groupby, func)
Expand Down Expand Up @@ -479,7 +484,12 @@ def test_single_group_row_groupby():
eval_prod(modin_groupby, pandas_groupby)
eval_std(modin_groupby, pandas_groupby)

agg_functions = ["min", "max"]
agg_functions = [
"min",
"max",
{"col2": "sum"},
{"col2": "max", "col4": "sum", "col5": "min"},
]
for func in agg_functions:
eval_agg(modin_groupby, pandas_groupby, func)
eval_aggregate(modin_groupby, pandas_groupby, func)
Expand Down Expand Up @@ -595,7 +605,7 @@ def test_large_row_groupby(is_by_category):
# eval_prod(modin_groupby, pandas_groupby) causes overflows
eval_std(modin_groupby, pandas_groupby)

agg_functions = ["min", "max"]
agg_functions = ["min", "max", {"A": "sum"}, {"A": "max", "B": "sum", "C": "min"}]
for func in agg_functions:
eval_agg(modin_groupby, pandas_groupby, func)
eval_aggregate(modin_groupby, pandas_groupby, func)
Expand Down