From ca27ccae793e1281b67694a0c95501dc90771ba9 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Mon, 12 Oct 2020 09:57:08 -0700 Subject: [PATCH] PERF: ExpandingGroupby (#37064) --- asv_bench/benchmarks/rolling.py | 9 ++ doc/source/whatsnew/v1.2.0.rst | 1 + pandas/_libs/window/aggregations.pyx | 2 +- pandas/core/window/common.py | 65 -------- pandas/core/window/expanding.py | 28 +++- pandas/core/window/indexers.py | 30 ++-- pandas/core/window/rolling.py | 214 ++++++++++++++------------- 7 files changed, 167 insertions(+), 182 deletions(-) diff --git a/asv_bench/benchmarks/rolling.py b/asv_bench/benchmarks/rolling.py index f0dd908f81043..226b225b47591 100644 --- a/asv_bench/benchmarks/rolling.py +++ b/asv_bench/benchmarks/rolling.py @@ -76,12 +76,21 @@ class ExpandingMethods: def setup(self, constructor, dtype, method): N = 10 ** 5 + N_groupby = 100 arr = (100 * np.random.random(N)).astype(dtype) self.expanding = getattr(pd, constructor)(arr).expanding() + self.expanding_groupby = ( + pd.DataFrame({"A": arr[:N_groupby], "B": range(N_groupby)}) + .groupby("B") + .expanding() + ) def time_expanding(self, constructor, dtype, method): getattr(self.expanding, method)() + def time_expanding_groupby(self, constructor, dtype, method): + getattr(self.expanding_groupby, method)() + class EWMMethods: diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 7afe48e565b58..da7dcc6ab29b9 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -314,6 +314,7 @@ Performance improvements avoiding creating these again, if created on either. This can speed up operations that depend on creating copies of existing indexes (:issue:`36840`) - Performance improvement in :meth:`RollingGroupby.count` (:issue:`35625`) - Small performance decrease to :meth:`Rolling.min` and :meth:`Rolling.max` for fixed windows (:issue:`36567`) +- Performance improvement in :class:`ExpandingGroupby` (:issue:`37064`) .. --------------------------------------------------------------------------- diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 937f7d8df7728..bba8b4b2432a9 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -302,7 +302,7 @@ cdef inline float64_t calc_var(int64_t minp, int ddof, float64_t nobs, result = ssqdm_x / (nobs - ddof) # Fix for numerical imprecision. # Can be result < 0 once Kahan Summation is implemented - if result < 1e-15: + if result < 1e-14: result = 0 else: result = NaN diff --git a/pandas/core/window/common.py b/pandas/core/window/common.py index aa71c44f75ead..938f1846230cb 100644 --- a/pandas/core/window/common.py +++ b/pandas/core/window/common.py @@ -1,13 +1,11 @@ """Common utility functions for rolling operations""" from collections import defaultdict -from typing import Callable, Optional import warnings import numpy as np from pandas.core.dtypes.generic import ABCDataFrame, ABCSeries -from pandas.core.groupby.base import GotItemMixin from pandas.core.indexes.api import MultiIndex from pandas.core.shared_docs import _shared_docs @@ -27,69 +25,6 @@ """ -def _dispatch(name: str, *args, **kwargs): - """ - Dispatch to apply. - """ - - def outer(self, *args, **kwargs): - def f(x): - x = self._shallow_copy(x, groupby=self._groupby) - return getattr(x, name)(*args, **kwargs) - - return self._groupby.apply(f) - - outer.__name__ = name - return outer - - -class WindowGroupByMixin(GotItemMixin): - """ - Provide the groupby facilities. - """ - - def __init__(self, obj, *args, **kwargs): - kwargs.pop("parent", None) - groupby = kwargs.pop("groupby", None) - if groupby is None: - groupby, obj = obj, obj._selected_obj - self._groupby = groupby - self._groupby.mutated = True - self._groupby.grouper.mutated = True - super().__init__(obj, *args, **kwargs) - - corr = _dispatch("corr", other=None, pairwise=None) - cov = _dispatch("cov", other=None, pairwise=None) - - def _apply( - self, - func: Callable, - require_min_periods: int = 0, - floor: int = 1, - is_weighted: bool = False, - name: Optional[str] = None, - use_numba_cache: bool = False, - **kwargs, - ): - """ - Dispatch to apply; we are stripping all of the _apply kwargs and - performing the original function call on the grouped object. - """ - kwargs.pop("floor", None) - kwargs.pop("original_func", None) - - # TODO: can we de-duplicate with _dispatch? - def f(x, name=name, *args): - x = self._shallow_copy(x) - - if isinstance(name, str): - return getattr(x, name)(*args, **kwargs) - - return x.apply(name, *args, **kwargs) - - return self._groupby.apply(f) - - def flex_binary_moment(arg1, arg2, f, pairwise=False): if not ( diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index c24c5d5702764..44e0e36b61d46 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -4,8 +4,9 @@ from pandas.compat.numpy import function as nv from pandas.util._decorators import Appender, Substitution, doc -from pandas.core.window.common import WindowGroupByMixin, _doc_template, _shared_docs -from pandas.core.window.rolling import RollingAndExpandingMixin +from pandas.core.window.common import _doc_template, _shared_docs +from pandas.core.window.indexers import ExpandingIndexer, GroupbyIndexer +from pandas.core.window.rolling import BaseWindowGroupby, RollingAndExpandingMixin class Expanding(RollingAndExpandingMixin): @@ -253,11 +254,26 @@ def corr(self, other=None, pairwise=None, **kwargs): return super().corr(other=other, pairwise=pairwise, **kwargs) -class ExpandingGroupby(WindowGroupByMixin, Expanding): +class ExpandingGroupby(BaseWindowGroupby, Expanding): """ Provide a expanding groupby implementation. """ - @property - def _constructor(self): - return Expanding + def _get_window_indexer(self, window: int) -> GroupbyIndexer: + """ + Return an indexer class that will compute the window start and end bounds + + Parameters + ---------- + window : int + window size for FixedWindowIndexer (unused) + + Returns + ------- + GroupbyIndexer + """ + window_indexer = GroupbyIndexer( + groupby_indicies=self._groupby.indices, + window_indexer=ExpandingIndexer, + ) + return window_indexer diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index f2bc01438097c..71e77f97d8797 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -259,26 +259,38 @@ def get_window_bounds( return start, end -class GroupbyRollingIndexer(BaseIndexer): +class GroupbyIndexer(BaseIndexer): """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()""" def __init__( self, - index_array: Optional[np.ndarray], - window_size: int, - groupby_indicies: Dict, - rolling_indexer: Type[BaseIndexer], - indexer_kwargs: Optional[Dict], + index_array: Optional[np.ndarray] = None, + window_size: int = 0, + groupby_indicies: Optional[Dict] = None, + window_indexer: Type[BaseIndexer] = BaseIndexer, + indexer_kwargs: Optional[Dict] = None, **kwargs, ): """ Parameters ---------- + index_array : np.ndarray or None + np.ndarray of the index of the original object that we are performing + a chained groupby operation over. This index has been pre-sorted relative to + the groups + window_size : int + window size during the windowing operation + groupby_indicies : dict or None + dict of {group label: [positional index of rows belonging to the group]} + window_indexer : BaseIndexer + BaseIndexer class determining the start and end bounds of each group + indexer_kwargs : dict or None + Custom kwargs to be passed to window_indexer **kwargs : keyword arguments that will be available when get_window_bounds is called """ - self.groupby_indicies = groupby_indicies - self.rolling_indexer = rolling_indexer + self.groupby_indicies = groupby_indicies or {} + self.window_indexer = window_indexer self.indexer_kwargs = indexer_kwargs or {} super().__init__( index_array, self.indexer_kwargs.pop("window_size", window_size), **kwargs @@ -303,7 +315,7 @@ def get_window_bounds( index_array = self.index_array.take(ensure_platform_int(indices)) else: index_array = self.index_array - indexer = self.rolling_indexer( + indexer = self.window_indexer( index_array=index_array, window_size=self.window_size, **self.indexer_kwargs, diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 5398c14c8774a..cc0927437ad1d 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -51,11 +51,10 @@ from pandas.core.base import DataError, SelectionMixin import pandas.core.common as com from pandas.core.construction import extract_array -from pandas.core.groupby.base import ShallowMixin +from pandas.core.groupby.base import GotItemMixin, ShallowMixin from pandas.core.indexes.api import Index, MultiIndex from pandas.core.util.numba_ import NUMBA_FUNC_CACHE, maybe_use_numba from pandas.core.window.common import ( - WindowGroupByMixin, _doc_template, _shared_docs, flex_binary_moment, @@ -64,7 +63,7 @@ from pandas.core.window.indexers import ( BaseIndexer, FixedWindowIndexer, - GroupbyRollingIndexer, + GroupbyIndexer, VariableWindowIndexer, ) from pandas.core.window.numba_ import generate_numba_apply_func @@ -855,6 +854,111 @@ def aggregate(self, func, *args, **kwargs): ) +def _dispatch(name: str, *args, **kwargs): + """ + Dispatch to groupby apply. + """ + + def outer(self, *args, **kwargs): + def f(x): + x = self._shallow_copy(x, groupby=self._groupby) + return getattr(x, name)(*args, **kwargs) + + return self._groupby.apply(f) + + outer.__name__ = name + return outer + + +class BaseWindowGroupby(GotItemMixin, BaseWindow): + """ + Provide the groupby windowing facilities. + """ + + def __init__(self, obj, *args, **kwargs): + kwargs.pop("parent", None) + groupby = kwargs.pop("groupby", None) + if groupby is None: + groupby, obj = obj, obj._selected_obj + self._groupby = groupby + self._groupby.mutated = True + self._groupby.grouper.mutated = True + super().__init__(obj, *args, **kwargs) + + corr = _dispatch("corr", other=None, pairwise=None) + cov = _dispatch("cov", other=None, pairwise=None) + + def _apply( + self, + func: Callable, + require_min_periods: int = 0, + floor: int = 1, + is_weighted: bool = False, + name: Optional[str] = None, + use_numba_cache: bool = False, + **kwargs, + ): + result = super()._apply( + func, + require_min_periods, + floor, + is_weighted, + name, + use_numba_cache, + **kwargs, + ) + # Compose MultiIndex result from grouping levels then rolling level + # Aggregate the MultiIndex data as tuples then the level names + grouped_object_index = self.obj.index + grouped_index_name = [*grouped_object_index.names] + groupby_keys = [grouping.name for grouping in self._groupby.grouper._groupings] + result_index_names = groupby_keys + grouped_index_name + + result_index_data = [] + for key, values in self._groupby.grouper.indices.items(): + for value in values: + data = [ + *com.maybe_make_list(key), + *com.maybe_make_list(grouped_object_index[value]), + ] + result_index_data.append(tuple(data)) + + result_index = MultiIndex.from_tuples( + result_index_data, names=result_index_names + ) + result.index = result_index + return result + + def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: + """ + Split data into blocks & return conformed data. + """ + # Ensure the object we're rolling over is monotonically sorted relative + # to the groups + # GH 36197 + if not obj.empty: + groupby_order = np.concatenate( + list(self._groupby.grouper.indices.values()) + ).astype(np.int64) + obj = obj.take(groupby_order) + return super()._create_data(obj) + + def _gotitem(self, key, ndim, subset=None): + # we are setting the index on the actual object + # here so our index is carried through to the selected obj + # when we do the splitting for the groupby + if self.on is not None: + self.obj = self.obj.set_index(self._on) + self.on = None + return super()._gotitem(key, ndim, subset=subset) + + def _validate_monotonic(self): + """ + Validate that "on" is monotonic; already validated at a higher level. + """ + pass + + class Window(BaseWindow): """ Provide rolling window calculations. @@ -1332,8 +1436,7 @@ def apply( args = () if kwargs is None: kwargs = {} - kwargs.pop("_level", None) - kwargs.pop("floor", None) + if not is_bool(raw): raise ValueError("raw parameter must be `True` or `False`") @@ -1348,13 +1451,10 @@ def apply( else: raise ValueError("engine must be either 'numba' or 'cython'") - # name=func & raw=raw for WindowGroupByMixin._apply return self._apply( apply_func, floor=0, - name=func, use_numba_cache=maybe_use_numba(engine), - raw=raw, original_func=func, args=args, kwargs=kwargs, @@ -1381,7 +1481,6 @@ def apply_func(values, begin, end, min_periods, raw=raw): def sum(self, *args, **kwargs): nv.validate_window_func("sum", args, kwargs) window_func = self._get_roll_func("roll_sum") - kwargs.pop("floor", None) return self._apply(window_func, floor=0, name="sum", **kwargs) _shared_docs["max"] = dedent( @@ -1492,31 +1591,25 @@ def median(self, **kwargs): def std(self, ddof=1, *args, **kwargs): nv.validate_window_func("std", args, kwargs) - kwargs.pop("require_min_periods", None) window_func = self._get_roll_func("roll_var") def zsqrt_func(values, begin, end, min_periods): return zsqrt(window_func(values, begin, end, min_periods, ddof=ddof)) - # ddof passed again for compat with groupby.rolling return self._apply( zsqrt_func, require_min_periods=1, name="std", - ddof=ddof, **kwargs, ) def var(self, ddof=1, *args, **kwargs): nv.validate_window_func("var", args, kwargs) - kwargs.pop("require_min_periods", None) window_func = partial(self._get_roll_func("roll_var"), ddof=ddof) - # ddof passed again for compat with groupby.rolling return self._apply( window_func, require_min_periods=1, name="var", - ddof=ddof, **kwargs, ) @@ -1533,7 +1626,6 @@ def var(self, ddof=1, *args, **kwargs): def skew(self, **kwargs): window_func = self._get_roll_func("roll_skew") - kwargs.pop("require_min_periods", None) return self._apply( window_func, require_min_periods=3, @@ -1628,7 +1720,6 @@ def sem(self, ddof=1, *args, **kwargs): def kurt(self, **kwargs): window_func = self._get_roll_func("roll_kurt") - kwargs.pop("require_min_periods", None) return self._apply( window_func, require_min_periods=4, @@ -2192,73 +2283,12 @@ def corr(self, other=None, pairwise=None, **kwargs): Rolling.__doc__ = Window.__doc__ -class RollingGroupby(WindowGroupByMixin, Rolling): +class RollingGroupby(BaseWindowGroupby, Rolling): """ Provide a rolling groupby implementation. """ - def _apply( - self, - func: Callable, - require_min_periods: int = 0, - floor: int = 1, - is_weighted: bool = False, - name: Optional[str] = None, - use_numba_cache: bool = False, - **kwargs, - ): - result = Rolling._apply( - self, - func, - require_min_periods, - floor, - is_weighted, - name, - use_numba_cache, - **kwargs, - ) - # Cannot use _wrap_outputs because we calculate the result all at once - # Compose MultiIndex result from grouping levels then rolling level - # Aggregate the MultiIndex data as tuples then the level names - grouped_object_index = self.obj.index - grouped_index_name = [*grouped_object_index.names] - groupby_keys = [grouping.name for grouping in self._groupby.grouper._groupings] - result_index_names = groupby_keys + grouped_index_name - - result_index_data = [] - for key, values in self._groupby.grouper.indices.items(): - for value in values: - data = [ - *com.maybe_make_list(key), - *com.maybe_make_list(grouped_object_index[value]), - ] - result_index_data.append(tuple(data)) - - result_index = MultiIndex.from_tuples( - result_index_data, names=result_index_names - ) - result.index = result_index - return result - - @property - def _constructor(self): - return Rolling - - def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: - """ - Split data into blocks & return conformed data. - """ - # Ensure the object we're rolling over is monotonically sorted relative - # to the groups - # GH 36197 - if not obj.empty: - groupby_order = np.concatenate( - list(self._groupby.grouper.indices.values()) - ).astype(np.int64) - obj = obj.take(groupby_order) - return super()._create_data(obj) - - def _get_window_indexer(self, window: int) -> GroupbyRollingIndexer: + def _get_window_indexer(self, window: int) -> GroupbyIndexer: """ Return an indexer class that will compute the window start and end bounds @@ -2269,7 +2299,7 @@ def _get_window_indexer(self, window: int) -> GroupbyRollingIndexer: Returns ------- - GroupbyRollingIndexer + GroupbyIndexer """ rolling_indexer: Type[BaseIndexer] indexer_kwargs: Optional[Dict] = None @@ -2285,29 +2315,11 @@ def _get_window_indexer(self, window: int) -> GroupbyRollingIndexer: else: rolling_indexer = FixedWindowIndexer index_array = None - window_indexer = GroupbyRollingIndexer( + window_indexer = GroupbyIndexer( index_array=index_array, window_size=window, groupby_indicies=self._groupby.indices, - rolling_indexer=rolling_indexer, + window_indexer=rolling_indexer, indexer_kwargs=indexer_kwargs, ) return window_indexer - - def _gotitem(self, key, ndim, subset=None): - # we are setting the index on the actual object - # here so our index is carried thru to the selected obj - # when we do the splitting for the groupby - if self.on is not None: - self.obj = self.obj.set_index(self._on) - self.on = None - return super()._gotitem(key, ndim, subset=subset) - - def _validate_monotonic(self): - """ - Validate that on is monotonic; - we don't care for groupby.rolling - because we have already validated at a higher - level. - """ - pass