Skip to content

Commit

Permalink
REF: ignore_failures in BlockManager.reduce (#35881)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbrockmendel authored Oct 10, 2020
1 parent d26a630 commit 7d257c6
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 12 deletions.
23 changes: 19 additions & 4 deletions pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -8630,6 +8630,7 @@ def _reduce(
cols = self.columns[~dtype_is_dt]
self = self[cols]

any_object = self.dtypes.apply(is_object_dtype).any()
# TODO: Make other agg func handle axis=None properly GH#21597
axis = self._get_axis_number(axis)
labels = self._get_agg_axis(axis)
Expand All @@ -8656,22 +8657,36 @@ def _get_data() -> DataFrame:
data = self._get_bool_data()
return data

if numeric_only is not None:
if numeric_only is not None or (
numeric_only is None
and axis == 0
and not any_object
and not self._mgr.any_extension_types
):
# For numeric_only non-None and axis non-None, we know
# which blocks to use and no try/except is needed.
# For numeric_only=None only the case with axis==0 and no object
# dtypes are unambiguous can be handled with BlockManager.reduce
# Case with EAs see GH#35881
df = self
if numeric_only is True:
df = _get_data()
if axis == 1:
df = df.T
axis = 0

ignore_failures = numeric_only is None

# After possibly _get_data and transposing, we are now in the
# simple case where we can use BlockManager.reduce
res = df._mgr.reduce(blk_func)
out = df._constructor(res).iloc[0].rename(None)
res, indexer = df._mgr.reduce(blk_func, ignore_failures=ignore_failures)
out = df._constructor(res).iloc[0]
if out_dtype is not None:
out = out.astype(out_dtype)
if axis == 0 and is_object_dtype(out.dtype):
out[:] = coerce_to_dtypes(out.values, df.dtypes)
# GH#35865 careful to cast explicitly to object
nvs = coerce_to_dtypes(out.values, df.dtypes.iloc[np.sort(indexer)])
out[:] = np.array(nvs, dtype=object)
return out

assert numeric_only is None
Expand Down
38 changes: 36 additions & 2 deletions pandas/core/internals/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,18 @@ def apply(self, func, **kwargs) -> List["Block"]:

return self._split_op_result(result)

def reduce(self, func) -> List["Block"]:
def reduce(self, func, ignore_failures: bool = False) -> List["Block"]:
# We will apply the function and reshape the result into a single-row
# Block with the same mgr_locs; squeezing will be done at a higher level
assert self.ndim == 2

result = func(self.values)
try:
result = func(self.values)
except (TypeError, NotImplementedError):
if ignore_failures:
return []
raise

if np.ndim(result) == 0:
# TODO(EA2D): special case not needed with 2D EAs
res_values = np.array([[result]])
Expand Down Expand Up @@ -2427,6 +2433,34 @@ def is_bool(self):
"""
return lib.is_bool_array(self.values.ravel("K"))

def reduce(self, func, ignore_failures: bool = False) -> List[Block]:
"""
For object-dtype, we operate column-wise.
"""
assert self.ndim == 2

values = self.values
if len(values) > 1:
# split_and_operate expects func with signature (mask, values, inplace)
def mask_func(mask, values, inplace):
if values.ndim == 1:
values = values.reshape(1, -1)
return func(values)

return self.split_and_operate(None, mask_func, False)

try:
res = func(values)
except TypeError:
if not ignore_failures:
raise
return []

assert isinstance(res, np.ndarray)
assert res.ndim == 1
res = res.reshape(1, -1)
return [self.make_block_same_class(res)]

def convert(
self,
copy: bool = True,
Expand Down
43 changes: 37 additions & 6 deletions pandas/core/internals/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import itertools
from typing import (
Any,
Callable,
DefaultDict,
Dict,
List,
Expand Down Expand Up @@ -324,18 +325,44 @@ def _verify_integrity(self) -> None:
f"tot_items: {tot_items}"
)

def reduce(self: T, func) -> T:
def reduce(
self: T, func: Callable, ignore_failures: bool = False
) -> Tuple[T, np.ndarray]:
"""
Apply reduction function blockwise, returning a single-row BlockManager.
Parameters
----------
func : reduction function
ignore_failures : bool, default False
Whether to drop blocks where func raises TypeError.
Returns
-------
BlockManager
np.ndarray
Indexer of mgr_locs that are retained.
"""
# If 2D, we assume that we're operating column-wise
assert self.ndim == 2

res_blocks: List[Block] = []
for blk in self.blocks:
nbs = blk.reduce(func)
nbs = blk.reduce(func, ignore_failures)
res_blocks.extend(nbs)

index = Index([0]) # placeholder
new_mgr = BlockManager.from_blocks(res_blocks, [self.items, index])
return new_mgr
index = Index([None]) # placeholder
if ignore_failures:
if res_blocks:
indexer = np.concatenate([blk.mgr_locs.as_array for blk in res_blocks])
new_mgr = self._combine(res_blocks, copy=False, index=index)
else:
indexer = []
new_mgr = type(self).from_blocks([], [Index([]), index])
else:
indexer = np.arange(self.shape[0])
new_mgr = type(self).from_blocks(res_blocks, [self.items, index])
return new_mgr, indexer

def operate_blockwise(self, other: "BlockManager", array_op) -> "BlockManager":
"""
Expand Down Expand Up @@ -698,7 +725,9 @@ def get_numeric_data(self, copy: bool = False) -> "BlockManager":
"""
return self._combine([b for b in self.blocks if b.is_numeric], copy)

def _combine(self: T, blocks: List[Block], copy: bool = True) -> T:
def _combine(
self: T, blocks: List[Block], copy: bool = True, index: Optional[Index] = None
) -> T:
""" return a new manager with the blocks """
if len(blocks) == 0:
return self.make_empty()
Expand All @@ -714,6 +743,8 @@ def _combine(self: T, blocks: List[Block], copy: bool = True) -> T:
new_blocks.append(b)

axes = list(self.axes)
if index is not None:
axes[-1] = index
axes[0] = self.items.take(indexer)

return type(self).from_blocks(new_blocks, axes)
Expand Down

0 comments on commit 7d257c6

Please sign in to comment.