Skip to content

Commit

Permalink
fix: execute materialized joins in the pandas and dask backends (#3086)
Browse files Browse the repository at this point in the history
* fix: execute materialized joins in the pandas and dask backends

* docs: release note
  • Loading branch information
cpcloud authored Oct 21, 2021
1 parent 5569480 commit 9ed937a
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/source/release/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Release Notes
These release notes are for versions of ibis **1.0 and later**. Release
notes for pre-1.0 versions of ibis can be found at :doc:`release-pre-1.0`

* :bug:`3086` Error when trying to join tables with Pandas backend
* :support:`2678` Improvement of the backend API. The former `Client` subclasses have been replaced by a `Backend` class that must
subclass `ibis.backends.base.BaseBackend`. The `BaseBackend` class contains abstract methods for the minimum subset of methods that
backends must implement, and their signatures have been standardized across backends. The Ibis compiler has been refactored, and
Expand Down
9 changes: 7 additions & 2 deletions ibis/backends/dask/execution/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
from ..execution import constants


@execute_node.register(ops.MaterializedJoin, dd.DataFrame)
def execute_materialized_join(op, df, **kwargs):
return df


@execute_node.register(
ops.AsOfJoin, dd.DataFrame, dd.DataFrame, (Timedelta, type(None))
)
Expand Down Expand Up @@ -69,9 +74,9 @@ def execute_cross_join(op, left, right, **kwargs):
return result


# TODO - execute_materialized_join - #2553
# TODO - execute_join - #2553
@execute_node.register(ops.Join, dd.DataFrame, dd.DataFrame)
def execute_materialized_join(op, left, right, **kwargs):
def execute_join(op, left, right, **kwargs):
op_type = type(op)

try:
Expand Down
38 changes: 36 additions & 2 deletions ibis/backends/dask/tests/execution/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def test_multi_join_with_post_expression_filter(how, left, df1):
)


@pytest.mark.xfail(reason="TODO - execute_materialized_join - #2553")
@pytest.mark.xfail(reason="TODO - execute_join - #2553")
@join_type
def test_join_with_non_trivial_key(how, left, right, df1, df2):
# also test that the order of operands in the predicate doesn't matter
Expand All @@ -261,7 +261,7 @@ def test_join_with_non_trivial_key(how, left, right, df1, df2):
)


@pytest.mark.xfail(reason="TODO - execute_materialized_join - #2553")
@pytest.mark.xfail(reason="TODO - execute_join - #2553")
@join_type
def test_join_with_non_trivial_key_project_table(how, left, right, df1, df2):
# also test that the order of operands in the predicate doesn't matter
Expand Down Expand Up @@ -496,3 +496,37 @@ def test_select_on_unambiguous_asof_join(func, npartitions):
result.compute(scheduler='single-threaded'),
expected.compute(scheduler='single-threaded'),
)


def test_materialized_join(npartitions):
df = dd.from_pandas(
pd.DataFrame({"test": [1, 2, 3], "name": ["a", "b", "c"]}),
npartitions=npartitions,
)
df_2 = dd.from_pandas(
pd.DataFrame({"test_2": [1, 5, 6], "name_2": ["d", "e", "f"]}),
npartitions=npartitions,
)

conn = ibis.dask.connect({"df": df, "df_2": df_2})

ibis_table_1 = conn.table("df")
ibis_table_2 = conn.table("df_2")

joined = ibis_table_1.outer_join(
ibis_table_2,
predicates=ibis_table_1["test"] == ibis_table_2["test_2"],
)
joined = joined.materialize()
result = joined.compile()
expected = dd.merge(
df,
df_2,
left_on="test",
right_on="test_2",
how="outer",
)
tm.assert_frame_equal(
result.compute(scheduler='single-threaded'),
expected.compute(scheduler='single-threaded'),
)
7 changes: 6 additions & 1 deletion ibis/backends/pandas/execution/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def execute_cross_join(op, left, right, **kwargs):


@execute_node.register(ops.Join, pd.DataFrame, pd.DataFrame)
def execute_materialized_join(op, left, right, **kwargs):
def execute_join(op, left, right, **kwargs):
op_type = type(op)

try:
Expand Down Expand Up @@ -94,6 +94,11 @@ def execute_materialized_join(op, left, right, **kwargs):
return df


@execute_node.register(ops.MaterializedJoin, pd.DataFrame)
def execute_materialized_join(op, df, **kwargs):
return df


@execute_node.register(
ops.AsOfJoin, pd.DataFrame, pd.DataFrame, (pd.Timedelta, type(None))
)
Expand Down
25 changes: 25 additions & 0 deletions ibis/backends/pandas/tests/execution/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,3 +413,28 @@ def test_select_on_unambiguous_asof_join(func):
expr = func(join)
result = expr.execute()
tm.assert_frame_equal(result, expected)


def test_materialized_join():
df = pd.DataFrame({"test": [1, 2, 3], "name": ["a", "b", "c"]})
df_2 = pd.DataFrame({"test_2": [1, 5, 6], "name_2": ["d", "e", "f"]})

conn = ibis.pandas.connect({"df": df, "df_2": df_2})

ibis_table_1 = conn.table("df")
ibis_table_2 = conn.table("df_2")

joined = ibis_table_1.outer_join(
ibis_table_2,
predicates=ibis_table_1["test"] == ibis_table_2["test_2"],
)
joined = joined.materialize()
result = joined.execute()
expected = pd.merge(
df,
df_2,
left_on="test",
right_on="test_2",
how="outer",
)
tm.assert_frame_equal(result, expected)

0 comments on commit 9ed937a

Please sign in to comment.