Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#1854: groupby() with arbitrary series #1886

Merged
5 changes: 4 additions & 1 deletion modin/data_management/functions/groupby_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ def compute_reduce(df):
new_modin_frame = qc._modin_frame.groupby_reduce(
axis, by._modin_frame, _map, _reduce
)
return query_compiler.__constructor__(new_modin_frame)
result = query_compiler.__constructor__(new_modin_frame)
if result.index.name == "__reduced__":
result.index.name = None
return result

return caller
1 change: 1 addition & 0 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,7 @@ def to_pandas(self):
)
df.index = self.index
df.columns = self.columns

return df

def to_numpy(self):
Expand Down
15 changes: 10 additions & 5 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,16 +464,21 @@ def groupby(
else:
mismatch = len(by) != len(self.axes[axis])
if mismatch and all(
obj in self
or (hasattr(self.index, "names") and obj in self.index.names)
isinstance(obj, str)
and (
obj in self
or (hasattr(self.index, "names") and obj in self.index.names)
)
for obj in by
):
# In the future, we will need to add logic to handle this, but for now
# we default to pandas in this case.
pass
elif mismatch:
raise KeyError(next(x for x in by if x not in self))

elif mismatch and any(
isinstance(obj, str) and obj not in self.columns for obj in by
):
names = [o.name if isinstance(o, Series) else o for o in by]
raise KeyError(next(x for x in names if x not in self))
return DataFrameGroupBy(
self,
by,
Expand Down
40 changes: 31 additions & 9 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from modin.error_message import ErrorMessage

from .utils import _inherit_docstrings, wrap_udf_function
from .utils import _inherit_docstrings, wrap_udf_function, try_cast_to_pandas
from .series import Series


Expand Down Expand Up @@ -65,7 +65,11 @@ def __init__(
) or (
not isinstance(by, type(self._query_compiler))
and axis == 0
and all(obj in self._query_compiler.columns for obj in self._by)
and all(
(isinstance(obj, str) and obj in self._query_compiler.columns)
or isinstance(obj, Series)
for obj in self._by
)
)
else:
self._is_multi_by = False
Expand Down Expand Up @@ -120,12 +124,14 @@ def _index_grouped(self):
# aware.
ErrorMessage.catch_bugs_and_request_email(self._axis == 1)
ErrorMessage.default_to_pandas("Groupby with multiple columns")
self._index_grouped_cache = {
k: v.index
for k, v in self._df._query_compiler.getitem_column_array(by)
.to_pandas()
.groupby(by=by)
}
if isinstance(by, list) and all(isinstance(o, str) for o in by):
pandas_df = self._df._query_compiler.getitem_column_array(
by
).to_pandas()
else:
by = try_cast_to_pandas(by)
pandas_df = self._df._to_pandas()
self._index_grouped_cache = pandas_df.groupby(by=by).groups
else:
if isinstance(self._by, type(self._query_compiler)):
by = self._by.to_pandas().squeeze().values
Expand Down Expand Up @@ -309,6 +315,15 @@ def __getitem__(self, key):
drop=self._drop,
**kwargs,
)
if (
self._is_multi_by
and isinstance(self._by, list)
and not all(isinstance(o, str) for o in self._by)
):
raise NotImplementedError(
"Column lookups on GroupBy with arbitrary Series in by"
" is not yet supported."
)
return SeriesGroupBy(
self._df[key],
self._by,
Expand Down Expand Up @@ -412,6 +427,9 @@ def all(self, **kwargs):
)

def size(self):
if is_list_like(self._by) and any(isinstance(o, Series) for o in self._by):
# We don't have good way to handle this right now, fall back to Pandas.
return self._default_to_pandas(lambda df: df.size())
if self._axis == 0:
# Size always works in as_index=True mode so it is necessary to make a
# copy of _kwargs and change as_index in it
Expand Down Expand Up @@ -666,12 +684,14 @@ def _apply_agg_function(self, f, drop=True, *args, **kwargs):
if self._idx_name is not None and self._as_index:
new_manager.index.name = self._idx_name
result = type(self._df)(query_compiler=new_manager)
if result.index.name == "__reduced__":
result.index.name = None
if self._kwargs.get("squeeze", False):
return result.squeeze()
return result

def _default_to_pandas(self, f, *args, **kwargs):
"""Defailts the execution of this function to pandas.
"""Defaults the execution of this function to pandas.

Args:
f: The function to apply to each group.
Expand All @@ -689,6 +709,8 @@ def _default_to_pandas(self, f, *args, **kwargs):
else:
by = self._by

by = try_cast_to_pandas(by)

def groupby_on_multiple_columns(df, *args, **kwargs):
return f(
df.groupby(by=by, axis=self._axis, **self._kwargs), *args, **kwargs
Expand Down
106 changes: 101 additions & 5 deletions modin/pandas/test/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pandas
import numpy as np
import modin.pandas as pd
from modin.pandas.utils import from_pandas, to_pandas
from modin.pandas.utils import from_pandas, to_pandas, try_cast_to_pandas
from .utils import (
df_equals,
check_df_columns_have_nans,
Expand Down Expand Up @@ -209,6 +209,16 @@ def test_mixed_dtypes_groupby(as_index):
eval_groups(modin_groupby, pandas_groupby)


class GetColumn:
    """Deferred column accessor used by the groupby tests.

    Wrapping a column label in ``GetColumn`` tells the test harness that the
    grouping key should be resolved by calling the instance on the dataframe
    (i.e. ``obj(df)`` yields ``df[name]``) rather than passed as a plain label.
    """

    def __init__(self, name):
        # Column label to fetch when the instance is called.
        self.name = name

    def __call__(self, df):
        # Resolve the stored label against the frame handed in by the test.
        column_label = self.name
        return df[column_label]


@pytest.mark.parametrize(
"by",
[
Expand Down Expand Up @@ -241,6 +251,18 @@ def test_mixed_dtypes_groupby(as_index):
["col5", "col4"],
["col4", "col5"],
["col5", "col4", "col1"],
["col1", pd.Series([1, 5, 7, 8])],
[pd.Series([1, 5, 7, 8])],
[
pd.Series([1, 5, 7, 8]),
pd.Series([1, 5, 7, 8]),
pd.Series([1, 5, 7, 8]),
pd.Series([1, 5, 7, 8]),
pd.Series([1, 5, 7, 8]),
],
["col1", GetColumn("col5")],
[GetColumn("col1"), GetColumn("col5")],
[GetColumn("col1")],
],
)
@pytest.mark.parametrize("as_index", [True, False])
Expand All @@ -261,8 +283,19 @@ def test_simple_row_groupby(by, as_index, col1_category):

modin_df = from_pandas(pandas_df)
n = 1
modin_groupby = modin_df.groupby(by=by, as_index=as_index)
pandas_groupby = pandas_df.groupby(by=by, as_index=as_index)

def maybe_get_columns(df, by):
if isinstance(by, list):
return [o(df) if isinstance(o, GetColumn) else o for o in by]
else:
return by

modin_groupby = modin_df.groupby(
by=maybe_get_columns(modin_df, by), as_index=as_index
)

pandas_by = maybe_get_columns(pandas_df, try_cast_to_pandas(by))
pandas_groupby = pandas_df.groupby(by=pandas_by, as_index=as_index)

modin_groupby_equals_pandas(modin_groupby, pandas_groupby)
eval_ngroups(modin_groupby, pandas_groupby)
Expand Down Expand Up @@ -295,7 +328,7 @@ def test_simple_row_groupby(by, as_index, col1_category):
)

# Workaround for Pandas bug #34656. Recreate groupby object for Pandas
pandas_groupby = pandas_df.groupby(by=by, as_index=as_index)
pandas_groupby = pandas_df.groupby(by=pandas_by, as_index=as_index)
apply_functions = [lambda df: df.sum(), min]
for func in apply_functions:
eval_apply(modin_groupby, pandas_groupby, func)
Expand Down Expand Up @@ -372,7 +405,11 @@ def test_simple_row_groupby(by, as_index, col1_category):
eval_general(modin_groupby, pandas_groupby, lambda df: df.tail(n), is_default=True)
eval_quantile(modin_groupby, pandas_groupby)
eval_general(modin_groupby, pandas_groupby, lambda df: df.take(), is_default=True)
eval___getattr__(modin_groupby, pandas_groupby, "col3")
if isinstance(by, list) and not any(
isinstance(o, (pd.Series, pandas.Series)) for o in by
):
# Not yet supported for non-original-column-from-dataframe Series in by:
eval___getattr__(modin_groupby, pandas_groupby, "col3")
eval_groups(modin_groupby, pandas_groupby)


Expand Down Expand Up @@ -1188,3 +1225,62 @@ def get_columns(df):
df2 = pd.concat([df2])
exp = df2.groupby(get_columns(df2)).size()
df_equals(ref, exp)


@pytest.mark.parametrize(
    # When True, use (df[name] + 1), otherwise just use name
    "columns",
    [
        [(True, "a"), (True, "b"), (True, "c")],
        [(True, "a"), (True, "b")],
        [(False, "a"), (False, "b"), (True, "c")],
        [(False, "a"), (True, "c")],
    ],
)
def test_mixed_columns_not_from_df(columns):
    """
    Group by a mix of plain labels and derived Series (``df[name] + 1``).

    Because the derived Series are not columns of the original DataFrame,
    the fast path that maps labels to existing columns cannot be used, so
    this exercises the generic (defaulting) code path.
    """

    def get_columns(df):
        # Build the ``by`` list: a derived Series when ``lookup`` is set,
        # otherwise the bare column label.
        resolved = []
        for lookup, name in columns:
            resolved.append((df[name] + 1) if lookup else name)
        return resolved

    data = {"a": [1, 1, 2], "b": [11, 11, 22], "c": [111, 111, 222]}

    # Reference result computed with plain pandas.
    pandas_frame = pandas.concat([pandas.DataFrame(data)])
    ref = pandas_frame.groupby(get_columns(pandas_frame)).size()

    # Same computation on the modin side.
    modin_frame = pd.concat([pd.DataFrame(data)])
    exp = modin_frame.groupby(get_columns(modin_frame)).size()

    df_equals(ref, exp)


@pytest.mark.parametrize(
    # When True, do df[obj], otherwise just use the obj
    "columns",
    [
        [(False, "a")],
        [(False, "a"), (False, "b"), (False, "c")],
        [(False, "a"), (False, "b")],
        [(False, "b"), (False, "a")],
        [(True, "a"), (True, "b"), (True, "c")],
        [(True, "a"), (True, "b")],
        [(False, "a"), (False, "b"), (True, "c")],
        [(False, "a"), (True, "c")],
        [(False, "a"), (False, pd.Series([5, 6, 7, 8]))],
    ],
)
def test_unknown_groupby(columns):
    """Grouping on keys absent from the frame must raise KeyError on both engines.

    The frame deliberately lacks column ``"a"``, so every parametrization
    references at least one unknown key; modin must mirror pandas and raise.
    """

    def get_columns(df):
        # Materialize the ``by`` list: an actual column when ``lookup`` is
        # set, otherwise the raw object (label or Series).
        keys = []
        for lookup, name in columns:
            keys.append(df[name] if lookup else name)
        return keys

    data = {"b": [11, 11, 22, 200], "c": [111, 111, 222, 7000]}
    modin_df = pd.DataFrame(data)
    pandas_df = pandas.DataFrame(data)

    with pytest.raises(KeyError):
        pandas_df.groupby(by=get_columns(pandas_df))
    with pytest.raises(KeyError):
        modin_df.groupby(by=get_columns(modin_df))
10 changes: 8 additions & 2 deletions modin/pandas/test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,14 @@ def check_df_columns_have_nans(df, cols):
"""
return (
pandas.api.types.is_list_like(cols)
and any(x in df.columns and df[x].hasnans for x in cols)
or not pandas.api.types.is_list_like(cols)
and (
any(isinstance(x, str) and x in df.columns and df[x].hasnans for x in cols)
or any(
isinstance(x, pd.Series) and x._parent is df and x.hasnans for x in cols
)
)
) or (
not pandas.api.types.is_list_like(cols)
and cols in df.columns
and df[cols].hasnans
)
Expand Down