From 9ed937a9aef71acaf5df86c88e013d9fe3ff7cce Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Thu, 21 Oct 2021 07:39:05 -0700 Subject: [PATCH] fix: execute materialized joins in the pandas and dask backends (#3086) * fix: execute materialized joins in the pandas and dask backends * docs: release note --- docs/source/release/index.rst | 1 + ibis/backends/dask/execution/join.py | 9 ++++- .../dask/tests/execution/test_join.py | 38 ++++++++++++++++++- ibis/backends/pandas/execution/join.py | 7 +++- .../pandas/tests/execution/test_join.py | 25 ++++++++++++ 5 files changed, 75 insertions(+), 5 deletions(-) diff --git a/docs/source/release/index.rst b/docs/source/release/index.rst index c4e57b648e29..8e637702ae33 100644 --- a/docs/source/release/index.rst +++ b/docs/source/release/index.rst @@ -12,6 +12,7 @@ Release Notes These release notes are for versions of ibis **1.0 and later**. Release notes for pre-1.0 versions of ibis can be found at :doc:`release-pre-1.0` +* :bug:`3086` Error when trying to join tables with Pandas backend * :support:`2678` Improvement of the backend API. The former `Client` subclasses have been replaced by a `Backend` class that must subclass `ibis.backends.base.BaseBackend`. The `BaseBackend` class contains abstract methods for the minimum subset of methods that backends must implement, and their signatures have been standardized across backends. The Ibis compiler has been refactored, and diff --git a/ibis/backends/dask/execution/join.py b/ibis/backends/dask/execution/join.py index 24e50e1ddd47..15c4d4629c36 100644 --- a/ibis/backends/dask/execution/join.py +++ b/ibis/backends/dask/execution/join.py @@ -15,6 +15,11 @@ from ..execution import constants +@execute_node.register(ops.MaterializedJoin, dd.DataFrame) +def execute_materialized_join(op, df, **kwargs): + return df + + @execute_node.register( ops.AsOfJoin, dd.DataFrame, dd.DataFrame, (Timedelta, type(None)) ) @@ -69,9 +74,9 @@ def execute_cross_join(op, left, right, **kwargs): return result -# TODO - execute_materialized_join - #2553 +# TODO - execute_join - #2553 @execute_node.register(ops.Join, dd.DataFrame, dd.DataFrame) -def execute_materialized_join(op, left, right, **kwargs): +def execute_join(op, left, right, **kwargs): op_type = type(op) try: diff --git a/ibis/backends/dask/tests/execution/test_join.py b/ibis/backends/dask/tests/execution/test_join.py index 82456d929227..2cdb627d8940 100644 --- a/ibis/backends/dask/tests/execution/test_join.py +++ b/ibis/backends/dask/tests/execution/test_join.py @@ -237,7 +237,7 @@ def test_multi_join_with_post_expression_filter(how, left, df1): ) -@pytest.mark.xfail(reason="TODO - execute_materialized_join - #2553") +@pytest.mark.xfail(reason="TODO - execute_join - #2553") @join_type def test_join_with_non_trivial_key(how, left, right, df1, df2): # also test that the order of operands in the predicate doesn't matter @@ -261,7 +261,7 @@ def test_join_with_non_trivial_key(how, left, right, df1, df2): ) -@pytest.mark.xfail(reason="TODO - execute_materialized_join - #2553") +@pytest.mark.xfail(reason="TODO - execute_join - #2553") @join_type def test_join_with_non_trivial_key_project_table(how, left, right, df1, df2): # also test that the order of operands in the predicate doesn't matter @@ -496,3 +496,37 @@ def test_select_on_unambiguous_asof_join(func, npartitions): result.compute(scheduler='single-threaded'), expected.compute(scheduler='single-threaded'), ) + + +def test_materialized_join(npartitions): + df = dd.from_pandas( + pd.DataFrame({"test": [1, 2, 3], "name": ["a", "b", "c"]}), + npartitions=npartitions, + ) + df_2 = dd.from_pandas( + pd.DataFrame({"test_2": [1, 5, 6], "name_2": ["d", "e", "f"]}), + npartitions=npartitions, + ) + + conn = ibis.dask.connect({"df": df, "df_2": df_2}) + + ibis_table_1 = conn.table("df") + ibis_table_2 = conn.table("df_2") + + joined = ibis_table_1.outer_join( + ibis_table_2, + predicates=ibis_table_1["test"] == ibis_table_2["test_2"], + ) + joined = joined.materialize() + result = joined.compile() + expected = dd.merge( + df, + df_2, + left_on="test", + right_on="test_2", + how="outer", + ) + tm.assert_frame_equal( + result.compute(scheduler='single-threaded'), + expected.compute(scheduler='single-threaded'), + ) diff --git a/ibis/backends/pandas/execution/join.py b/ibis/backends/pandas/execution/join.py index 6e5325f2c3e8..9d48266290d6 100644 --- a/ibis/backends/pandas/execution/join.py +++ b/ibis/backends/pandas/execution/join.py @@ -55,7 +55,7 @@ def execute_cross_join(op, left, right, **kwargs): @execute_node.register(ops.Join, pd.DataFrame, pd.DataFrame) -def execute_materialized_join(op, left, right, **kwargs): +def execute_join(op, left, right, **kwargs): op_type = type(op) try: @@ -94,6 +94,11 @@ def execute_materialized_join(op, left, right, **kwargs): return df +@execute_node.register(ops.MaterializedJoin, pd.DataFrame) +def execute_materialized_join(op, df, **kwargs): + return df + + @execute_node.register( ops.AsOfJoin, pd.DataFrame, pd.DataFrame, (pd.Timedelta, type(None)) ) diff --git a/ibis/backends/pandas/tests/execution/test_join.py b/ibis/backends/pandas/tests/execution/test_join.py index b2e16899baf8..21b325d48544 100644 --- a/ibis/backends/pandas/tests/execution/test_join.py +++ b/ibis/backends/pandas/tests/execution/test_join.py @@ -413,3 +413,28 @@ def test_select_on_unambiguous_asof_join(func): expr = func(join) result = expr.execute() tm.assert_frame_equal(result, expected) + + +def test_materialized_join(): + df = pd.DataFrame({"test": [1, 2, 3], "name": ["a", "b", "c"]}) + df_2 = pd.DataFrame({"test_2": [1, 5, 6], "name_2": ["d", "e", "f"]}) + + conn = ibis.pandas.connect({"df": df, "df_2": df_2}) + + ibis_table_1 = conn.table("df") + ibis_table_2 = conn.table("df_2") + + joined = ibis_table_1.outer_join( + ibis_table_2, + predicates=ibis_table_1["test"] == ibis_table_2["test_2"], + ) + joined = joined.materialize() + result = joined.execute() + expected = pd.merge( + df, + df_2, + left_on="test", + right_on="test_2", + how="outer", + ) + tm.assert_frame_equal(result, expected)