Skip to content

Commit

Permalink
test(clickhouse): add TPC-H and TPC-DS queries (#9508)
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored Jul 15, 2024
1 parent 87af588 commit ef4e47b
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 11 deletions.
84 changes: 82 additions & 2 deletions ibis/backends/clickhouse/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
from __future__ import annotations

import concurrent.futures
import contextlib
import os
import subprocess
from typing import TYPE_CHECKING, Any

import pytest
import sqlglot as sg

import ibis
import ibis.expr.types as ir
from ibis import util
from ibis.backends.tests.base import ServiceBackendTest

if TYPE_CHECKING:
from collections.abc import Callable, Iterable
from collections.abc import Callable, Iterable, Mapping
from pathlib import Path

CLICKHOUSE_HOST = os.environ.get("IBIS_TEST_CLICKHOUSE_HOST", "localhost")
Expand All @@ -31,6 +34,10 @@ class TestConf(ServiceBackendTest):
data_volume = "/var/lib/clickhouse/user_files/ibis"
service_name = "clickhouse"
deps = ("clickhouse_connect",)
supports_tpch = True
supports_tpcds = True
# Query 14 seems to require a bit more room here
tpc_absolute_tolerance = 0.0001

@property
def native_bool(self) -> bool:
Expand Down Expand Up @@ -71,12 +78,21 @@ def postload(self, **kw: Any):
self.connection = self.connect(database=IBIS_TEST_CLICKHOUSE_DB, **kw)

@staticmethod
def connect(
    *, tmpdir, worker_id, settings: Mapping[str, Any] | None = None, **kw: Any
):
    """Create a ClickHouse backend connection for the test suite.

    Parameters
    ----------
    settings
        Optional ClickHouse session settings.  They are layered on top of
        the defaults the test suite requires; the caller's mapping is never
        mutated (the annotation is the read-only ``Mapping``, so calling
        ``setdefault`` on it would be both a type violation and a leak of
        our defaults back into the caller's object).
    kw
        Extra keyword arguments forwarded to ``ibis.clickhouse.connect``.
    """
    # Caller-provided settings take precedence over the default, matching
    # ``setdefault`` semantics, but via a fresh dict instead of mutation.
    # Without this setting TPC-DS 19 and 24 will fail.
    merged = {"allow_experimental_join_condition": 1, **(settings or {})}

    return ibis.clickhouse.connect(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        password=CLICKHOUSE_PASS,
        user=CLICKHOUSE_USER,
        settings=merged,
        **kw,
    )

Expand All @@ -96,6 +112,70 @@ def least(f: Callable[..., ir.Value], *args: ir.Value) -> ir.Value:
)
return f(*args)

def preload(self):
    """Copy the TPC-H and TPC-DS parquet fixtures into the ClickHouse container."""
    super().preload()

    suites = ("tpch", "tpcds")
    service = self.service_name
    volume = self.data_volume

    # Make sure the per-suite target directories exist inside the container.
    for suite in suites:
        mkdir_cmd = [
            "docker",
            "compose",
            "exec",
            service,
            "mkdir",
            "-p",
            f"{volume}/{suite}",
        ]
        subprocess.run(mkdir_cmd, check=True)

    def copy_one(suite, path):
        # One `docker compose cp` per parquet file.
        dest = f"{service}:{volume}/{suite}/{path.name}"
        subprocess.run(
            ["docker", "compose", "cp", str(path), dest], check=True
        )

    # Fan the copies out over a thread pool; re-raise the first failure.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = [
            pool.submit(copy_one, suite, path)
            for suite in suites
            for path in self.data_dir.joinpath(suite).rglob("*.parquet")
        ]
        for fut in concurrent.futures.as_completed(pending):
            fut.result()

def _load_tpc(self, *, suite, scale_factor):
    """Expose pre-generated TPC parquet files as views in ClickHouse.

    Parameters
    ----------
    suite
        TPC suite name (e.g. ``"h"`` or ``"ds"``); the target database is
        named ``tpc{suite}``.
    scale_factor
        Scale factor of the generated data, used to locate the parquet
        directory under ``self.data_dir``.
    """
    con = self.connection
    schema = f"tpc{suite}"
    con.con.command(f"CREATE DATABASE IF NOT EXISTS {schema}")
    parquet_dir = self.data_dir.joinpath(schema, f"sf={scale_factor}", "parquet")
    assert parquet_dir.exists(), parquet_dir
    for path in parquet_dir.glob("*.parquet"):
        # `stem` is the idiomatic equivalent of `with_suffix("").name`.
        table_name = path.stem
        # Views read straight from the parquet files copied into the
        # container's user_files directory by `preload`.
        con.con.command(
            f"CREATE VIEW IF NOT EXISTS {schema}.{table_name} AS "
            f"SELECT * FROM file('ibis/{schema}/{path.name}', 'Parquet')"
        )

