FIX-#1426: Groupby on categories fixed (#1802)
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
dchigarev committed Jul 29, 2020
1 parent e25342b commit 538bd9c
Showing 5 changed files with 95 additions and 19 deletions.
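For context, a minimal sketch of the pattern this commit fixes (illustrative data; assumes Modin is installed): grouping a DataFrame by a column with a categorical dtype, which previously broke Modin's map-reduce groupby path.

import modin.pandas as pd

# Grouping by a categorical column; before this fix, the grouping column
# could be lost between the map and reduce phases, and count()/size()
# could surface NaN for categories that have no rows.
df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7]})
df = df.astype({"col1": "category"})

print(df.groupby(by="col1", as_index=False).count())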
5 changes: 5 additions & 0 deletions modin/backends/pandas/query_compiler.py
@@ -2029,6 +2029,11 @@ def _callable_func(self, func, axis, *args, **kwargs):
)

def groupby_agg(self, by, axis, agg_func, groupby_args, agg_args, drop=False):
# Since we're going to modify the `groupby_args` dict inside `groupby_agg_builder`,
# copy it so that the changes do not propagate back to the source dict
# if the function fails partway through.
groupby_args = groupby_args.copy()

as_index = groupby_args.get("as_index", True)

def groupby_agg_builder(df):
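The defensive copy above matters because the builder later mutates keys such as `as_index` in place. A hypothetical sketch of the hazard it avoids (names are illustrative, not Modin API):

def risky(groupby_args):
    # Mutates the caller's dict; if we raise afterwards, the caller
    # is left holding the modified options.
    groupby_args["as_index"] = True
    raise RuntimeError("map phase failed")

def safe(groupby_args):
    # Copy first, so the mutation stays local even on failure.
    groupby_args = groupby_args.copy()
    groupby_args["as_index"] = True
    raise RuntimeError("map phase failed")

args = {"as_index": False}
try:
    risky(args)
except RuntimeError:
    pass
print(args)  # {'as_index': True} -- the caller's options were silently changed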
24 changes: 23 additions & 1 deletion modin/data_management/functions/groupby_function.py
@@ -40,14 +40,21 @@ def caller(
)
else:
qc = query_compiler
# Since we're going to modify the `groupby_args` dict inside `compute_map`,
# copy it so that the changes do not propagate back to the source dict
# if the function fails partway through.
groupby_args = groupby_args.copy()

as_index = groupby_args.get("as_index", True)
observed = groupby_args.get("observed", False)

def _map(df, other):
def compute_map(df, other):
# Set `as_index` to True to track the metadata of the grouping object.
# It is used to make sure that between phases we construct the
# right index and place columns in the correct order.
groupby_args["as_index"] = True
groupby_args["observed"] = True
other = other.squeeze(axis=axis ^ 1)
if isinstance(other, pandas.DataFrame):
df = pandas.concat(
@@ -57,6 +64,20 @@ def compute_map(df, other):
result = map_func(
df.groupby(by=other, axis=axis, **groupby_args), **map_args
)
# If `other` has a category dtype, pandas drops that column
# after the groupby; insert it back so the reduce phase
# can process it correctly.
if (
drop
and not as_index
and isinstance(other, pandas.Series)
and isinstance(other.dtype, pandas.CategoricalDtype)
and result.index.name is not None
and result.index.name not in result.columns
):
result.insert(
loc=0, column=result.index.name, value=result.index
)
# The _modin_groupby_ prefix indicates that this is the first partition,
# since we may need to insert the grouping data in the reduce phase
if (
@@ -82,6 +103,7 @@ def compute_reduce(df):
df = df.reset_index(drop=False)
# See note above about setting `as_index`
groupby_args["as_index"] = as_index
groupby_args["observed"] = observed
if other_len > 1:
by_part = list(df.columns[0:other_len])
else:
@@ -98,7 +120,7 @@
if isinstance(by_part, str) and by_part in result.columns:
if "_modin_groupby_" in by_part and drop:
col_name = by_part[len("_modin_groupby_") :]
new_result = result.drop(columns=col_name)
new_result = result.drop(columns=col_name, errors="ignore")
new_result.columns = [
col_name if "_modin_groupby_" in c else c
for c in new_result.columns
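The `observed` juggling in `compute_map`/`compute_reduce` leans on pandas semantics for categorical groupers: with `observed=False`, every declared category gets a row, even ones absent from a given partition, which would distort the partial results. The map phase therefore forces `observed=True`, and the user's setting is restored only in the reduce phase. A small plain-pandas sketch of that behavior (illustrative data):

import pandas

s = pandas.Series([1, 2, 3])
by = pandas.Categorical(["a", "a", "b"], categories=["a", "b", "c"])

# observed=False emits a row for every declared category, including the
# never-observed "c" ...
print(s.groupby(by, observed=False).count())  # a -> 2, b -> 1, c -> 0

# ... while observed=True keeps only the groups that actually occur.
print(s.groupby(by, observed=True).count())   # a -> 2, b -> 1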
10 changes: 7 additions & 3 deletions modin/pandas/groupby.py
@@ -128,7 +128,7 @@ def _index_grouped(self):
}
else:
if isinstance(self._by, type(self._query_compiler)):
by = self._by.to_pandas().squeeze()
by = self._by.to_pandas().squeeze().values
else:
by = self._by
if self._axis == 0:
Expand Down Expand Up @@ -433,7 +433,7 @@ def size(self):
series_result = Series(query_compiler=result._query_compiler)
# Pandas does not name size() output
series_result.name = None
return series_result
return series_result.fillna(0)
else:
return DataFrameGroupBy(
self._df.T,
@@ -529,12 +529,16 @@ def fillna(self, **kwargs):
return result

def count(self, **kwargs):
return self._wrap_aggregation(
result = self._wrap_aggregation(
type(self._query_compiler).groupby_count,
lambda df, **kwargs: df.count(**kwargs),
numeric_only=False,
**kwargs,
)
# pandas does this when the result is a Series
if isinstance(result, Series):
result = result.fillna(0)
return result

def pipe(self, func, *args, **kwargs):
return com.pipe(self, func, *args, **kwargs)
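The added `fillna(0)` calls target groups that never appear in a partition: after Modin's reduce phase those can surface as NaN, whereas pandas reports an integer zero for empty categorical groups. A sketch of the pandas behavior being matched (illustrative data):

import pandas

df = pandas.DataFrame(
    {
        "col1": pandas.Categorical(["a", "a", "b"], categories=["a", "b", "c"]),
        "col2": [1, 2, 3],
    }
)

# pandas reports 0 (not NaN) for the empty category "c".
print(df.groupby("col1").count())  # col2: a -> 2, b -> 1, c -> 0
print(df.groupby("col1").size())   # a -> 2, b -> 1, c -> 0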
48 changes: 37 additions & 11 deletions modin/pandas/test/test_groupby.py
@@ -21,18 +21,28 @@
check_df_columns_have_nans,
create_test_dfs,
eval_general,
df_categories_equals,
)

pd.DEFAULT_NPARTITIONS = 4


def modin_df_almost_equals_pandas(modin_df, pandas_df):
difference = to_pandas(modin_df) - pandas_df
df_categories_equals(modin_df._to_pandas(), pandas_df)

modin_df = to_pandas(modin_df)

if hasattr(modin_df, "select_dtypes"):
modin_df = modin_df.select_dtypes(exclude=["category"])
if hasattr(pandas_df, "select_dtypes"):
pandas_df = pandas_df.select_dtypes(exclude=["category"])

difference = modin_df - pandas_df
diff_max = difference.max()
if isinstance(diff_max, pandas.Series):
diff_max = diff_max.max()
assert (
to_pandas(modin_df).equals(pandas_df)
modin_df.equals(pandas_df)
or diff_max < 0.0001
or (all(modin_df.isna().all()) and all(pandas_df.isna().all()))
)
@@ -234,7 +244,8 @@ def test_mixed_dtypes_groupby(as_index):
],
)
@pytest.mark.parametrize("as_index", [True, False])
def test_simple_row_groupby(by, as_index):
@pytest.mark.parametrize("col1_category", [True, False])
def test_simple_row_groupby(by, as_index, col1_category):
pandas_df = pandas.DataFrame(
{
"col1": [0, 1, 2, 3],
@@ -245,6 +256,9 @@ def test_simple_row_groupby(by, as_index):
}
)

if col1_category:
pandas_df = pandas_df.astype({"col1": "category"})

modin_df = from_pandas(pandas_df)
n = 1
modin_groupby = modin_df.groupby(by=by, as_index=as_index)
@@ -267,10 +281,10 @@
eval_ndim(modin_groupby, pandas_groupby)
if not check_df_columns_have_nans(modin_df, by):
# cum* functions produce undefined results for columns with NaNs, so we run them only when the "by" columns contain no NaNs
eval_cumsum(modin_groupby, pandas_groupby)
eval_cummax(modin_groupby, pandas_groupby)
eval_cummin(modin_groupby, pandas_groupby)
eval_cumprod(modin_groupby, pandas_groupby)
eval_general(modin_groupby, pandas_groupby, lambda df: df.cumsum(axis=0))
eval_general(modin_groupby, pandas_groupby, lambda df: df.cummax(axis=0))
eval_general(modin_groupby, pandas_groupby, lambda df: df.cummin(axis=0))
eval_general(modin_groupby, pandas_groupby, lambda df: df.cumprod(axis=0))

eval_general(
modin_groupby,
@@ -312,7 +326,7 @@ def test_simple_row_groupby(by, as_index):
modin_df_almost_equals_pandas,
is_default=True,
)
eval_rank(modin_groupby, pandas_groupby)
eval_general(modin_groupby, pandas_groupby, lambda df: df.rank())
eval_max(modin_groupby, pandas_groupby)
eval_len(modin_groupby, pandas_groupby)
eval_sum(modin_groupby, pandas_groupby)
@@ -332,7 +346,12 @@
# Pandas groupby.transform does not work correctly with NaN values in grouping columns. See Pandas bug 17093.
transform_functions = [lambda df: df + 4, lambda df: -df - 10]
for func in transform_functions:
eval_transform(modin_groupby, pandas_groupby, func)
eval_general(
modin_groupby,
pandas_groupby,
lambda df: df.transform(func),
check_exception_type=None,
)

pipe_functions = [lambda dfgb: dfgb.sum()]
for func in pipe_functions:
@@ -347,7 +366,9 @@
)
eval_fillna(modin_groupby, pandas_groupby)
eval_count(modin_groupby, pandas_groupby)
eval_size(modin_groupby, pandas_groupby)
eval_general(
modin_groupby, pandas_groupby, lambda df: df.size(), check_exception_type=None
)
eval_general(modin_groupby, pandas_groupby, lambda df: df.tail(n), is_default=True)
eval_quantile(modin_groupby, pandas_groupby)
eval_general(modin_groupby, pandas_groupby, lambda df: df.take(), is_default=True)
@@ -471,14 +492,19 @@ def test_single_group_row_groupby():
eval_groups(modin_groupby, pandas_groupby)


def test_large_row_groupby():
@pytest.mark.parametrize("is_by_category", [True, False])
def test_large_row_groupby(is_by_category):
pandas_df = pandas.DataFrame(
np.random.randint(0, 8, size=(100, 4)), columns=list("ABCD")
)

modin_df = from_pandas(pandas_df)

by = [str(i) for i in pandas_df["A"].tolist()]

if is_by_category:
by = pandas.Categorical(by)

n = 4

modin_groupby = modin_df.groupby(by=by)
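The new `is_by_category` parameter exercises grouping by an external `pandas.Categorical` rather than by a column name; stripped of the eval helpers, the shape of what the test drives looks roughly like this (illustrative):

import numpy as np
import pandas
import modin.pandas as pd

pandas_df = pandas.DataFrame(
    np.random.randint(0, 8, size=(100, 4)), columns=list("ABCD")
)
modin_df = pd.DataFrame(pandas_df)

# The grouping key lives outside the frame and carries a categorical dtype.
by = pandas.Categorical([str(i) for i in pandas_df["A"].tolist()])

print(modin_df.groupby(by=by).sum())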
27 changes: 23 additions & 4 deletions modin/pandas/test/utils.py
@@ -381,8 +381,17 @@ def categories_equals(left, right):


def df_categories_equals(df1, df2):
categories_columns = df1.select_dtypes(include="category").columns
if not hasattr(df1, "select_dtypes"):
if isinstance(df1, pandas.CategoricalDtype):
return categories_equals(df1, df2)
elif isinstance(getattr(df1, "dtype"), pandas.CategoricalDtype) and isinstance(
getattr(df2, "dtype"), pandas.CategoricalDtype
):
return categories_equals(df1.dtype, df2.dtype)
else:
return True

categories_columns = df1.select_dtypes(include="category").columns
for column in categories_columns:
is_category_ordered = df1[column].dtype.ordered
assert_categorical_equal(
@@ -558,17 +567,27 @@ def check_df_columns_have_nans(df, cols):


def eval_general(
modin_df, pandas_df, operation, comparator=df_equals, __inplace__=False, **kwargs
modin_df,
pandas_df,
operation,
comparator=df_equals,
__inplace__=False,
check_exception_type=True,
**kwargs,
):
md_kwargs, pd_kwargs = {}, {}

def execute_callable(fn, inplace=False, md_kwargs={}, pd_kwargs={}):
try:
pd_result = fn(pandas_df, **pd_kwargs)
except Exception as e:
with pytest.raises(type(e)):
except Exception as pd_e:
if check_exception_type is None:
return None
with pytest.raises(Exception) as md_e:
# repr to force materialization
repr(fn(modin_df, **md_kwargs))
if check_exception_type:
assert isinstance(md_e.value, type(pd_e))
else:
md_result = fn(modin_df, **md_kwargs)
return (md_result, pd_result) if not __inplace__ else (modin_df, pandas_df)
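The `check_exception_type` knob gives `eval_general` three modes: `True` asserts that Modin raises the same exception type as pandas, `False` only requires that Modin raises some exception, and `None` skips the Modin call entirely once pandas has raised. A hedged usage sketch (assuming the helpers above are imported):

# Tolerate Modin raising a different exception type than pandas:
eval_general(modin_groupby, pandas_groupby, lambda df: df.rank(),
             check_exception_type=False)

# Skip the comparison altogether when pandas raises:
eval_general(modin_groupby, pandas_groupby, lambda df: df.size(),
             check_exception_type=None)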
