Skip to content

Commit

Permalink
FIX-#2742: fix performance degradation for dictionary GroupBy aggrega…
Browse files Browse the repository at this point in the history
…tion (#2743)

* FIX-#2742: changed callable functions to its names in dict aggregation

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>

* FIX-#2742: comments added

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev authored Feb 18, 2021
1 parent 1f3b514 commit ad0bcab
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 46 deletions.
18 changes: 8 additions & 10 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2363,16 +2363,14 @@ def _callable_func(self, func, axis, *args, **kwargs):
# nature. They require certain data to exist on the same partition, and
# after the shuffle, there should be only a local map required.

groupby_count = GroupbyReduceFunction.register(*groupby_reduce_functions["count"])
groupby_any = GroupbyReduceFunction.register(*groupby_reduce_functions["any"])
groupby_min = GroupbyReduceFunction.register(*groupby_reduce_functions["min"])
groupby_prod = GroupbyReduceFunction.register(*groupby_reduce_functions["prod"])
groupby_max = GroupbyReduceFunction.register(*groupby_reduce_functions["max"])
groupby_all = GroupbyReduceFunction.register(*groupby_reduce_functions["all"])
groupby_sum = GroupbyReduceFunction.register(*groupby_reduce_functions["sum"])
groupby_size = GroupbyReduceFunction.register(
*groupby_reduce_functions["size"], method="size"
)
groupby_all = GroupbyReduceFunction.register("all")
groupby_any = GroupbyReduceFunction.register("any")
groupby_count = GroupbyReduceFunction.register("count")
groupby_max = GroupbyReduceFunction.register("max")
groupby_min = GroupbyReduceFunction.register("min")
groupby_prod = GroupbyReduceFunction.register("prod")
groupby_size = GroupbyReduceFunction.register("size", method="size")
groupby_sum = GroupbyReduceFunction.register("sum")

def _groupby_dict_reduce(
self, by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop=False
Expand Down
81 changes: 45 additions & 36 deletions modin/data_management/functions/groupby_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,36 @@

class GroupbyReduceFunction(MapReduceFunction):
@classmethod
def call(cls, map_func, reduce_func, *call_args, **call_kwds):
def call(cls, map_func, reduce_func=None, **call_kwds):
"""
Build GroupbyReduce function.
Parameters
----------
map_func: str, callable or dict,
If 'str' this parameter will be treated as a function name to register,
so 'map_func' and 'reduce_func' will be grabbed from 'groupby_reduce_functions'.
If dict or callable then this will be treated as a function to apply to each group
at the map phase.
reduce_func: callable or dict (optional),
A function to apply to each group at the reduce phase. If not specified,
it will be set to the same function as 'map_func'.
**call_kwds: kwargs,
Kwargs that will be passed to the returned function.
Returns
-------
Callable,
Function that executes GroupBy aggregation with MapReduce algorithm.
"""
if isinstance(map_func, str):

def build_fn(name):
return lambda df, *args, **kwargs: getattr(df, name)(*args, **kwargs)

map_func, reduce_func = map(build_fn, groupby_reduce_functions[map_func])
if reduce_func is None:
reduce_func = map_func
assert not (
isinstance(map_func, dict) ^ isinstance(reduce_func, dict)
) and not (
Expand Down Expand Up @@ -48,6 +77,8 @@ def map(
groupby_args["as_index"] = True
groupby_args["observed"] = True
if other is not None:
# 'other' is a broadcast partition that represents the 'by' columns;
# concatenate it with 'df' so we can group on its column names
other = other.squeeze(axis=axis ^ 1)
if isinstance(other, pandas.DataFrame):
df = pandas.concat(
Expand All @@ -63,7 +94,8 @@ def map(
result = apply_func(
df.groupby(by=by_part, axis=axis, **groupby_args), **map_args
)
return result
# The result may not always be a DataFrame, so wrap it into one
return pandas.DataFrame(result)

@classmethod
def reduce(
Expand Down Expand Up @@ -101,8 +133,8 @@ def reduce(
if not as_index:
insert_levels = partition_idx == 0 and (drop or method == "size")
result.reset_index(drop=not insert_levels, inplace=True)

return result
# The result may not always be a DataFrame, so wrap it into one
return pandas.DataFrame(result)

@classmethod
def caller(
Expand Down Expand Up @@ -231,37 +263,14 @@ def wrapper(df):
return _map, _reduce


# This dict is a map for function names and their equivalents in MapReduce
groupby_reduce_functions = {
"all": (
lambda df, *args, **kwargs: df.all(*args, **kwargs),
lambda df, *args, **kwargs: df.all(*args, **kwargs),
),
"any": (
lambda df, *args, **kwargs: df.any(*args, **kwargs),
lambda df, *args, **kwargs: df.any(*args, **kwargs),
),
"count": (
lambda df, *args, **kwargs: df.count(*args, **kwargs),
lambda df, *args, **kwargs: df.sum(*args, **kwargs),
),
"max": (
lambda df, *args, **kwargs: df.max(*args, **kwargs),
lambda df, *args, **kwargs: df.max(*args, **kwargs),
),
"min": (
lambda df, *args, **kwargs: df.min(*args, **kwargs),
lambda df, *args, **kwargs: df.min(*args, **kwargs),
),
"prod": (
lambda df, *args, **kwargs: df.prod(*args, **kwargs),
lambda df, *args, **kwargs: df.prod(*args, **kwargs),
),
"size": (
lambda df, *args, **kwargs: pandas.DataFrame(df.size(*args, **kwargs)),
lambda df, *args, **kwargs: df.sum(*args, **kwargs),
),
"sum": (
lambda df, *args, **kwargs: df.sum(*args, **kwargs),
lambda df, *args, **kwargs: df.sum(*args, **kwargs),
),
"all": ("all", "all"),
"any": ("any", "any"),
"count": ("count", "sum"),
"max": ("max", "max"),
"min": ("min", "min"),
"prod": ("prod", "prod"),
"size": ("size", "sum"),
"sum": ("sum", "sum"),
}

0 comments on commit ad0bcab

Please sign in to comment.