From e5f7318ed749702e6efe0344857259ac9ecc0b3a Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Thu, 10 Dec 2020 22:37:51 +0300 Subject: [PATCH 01/11] FEAT-#2491: optimized dictionary reduce aggregation Signed-off-by: Dmitry Chigarev --- modin/backends/pandas/query_compiler.py | 94 ++-- modin/data_management/functions/__init__.py | 3 +- .../functions/groupby_function.py | 407 +++++++++++------- modin/engines/base/frame/data.py | 42 +- modin/engines/base/frame/partition_manager.py | 23 +- modin/pandas/dataframe.py | 2 +- modin/pandas/test/test_groupby.py | 5 +- 7 files changed, 369 insertions(+), 207 deletions(-) diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py index cbd9418f17b..e6d96c555e3 100644 --- a/modin/backends/pandas/query_compiler.py +++ b/modin/backends/pandas/query_compiler.py @@ -23,6 +23,7 @@ ) from pandas.core.base import DataError from typing import Type, Callable +from collections.abc import Container import warnings @@ -37,6 +38,7 @@ ReductionFunction, BinaryFunction, GroupbyReduceFunction, + groupby_reduce_functions, ) @@ -2443,33 +2445,63 @@ def _callable_func(self, func, axis, *args, **kwargs): # nature. They require certain data to exist on the same partition, and # after the shuffle, there should be only a local map required. - groupby_count = GroupbyReduceFunction.register( - lambda df, **kwargs: df.count(**kwargs), lambda df, **kwargs: df.sum(**kwargs) - ) - groupby_any = GroupbyReduceFunction.register( - lambda df, **kwargs: df.any(**kwargs), lambda df, **kwargs: df.any(**kwargs) - ) - groupby_min = GroupbyReduceFunction.register( - lambda df, **kwargs: df.min(**kwargs), lambda df, **kwargs: df.min(**kwargs) - ) - groupby_prod = GroupbyReduceFunction.register( - lambda df, **kwargs: df.prod(**kwargs), lambda df, **kwargs: df.prod(**kwargs) - ) - groupby_max = GroupbyReduceFunction.register( - lambda df, **kwargs: df.max(**kwargs), lambda df, **kwargs: df.max(**kwargs) - ) - groupby_all = GroupbyReduceFunction.register( - lambda df, **kwargs: df.all(**kwargs), lambda df, **kwargs: df.all(**kwargs) - ) - groupby_sum = GroupbyReduceFunction.register( - lambda df, **kwargs: df.sum(**kwargs), lambda df, **kwargs: df.sum(**kwargs) - ) + groupby_count = GroupbyReduceFunction.register(*groupby_reduce_functions["count"]) + groupby_any = GroupbyReduceFunction.register(*groupby_reduce_functions["any"]) + groupby_min = GroupbyReduceFunction.register(*groupby_reduce_functions["min"]) + groupby_prod = GroupbyReduceFunction.register(*groupby_reduce_functions["prod"]) + groupby_max = GroupbyReduceFunction.register(*groupby_reduce_functions["max"]) + groupby_all = GroupbyReduceFunction.register(*groupby_reduce_functions["all"]) + groupby_sum = GroupbyReduceFunction.register(*groupby_reduce_functions["sum"]) groupby_size = GroupbyReduceFunction.register( - lambda df, **kwargs: pandas.DataFrame(df.size()), - lambda df, **kwargs: df.sum(), - method="size", + *groupby_reduce_functions["size"], method="size" ) + def _groupby_dict_reduce( + self, by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop=False + ): + map_dict = {} + reduce_dict = {} + rename_columns = any( + not isinstance(fn, str) and isinstance(fn, Container) + for fn in agg_func.values() + ) + # breakpoint() + for col, col_funcs in agg_func.items(): + # single function without renaming + if not rename_columns: + map_dict[col], reduce_dict[col] = groupby_reduce_functions[col_funcs] + continue + + if isinstance(col_funcs, str): + col_funcs = [col_funcs] + # 
breakpoint() + map_fns = [] + for i, fn in enumerate(col_funcs): + if not isinstance(fn, str) and isinstance(fn, Container): + assert ( + len(fn) == 2 + ), f"Incorrect number of values to unpack. (got {len(fn)} expected 2)" + future_col_name, func = fn[0], fn[1] + elif isinstance(fn, str): + future_col_name, func = fn, fn + else: + raise TypeError + # breakpoint() + map_fns.append((future_col_name, groupby_reduce_functions[func][0])) + reduce_dict[(col, future_col_name)] = groupby_reduce_functions[func][1] + map_dict[col] = map_fns + # breakpoint() + return GroupbyReduceFunction.register(map_dict, reduce_dict)( + query_compiler=self, + by=by, + axis=axis, + groupby_args=groupby_kwargs, + map_args=agg_kwargs, + reduce_args=agg_kwargs, + numeric_only=False, + drop=drop, + ) + def groupby_agg( self, by, @@ -2481,6 +2513,20 @@ def groupby_agg( groupby_kwargs, drop=False, ): + def is_reduce_fn(o): + if callable(o): + return False + if isinstance(o, str): + o = [o] + return all(x in groupby_reduce_functions for x in o) + + if isinstance(agg_func, dict) and all( + is_reduce_fn(x) for x in agg_func.values() + ): + return self._groupby_dict_reduce( + by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop + ) + if callable(agg_func): agg_func = wrap_udf_function(agg_func) diff --git a/modin/data_management/functions/__init__.py b/modin/data_management/functions/__init__.py index 4562bb0d429..e5e58ea3bdf 100644 --- a/modin/data_management/functions/__init__.py +++ b/modin/data_management/functions/__init__.py @@ -17,7 +17,7 @@ from .reductionfunction import ReductionFunction from .foldfunction import FoldFunction from .binary_function import BinaryFunction -from .groupby_function import GroupbyReduceFunction +from .groupby_function import GroupbyReduceFunction, groupby_reduce_functions __all__ = [ "Function", @@ -27,4 +27,5 @@ "FoldFunction", "BinaryFunction", "GroupbyReduceFunction", + "groupby_reduce_functions", ] diff --git a/modin/data_management/functions/groupby_function.py b/modin/data_management/functions/groupby_function.py index 66d8a4c3d5c..3f1f4ac3abf 100644 --- a/modin/data_management/functions/groupby_function.py +++ b/modin/data_management/functions/groupby_function.py @@ -20,174 +20,253 @@ class GroupbyReduceFunction(MapReduceFunction): @classmethod def call(cls, map_func, reduce_func, *call_args, **call_kwds): - def caller( - query_compiler, - by, - axis, - groupby_args, - map_args, - reduce_args=None, - numeric_only=True, - drop=False, - ): - if not isinstance(by, (type(query_compiler), str)): - by = try_cast_to_pandas(by, squeeze=True) - return query_compiler.default_to_pandas( - lambda df: map_func( - df.groupby(by=by, axis=axis, **groupby_args), **map_args - ) + assert not ( + isinstance(map_func, dict) ^ isinstance(reduce_func, dict) + ) and not ( + callable(map_func) ^ callable(reduce_func) + ), "Map and reduce functions must be either both dict or both callable." + + return lambda *args, **kwargs: cls.caller( + *args, map_func=map_func, reduce_func=reduce_func, **kwargs, **call_kwds + ) + + @classmethod + def map( + cls, + df, + other=None, + axis=0, + by=None, + groupby_args=None, + map_func=None, + map_args=None, + drop=False, + ): + # Set `as_index` to True to track the metadata of the grouping object + # It is used to make sure that between phases we are constructing the + # right index and placing columns in the correct order. 
+        as_index = groupby_args.get("as_index", True)
+        groupby_args["as_index"] = True
+        groupby_args["observed"] = True
+        if other is not None:
+            other = other.squeeze(axis=axis ^ 1)
+            if isinstance(other, pandas.DataFrame):
+                df = pandas.concat(
+                    [df] + [other[[o for o in other if o not in df]]],
+                    axis=1,
                 )
+            other = list(other.columns)
+            by_part = other
+        else:
+            by_part = by
+
+        apply_func = cls.try_filter_dict(map_func, df)
+        result = apply_func(
+            df.groupby(by=by_part, axis=axis, **groupby_args), **map_args
+        )
+        return result
+
+    @classmethod
+    def reduce(
+        cls,
+        df,
+        partition_idx=0,
+        axis=0,
+        groupby_args=None,
+        reduce_func=None,
+        reduce_args=None,
+        drop=False,
+        **kwargs,
+    ):
+        by_part = list(df.index.names)
+        if drop and len(df.columns.intersection(by_part)) > 0:
+            df.drop(columns=by_part, errors="ignore", inplace=True)
+
+        groupby_args = groupby_args.copy()
+        method = kwargs.get("method", None)
+        as_index = groupby_args["as_index"]
+
+        # Set `as_index` to True to track the metadata of the grouping object
+        groupby_args["as_index"] = True
 
-            if numeric_only:
-                qc = query_compiler.getitem_column_array(
-                    query_compiler._modin_frame._numeric_columns(True)
+        # since the index levels now contain our 'by', in the reduce phase
+        # we want to group on these levels
+        groupby_args["level"] = list(range(len(df.index.names)))
+
+        apply_func = cls.try_filter_dict(reduce_func, df)
+        result = apply_func(df.groupby(axis=axis, **groupby_args), **reduce_args)
+
+        if not as_index:
+            insert_levels = partition_idx == 0 and (drop or method == "size")
+            result.reset_index(drop=not insert_levels, inplace=True)
+
+        cols_to_drop = list(
+            result.columns[result.columns.str.match(r"__reduced__.*", na=False)]
+            if hasattr(result.columns, "str")
+            else []
+        )
+
+        if method != "size" and len(cols_to_drop) > 0 and len(df.columns) > 1:
+            result.drop(columns=cols_to_drop, inplace=True, errors="ignore")
+
+        return result
+
+    @classmethod
+    def caller(
+        cls,
+        query_compiler,
+        by,
+        axis,
+        groupby_args,
+        map_args,
+        map_func,
+        numeric_only=True,
+        **kwargs,
+    ):
+        if not isinstance(by, (type(query_compiler), str)):
+            by = try_cast_to_pandas(by, squeeze=True)
+            default_func = (
+                (lambda grp: grp.agg(map_func))
+                if isinstance(map_func, dict)
+                else map_func
+            )
+            return query_compiler.default_to_pandas(
+                lambda df: default_func(
+                    df.groupby(by=by, axis=axis, **groupby_args), **map_args
                 )
-            else:
-                qc = query_compiler
-            # since we're going to modify `groupby_args` dict in a `compute_map`,
-            # we want to copy it to not propagate these changes into source dict, in case
-            # of unsuccessful end of function
-            groupby_args = groupby_args.copy()
-
-            as_index = groupby_args.get("as_index", True)
-            observed = groupby_args.get("observed", False)
-
-            if isinstance(by, str):
-
-                def _map(df):
-                    # Set `as_index` to True to track the metadata of the grouping
-                    # object It is used to make sure that between phases we are
-                    # constructing the right index and placing columns in the correct
-                    # order.
- groupby_args["as_index"] = True - groupby_args["observed"] = True - - result = map_func( - df.groupby(by=by, axis=axis, **groupby_args), **map_args - ) - # The _modin_groupby_ prefix indicates that this is the first - # partition, and since we may need to insert the grouping data in - # the reduce phase - if ( - not isinstance(result.index, pandas.MultiIndex) - and result.index.name is not None - and result.index.name in result.columns - ): - result.index.name = "{}{}".format( - "_modin_groupby_", result.index.name - ) - return result - - else: - - def _map(df, other): - def compute_map(df, other): - # Set `as_index` to True to track the metadata of the grouping object - # It is used to make sure that between phases we are constructing the - # right index and placing columns in the correct order. - groupby_args["as_index"] = True - groupby_args["observed"] = True - - other = other.squeeze(axis=axis ^ 1) - if isinstance(other, pandas.DataFrame): - df = pandas.concat( - [df] + [other[[o for o in other if o not in df]]], - axis=1, - ) - other = list(other.columns) - result = map_func( - df.groupby(by=other, axis=axis, **groupby_args), **map_args - ) - # if `other` has category dtype, then pandas will drop that - # column after groupby, inserting it back to correctly process - # reduce phase - if ( - drop - and not as_index - and isinstance(other, pandas.Series) - and isinstance(other.dtype, pandas.CategoricalDtype) - and result.index.name is not None - and result.index.name not in result.columns - ): - result.insert( - loc=0, column=result.index.name, value=result.index - ) - # The _modin_groupby_ prefix indicates that this is the first partition, - # and since we may need to insert the grouping data in the reduce phase - if ( - not isinstance(result.index, pandas.MultiIndex) - and result.index.name is not None - and result.index.name in result.columns - ): - result.index.name = "{}{}".format( - "_modin_groupby_", result.index.name - ) - return result - - try: - return compute_map(df, other) - # This will happen with Arrow buffer read-only errors. We don't want to copy - # all the time, so this will try to fast-path the code first. - except ValueError: - return compute_map(df.copy(), other.copy()) - - def _reduce(df): - def compute_reduce(df): - other_len = len(df.index.names) - df = df.reset_index(drop=False) - # See note above about setting `as_index` - groupby_args["as_index"] = as_index - groupby_args["observed"] = observed - if other_len > 1: - by_part = list(df.columns[0:other_len]) - else: - by_part = df.columns[0] - result = reduce_func( - df.groupby(by=by_part, axis=axis, **groupby_args), **reduce_args - ) - if ( - not isinstance(result.index, pandas.MultiIndex) - and result.index.name is not None - and "_modin_groupby_" in result.index.name - ): - result.index.name = result.index.name[len("_modin_groupby_") :] - if isinstance(by_part, str) and by_part in result.columns: - if "_modin_groupby_" in by_part and drop: - col_name = by_part[len("_modin_groupby_") :] - new_result = result.drop(columns=col_name, errors="ignore") - new_result.columns = [ - col_name if "_modin_groupby_" in c else c - for c in new_result.columns - ] - return new_result - else: - return ( - result.drop(columns=by_part) - if call_kwds.get("method", None) != "size" - else result - ) - return result - - try: - return compute_reduce(df) - # This will happen with Arrow buffer read-only errors. We don't want to copy - # all the time, so this will try to fast-path the code first. 
-            except ValueError:
-                return compute_reduce(df.copy())
-
-            # TODO: try to precompute `new_index` and `new_columns`
-            if isinstance(by, str):
-                new_modin_frame = qc._modin_frame._map_reduce(
-                    axis, _map, reduce_func=_reduce, preserve_index=False
+            )
+        assert axis == 0, "Can only groupby reduce with axis=0"
+
+        if numeric_only:
+            qc = query_compiler.getitem_column_array(
+                query_compiler._modin_frame._numeric_columns(True)
+            )
+        else:
+            qc = query_compiler
+
+        map_fn, reduce_fn = cls.build_map_reduce_functions(
+            by=by,
+            axis=axis,
+            groupby_args=groupby_args,
+            map_func=map_func,
+            map_args=map_args,
+            **kwargs,
+        )
+
+        broadcastable_by = getattr(by, "_modin_frame", None)
+        apply_indices = list(map_func.keys()) if isinstance(map_func, dict) else None
+        new_modin_frame = qc._modin_frame.groupby_reduce(
+            axis, broadcastable_by, map_fn, reduce_fn, apply_indices=apply_indices
+        )
+
+        result = query_compiler.__constructor__(new_modin_frame)
+        if result.index.name == "__reduced__":
+            result.index.name = None
+        return result
+
+    @staticmethod
+    def try_filter_dict(agg_func, df):
+        if not isinstance(agg_func, dict):
+            return agg_func
+        partition_dict = {k: v for k, v in agg_func.items() if k in df.columns}
+        return lambda grp: grp.agg(partition_dict)
+
+    @classmethod
+    def build_map_reduce_functions(
+        cls,
+        by,
+        axis,
+        groupby_args,
+        map_func,
+        map_args,
+        reduce_func,
+        reduce_args,
+        drop,
+        **kwargs,
+    ):
+        # if `by` is a query compiler, then it will be broadcast explicitly via the
+        # `groupby_reduce` method of the Modin frame, so we don't want a secondary
+        # implicit broadcast by passing it as a function argument.
+        if hasattr(by, "_modin_frame"):
+            by = None
+
+        def _map(df, other=None, **kwargs):
+            def wrapper(df, other=None):
+                return cls.map(
+                    df,
+                    other,
+                    axis=axis,
+                    by=by,
+                    groupby_args=groupby_args.copy(),
+                    map_func=map_func,
+                    map_args=map_args,
+                    drop=drop,
+                    **kwargs,
                 )
-            else:
-                new_modin_frame = qc._modin_frame.groupby_reduce(
-                    axis, by._modin_frame, _map, _reduce
+
+            try:
+                result = wrapper(df, other)
+            # This will happen with Arrow buffer read-only errors. We don't want to copy
+            # all the time, so this will try to fast-path the code first.
+            except ValueError:
+                result = wrapper(df.copy(), other if other is None else other.copy())
+            return result
+
+        def _reduce(df, **call_kwargs):
+            def wrapper(df):
+                return cls.reduce(
+                    df,
+                    axis=axis,
+                    groupby_args=groupby_args,
+                    reduce_func=reduce_func,
+                    reduce_args=reduce_args,
+                    drop=drop,
+                    **kwargs,
+                    **call_kwargs,
                 )
-            result = query_compiler.__constructor__(new_modin_frame)
-            if result.index.name == "__reduced__":
-                result.index.name = None
+
+            try:
+                result = wrapper(df)
+            # This will happen with Arrow buffer read-only errors. We don't want to copy
+            # all the time, so this will try to fast-path the code first.
+ except ValueError: + result = wrapper(df.copy()) return result - return caller + return _map, _reduce + + +groupby_reduce_functions = { + "all": ( + lambda df, *args, **kwargs: df.all(*args, **kwargs), + lambda df, *args, **kwargs: df.all(*args, **kwargs), + ), + "any": ( + lambda df, *args, **kwargs: df.any(*args, **kwargs), + lambda df, *args, **kwargs: df.any(*args, **kwargs), + ), + "count": ( + lambda df, *args, **kwargs: df.count(*args, **kwargs), + lambda df, *args, **kwargs: df.sum(*args, **kwargs), + ), + "max": ( + lambda df, *args, **kwargs: df.max(*args, **kwargs), + lambda df, *args, **kwargs: df.max(*args, **kwargs), + ), + "min": ( + lambda df, *args, **kwargs: df.min(*args, **kwargs), + lambda df, *args, **kwargs: df.min(*args, **kwargs), + ), + "prod": ( + lambda df, *args, **kwargs: df.prod(*args, **kwargs), + lambda df, *args, **kwargs: df.prod(*args, **kwargs), + ), + "size": ( + lambda df, *args, **kwargs: pandas.DataFrame(df.size(*args, **kwargs)), + lambda df, *args, **kwargs: df.sum(*args, **kwargs), + ), + "sum": ( + lambda df, *args, **kwargs: df.sum(*args, **kwargs), + lambda df, *args, **kwargs: df.sum(*args, **kwargs), + ), +} diff --git a/modin/engines/base/frame/data.py b/modin/engines/base/frame/data.py index 9c9e305db86..839c9254fb5 100644 --- a/modin/engines/base/frame/data.py +++ b/modin/engines/base/frame/data.py @@ -1996,26 +1996,48 @@ def _concat(self, axis, others, how, sort): ) def groupby_reduce( - self, axis, by, map_func, reduce_func, new_index=None, new_columns=None + self, + axis, + by, + map_func, + reduce_func, + new_index=None, + new_columns=None, + apply_indices=None, ): """Groupby another dataframe and aggregate the result. - Args: - axis: The axis to groupby and aggregate over. - by: The dataframe to group by. - map_func: The map component of the aggregation. - reduce_func: The reduce component of the aggregation. - new_index: (optional) The index of the result. We may know this in advance, + Parameters + ---------- + axis: int, + The axis to groupby and aggregate over. + by: ModinFrame (optional), + The dataframe to group by. + map_func: callable, + The map component of the aggregation. + reduce_func: callable, + The reduce component of the aggregation. + new_index: Index (optional), + The index of the result. We may know this in advance, + and if not provided it must be computed. + new_columns: Index (optional), + The columns of the result. We may know this in advance, and if not provided it must be computed. - new_columns: (optional) The columns of the result. We may know this in - advance, and if not provided it must be computed. Returns ------- A new dataframe. 
""" + by_parts = by if by is None else by._partitions + + if apply_indices is not None: + numeric_indices = self.axes[axis ^ 1].get_indexer_for(apply_indices) + apply_indices = list( + self._get_dict_of_block_index(axis ^ 1, numeric_indices).keys() + ) + new_partitions = self._frame_mgr_cls.groupby_reduce( - axis, self._partitions, by._partitions, map_func, reduce_func + axis, self._partitions, by_parts, map_func, reduce_func, apply_indices ) new_axes = [ self._compute_axis_labels(i, new_partitions) diff --git a/modin/engines/base/frame/partition_manager.py b/modin/engines/base/frame/partition_manager.py index 3d58942af08..61f0e8e1b0c 100644 --- a/modin/engines/base/frame/partition_manager.py +++ b/modin/engines/base/frame/partition_manager.py @@ -99,12 +99,25 @@ def axis_partition(cls, partitions, axis): ) @classmethod - def groupby_reduce(cls, axis, partitions, by, map_func, reduce_func): + def groupby_reduce( + cls, axis, partitions, by, map_func, reduce_func, apply_indices=None + ): """Groupby data using the map_func provided along the axis over the partitions then reduce using reduce_func.""" - mapped_partitions = cls.broadcast_apply( - axis, map_func, left=partitions, right=by, other_name="other" + if apply_indices is not None: + # breakpoint() + partitions = ( + partitions[apply_indices] if axis else partitions[:, apply_indices] + ) + + if by is not None: + mapped_partitions = cls.broadcast_apply( + axis, map_func, left=partitions, right=by, other_name="other" + ) + else: + mapped_partitions = cls.map_partitions(partitions, map_func) + return cls.map_axis_partitions( + axis, mapped_partitions, reduce_func, enumerate_partitions=True ) - return cls.map_axis_partitions(axis, mapped_partitions, reduce_func) @classmethod def broadcast_apply_select_indices( @@ -351,6 +364,7 @@ def map_axis_partitions( map_func, keep_partitioning=False, lengths=None, + enumerate_partitions=False, ): """ Apply `map_func` to every partition. @@ -385,6 +399,7 @@ def map_axis_partitions( keep_partitioning=keep_partitioning, right=None, lengths=lengths, + enumerate_partitions=enumerate_partitions, ) @classmethod diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 8ebf85a4ae1..a6a243a692a 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -355,7 +355,7 @@ def groupby( # strings is passed in, the data used for the groupby is dropped before the # groupby takes place. 
drop = False - + # breakpoint() if ( not isinstance(by, (pandas.Series, Series)) and is_list_like(by) diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 8ce64e5a88b..3d3602366cd 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -114,6 +114,7 @@ def test_mixed_dtypes_groupby(as_index): ) eval_shift(modin_groupby, pandas_groupby) eval_mean(modin_groupby, pandas_groupby) + # breakpoint() eval_any(modin_groupby, pandas_groupby) eval_min(modin_groupby, pandas_groupby) eval_general( @@ -1426,9 +1427,7 @@ def get_columns(df): @pytest.mark.parametrize( "func_to_apply", [ - pytest.param( - lambda df: df.sum(), marks=pytest.mark.skip("See modin issue #2512") - ), + lambda df: df.sum(), lambda df: df.size(), lambda df: df.quantile(), lambda df: df.dtypes, From dfdddcdc5805da567c271bf59cadeaf38c959d17 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Mon, 14 Dec 2020 16:46:30 +0300 Subject: [PATCH 02/11] FEAT-#2491: asv benchmarks added Signed-off-by: Dmitry Chigarev --- modin/backends/pandas/query_compiler.py | 8 +++----- modin/data_management/functions/groupby_function.py | 1 - modin/engines/base/frame/partition_manager.py | 1 - modin/pandas/dataframe.py | 2 +- modin/pandas/test/test_groupby.py | 1 - 5 files changed, 4 insertions(+), 9 deletions(-) diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py index e6d96c555e3..7c4733eb488 100644 --- a/modin/backends/pandas/query_compiler.py +++ b/modin/backends/pandas/query_compiler.py @@ -2465,16 +2465,14 @@ def _groupby_dict_reduce( not isinstance(fn, str) and isinstance(fn, Container) for fn in agg_func.values() ) - # breakpoint() for col, col_funcs in agg_func.items(): - # single function without renaming if not rename_columns: map_dict[col], reduce_dict[col] = groupby_reduce_functions[col_funcs] continue if isinstance(col_funcs, str): col_funcs = [col_funcs] - # breakpoint() + map_fns = [] for i, fn in enumerate(col_funcs): if not isinstance(fn, str) and isinstance(fn, Container): @@ -2486,11 +2484,11 @@ def _groupby_dict_reduce( future_col_name, func = fn, fn else: raise TypeError - # breakpoint() + map_fns.append((future_col_name, groupby_reduce_functions[func][0])) reduce_dict[(col, future_col_name)] = groupby_reduce_functions[func][1] map_dict[col] = map_fns - # breakpoint() + return GroupbyReduceFunction.register(map_dict, reduce_dict)( query_compiler=self, by=by, diff --git a/modin/data_management/functions/groupby_function.py b/modin/data_management/functions/groupby_function.py index 3f1f4ac3abf..f4d06d0fcf6 100644 --- a/modin/data_management/functions/groupby_function.py +++ b/modin/data_management/functions/groupby_function.py @@ -45,7 +45,6 @@ def map( # Set `as_index` to True to track the metadata of the grouping object # It is used to make sure that between phases we are constructing the # right index and placing columns in the correct order. 
- as_index = groupby_args.get("as_index", True) groupby_args["as_index"] = True groupby_args["observed"] = True if other is not None: diff --git a/modin/engines/base/frame/partition_manager.py b/modin/engines/base/frame/partition_manager.py index 61f0e8e1b0c..53256f63a4d 100644 --- a/modin/engines/base/frame/partition_manager.py +++ b/modin/engines/base/frame/partition_manager.py @@ -104,7 +104,6 @@ def groupby_reduce( ): """Groupby data using the map_func provided along the axis over the partitions then reduce using reduce_func.""" if apply_indices is not None: - # breakpoint() partitions = ( partitions[apply_indices] if axis else partitions[:, apply_indices] ) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index a6a243a692a..8ebf85a4ae1 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -355,7 +355,7 @@ def groupby( # strings is passed in, the data used for the groupby is dropped before the # groupby takes place. drop = False - # breakpoint() + if ( not isinstance(by, (pandas.Series, Series)) and is_list_like(by) diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 3d3602366cd..39c5511a3ca 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -114,7 +114,6 @@ def test_mixed_dtypes_groupby(as_index): ) eval_shift(modin_groupby, pandas_groupby) eval_mean(modin_groupby, pandas_groupby) - # breakpoint() eval_any(modin_groupby, pandas_groupby) eval_min(modin_groupby, pandas_groupby) eval_general( From 907662f9368b8026266ad75bc04760b3bc5a081a Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Mon, 14 Dec 2020 17:05:54 +0300 Subject: [PATCH 03/11] FEAT-#2491: doc-strings added Signed-off-by: Dmitry Chigarev --- modin/engines/base/frame/data.py | 2 ++ modin/engines/base/frame/partition_manager.py | 23 ++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/modin/engines/base/frame/data.py b/modin/engines/base/frame/data.py index 839c9254fb5..52e27c31eab 100644 --- a/modin/engines/base/frame/data.py +++ b/modin/engines/base/frame/data.py @@ -2023,6 +2023,8 @@ def groupby_reduce( new_columns: Index (optional), The columns of the result. We may know this in advance, and if not provided it must be computed. + apply_indices : list-like (optional), + Indices of `axis ^ 1` to apply groupby over. Returns ------- diff --git a/modin/engines/base/frame/partition_manager.py b/modin/engines/base/frame/partition_manager.py index 53256f63a4d..5774622261b 100644 --- a/modin/engines/base/frame/partition_manager.py +++ b/modin/engines/base/frame/partition_manager.py @@ -102,7 +102,28 @@ def axis_partition(cls, partitions, axis): def groupby_reduce( cls, axis, partitions, by, map_func, reduce_func, apply_indices=None ): - """Groupby data using the map_func provided along the axis over the partitions then reduce using reduce_func.""" + """ + Groupby data using the map_func provided along the axis over the partitions then reduce using reduce_func. + + Parameters + ---------- + axis: int, + Axis to groupby over. + partitions: numpy 2D array, + Partitions of the ModinFrame to groupby. + by: numpy 2D array (optional), + Partitions of 'by' to broadcast. + map_func: callable, + Map function. + reduce_func: callable, + Reduce function. + apply_indices : list of ints (optional), + Indices of `axis ^ 1` to apply function over. + + Returns + ------- + Partitions with applied groupby. 
+ """ if apply_indices is not None: partitions = ( partitions[apply_indices] if axis else partitions[:, apply_indices] From c0d4df79260e8a6c82d7091eb04db08d73785b7d Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Mon, 14 Dec 2020 22:42:50 +0300 Subject: [PATCH 04/11] FEAT-#2491: fixes Signed-off-by: Dmitry Chigarev --- modin/backends/pandas/query_compiler.py | 11 ++++------- modin/pandas/groupby.py | 22 ++++++++-------------- modin/pandas/test/test_groupby.py | 3 ++- 3 files changed, 14 insertions(+), 22 deletions(-) diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py index 7c4733eb488..ade61c67e3e 100644 --- a/modin/backends/pandas/query_compiler.py +++ b/modin/backends/pandas/query_compiler.py @@ -23,7 +23,7 @@ ) from pandas.core.base import DataError from typing import Type, Callable -from collections.abc import Container +from collections.abc import Iterable import warnings @@ -2462,7 +2462,7 @@ def _groupby_dict_reduce( map_dict = {} reduce_dict = {} rename_columns = any( - not isinstance(fn, str) and isinstance(fn, Container) + not isinstance(fn, str) and isinstance(fn, Iterable) for fn in agg_func.values() ) for col, col_funcs in agg_func.items(): @@ -2475,11 +2475,8 @@ def _groupby_dict_reduce( map_fns = [] for i, fn in enumerate(col_funcs): - if not isinstance(fn, str) and isinstance(fn, Container): - assert ( - len(fn) == 2 - ), f"Incorrect number of values to unpack. (got {len(fn)} expected 2)" - future_col_name, func = fn[0], fn[1] + if not isinstance(fn, str) and isinstance(fn, Iterable): + future_col_name, func = fn elif isinstance(fn, str): future_col_name, func = fn, fn else: diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 0fdeacc2161..ade62ba7482 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -29,6 +29,7 @@ from pandas.core.aggregation import reconstruct_func import pandas.core.common as com from types import BuiltinFunctionType +from collections.abc import Iterable from modin.error_message import ErrorMessage from modin.utils import _inherit_docstrings, try_cast_to_pandas, wrap_udf_function @@ -376,22 +377,15 @@ def aggregate(self, func=None, *args, **kwargs): relabeling_required = False if isinstance(func, dict) or func is None: - def _reconstruct_func(func, **kwargs): - relabeling_required, func, new_columns, order = reconstruct_func( - func, **kwargs - ) - # We convert to the string version of the function for simplicity. 
- func = { - k: v - if not callable(v) or v.__name__ not in dir(self) - else v.__name__ - for k, v in func.items() - } - return relabeling_required, func, new_columns, order - - relabeling_required, func_dict, new_columns, order = _reconstruct_func( + def try_get_str_func(o): + if not isinstance(o, str) and isinstance(o, Iterable): + return [try_get_str_func(v) for v in o] + return o.__name__ if callable(o) and o.__name__ in dir(self) else o + + relabeling_required, func_dict, new_columns, order = reconstruct_func( func, **kwargs ) + func_dict = {k: try_get_str_func(v) for k, v in func_dict.items()} if any(i not in self._df.columns for i in func_dict.keys()): from pandas.core.base import SpecificationError diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 39c5511a3ca..ab0082841fb 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -641,6 +641,7 @@ def test_large_row_groupby(is_by_category): min, sum, {"A": "sum"}, + {"A": lambda df: df.sum()}, {"A": "max", "B": "sum", "C": "min"}, ] for func in agg_functions: @@ -1437,7 +1438,7 @@ def get_columns(df): ), lambda grp: grp.agg( { - df_from_grp(grp).columns[0]: (max, min, sum), + df_from_grp(grp).columns[0]: (("new_max", max), min, sum), df_from_grp(grp).columns[-1]: (sum, min, max), } ), From bcbd894866fd6cd4abd055bed142e3d57b9aba1e Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Tue, 15 Dec 2020 15:16:38 +0300 Subject: [PATCH 05/11] FEAT-#2491: new test cases added Signed-off-by: Dmitry Chigarev --- modin/backends/pandas/query_compiler.py | 20 ++++--- .../functions/groupby_function.py | 9 ---- modin/pandas/groupby.py | 2 + modin/pandas/test/test_groupby.py | 52 ++++++++++++++----- 4 files changed, 52 insertions(+), 31 deletions(-) diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py index ade61c67e3e..bbb4558dc25 100644 --- a/modin/backends/pandas/query_compiler.py +++ b/modin/backends/pandas/query_compiler.py @@ -23,7 +23,7 @@ ) from pandas.core.base import DataError from typing import Type, Callable -from collections.abc import Iterable +from collections.abc import Iterable, Container import warnings @@ -2485,7 +2485,6 @@ def _groupby_dict_reduce( map_fns.append((future_col_name, groupby_reduce_functions[func][0])) reduce_dict[(col, future_col_name)] = groupby_reduce_functions[func][1] map_dict[col] = map_fns - return GroupbyReduceFunction.register(map_dict, reduce_dict)( query_compiler=self, by=by, @@ -2508,12 +2507,17 @@ def groupby_agg( groupby_kwargs, drop=False, ): - def is_reduce_fn(o): - if callable(o): - return False - if isinstance(o, str): - o = [o] - return all(x in groupby_reduce_functions for x in o) + def is_reduce_fn(o, deep_level=0): + if not isinstance(o, str) and isinstance(o, Container): + assert deep_level == 0 or ( + deep_level > 0 and len(o) == 2 + ), f"Got the renamer with incorrect length, expected 2 got {len(o)}." 
+ return ( + all(is_reduce_fn(v, deep_level + 1) for v in o) + if deep_level == 0 + else is_reduce_fn(o[1], deep_level + 1) + ) + return isinstance(o, str) and o in groupby_reduce_functions if isinstance(agg_func, dict) and all( is_reduce_fn(x) for x in agg_func.values() diff --git a/modin/data_management/functions/groupby_function.py b/modin/data_management/functions/groupby_function.py index f4d06d0fcf6..fef5a8a8ecf 100644 --- a/modin/data_management/functions/groupby_function.py +++ b/modin/data_management/functions/groupby_function.py @@ -99,15 +99,6 @@ def reduce( insert_levels = partition_idx == 0 and (drop or method == "size") result.reset_index(drop=not insert_levels, inplace=True) - cols_to_drop = list( - result.columns[result.columns.str.match(r"__reduced__.*", na=False)] - if hasattr(result.columns, "str") - else [] - ) - - if method != "size" and len(cols_to_drop) > 0 and len(df.columns) > 1: - result.drop(columns=cols_to_drop, inplace=True, errors="ignore") - return result @classmethod diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index ade62ba7482..2e83b04b337 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -391,6 +391,8 @@ def try_get_str_func(o): from pandas.core.base import SpecificationError raise SpecificationError("nested renamer is not supported") + if func is None: + kwargs = {} func = func_dict elif is_list_like(func): return self._default_to_pandas( diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index ab0082841fb..99e745c94f6 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -61,13 +61,6 @@ def wrapper(obj1, obj2, *args, **kwargs): return wrapper -def df_from_grp(self, grp): - if grp.__module__.split(".")[0] == "pandas": - return grp._obj_with_exclusions - else: - return grp._df - - @pytest.mark.parametrize("as_index", [True, False]) def test_mixed_dtypes_groupby(as_index): frame_data = np.random.randint(97, 198, size=(2 ** 6, 2 ** 4)) @@ -1247,15 +1240,32 @@ def test_shift_freq(groupby_axis, shift_axis): "min": (list(test_data["int_data"].keys())[-1], np.min), }, }, + { + "by": [ + list(test_data["int_data"].keys())[0], + list(test_data["int_data"].keys())[-1], + ], + "agg_dict": { + "max": (list(test_data["int_data"].keys())[1], max), + "min": (list(test_data["int_data"].keys())[-1], min), + }, + }, + ], +) +@pytest.mark.parametrize( + "as_index", + [ + True, + pytest.param(False, marks=pytest.mark.xfail(reason="See modin issue #2543")), ], ) -def test_agg_func_None_rename(by_and_agg_dict): +def test_agg_func_None_rename(by_and_agg_dict, as_index): modin_df, pandas_df = create_test_dfs(test_data["int_data"]) - modin_result = modin_df.groupby(by_and_agg_dict["by"]).agg( + modin_result = modin_df.groupby(by_and_agg_dict["by"], as_index=as_index).agg( **by_and_agg_dict["agg_dict"] ) - pandas_result = pandas_df.groupby(by_and_agg_dict["by"]).agg( + pandas_result = pandas_df.groupby(by_and_agg_dict["by"], as_index=as_index).agg( **by_and_agg_dict["agg_dict"] ) df_equals(modin_result, pandas_result) @@ -1438,13 +1448,27 @@ def get_columns(df): ), lambda grp: grp.agg( { - df_from_grp(grp).columns[0]: (("new_max", max), min, sum), - df_from_grp(grp).columns[-1]: (sum, min, max), + list(test_data_values[0].keys())[1]: (max, min, sum), + list(test_data_values[0].keys())[-2]: (sum, min, max), } ), lambda grp: grp.agg( - max=(df_from_grp(grp).columns[0], max), - sum=(df_from_grp(grp).columns[-1], sum), + { + list(test_data_values[0].keys())[1]: [ + ("new_sum", 
"sum"), + ("new_prod", "prod"), + ], + list(test_data_values[0].keys())[-2]: np.sum, + } + ), + pytest.param( + lambda grp: grp.agg( + { + list(test_data_values[0].keys())[1]: (max, min, sum), + list(test_data_values[0].keys())[-1]: (sum, min, max), + } + ), + marks=pytest.mark.skip("See modin issue #2542"), ), ], ) From e915820efe096d6bd61a1ae99bc341fde0dce1cb Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 16 Dec 2020 13:05:41 +0300 Subject: [PATCH 06/11] FEAT-#2491: discard changes in asv bench to avoid conflicts with #2539 Signed-off-by: Dmitry Chigarev --- modin/pandas/test/test_groupby.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 99e745c94f6..54c78563357 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -1456,7 +1456,7 @@ def get_columns(df): { list(test_data_values[0].keys())[1]: [ ("new_sum", "sum"), - ("new_prod", "prod"), + ("new_min", "min"), ], list(test_data_values[0].keys())[-2]: np.sum, } From 7cd69675124d4da75ef364fafbaf43d161ab4be9 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 16 Dec 2020 15:52:24 +0300 Subject: [PATCH 07/11] FEAT-#2491: added asv benchmarks Signed-off-by: Dmitry Chigarev --- asv_bench/benchmarks/benchmarks.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/asv_bench/benchmarks/benchmarks.py b/asv_bench/benchmarks/benchmarks.py index 6947fdd68d0..bfad446ea7f 100644 --- a/asv_bench/benchmarks/benchmarks.py +++ b/asv_bench/benchmarks/benchmarks.py @@ -98,6 +98,18 @@ def time_groupby_sum(self, data_size): def time_groupby_mean(self, data_size): execute(self.df.groupby(by=self.groupby_column).mean()) + def time_groupby_dictionary_reduction(self, impl, data_type, data_size): + cols_to_agg = self.df.columns[1:4] + self.df.groupby(by=self.df.columns[0]).agg( + {k: v for k, v in zip(cols_to_agg, ["sum", "count", "any"])} + ) + + def time_groupby_dictionary_aggregation(self, impl, data_type, data_size): + cols_to_agg = self.df.columns[1:4] + self.df.groupby(by=self.df.columns[0]).agg( + {k: v for k, v in zip(cols_to_agg, ["quantile", "std", "median"])} + ) + class TimeJoin: param_names = ["data_size", "how", "sort"] From 7145ca4942b370c148df4152efa37ea1ad385c8d Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 16 Dec 2020 19:16:42 +0300 Subject: [PATCH 08/11] FEAT-#2491: comments added Signed-off-by: Dmitry Chigarev --- modin/backends/pandas/query_compiler.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py index bbb4558dc25..881f5e79304 100644 --- a/modin/backends/pandas/query_compiler.py +++ b/modin/backends/pandas/query_compiler.py @@ -2509,6 +2509,12 @@ def groupby_agg( ): def is_reduce_fn(o, deep_level=0): if not isinstance(o, str) and isinstance(o, Container): + # `deep_level` parameter specifies the number of nested containers that was met: + # - if it's 0, then we're outside of container, `o` could be either function name + # or container of function names/renamers. + # - if it's 1, then we're inside container of function names/renamers. `o` must be + # either function name or renamer (renamer is some container which length == 2, + # the first element is the new column name and the second is the function name). assert deep_level == 0 or ( deep_level > 0 and len(o) == 2 ), f"Got the renamer with incorrect length, expected 2 got {len(o)}." 
From 349fe8567c5a05dcb1755927a6689b70ceb139cf Mon Sep 17 00:00:00 2001
From: Dmitry Chigarev
Date: Wed, 16 Dec 2020 22:32:13 +0300
Subject: [PATCH 09/11] FEAT-#2491: aligned asv benchmarks with current master

Signed-off-by: Dmitry Chigarev
---
 asv_bench/benchmarks/benchmarks.py | 63 ++++++++++++++++++------------
 1 file changed, 39 insertions(+), 24 deletions(-)

diff --git a/asv_bench/benchmarks/benchmarks.py b/asv_bench/benchmarks/benchmarks.py
index bfad446ea7f..681d49a975c 100644
--- a/asv_bench/benchmarks/benchmarks.py
+++ b/asv_bench/benchmarks/benchmarks.py
@@ -57,16 +57,18 @@ def execute(df):
     return df.shape
 
-class TimeMultiColumnGroupby:
-    param_names = ["data_size", "count_columns"]
-    params = [UNARY_OP_DATA_SIZE, [6]]
-
-    def setup(self, data_size, count_columns):
+class BaseTimeGroupBy:
+    def setup(self, data_size, count_columns=1):
         self.df = generate_dataframe(
             ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
         self.groupby_columns = [col for col in self.df.columns[:count_columns]]
 
+
+class TimeMultiColumnGroupby(BaseTimeGroupBy):
+    param_names = ["data_size", "count_columns"]
+    params = [UNARY_OP_DATA_SIZE, [6]]
+
     def time_groupby_agg_quan(self, data_size, count_columns):
         execute(self.df.groupby(by=self.groupby_columns).agg("quantile"))
 
@@ -74,40 +76,53 @@ def time_groupby_agg_mean(self, data_size, count_columns):
         execute(self.df.groupby(by=self.groupby_columns).apply(lambda df: df.mean()))
 
-class TimeGroupByDefaultAggregations:
+class TimeGroupByDefaultAggregations(BaseTimeGroupBy):
     param_names = ["data_size"]
     params = [
         UNARY_OP_DATA_SIZE,
     ]
 
-    def setup(self, data_size):
-        self.df = generate_dataframe(
-            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
-        )
-        self.groupby_column = self.df.columns[0]
-
     def time_groupby_count(self, data_size):
-        execute(self.df.groupby(by=self.groupby_column).count())
+        execute(self.df.groupby(by=self.groupby_columns).count())
 
     def time_groupby_size(self, data_size):
-        execute(self.df.groupby(by=self.groupby_column).size())
+        execute(self.df.groupby(by=self.groupby_columns).size())
 
     def time_groupby_sum(self, data_size):
-        execute(self.df.groupby(by=self.groupby_column).sum())
+        execute(self.df.groupby(by=self.groupby_columns).sum())
 
     def time_groupby_mean(self, data_size):
-        execute(self.df.groupby(by=self.groupby_column).mean())
+        execute(self.df.groupby(by=self.groupby_columns).mean())
+
 
-    def time_groupby_dictionary_reduction(self, impl, data_type, data_size):
-        cols_to_agg = self.df.columns[1:4]
-        self.df.groupby(by=self.df.columns[0]).agg(
-            {k: v for k, v in zip(cols_to_agg, ["sum", "count", "any"])}
-        )
+class TimeGroupByDictionaryAggregation(BaseTimeGroupBy):
+    param_names = ["data_size"]
+    params = [
+        UNARY_OP_DATA_SIZE,
+    ]
+    reduction_operations = ["sum", "count", "prod"]
+    agg_operations = ["quantile", "std", "median"]
+
+    def setup(self, data_size):
+        super().setup(data_size)
+        self.cols_to_agg = self.df.columns[1:4]
+
+    @trigger_execution
+    def time_groupby_dictionary_reduction(self, data_size):
+        return self.df.groupby(by=self.groupby_columns).agg(
+            {
+                c: self.reduction_operations[i % len(self.reduction_operations)]
+                for i, c in enumerate(self.cols_to_agg)
+            }
+        )
 
-    def time_groupby_dictionary_aggregation(self, impl, data_type, data_size):
-        cols_to_agg = self.df.columns[1:4]
-        self.df.groupby(by=self.df.columns[0]).agg(
-            {k: v for k, v in zip(cols_to_agg, ["quantile", "std", "median"])}
-        )
+    @trigger_execution
+    def time_groupby_dictionary_aggregation(self, data_size):
+        return self.df.groupby(by=self.groupby_columns).agg(
+            {
+                c: self.agg_operations[i % len(self.agg_operations)]
+                for i, c in enumerate(self.cols_to_agg)
+            }
+        )

From 2262dc422d8f18a15d19caa44550cbf1fa81899f Mon Sep 17 00:00:00 2001
From: Dmitry Chigarev
Date: Thu, 17 Dec 2020 10:39:11 +0300
Subject: [PATCH 10/11] FEAT-#2491: addressing comments

Signed-off-by: Dmitry Chigarev
---
 asv_bench/benchmarks/benchmarks.py | 41 +++++++++++------------------
 1 file changed, 15 insertions(+), 26 deletions(-)

diff --git a/asv_bench/benchmarks/benchmarks.py b/asv_bench/benchmarks/benchmarks.py
index 681d49a975c..b561a56d8c4 100644
--- a/asv_bench/benchmarks/benchmarks.py
+++ b/asv_bench/benchmarks/benchmarks.py
@@ -62,7 +62,7 @@ def setup(self, data_size, count_columns=1):
         self.df = generate_dataframe(
             ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
-        self.groupby_columns = [col for col in self.df.columns[:count_columns]]
+        self.groupby_columns = self.df.columns[:count_columns].tolist()
 
 
 class TimeMultiColumnGroupby(BaseTimeGroupBy):
@@ -96,34 +96,23 @@ def time_groupby_mean(self, data_size):
 
 
 class TimeGroupByDictionaryAggregation(BaseTimeGroupBy):
-    param_names = ["data_size"]
-    params = [
-        UNARY_OP_DATA_SIZE,
-    ]
-    reduction_operations = ["sum", "count", "prod"]
-    agg_operations = ["quantile", "std", "median"]
-
-    def setup(self, data_size):
+    param_names = ["data_size", "operation_type"]
+    params = [UNARY_OP_DATA_SIZE, ["reduction", "aggregation"]]
+    operations = {
+        "reduction": ["sum", "count", "prod"],
+        "aggregation": ["quantile", "std", "median"],
+    }
+
+    def setup(self, data_size, operation_type):
         super().setup(data_size)
         self.cols_to_agg = self.df.columns[1:4]
+        operations = self.operations[operation_type]
+        self.agg_dict = {
+            c: operations[i % len(operations)] for i, c in enumerate(self.cols_to_agg)
+        }
 
-    @trigger_execution
-    def time_groupby_dictionary_reduction(self, data_size):
-        return self.df.groupby(by=self.groupby_columns).agg(
-            {
-                c: self.reduction_operations[i % len(self.reduction_operations)]
-                for i, c in enumerate(self.cols_to_agg)
-            }
-        )
-
-    @trigger_execution
-    def time_groupby_dictionary_aggregation(self, data_size):
-        return self.df.groupby(by=self.groupby_columns).agg(
-            {
-                c: self.agg_operations[i % len(self.agg_operations)]
-                for i, c in enumerate(self.cols_to_agg)
-            }
-        )
+    def time_groupby_dict_agg(self, data_size, operation_type):
+        execute(self.df.groupby(by=self.groupby_columns).agg(self.agg_dict))

From 58cfaa60568159971957a8458a9e6469acbc2f3c Mon Sep 17 00:00:00 2001
From: Dmitry Chigarev
Date: Thu, 17 Dec 2020 06:52:03 -0600
Subject: [PATCH 11/11] FEAT-#2491: renamed some entities

Signed-off-by: Dmitry Chigarev
---
 asv_bench/benchmarks/benchmarks.py            | 10 +++----
 modin/backends/pandas/query_compiler.py       | 26 +++++++++----------
 modin/engines/base/frame/partition_manager.py |  3 +++
 modin/pandas/groupby.py                       | 10 +++----
 4 files changed, 26 insertions(+), 23 deletions(-)

diff --git a/asv_bench/benchmarks/benchmarks.py b/asv_bench/benchmarks/benchmarks.py
index b561a56d8c4..cb1dda17831 100644
--- a/asv_bench/benchmarks/benchmarks.py
+++ b/asv_bench/benchmarks/benchmarks.py
@@ -58,21 +58,21 @@ def execute(df):
 
 class BaseTimeGroupBy:
-    def setup(self, data_size, count_columns=1):
+    def setup(self, data_size, ncols=1):
         self.df = generate_dataframe(
             ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
-        self.groupby_columns = self.df.columns[:count_columns].tolist()
+        self.groupby_columns = self.df.columns[:ncols].tolist()
class TimeMultiColumnGroupby(BaseTimeGroupBy): - param_names = ["data_size", "count_columns"] + param_names = ["data_size", "ncols"] params = [UNARY_OP_DATA_SIZE, [6]] - def time_groupby_agg_quan(self, data_size, count_columns): + def time_groupby_agg_quan(self, data_size, ncols): execute(self.df.groupby(by=self.groupby_columns).agg("quantile")) - def time_groupby_agg_mean(self, data_size, count_columns): + def time_groupby_agg_mean(self, data_size, ncols): execute(self.df.groupby(by=self.groupby_columns).apply(lambda df: df.mean())) diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py index 881f5e79304..6f62405a735 100644 --- a/modin/backends/pandas/query_compiler.py +++ b/modin/backends/pandas/query_compiler.py @@ -2476,14 +2476,14 @@ def _groupby_dict_reduce( map_fns = [] for i, fn in enumerate(col_funcs): if not isinstance(fn, str) and isinstance(fn, Iterable): - future_col_name, func = fn + new_col_name, func = fn elif isinstance(fn, str): - future_col_name, func = fn, fn + new_col_name, func = fn, fn else: raise TypeError - map_fns.append((future_col_name, groupby_reduce_functions[func][0])) - reduce_dict[(col, future_col_name)] = groupby_reduce_functions[func][1] + map_fns.append((new_col_name, groupby_reduce_functions[func][0])) + reduce_dict[(col, new_col_name)] = groupby_reduce_functions[func][1] map_dict[col] = map_fns return GroupbyReduceFunction.register(map_dict, reduce_dict)( query_compiler=self, @@ -2507,23 +2507,23 @@ def groupby_agg( groupby_kwargs, drop=False, ): - def is_reduce_fn(o, deep_level=0): - if not isinstance(o, str) and isinstance(o, Container): + def is_reduce_fn(fn, deep_level=0): + if not isinstance(fn, str) and isinstance(fn, Container): # `deep_level` parameter specifies the number of nested containers that was met: - # - if it's 0, then we're outside of container, `o` could be either function name + # - if it's 0, then we're outside of container, `fn` could be either function name # or container of function names/renamers. - # - if it's 1, then we're inside container of function names/renamers. `o` must be + # - if it's 1, then we're inside container of function names/renamers. `fn` must be # either function name or renamer (renamer is some container which length == 2, # the first element is the new column name and the second is the function name). assert deep_level == 0 or ( - deep_level > 0 and len(o) == 2 - ), f"Got the renamer with incorrect length, expected 2 got {len(o)}." + deep_level > 0 and len(fn) == 2 + ), f"Got the renamer with incorrect length, expected 2 got {len(fn)}." return ( - all(is_reduce_fn(v, deep_level + 1) for v in o) + all(is_reduce_fn(f, deep_level + 1) for f in fn) if deep_level == 0 - else is_reduce_fn(o[1], deep_level + 1) + else is_reduce_fn(fn[1], deep_level + 1) ) - return isinstance(o, str) and o in groupby_reduce_functions + return isinstance(fn, str) and fn in groupby_reduce_functions if isinstance(agg_func, dict) and all( is_reduce_fn(x) for x in agg_func.values() diff --git a/modin/engines/base/frame/partition_manager.py b/modin/engines/base/frame/partition_manager.py index 5774622261b..3b2adc4e533 100644 --- a/modin/engines/base/frame/partition_manager.py +++ b/modin/engines/base/frame/partition_manager.py @@ -401,6 +401,9 @@ def map_axis_partitions( The flag to keep partitions for Modin Frame. lengths : list(int) The list of lengths to shuffle the object. + enumerate_partitions : bool (optional, default False), + Whether or not to pass partition index into `map_func`. 
+            Note that `map_func` must be able to accept the `partition_idx` kwarg.
 
         Returns
         -------
diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py
index 2e83b04b337..0328701addb 100644
--- a/modin/pandas/groupby.py
+++ b/modin/pandas/groupby.py
@@ -377,15 +377,15 @@ def aggregate(self, func=None, *args, **kwargs):
         relabeling_required = False
         if isinstance(func, dict) or func is None:
 
-            def try_get_str_func(o):
-                if not isinstance(o, str) and isinstance(o, Iterable):
-                    return [try_get_str_func(v) for v in o]
-                return o.__name__ if callable(o) and o.__name__ in dir(self) else o
+            def try_get_str_func(fn):
+                if not isinstance(fn, str) and isinstance(fn, Iterable):
+                    return [try_get_str_func(f) for f in fn]
+                return fn.__name__ if callable(fn) and fn.__name__ in dir(self) else fn
 
             relabeling_required, func_dict, new_columns, order = reconstruct_func(
                 func, **kwargs
             )
-            func_dict = {k: try_get_str_func(v) for k, v in func_dict.items()}
+            func_dict = {col: try_get_str_func(fn) for col, fn in func_dict.items()}
             if any(i not in self._df.columns for i in func_dict.keys()):
                 from pandas.core.base import SpecificationError
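
For reference, a minimal sketch of the call pattern this series optimizes. The frame
and column names below are illustrative assumptions, not taken from the patch:

    import modin.pandas as pd

    df = pd.DataFrame({"a": [1, 1, 2], "b": [3, 4, 5], "c": [6, 7, 8]})

    # Every value here is a key of `groupby_reduce_functions` ("all", "any",
    # "count", "max", "min", "prod", "size", "sum") or a ("new name", "func")
    # renamer over one, so `is_reduce_fn` accepts the whole dict and the call
    # is dispatched through the optimized GroupbyReduceFunction MapReduce path.
    df.groupby("a").agg({"b": "sum", "c": ["count", ("c_min", "min")]})

    # A function outside that mapping (e.g. "median", or any callable) makes
    # `groupby_agg` fall back to the generic implementation instead.
    df.groupby("a").agg({"b": "median"})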