Skip to content

Commit

Permalink
PERF: ExpandingGroupby (#37064)
Browse files Browse the repository at this point in the history
  • Loading branch information
mroeschke authored Oct 12, 2020
1 parent efa80f3 commit ca27cca
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 182 deletions.
9 changes: 9 additions & 0 deletions asv_bench/benchmarks/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,21 @@ class ExpandingMethods:

def setup(self, constructor, dtype, method):
N = 10 ** 5
N_groupby = 100
arr = (100 * np.random.random(N)).astype(dtype)
self.expanding = getattr(pd, constructor)(arr).expanding()
self.expanding_groupby = (
pd.DataFrame({"A": arr[:N_groupby], "B": range(N_groupby)})
.groupby("B")
.expanding()
)

def time_expanding(self, constructor, dtype, method):
getattr(self.expanding, method)()

def time_expanding_groupby(self, constructor, dtype, method):
getattr(self.expanding_groupby, method)()


class EWMMethods:

Expand Down
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.2.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ Performance improvements
avoiding creating these again, if created on either. This can speed up operations that depend on creating copies of existing indexes (:issue:`36840`)
- Performance improvement in :meth:`RollingGroupby.count` (:issue:`35625`)
- Small performance decrease to :meth:`Rolling.min` and :meth:`Rolling.max` for fixed windows (:issue:`36567`)
- Performance improvement in :class:`ExpandingGroupby` (:issue:`37064`)

.. ---------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ cdef inline float64_t calc_var(int64_t minp, int ddof, float64_t nobs,
result = ssqdm_x / (nobs - <float64_t>ddof)
# Fix for numerical imprecision.
# Can be result < 0 once Kahan Summation is implemented
if result < 1e-15:
if result < 1e-14:
result = 0
else:
result = NaN
Expand Down
65 changes: 0 additions & 65 deletions pandas/core/window/common.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
"""Common utility functions for rolling operations"""
from collections import defaultdict
from typing import Callable, Optional
import warnings

import numpy as np

from pandas.core.dtypes.generic import ABCDataFrame, ABCSeries

from pandas.core.groupby.base import GotItemMixin
from pandas.core.indexes.api import MultiIndex
from pandas.core.shared_docs import _shared_docs

Expand All @@ -27,69 +25,6 @@
"""


def _dispatch(name: str, *args, **kwargs):
"""
Dispatch to apply.
"""

def outer(self, *args, **kwargs):
def f(x):
x = self._shallow_copy(x, groupby=self._groupby)
return getattr(x, name)(*args, **kwargs)

return self._groupby.apply(f)

outer.__name__ = name
return outer


class WindowGroupByMixin(GotItemMixin):
"""
Provide the groupby facilities.
"""

def __init__(self, obj, *args, **kwargs):
kwargs.pop("parent", None)
groupby = kwargs.pop("groupby", None)
if groupby is None:
groupby, obj = obj, obj._selected_obj
self._groupby = groupby
self._groupby.mutated = True
self._groupby.grouper.mutated = True
super().__init__(obj, *args, **kwargs)

corr = _dispatch("corr", other=None, pairwise=None)
cov = _dispatch("cov", other=None, pairwise=None)

def _apply(
self,
func: Callable,
require_min_periods: int = 0,
floor: int = 1,
is_weighted: bool = False,
name: Optional[str] = None,
use_numba_cache: bool = False,
**kwargs,
):
"""
Dispatch to apply; we are stripping all of the _apply kwargs and
performing the original function call on the grouped object.
"""
kwargs.pop("floor", None)
kwargs.pop("original_func", None)

# TODO: can we de-duplicate with _dispatch?
def f(x, name=name, *args):
x = self._shallow_copy(x)

if isinstance(name, str):
return getattr(x, name)(*args, **kwargs)

return x.apply(name, *args, **kwargs)

return self._groupby.apply(f)


def flex_binary_moment(arg1, arg2, f, pairwise=False):

if not (
Expand Down
28 changes: 22 additions & 6 deletions pandas/core/window/expanding.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from pandas.compat.numpy import function as nv
from pandas.util._decorators import Appender, Substitution, doc

from pandas.core.window.common import WindowGroupByMixin, _doc_template, _shared_docs
from pandas.core.window.rolling import RollingAndExpandingMixin
from pandas.core.window.common import _doc_template, _shared_docs
from pandas.core.window.indexers import ExpandingIndexer, GroupbyIndexer
from pandas.core.window.rolling import BaseWindowGroupby, RollingAndExpandingMixin


class Expanding(RollingAndExpandingMixin):
Expand Down Expand Up @@ -253,11 +254,26 @@ def corr(self, other=None, pairwise=None, **kwargs):
return super().corr(other=other, pairwise=pairwise, **kwargs)


class ExpandingGroupby(WindowGroupByMixin, Expanding):
class ExpandingGroupby(BaseWindowGroupby, Expanding):
"""
Provide a expanding groupby implementation.
"""

@property
def _constructor(self):
return Expanding
def _get_window_indexer(self, window: int) -> GroupbyIndexer:
"""
Return an indexer class that will compute the window start and end bounds
Parameters
----------
window : int
window size for FixedWindowIndexer (unused)
Returns
-------
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._groupby.indices,
window_indexer=ExpandingIndexer,
)
return window_indexer
30 changes: 21 additions & 9 deletions pandas/core/window/indexers.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,26 +259,38 @@ def get_window_bounds(
return start, end


class GroupbyRollingIndexer(BaseIndexer):
class GroupbyIndexer(BaseIndexer):
"""Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""

def __init__(
self,
index_array: Optional[np.ndarray],
window_size: int,
groupby_indicies: Dict,
rolling_indexer: Type[BaseIndexer],
indexer_kwargs: Optional[Dict],
index_array: Optional[np.ndarray] = None,
window_size: int = 0,
groupby_indicies: Optional[Dict] = None,
window_indexer: Type[BaseIndexer] = BaseIndexer,
indexer_kwargs: Optional[Dict] = None,
**kwargs,
):
"""
Parameters
----------
index_array : np.ndarray or None
np.ndarray of the index of the original object that we are performing
a chained groupby operation over. This index has been pre-sorted relative to
the groups
window_size : int
window size during the windowing operation
groupby_indicies : dict or None
dict of {group label: [positional index of rows belonging to the group]}
window_indexer : BaseIndexer
BaseIndexer class determining the start and end bounds of each group
indexer_kwargs : dict or None
Custom kwargs to be passed to window_indexer
**kwargs :
keyword arguments that will be available when get_window_bounds is called
"""
self.groupby_indicies = groupby_indicies
self.rolling_indexer = rolling_indexer
self.groupby_indicies = groupby_indicies or {}
self.window_indexer = window_indexer
self.indexer_kwargs = indexer_kwargs or {}
super().__init__(
index_array, self.indexer_kwargs.pop("window_size", window_size), **kwargs
Expand All @@ -303,7 +315,7 @@ def get_window_bounds(
index_array = self.index_array.take(ensure_platform_int(indices))
else:
index_array = self.index_array
indexer = self.rolling_indexer(
indexer = self.window_indexer(
index_array=index_array,
window_size=self.window_size,
**self.indexer_kwargs,
Expand Down
Loading

0 comments on commit ca27cca

Please sign in to comment.