diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py index c8c7ca4c72f..b4082db158c 100644 --- a/modin/backends/pandas/query_compiler.py +++ b/modin/backends/pandas/query_compiler.py @@ -22,12 +22,15 @@ is_scalar, ) from pandas.core.base import DataError +from typing import Type, Callable import warnings + from modin.backends.base.query_compiler import BaseQueryCompiler from modin.error_message import ErrorMessage from modin.utils import try_cast_to_pandas, wrap_udf_function from modin.data_management.functions import ( + Function, FoldFunction, MapFunction, MapReduceFunction, @@ -150,6 +153,34 @@ def caller(df, *args, **kwargs): return caller +def _numeric_only_reduce_fn(applier: Type[Function], *funcs) -> Callable: + """ + Build reduce function for statistic operations with `numeric_only` parameter. + + Parameters + ---------- + applier: Callable + Function object to register `funcs` + *funcs: list + List of functions to register in `applier` + + Returns + ------- + callable + A callable function to be applied in the partitions + """ + + def caller(self, *args, **kwargs): + # If `numeric_only` is None then we don't know what columns/indices will + # be dropped at the result of reduction function, and so can't preserve labels + preserve_index = kwargs.get("numeric_only", None) is not None + return applier.register(*funcs, preserve_index=preserve_index)( + self, *args, **kwargs + ) + + return caller + + class PandasQueryCompiler(BaseQueryCompiler): """This class implements the logic necessary for operating on partitions with a Pandas backend. 
This logic is specific to Pandas.""" @@ -625,10 +656,10 @@ def is_monotonic_decreasing(self): is_monotonic = _is_monotonic count = MapReduceFunction.register(pandas.DataFrame.count, pandas.DataFrame.sum) - max = MapReduceFunction.register(pandas.DataFrame.max, pandas.DataFrame.max) - min = MapReduceFunction.register(pandas.DataFrame.min, pandas.DataFrame.min) - sum = MapReduceFunction.register(pandas.DataFrame.sum, pandas.DataFrame.sum) - prod = MapReduceFunction.register(pandas.DataFrame.prod, pandas.DataFrame.prod) + max = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.max) + min = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.min) + sum = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.sum) + prod = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.prod) any = MapReduceFunction.register(pandas.DataFrame.any, pandas.DataFrame.any) all = MapReduceFunction.register(pandas.DataFrame.all, pandas.DataFrame.all) memory_usage = MapReduceFunction.register( @@ -636,18 +667,43 @@ def is_monotonic_decreasing(self): lambda x, *args, **kwargs: pandas.DataFrame.sum(x), axis=0, ) - mean = MapReduceFunction.register( - lambda df, **kwargs: df.apply( - lambda x: (x.sum(skipna=kwargs.get("skipna", True)), x.count()), - axis=kwargs.get("axis", 0), - result_type="reduce", - ).set_axis(df.axes[kwargs.get("axis", 0) ^ 1], axis=0), - lambda df, **kwargs: df.apply( - lambda x: x.apply(lambda d: d[0]).sum(skipna=kwargs.get("skipna", True)) - / x.apply(lambda d: d[1]).sum(skipna=kwargs.get("skipna", True)), - axis=kwargs.get("axis", 0), - ).set_axis(df.axes[kwargs.get("axis", 0) ^ 1], axis=0), - ) + + def mean(self, axis, **kwargs): + if kwargs.get("level") is not None: + return self.default_to_pandas(pandas.DataFrame.mean, axis=axis, **kwargs) + + skipna = kwargs.get("skipna", True) + + def map_apply_fn(ser, **kwargs): + try: + sum_result = ser.sum(skipna=skipna) + count_result = ser.count() + except TypeError: + return None + else: + 
return (sum_result, count_result) + + def reduce_apply_fn(ser, **kwargs): + sum_result = ser.apply(lambda x: x[0]).sum(skipna=skipna) + count_result = ser.apply(lambda x: x[1]).sum(skipna=skipna) + return sum_result / count_result + + def reduce_fn(df, **kwargs): + df.dropna(axis=1, inplace=True, how="any") + return build_applyier(reduce_apply_fn, axis=axis)(df) + + def build_applyier(func, **applyier_kwargs): + def applyier(df, **kwargs): + result = df.apply(func, **applyier_kwargs) + return result.set_axis(df.axes[axis ^ 1], axis=0) + + return applyier + + return MapReduceFunction.register( + build_applyier(map_apply_fn, axis=axis, result_type="reduce"), + reduce_fn, + preserve_index=(kwargs.get("numeric_only") is not None), + )(self, axis=axis, **kwargs) def value_counts(self, **kwargs): """ @@ -664,7 +720,7 @@ def value_counts(self, **kwargs): return self.__constructor__(new_modin_frame) def map_func(df, *args, **kwargs): - return df.squeeze(axis=1).value_counts(**kwargs) + return df.squeeze(axis=1).value_counts(**kwargs).to_frame() def reduce_func(df, *args, **kwargs): normalize = kwargs.get("normalize", False) @@ -735,28 +791,30 @@ def sort_index_for_equal_values(result, ascending): else: new_index[j] = result.index[j] i += 1 - return pandas.DataFrame(result, index=new_index) + return pandas.DataFrame( + result, index=new_index, columns=["__reduced__"] + ) return sort_index_for_equal_values(result, ascending) - return MapReduceFunction.register(map_func, reduce_func, preserve_index=False)( - self, **kwargs - ) + return MapReduceFunction.register( + map_func, reduce_func, axis=0, preserve_index=False + )(self, **kwargs) # END MapReduce operations # Reduction operations idxmax = ReductionFunction.register(pandas.DataFrame.idxmax) idxmin = ReductionFunction.register(pandas.DataFrame.idxmin) - median = ReductionFunction.register(pandas.DataFrame.median) + median = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.median) nunique = 
ReductionFunction.register(pandas.DataFrame.nunique) - skew = ReductionFunction.register(pandas.DataFrame.skew) - kurt = ReductionFunction.register(pandas.DataFrame.kurt) - sem = ReductionFunction.register(pandas.DataFrame.sem) - std = ReductionFunction.register(pandas.DataFrame.std) - var = ReductionFunction.register(pandas.DataFrame.var) - sum_min_count = ReductionFunction.register(pandas.DataFrame.sum) - prod_min_count = ReductionFunction.register(pandas.DataFrame.prod) + skew = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.skew) + kurt = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.kurt) + sem = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.sem) + std = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.std) + var = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.var) + sum_min_count = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.sum) + prod_min_count = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.prod) quantile_for_single_value = ReductionFunction.register(pandas.DataFrame.quantile) mad = ReductionFunction.register(pandas.DataFrame.mad) to_datetime = ReductionFunction.register( diff --git a/modin/data_management/functions/__init__.py b/modin/data_management/functions/__init__.py index ec78d509317..4562bb0d429 100644 --- a/modin/data_management/functions/__init__.py +++ b/modin/data_management/functions/__init__.py @@ -11,6 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. 
+from .function import Function from .mapfunction import MapFunction from .mapreducefunction import MapReduceFunction from .reductionfunction import ReductionFunction @@ -19,6 +20,7 @@ from .groupby_function import GroupbyReduceFunction __all__ = [ + "Function", "MapFunction", "MapReduceFunction", "ReductionFunction", diff --git a/modin/data_management/functions/foldfunction.py b/modin/data_management/functions/foldfunction.py index e72e039da04..3deea5591f7 100644 --- a/modin/data_management/functions/foldfunction.py +++ b/modin/data_management/functions/foldfunction.py @@ -18,11 +18,10 @@ class FoldFunction(Function): @classmethod def call(cls, fold_function, **call_kwds): def caller(query_compiler, *args, **kwargs): + axis = call_kwds.get("axis", kwargs.get("axis")) return query_compiler.__constructor__( query_compiler._modin_frame._fold( - call_kwds.get("axis") - if "axis" in call_kwds - else kwargs.get("axis"), + cls.validate_axis(axis), lambda x: fold_function(x, *args, **kwargs), ) ) diff --git a/modin/data_management/functions/function.py b/modin/data_management/functions/function.py index 517515f9059..072d725d6ce 100644 --- a/modin/data_management/functions/function.py +++ b/modin/data_management/functions/function.py @@ -11,6 +11,8 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. 
+from typing import Optional + class Function(object): def __init__(self): @@ -27,3 +29,7 @@ def call(cls, func, **call_kwds): @classmethod def register(cls, func, **kwargs): return cls.call(func, **kwargs) + + @classmethod + def validate_axis(cls, axis: Optional[int]) -> int: + return 0 if axis is None else axis diff --git a/modin/data_management/functions/mapreducefunction.py b/modin/data_management/functions/mapreducefunction.py index aace46679ea..e76426d4f7d 100644 --- a/modin/data_management/functions/mapreducefunction.py +++ b/modin/data_management/functions/mapreducefunction.py @@ -19,11 +19,10 @@ class MapReduceFunction(Function): def call(cls, map_function, reduce_function, **call_kwds): def caller(query_compiler, *args, **kwargs): preserve_index = call_kwds.pop("preserve_index", True) + axis = call_kwds.get("axis", kwargs.get("axis")) return query_compiler.__constructor__( query_compiler._modin_frame._map_reduce( - call_kwds.get("axis") - if "axis" in call_kwds - else kwargs.get("axis"), + cls.validate_axis(axis), lambda x: map_function(x, *args, **kwargs), lambda y: reduce_function(y, *args, **kwargs), preserve_index=preserve_index, @@ -33,5 +32,7 @@ def caller(query_compiler, *args, **kwargs): return caller @classmethod - def register(cls, map_function, reduce_function, **kwargs): + def register(cls, map_function, reduce_function=None, **kwargs): + if reduce_function is None: + reduce_function = map_function return cls.call(map_function, reduce_function, **kwargs) diff --git a/modin/data_management/functions/reductionfunction.py b/modin/data_management/functions/reductionfunction.py index ae6e918a659..0b44f88c911 100644 --- a/modin/data_management/functions/reductionfunction.py +++ b/modin/data_management/functions/reductionfunction.py @@ -18,12 +18,13 @@ class ReductionFunction(Function): @classmethod def call(cls, reduction_function, **call_kwds): def caller(query_compiler, *args, **kwargs): + preserve_index = call_kwds.pop("preserve_index", True) + 
axis = call_kwds.get("axis", kwargs.get("axis")) return query_compiler.__constructor__( query_compiler._modin_frame._fold_reduce( - call_kwds.get("axis") - if "axis" in call_kwds - else kwargs.get("axis"), + cls.validate_axis(axis), lambda x: reduction_function(x, *args, **kwargs), + preserve_index=preserve_index, ) ) diff --git a/modin/engines/base/frame/data.py b/modin/engines/base/frame/data.py index e2a8ea0ab57..37af1a73912 100644 --- a/modin/engines/base/frame/data.py +++ b/modin/engines/base/frame/data.py @@ -113,6 +113,11 @@ def _column_widths(self): self._column_widths_cache = [] return self._column_widths_cache + @property + def _axes_lengths(self): + """The row lengths, column widths that can be accessed with an `axis` integer.""" + return [self._row_lengths, self._column_widths] + @property def dtypes(self): """Compute the data types if they are not cached. @@ -244,6 +249,29 @@ def axes(self): """The index, columns that can be accessed with an `axis` integer.""" return [self.index, self.columns] + def _compute_axis_labels(self, axis: int, partitions=None): + """ + Computes labels for specific `axis` + + Parameters + ---------- + axis: int + Axis to compute labels along + partitions: numpy 2D array (optional) + Partitions from which labels will be grabbed, + if not specified, partitions will be considered as `self._partitions` + + Returns + ------- + Pandas.Index + Labels for the specified `axis` + """ + if partitions is None: + partitions = self._partitions + return self._frame_mgr_cls.get_indices( + axis, partitions, lambda df: df.axes[axis] + ) + def _filter_empties(self): """Removes empty partitions to avoid triggering excess computation.""" if len(self.axes[0]) == 0 or len(self.axes[1]) == 0: @@ -278,9 +306,7 @@ def _validate_axis_equality(self, axis: int, force: bool = False): Whether to update external indices with internal if their lengths do not match or raise an exception in that case. 
""" - internal_axis = self._frame_mgr_cls.get_indices( - axis, self._partitions, lambda df: df.axes[axis] - ) + internal_axis = self._compute_axis_labels(axis) self_axis = self.axes[axis] is_equals = self_axis.equals(internal_axis) if ( @@ -979,36 +1005,56 @@ def _map_reduce_func(df, *args, **kwargs): return _map_reduce_func - def _compute_map_reduce_metadata(self, axis, new_parts): - if axis == 0: - columns = self.columns - index = ["__reduced__"] - new_lengths = [1] - new_widths = self._column_widths + def _compute_map_reduce_metadata(self, axis, new_parts, preserve_index=True): + """ + Computes metadata for the result of reduce function. + + Parameters + ---------- + axis: int, + The axis on which reduce function was applied + new_parts: numpy 2D array + Partitions with the result of applied function + preserve_index: boolean + The flag to preserve labels for the reduced axis. + + Returns + ------- + BasePandasFrame + Pandas series containing the reduced data. + """ + new_axes, new_axes_lengths = [0, 0], [0, 0] + + new_axes[axis] = ["__reduced__"] + new_axes[axis ^ 1] = ( + self.axes[axis ^ 1] + if preserve_index + else self._compute_axis_labels(axis ^ 1, new_parts) + ) + + new_axes_lengths[axis] = [1] + new_axes_lengths[axis ^ 1] = ( + self._axes_lengths[axis ^ 1] if preserve_index else None + ) + + if (axis == 0 or self._dtypes is None) and preserve_index: new_dtypes = self._dtypes + elif preserve_index: + new_dtypes = pandas.Series( + [find_common_type(self.dtypes.values)], index=new_axes[axis] + ) else: - columns = ["__reduced__"] - index = self.index - new_lengths = self._row_lengths - new_widths = [1] - if self._dtypes is not None: - new_dtypes = pandas.Series( - np.full(1, find_common_type(self.dtypes.values)), - index=["__reduced__"], - ) - else: - new_dtypes = self._dtypes + new_dtypes = None + return self.__constructor__( new_parts, - index, - columns, - new_lengths, - new_widths, + *new_axes, + *new_axes_lengths, new_dtypes, validate_axes="reduced", ) 
- def _fold_reduce(self, axis, func): + def _fold_reduce(self, axis, func, preserve_index=True): """ Apply function that reduce Manager to series but require knowledge of full axis. @@ -1018,6 +1064,8 @@ def _fold_reduce(self, axis, func): The axis to apply the function to (0 - index, 1 - columns). func : callable The function to reduce the Manager by. This function takes in a Manager. + preserve_index : boolean + The flag to preserve labels for the reduced axis. Returns ------- @@ -1028,7 +1076,9 @@ def _fold_reduce(self, axis, func): new_parts = self._frame_mgr_cls.map_axis_partitions( axis, self._partitions, func ) - return self._compute_map_reduce_metadata(axis, new_parts) + return self._compute_map_reduce_metadata( + axis, new_parts, preserve_index=preserve_index + ) def _map_reduce(self, axis, map_func, reduce_func=None, preserve_index=True): """ @@ -1062,22 +1112,9 @@ def _map_reduce(self, axis, map_func, reduce_func=None, preserve_index=True): reduce_parts = self._frame_mgr_cls.map_axis_partitions( axis, map_parts, reduce_func ) - if preserve_index: - return self._compute_map_reduce_metadata(axis, reduce_parts) - else: - if axis == 0: - new_index = ["__reduced__"] - new_columns = self._frame_mgr_cls.get_indices( - 1, reduce_parts, lambda df: df.columns - ) - else: - new_index = self._frame_mgr_cls.get_indices( - 0, reduce_parts, lambda df: df.index - ) - new_columns = ["__reduced__"] - return self.__constructor__( - reduce_parts, new_index, new_columns, validate_axes="reduced" - ) + return self._compute_map_reduce_metadata( + axis, reduce_parts, preserve_index=preserve_index + ) def _map(self, func, dtypes=None, validate_index=False, validate_columns=False): """Perform a function that maps across the entire dataset. 
@@ -1103,33 +1140,26 @@ def _map(self, func, dtypes=None, validate_index=False, validate_columns=False): dtypes = pandas.Series( [np.dtype(dtypes)] * len(self.columns), index=self.columns ) - if validate_index: - new_index = self._frame_mgr_cls.get_indices( - 0, new_partitions, lambda df: df.index - ) - else: - new_index = self.index - if len(new_index) != len(self.index): - new_row_lengths = None - else: - new_row_lengths = self._row_lengths - if validate_columns: - new_columns = self._frame_mgr_cls.get_indices( - 1, new_partitions, lambda df: df.columns - ) - else: - new_columns = self.columns - if len(new_columns) != len(self.columns): - new_column_widths = None - else: - new_column_widths = self._column_widths + axis_validate_mask = [validate_index, validate_columns] + new_axes = [ + self._compute_axis_labels(axis, new_partitions) + if should_validate + else self.axes[axis] + for axis, should_validate in enumerate(axis_validate_mask) + ] + + new_lengths = [ + self._axes_lengths[axis] + if len(new_axes[axis]) == len(self.axes[axis]) + else None + for axis in [0, 1] + ] + return self.__constructor__( new_partitions, - new_index, - new_columns, - new_row_lengths, - new_column_widths, + *new_axes, + *new_lengths, dtypes=dtypes, ) @@ -1170,26 +1200,18 @@ def filter_full_axis(self, axis, func): new_partitions = self._frame_mgr_cls.map_axis_partitions( axis, self._partitions, func, keep_partitioning=True ) - if axis == 0: - new_index = self.index - new_lengths = self._row_lengths - new_widths = None # We do not know what the resulting widths will be - new_columns = self._frame_mgr_cls.get_indices( - 1, new_partitions, lambda df: df.columns - ) - else: - new_columns = self.columns - new_lengths = None # We do not know what the resulting lengths will be - new_widths = self._column_widths - new_index = self._frame_mgr_cls.get_indices( - 0, new_partitions, lambda df: df.index - ) + new_axes, new_lengths = [0, 0], [0, 0] + + new_axes[axis] = self.axes[axis] + new_axes[axis 
^ 1] = self._compute_axis_labels(axis ^ 1, new_partitions) + + new_lengths[axis] = self._axes_lengths[axis] + new_lengths[axis ^ 1] = None # We do not know what the resulting widths will be + return self.__constructor__( new_partitions, - new_index, - new_columns, - new_lengths, - new_widths, + *new_axes, + *new_lengths, self.dtypes if axis == 0 else None, ) @@ -1530,15 +1552,15 @@ def broadcast_apply_select_indices( broadcasted_dict, keep_remaining, ) - if new_index is None: - new_index = self._frame_mgr_cls.get_indices( - 0, new_partitions, lambda df: df.index - ) - if new_columns is None: - new_columns = self._frame_mgr_cls.get_indices( - 1, new_partitions, lambda df: df.columns - ) - return self.__constructor__(new_partitions, new_index, new_columns) + + new_axes = [ + self._compute_axis_labels(i, new_partitions) + if new_axis is None + else new_axis + for i, new_axis in enumerate([new_index, new_columns]) + ] + + return self.__constructor__(new_partitions, *new_axes) def broadcast_apply_full_axis( self, @@ -1581,24 +1603,21 @@ def broadcast_apply_full_axis( keep_partitioning=True, ) # Index objects for new object creation. 
This is shorter than if..else - if new_columns is None: - new_columns = self._frame_mgr_cls.get_indices( - 1, new_partitions, lambda df: df.columns - ) - if new_index is None: - new_index = self._frame_mgr_cls.get_indices( - 0, new_partitions, lambda df: df.index - ) + new_axes = [ + self._compute_axis_labels(i, new_partitions) + if new_axis is None + else new_axis + for i, new_axis in enumerate([new_index, new_columns]) + ] if dtypes == "copy": dtypes = self._dtypes elif dtypes is not None: dtypes = pandas.Series( - [np.dtype(dtypes)] * len(new_columns), index=new_columns + [np.dtype(dtypes)] * len(new_axes[1]), index=new_axes[1] ) return self.__constructor__( new_partitions, - new_index, - new_columns, + *new_axes, None, None, dtypes, @@ -1808,15 +1827,14 @@ def groupby_reduce( new_partitions = self._frame_mgr_cls.groupby_reduce( axis, self._partitions, by._partitions, map_func, reduce_func ) - if new_columns is None: - new_columns = self._frame_mgr_cls.get_indices( - 1, new_partitions, lambda df: df.columns - ) - if new_index is None: - new_index = self._frame_mgr_cls.get_indices( - 0, new_partitions, lambda df: df.index - ) - return self.__constructor__(new_partitions, new_index, new_columns) + new_axes = [ + self._compute_axis_labels(i, new_partitions) + if new_axis is None + else new_axis + for i, new_axis in enumerate([new_index, new_columns]) + ] + + return self.__constructor__(new_partitions, *new_axes) @classmethod def from_pandas(cls, df): diff --git a/modin/pandas/base.py b/modin/pandas/base.py index 74872889c1c..9b4b8a4f28a 100644 --- a/modin/pandas/base.py +++ b/modin/pandas/base.py @@ -1334,10 +1334,13 @@ def kurt(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): query_compiler=self._query_compiler.apply("kurt", axis, **func_kwargs) ) - if numeric_only: + if numeric_only is not None and not numeric_only: self._validate_dtypes(numeric_only=True) + + data = self._get_numeric_data(axis) if numeric_only else self + return 
self._reduce_dimension( - self._query_compiler.kurt( + data._query_compiler.kurt( axis=axis, skipna=skipna, level=level, @@ -1412,19 +1415,62 @@ def max(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): ) ) - def mean(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): + def _stat_operation( + self, + op_name: str, + axis: Union[int, str], + skipna: bool, + level: Optional[Union[int, str]], + numeric_only: Optional[bool] = None, + **kwargs, + ): + """ + Do common statistic reduce operations under frame. + + Parameters + ---------- + op_name: str, + Name of method to apply. + axis: int or axis name, + Axis to apply method on. + skipna: bool, + Exclude NA/null values when computing the result. + level: int or level name, + If specified `axis` is a MultiIndex, applying method along a particular + level, collapsing into a Series + numeric_only: bool + Include only float, int, boolean columns. If None, will attempt + to use everything, then use only numeric data. 
+ + Returns + ------- + In case of Series: scalar or Series (if level specified) + In case of DataFrame: Series or DataFrame (if level specified) + + """ axis = self._get_axis_number(axis) - data = self._validate_dtypes_sum_prod_mean( - axis, numeric_only, ignore_axis=False + if numeric_only is not None and not numeric_only: + self._validate_dtypes(numeric_only=True) + + data = self._get_numeric_data(axis) if numeric_only else self + + result_qc = getattr(data._query_compiler, op_name)( + axis=axis, + skipna=skipna, + level=level, + numeric_only=numeric_only, + **kwargs, ) - return data._reduce_dimension( - data._query_compiler.mean( - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) + if level is not None: + return self.__constructor__(query_compiler=result_qc) + return self._reduce_dimension(result_qc) + + def mean(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): + return self._stat_operation("mean", axis, skipna, level, numeric_only, **kwargs) + + def median(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): + return self._stat_operation( + "median", axis, skipna, level, numeric_only, **kwargs ) def memory_usage(self, index=True, deep=False): @@ -1970,6 +2016,13 @@ def sample( query_compiler = self._query_compiler.getitem_row_array(samples) return self.__constructor__(query_compiler=query_compiler) + def sem( + self, axis=None, skipna=None, level=None, ddof=1, numeric_only=None, **kwargs + ): + return self._stat_operation( + "sem", axis, skipna, level, numeric_only, ddof=ddof, **kwargs + ) + def set_axis(self, labels, axis=0, inplace=False): if is_scalar(labels): warnings.warn( @@ -2059,6 +2112,9 @@ def shift(self, periods=1, freq=None, axis=0, fill_value=None): else: return self.tshift(periods, freq) + def skew(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): + return self._stat_operation("skew", axis, skipna, level, numeric_only, **kwargs) + def 
sort_index( self, axis=0, @@ -2119,6 +2175,13 @@ def sort_values( ) return self._create_or_update_from_compiler(result, inplace) + def std( + self, axis=None, skipna=None, level=None, ddof=1, numeric_only=None, **kwargs + ): + return self._stat_operation( + "std", axis, skipna, level, numeric_only, ddof=ddof, **kwargs + ) + def sub(self, other, axis="columns", level=None, fill_value=None): return self._binary_op( "sub", other, axis=axis, level=level, fill_value=fill_value @@ -2500,6 +2563,13 @@ def tz_localize( ) return self.set_axis(labels=new_labels, axis=axis, inplace=not copy) + def var( + self, axis=None, skipna=None, level=None, ddof=1, numeric_only=None, **kwargs + ): + return self._stat_operation( + "var", axis, skipna, level, numeric_only, ddof=ddof, **kwargs + ) + def __abs__(self): return self.abs() diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 4566eb47284..2d7ee67161e 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1068,30 +1068,6 @@ def lt(self, other, axis="columns", level=None): "lt", other, axis=axis, level=level, broadcast=isinstance(other, Series) ) - def median(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): - axis = self._get_axis_number(axis) - if numeric_only is not None and not numeric_only: - self._validate_dtypes(numeric_only=True) - if level is not None: - return self.__constructor__( - query_compiler=self._query_compiler.median( - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) - ) - return self._reduce_dimension( - self._query_compiler.median( - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) - ) - def melt( self, id_vars=None, @@ -1558,34 +1534,6 @@ def is_dtype_instance_mapper(column, dtype): ] return self.drop(columns=self.columns[indicate], inplace=False) - def sem( - self, axis=None, skipna=None, level=None, ddof=1, numeric_only=None, **kwargs - ): - axis = 
self._get_axis_number(axis) - if numeric_only is not None and not numeric_only: - self._validate_dtypes(numeric_only=True) - if level is not None: - return self.__constructor__( - query_compiler=self._query_compiler.sem( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - return self._reduce_dimension( - self._query_compiler.sem( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - def set_index( self, keys, drop=True, append=False, inplace=False, verify_integrity=False ): @@ -1646,30 +1594,6 @@ def set_index( if not inplace: return frame - def skew(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): - axis = self._get_axis_number(axis) - if numeric_only is not None and not numeric_only: - self._validate_dtypes(numeric_only=True) - if level is not None: - return self.__constructor__( - query_compiler=self._query_compiler.skew( - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) - ) - return self._reduce_dimension( - self._query_compiler.skew( - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) - ) - @property def sparse(self): return self._default_to_pandas(pandas.DataFrame.sparse) @@ -1685,34 +1609,6 @@ def squeeze(self, axis=None): else: return self.copy() - def std( - self, axis=None, skipna=None, level=None, ddof=1, numeric_only=None, **kwargs - ): - axis = self._get_axis_number(axis) - if numeric_only is not None and not numeric_only: - self._validate_dtypes(numeric_only=True) - if level is not None: - return self.__constructor__( - query_compiler=self._query_compiler.std( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - return self._reduce_dimension( - self._query_compiler.std( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - def 
stack(self, level=-1, dropna=True): if not isinstance(self.columns, pandas.MultiIndex) or ( isinstance(self.columns, pandas.MultiIndex) @@ -1965,34 +1861,6 @@ def update( ) self._update_inplace(new_query_compiler=query_compiler) - def var( - self, axis=None, skipna=None, level=None, ddof=1, numeric_only=None, **kwargs - ): - axis = self._get_axis_number(axis) - if numeric_only is not None and not numeric_only: - self._validate_dtypes(numeric_only=True) - if level is not None: - return self.__constructor__( - query_compiler=self._query_compiler.var( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - return self._reduce_dimension( - self._query_compiler.var( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - def value_counts( self, subset: Optional[Sequence[Label]] = None, @@ -2266,6 +2134,30 @@ def _create_or_update_from_compiler(self, new_query_compiler, inplace=False): else: self._update_inplace(new_query_compiler=new_query_compiler) + def _get_numeric_data(self, axis: int): + """ + Grabs only numeric columns from frame. + + Parameters + ---------- + axis: int + Axis to inspect on having numeric types only. + If axis is not 0, returns the frame itself. + + Returns + ------- + DataFrame with numeric data. + """ + # Pandas ignores `numeric_only` if `axis` is 1, but we do have to drop + # non-numeric columns if `axis` is 0. + if axis != 0: + return self + return self.drop( + columns=[ + i for i in self.dtypes.index if not is_numeric_dtype(self.dtypes[i]) + ] + ) + def _validate_dtypes(self, numeric_only=False): """ Help to check that all the dtypes are the same. @@ -2305,16 +2197,8 @@ def _validate_dtypes_min_max(self, axis, numeric_only): for dtype in self.dtypes ): raise TypeError("Cannot compare Numeric and Non-Numeric Types") - # Pandas ignores `numeric_only` if `axis` is 1, but we do have to drop - # non-numeric columns if `axis` is 0. 
- if numeric_only and axis == 0: - return self.drop( - columns=[ - i for i in self.dtypes.index if not is_numeric_dtype(self.dtypes[i]) - ] - ) - else: - return self + + return self._get_numeric_data(axis) if numeric_only else self def _validate_dtypes_sum_prod_mean(self, axis, numeric_only, ignore_axis=False): """ @@ -2363,16 +2247,8 @@ def _validate_dtypes_sum_prod_mean(self, axis, numeric_only, ignore_axis=False): for dtype in self.dtypes ): raise TypeError("Cannot operate on Numeric and Non-Numeric Types") - # Pandas ignores `numeric_only` if `axis` is 1, but we do have to drop - # non-numeric columns if `axis` is 0. - if numeric_only and axis == 0: - return self.drop( - columns=[ - i for i in self.dtypes.index if not is_numeric_dtype(self.dtypes[i]) - ] - ) - else: - return self + + return self._get_numeric_data(axis) if numeric_only else self def _to_pandas(self): return self._query_compiler.to_pandas() diff --git a/modin/pandas/series.py b/modin/pandas/series.py index 3300130188e..a7bb85cc4ac 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -817,30 +817,6 @@ def arg(s): ) ) - def median(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): - axis = self._get_axis_number(axis) - if numeric_only is not None and not numeric_only: - self._validate_dtypes(numeric_only=True) - if level is not None: - return self.__constructor__( - query_compiler=self._query_compiler.median( - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) - ) - return self._reduce_dimension( - self._query_compiler.median( - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) - ) - def memory_usage(self, index=True, deep=False): if index: result = self._reduce_dimension( @@ -877,34 +853,6 @@ def nlargest(self, n=5, keep="first"): def nsmallest(self, n=5, keep="first"): return Series(query_compiler=self._query_compiler.nsmallest(n=n, keep=keep)) - def sem( - self, axis=None, skipna=None, 
level=None, ddof=1, numeric_only=None, **kwargs - ): - axis = self._get_axis_number(axis) - if numeric_only is not None and not numeric_only: - self._validate_dtypes(numeric_only=True) - if level is not None: - return self.__constructor__( - query_compiler=self._query_compiler.sem( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - return self._reduce_dimension( - self._query_compiler.sem( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - def slice_shift(self, periods=1, axis=0): if periods == 0: return self.copy() @@ -938,58 +886,6 @@ def unstack(self, level=-1, fill_value=None): return result.droplevel(0, axis=1) if result.columns.nlevels > 1 else result - def skew(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): - axis = self._get_axis_number(axis) - if numeric_only is not None and not numeric_only: - self._validate_dtypes(numeric_only=True) - if level is not None: - return self.__constructor__( - query_compiler=self._query_compiler.skew( - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) - ) - return self._reduce_dimension( - self._query_compiler.skew( - axis=axis, - skipna=skipna, - level=level, - numeric_only=numeric_only, - **kwargs, - ) - ) - - def std( - self, axis=None, skipna=None, level=None, ddof=1, numeric_only=None, **kwargs - ): - axis = self._get_axis_number(axis) - if numeric_only is not None and not numeric_only: - self._validate_dtypes(numeric_only=True) - if level is not None: - return self.__constructor__( - query_compiler=self._query_compiler.std( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - return self._reduce_dimension( - self._query_compiler.std( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - @property def plot( self, @@ -1471,34 +1367,6 @@ 
def value_counts( ) ) - def var( - self, axis=None, skipna=None, level=None, ddof=1, numeric_only=None, **kwargs - ): - axis = self._get_axis_number(axis) - if numeric_only is not None and not numeric_only: - self._validate_dtypes(numeric_only=True) - if level is not None: - return self.__constructor__( - query_compiler=self._query_compiler.var( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - return self._reduce_dimension( - self._query_compiler.var( - axis=axis, - skipna=skipna, - level=level, - ddof=ddof, - numeric_only=numeric_only, - **kwargs, - ) - ) - def view(self, dtype=None): return self.__constructor__( query_compiler=self._query_compiler.series_view(dtype=dtype) @@ -1689,6 +1557,11 @@ def _validate_dtypes_min_max(self, axis, numeric_only): def _validate_dtypes(self, numeric_only=False): pass + def _get_numeric_data(self, axis: int): + # `numeric_only` parameter is not supported by Series, so this method + # doesn't do anything + return self + def _update_inplace(self, new_query_compiler): """ Implement [METHOD_NAME]. 
diff --git a/modin/pandas/test/dataframe/test_reduction.py b/modin/pandas/test/dataframe/test_reduction.py index df2c3060b45..1acdd3e97c5 100644 --- a/modin/pandas/test/dataframe/test_reduction.py +++ b/modin/pandas/test/dataframe/test_reduction.py @@ -305,26 +305,6 @@ def test_prod( df_equals(modin_result, pandas_result) -@pytest.mark.parametrize( - "numeric_only", - [ - pytest.param(None, marks=pytest.mark.xfail(reason="See #1976 for details")), - False, - True, - ], -) -@pytest.mark.parametrize( - "min_count", int_arg_values, ids=arg_keys("min_count", int_arg_keys) -) -def test_prod_specific(min_count, numeric_only): - if min_count == 5 and numeric_only: - pytest.xfail("see #1953 for details") - eval_general( - *create_test_dfs(test_data_diff_dtype), - lambda df: df.prod(min_count=min_count, numeric_only=numeric_only), - ) - - @pytest.mark.parametrize("is_transposed", [False, True]) @pytest.mark.parametrize( "skipna", bool_arg_values, ids=arg_keys("skipna", bool_arg_keys) @@ -353,19 +333,17 @@ def test_sum(data, axis, skipna, is_transposed): df_equals(modin_result, pandas_result) +@pytest.mark.parametrize("fn", ["prod", "sum"]) @pytest.mark.parametrize( - "numeric_only", - [ - pytest.param(None, marks=pytest.mark.xfail(reason="See #1976 for details")), - False, - True, - ], + "numeric_only", bool_arg_values, ids=arg_keys("numeric_only", bool_arg_keys) +) +@pytest.mark.parametrize( + "min_count", int_arg_values, ids=arg_keys("min_count", int_arg_keys) ) -@pytest.mark.parametrize("min_count", int_arg_values) -def test_sum_specific(min_count, numeric_only): +def test_sum_prod_specific(fn, min_count, numeric_only): eval_general( *create_test_dfs(test_data_diff_dtype), - lambda df: df.sum(min_count=min_count, numeric_only=numeric_only), + lambda df: getattr(df, fn)(min_count=min_count, numeric_only=numeric_only), ) @@ -375,3 +353,16 @@ def test_sum_single_column(data): pandas_df = pandas.DataFrame(data).iloc[:, [0]] df_equals(modin_df.sum(), pandas_df.sum()) 
df_equals(modin_df.sum(axis=1), pandas_df.sum(axis=1)) + + +@pytest.mark.parametrize( + "fn", ["max", "min", "median", "mean", "skew", "kurt", "sem", "std", "var"] +) +@pytest.mark.parametrize( + "numeric_only", bool_arg_values, ids=arg_keys("numeric_only", bool_arg_keys) +) +def test_reduction_specific(fn, numeric_only): + eval_general( + *create_test_dfs(test_data_diff_dtype), + lambda df: getattr(df, fn)(numeric_only=numeric_only), + )