Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKPORT] Fix backward compatibility for pandas 1.0 (#2628) #2630

Merged
merged 1 commit into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,18 @@ jobs:
- bash: |
set -e
source ci/reload-env.sh
mkdir -p build
pytest $PYTEST_CONFIG mars/$(mars.test.module)
coverage report
mv .coverage build/.coverage.main.file

# do compatibility test for earliest supported pandas release
if [[ "$(mars.test.module)" == "dataframe" ]]; then
pip install pandas==1.0.5
pytest $PYTEST_CONFIG -m pd_compat mars/dataframe
mv .coverage build/.coverage.pd_compat.file
fi

coverage combine build/ && coverage report
coverage xml
displayName: 'Run tests'

Expand Down
2 changes: 2 additions & 0 deletions mars/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pytest

from mars.config import option_context
from mars.core.mode import is_kernel_mode, is_build_mode
from mars.lib.aio import stop_isolation
from mars.oscar.backends.router import Router
from mars.oscar.backends.ray.communication import RayServer
Expand Down Expand Up @@ -143,6 +144,7 @@ def _new_gpu_test_session(_stop_isolation): # pragma: no cover
def setup(_new_test_session):
    # Pytest fixture: make the shared test session the process-wide default
    # for the duration of each test, handing the session to the test body.
    _new_test_session.as_default()
    yield _new_test_session
    # Teardown check: after the test body returns, no build/kernel mode flag
    # may be left set -- a leaked mode would silently alter behavior of every
    # later test.  NOTE(review): assumes all enter_mode contexts opened during
    # the test were properly exited; confirm against mars.core.mode.
    assert not (is_build_mode() or is_kernel_mode())


@pytest.fixture
Expand Down
12 changes: 9 additions & 3 deletions mars/core/graph/builder/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,14 @@ def _if_add_node(self, node: EntityType, visited: Set):
return node not in visited and node not in self._processed_chunks

def _build(self) -> Iterable[Union[TileableGraph, ChunkGraph]]:
    """Drive the tiler one step at a time, yielding each produced graph.

    Build/kernel mode is entered only around the ``next()`` call that
    advances the tiler, and exited again *before* the graph is yielded,
    so code that consumes this generator does not accidentally run with
    kernel mode enabled.
    """
    tile_iterator = iter(self.tiler)
    while True:
        try:
            # Scope the mode strictly to the tiling step; the yield below
            # deliberately happens outside the context manager.
            with enter_mode(build=True, kernel=True):
                graph = next(tile_iterator)
            yield graph
        except StopIteration:
            # Tiler exhausted -- finish the generator normally.
            break

def build(self) -> Generator[Union[TileableGraph, ChunkGraph], None, None]:
    """Public entry point: yield every graph produced by ``_build``.

    Mode management is intentionally NOT done here: wrapping the whole
    generator in ``enter_mode`` would keep callers inside build/kernel
    mode while they consume the yielded graphs.  ``_build`` enters the
    mode per tiling step instead.
    """
    yield from self._build()
2 changes: 1 addition & 1 deletion mars/core/graph/core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ cdef class DirectedGraph:
try:
return list(self._successors[n])
except KeyError:
return KeyError(f'Node {n} does not exist in the directed graph')
raise KeyError(f'Node {n} does not exist in the directed graph')

def iter_predecessors(self, n):
try:
Expand Down
6 changes: 3 additions & 3 deletions mars/core/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from ..config import options


_internal_mode = threading.local()


Expand Down Expand Up @@ -69,18 +68,19 @@ def __exit__(self, *_):
setattr(_internal_mode, mode_name, mode_name_to_old_value[mode_name])

def __call__(self, func):
    """Use this mode object as a decorator for sync or async functions.

    The mode mapping is snapshotted at decoration time, so later mutation
    of ``self.mode_name_to_value`` cannot change the decorated function's
    behavior.  Each invocation re-enters the modes through a fresh
    ``enter_mode(**mode_name_to_value)`` rather than re-using ``self``,
    since a single context-manager instance of this kind is not safely
    reentrant across concurrent or nested calls.
    """
    mode_name_to_value = self.mode_name_to_value.copy()
    if not inspect.iscoroutinefunction(func):
        # sync
        @functools.wraps(func)
        def _inner(*args, **kwargs):
            with enter_mode(**mode_name_to_value):
                return func(*args, **kwargs)

    else:
        # async
        @functools.wraps(func)
        async def _inner(*args, **kwargs):
            with enter_mode(**mode_name_to_value):
                return await func(*args, **kwargs)

    return _inner
Expand Down
4 changes: 2 additions & 2 deletions mars/dataframe/arrays.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@

from ..config import options
from ..core import is_kernel_mode
from ..lib.version import parse as parse_version
from ..utils import pd_release_version

_use_bool_any_all = parse_version(pd.__version__) >= parse_version("1.3.0")
_use_bool_any_all = pd_release_version >= (1, 3, 0)


class ArrowDtype(ExtensionDtype):
Expand Down
4 changes: 2 additions & 2 deletions mars/dataframe/base/astype.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

from ... import opcodes as OperandDef
from ...core import recursive_tile
from ...lib.version import parse as parse_version
from ...serialization.serializables import AnyField, StringField, ListField
from ...tensor.base import sort
from ...utils import pd_release_version
from ..core import DATAFRAME_TYPE, SERIES_TYPE
from ..operands import DataFrameOperand, DataFrameOperandMixin
from ..utils import build_empty_df, build_empty_series, parse_index

_need_astype_contiguous = parse_version(pd.__version__) == parse_version("1.3.0")
_need_astype_contiguous = pd_release_version == (1, 3, 0)


class DataFrameAstype(DataFrameOperand, DataFrameOperandMixin):
Expand Down
9 changes: 5 additions & 4 deletions mars/dataframe/base/duplicated.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,20 @@ def _execute_tree_map(cls, ctx, op):
duplicated = cls._duplicated(inp, op)
if not duplicated.name:
duplicated.name = "_duplicated_"
result.iloc[duplicated] = None
result.iloc[duplicated.values] = None
result = xdf.concat([result, duplicated], axis=1)
ctx[op.outputs[0].key] = result

@classmethod
def _execute_tree_combine(cls, ctx, op):
    """Combine stage of the tree-based ``duplicated`` execution.

    Rows whose last (flag) column is False were not marked duplicated by
    the map stage; duplicates are re-checked among those rows only, the
    flag column is updated, and payload columns of rows finally flagged
    as duplicated are blanked out.

    NOTE(review): ``.values`` is passed to ``iloc`` because older pandas
    releases (the 1.0.x line this backport targets) appear to reject a
    boolean Series as an ``iloc`` indexer, while a plain ndarray works on
    all supported versions -- confirm against pandas release notes.
    """
    inp = ctx[op.input.key]
    result = inp.copy()
    duplicated_filter = ~inp.iloc[:, -1]
    duplicates = inp.loc[duplicated_filter]
    dup_on_duplicated = cls._duplicated(duplicates, op)
    result.iloc[duplicated_filter.values, -1] = dup_on_duplicated
    duplicated = result.iloc[:, -1]
    result.iloc[duplicated.values, :-1] = None
    ctx[op.outputs[0].key] = result

@classmethod
Expand Down
29 changes: 15 additions & 14 deletions mars/dataframe/base/tests/test_base_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from ....tensor import arange, tensor
from ....tensor.random import rand
from ....tests.core import require_cudf
from ....utils import lazy_import
from ....utils import lazy_import, pd_release_version
from ... import eval as mars_eval, cut, qcut, get_dummies
from ...datasource.dataframe import from_pandas as from_pandas_df
from ...datasource.series import from_pandas as from_pandas_series
Expand All @@ -38,8 +38,12 @@
from ..to_numeric import to_numeric
from ..rebalance import DataFrameRebalance

pytestmark = pytest.mark.pd_compat

cudf = lazy_import("cudf", globals=globals())

_explode_with_ignore_index = pd_release_version[:2] >= (1, 1)


@require_cudf
def test_to_gpu_execution(setup_gpu):
Expand Down Expand Up @@ -1968,7 +1972,12 @@ def test_stack_execution(setup):
assert_method(result, expected)


def test_explode_execution(setup):
@pytest.mark.parametrize(
"ignore_index", [False, True] if _explode_with_ignore_index else [False]
)
def test_explode_execution(setup, ignore_index):
explode_kw = {"ignore_index": True} if ignore_index else {}

raw = pd.DataFrame(
{
"a": np.random.rand(10),
Expand All @@ -1978,20 +1987,12 @@ def test_explode_execution(setup):
}
)
df = from_pandas_df(raw, chunk_size=(4, 2))

for ignore_index in [False, True]:
r = df.explode("b", ignore_index=ignore_index)
pd.testing.assert_frame_equal(
r.execute().fetch(), raw.explode("b", ignore_index=ignore_index)
)
r = df.explode("b", ignore_index=ignore_index)
pd.testing.assert_frame_equal(r.execute().fetch(), raw.explode("b", **explode_kw))

series = from_pandas_series(raw.b, chunk_size=4)

for ignore_index in [False, True]:
r = series.explode(ignore_index=ignore_index)
pd.testing.assert_series_equal(
r.execute().fetch(), raw.b.explode(ignore_index=ignore_index)
)
r = series.explode(ignore_index=ignore_index)
pd.testing.assert_series_equal(r.execute().fetch(), raw.b.explode(**explode_kw))


def test_eval_query_execution(setup):
Expand Down
7 changes: 4 additions & 3 deletions mars/dataframe/base/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
from ...config import options
from ...core import OutputType, recursive_tile
from ...core.custom_log import redirect_custom_log
from ...lib.version import parse as parse_version
from ...serialization.serializables import AnyField, BoolField, TupleField, DictField
from ...utils import enter_current_session, quiet_stdio
from ...utils import enter_current_session, quiet_stdio, pd_release_version
from ..core import DATAFRAME_CHUNK_TYPE, DATAFRAME_TYPE
from ..operands import DataFrameOperandMixin, DataFrameOperand
from ..utils import (
Expand All @@ -33,6 +32,8 @@
make_dtypes,
)

_with_convert_dtype = pd_release_version < (1, 2, 0)


class TransformOperand(DataFrameOperand, DataFrameOperandMixin):
_op_type_ = opcodes.TRANSFORM
Expand Down Expand Up @@ -236,7 +237,7 @@ def _infer_df_func_returns(self, df, dtypes):
if self.call_agg:
infer_df = test_df.agg(self._func, args=self.args, **self.kwds)
else:
if parse_version(pd.__version__) >= parse_version("1.2.0"):
if not _with_convert_dtype:
infer_df = test_df.transform(
self._func, *self.args, **self.kwds
)
Expand Down
5 changes: 2 additions & 3 deletions mars/dataframe/base/value_counts.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
from ... import opcodes
from ...core import OutputType, recursive_tile
from ...core.operand import OperandStage
from ...lib.version import parse as parse_version
from ...serialization.serializables import KeyField, BoolField, Int64Field, StringField
from ...utils import has_unknown_shape
from ...utils import has_unknown_shape, pd_release_version
from ..core import Series
from ..operands import DataFrameOperand, DataFrameOperandMixin
from ..utils import build_series, parse_index

_keep_original_order = parse_version(pd.__version__) >= parse_version("1.3.0")
_keep_original_order = pd_release_version >= (1, 3, 0)


class DataFrameValueCounts(DataFrameOperand, DataFrameOperandMixin):
Expand Down
23 changes: 19 additions & 4 deletions mars/dataframe/groupby/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
DictField,
)
from ...typing import ChunkType, TileableType
from ...utils import enter_current_session, lazy_import
from ...utils import enter_current_session, lazy_import, pd_release_version
from ..core import GROUPBY_TYPE
from ..merge import DataFrameConcat
from ..operands import DataFrameOperand, DataFrameOperandMixin, DataFrameShuffleProxy
Expand All @@ -53,6 +53,8 @@
cp = lazy_import("cupy", globals=globals(), rename="cp")
cudf = lazy_import("cudf", globals=globals())

