Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#2269: Move default_to_pandas logic from API layer to backend #2332

Merged
merged 9 commits into from
Oct 30, 2020
43 changes: 38 additions & 5 deletions modin/backends/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class BaseQueryCompiler(abc.ABC):

@abc.abstractmethod
def default_to_pandas(self, pandas_op, *args, **kwargs):
"""Default to pandas behavior.
"""
Default to pandas behavior.

Parameters
----------
Expand Down Expand Up @@ -1396,14 +1397,46 @@ def groupby_size(
drop=drop,
)

def groupby_agg(self, by, axis, agg_func, groupby_args, agg_args, drop=False):
def groupby_agg(
self,
by,
is_multi_by,
idx_name,
axis,
agg_func,
agg_args,
agg_kwargs,
groupby_kwargs,
drop_,
drop=False,
):
if is_multi_by:
if isinstance(by, type(self)) and len(by.columns) == 1:
by = by.columns[0] if drop else by.to_pandas().squeeze()
elif isinstance(by, type(self)):
by = list(by.columns)
else:
by = by
else:
by = by.to_pandas().squeeze() if isinstance(by, type(self)) else by

# For aggregations, pandas behavior does this for the result.
# For other operations it does not, so we wait until there is an aggregation to
# actually perform this operation.
new_self = (
self.drop(columns=[idx_name])
if idx_name is not None and drop_ and drop
else self
)

return GroupByDefault.register(pandas.core.groupby.DataFrameGroupBy.aggregate)(
self,
self if is_multi_by else new_self,
by=by,
is_multi_by=is_multi_by,
axis=axis,
agg_func=agg_func,
groupby_args=groupby_args,
agg_args=agg_args,
groupby_args=groupby_kwargs,
agg_args=agg_kwargs,
drop=drop,
)

Expand Down
107 changes: 92 additions & 15 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ def __init__(self, modin_frame):
self._modin_frame = modin_frame

def default_to_pandas(self, pandas_op, *args, **kwargs):
"""Default to pandas behavior.
"""
Default to pandas behavior.

Parameters
----------
Expand All @@ -206,8 +207,8 @@ def default_to_pandas(self, pandas_op, *args, **kwargs):
PandasQueryCompiler
The result of the `pandas_op`, converted back to PandasQueryCompiler

