Skip to content

Commit

Permalink
FIX-modin-project#1854: groupby() with arbitrary series (modin-project#1886)
Browse files Browse the repository at this point in the history

Signed-off-by: Itamar Turner-Trauring <itamar@itamarst.org>
  • Loading branch information
itamarst authored and aregm committed Sep 16, 2020
1 parent 0c52efa commit 82b6793
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 22 deletions.
5 changes: 4 additions & 1 deletion modin/data_management/functions/groupby_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ def compute_reduce(df):
new_modin_frame = qc._modin_frame.groupby_reduce(
axis, by._modin_frame, _map, _reduce
)
return query_compiler.__constructor__(new_modin_frame)
result = query_compiler.__constructor__(new_modin_frame)
if result.index.name == "__reduced__":
result.index.name = None
return result

return caller
1 change: 1 addition & 0 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,7 @@ def to_pandas(self):
)
df.index = self.index
df.columns = self.columns

return df

def to_numpy(self):
Expand Down
15 changes: 10 additions & 5 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,16 +464,21 @@ def groupby(
else:
mismatch = len(by) != len(self.axes[axis])
if mismatch and all(
obj in self
or (hasattr(self.index, "names") and obj in self.index.names)
isinstance(obj, str)
and (
obj in self
or (hasattr(self.index, "names") and obj in self.index.names)
)
for obj in by
):
# In the future, we will need to add logic to handle this, but for now
# we default to pandas in this case.
pass
elif mismatch:
raise KeyError(next(x for x in by if x not in self))

elif mismatch and any(
isinstance(obj, str) and obj not in self.columns for obj in by
):
names = [o.name if isinstance(o, Series) else o for o in by]
raise KeyError(next(x for x in names if x not in self))
return DataFrameGroupBy(
self,
by,
Expand Down
40 changes: 31 additions & 9 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from modin.error_message import ErrorMessage

from .utils import _inherit_docstrings, wrap_udf_function
from .utils import _inherit_docstrings, wrap_udf_function, try_cast_to_pandas
from .series import Series


Expand Down Expand Up @@ -65,7 +65,11 @@ def __init__(
) or (
not isinstance(by, type(self._query_compiler))
and axis == 0
and all(obj in self._query_compiler.columns for obj in self._by)
and all(
(isinstance(obj, str) and obj in self._query_compiler.columns)
or isinstance(obj, Series)
for obj in self._by
)
)
else:
self._is_multi_by = False
Expand Down Expand Up @@ -120,12 +124,14 @@ def _index_grouped(self):
# aware.
ErrorMessage.catch_bugs_and_request_email(self._axis == 1)
ErrorMessage.default_to_pandas("Groupby with multiple columns")
self._index_grouped_cache = {
k: v.index
for k, v in self._df._query_compiler.getitem_column_array(by)
.to_pandas()
.groupby(by=by)
}
if isinstance(by, list) and all(isinstance(o, str) for o in by):
pandas_df = self._df._query_compiler.getitem_column_array(
by
).to_pandas()
else:
by = try_cast_to_pandas(by)
pandas_df = self._df._to_pandas()
self._index_grouped_cache = pandas_df.groupby(by=by).groups
else:
if isinstance(self._by, type(self._query_compiler)):
by = self._by.to_pandas().squeeze().values
Expand Down Expand Up @@ -309,6 +315,15 @@ def __getitem__(self, key):
drop=self._drop,
**kwargs,
)
if (
self._is_multi_by
and isinstance(self._by, list)
and not all(isinstance(o, str) for o in self._by)
):
raise NotImplementedError(
"Column lookups on GroupBy with arbitrary Series in by"
" is not yet supported."
)
return SeriesGroupBy(
self._df[key],
self._by,
Expand Down Expand Up @@ -412,6 +427,9 @@ def all(self, **kwargs):
)

def size(self):
if is_list_like(self._by) and any(isinstance(o, Series) for o in self._by):
# We don't have good way to handle this right now, fall back to Pandas.
return self._default_to_pandas(lambda df: df.size())
if self._axis == 0:
# Size always works in as_index=True mode so it is necessary to make a
# copy of _kwargs and change as_index in it
Expand Down Expand Up @@ -666,12 +684,14 @@ def _apply_agg_function(self, f, drop=True, *args, **kwargs):
if self._idx_name is not None and self._as_index:
new_manager.index.name = self._idx_name
result = type(self._df)(query_compiler=new_manager)
if result.index.name == "__reduced__":
result.index.name = None
if self._kwargs.get("squeeze", False):
return result.squeeze()
return result

def _default_to_pandas(self, f, *args, **kwargs):
"""Defailts the execution of this function to pandas.
"""Defaults the execution of this function to pandas.
Args:
f: The function to apply to each group.
Expand All @@ -689,6 +709,8 @@ def _default_to_pandas(self, f, *args, **kwargs):
else:
by = self._by

by = try_cast_to_pandas(by)

def groupby_on_multiple_columns(df, *args, **kwargs):
return f(
df.groupby(by=by, axis=self._axis, **self._kwargs), *args, **kwargs
Expand Down
106 changes: 101 additions & 5 deletions modin/pandas/test/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pandas
import numpy as np
import modin.pandas as pd
from modin.pandas.utils import from_pandas, to_pandas
from modin.pandas.utils import from_pandas, to_pandas, try_cast_to_pandas
from .utils import (
df_equals,
check_df_columns_have_nans,
Expand Down Expand Up @@ -209,6 +209,16 @@ def test_mixed_dtypes_groupby(as_index):
eval_groups(modin_groupby, pandas_groupby)


class GetColumn:
    """Marker telling the test to resolve this value via ``df[name]``.

    Instances are placed inside ``by`` lists so the test can lazily look the
    column up on whichever frame (modin or pandas) it is building ``by`` for.
    """

    def __init__(self, name):
        # Column label to resolve later against a concrete frame.
        self.name = name

    def __call__(self, df):
        # Perform the deferred lookup on the frame supplied by the test.
        return df[self.name]


@pytest.mark.parametrize(
"by",
[
Expand Down Expand Up @@ -241,6 +251,18 @@ def test_mixed_dtypes_groupby(as_index):
["col5", "col4"],
["col4", "col5"],
["col5", "col4", "col1"],
["col1", pd.Series([1, 5, 7, 8])],
[pd.Series([1, 5, 7, 8])],
[
pd.Series([1, 5, 7, 8]),
pd.Series([1, 5, 7, 8]),
pd.Series([1, 5, 7, 8]),
pd.Series([1, 5, 7, 8]),
pd.Series([1, 5, 7, 8]),
],
["col1", GetColumn("col5")],
[GetColumn("col1"), GetColumn("col5")],
[GetColumn("col1")],
],
)
@pytest.mark.parametrize("as_index", [True, False])
Expand All @@ -261,8 +283,19 @@ def test_simple_row_groupby(by, as_index, col1_category):

modin_df = from_pandas(pandas_df)
n = 1
modin_groupby = modin_df.groupby(by=by, as_index=as_index)
pandas_groupby = pandas_df.groupby(by=by, as_index=as_index)

def maybe_get_columns(df, by):
if isinstance(by, list):
return [o(df) if isinstance(o, GetColumn) else o for o in by]
else:
return by

modin_groupby = modin_df.groupby(
by=maybe_get_columns(modin_df, by), as_index=as_index
)

pandas_by = maybe_get_columns(pandas_df, try_cast_to_pandas(by))
pandas_groupby = pandas_df.groupby(by=pandas_by, as_index=as_index)

modin_groupby_equals_pandas(modin_groupby, pandas_groupby)
eval_ngroups(modin_groupby, pandas_groupby)
Expand Down Expand Up @@ -295,7 +328,7 @@ def test_simple_row_groupby(by, as_index, col1_category):
)

# Workaround for Pandas bug #34656. Recreate groupby object for Pandas
pandas_groupby = pandas_df.groupby(by=by, as_index=as_index)
pandas_groupby = pandas_df.groupby(by=pandas_by, as_index=as_index)
apply_functions = [lambda df: df.sum(), min]
for func in apply_functions:
eval_apply(modin_groupby, pandas_groupby, func)
Expand Down Expand Up @@ -372,7 +405,11 @@ def test_simple_row_groupby(by, as_index, col1_category):
eval_general(modin_groupby, pandas_groupby, lambda df: df.tail(n), is_default=True)
eval_quantile(modin_groupby, pandas_groupby)
eval_general(modin_groupby, pandas_groupby, lambda df: df.take(), is_default=True)
eval___getattr__(modin_groupby, pandas_groupby, "col3")
if isinstance(by, list) and not any(
isinstance(o, (pd.Series, pandas.Series)) for o in by
):
# Not yet supported for non-original-column-from-dataframe Series in by:
eval___getattr__(modin_groupby, pandas_groupby, "col3")
eval_groups(modin_groupby, pandas_groupby)


Expand Down Expand Up @@ -1188,3 +1225,62 @@ def get_columns(df):
df2 = pd.concat([df2])
exp = df2.groupby(get_columns(df2)).size()
df_equals(ref, exp)


@pytest.mark.parametrize(
    # When True, use (df[name] + 1), otherwise just use name
    "columns",
    [
        [(True, "a"), (True, "b"), (True, "c")],
        [(True, "a"), (True, "b")],
        [(False, "a"), (False, "b"), (True, "c")],
        [(False, "a"), (True, "c")],
    ],
)
def test_mixed_columns_not_from_df(columns):
    """
    Group by Series that are derived from, but not identical to, columns of
    the original DataFrame, so the column-name fast path cannot be used.
    """

    def build_by(frame):
        # (frame[name] + 1) yields a Series that is NOT a column of `frame`.
        return [(frame[name] + 1) if derived else name for (derived, name) in columns]

    data = {"a": [1, 1, 2], "b": [11, 11, 22], "c": [111, 111, 222]}

    pandas_df = pandas.concat([pandas.DataFrame(data)])
    ref = pandas_df.groupby(build_by(pandas_df)).size()

    modin_df = pd.concat([pd.DataFrame(data)])
    exp = modin_df.groupby(build_by(modin_df)).size()
    df_equals(ref, exp)


@pytest.mark.parametrize(
    # When True, do df[obj], otherwise just use the obj
    "columns",
    [
        [(False, "a")],
        [(False, "a"), (False, "b"), (False, "c")],
        [(False, "a"), (False, "b")],
        [(False, "b"), (False, "a")],
        [(True, "a"), (True, "b"), (True, "c")],
        [(True, "a"), (True, "b")],
        [(False, "a"), (False, "b"), (True, "c")],
        [(False, "a"), (True, "c")],
        [(False, "a"), (False, pd.Series([5, 6, 7, 8]))],
    ],
)
def test_unknown_groupby(columns):
    """Grouping on keys absent from the frame raises KeyError on both sides."""

    def build_by(frame):
        return [frame[obj] if lookup else obj for (lookup, obj) in columns]

    data = {"b": [11, 11, 22, 200], "c": [111, 111, 222, 7000]}
    modin_df = pd.DataFrame(data)
    pandas_df = pandas.DataFrame(data)

    # Column "a" never exists, so pandas and modin must both reject it.
    with pytest.raises(KeyError):
        pandas_df.groupby(by=build_by(pandas_df))
    with pytest.raises(KeyError):
        modin_df.groupby(by=build_by(modin_df))
10 changes: 8 additions & 2 deletions modin/pandas/test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,14 @@ def check_df_columns_have_nans(df, cols):
"""
return (
pandas.api.types.is_list_like(cols)
and any(x in df.columns and df[x].hasnans for x in cols)
or not pandas.api.types.is_list_like(cols)
and (
any(isinstance(x, str) and x in df.columns and df[x].hasnans for x in cols)
or any(
isinstance(x, pd.Series) and x._parent is df and x.hasnans for x in cols
)
)
) or (
not pandas.api.types.is_list_like(cols)
and cols in df.columns
and df[cols].hasnans
)
Expand Down

0 comments on commit 82b6793

Please sign in to comment.