_support_get_group_without_as_index = pd_release_version[:2] > (1, 0)


class SizeRecorder:
def __init__(self):
Expand Down Expand Up @@ -281,9 +283,22 @@ def _call_dataframe(self, groupby, input_df):
)

def _call_series(self, groupby, in_series):
agg_result = groupby.op.build_mock_groupby().aggregate(
self.raw_func, **self.raw_func_kw
)
try:
agg_result = groupby.op.build_mock_groupby().aggregate(
self.raw_func, **self.raw_func_kw
)
except ValueError:
if (
self._groupby_params.get("as_index")
or _support_get_group_without_as_index
): # pragma: no cover
raise
agg_result = (
groupby.op.build_mock_groupby(as_index=True)
.aggregate(self.raw_func, **self.raw_func_kw)
.to_frame()
)
agg_result.index.names = [None] * agg_result.index.nlevels

index_value = parse_index(
agg_result.index, groupby.key, groupby.index_value.key
Expand Down
60 changes: 41 additions & 19 deletions mars/dataframe/groupby/tests/test_groupby_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
from .... import dataframe as md
from ....core.operand import OperandStage
from ....tests.core import assert_groupby_equal, require_cudf
from ....utils import arrow_array_to_objects
from ....utils import arrow_array_to_objects, pd_release_version
from ..aggregation import DataFrameGroupByAgg

pytestmark = pytest.mark.pd_compat

_agg_size_as_frame = pd_release_version[:2] > (1, 0)


class MockReduction1(md.CustomReduction):
def agg(self, v1):
Expand Down Expand Up @@ -215,20 +219,32 @@ def test_groupby_getitem(setup):
)