Note
----
Notes
-----
This operation takes a distributed object and converts it directly to pandas.
"""
op_name = getattr(pandas_op, "__name__", str(pandas_op))
Expand All @@ -228,6 +229,52 @@ def default_to_pandas(self, pandas_op, *args, **kwargs):
else:
return result

def _default_to_pandas_groupby(
    self, f, by, axis, drop, groupby_kwargs, *args, **kwargs
):
    """
    Default to pandas behavior for a groupby operation.

    Parameters
    ----------
    f : callable
        The function to apply to each group.
    by : PandasQueryCompiler, mapping, function, label, or list of labels
        Used to determine the groups for the groupby.
    axis : 0 or 1
        Split along rows (0) or columns (1).
    drop : boolean
        The flag in charge of the logic of inserting index into DataFrame columns.
    groupby_kwargs : dict
        The keyword arguments for the pandas ``groupby`` call.
    args
        The positional arguments for `f`.
    kwargs
        The keyword arguments for `f`.

    Returns
    -------
    PandasQueryCompiler
        The result of applying `f` to the grouped data, converted back to
        PandasQueryCompiler.

    Notes
    -----
    This operation takes a distributed object and converts it directly to pandas.
    """
    # Normalize `by` when it is itself a query compiler: a single-column
    # compiler collapses to a column label (when `drop`) or to a pandas
    # Series; a multi-column compiler collapses to its list of labels.
    if isinstance(by, type(self)):
        if len(by.columns) == 1:
            by = by.columns[0] if drop else by.to_pandas().squeeze()
        else:
            by = list(by.columns)

    # Materialize any remaining distributed objects nested inside `by`.
    by = try_cast_to_pandas(by)

    def groupby_on_multiple_columns(df, *args, **kwargs):
        # Build the pandas groupby object and apply `f` to it.
        return f(df.groupby(by=by, axis=axis, **groupby_kwargs), *args, **kwargs)

    return self.default_to_pandas(groupby_on_multiple_columns, *args, **kwargs)

def to_pandas(self):
    """Convert the underlying distributed Modin frame to a single pandas DataFrame."""
    return self._modin_frame.to_pandas()

Expand Down Expand Up @@ -2583,24 +2630,54 @@ def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False):
lambda df: df.groupby(by=by, **groupby_args).agg(func_dict, **agg_args)
)

def groupby_agg(self, by, axis, agg_func, groupby_args, agg_args, drop=False):
# since we're going to modify `groupby_args` dict in a `groupby_agg_builder`,
def groupby_agg(
self,
by,
is_multi_by,
idx_name,
axis,
agg_func,
agg_args,
agg_kwargs,
groupby_kwargs,
drop_,
drop=False,
):
agg_func = wrap_udf_function(agg_func)

if is_multi_by:
return self._default_to_pandas_groupby(
agg_func, by, axis, drop, groupby_kwargs, *agg_args, **agg_kwargs
)
dchigarev marked this conversation as resolved.
Show resolved Hide resolved

by = by.to_pandas().squeeze() if isinstance(by, type(self)) else by

# For aggregations, pandas behavior does this for the result.
# For other operations it does not, so we wait until there is an aggregation to
# actually perform this operation.
new_self = (
self.drop(columns=[idx_name])
if idx_name is not None and drop_ and drop
else self
)

# since we're going to modify `groupby_kwargs` dict in a `groupby_agg_builder`,
# we want to copy it to not propagate these changes into source dict, in case
# of unsuccessful end of function
groupby_args = groupby_args.copy()
groupby_kwargs = groupby_kwargs.copy()

as_index = groupby_args.get("as_index", True)
as_index = groupby_kwargs.get("as_index", True)

def groupby_agg_builder(df):
# Set `as_index` to True to track the metadata of the grouping object
# It is used to make sure that between phases we are constructing the
# right index and placing columns in the correct order.
groupby_args["as_index"] = True
groupby_kwargs["as_index"] = True

def compute_groupby(df):
grouped_df = df.groupby(by=by, axis=axis, **groupby_args)
grouped_df = df.groupby(by=by, axis=axis, **groupby_kwargs)
try:
result = agg_func(grouped_df, **agg_args)
result = agg_func(grouped_df, **agg_kwargs)
# This happens when the partition is filled with non-numeric data and a
# numeric operation is done. We need to build the index here to avoid
# issues with extracting the index.
Expand All @@ -2615,26 +2692,26 @@ def compute_groupby(df):
except (ValueError, KeyError):
return compute_groupby(df.copy())

new_modin_frame = self._modin_frame._apply_full_axis(
new_modin_frame = new_self._modin_frame._apply_full_axis(
axis, lambda df: groupby_agg_builder(df)
)
result = self.__constructor__(new_modin_frame)
result = new_self.__constructor__(new_modin_frame)

# that means that exception in `compute_groupby` was raised
# in every partition, so we also should raise it
if len(result.columns) == 0 and len(self.columns) != 0:
if len(result.columns) == 0 and len(new_self.columns) != 0:
# determining type of raised exception by applying `aggfunc`
# to empty DataFrame
try:
agg_func(
pandas.DataFrame(index=[1], columns=[1]).groupby(level=0),
**agg_args,
**agg_kwargs,
)
except Exception as e:
raise type(e)("No numeric types to aggregate.")

# Reset `as_index` because it was edited inplace.
groupby_args["as_index"] = as_index
groupby_kwargs["as_index"] = as_index
if as_index:
return result
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,36 @@ def get_func(cls, grp, key, **kwargs):

@classmethod
def build_aggregate_method(cls, key):
def fn(df, by, groupby_args, agg_args, axis=0, drop=False, **kwargs):
def fn(
df,
by,
groupby_args,
agg_args,
axis=0,
is_multi_by=None,
drop=False,
**kwargs
):
by = cls.validate_by(by)
groupby_args = groupby_args.copy()
as_index = groupby_args.pop("as_index", True)
groupby_args["as_index"] = True

if not is_multi_by:
groupby_args = groupby_args.copy()
as_index = groupby_args.pop("as_index", True)
groupby_args["as_index"] = True

grp = df.groupby(by, axis=axis, **groupby_args)
agg_func = cls.get_func(grp, key, **kwargs)
result = agg_func(grp, **agg_args)

if as_index:
return result
if not is_multi_by:
if as_index:
return result
else:
if result.index.name is None or result.index.name in result.columns:
drop = False
return result.reset_index(drop=not drop)
else:
if result.index.name is None or result.index.name in result.columns:
drop = False
return result.reset_index(drop=not drop)
return result

return fn

Expand Down
21 changes: 20 additions & 1 deletion modin/experimental/backends/omnisci/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,25 @@ def groupby_count(self, by, axis, groupby_args, map_args, **kwargs):
)
return self.__constructor__(new_frame)

def groupby_agg(
    self,
    by,
    is_multi_by,
    idx_name,
    axis,
    agg_func,
    agg_args,
    agg_kwargs,
    groupby_kwargs,
    drop_,
    drop=False,
):
    """
    Group the underlying frame rows and apply `agg_func` to each group.

    Delegates the whole operation to the OmniSci frame's ``groupby_agg`` and
    wraps the resulting frame back into a query compiler.
    """
    # TODO: handle `is_multi_by`, `idx_name`, `agg_args`, `drop_`, `drop` args
    grouped_frame = self._modin_frame.groupby_agg(
        by, axis, agg_func, groupby_kwargs, **agg_kwargs
    )
    return self.__constructor__(grouped_frame)

def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False):
"""Apply aggregation functions to a grouped dataframe per-column.

