diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py index f940ddf23d6..181af801306 100644 --- a/modin/backends/pandas/query_compiler.py +++ b/modin/backends/pandas/query_compiler.py @@ -2029,6 +2029,11 @@ def _callable_func(self, func, axis, *args, **kwargs): ) def groupby_agg(self, by, axis, agg_func, groupby_args, agg_args, drop=False): + # since we're going to modify `groupby_args` dict in a `groupby_agg_builder`, + # we want to copy it to not propagate these changes into source dict, in case + # of unsuccessful end of function + groupby_args = groupby_args.copy() + as_index = groupby_args.get("as_index", True) def groupby_agg_builder(df): diff --git a/modin/data_management/functions/groupby_function.py b/modin/data_management/functions/groupby_function.py index 6d2373d83bf..ddc10fdaf7f 100644 --- a/modin/data_management/functions/groupby_function.py +++ b/modin/data_management/functions/groupby_function.py @@ -40,7 +40,13 @@ def caller( ) else: qc = query_compiler + # since we're going to modify `groupby_args` dict in a `compute_map`, + # we want to copy it to not propagate these changes into source dict, in case + # of unsuccessful end of function + groupby_args = groupby_args.copy() + as_index = groupby_args.get("as_index", True) + observed = groupby_args.get("observed", False) def _map(df, other): def compute_map(df, other): @@ -48,6 +54,7 @@ def compute_map(df, other): # It is used to make sure that between phases we are constructing the # right index and placing columns in the correct order. groupby_args["as_index"] = True + groupby_args["observed"] = True other = other.squeeze(axis=axis ^ 1) if isinstance(other, pandas.DataFrame): df = pandas.concat( @@ -57,6 +64,20 @@ def compute_map(df, other): result = map_func( df.groupby(by=other, axis=axis, **groupby_args), **map_args ) + # if `other` has category dtype, then pandas will drop that + # column after groupby, inserting it back to correctly process + # reduce phase + if ( + drop + and not as_index + and isinstance(other, pandas.Series) + and isinstance(other.dtype, pandas.CategoricalDtype) + and result.index.name is not None + and result.index.name not in result.columns + ): + result.insert( + loc=0, column=result.index.name, value=result.index + ) # The _modin_groupby_ prefix indicates that this is the first partition, # and since we may need to insert the grouping data in the reduce phase if ( @@ -82,6 +103,7 @@ def compute_reduce(df): df = df.reset_index(drop=False) # See note above about setting `as_index` groupby_args["as_index"] = as_index + groupby_args["observed"] = observed if other_len > 1: by_part = list(df.columns[0:other_len]) else: @@ -98,7 +120,7 @@ def compute_reduce(df): if isinstance(by_part, str) and by_part in result.columns: if "_modin_groupby_" in by_part and drop: col_name = by_part[len("_modin_groupby_") :] - new_result = result.drop(columns=col_name) + new_result = result.drop(columns=col_name, errors="ignore") new_result.columns = [ col_name if "_modin_groupby_" in c else c for c in new_result.columns diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 52f5972d06a..383f23ab99b 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -128,7 +128,7 @@ def _index_grouped(self): } else: if isinstance(self._by, type(self._query_compiler)): - by = self._by.to_pandas().squeeze() + by = self._by.to_pandas().squeeze().values else: by = self._by if self._axis == 0: @@ -433,7 +433,7 @@ def size(self): series_result = Series(query_compiler=result._query_compiler) # Pandas does not name size() output series_result.name = None - return series_result + return series_result.fillna(0) else: return DataFrameGroupBy( self._df.T, @@ -529,12 +529,16 @@ def fillna(self, **kwargs): return result def count(self, **kwargs): - return self._wrap_aggregation( + result = self._wrap_aggregation( type(self._query_compiler).groupby_count, lambda df, **kwargs: df.count(**kwargs), numeric_only=False, **kwargs, ) + # pandas do it in case of Series + if isinstance(result, Series): + result = result.fillna(0) + return result def pipe(self, func, *args, **kwargs): return com.pipe(self, func, *args, **kwargs) diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 5dbe30baa19..060906df206 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -21,18 +21,28 @@ check_df_columns_have_nans, create_test_dfs, eval_general, + df_categories_equals, ) pd.DEFAULT_NPARTITIONS = 4 def modin_df_almost_equals_pandas(modin_df, pandas_df): - difference = to_pandas(modin_df) - pandas_df + df_categories_equals(modin_df._to_pandas(), pandas_df) + + modin_df = to_pandas(modin_df) + + if hasattr(modin_df, "select_dtypes"): + modin_df = modin_df.select_dtypes(exclude=["category"]) + if hasattr(pandas_df, "select_dtypes"): + pandas_df = pandas_df.select_dtypes(exclude=["category"]) + + difference = modin_df - pandas_df diff_max = difference.max() if isinstance(diff_max, pandas.Series): diff_max = diff_max.max() assert ( - to_pandas(modin_df).equals(pandas_df) + modin_df.equals(pandas_df) or diff_max < 0.0001 or (all(modin_df.isna().all()) and all(pandas_df.isna().all())) ) @@ -234,7 +244,8 @@ def test_mixed_dtypes_groupby(as_index): ], ) @pytest.mark.parametrize("as_index", [True, False]) -def test_simple_row_groupby(by, as_index): +@pytest.mark.parametrize("col1_category", [True, False]) +def test_simple_row_groupby(by, as_index, col1_category): pandas_df = pandas.DataFrame( { "col1": [0, 1, 2, 3], @@ -245,6 +256,9 @@ def test_simple_row_groupby(by, as_index): } ) + if col1_category: + pandas_df = pandas_df.astype({"col1": "category"}) + modin_df = from_pandas(pandas_df) n = 1 modin_groupby = modin_df.groupby(by=by, as_index=as_index) @@ -267,10 +281,10 @@ def test_simple_row_groupby(by, as_index): eval_ndim(modin_groupby, pandas_groupby) if not check_df_columns_have_nans(modin_df, by): # cum* functions produce undefined results for columns with NaNs so we run them only when "by" columns contain no NaNs - eval_cumsum(modin_groupby, pandas_groupby) - eval_cummax(modin_groupby, pandas_groupby) - eval_cummin(modin_groupby, pandas_groupby) - eval_cumprod(modin_groupby, pandas_groupby) + eval_general(modin_groupby, pandas_groupby, lambda df: df.cumsum(axis=0)) + eval_general(modin_groupby, pandas_groupby, lambda df: df.cummax(axis=0)) + eval_general(modin_groupby, pandas_groupby, lambda df: df.cummin(axis=0)) + eval_general(modin_groupby, pandas_groupby, lambda df: df.cumprod(axis=0)) eval_general( modin_groupby, @@ -312,7 +326,7 @@ def test_simple_row_groupby(by, as_index): modin_df_almost_equals_pandas, is_default=True, ) - eval_rank(modin_groupby, pandas_groupby) + eval_general(modin_groupby, pandas_groupby, lambda df: df.rank()) eval_max(modin_groupby, pandas_groupby) eval_len(modin_groupby, pandas_groupby) eval_sum(modin_groupby, pandas_groupby) @@ -332,7 +346,12 @@ def test_simple_row_groupby(by, as_index): # Pandas groupby.transform does not work correctly with NaN values in grouping columns. See Pandas bug 17093. transform_functions = [lambda df: df + 4, lambda df: -df - 10] for func in transform_functions: - eval_transform(modin_groupby, pandas_groupby, func) + eval_general( + modin_groupby, + pandas_groupby, + lambda df: df.transform(func), + check_exception_type=None, + ) pipe_functions = [lambda dfgb: dfgb.sum()] for func in pipe_functions: @@ -347,7 +366,9 @@ def test_simple_row_groupby(by, as_index): ) eval_fillna(modin_groupby, pandas_groupby) eval_count(modin_groupby, pandas_groupby) - eval_size(modin_groupby, pandas_groupby) + eval_general( + modin_groupby, pandas_groupby, lambda df: df.size(), check_exception_type=None + ) eval_general(modin_groupby, pandas_groupby, lambda df: df.tail(n), is_default=True) eval_quantile(modin_groupby, pandas_groupby) eval_general(modin_groupby, pandas_groupby, lambda df: df.take(), is_default=True) @@ -471,7 +492,8 @@ def test_single_group_row_groupby(): eval_groups(modin_groupby, pandas_groupby) -def test_large_row_groupby(): +@pytest.mark.parametrize("is_by_category", [True, False]) +def test_large_row_groupby(is_by_category): pandas_df = pandas.DataFrame( np.random.randint(0, 8, size=(100, 4)), columns=list("ABCD") ) @@ -479,6 +501,10 @@ def test_large_row_groupby(): modin_df = from_pandas(pandas_df) by = [str(i) for i in pandas_df["A"].tolist()] + + if is_by_category: + by = pandas.Categorical(by) + n = 4 modin_groupby = modin_df.groupby(by=by) diff --git a/modin/pandas/test/utils.py b/modin/pandas/test/utils.py index bd7dcd0596b..ddd1eac497f 100644 --- a/modin/pandas/test/utils.py +++ b/modin/pandas/test/utils.py @@ -381,8 +381,17 @@ def categories_equals(left, right): def df_categories_equals(df1, df2): - categories_columns = df1.select_dtypes(include="category").columns + if not hasattr(df1, "select_dtypes"): + if isinstance(df1, pandas.CategoricalDtype): + return categories_equals(df1, df2) + elif isinstance(getattr(df1, "dtype"), pandas.CategoricalDtype) and isinstance( + getattr(df1, "dtype"), pandas.CategoricalDtype + ): + return categories_equals(df1.dtype, df2.dtype) + else: + return True + categories_columns = df1.select_dtypes(include="category").columns for column in categories_columns: is_category_ordered = df1[column].dtype.ordered assert_categorical_equal( @@ -558,17 +567,27 @@ def check_df_columns_have_nans(df, cols): def eval_general( - modin_df, pandas_df, operation, comparator=df_equals, __inplace__=False, **kwargs + modin_df, + pandas_df, + operation, + comparator=df_equals, + __inplace__=False, + check_exception_type=True, + **kwargs, ): md_kwargs, pd_kwargs = {}, {} def execute_callable(fn, inplace=False, md_kwargs={}, pd_kwargs={}): try: pd_result = fn(pandas_df, **pd_kwargs) - except Exception as e: - with pytest.raises(type(e)): + except Exception as pd_e: + if check_exception_type is None: + return None + with pytest.raises(Exception) as md_e: # repr to force materialization repr(fn(modin_df, **md_kwargs)) + if check_exception_type: + assert isinstance(md_e.value, type(pd_e)) else: md_result = fn(modin_df, **md_kwargs) return (md_result, pd_result) if not __inplace__ else (modin_df, pandas_df)