r = mdf.groupby("b", as_index=False).b.count(method=method)
pd.testing.assert_frame_equal(
r.execute().fetch().sort_values("b", ignore_index=True),
raw.groupby("b", as_index=False)
.b.count()
.sort_values("b", ignore_index=True),
)
result = r.execute().fetch().sort_values("b", ignore_index=True)
try:
expected = (
raw.groupby("b", as_index=False)
.b.count()
.sort_values("b", ignore_index=True)
)
except ValueError:
expected = raw.groupby("b").b.count().to_frame()
expected.index.names = [None] * expected.index.nlevels
expected = expected.sort_values("b", ignore_index=True)
pd.testing.assert_frame_equal(result, expected)

r = mdf.groupby("b", as_index=False).b.agg({"cnt": "count"}, method=method)
pd.testing.assert_frame_equal(
r.execute().fetch().sort_values("b", ignore_index=True),
raw.groupby("b", as_index=False)
.b.agg({"cnt": "count"})
.sort_values("b", ignore_index=True),
)
result = r.execute().fetch().sort_values("b", ignore_index=True)
try:
expected = (
raw.groupby("b", as_index=False)
.b.agg({"cnt": "count"})
.sort_values("b", ignore_index=True)
)
except ValueError:
expected = raw.groupby("b").b.agg({"cnt": "count"}).to_frame()
expected.index.names = [None] * expected.index.nlevels
expected = expected.sort_values("b", ignore_index=True)
pd.testing.assert_frame_equal(result, expected)

