Skip to content

Commit

Permalink
fix(pandas): preserve RHS values in asof join when column names collide
Browse files Browse the repository at this point in the history
`pandas.merge_asof` loses information when a grouping or predicate column
has the same name in both the left and right tables.  It outputs a single
column (e.g. `time`) instead of two columns (`time`, `time_right`).

Ibis attempts to recover the now "missing" column, but previously we
simply copied the LHS `time` column and renamed the copy to `time_right`,
so the original RHS values were lost.

This behavior also occurred for grouping columns when the names matched.

Now, we add default join suffixes to predicates and groups and rename the
corresponding columns before the `merge_asof`.
  • Loading branch information
gforsyth authored and cpcloud committed Jun 6, 2023
1 parent 53ed88e commit 4514668
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
36 changes: 32 additions & 4 deletions ibis/backends/pandas/execution/join.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import itertools

import pandas as pd

import ibis.expr.analysis as an
Expand Down Expand Up @@ -130,13 +132,39 @@ def execute_asof_join(op, left, right, by, tolerance, predicates, **kwargs):
left_on, right_on = _extract_predicate_names(predicates)
left_by, right_by = _extract_predicate_names(by)

# Add default join suffixes to predicates and groups and rename the
# corresponding columns before the `merge_asof`. If we don't do this and the
# predicates have the same column name, we lose the original RHS column
# values in the output. Instead, the RHS values are copies of the LHS values.
# xref https://github.com/ibis-project/ibis/issues/6080
left_on_suffixed = [x + constants.JOIN_SUFFIXES[0] for x in left_on]
right_on_suffixed = [x + constants.JOIN_SUFFIXES[1] for x in right_on]

left_by_suffixed = [x + constants.JOIN_SUFFIXES[0] for x in left_by]
right_by_suffixed = [x + constants.JOIN_SUFFIXES[1] for x in right_by]

left = left.rename(
columns=dict(
itertools.chain(
zip(left_on, left_on_suffixed), zip(left_by, left_by_suffixed)
)
)
)
right = right.rename(
columns=dict(
itertools.chain(
zip(right_on, right_on_suffixed), zip(right_by, right_by_suffixed)
)
)
)

return pd.merge_asof(
left=left,
right=right,
left_on=left_on,
right_on=right_on,
left_by=left_by or None,
right_by=right_by or None,
left_on=left_on_suffixed,
right_on=right_on_suffixed,
left_by=left_by_suffixed or None,
right_by=right_by_suffixed or None,
tolerance=tolerance,
suffixes=constants.JOIN_SUFFIXES,
)
Expand Down
24 changes: 16 additions & 8 deletions ibis/backends/pandas/tests/execution/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,32 +256,36 @@ def test_join_with_window_function(players_base, players_df, batting, batting_df

@merge_asof_minversion
def test_asof_join(time_left, time_right, time_df1, time_df2):
expr = time_left.asof_join(time_right, 'time')[time_left, time_right.other_value]
expr = time_left.asof_join(time_right, 'time')
result = expr.execute()
expected = pd.merge_asof(time_df1, time_df2, on='time')
tm.assert_frame_equal(result[expected.columns], expected)
with pytest.raises(AssertionError):
tm.assert_series_equal(result["time"], result["time_right"])


@merge_asof_minversion
def test_asof_join_predicate(time_left, time_right, time_df1, time_df2):
expr = time_left.asof_join(time_right, time_left.time == time_right.time)[
time_left, time_right.other_value
]
expr = time_left.asof_join(time_right, time_left.time == time_right.time)
result = expr.execute()
expected = pd.merge_asof(time_df1, time_df2, on='time')
tm.assert_frame_equal(result[expected.columns], expected)
with pytest.raises(AssertionError):
tm.assert_series_equal(result["time"], result["time_right"])


@merge_asof_minversion
def test_keyed_asof_join(
time_keyed_left, time_keyed_right, time_keyed_df1, time_keyed_df2
):
expr = time_keyed_left.asof_join(time_keyed_right, 'time', by='key')[
time_keyed_left, time_keyed_right.other_value
]
expr = time_keyed_left.asof_join(time_keyed_right, 'time', by='key')
result = expr.execute()
expected = pd.merge_asof(time_keyed_df1, time_keyed_df2, on='time', by='key')
tm.assert_frame_equal(result[expected.columns], expected)
with pytest.raises(AssertionError):
tm.assert_series_equal(result["time"], result["time_right"])
with pytest.raises(AssertionError):
tm.assert_series_equal(result["key"], result["key_right"])


@merge_asof_minversion
Expand All @@ -290,7 +294,7 @@ def test_keyed_asof_join_with_tolerance(
):
expr = time_keyed_left.asof_join(
time_keyed_right, 'time', by='key', tolerance=2 * ibis.interval(days=1)
)[time_keyed_left, time_keyed_right.other_value]
)
result = expr.execute()
expected = pd.merge_asof(
time_keyed_df1,
Expand All @@ -300,6 +304,10 @@ def test_keyed_asof_join_with_tolerance(
tolerance=pd.Timedelta('2D'),
)
tm.assert_frame_equal(result[expected.columns], expected)
with pytest.raises(AssertionError):
tm.assert_series_equal(result["time"], result["time_right"])
with pytest.raises(AssertionError):
tm.assert_series_equal(result["key"], result["key_right"])


@merge_asof_minversion
Expand Down

0 comments on commit 4514668

Please sign in to comment.