diff --git a/asv_bench/benchmarks/benchmarks.py b/asv_bench/benchmarks/benchmarks.py index 6947fdd68d0..cb1dda17831 100644 --- a/asv_bench/benchmarks/benchmarks.py +++ b/asv_bench/benchmarks/benchmarks.py @@ -57,46 +57,62 @@ def execute(df): return df.shape -class TimeMultiColumnGroupby: - param_names = ["data_size", "count_columns"] - params = [UNARY_OP_DATA_SIZE, [6]] - - def setup(self, data_size, count_columns): +class BaseTimeGroupBy: + def setup(self, data_size, ncols=1): self.df = generate_dataframe( ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH ) - self.groupby_columns = [col for col in self.df.columns[:count_columns]] + self.groupby_columns = self.df.columns[:ncols].tolist() + - def time_groupby_agg_quan(self, data_size, count_columns): +class TimeMultiColumnGroupby(BaseTimeGroupBy): + param_names = ["data_size", "ncols"] + params = [UNARY_OP_DATA_SIZE, [6]] + + def time_groupby_agg_quan(self, data_size, ncols): execute(self.df.groupby(by=self.groupby_columns).agg("quantile")) - def time_groupby_agg_mean(self, data_size, count_columns): + def time_groupby_agg_mean(self, data_size, ncols): execute(self.df.groupby(by=self.groupby_columns).apply(lambda df: df.mean())) -class TimeGroupByDefaultAggregations: +class TimeGroupByDefaultAggregations(BaseTimeGroupBy): param_names = ["data_size"] params = [ UNARY_OP_DATA_SIZE, ] - def setup(self, data_size): - self.df = generate_dataframe( - ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH - ) - self.groupby_column = self.df.columns[0] - def time_groupby_count(self, data_size): - execute(self.df.groupby(by=self.groupby_column).count()) + execute(self.df.groupby(by=self.groupby_columns).count()) def time_groupby_size(self, data_size): - execute(self.df.groupby(by=self.groupby_column).size()) + execute(self.df.groupby(by=self.groupby_columns).size()) def time_groupby_sum(self, data_size): - execute(self.df.groupby(by=self.groupby_column).sum()) + execute(self.df.groupby(by=self.groupby_columns).sum()) def time_groupby_mean(self, data_size): - execute(self.df.groupby(by=self.groupby_column).mean()) + execute(self.df.groupby(by=self.groupby_columns).mean()) + + +class TimeGroupByDictionaryAggregation(BaseTimeGroupBy): + param_names = ["data_size", "operation_type"] + params = [UNARY_OP_DATA_SIZE, ["reduction", "aggregation"]] + operations = { + "reduction": ["sum", "count", "prod"], + "aggregation": ["quantile", "std", "median"], + } + + def setup(self, data_size, operation_type): + super().setup(data_size) + self.cols_to_agg = self.df.columns[1:4] + operations = self.operations[operation_type] + self.agg_dict = { + c: operations[i % len(operations)] for i, c in enumerate(self.cols_to_agg) + } + + def time_groupby_dict_agg(self, data_size, operation_type): + execute(self.df.groupby(by=self.groupby_columns).agg(self.agg_dict)) class TimeJoin: diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py index cbd9418f17b..6f62405a735 100644 --- a/modin/backends/pandas/query_compiler.py +++ b/modin/backends/pandas/query_compiler.py @@ -23,6 +23,7 @@ ) from pandas.core.base import DataError from typing import Type, Callable +from collections.abc import Iterable, Container import warnings @@ -37,6 +38,7 @@ ReductionFunction, BinaryFunction, GroupbyReduceFunction, + groupby_reduce_functions, ) @@ -2443,33 +2445,57 @@ def _callable_func(self, func, axis, *args, **kwargs): # nature. 
They require certain data to exist on the same partition, and # after the shuffle, there should be only a local map required. - groupby_count = GroupbyReduceFunction.register( - lambda df, **kwargs: df.count(**kwargs), lambda df, **kwargs: df.sum(**kwargs) - ) - groupby_any = GroupbyReduceFunction.register( - lambda df, **kwargs: df.any(**kwargs), lambda df, **kwargs: df.any(**kwargs) - ) - groupby_min = GroupbyReduceFunction.register( - lambda df, **kwargs: df.min(**kwargs), lambda df, **kwargs: df.min(**kwargs) - ) - groupby_prod = GroupbyReduceFunction.register( - lambda df, **kwargs: df.prod(**kwargs), lambda df, **kwargs: df.prod(**kwargs) - ) - groupby_max = GroupbyReduceFunction.register( - lambda df, **kwargs: df.max(**kwargs), lambda df, **kwargs: df.max(**kwargs) - ) - groupby_all = GroupbyReduceFunction.register( - lambda df, **kwargs: df.all(**kwargs), lambda df, **kwargs: df.all(**kwargs) - ) - groupby_sum = GroupbyReduceFunction.register( - lambda df, **kwargs: df.sum(**kwargs), lambda df, **kwargs: df.sum(**kwargs) - ) + groupby_count = GroupbyReduceFunction.register(*groupby_reduce_functions["count"]) + groupby_any = GroupbyReduceFunction.register(*groupby_reduce_functions["any"]) + groupby_min = GroupbyReduceFunction.register(*groupby_reduce_functions["min"]) + groupby_prod = GroupbyReduceFunction.register(*groupby_reduce_functions["prod"]) + groupby_max = GroupbyReduceFunction.register(*groupby_reduce_functions["max"]) + groupby_all = GroupbyReduceFunction.register(*groupby_reduce_functions["all"]) + groupby_sum = GroupbyReduceFunction.register(*groupby_reduce_functions["sum"]) groupby_size = GroupbyReduceFunction.register( - lambda df, **kwargs: pandas.DataFrame(df.size()), - lambda df, **kwargs: df.sum(), - method="size", + *groupby_reduce_functions["size"], method="size" ) + def _groupby_dict_reduce( + self, by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop=False + ): + map_dict = {} + reduce_dict = {} + rename_columns = any( + not isinstance(fn, str) and isinstance(fn, Iterable) + for fn in agg_func.values() + ) + for col, col_funcs in agg_func.items(): + if not rename_columns: + map_dict[col], reduce_dict[col] = groupby_reduce_functions[col_funcs] + continue + + if isinstance(col_funcs, str): + col_funcs = [col_funcs] + + map_fns = [] + for i, fn in enumerate(col_funcs): + if not isinstance(fn, str) and isinstance(fn, Iterable): + new_col_name, func = fn + elif isinstance(fn, str): + new_col_name, func = fn, fn + else: + raise TypeError + + map_fns.append((new_col_name, groupby_reduce_functions[func][0])) + reduce_dict[(col, new_col_name)] = groupby_reduce_functions[func][1] + map_dict[col] = map_fns + return GroupbyReduceFunction.register(map_dict, reduce_dict)( + query_compiler=self, + by=by, + axis=axis, + groupby_args=groupby_kwargs, + map_args=agg_kwargs, + reduce_args=agg_kwargs, + numeric_only=False, + drop=drop, + ) + def groupby_agg( self, by, @@ -2481,6 +2507,31 @@ def groupby_agg( groupby_kwargs, drop=False, ): + def is_reduce_fn(fn, deep_level=0): + if not isinstance(fn, str) and isinstance(fn, Container): + # `deep_level` parameter specifies the number of nested containers that was met: + # - if it's 0, then we're outside of container, `fn` could be either function name + # or container of function names/renamers. + # - if it's 1, then we're inside container of function names/renamers. 
`fn` must be + # either function name or renamer (renamer is some container which length == 2, + # the first element is the new column name and the second is the function name). + assert deep_level == 0 or ( + deep_level > 0 and len(fn) == 2 + ), f"Got the renamer with incorrect length, expected 2 got {len(fn)}." + return ( + all(is_reduce_fn(f, deep_level + 1) for f in fn) + if deep_level == 0 + else is_reduce_fn(fn[1], deep_level + 1) + ) + return isinstance(fn, str) and fn in groupby_reduce_functions + + if isinstance(agg_func, dict) and all( + is_reduce_fn(x) for x in agg_func.values() + ): + return self._groupby_dict_reduce( + by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop + ) + if callable(agg_func): agg_func = wrap_udf_function(agg_func) diff --git a/modin/data_management/functions/__init__.py b/modin/data_management/functions/__init__.py index 4562bb0d429..e5e58ea3bdf 100644 --- a/modin/data_management/functions/__init__.py +++ b/modin/data_management/functions/__init__.py @@ -17,7 +17,7 @@ from .reductionfunction import ReductionFunction from .foldfunction import FoldFunction from .binary_function import BinaryFunction -from .groupby_function import GroupbyReduceFunction +from .groupby_function import GroupbyReduceFunction, groupby_reduce_functions __all__ = [ "Function", @@ -27,4 +27,5 @@ "FoldFunction", "BinaryFunction", "GroupbyReduceFunction", + "groupby_reduce_functions", ] diff --git a/modin/data_management/functions/groupby_function.py b/modin/data_management/functions/groupby_function.py index 66d8a4c3d5c..fef5a8a8ecf 100644 --- a/modin/data_management/functions/groupby_function.py +++ b/modin/data_management/functions/groupby_function.py @@ -20,174 +20,243 @@ class GroupbyReduceFunction(MapReduceFunction): @classmethod def call(cls, map_func, reduce_func, *call_args, **call_kwds): - def caller( - query_compiler, - by, - axis, - groupby_args, - map_args, - reduce_args=None, - numeric_only=True, - drop=False, - ): - if not isinstance(by, (type(query_compiler), str)): - by = try_cast_to_pandas(by, squeeze=True) - return query_compiler.default_to_pandas( - lambda df: map_func( - df.groupby(by=by, axis=axis, **groupby_args), **map_args - ) + assert not ( + isinstance(map_func, dict) ^ isinstance(reduce_func, dict) + ) and not ( + callable(map_func) ^ callable(reduce_func) + ), "Map and reduce functions must be either both dict or both callable." + + return lambda *args, **kwargs: cls.caller( + *args, map_func=map_func, reduce_func=reduce_func, **kwargs, **call_kwds + ) + + @classmethod + def map( + cls, + df, + other=None, + axis=0, + by=None, + groupby_args=None, + map_func=None, + map_args=None, + drop=False, + ): + # Set `as_index` to True to track the metadata of the grouping object + # It is used to make sure that between phases we are constructing the + # right index and placing columns in the correct order. 
+ groupby_args["as_index"] = True + groupby_args["observed"] = True + if other is not None: + other = other.squeeze(axis=axis ^ 1) + if isinstance(other, pandas.DataFrame): + df = pandas.concat( + [df] + [other[[o for o in other if o not in df]]], + axis=1, ) - assert axis == 0, "Can only groupby reduce with axis=0" + other = list(other.columns) + by_part = other + else: + by_part = by + + apply_func = cls.try_filter_dict(map_func, df) + result = apply_func( + df.groupby(by=by_part, axis=axis, **groupby_args), **map_args + ) + return result + + @classmethod + def reduce( + cls, + df, + partition_idx=0, + axis=0, + groupby_args=None, + reduce_func=None, + reduce_args=None, + drop=False, + **kwargs, + ): + by_part = list(df.index.names) + if drop and len(df.columns.intersection(by_part)) > 0: + df.drop(columns=by_part, errors="ignore", inplace=True) + + groupby_args = groupby_args.copy() + method = kwargs.get("method", None) + as_index = groupby_args["as_index"] - if numeric_only: - qc = query_compiler.getitem_column_array( - query_compiler._modin_frame._numeric_columns(True) + # Set `as_index` to True to track the metadata of the grouping object + groupby_args["as_index"] = True + + # since now index levels contain out 'by', in the reduce phace + # we want to group on these levels + groupby_args["level"] = list(range(len(df.index.names))) + + apply_func = cls.try_filter_dict(reduce_func, df) + result = apply_func(df.groupby(axis=axis, **groupby_args), **reduce_args) + + if not as_index: + insert_levels = partition_idx == 0 and (drop or method == "size") + result.reset_index(drop=not insert_levels, inplace=True) + + return result + + @classmethod + def caller( + cls, + query_compiler, + by, + axis, + groupby_args, + map_args, + map_func, + numeric_only=True, + **kwargs, + ): + if not isinstance(by, (type(query_compiler), str)): + by = try_cast_to_pandas(by, squeeze=True) + default_func = ( + (lambda grp: grp.agg(map_func)) + if isinstance(map_func, dict) + else map_func + ) + return query_compiler.default_to_pandas( + lambda df: default_func( + df.groupby(by=by, axis=axis, **groupby_args), **map_args ) - else: - qc = query_compiler - # since we're going to modify `groupby_args` dict in a `compute_map`, - # we want to copy it to not propagate these changes into source dict, in case - # of unsuccessful end of function - groupby_args = groupby_args.copy() - - as_index = groupby_args.get("as_index", True) - observed = groupby_args.get("observed", False) - - if isinstance(by, str): - - def _map(df): - # Set `as_index` to True to track the metadata of the grouping - # object It is used to make sure that between phases we are - # constructing the right index and placing columns in the correct - # order. - groupby_args["as_index"] = True - groupby_args["observed"] = True - - result = map_func( - df.groupby(by=by, axis=axis, **groupby_args), **map_args - ) - # The _modin_groupby_ prefix indicates that this is the first - # partition, and since we may need to insert the grouping data in - # the reduce phase - if ( - not isinstance(result.index, pandas.MultiIndex) - and result.index.name is not None - and result.index.name in result.columns - ): - result.index.name = "{}{}".format( - "_modin_groupby_", result.index.name - ) - return result - - else: - - def _map(df, other): - def compute_map(df, other): - # Set `as_index` to True to track the metadata of the grouping object - # It is used to make sure that between phases we are constructing the - # right index and placing columns in the correct order. 
- groupby_args["as_index"] = True - groupby_args["observed"] = True - - other = other.squeeze(axis=axis ^ 1) - if isinstance(other, pandas.DataFrame): - df = pandas.concat( - [df] + [other[[o for o in other if o not in df]]], - axis=1, - ) - other = list(other.columns) - result = map_func( - df.groupby(by=other, axis=axis, **groupby_args), **map_args - ) - # if `other` has category dtype, then pandas will drop that - # column after groupby, inserting it back to correctly process - # reduce phase - if ( - drop - and not as_index - and isinstance(other, pandas.Series) - and isinstance(other.dtype, pandas.CategoricalDtype) - and result.index.name is not None - and result.index.name not in result.columns - ): - result.insert( - loc=0, column=result.index.name, value=result.index - ) - # The _modin_groupby_ prefix indicates that this is the first partition, - # and since we may need to insert the grouping data in the reduce phase - if ( - not isinstance(result.index, pandas.MultiIndex) - and result.index.name is not None - and result.index.name in result.columns - ): - result.index.name = "{}{}".format( - "_modin_groupby_", result.index.name - ) - return result - - try: - return compute_map(df, other) - # This will happen with Arrow buffer read-only errors. We don't want to copy - # all the time, so this will try to fast-path the code first. - except ValueError: - return compute_map(df.copy(), other.copy()) - - def _reduce(df): - def compute_reduce(df): - other_len = len(df.index.names) - df = df.reset_index(drop=False) - # See note above about setting `as_index` - groupby_args["as_index"] = as_index - groupby_args["observed"] = observed - if other_len > 1: - by_part = list(df.columns[0:other_len]) - else: - by_part = df.columns[0] - result = reduce_func( - df.groupby(by=by_part, axis=axis, **groupby_args), **reduce_args - ) - if ( - not isinstance(result.index, pandas.MultiIndex) - and result.index.name is not None - and "_modin_groupby_" in result.index.name - ): - result.index.name = result.index.name[len("_modin_groupby_") :] - if isinstance(by_part, str) and by_part in result.columns: - if "_modin_groupby_" in by_part and drop: - col_name = by_part[len("_modin_groupby_") :] - new_result = result.drop(columns=col_name, errors="ignore") - new_result.columns = [ - col_name if "_modin_groupby_" in c else c - for c in new_result.columns - ] - return new_result - else: - return ( - result.drop(columns=by_part) - if call_kwds.get("method", None) != "size" - else result - ) - return result - - try: - return compute_reduce(df) - # This will happen with Arrow buffer read-only errors. We don't want to copy - # all the time, so this will try to fast-path the code first. 
- except ValueError: - return compute_reduce(df.copy()) - - # TODO: try to precompute `new_index` and `new_columns` - if isinstance(by, str): - new_modin_frame = qc._modin_frame._map_reduce( - axis, _map, reduce_func=_reduce, preserve_index=False + ) + assert axis == 0, "Can only groupby reduce with axis=0" + + if numeric_only: + qc = query_compiler.getitem_column_array( + query_compiler._modin_frame._numeric_columns(True) + ) + else: + qc = query_compiler + + map_fn, reduce_fn = cls.build_map_reduce_functions( + by=by, + axis=axis, + groupby_args=groupby_args, + map_func=map_func, + map_args=map_args, + **kwargs, + ) + + broadcastable_by = getattr(by, "_modin_frame", None) + apply_indices = list(map_func.keys()) if isinstance(map_func, dict) else None + new_modin_frame = qc._modin_frame.groupby_reduce( + axis, broadcastable_by, map_fn, reduce_fn, apply_indices=apply_indices + ) + + result = query_compiler.__constructor__(new_modin_frame) + if result.index.name == "__reduced__": + result.index.name = None + return result + + @staticmethod + def try_filter_dict(agg_func, df): + if not isinstance(agg_func, dict): + return agg_func + partition_dict = {k: v for k, v in agg_func.items() if k in df.columns} + return lambda grp: grp.agg(partition_dict) + + @classmethod + def build_map_reduce_functions( + cls, + by, + axis, + groupby_args, + map_func, + map_args, + reduce_func, + reduce_args, + drop, + **kwargs, + ): + # if by is a query compiler, then it will be broadcasted explicit via + # groupby_reduce method of the modin frame and so we don't want secondary + # implicit broadcastion via passing it as an function argument. + if hasattr(by, "_modin_frame"): + by = None + + def _map(df, other=None, **kwargs): + def wrapper(df, other=None): + return cls.map( + df, + other, + axis=axis, + by=by, + groupby_args=groupby_args.copy(), + map_func=map_func, + map_args=map_args, + drop=drop, + **kwargs, ) - else: - new_modin_frame = qc._modin_frame.groupby_reduce( - axis, by._modin_frame, _map, _reduce + + try: + result = wrapper(df, other) + # This will happen with Arrow buffer read-only errors. We don't want to copy + # all the time, so this will try to fast-path the code first. + except ValueError: + result = wrapper(df.copy(), other if other is None else other.copy()) + return result + + def _reduce(df, **call_kwargs): + def wrapper(df): + return cls.reduce( + df, + axis=axis, + groupby_args=groupby_args, + reduce_func=reduce_func, + reduce_args=reduce_args, + drop=drop, + **kwargs, + **call_kwargs, ) - result = query_compiler.__constructor__(new_modin_frame) - if result.index.name == "__reduced__": - result.index.name = None + + try: + result = wrapper(df) + # This will happen with Arrow buffer read-only errors. We don't want to copy + # all the time, so this will try to fast-path the code first. 
+ except ValueError: + result = wrapper(df.copy()) return result - return caller + return _map, _reduce + + +groupby_reduce_functions = { + "all": ( + lambda df, *args, **kwargs: df.all(*args, **kwargs), + lambda df, *args, **kwargs: df.all(*args, **kwargs), + ), + "any": ( + lambda df, *args, **kwargs: df.any(*args, **kwargs), + lambda df, *args, **kwargs: df.any(*args, **kwargs), + ), + "count": ( + lambda df, *args, **kwargs: df.count(*args, **kwargs), + lambda df, *args, **kwargs: df.sum(*args, **kwargs), + ), + "max": ( + lambda df, *args, **kwargs: df.max(*args, **kwargs), + lambda df, *args, **kwargs: df.max(*args, **kwargs), + ), + "min": ( + lambda df, *args, **kwargs: df.min(*args, **kwargs), + lambda df, *args, **kwargs: df.min(*args, **kwargs), + ), + "prod": ( + lambda df, *args, **kwargs: df.prod(*args, **kwargs), + lambda df, *args, **kwargs: df.prod(*args, **kwargs), + ), + "size": ( + lambda df, *args, **kwargs: pandas.DataFrame(df.size(*args, **kwargs)), + lambda df, *args, **kwargs: df.sum(*args, **kwargs), + ), + "sum": ( + lambda df, *args, **kwargs: df.sum(*args, **kwargs), + lambda df, *args, **kwargs: df.sum(*args, **kwargs), + ), +} diff --git a/modin/engines/base/frame/data.py b/modin/engines/base/frame/data.py index 9c9e305db86..52e27c31eab 100644 --- a/modin/engines/base/frame/data.py +++ b/modin/engines/base/frame/data.py @@ -1996,26 +1996,50 @@ def _concat(self, axis, others, how, sort): ) def groupby_reduce( - self, axis, by, map_func, reduce_func, new_index=None, new_columns=None + self, + axis, + by, + map_func, + reduce_func, + new_index=None, + new_columns=None, + apply_indices=None, ): """Groupby another dataframe and aggregate the result. - Args: - axis: The axis to groupby and aggregate over. - by: The dataframe to group by. - map_func: The map component of the aggregation. - reduce_func: The reduce component of the aggregation. - new_index: (optional) The index of the result. We may know this in advance, + Parameters + ---------- + axis: int, + The axis to groupby and aggregate over. + by: ModinFrame (optional), + The dataframe to group by. + map_func: callable, + The map component of the aggregation. + reduce_func: callable, + The reduce component of the aggregation. + new_index: Index (optional), + The index of the result. We may know this in advance, and if not provided it must be computed. - new_columns: (optional) The columns of the result. We may know this in - advance, and if not provided it must be computed. + new_columns: Index (optional), + The columns of the result. We may know this in advance, + and if not provided it must be computed. + apply_indices : list-like (optional), + Indices of `axis ^ 1` to apply groupby over. Returns ------- A new dataframe. 
""" + by_parts = by if by is None else by._partitions + + if apply_indices is not None: + numeric_indices = self.axes[axis ^ 1].get_indexer_for(apply_indices) + apply_indices = list( + self._get_dict_of_block_index(axis ^ 1, numeric_indices).keys() + ) + new_partitions = self._frame_mgr_cls.groupby_reduce( - axis, self._partitions, by._partitions, map_func, reduce_func + axis, self._partitions, by_parts, map_func, reduce_func, apply_indices ) new_axes = [ self._compute_axis_labels(i, new_partitions) diff --git a/modin/engines/base/frame/partition_manager.py b/modin/engines/base/frame/partition_manager.py index 3d58942af08..3b2adc4e533 100644 --- a/modin/engines/base/frame/partition_manager.py +++ b/modin/engines/base/frame/partition_manager.py @@ -99,12 +99,45 @@ def axis_partition(cls, partitions, axis): ) @classmethod - def groupby_reduce(cls, axis, partitions, by, map_func, reduce_func): - """Groupby data using the map_func provided along the axis over the partitions then reduce using reduce_func.""" - mapped_partitions = cls.broadcast_apply( - axis, map_func, left=partitions, right=by, other_name="other" + def groupby_reduce( + cls, axis, partitions, by, map_func, reduce_func, apply_indices=None + ): + """ + Groupby data using the map_func provided along the axis over the partitions then reduce using reduce_func. + + Parameters + ---------- + axis: int, + Axis to groupby over. + partitions: numpy 2D array, + Partitions of the ModinFrame to groupby. + by: numpy 2D array (optional), + Partitions of 'by' to broadcast. + map_func: callable, + Map function. + reduce_func: callable, + Reduce function. + apply_indices : list of ints (optional), + Indices of `axis ^ 1` to apply function over. + + Returns + ------- + Partitions with applied groupby. + """ + if apply_indices is not None: + partitions = ( + partitions[apply_indices] if axis else partitions[:, apply_indices] + ) + + if by is not None: + mapped_partitions = cls.broadcast_apply( + axis, map_func, left=partitions, right=by, other_name="other" + ) + else: + mapped_partitions = cls.map_partitions(partitions, map_func) + return cls.map_axis_partitions( + axis, mapped_partitions, reduce_func, enumerate_partitions=True ) - return cls.map_axis_partitions(axis, mapped_partitions, reduce_func) @classmethod def broadcast_apply_select_indices( @@ -351,6 +384,7 @@ def map_axis_partitions( map_func, keep_partitioning=False, lengths=None, + enumerate_partitions=False, ): """ Apply `map_func` to every partition. @@ -367,6 +401,9 @@ def map_axis_partitions( The flag to keep partitions for Modin Frame. lengths : list(int) The list of lengths to shuffle the object. + enumerate_partitions : bool (optional, default False), + Whether or not to pass partition index into `map_func`. + Note that `map_func` must be able to obtain `partition_idx` kwarg. 
Returns ------- @@ -385,6 +422,7 @@ def map_axis_partitions( keep_partitioning=keep_partitioning, right=None, lengths=lengths, + enumerate_partitions=enumerate_partitions, ) @classmethod diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 0fdeacc2161..0328701addb 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -29,6 +29,7 @@ from pandas.core.aggregation import reconstruct_func import pandas.core.common as com from types import BuiltinFunctionType +from collections.abc import Iterable from modin.error_message import ErrorMessage from modin.utils import _inherit_docstrings, try_cast_to_pandas, wrap_udf_function @@ -376,27 +377,22 @@ def aggregate(self, func=None, *args, **kwargs): relabeling_required = False if isinstance(func, dict) or func is None: - def _reconstruct_func(func, **kwargs): - relabeling_required, func, new_columns, order = reconstruct_func( - func, **kwargs - ) - # We convert to the string version of the function for simplicity. - func = { - k: v - if not callable(v) or v.__name__ not in dir(self) - else v.__name__ - for k, v in func.items() - } - return relabeling_required, func, new_columns, order - - relabeling_required, func_dict, new_columns, order = _reconstruct_func( + def try_get_str_func(fn): + if not isinstance(fn, str) and isinstance(fn, Iterable): + return [try_get_str_func(f) for f in fn] + return fn.__name__ if callable(fn) and fn.__name__ in dir(self) else fn + + relabeling_required, func_dict, new_columns, order = reconstruct_func( func, **kwargs ) + func_dict = {col: try_get_str_func(fn) for col, fn in func_dict.items()} if any(i not in self._df.columns for i in func_dict.keys()): from pandas.core.base import SpecificationError raise SpecificationError("nested renamer is not supported") + if func is None: + kwargs = {} func = func_dict elif is_list_like(func): return self._default_to_pandas( diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 8ce64e5a88b..54c78563357 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -61,13 +61,6 @@ def wrapper(obj1, obj2, *args, **kwargs): return wrapper -def df_from_grp(self, grp): - if grp.__module__.split(".")[0] == "pandas": - return grp._obj_with_exclusions - else: - return grp._df - - @pytest.mark.parametrize("as_index", [True, False]) def test_mixed_dtypes_groupby(as_index): frame_data = np.random.randint(97, 198, size=(2 ** 6, 2 ** 4)) @@ -641,6 +634,7 @@ def test_large_row_groupby(is_by_category): min, sum, {"A": "sum"}, + {"A": lambda df: df.sum()}, {"A": "max", "B": "sum", "C": "min"}, ] for func in agg_functions: @@ -1246,15 +1240,32 @@ def test_shift_freq(groupby_axis, shift_axis): "min": (list(test_data["int_data"].keys())[-1], np.min), }, }, + { + "by": [ + list(test_data["int_data"].keys())[0], + list(test_data["int_data"].keys())[-1], + ], + "agg_dict": { + "max": (list(test_data["int_data"].keys())[1], max), + "min": (list(test_data["int_data"].keys())[-1], min), + }, + }, ], ) -def test_agg_func_None_rename(by_and_agg_dict): +@pytest.mark.parametrize( + "as_index", + [ + True, + pytest.param(False, marks=pytest.mark.xfail(reason="See modin issue #2543")), + ], +) +def test_agg_func_None_rename(by_and_agg_dict, as_index): modin_df, pandas_df = create_test_dfs(test_data["int_data"]) - modin_result = modin_df.groupby(by_and_agg_dict["by"]).agg( + modin_result = modin_df.groupby(by_and_agg_dict["by"], as_index=as_index).agg( **by_and_agg_dict["agg_dict"] ) - pandas_result = 
pandas_df.groupby(by_and_agg_dict["by"]).agg( + pandas_result = pandas_df.groupby(by_and_agg_dict["by"], as_index=as_index).agg( **by_and_agg_dict["agg_dict"] ) df_equals(modin_result, pandas_result) @@ -1426,9 +1437,7 @@ def get_columns(df): @pytest.mark.parametrize( "func_to_apply", [ - pytest.param( - lambda df: df.sum(), marks=pytest.mark.skip("See modin issue #2512") - ), + lambda df: df.sum(), lambda df: df.size(), lambda df: df.quantile(), lambda df: df.dtypes, @@ -1439,13 +1448,27 @@ def get_columns(df): ), lambda grp: grp.agg( { - df_from_grp(grp).columns[0]: (max, min, sum), - df_from_grp(grp).columns[-1]: (sum, min, max), + list(test_data_values[0].keys())[1]: (max, min, sum), + list(test_data_values[0].keys())[-2]: (sum, min, max), } ), lambda grp: grp.agg( - max=(df_from_grp(grp).columns[0], max), - sum=(df_from_grp(grp).columns[-1], sum), + { + list(test_data_values[0].keys())[1]: [ + ("new_sum", "sum"), + ("new_min", "min"), + ], + list(test_data_values[0].keys())[-2]: np.sum, + } + ), + pytest.param( + lambda grp: grp.agg( + { + list(test_data_values[0].keys())[1]: (max, min, sum), + list(test_data_values[0].keys())[-1]: (sum, min, max), + } + ), + marks=pytest.mark.skip("See modin issue #2542"), ), ], )
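
Note on the approach (illustrative sketch, not part of the patch): the new `groupby_reduce_functions` registry pairs each supported aggregation name with a map function (run per partition) and a reduce function (run on the concatenated partial results), and `_groupby_dict_reduce` assembles per-column map/reduce dicts from it. The plain-pandas sketch below imitates that decomposition to show why, for example, "count" maps to count/sum while "sum" maps to sum/sum. `MAP_REDUCE`, `groupby_dict_reduce`, and the hand-built list of partitions are stand-ins for illustration only; they are not Modin APIs and do not reflect its actual partition handling.

import pandas as pd

# Map/reduce pairs mirroring the new `groupby_reduce_functions` registry:
# the map half runs on each partition, the reduce half combines the partial results.
MAP_REDUCE = {
    "sum": ("sum", "sum"),
    "count": ("count", "sum"),  # partial counts are combined by summing them
    "max": ("max", "max"),
}

def groupby_dict_reduce(partitions, by, agg_dict):
    # Map phase: group every partition on `by` with as_index=True (the default)
    # so the grouping keys survive in the index for the reduce phase.
    mapped = [
        part.groupby(by).agg({col: MAP_REDUCE[fn][0] for col, fn in agg_dict.items()})
        for part in partitions
    ]
    # Reduce phase: concatenate the partial results and regroup on the index
    # levels, applying the reduce half of each pair.
    combined = pd.concat(mapped)
    return combined.groupby(level=list(range(combined.index.nlevels))).agg(
        {col: MAP_REDUCE[fn][1] for col, fn in agg_dict.items()}
    )

df = pd.DataFrame({"a": [0, 1, 0, 1, 0, 1], "b": range(6), "c": range(6)})
parts = [df.iloc[:3], df.iloc[3:]]  # stand-in for Modin's row partitions
result = groupby_dict_reduce(parts, "a", {"b": "sum", "c": "count"})
assert result.equals(df.groupby("a").agg({"b": "sum", "c": "count"}))

With this patch, a dict whose values all name functions from the registry (including the renamer form, e.g. {"col": [("new_sum", "sum")]}) takes this map-reduce fast path in groupby_agg; anything else, such as a lambda, still falls back to the generic apply-based implementation.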