r = mdf.groupby("b").a.apply(lambda x: x + 1)
pd.testing.assert_series_equal(
Expand Down Expand Up @@ -352,12 +368,18 @@ def test_dataframe_groupby_agg(setup):
# test as_index=False
for method in ["tree", "shuffle"]:
r = mdf.groupby("c2", as_index=False).agg("size", method=method)
pd.testing.assert_frame_equal(
r.execute().fetch().sort_values("c2", ignore_index=True),
raw.groupby("c2", as_index=False)
.agg("size")
.sort_values("c2", ignore_index=True),
)
if _agg_size_as_frame:
result = r.execute().fetch().sort_values("c2", ignore_index=True)
expected = (
raw.groupby("c2", as_index=False)
.agg("size")
.sort_values("c2", ignore_index=True)
)
pd.testing.assert_frame_equal(result, expected)
else:
result = r.execute().fetch().sort_index()
expected = raw.groupby("c2", as_index=False).agg("size").sort_index()
pd.testing.assert_series_equal(result, expected)

r = mdf.groupby("c2", as_index=False).agg("mean", method=method)
pd.testing.assert_frame_equal(
Expand Down
5 changes: 4 additions & 1 deletion mars/dataframe/indexing/getitem.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,10 @@ def tile_with_columns(cls, op):
for i, (columns, column_idx) in enumerate(
zip(column_splits, column_indexes)
):
dtypes = in_df.dtypes[columns]
try:
dtypes = in_df.dtypes[columns]
except ValueError: # pragma: no cover
dtypes = in_df.dtypes[list(columns)]
column_nsplits.append(len(dtypes))
for j in range(in_df.chunk_shape[0]):
c = in_df.cix[(j, column_idx[0])]
Expand Down
Loading