From 6121b3bd63de526f01ef46ecad49427176a0961e Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 2 Jul 2024 07:37:19 -0400 Subject: [PATCH] test(clickhouse): setup tpc data --- ibis/backends/clickhouse/tests/conftest.py | 84 ++++++++++++++++++- ibis/backends/tests/tpc/conftest.py | 26 ++++-- ibis/backends/tests/tpc/ds/test_queries.py | 23 ++++- ibis/backends/tests/tpc/h/test_queries.py | 32 +++++++ .../tests/tpc/queries/clickhouse/ds/13.sql | 42 ++++++++++ .../tests/tpc/queries/clickhouse/h/19.sql | 29 +++++++ 6 files changed, 225 insertions(+), 11 deletions(-) create mode 100644 ibis/backends/tests/tpc/queries/clickhouse/ds/13.sql create mode 100644 ibis/backends/tests/tpc/queries/clickhouse/h/19.sql diff --git a/ibis/backends/clickhouse/tests/conftest.py b/ibis/backends/clickhouse/tests/conftest.py index 72a4ec0aba86..a2880a7e52e2 100644 --- a/ibis/backends/clickhouse/tests/conftest.py +++ b/ibis/backends/clickhouse/tests/conftest.py @@ -1,10 +1,13 @@ from __future__ import annotations +import concurrent.futures import contextlib import os +import subprocess from typing import TYPE_CHECKING, Any import pytest +import sqlglot as sg import ibis import ibis.expr.types as ir @@ -12,7 +15,7 @@ from ibis.backends.tests.base import ServiceBackendTest if TYPE_CHECKING: - from collections.abc import Callable, Iterable + from collections.abc import Callable, Iterable, Mapping from pathlib import Path CLICKHOUSE_HOST = os.environ.get("IBIS_TEST_CLICKHOUSE_HOST", "localhost") @@ -31,6 +34,10 @@ class TestConf(ServiceBackendTest): data_volume = "/var/lib/clickhouse/user_files/ibis" service_name = "clickhouse" deps = ("clickhouse_connect",) + supports_tpch = True + supports_tpcds = True + # Query 14 seems to require a bit more room here + tpc_absolute_tolerance = 0.0001 @property def native_bool(self) -> bool: @@ -71,12 +78,21 @@ def postload(self, **kw: Any): self.connection = self.connect(database=IBIS_TEST_CLICKHOUSE_DB, **kw) @staticmethod - def connect(*, tmpdir, worker_id, **kw: Any): + def connect( + *, tmpdir, worker_id, settings: Mapping[str, Any] | None = None, **kw: Any + ): + if settings is None: + settings = {} + + # without this setting TPC-DS 19 and 24 will fail + settings.setdefault("allow_experimental_join_condition", 1) + return ibis.clickhouse.connect( host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT, password=CLICKHOUSE_PASS, user=CLICKHOUSE_USER, + settings=settings, **kw, ) @@ -96,6 +112,70 @@ def least(f: Callable[..., ir.Value], *args: ir.Value) -> ir.Value: ) return f(*args) + def preload(self): + super().preload() + + suites = ("tpch", "tpcds") + + service_name = self.service_name + data_volume = self.data_volume + + for suite in suites: + subprocess.run( + [ + "docker", + "compose", + "exec", + service_name, + "mkdir", + "-p", + f"{data_volume}/{suite}", + ], + check=True, + ) + + with concurrent.futures.ThreadPoolExecutor() as executor: + for fut in concurrent.futures.as_completed( + executor.submit( + subprocess.run, + [ + "docker", + "compose", + "cp", + str(path), + f"{service_name}:{data_volume}/{suite}/{path.name}", + ], + check=True, + ) + for suite in suites + for path in self.data_dir.joinpath(suite).rglob("*.parquet") + ): + fut.result() + + def _load_tpc(self, *, suite, scale_factor): + con = self.connection + schema = f"tpc{suite}" + con.con.command(f"CREATE DATABASE IF NOT EXISTS {schema}") + parquet_dir = self.data_dir.joinpath(schema, f"sf={scale_factor}", "parquet") + assert parquet_dir.exists(), parquet_dir + for path in parquet_dir.glob("*.parquet"): + table_name = path.with_suffix("").name + con.con.command( + f"CREATE VIEW IF NOT EXISTS {schema}.{table_name} AS " + f"SELECT * FROM file('ibis/{schema}/{path.name}', 'Parquet')" + ) + + def _transform_tpc_sql(self, parsed, *, suite, leaves): + def add_catalog_and_schema(node): + if isinstance(node, sg.exp.Table) and node.name in leaves: + return node.__class__( + catalog=f"tpc{suite}", + **{k: v for k, v in node.args.items() if k != "catalog"}, + ) + return node + + return parsed.transform(add_catalog_and_schema) + @pytest.fixture(scope="session") def con(tmp_path_factory, data_dir, worker_id): diff --git a/ibis/backends/tests/tpc/conftest.py b/ibis/backends/tests/tpc/conftest.py index ce30aaa83ac3..442cd29bd293 100644 --- a/ibis/backends/tests/tpc/conftest.py +++ b/ibis/backends/tests/tpc/conftest.py @@ -34,7 +34,7 @@ def pytest_pyfunc_call(pyfuncitem): return True -def tpc_test(suite_name: Literal["h", "ds"], result_is_empty=False): +def tpc_test(suite_name: Literal["h", "ds"], *, result_is_empty=False): """Decorator for TPC tests. Parameters @@ -68,20 +68,27 @@ def wrapper(*args, backend, **kwargs): query_number = query_name_match.group(1) sql_path_name = f"{query_number}.sql" - path = Path(__file__).parent.joinpath( - "queries", "duckdb", suite_name, sql_path_name - ) + base = Path(__file__).parent / "queries" + + path = base / backend_name / suite_name / sql_path_name + + if path.exists(): + dialect = backend_name + else: + dialect = "duckdb" + path = base / "duckdb" / suite_name / sql_path_name + raw_sql = path.read_text() - sql = sg.parse_one(raw_sql, read="duckdb") + sql = sg.parse_one(raw_sql, read=dialect) sql = backend._transform_tpc_sql( sql, suite=suite_name, leaves=backend.list_tpc_tables(suite_name) ) - raw_sql = sql.sql(dialect="duckdb", pretty=True) + raw_sql = sql.sql(dialect=dialect, pretty=True) - expected_expr = backend.connection.sql(raw_sql, dialect="duckdb") + expected_expr = backend.connection.sql(raw_sql, dialect=dialect) result_expr = test(*args, **kwargs) @@ -90,7 +97,10 @@ def wrapper(*args, backend, **kwargs): assert (result_is_empty and result.empty) or not result.empty expected = expected_expr.to_pandas() - assert list(map(str.lower, expected.columns)) == result.columns.tolist() + + assert len(expected.columns) == len(result.columns) + assert all(r in e.lower() for r, e in zip(result.columns, expected.columns)) + expected.columns = result.columns expected = PandasData.convert_table(expected, result_expr.schema()) diff --git a/ibis/backends/tests/tpc/ds/test_queries.py b/ibis/backends/tests/tpc/ds/test_queries.py index cf0c42581dc8..12c8a9ba8676 100644 --- a/ibis/backends/tests/tpc/ds/test_queries.py +++ b/ibis/backends/tests/tpc/ds/test_queries.py @@ -7,9 +7,15 @@ from ibis import _, date, ifelse, null from ibis import literal as lit from ibis import selectors as s +from ibis.backends.tests.errors import ClickHouseDatabaseError from ibis.backends.tests.tpc.conftest import tpc_test +@pytest.mark.notyet( + ["clickhouse"], + raises=ClickHouseDatabaseError, + reason="correlated subqueries don't exist in clickhouse", +) @tpc_test("ds") def test_01(store_returns, date_dim, store, customer): customer_total_return = ( @@ -220,6 +226,11 @@ def test_05( raise NotImplementedError() +@pytest.mark.notyet( + ["clickhouse"], + raises=ClickHouseDatabaseError, + reason="correlated subqueries don't exist in clickhouse", +) @tpc_test("ds") def test_06(customer_address, customer, store_sales, date_dim, item): return ( @@ -743,6 +754,11 @@ def test_09(store_sales, reason): @pytest.mark.broken( ["datafusion"], reason="Exception: Optimizer rule 'scalar_subquery_to_join' failed" ) +@pytest.mark.notyet( + ["clickhouse"], + raises=ClickHouseDatabaseError, + reason="correlated subqueries don't exist in clickhouse", +) def test_10( customer, customer_address, @@ -1015,11 +1031,16 @@ def test_15(catalog_sales, customer, customer_address, date_dim): ) -@tpc_test("ds") +@pytest.mark.notyet( + ["clickhouse"], + raises=ClickHouseDatabaseError, + reason="correlated subqueries don't exist in clickhouse", +) @pytest.mark.notyet( ["datafusion"], reason="Error during planning: Correlated column is not allowed in predicate", ) +@tpc_test("ds") def test_16(catalog_sales, date_dim, customer_address, call_center, catalog_returns): return ( catalog_sales.join(date_dim, [("cs_ship_date_sk", "d_date_sk")]) diff --git a/ibis/backends/tests/tpc/h/test_queries.py b/ibis/backends/tests/tpc/h/test_queries.py index 208969eec7e4..cb549cdd0fe4 100644 --- a/ibis/backends/tests/tpc/h/test_queries.py +++ b/ibis/backends/tests/tpc/h/test_queries.py @@ -3,6 +3,7 @@ import pytest import ibis +from ibis.backends.tests.errors import ClickHouseDatabaseError from ibis.backends.tests.tpc.conftest import add_date, tpc_test @@ -39,6 +40,11 @@ def test_01(lineitem): return q +@pytest.mark.notyet( + ["clickhouse"], + raises=ClickHouseDatabaseError, + reason="correlated subqueries don't exist in clickhouse", +) @tpc_test("h") def test_02(part, supplier, partsupp, nation, region): """Minimum Cost Supplier Query (Q2)""" @@ -110,6 +116,11 @@ def test_03(customer, orders, lineitem): return q +@pytest.mark.notyet( + ["clickhouse"], + raises=ClickHouseDatabaseError, + reason="correlated subqueries don't exist in clickhouse", +) @tpc_test("h") def test_04(orders, lineitem): """Order Priority Checking Query (Q4)""" @@ -409,6 +420,7 @@ def test_12(orders, lineitem): return q +@pytest.mark.broken(["clickhouse"], reason="broken sqlglot codegen") @tpc_test("h") def test_13(customer, orders): """Customer Distribution Query (Q13) @@ -522,6 +534,11 @@ def test_16(partsupp, part, supplier): return q +@pytest.mark.notyet( + ["clickhouse"], + raises=ClickHouseDatabaseError, + reason="correlated subqueries don't exist in clickhouse", +) @tpc_test("h") def test_17(lineitem, part): """Small-Quantity-Order Revenue Query (Q17) @@ -628,6 +645,11 @@ def test_19(lineitem, part): return q +@pytest.mark.notyet( + ["clickhouse"], + raises=ClickHouseDatabaseError, + reason="correlated subqueries don't exist in clickhouse", +) @tpc_test("h") def test_20(supplier, nation, partsupp, part, lineitem): """Potential Part Promotion Query (Q20) @@ -672,6 +694,11 @@ def test_20(supplier, nation, partsupp, part, lineitem): raises=Exception, reason="EXISTS subqueries not supported in DataFusion", ) +@pytest.mark.notyet( + ["clickhouse"], + raises=ClickHouseDatabaseError, + reason="correlated subqueries don't exist in clickhouse", +) @tpc_test("h") def test_21(supplier, lineitem, orders, nation): """Suppliers Who Kept Orders Waiting Query (Q21) @@ -723,6 +750,11 @@ def test_21(supplier, lineitem, orders, nation): raises=Exception, reason="EXISTS subqueries not supported in DataFusion", ) +@pytest.mark.notyet( + ["clickhouse"], + raises=ClickHouseDatabaseError, + reason="correlated subqueries don't exist in clickhouse", +) @tpc_test("h") def test_22(customer, orders): """Global Sales Opportunity Query (Q22) diff --git a/ibis/backends/tests/tpc/queries/clickhouse/ds/13.sql b/ibis/backends/tests/tpc/queries/clickhouse/ds/13.sql new file mode 100644 index 000000000000..a54e48693bb4 --- /dev/null +++ b/ibis/backends/tests/tpc/queries/clickhouse/ds/13.sql @@ -0,0 +1,42 @@ + +SELECT avg(ss_quantity) avg1, + avg(ss_ext_sales_price) avg2, + avg(ss_ext_wholesale_cost) avg3, + sum(ss_ext_wholesale_cost) sum1 +FROM store_sales , + store , + customer_demographics , + household_demographics , + customer_address , + date_dim +WHERE s_store_sk = ss_store_sk + AND ss_sold_date_sk = d_date_sk + AND ss_hdemo_sk = hd_demo_sk + AND cd_demo_sk = ss_cdemo_sk + AND ss_addr_sk = ca_address_sk + AND d_year = 2001 and(( + cd_marital_status = 'M' + AND cd_education_status = 'Advanced Degree' + AND ss_sales_price BETWEEN 100.00 AND 150.00 + AND hd_dep_count = 3) + OR ( + cd_marital_status = 'S' + AND cd_education_status = 'College' + AND ss_sales_price BETWEEN 50.00 AND 100.00 + AND hd_dep_count = 1 ) + OR ( + cd_marital_status = 'W' + AND cd_education_status = '2 yr Degree' + AND ss_sales_price BETWEEN 150.00 AND 200.00 + AND hd_dep_count = 1)) and(( + ca_country = 'United States' + AND ca_state IN ('TX', 'OH', 'TX') + AND ss_net_profit BETWEEN 100 AND 200) + OR ( + ca_country = 'United States' + AND ca_state IN ('OR', 'NM', 'KY') + AND ss_net_profit BETWEEN 150 AND 300) + OR ( + ca_country = 'United States' + AND ca_state IN ('VA', 'TX', 'MS') + AND ss_net_profit BETWEEN 50 AND 250)) ; diff --git a/ibis/backends/tests/tpc/queries/clickhouse/h/19.sql b/ibis/backends/tests/tpc/queries/clickhouse/h/19.sql new file mode 100644 index 000000000000..d56bd19ae5b4 --- /dev/null +++ b/ibis/backends/tests/tpc/queries/clickhouse/h/19.sql @@ -0,0 +1,29 @@ +SELECT + sum(l_extendedprice * (1 - l_discount)) AS revenue +FROM lineitem +INNER JOIN part +ON p_partkey = l_partkey +WHERE ( + p_brand = 'Brand#12' + AND p_container IN ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') + AND l_quantity >= 1 + AND l_quantity <= 1 + 10 + AND p_size BETWEEN 1 AND 5 + AND l_shipmode IN ('AIR', 'AIR REG') + AND l_shipinstruct = 'DELIVER IN PERSON') + OR ( + p_brand = 'Brand#23' + AND p_container IN ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') + AND l_quantity >= 10 + AND l_quantity <= 10 + 10 + AND p_size BETWEEN 1 AND 10 + AND l_shipmode IN ('AIR', 'AIR REG') + AND l_shipinstruct = 'DELIVER IN PERSON') + OR ( + p_brand = 'Brand#34' + AND p_container IN ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') + AND l_quantity >= 20 + AND l_quantity <= 20 + 10 + AND p_size BETWEEN 1 AND 15 + AND l_shipmode IN ('AIR', 'AIR REG') + AND l_shipinstruct = 'DELIVER IN PERSON');