Fix backward compatibility for pandas 1.1 and 1.2 (#2624)
wjsi authored Dec 24, 2021
1 parent 3756981 commit a969a17
Showing 15 changed files with 93 additions and 69 deletions.
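
Two mechanical changes recur throughout the hunks below: the skipna default in the reduction wrappers moves from None to True, and np.empty(..., dtype=np.object) becomes dtype=object. The latter appears to be purely cosmetic: np.object was a deprecated alias of the builtin object (NumPy has warned about it since 1.20 and later releases drop it), so both spellings build the same array. A minimal sketch:

```python
import numpy as np

# np.object was only a deprecated alias of the builtin `object`; spelling it
# as `object` produces the same array of Python references without the
# NumPy deprecation warning.
agg_chunks = np.empty((2, 3), dtype=object)
agg_chunks[0, 0] = {"any": "python object"}
print(agg_chunks.dtype)  # object
```
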
34 changes: 18 additions & 16 deletions mars/dataframe/reduction/aggregation.py
@@ -65,22 +65,22 @@ def where_function(cond, var1, var2):


_agg_functions = {
"sum": lambda x, skipna=None: x.sum(skipna=skipna),
"prod": lambda x, skipna=None: x.prod(skipna=skipna),
"product": lambda x, skipna=None: x.product(skipna=skipna),
"min": lambda x, skipna=None: x.min(skipna=skipna),
"max": lambda x, skipna=None: x.max(skipna=skipna),
"all": lambda x, skipna=None: x.all(skipna=skipna),
"any": lambda x, skipna=None: x.any(skipna=skipna),
"sum": lambda x, skipna=True: x.sum(skipna=skipna),
"prod": lambda x, skipna=True: x.prod(skipna=skipna),
"product": lambda x, skipna=True: x.product(skipna=skipna),
"min": lambda x, skipna=True: x.min(skipna=skipna),
"max": lambda x, skipna=True: x.max(skipna=skipna),
"all": lambda x, skipna=True: x.all(skipna=skipna),
"any": lambda x, skipna=True: x.any(skipna=skipna),
"count": lambda x: x.count(),
"size": lambda x: x._reduction_size(),
"mean": lambda x, skipna=None: x.mean(skipna=skipna),
"var": lambda x, skipna=None, ddof=1: x.var(skipna=skipna, ddof=ddof),
"std": lambda x, skipna=None, ddof=1: x.std(skipna=skipna, ddof=ddof),
"sem": lambda x, skipna=None, ddof=1: x.sem(skipna=skipna, ddof=ddof),
"skew": lambda x, skipna=None, bias=False: x.skew(skipna=skipna, bias=bias),
"kurt": lambda x, skipna=None, bias=False: x.kurt(skipna=skipna, bias=bias),
"kurtosis": lambda x, skipna=None, bias=False: x.kurtosis(skipna=skipna, bias=bias),
"mean": lambda x, skipna=True: x.mean(skipna=skipna),
"var": lambda x, skipna=True, ddof=1: x.var(skipna=skipna, ddof=ddof),
"std": lambda x, skipna=True, ddof=1: x.std(skipna=skipna, ddof=ddof),
"sem": lambda x, skipna=True, ddof=1: x.sem(skipna=skipna, ddof=ddof),
"skew": lambda x, skipna=True, bias=False: x.skew(skipna=skipna, bias=bias),
"kurt": lambda x, skipna=True, bias=False: x.kurt(skipna=skipna, bias=bias),
"kurtosis": lambda x, skipna=True, bias=False: x.kurtosis(skipna=skipna, bias=bias),
}


@@ -291,7 +291,7 @@ def _gen_map_chunks(
else:
agg_chunks_shape = (len(func_infos), in_df.chunk_shape[1])

agg_chunks = np.empty(agg_chunks_shape, dtype=np.object)
agg_chunks = np.empty(agg_chunks_shape, dtype=object)
dtypes_cache = dict()
for chunk in in_df.chunks:
input_index = chunk.index[1 - axis] if len(chunk.index) > 1 else 0
@@ -504,7 +504,7 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
ceildiv(chunks.shape[1], combine_size),
)

new_chunks = np.empty(new_chunks_shape, dtype=np.object)
new_chunks = np.empty(new_chunks_shape, dtype=object)
for idx0, i in enumerate(range(0, chunks.shape[axis], combine_size)):
for idx1 in range(chunks.shape[1 - axis]):
func_info = axis_func_infos[idx1]
@@ -761,6 +761,8 @@ def _do_predefined_agg(cls, op: "DataFrameAggregate", input_obj, func_name, kwds
if op.gpu:
if kwds.pop("numeric_only", None):
raise NotImplementedError("numeric_only not implemented under cudf")
if isinstance(input_obj, pd.Index):
kwds.pop("skipna", None)
return getattr(input_obj, func_name)(**kwds)

@classmethod
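
Taken together, the aggregation.py hunks above do two things: the _agg_functions lambdas now default to skipna=True (the default pandas itself documents for these reductions) rather than passing skipna=None through, and _do_predefined_agg strips skipna before reducing a pd.Index, presumably because Index reductions on the older pandas releases targeted by this commit reject that keyword. A condensed, hypothetical sketch of the same pattern (reduce_obj and the two-entry table are illustrative, not Mars's actual API):

```python
import pandas as pd

# Condensed stand-in for the _agg_functions table: the lambdas default to
# skipna=True instead of threading skipna=None through to pandas.
_AGG_FUNCTIONS = {
    "sum": lambda x, skipna=True: x.sum(skipna=skipna),
    "max": lambda x, skipna=True: x.max(skipna=skipna),
}


def reduce_obj(obj, func_name, **kwds):
    """Hypothetical helper mirroring the guard added to _do_predefined_agg."""
    if isinstance(obj, pd.Index):
        # Drop skipna for Index inputs, mirroring the compatibility guard
        # this commit adds for pandas 1.1/1.2.
        kwds.pop("skipna", None)
    return getattr(obj, func_name)(**kwds)


print(_AGG_FUNCTIONS["sum"](pd.Series([1.0, None, 3.0])))   # 4.0 -- NaN skipped by default
print(reduce_obj(pd.Index([1, 2, 3]), "max", skipna=True))  # 3 -- skipna silently dropped
```
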
4 changes: 2 additions & 2 deletions mars/dataframe/reduction/all.py
@@ -31,7 +31,7 @@ def all_series(
series,
axis=None,
bool_only=None,
skipna=None,
skipna=True,
level=None,
combine_size=None,
method=None,
@@ -54,7 +54,7 @@ def all_dataframe(
df,
axis=None,
bool_only=None,
skipna=None,
skipna=True,
level=None,
combine_size=None,
method=None,
4 changes: 2 additions & 2 deletions mars/dataframe/reduction/any.py
@@ -31,7 +31,7 @@ def any_series(
series,
axis=None,
bool_only=None,
skipna=None,
skipna=True,
level=None,
combine_size=None,
method=None,
@@ -54,7 +54,7 @@ def any_dataframe(
df,
axis=None,
bool_only=None,
skipna=None,
skipna=True,
level=None,
combine_size=None,
method=None,
78 changes: 50 additions & 28 deletions mars/dataframe/reduction/core.py
@@ -28,14 +28,15 @@
recursive_tile,
)
from ...core.operand import OperandStage
from ...utils import tokenize
from ...lib.version import parse as parse_version
from ...serialization.serializables import (
BoolField,
AnyField,
DataTypeField,
Int32Field,
StringField,
)
from ...utils import tokenize
from ..core import SERIES_TYPE
from ..utils import (
parse_index,
@@ -47,6 +48,14 @@
)
from ..operands import DataFrameOperandMixin, DataFrameOperand, DATAFRAME_TYPE

_pd_release = parse_version(pd.__version__).release[:2]
# in pandas<1.3, when aggregating with multiple levels and numeric_only is True,
# object cols not ignored with min-max funcs
_level_reduction_keep_object = _pd_release < (1, 3)
# in pandas>=1.3, when dataframes are reduced into series, mixture of float and bool
# results in object.
_reduce_bool_as_object = _pd_release >= (1, 3)


class DataFrameReductionOperand(DataFrameOperand):
_axis = AnyField("axis")
@@ -211,22 +220,22 @@ def _get_series_reduction_dtype(
func_name,
axis=None,
bool_only=False,
skipna=False,
skipna=True,
numeric_only=False,
):
empty_series = build_series(dtype=dtype, ensure_string=True)
test_series = build_series(dtype=dtype, ensure_string=True)
if func_name == "count":
reduced = empty_series.count()
reduced = test_series.count()
elif func_name == "nunique":
reduced = empty_series.nunique()
reduced = test_series.nunique()
elif func_name in ("all", "any"):
reduced = getattr(empty_series, func_name)(axis=axis, bool_only=bool_only)
reduced = getattr(test_series, func_name)(axis=axis, bool_only=bool_only)
elif func_name == "size":
reduced = empty_series.size
reduced = test_series.size
elif func_name == "str_concat":
reduced = pd.Series([empty_series.str.cat()])
reduced = pd.Series([test_series.str.cat()])
else:
reduced = getattr(empty_series, func_name)(
reduced = getattr(test_series, func_name)(
axis=axis, skipna=skipna, numeric_only=numeric_only
)
return pd.Series(reduced).dtype
@@ -236,17 +245,17 @@ def _get_series_reduction_dtype(
def _get_df_reduction_dtype(
dtype, func_name, axis=None, bool_only=False, skipna=False, numeric_only=False
):
empty_df = build_series(dtype=dtype, ensure_string=True).to_frame()
test_df = build_series(dtype=dtype, ensure_string=True).to_frame()
if func_name == "count":
reduced = getattr(empty_df, func_name)(axis=axis, numeric_only=numeric_only)
reduced = getattr(test_df, func_name)(axis=axis, numeric_only=numeric_only)
elif func_name == "nunique":
reduced = getattr(empty_df, func_name)(axis=axis)
reduced = getattr(test_df, func_name)(axis=axis)
elif func_name in ("all", "any"):
reduced = getattr(empty_df, func_name)(axis=axis, bool_only=bool_only)
reduced = getattr(test_df, func_name)(axis=axis, bool_only=bool_only)
elif func_name == "str_concat":
reduced = empty_df.apply(lambda s: s.str.cat(), axis=axis)
reduced = test_df.apply(lambda s: s.str.cat(), axis=axis)
else:
reduced = getattr(empty_df, func_name)(
reduced = getattr(test_df, func_name)(
axis=axis, skipna=skipna, numeric_only=numeric_only
)
if len(reduced) == 0:
@@ -304,7 +313,7 @@ def _call_groupby_level(self, df, level):
def _call_dataframe(self, df):
axis = getattr(self, "axis", None) or 0
level = getattr(self, "level", None)
skipna = getattr(self, "skipna", None)
skipna = getattr(self, "skipna", True)
numeric_only = getattr(self, "numeric_only", None)
bool_only = getattr(self, "bool_only", None)
self._axis = axis = validate_axis(axis, df)
@@ -327,9 +336,9 @@ def _call_dataframe(self, df):
reduced_dtype = reduced.dtype
else:
reduced_cols, dtypes = [], []
for col, dt in df.dtypes.items():
for col, src_dt in df.dtypes.items():
dt = _get_df_reduction_dtype(
dt,
src_dt,
func_name,
axis=axis,
bool_only=bool_only,
@@ -339,16 +348,29 @@ def _call_dataframe(self, df):
if dt is not None:
reduced_cols.append(col)
dtypes.append(dt)
elif (
_level_reduction_keep_object
and numeric_only
and level is not None
and func_name in ("min", "max")
and src_dt == np.dtype(object)
): # pragma: no cover
reduced_cols.append(col)
dtypes.append(np.dtype(object))
if len(dtypes) == 0:
reduced_dtype = np.dtype("O")
elif all(dt == dtypes[0] for dt in dtypes):
reduced_dtype = dtypes[0]
elif not all(isinstance(dt, np.dtype) and dt != bool for dt in dtypes):
# todo currently we return mixed dtypes as np.dtype('O').
# handle pandas Dtypes in the future more carefully.
reduced_dtype = np.dtype("O")
else:
reduced_dtype = np.find_common_type(dtypes, [])
has_bool = any(dt == bool for dt in dtypes)
if _reduce_bool_as_object and has_bool:
reduced_dtype = np.dtype("O")
elif not all(isinstance(dt, np.dtype) for dt in dtypes):
# todo currently we return mixed dtypes as np.dtype('O').
# handle pandas Dtypes in the future more carefully.
reduced_dtype = np.dtype("O")
else:
reduced_dtype = np.find_common_type(dtypes, [])

if level is not None:
return self._call_groupby_level(df[reduced_cols], level)
@@ -370,7 +392,7 @@ def _call_dataframe(self, df):
def _call_series(self, series):
level = getattr(self, "level", None)
axis = getattr(self, "axis", None)
skipna = getattr(self, "skipna", None)
skipna = getattr(self, "skipna", True)
numeric_only = getattr(self, "numeric_only", None)
bool_only = getattr(self, "bool_only", None)
self._axis = axis = validate_axis(axis or 0, series)
@@ -442,8 +464,8 @@ def _tile_dataframe(cls, op):
n_rows, n_cols = in_df.chunk_shape

# map to get individual results and summaries
src_chunks = np.empty(in_df.chunk_shape, dtype=np.object)
summary_chunks = np.empty(in_df.chunk_shape, dtype=np.object)
src_chunks = np.empty(in_df.chunk_shape, dtype=object)
summary_chunks = np.empty(in_df.chunk_shape, dtype=object)
for c in in_df.chunks:
new_chunk_op = op.copy().reset_key()
new_chunk_op.stage = OperandStage.map
@@ -457,7 +479,7 @@ def _tile_dataframe(cls, op):
)

# combine summaries into results
output_chunk_array = np.empty(in_df.chunk_shape, dtype=np.object)
output_chunk_array = np.empty(in_df.chunk_shape, dtype=object)
if op.axis == 1:
for row in range(n_rows):
row_src = src_chunks[row, :]
@@ -493,7 +515,7 @@ def _tile_series(cls, op):
series = op.outputs[0]

# map to get individual results and summaries
summary_chunks = np.empty(in_series.chunk_shape, dtype=np.object)
summary_chunks = np.empty(in_series.chunk_shape, dtype=object)
for c in in_series.chunks:
new_chunk_op = op.copy().reset_key()
new_chunk_op.stage = OperandStage.map
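
The core.py changes above introduce two ideas: gate dtype handling on the installed pandas release, and infer the dtype of a reduction by actually running it on a small throwaway object (the build_series-based helpers renamed to test_series/test_df above). A rough sketch of both, using packaging.version.parse as a stand-in for Mars's vendored mars.lib.version.parse and a simplified infer_reduction_dtype in place of _get_series_reduction_dtype; the two flags restate the comments added in the diff rather than independently verified pandas behavior:

```python
import numpy as np
import pandas as pd
from packaging.version import parse as parse_version  # stand-in for mars.lib.version.parse

_PD_RELEASE = parse_version(pd.__version__).release[:2]
# Per the comments added in core.py: pandas < 1.3 may keep object columns in
# level-based min/max reductions even with numeric_only=True, while
# pandas >= 1.3 reduces mixed bool/float columns to object dtype.
LEVEL_REDUCTION_KEEPS_OBJECT = _PD_RELEASE < (1, 3)
REDUCE_BOOL_AS_OBJECT = _PD_RELEASE >= (1, 3)


def infer_reduction_dtype(dtype, func_name, skipna=True):
    """Simplified take on _get_series_reduction_dtype: run the real pandas
    reduction on a tiny throwaway Series and report the resulting dtype."""
    test_series = pd.Series(np.zeros(2)).astype(dtype)
    reduced = getattr(test_series, func_name)(skipna=skipna)
    # Wrap the scalar result back into a Series so pandas decides the dtype.
    return pd.Series([reduced]).dtype


print(LEVEL_REDUCTION_KEEPS_OBJECT, REDUCE_BOOL_AS_OBJECT)
print(infer_reduction_dtype(np.dtype(bool), "sum"))     # boolean sum comes back as an integer dtype
print(infer_reduction_dtype(np.dtype("int64"), "mean")) # integer mean comes back as a float dtype
```

Probing the installed pandas this way keeps the inferred dtypes in sync with whatever version is actually running, which is presumably why only the two behavior differences above need explicit flags.
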
4 changes: 2 additions & 2 deletions mars/dataframe/reduction/kurtosis.py
@@ -75,7 +75,7 @@ def kurt(x):
def kurt_series(
df,
axis=None,
skipna=None,
skipna=True,
level=None,
combine_size=None,
bias=False,
@@ -100,7 +100,7 @@ def kurt_series(
def kurt_dataframe(
df,
axis=None,
skipna=None,
skipna=True,
level=None,
numeric_only=None,
combine_size=None,
4 changes: 2 additions & 2 deletions mars/dataframe/reduction/max.py
@@ -27,7 +27,7 @@ def is_atomic(self):
return True


def max_series(df, axis=None, skipna=None, level=None, combine_size=None, method=None):
def max_series(df, axis=None, skipna=True, level=None, combine_size=None, method=None):
use_inf_as_na = options.dataframe.mode.use_inf_as_na
op = DataFrameMax(
axis=axis,
@@ -44,7 +44,7 @@ def max_series(df, axis=None, skipna=None, level=None, combine_size=None, method
def max_dataframe(
df,
axis=None,
skipna=None,
skipna=True,
level=None,
numeric_only=None,
combine_size=None,
4 changes: 2 additions & 2 deletions mars/dataframe/reduction/mean.py
@@ -32,7 +32,7 @@ def mean(x):
return mean


def mean_series(df, axis=None, skipna=None, level=None, combine_size=None, method=None):
def mean_series(df, axis=None, skipna=True, level=None, combine_size=None, method=None):
use_inf_as_na = options.dataframe.mode.use_inf_as_na
op = DataFrameMean(
axis=axis,
@@ -49,7 +49,7 @@ def mean_series(df, axis=None, skipna=None, level=None, combine_size=None, metho
def mean_dataframe(
df,
axis=None,
skipna=None,
skipna=True,
level=None,
numeric_only=None,
combine_size=None,
4 changes: 2 additions & 2 deletions mars/dataframe/reduction/min.py
@@ -27,7 +27,7 @@ def is_atomic(self):
return True


def min_series(df, axis=None, skipna=None, level=None, combine_size=None, method=None):
def min_series(df, axis=None, skipna=True, level=None, combine_size=None, method=None):
use_inf_as_na = options.dataframe.mode.use_inf_as_na
op = DataFrameMin(
axis=axis,
@@ -44,7 +44,7 @@ def min_series(df, axis=None, skipna=None, level=None, combine_size=None, method
def min_dataframe(
df,
axis=None,
skipna=None,
skipna=True,
level=None,
numeric_only=None,
combine_size=None,
4 changes: 2 additions & 2 deletions mars/dataframe/reduction/prod.py
@@ -45,7 +45,7 @@ def prod(value):


def prod_series(
df, axis=None, skipna=None, level=None, min_count=0, combine_size=None, method=None
df, axis=None, skipna=True, level=None, min_count=0, combine_size=None, method=None
):
use_inf_as_na = options.dataframe.mode.use_inf_as_na
op = DataFrameProd(
@@ -64,7 +64,7 @@ def prod_series(
def prod_dataframe(
df,
axis=None,
skipna=None,
skipna=True,
level=None,
min_count=0,
numeric_only=None,
4 changes: 2 additions & 2 deletions mars/dataframe/reduction/sem.py
@@ -45,7 +45,7 @@ def sem(x):


def sem_series(
series, axis=None, skipna=None, level=None, ddof=1, combine_size=None, method=None
series, axis=None, skipna=True, level=None, ddof=1, combine_size=None, method=None
):
use_inf_as_na = options.dataframe.mode.use_inf_as_na
op = DataFrameSem(
@@ -64,7 +64,7 @@ def sem_series(
def sem_dataframe(
df,
axis=None,
skipna=None,
skipna=True,
level=None,
ddof=1,
numeric_only=None,