Skip to content

Commit

Permalink
REF: use BlockManager.apply for Rolling.count (pandas-dev#35883)
Browse files Browse the repository at this point in the history
* REF: remove unnecesary try/except

* TST: add test for agg on ordered categorical cols (pandas-dev#35630)

* TST: resample does not yield empty groups (pandas-dev#10603) (pandas-dev#35799)

* revert accidental rebase

* REF: use BlockManager.apply for Rolling.count

Co-authored-by: Karthik Mathur <22126205+mathurk1@users.noreply.github.com>
Co-authored-by: tkmz-n <60312218+tkmz-n@users.noreply.github.com>
  • Loading branch information
3 people committed Aug 31, 2020
1 parent 5eface5 commit 842d34b
Showing 1 changed file with 17 additions and 42 deletions.
59 changes: 17 additions & 42 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from pandas._libs.tslibs import BaseOffset, to_offset
import pandas._libs.window.aggregations as window_aggregations
from pandas._typing import ArrayLike, Axis, FrameOrSeries, FrameOrSeriesUnion, Label
from pandas._typing import ArrayLike, Axis, FrameOrSeries, FrameOrSeriesUnion
from pandas.compat._optional import import_optional_dependency
from pandas.compat.numpy import function as nv
from pandas.util._decorators import Appender, Substitution, cache_readonly, doc
Expand All @@ -44,6 +44,7 @@
ABCSeries,
ABCTimedeltaIndex,
)
from pandas.core.dtypes.missing import notna

from pandas.core.base import DataError, PandasObject, SelectionMixin, ShallowMixin
import pandas.core.common as com
Expand Down Expand Up @@ -395,40 +396,6 @@ def _wrap_result(self, result, block=None, obj=None):
return type(obj)(result, index=index, columns=block.columns)
return result

def _wrap_results(self, results, obj, skipped: List[int]) -> FrameOrSeriesUnion:
"""
Wrap the results.
Parameters
----------
results : list of ndarrays
obj : conformed data (may be resampled)
skipped: List[int]
Indices of blocks that are skipped.
"""
from pandas import Series, concat

if obj.ndim == 1:
if not results:
raise DataError("No numeric types to aggregate")
assert len(results) == 1
return Series(results[0], index=obj.index, name=obj.name)

exclude: List[Label] = []
orig_blocks = list(obj._to_dict_of_blocks(copy=False).values())
for i in skipped:
exclude.extend(orig_blocks[i].columns)

columns = [c for c in self._selected_obj.columns if c not in exclude]
if not columns and not len(results) and exclude:
raise DataError("No numeric types to aggregate")
elif not len(results):
return obj.astype("float64")

df = concat(results, axis=1).reindex(columns=columns, copy=False)
self._insert_on_column(df, obj)
return df

def _insert_on_column(self, result: "DataFrame", obj: "DataFrame"):
# if we have an 'on' column we want to put it back into
# the results in the same location
Expand Down Expand Up @@ -1325,21 +1292,29 @@ def count(self):
# implementations shouldn't end up here
assert not isinstance(self.window, BaseIndexer)

blocks, obj = self._create_blocks(self._selected_obj)
results = []
for b in blocks:
result = b.notna().astype(int)
_, obj = self._create_blocks(self._selected_obj)

def hfunc(values: np.ndarray) -> np.ndarray:
result = notna(values)
result = result.astype(int)
frame = type(obj)(result.T)
result = self._constructor(
result,
frame,
window=self._get_window(),
min_periods=self.min_periods or 0,
center=self.center,
axis=self.axis,
closed=self.closed,
).sum()
results.append(result)
return result.values.T

return self._wrap_results(results, obj, skipped=[])
new_mgr = obj._mgr.apply(hfunc)
out = obj._constructor(new_mgr)
if obj.ndim == 1:
out.name = obj.name
else:
self._insert_on_column(out, obj)
return out

_shared_docs["apply"] = dedent(
r"""
Expand Down

0 comments on commit 842d34b

Please sign in to comment.