Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#1976: indices matching at reduction functions fixed #2270

Merged
merged 8 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 48 additions & 17 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
is_scalar,
)
from pandas.core.base import DataError
from typing import Type, Callable
import warnings


from modin.backends.base.query_compiler import BaseQueryCompiler
from modin.error_message import ErrorMessage
from modin.utils import try_cast_to_pandas, wrap_udf_function
from modin.data_management.functions import (
Function,
FoldFunction,
MapFunction,
MapReduceFunction,
Expand Down Expand Up @@ -150,6 +153,32 @@ def caller(df, *args, **kwargs):
return caller


def _numeric_only_reduce_fn(applier: Type[Function], *funcs) -> Callable:
"""
Build reduce function for statistic operations with `numeric_only` parameter.

Parameters
----------
applier : Function object to register `funcs`
*funcs : list of functions to register in `applier`
dchigarev marked this conversation as resolved.
Show resolved Hide resolved

Returns
-------
callable
A callable function to be applied in the partitions
"""

def caller(self, *args, **kwargs):
# If `numeric_only` is None then we don't know what columns/indices will
# be dropped at the result of reduction function, and so can't preserve labels
preserve_index = kwargs.get("numeric_only", None) is not None
return applier.register(*funcs, preserve_index=preserve_index)(
self, *args, **kwargs
)

return caller


class PandasQueryCompiler(BaseQueryCompiler):
"""This class implements the logic necessary for operating on partitions
with a Pandas backend. This logic is specific to Pandas."""
Expand Down Expand Up @@ -625,10 +654,10 @@ def is_monotonic_decreasing(self):
is_monotonic = _is_monotonic

count = MapReduceFunction.register(pandas.DataFrame.count, pandas.DataFrame.sum)
max = MapReduceFunction.register(pandas.DataFrame.max, pandas.DataFrame.max)
min = MapReduceFunction.register(pandas.DataFrame.min, pandas.DataFrame.min)
sum = MapReduceFunction.register(pandas.DataFrame.sum, pandas.DataFrame.sum)
prod = MapReduceFunction.register(pandas.DataFrame.prod, pandas.DataFrame.prod)
max = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.max)
min = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.min)
sum = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.sum)
prod = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.prod)
any = MapReduceFunction.register(pandas.DataFrame.any, pandas.DataFrame.any)
all = MapReduceFunction.register(pandas.DataFrame.all, pandas.DataFrame.all)
memory_usage = MapReduceFunction.register(
Expand Down Expand Up @@ -664,7 +693,7 @@ def value_counts(self, **kwargs):
return self.__constructor__(new_modin_frame)

def map_func(df, *args, **kwargs):
return df.squeeze(axis=1).value_counts(**kwargs)
return df.squeeze(axis=1).value_counts(**kwargs).to_frame()

def reduce_func(df, *args, **kwargs):
normalize = kwargs.get("normalize", False)
Expand Down Expand Up @@ -735,28 +764,30 @@ def sort_index_for_equal_values(result, ascending):
else:
new_index[j] = result.index[j]
i += 1
return pandas.DataFrame(result, index=new_index)
return pandas.DataFrame(
result, index=new_index, columns=["__reduced__"]
)

return sort_index_for_equal_values(result, ascending)

return MapReduceFunction.register(map_func, reduce_func, preserve_index=False)(
self, **kwargs
)
return MapReduceFunction.register(
map_func, reduce_func, axis=0, preserve_index=False
)(self, **kwargs)

# END MapReduce operations

# Reduction operations
idxmax = ReductionFunction.register(pandas.DataFrame.idxmax)
idxmin = ReductionFunction.register(pandas.DataFrame.idxmin)
median = ReductionFunction.register(pandas.DataFrame.median)
median = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.median)
nunique = ReductionFunction.register(pandas.DataFrame.nunique)
skew = ReductionFunction.register(pandas.DataFrame.skew)
kurt = ReductionFunction.register(pandas.DataFrame.kurt)
sem = ReductionFunction.register(pandas.DataFrame.sem)
std = ReductionFunction.register(pandas.DataFrame.std)
var = ReductionFunction.register(pandas.DataFrame.var)
sum_min_count = ReductionFunction.register(pandas.DataFrame.sum)
prod_min_count = ReductionFunction.register(pandas.DataFrame.prod)
skew = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.skew)
kurt = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.kurt)
sem = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.sem)
std = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.std)
var = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.var)
sum_min_count = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.sum)
prod_min_count = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.prod)
quantile_for_single_value = ReductionFunction.register(pandas.DataFrame.quantile)
mad = ReductionFunction.register(pandas.DataFrame.mad)
to_datetime = ReductionFunction.register(
Expand Down
2 changes: 2 additions & 0 deletions modin/data_management/functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from .function import Function
from .mapfunction import MapFunction
from .mapreducefunction import MapReduceFunction
from .reductionfunction import ReductionFunction
Expand All @@ -19,6 +20,7 @@
from .groupby_function import GroupbyReduceFunction

__all__ = [
"Function",
"MapFunction",
"MapReduceFunction",
"ReductionFunction",
Expand Down
5 changes: 2 additions & 3 deletions modin/data_management/functions/foldfunction.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ class FoldFunction(Function):
@classmethod
def call(cls, fold_function, **call_kwds):
def caller(query_compiler, *args, **kwargs):
axis = call_kwds.get("axis") if "axis" in call_kwds else kwargs.get("axis")
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
return query_compiler.__constructor__(
query_compiler._modin_frame._fold(
call_kwds.get("axis")
if "axis" in call_kwds
else kwargs.get("axis"),
cls.validate_axis(axis),
lambda x: fold_function(x, *args, **kwargs),
)
)
Expand Down
6 changes: 6 additions & 0 deletions modin/data_management/functions/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from typing import Optional


class Function(object):
def __init__(self):
Expand All @@ -27,3 +29,7 @@ def call(cls, func, **call_kwds):
@classmethod
def register(cls, func, **kwargs):
return cls.call(func, **kwargs)

@classmethod
def validate_axis(cls, axis: Optional[int]) -> int:
return 0 if axis is None else axis
9 changes: 5 additions & 4 deletions modin/data_management/functions/mapreducefunction.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ class MapReduceFunction(Function):
def call(cls, map_function, reduce_function, **call_kwds):
def caller(query_compiler, *args, **kwargs):
preserve_index = call_kwds.pop("preserve_index", True)
axis = call_kwds.get("axis") if "axis" in call_kwds else kwargs.get("axis")
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
return query_compiler.__constructor__(
query_compiler._modin_frame._map_reduce(
call_kwds.get("axis")
if "axis" in call_kwds
else kwargs.get("axis"),
cls.validate_axis(axis),
lambda x: map_function(x, *args, **kwargs),
lambda y: reduce_function(y, *args, **kwargs),
preserve_index=preserve_index,
Expand All @@ -33,5 +32,7 @@ def caller(query_compiler, *args, **kwargs):
return caller

@classmethod
def register(cls, map_function, reduce_function, **kwargs):
def register(cls, map_function, reduce_function=None, **kwargs):
if reduce_function is None:
reduce_function = map_function
return cls.call(map_function, reduce_function, **kwargs)
7 changes: 4 additions & 3 deletions modin/data_management/functions/reductionfunction.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ class ReductionFunction(Function):
@classmethod
def call(cls, reduction_function, **call_kwds):
def caller(query_compiler, *args, **kwargs):
preserve_index = call_kwds.pop("preserve_index", True)
axis = call_kwds.get("axis") if "axis" in call_kwds else kwargs.get("axis")
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
return query_compiler.__constructor__(
query_compiler._modin_frame._fold_reduce(
call_kwds.get("axis")
if "axis" in call_kwds
else kwargs.get("axis"),
cls.validate_axis(axis),
lambda x: reduction_function(x, *args, **kwargs),
preserve_index=preserve_index,
)
)

Expand Down
Loading