Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(clickhouse): add TPC-H and TPC-DS queries #9508

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 82 additions & 2 deletions ibis/backends/clickhouse/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
from __future__ import annotations

import concurrent.futures
import contextlib
import os
import subprocess
from typing import TYPE_CHECKING, Any

import pytest
import sqlglot as sg

import ibis
import ibis.expr.types as ir
from ibis import util
from ibis.backends.tests.base import ServiceBackendTest

if TYPE_CHECKING:
from collections.abc import Callable, Iterable
from collections.abc import Callable, Iterable, Mapping
from pathlib import Path

CLICKHOUSE_HOST = os.environ.get("IBIS_TEST_CLICKHOUSE_HOST", "localhost")
Expand All @@ -31,6 +34,10 @@ class TestConf(ServiceBackendTest):
data_volume = "/var/lib/clickhouse/user_files/ibis"
service_name = "clickhouse"
deps = ("clickhouse_connect",)
supports_tpch = True
supports_tpcds = True
# Query 14 seems to require a bit more room here
tpc_absolute_tolerance = 0.0001

@property
def native_bool(self) -> bool:
Expand Down Expand Up @@ -71,12 +78,21 @@ def postload(self, **kw: Any):
self.connection = self.connect(database=IBIS_TEST_CLICKHOUSE_DB, **kw)

@staticmethod
def connect(*, tmpdir, worker_id, **kw: Any):
def connect(
    *, tmpdir, worker_id, settings: Mapping[str, Any] | None = None, **kw: Any
):
    """Connect to the test ClickHouse server.

    Parameters
    ----------
    settings
        Optional ClickHouse session settings. Merged with required
        defaults without mutating the caller's mapping.
    kw
        Additional keyword arguments forwarded to ``ibis.clickhouse.connect``.
    """
    # Copy into a plain dict: the parameter is annotated as a read-only
    # Mapping (which has no `setdefault`), and mutating a caller-supplied
    # dict in place would be a surprising side effect.
    settings = dict(settings) if settings is not None else {}

    # without this setting TPC-DS 19 and 24 will fail
    settings.setdefault("allow_experimental_join_condition", 1)

    return ibis.clickhouse.connect(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        password=CLICKHOUSE_PASS,
        user=CLICKHOUSE_USER,
        settings=settings,
        **kw,
    )

Expand All @@ -96,6 +112,70 @@ def least(f: Callable[..., ir.Value], *args: ir.Value) -> ir.Value:
)
return f(*args)

def preload(self):
    """Copy the TPC-H and TPC-DS parquet files into the ClickHouse container."""
    super().preload()

    tpc_suites = ("tpch", "tpcds")
    service = self.service_name
    volume = self.data_volume

    # Create a destination directory inside the container for each suite.
    for tpc_suite in tpc_suites:
        subprocess.run(
            [
                "docker",
                "compose",
                "exec",
                service,
                "mkdir",
                "-p",
                f"{volume}/{tpc_suite}",
            ],
            check=True,
        )

    # Copy every parquet file into the container concurrently; copies are
    # independent so a thread pool is safe here.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        copies = [
            pool.submit(
                subprocess.run,
                [
                    "docker",
                    "compose",
                    "cp",
                    str(parquet_path),
                    f"{service}:{volume}/{tpc_suite}/{parquet_path.name}",
                ],
                check=True,
            )
            for tpc_suite in tpc_suites
            for parquet_path in self.data_dir.joinpath(tpc_suite).rglob("*.parquet")
        ]
        # Surface any copy failure by re-raising from the completed future.
        for done in concurrent.futures.as_completed(copies):
            done.result()

def _load_tpc(self, *, suite, scale_factor):
    """Expose a TPC suite's parquet files as views in a dedicated database.

    Creates a ``tpc{suite}`` database and one ``CREATE VIEW`` per parquet
    file, reading the file via ClickHouse's ``file`` table function.
    """
    client = self.connection.con
    database = f"tpc{suite}"
    client.command(f"CREATE DATABASE IF NOT EXISTS {database}")

    source_dir = self.data_dir / database / f"sf={scale_factor}" / "parquet"
    assert source_dir.exists(), source_dir

    for parquet_file in source_dir.glob("*.parquet"):
        view_name = parquet_file.stem
        client.command(
            f"CREATE VIEW IF NOT EXISTS {database}.{view_name} AS "
            f"SELECT * FROM file('ibis/{database}/{parquet_file.name}', 'Parquet')"
        )