def _transform_tpc_sql(self, parsed, *, suite, leaves):
    """Qualify bare TPC table references with the ``tpc{suite}`` catalog."""
    catalog = f"tpc{suite}"

    def qualify(node):
        # Only rewrite table nodes whose name is one of the TPC tables.
        if not (isinstance(node, sg.exp.Table) and node.name in leaves):
            return node
        args = dict(node.args)
        args["catalog"] = catalog
        return type(node)(**args)

    return parsed.transform(qualify)


@pytest.fixture(scope="session")
def con(tmp_path_factory, data_dir, worker_id):
Expand Down
26 changes: 18 additions & 8 deletions ibis/backends/tests/tpc/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def pytest_pyfunc_call(pyfuncitem):
return True


def tpc_test(suite_name: Literal["h", "ds"], result_is_empty=False):
def tpc_test(suite_name: Literal["h", "ds"], *, result_is_empty=False):
"""Decorator for TPC tests.
Parameters
Expand Down Expand Up @@ -68,20 +68,27 @@ def wrapper(*args, backend, **kwargs):
query_number = query_name_match.group(1)
sql_path_name = f"{query_number}.sql"

path = Path(__file__).parent.joinpath(
"queries", "duckdb", suite_name, sql_path_name
)
base = Path(__file__).parent / "queries"

path = base / backend_name / suite_name / sql_path_name

if path.exists():
dialect = backend_name
else:
dialect = "duckdb"
path = base / "duckdb" / suite_name / sql_path_name

raw_sql = path.read_text()

sql = sg.parse_one(raw_sql, read="duckdb")
sql = sg.parse_one(raw_sql, read=dialect)

sql = backend._transform_tpc_sql(
sql, suite=suite_name, leaves=backend.list_tpc_tables(suite_name)
)

raw_sql = sql.sql(dialect="duckdb", pretty=True)
raw_sql = sql.sql(dialect=dialect, pretty=True)

expected_expr = backend.connection.sql(raw_sql, dialect="duckdb")
expected_expr = backend.connection.sql(raw_sql, dialect=dialect)

result_expr = test(*args, **kwargs)

Expand All @@ -90,7 +97,10 @@ def wrapper(*args, backend, **kwargs):
assert (result_is_empty and result.empty) or not result.empty

expected = expected_expr.to_pandas()
assert list(map(str.lower, expected.columns)) == result.columns.tolist()

assert len(expected.columns) == len(result.columns)
assert all(r in e.lower() for r, e in zip(result.columns, expected.columns))

expected.columns = result.columns

expected = PandasData.convert_table(expected, result_expr.schema())
Expand Down
23 changes: 22 additions & 1 deletion ibis/backends/tests/tpc/ds/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
from ibis import _, date, ifelse, null
from ibis import literal as lit
from ibis import selectors as s
from ibis.backends.tests.errors import ClickHouseDatabaseError
from ibis.backends.tests.tpc.conftest import tpc_test


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("ds")
def test_01(store_returns, date_dim, store, customer):
customer_total_return = (
Expand Down Expand Up @@ -220,6 +226,11 @@ def test_05(
raise NotImplementedError()


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("ds")
def test_06(customer_address, customer, store_sales, date_dim, item):
return (
Expand Down Expand Up @@ -743,6 +754,11 @@ def test_09(store_sales, reason):
@pytest.mark.broken(
["datafusion"], reason="Exception: Optimizer rule 'scalar_subquery_to_join' failed"
)
@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
def test_10(
customer,
customer_address,
Expand Down Expand Up @@ -1015,11 +1031,16 @@ def test_15(catalog_sales, customer, customer_address, date_dim):
)


@tpc_test("ds")
@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@pytest.mark.notyet(
["datafusion"],
reason="Error during planning: Correlated column is not allowed in predicate",
)
@tpc_test("ds")
def test_16(catalog_sales, date_dim, customer_address, call_center, catalog_returns):
return (
catalog_sales.join(date_dim, [("cs_ship_date_sk", "d_date_sk")])
Expand Down
32 changes: 32 additions & 0 deletions ibis/backends/tests/tpc/h/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

import ibis
from ibis.backends.tests.errors import ClickHouseDatabaseError
from ibis.backends.tests.tpc.conftest import add_date, tpc_test


Expand Down Expand Up @@ -39,6 +40,11 @@ def test_01(lineitem):
return q


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_02(part, supplier, partsupp, nation, region):
"""Minimum Cost Supplier Query (Q2)"""
Expand Down Expand Up @@ -110,6 +116,11 @@ def test_03(customer, orders, lineitem):
return q


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_04(orders, lineitem):
"""Order Priority Checking Query (Q4)"""
Expand Down Expand Up @@ -409,6 +420,7 @@ def test_12(orders, lineitem):
return q


@pytest.mark.broken(["clickhouse"], reason="broken sqlglot codegen")
@tpc_test("h")
def test_13(customer, orders):
"""Customer Distribution Query (Q13)
Expand Down Expand Up @@ -522,6 +534,11 @@ def test_16(partsupp, part, supplier):
return q


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_17(lineitem, part):
"""Small-Quantity-Order Revenue Query (Q17)
Expand Down Expand Up @@ -628,6 +645,11 @@ def test_19(lineitem, part):
return q


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_20(supplier, nation, partsupp, part, lineitem):
"""Potential Part Promotion Query (Q20)
Expand Down Expand Up @@ -672,6 +694,11 @@ def test_20(supplier, nation, partsupp, part, lineitem):
raises=Exception,
reason="EXISTS subqueries not supported in DataFusion",
)
@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_21(supplier, lineitem, orders, nation):
"""Suppliers Who Kept Orders Waiting Query (Q21)
Expand Down Expand Up @@ -723,6 +750,11 @@ def test_21(supplier, lineitem, orders, nation):
raises=Exception,
reason="EXISTS subqueries not supported in DataFusion",
)
@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_22(customer, orders):
"""Global Sales Opportunity Query (Q22)
Expand Down
42 changes: 42 additions & 0 deletions ibis/backends/tests/tpc/queries/clickhouse/ds/13.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

