Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#2491: optimized groupby dictionary aggregation #2534

Merged
merged 11 commits into from
Dec 17, 2020
54 changes: 35 additions & 19 deletions asv_bench/benchmarks/benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,46 +57,62 @@ def execute(df):
return df.shape


class TimeMultiColumnGroupby:
param_names = ["data_size", "count_columns"]
params = [UNARY_OP_DATA_SIZE, [6]]

def setup(self, data_size, count_columns):
class BaseTimeGroupBy:
    """Shared ASV setup: build a random integer frame and pick the group-by key columns."""

    def setup(self, data_size, ncols=1):
        # data_size is (rows, cols); generate_dataframe expects cols first.
        self.df = generate_dataframe(
            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
        )
        # The first `ncols` columns serve as the group-by keys.
        self.groupby_columns = list(self.df.columns[:ncols])


def time_groupby_agg_quan(self, data_size, count_columns):
class TimeMultiColumnGroupby(BaseTimeGroupBy):
    """Benchmark groupby over several key columns (fix: strip review-UI noise
    lines that were embedded in the class body and made it invalid Python)."""

    param_names = ["data_size", "ncols"]
    params = [UNARY_OP_DATA_SIZE, [6]]

    def time_groupby_agg_quan(self, data_size, ncols):
        # Full (non-reducible) aggregation path.
        execute(self.df.groupby(by=self.groupby_columns).agg("quantile"))

    def time_groupby_agg_mean(self, data_size, ncols):
        # apply() with a UDF exercises the generic groupby-apply path.
        execute(self.df.groupby(by=self.groupby_columns).apply(lambda df: df.mean()))


class TimeGroupByDefaultAggregations:
class TimeGroupByDefaultAggregations(BaseTimeGroupBy):
    """Benchmark the common single-key groupby reductions.

    Fix: the span contained diff residue — the deleted old ``setup`` and the
    pre-change method bodies (using ``self.groupby_column``) were interleaved
    with the new ones, leaving duplicate statements per method. Only the
    post-change version (inheriting ``setup`` from ``BaseTimeGroupBy`` and
    grouping by ``self.groupby_columns``) is kept.
    """

    param_names = ["data_size"]
    params = [
        UNARY_OP_DATA_SIZE,
    ]

    def time_groupby_count(self, data_size):
        execute(self.df.groupby(by=self.groupby_columns).count())

    def time_groupby_size(self, data_size):
        execute(self.df.groupby(by=self.groupby_columns).size())

    def time_groupby_sum(self, data_size):
        execute(self.df.groupby(by=self.groupby_columns).sum())

    def time_groupby_mean(self, data_size):
        execute(self.df.groupby(by=self.groupby_columns).mean())


class TimeGroupByDictionaryAggregation(BaseTimeGroupBy):
    """Benchmark dictionary aggregation: map three value columns onto either
    purely reducible functions or full-aggregation functions."""

    param_names = ["data_size", "operation_type"]
    params = [UNARY_OP_DATA_SIZE, ["reduction", "aggregation"]]
    operations = {
        "reduction": ["sum", "count", "prod"],
        "aggregation": ["quantile", "std", "median"],
    }

    def setup(self, data_size, operation_type):
        super().setup(data_size)
        self.cols_to_agg = self.df.columns[1:4]
        funcs = self.operations[operation_type]
        # Assign functions to columns round-robin.
        self.agg_dict = {}
        for idx, col in enumerate(self.cols_to_agg):
            self.agg_dict[col] = funcs[idx % len(funcs)]

    def time_groupby_dict_agg(self, data_size, operation_type):
        execute(self.df.groupby(by=self.groupby_columns).agg(self.agg_dict))


class TimeJoin:
Expand Down
99 changes: 75 additions & 24 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)
from pandas.core.base import DataError
from typing import Type, Callable
from collections.abc import Iterable, Container
import warnings


Expand All @@ -37,6 +38,7 @@
ReductionFunction,
BinaryFunction,
GroupbyReduceFunction,
groupby_reduce_functions,
)


Expand Down Expand Up @@ -2443,33 +2445,57 @@ def _callable_func(self, func, axis, *args, **kwargs):
# nature. They require certain data to exist on the same partition, and
# after the shuffle, there should be only a local map required.