def _transform_tpc_sql(self, parsed, *, suite, leaves):
    """Qualify TPC base-table references with the per-suite database name.

    Rewrites every table node whose name is in *leaves* so that it carries
    the ``tpc{suite}`` catalog; all other nodes pass through untouched.
    """
    catalog = f"tpc{suite}"

    def qualify(node):
        # Only rewrite table references to the suite's base tables.
        if not (isinstance(node, sg.exp.Table) and node.name in leaves):
            return node
        kept_args = {k: v for k, v in node.args.items() if k != "catalog"}
        return type(node)(catalog=catalog, **kept_args)

    return parsed.transform(qualify)


@pytest.fixture(scope="session")
def con(tmp_path_factory, data_dir, worker_id):
Expand Down
26 changes: 18 additions & 8 deletions ibis/backends/tests/tpc/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def pytest_pyfunc_call(pyfuncitem):
return True


def tpc_test(suite_name: Literal["h", "ds"], result_is_empty=False):
def tpc_test(suite_name: Literal["h", "ds"], *, result_is_empty=False):
"""Decorator for TPC tests.
Parameters
Expand Down Expand Up @@ -68,20 +68,27 @@ def wrapper(*args, backend, **kwargs):
query_number = query_name_match.group(1)
sql_path_name = f"{query_number}.sql"

path = Path(__file__).parent.joinpath(
"queries", "duckdb", suite_name, sql_path_name
)
base = Path(__file__).parent / "queries"

path = base / backend_name / suite_name / sql_path_name

if path.exists():
dialect = backend_name
else:
dialect = "duckdb"
path = base / "duckdb" / suite_name / sql_path_name

raw_sql = path.read_text()

sql = sg.parse_one(raw_sql, read="duckdb")
sql = sg.parse_one(raw_sql, read=dialect)

sql = backend._transform_tpc_sql(
sql, suite=suite_name, leaves=backend.list_tpc_tables(suite_name)
)

raw_sql = sql.sql(dialect="duckdb", pretty=True)
raw_sql = sql.sql(dialect=dialect, pretty=True)

expected_expr = backend.connection.sql(raw_sql, dialect="duckdb")
expected_expr = backend.connection.sql(raw_sql, dialect=dialect)

result_expr = test(*args, **kwargs)

Expand All @@ -90,7 +97,10 @@ def wrapper(*args, backend, **kwargs):
assert (result_is_empty and result.empty) or not result.empty

expected = expected_expr.to_pandas()
assert list(map(str.lower, expected.columns)) == result.columns.tolist()

assert len(expected.columns) == len(result.columns)
assert all(r in e.lower() for r, e in zip(result.columns, expected.columns))

expected.columns = result.columns

expected = PandasData.convert_table(expected, result_expr.schema())
Expand Down
23 changes: 22 additions & 1 deletion ibis/backends/tests/tpc/ds/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
from ibis import _, date, ifelse, null
from ibis import literal as lit
from ibis import selectors as s
from ibis.backends.tests.errors import ClickHouseDatabaseError
from ibis.backends.tests.tpc.conftest import tpc_test


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("ds")
def test_01(store_returns, date_dim, store, customer):
customer_total_return = (
Expand Down Expand Up @@ -220,6 +226,11 @@ def test_05(
raise NotImplementedError()


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("ds")
def test_06(customer_address, customer, store_sales, date_dim, item):
return (
Expand Down Expand Up @@ -743,6 +754,11 @@ def test_09(store_sales, reason):
@pytest.mark.broken(
["datafusion"], reason="Exception: Optimizer rule 'scalar_subquery_to_join' failed"
)
@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
def test_10(
customer,
customer_address,
Expand Down Expand Up @@ -1015,11 +1031,16 @@ def test_15(catalog_sales, customer, customer_address, date_dim):
)


@tpc_test("ds")
@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@pytest.mark.notyet(
["datafusion"],
reason="Error during planning: Correlated column is not allowed in predicate",
)
@tpc_test("ds")
def test_16(catalog_sales, date_dim, customer_address, call_center, catalog_returns):
return (
catalog_sales.join(date_dim, [("cs_ship_date_sk", "d_date_sk")])
Expand Down
32 changes: 32 additions & 0 deletions ibis/backends/tests/tpc/h/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

import ibis
from ibis.backends.tests.errors import ClickHouseDatabaseError
from ibis.backends.tests.tpc.conftest import add_date, tpc_test


Expand Down Expand Up @@ -39,6 +40,11 @@ def test_01(lineitem):
return q


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_02(part, supplier, partsupp, nation, region):
"""Minimum Cost Supplier Query (Q2)"""
Expand Down Expand Up @@ -110,6 +116,11 @@ def test_03(customer, orders, lineitem):
return q


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_04(orders, lineitem):
"""Order Priority Checking Query (Q4)"""
Expand Down Expand Up @@ -409,6 +420,7 @@ def test_12(orders, lineitem):
return q


@pytest.mark.broken(["clickhouse"], reason="broken sqlglot codegen")
@tpc_test("h")
def test_13(customer, orders):
"""Customer Distribution Query (Q13)
Expand Down Expand Up @@ -522,6 +534,11 @@ def test_16(partsupp, part, supplier):
return q


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_17(lineitem, part):
"""Small-Quantity-Order Revenue Query (Q17)
Expand Down Expand Up @@ -628,6 +645,11 @@ def test_19(lineitem, part):
return q


@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_20(supplier, nation, partsupp, part, lineitem):
"""Potential Part Promotion Query (Q20)
Expand Down Expand Up @@ -672,6 +694,11 @@ def test_20(supplier, nation, partsupp, part, lineitem):
raises=Exception,
reason="EXISTS subqueries not supported in DataFusion",
)
@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_21(supplier, lineitem, orders, nation):
"""Suppliers Who Kept Orders Waiting Query (Q21)
Expand Down Expand Up @@ -723,6 +750,11 @@ def test_21(supplier, lineitem, orders, nation):
raises=Exception,
reason="EXISTS subqueries not supported in DataFusion",
)
@pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="correlated subqueries don't exist in clickhouse",
)
@tpc_test("h")
def test_22(customer, orders):
"""Global Sales Opportunity Query (Q22)
Expand Down
42 changes: 42 additions & 0 deletions ibis/backends/tests/tpc/queries/clickhouse/ds/13.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