-- TPC-DS query 13, ClickHouse dialect.
-- Averages quantity / extended sales price / extended wholesale cost and
-- sums extended wholesale cost over store sales that match one of three
-- demographic brackets AND one of three (state, net-profit) brackets.
SELECT avg(ss_quantity) avg1,
avg(ss_ext_sales_price) avg2,
avg(ss_ext_wholesale_cost) avg3,
sum(ss_ext_wholesale_cost) sum1
FROM store_sales ,
store ,
customer_demographics ,
household_demographics ,
customer_address ,
date_dim
WHERE s_store_sk = ss_store_sk
AND ss_sold_date_sk = d_date_sk
AND ss_hdemo_sk = hd_demo_sk
AND cd_demo_sk = ss_cdemo_sk
AND ss_addr_sk = ca_address_sk
AND d_year = 2001 and((
cd_marital_status = 'M'
AND cd_education_status = 'Advanced Degree'
AND ss_sales_price BETWEEN 100.00 AND 150.00
AND hd_dep_count = 3)
OR (
cd_marital_status = 'S'
AND cd_education_status = 'College'
AND ss_sales_price BETWEEN 50.00 AND 100.00
AND hd_dep_count = 1 )
OR (
cd_marital_status = 'W'
AND cd_education_status = '2 yr Degree'
AND ss_sales_price BETWEEN 150.00 AND 200.00
AND hd_dep_count = 1)) and((
ca_country = 'United States'
-- NOTE(review): 'TX' appears twice below; presumably copied verbatim from
-- the TPC-DS qualification parameters — confirm against the spec.
AND ca_state IN ('TX', 'OH', 'TX')
AND ss_net_profit BETWEEN 100 AND 200)
OR (
ca_country = 'United States'
AND ca_state IN ('OR', 'NM', 'KY')
AND ss_net_profit BETWEEN 150 AND 300)
OR (
ca_country = 'United States'
AND ca_state IN ('VA', 'TX', 'MS')
AND ss_net_profit BETWEEN 50 AND 250)) ;
Loading

0 comments on commit ef4e47b

Please sign in to comment.