# Groupby reductions registered through the shared map/reduce function table.
# Fix: the span contained diff residue — the deleted long-form lambda-based
# registrations were interleaved with the new table-driven ones, producing
# duplicate assignments; only the new form is kept.
groupby_count = GroupbyReduceFunction.register(*groupby_reduce_functions["count"])
groupby_any = GroupbyReduceFunction.register(*groupby_reduce_functions["any"])
groupby_min = GroupbyReduceFunction.register(*groupby_reduce_functions["min"])
groupby_prod = GroupbyReduceFunction.register(*groupby_reduce_functions["prod"])
groupby_max = GroupbyReduceFunction.register(*groupby_reduce_functions["max"])
groupby_all = GroupbyReduceFunction.register(*groupby_reduce_functions["all"])
groupby_sum = GroupbyReduceFunction.register(*groupby_reduce_functions["sum"])
# `size` needs its own method name so the backend can special-case it.
groupby_size = GroupbyReduceFunction.register(
    *groupby_reduce_functions["size"], method="size"
)

def _groupby_dict_reduce(
    self, by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop=False
):
    """
    Run a dictionary aggregation via the map-reduce groupby implementation.

    Builds per-column map and reduce function dictionaries from `agg_func`
    (a mapping of column -> function name, or column -> iterable of
    (new_name, function) renamers / function names) and registers them as a
    single GroupbyReduceFunction call.

    Fixes vs. the pasted span: removed review-UI noise lines embedded in the
    body, gave the bare ``raise TypeError`` a message, and dropped an unused
    ``enumerate`` index.
    """
    map_dict = {}
    reduce_dict = {}
    # If any value is a non-string iterable, the user supplied renamers
    # ((new_name, func) pairs), so reduce output columns get MultiIndex keys.
    rename_columns = any(
        not isinstance(fn, str) and isinstance(fn, Iterable)
        for fn in agg_func.values()
    )
    for col, col_funcs in agg_func.items():
        if not rename_columns:
            # Plain case: one function name per column; the table stores
            # (map_fn, reduce_fn) pairs.
            map_dict[col], reduce_dict[col] = groupby_reduce_functions[col_funcs]
            continue

        # Normalize a single name to a one-element list of renamers.
        if isinstance(col_funcs, str):
            col_funcs = [col_funcs]

        map_fns = []
        for fn in col_funcs:
            if not isinstance(fn, str) and isinstance(fn, Iterable):
                new_col_name, func = fn
            elif isinstance(fn, str):
                # A bare name renames the result column to itself.
                new_col_name, func = fn, fn
            else:
                raise TypeError(
                    "aggregation functions must be strings or iterables of "
                    f"(new_name, function) pairs, got {type(fn)}"
                )

            map_fns.append((new_col_name, groupby_reduce_functions[func][0]))
            reduce_dict[(col, new_col_name)] = groupby_reduce_functions[func][1]
        map_dict[col] = map_fns
    return GroupbyReduceFunction.register(map_dict, reduce_dict)(
        query_compiler=self,
        by=by,
        axis=axis,
        groupby_args=groupby_kwargs,
        map_args=agg_kwargs,
        reduce_args=agg_kwargs,
        numeric_only=False,
        drop=drop,
    )

def groupby_agg(
self,
by,
Expand All @@ -2481,6 +2507,31 @@ def groupby_agg(
groupby_kwargs,
drop=False,
):
def is_reduce_fn(fn, deep_level=0):
    # Decide whether `fn` describes only map-reducible aggregations.
    # `deep_level` counts how many containers we have descended into:
    #   0 — top level: `fn` is a function name or a container of
    #       names/renamers;
    #   1+ — inside a container: `fn` must be a name or a renamer, where a
    #       renamer is a length-2 container (new column name, function name).
    if isinstance(fn, str):
        return fn in groupby_reduce_functions
    if not isinstance(fn, Container):
        return False
    assert deep_level == 0 or (
        deep_level > 0 and len(fn) == 2
    ), f"Got the renamer with incorrect length, expected 2 got {len(fn)}."
    if deep_level == 0:
        return all(is_reduce_fn(member, deep_level + 1) for member in fn)
    # Renamer: only the function part (second element) matters.
    return is_reduce_fn(fn[1], deep_level + 1)

if isinstance(agg_func, dict) and all(
is_reduce_fn(x) for x in agg_func.values()
):
return self._groupby_dict_reduce(
by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop
)

if callable(agg_func):
agg_func = wrap_udf_function(agg_func)

Expand Down
3 changes: 2 additions & 1 deletion modin/data_management/functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .reductionfunction import ReductionFunction
from .foldfunction import FoldFunction
from .binary_function import BinaryFunction
from .groupby_function import GroupbyReduceFunction
from .groupby_function import GroupbyReduceFunction, groupby_reduce_functions

__all__ = [
"Function",
Expand All @@ -27,4 +27,5 @@
"FoldFunction",
"BinaryFunction",
"GroupbyReduceFunction",
"groupby_reduce_functions",
]
Loading