SELECT avg(ss_quantity) avg1,
avg(ss_ext_sales_price) avg2,
avg(ss_ext_wholesale_cost) avg3,
sum(ss_ext_wholesale_cost) sum1
FROM store_sales ,
store ,
customer_demographics ,
household_demographics ,
customer_address ,
date_dim
WHERE s_store_sk = ss_store_sk
AND ss_sold_date_sk = d_date_sk
AND ss_hdemo_sk = hd_demo_sk
AND cd_demo_sk = ss_cdemo_sk
AND ss_addr_sk = ca_address_sk
AND d_year = 2001 and((
cd_marital_status = 'M'
AND cd_education_status = 'Advanced Degree'
AND ss_sales_price BETWEEN 100.00 AND 150.00
AND hd_dep_count = 3)
OR (
cd_marital_status = 'S'
AND cd_education_status = 'College'
AND ss_sales_price BETWEEN 50.00 AND 100.00
AND hd_dep_count = 1 )
OR (
cd_marital_status = 'W'
AND cd_education_status = '2 yr Degree'
AND ss_sales_price BETWEEN 150.00 AND 200.00
AND hd_dep_count = 1)) and((
ca_country = 'United States'
AND ca_state IN ('TX', 'OH', 'TX')
AND ss_net_profit BETWEEN 100 AND 200)
OR (
ca_country = 'United States'
AND ca_state IN ('OR', 'NM', 'KY')
AND ss_net_profit BETWEEN 150 AND 300)
OR (
ca_country = 'United States'
AND ca_state IN ('VA', 'TX', 'MS')
AND ss_net_profit BETWEEN 50 AND 250)) ;
Loading