Skip to content

Commit

Permalink
feat: implement the remaining clickhouse joins
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored and kszucs committed Mar 2, 2022
1 parent 1a56974 commit b3aa1f0
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 129 deletions.
43 changes: 35 additions & 8 deletions ci/datamgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ def impala_create_test_database(con, env):
]
),
}
# Register a parquet schema for every test table except functional_alltypes,
# which already has an explicit entry above.
PARQUET_SCHEMAS.update(
    {
        table: schema
        for table, schema in TEST_TABLES.items()
        if table != "functional_alltypes"
    }
)


AVRO_SCHEMAS = {
Expand All @@ -184,6 +189,30 @@ def impala_create_test_database(con, env):
ALL_SCHEMAS = collections.ChainMap(PARQUET_SCHEMAS, AVRO_SCHEMAS)


@click.group()
@click.option("-v", "--verbose", count=True)
def cli(verbose):
    """Top-level CLI group; each ``-v`` raises the logging verbosity.

    No flag -> ERROR, one -> INFO, two or more -> DEBUG.
    """
    if verbose == 0:
        level = logging.ERROR
    elif verbose == 1:
        level = logging.INFO
    else:
        level = logging.DEBUG
    logger.setLevel(level)


@cli.command()
@click.pass_context
def create(ctx):
    """Write each test table (except functional_alltypes) out as parquet.

    Conversion jobs are submitted to the thread pool stored on the click
    context, writing to ``DATA_DIR/parquet/<name>/<name>.parquet``.
    """
    import pyarrow.parquet as pq

    pool = ctx.obj["executor"]
    table_names = [
        name for name in TEST_TABLES if name != "functional_alltypes"
    ]

    for name, table in read_tables(table_names, DATA_DIR):
        logger.info(f"Creating {name}.parquet from CSV")
        target_dir = DATA_DIR / "parquet" / name
        target_dir.mkdir(parents=True, exist_ok=True)
        pool.submit(pq.write_table, table, target_dir / f"{name}.parquet")


def impala_create_tables(con, env, *, executor=None):
test_data_dir = env.test_data_dir
avro_files = [
Expand Down Expand Up @@ -285,13 +314,6 @@ def read_tables(
)


# NOTE(review): this duplicates the `cli` group defined earlier in this file;
# a second definition rebinds the name, so commands decorated before and after
# this point may attach to different group objects — confirm and drop one copy.
@click.group()
@click.option("-v", "--verbose", count=True)
def cli(verbose):
    # Map the count of -v flags to a log level; two or more means DEBUG.
    codes = {0: logging.ERROR, 1: logging.INFO, 2: logging.DEBUG}
    logger.setLevel(codes.get(verbose, logging.DEBUG))


@cli.group()
def load():
    """Click sub-group; backend-specific load commands register under it."""
    pass
Expand Down Expand Up @@ -755,6 +777,11 @@ def duckdb(schema, tables, data_directory, database, **params):
- etc.
"""
# The flattened diff left two `max_workers=` keyword arguments (a syntax
# error: duplicate keyword). Keep the intended version: size the pool from
# IBIS_DATA_MAX_WORKERS when set, falling back to urllib's default pool size.
with concurrent.futures.ThreadPoolExecutor(
    max_workers=int(
        os.environ.get(
            "IBIS_DATA_MAX_WORKERS",
            URLLIB_DEFAULT_POOL_SIZE,
        )
    )
) as executor:
    # The executor is handed to every command via the click context object.
    cli(auto_envvar_prefix='IBIS_TEST', obj=dict(executor=executor))
9 changes: 7 additions & 2 deletions ibis/backends/clickhouse/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,13 @@ def format_limit(self):
class ClickhouseTableSetFormatter(TableSetFormatter):

_join_names = {
ops.InnerJoin: 'ALL INNER JOIN',
ops.LeftJoin: 'ALL LEFT JOIN',
ops.InnerJoin: 'INNER JOIN',
ops.LeftJoin: 'LEFT OUTER JOIN',
ops.RightJoin: 'RIGHT OUTER JOIN',
ops.OuterJoin: 'FULL OUTER JOIN',
ops.CrossJoin: 'CROSS JOIN',
ops.LeftSemiJoin: 'LEFT SEMI JOIN',
ops.LeftAntiJoin: 'LEFT ANTI JOIN',
ops.AnyInnerJoin: 'ANY INNER JOIN',
ops.AnyLeftJoin: 'ANY LEFT JOIN',
}
Expand Down
9 changes: 0 additions & 9 deletions ibis/backends/impala/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,6 @@ def get_schema(

names, types = zip(*pairs)
ibis_types = [udf.parse_type(type.lower()) for type in types]
names = [name.lower() for name in names]

return sch.Schema(names, ibis_types)

@property
Expand Down Expand Up @@ -991,13 +989,6 @@ def _get_schema_using_query(self, query):
names, ibis_types = self._adapt_types(cur.description)
cur.release()

# per #321; most Impala tables will be lower case already, but Avro
# data, depending on the version of Impala, might have field names in
# the metastore cased according to the explicit case in the declared
# avro schema. This is very annoying, so it's easier to just conform on
# all lowercase fields from Impala.
names = [x.lower() for x in names]

return sch.Schema(names, ibis_types)

def create_function(self, func, name=None, database=None):
Expand Down
20 changes: 18 additions & 2 deletions ibis/backends/impala/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,29 @@ def connect(data_directory: Path):
database=env.test_data_db,
)

def _get_original_column_names(self, tablename: str) -> list[str]:
import pyarrow.parquet as pq

pq_file = pq.ParquetFile(
self.data_directory
/ "parquet"
/ tablename
/ f"{tablename}.parquet"
)
return pq_file.schema.names

def _get_renamed_table(self, tablename: str) -> ir.TableExpr:
t = self.connection.table(tablename)
original_names = self._get_original_column_names(tablename)
return t.relabel(dict(zip(t.columns, original_names)))

@property
def batting(self) -> ir.TableExpr:
return None
return self._get_renamed_table("batting")

@property
def awards_players(self) -> ir.TableExpr:
return None
return self._get_renamed_table("awards_players")


class IbisTestEnv:
Expand Down
35 changes: 28 additions & 7 deletions ibis/backends/pandas/execution/join.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
import operator

import pandas as pd
Expand Down Expand Up @@ -40,15 +41,35 @@ def execute_cross_join(op, left, right, **kwargs):
)


@execute_node.register(ops.Join, pd.DataFrame, pd.DataFrame)
def execute_join(op, left, right, **kwargs):
op_type = type(op)
def _get_semi_anti_join_filter(op, left, right, **kwargs):
    """Return a boolean mask over ``left``'s rows for semi/anti joins.

    Computes the inner join implied by ``op``'s predicates, then marks each
    left row whose join-key values appear in that join result; ANDing the
    per-key membership tests gives the final mask.
    """
    left_on, right_on = _construct_join_predicate_columns(op, **kwargs)
    matched = pd.merge(
        left,
        right,
        how="inner",
        left_on=left_on,
        right_on=right_on,
        suffixes=constants.JOIN_SUFFIXES,
    )
    key_masks = (
        left.loc[:, key].isin(matched.loc[:, key]) for key in left_on
    )
    return functools.reduce(operator.and_, key_masks)


@execute_node.register(ops.LeftSemiJoin, pd.DataFrame, pd.DataFrame)
def execute_left_semi_join(op, left, right, **kwargs):
    """Execute a left semi join in pandas."""
    # Keep exactly the left rows that have at least one match on the right.
    keep = _get_semi_anti_join_filter(op, left, right, **kwargs)
    return left.loc[keep, :]


@execute_node.register(ops.LeftAntiJoin, pd.DataFrame, pd.DataFrame)
def execute_left_anti_join(op, left, right, **kwargs):
    """Execute a left anti join in pandas."""
    # Keep exactly the left rows with no match on the right (inverted mask).
    matched = _get_semi_anti_join_filter(op, left, right, **kwargs)
    return left.loc[~matched, :]

try:
how = constants.JOIN_TYPES[op_type]
except KeyError:
raise NotImplementedError(f'{op_type.__name__} not supported')

def _construct_join_predicate_columns(op, **kwargs):
left_op = op.left.op()
right_op = op.right.op()

Expand Down
54 changes: 18 additions & 36 deletions ibis/backends/pandas/tests/execution/test_join.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,17 @@
import pandas as pd
import pandas.testing as tm
import pytest
from pytest import param

import ibis
from ibis.backends.pandas import Backend

# The flattened diff interleaved the old `join_type` parametrization (with
# xfailing semi/anti params) and the new one, leaving a syntactically broken
# double assignment. Keep the coherent post-commit form only.
# SEMI and ANTI are checked in backend tests
mutating_join_type = pytest.mark.parametrize(
    'how',
    ['inner', 'left', 'right', 'outer'],
)


@join_type
@mutating_join_type
def test_join(how, left, right, df1, df2):
expr = left.join(right, left.key == right.key, how=how)[
left, right.other_value, right.key3
Expand All @@ -49,7 +31,7 @@ def test_cross_join(left, right, df1, df2):
tm.assert_frame_equal(result[expected.columns], expected)


@join_type
@mutating_join_type
def test_join_project_left_table(how, left, right, df1, df2):
expr = left.join(right, left.key == right.key, how=how)[left, right.key3]
result = expr.execute()
Expand All @@ -68,7 +50,7 @@ def test_cross_join_project_left_table(left, right, df1, df2):
tm.assert_frame_equal(result[expected.columns], expected)


@join_type
@mutating_join_type
def test_join_with_multiple_predicates(how, left, right, df1, df2):
expr = left.join(
right, [left.key == right.key, left.key2 == right.key3], how=how
Expand All @@ -80,7 +62,7 @@ def test_join_with_multiple_predicates(how, left, right, df1, df2):
tm.assert_frame_equal(result[expected.columns], expected)


@join_type
@mutating_join_type
def test_join_with_multiple_predicates_written_as_one(
how, left, right, df1, df2
):
Expand All @@ -95,7 +77,7 @@ def test_join_with_multiple_predicates_written_as_one(
tm.assert_frame_equal(result[expected.columns], expected)


@join_type
@mutating_join_type
def test_join_with_invalid_predicates(how, left, right):
predicate = (left.key == right.key) & (left.key2 <= right.key3)
expr = left.join(right, predicate, how=how)
Expand All @@ -108,7 +90,7 @@ def test_join_with_invalid_predicates(how, left, right):
expr.execute()


@join_type
@mutating_join_type
@pytest.mark.xfail(reason='Hard to detect this case')
def test_join_with_duplicate_non_key_columns(how, left, right, df1, df2):
left = left.mutate(x=left.value * 2)
Expand All @@ -121,7 +103,7 @@ def test_join_with_duplicate_non_key_columns(how, left, right, df1, df2):
expr.execute()


@join_type
@mutating_join_type
def test_join_with_duplicate_non_key_columns_not_selected(
how, left, right, df1, df2
):
Expand All @@ -141,7 +123,7 @@ def test_join_with_duplicate_non_key_columns_not_selected(
tm.assert_frame_equal(result[expected.columns], expected)


@join_type
@mutating_join_type
def test_join_with_post_expression_selection(how, left, right, df1, df2):
join = left.join(right, left.key == right.key, how=how)
expr = join[left.key, left.value, right.other_value]
Expand All @@ -152,7 +134,7 @@ def test_join_with_post_expression_selection(how, left, right, df1, df2):
tm.assert_frame_equal(result[expected.columns], expected)


@join_type
@mutating_join_type
def test_join_with_post_expression_filter(how, left):
lhs = left[['key', 'key2']]
rhs = left[['key2', 'value']]
Expand All @@ -170,7 +152,7 @@ def test_join_with_post_expression_filter(how, left):
tm.assert_frame_equal(result, expected)


@join_type
@mutating_join_type
def test_multi_join_with_post_expression_filter(how, left, df1):
lhs = left[['key', 'key2']]
rhs = left[['key2', 'value']]
Expand All @@ -197,7 +179,7 @@ def test_multi_join_with_post_expression_filter(how, left, df1):
tm.assert_frame_equal(result, expected)


@join_type
@mutating_join_type
def test_join_with_non_trivial_key(how, left, right, df1, df2):
# also test that the order of operands in the predicate doesn't matter
join = left.join(right, right.key.length() == left.key.length(), how=how)
Expand All @@ -217,7 +199,7 @@ def test_join_with_non_trivial_key(how, left, right, df1, df2):
tm.assert_frame_equal(result[expected.columns], expected)


@join_type
@mutating_join_type
def test_join_with_non_trivial_key_project_table(how, left, right, df1, df2):
# also test that the order of operands in the predicate doesn't matter
join = left.join(right, right.key.length() == left.key.length(), how=how)
Expand All @@ -239,7 +221,7 @@ def test_join_with_non_trivial_key_project_table(how, left, right, df1, df2):
tm.assert_frame_equal(result[expected.columns], expected)


@join_type
@mutating_join_type
def test_join_with_project_right_duplicate_column(client, how, left, df1, df3):
# also test that the order of operands in the predicate doesn't matter
right = client.table('df3')
Expand Down Expand Up @@ -358,7 +340,7 @@ def test_keyed_asof_join_with_tolerance(
def test_select_on_unambiguous_join(how, func):
df_t = pd.DataFrame({'a0': [1, 2, 3], 'b1': list("aab")})
df_s = pd.DataFrame({'a1': [2, 3, 4], 'b2': list("abc")})
con = Backend().connect({"t": df_t, "s": df_s})
con = ibis.pandas.connect({"t": df_t, "s": df_s})
t = con.table("t")
s = con.table("s")
method = getattr(t, f"{how}_join")
Expand Down Expand Up @@ -388,7 +370,7 @@ def test_select_on_unambiguous_asof_join(func):
df_s = pd.DataFrame(
{'a1': [2, 3, 4], 'b2': pd.date_range("20171230", periods=3)}
)
con = Backend().connect({"t": df_t, "s": df_s})
con = ibis.pandas.connect({"t": df_t, "s": df_s})
t = con.table("t")
s = con.table("s")
join = t.asof_join(s, t.b1 == s.b2)
Expand Down
1 change: 1 addition & 0 deletions ibis/backends/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class BackendTest(abc.ABC):

def __init__(self, data_directory: Path) -> None:
self.connection = self.connect(data_directory)
self.data_directory = data_directory

def __str__(self):
return f'<BackendTest {self.name()}>'
Expand Down
Loading

0 comments on commit b3aa1f0

Please sign in to comment.