Expand All @@ -283,7 +302,7 @@ def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False):
DFAlgQueryCompiler
The result of the per-column aggregations on the grouped dataframe.
"""
# TODO: handle drop arg
# TODO: handle `drop` arg
new_frame = self._modin_frame.groupby_agg(
by, 0, func_dict, groupby_args, **agg_args
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,17 @@ def groupby_count(df, cols, as_index, **kwargs):

run_and_compare(groupby_count, data=self.data, cols=cols, as_index=as_index)

@pytest.mark.xfail(
    reason="Currently mean() passes a lambda into backend which cannot be executed on omnisci backend"
)
@pytest.mark.parametrize("cols", cols_value)
@pytest.mark.parametrize("as_index", bool_arg_values)
def test_groupby_mean(self, cols, as_index):
    """Compare ``groupby(...).mean()`` on the OmniSci backend against pandas."""

    def groupby_mean(df, cols, as_index, **kwargs):
        return df.groupby(cols, as_index=as_index).mean()

    run_and_compare(groupby_mean, data=self.data, cols=cols, as_index=as_index)

@pytest.mark.parametrize("cols", cols_value)
@pytest.mark.parametrize("as_index", bool_arg_values)
def test_groupby_proj_sum(self, cols, as_index):
Expand All @@ -569,6 +580,17 @@ def groupby(df, **kwargs):

run_and_compare(groupby, data=self.data)

@pytest.mark.xfail(
    reason="Function specified as a string should be passed into backend API, but currently it is transformed into a lambda"
)
@pytest.mark.parametrize("cols", cols_value)
@pytest.mark.parametrize("as_index", bool_arg_values)
def test_groupby_agg_mean(self, cols, as_index):
    """Compare ``groupby(...).agg("mean")`` on the OmniSci backend against pandas."""

    def groupby_mean(df, cols, as_index, **kwargs):
        return df.groupby(cols, as_index=as_index).agg("mean")

    run_and_compare(groupby_mean, data=self.data, cols=cols, as_index=as_index)

taxi_data = {
"a": [1, 1, 2, 2],
"b": [11, 21, 12, 11],
Expand Down
30 changes: 9 additions & 21 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import pandas.core.common as com

from modin.error_message import ErrorMessage
from modin.utils import _inherit_docstrings, wrap_udf_function, try_cast_to_pandas
from modin.utils import _inherit_docstrings, try_cast_to_pandas
from modin.config import IsExperimental
from .series import Series

Expand Down Expand Up @@ -834,28 +834,16 @@ def _apply_agg_function(self, f, drop=True, *args, **kwargs):
"""
assert callable(f), "'{0}' object is not callable".format(type(f))

f = wrap_udf_function(f)
if self._is_multi_by:
return self._default_to_pandas(f, *args, **kwargs)

if isinstance(self._by, type(self._query_compiler)):
by = self._by.to_pandas().squeeze()
else:
by = self._by

# For aggregations, pandas behavior does this for the result.
# For other operations it does not, so we wait until there is an aggregation to
# actually perform this operation.
if self._idx_name is not None and drop and self._drop:
groupby_qc = self._query_compiler.drop(columns=[self._idx_name])
else:
groupby_qc = self._query_compiler
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
new_manager = groupby_qc.groupby_agg(
by=by,
new_manager = self._query_compiler.groupby_agg(
by=self._by,
is_multi_by=self._is_multi_by,
idx_name=self._idx_name,
axis=self._axis,
agg_func=f,
groupby_args=self._kwargs,
agg_args=kwargs,
agg_args=args,
agg_kwargs=kwargs,
groupby_kwargs=self._kwargs,
drop_=drop,
drop=self._drop,
)
if self._idx_name is not None and self._as_index:
Expand Down
Loading