refactor(tpc): add tpc-ds tests (#9467)
First 27 TPC-DS queries running against DuckDB, Trino, Snowflake, and
DataFusion, sans the ones that require `ROLLUP`.

More queries fail on DataFusion than on the other backends. Those are
marked with the appropriate xfail marker.
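
For a sense of what these tests look like, here is a hypothetical sketch built around the `tpc_test` decorator added in this change; the query number, fixture names, and marker arguments are illustrative, not actual suite code:

    import pytest

    from ibis.backends.tests.tpc.conftest import tpc_test


    @tpc_test("ds")
    @pytest.mark.notimpl(["datafusion"], reason="query not yet supported")
    def test_03(date_dim, store_sales, item):
        # Build and return the ibis expression for the query; the decorator
        # executes it and compares the result against the reference DuckDB SQL.
        ...

Because the decorator applies a per-suite pytest marker, the whole suite can presumably be selected with `pytest -m tpcds`.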
cpcloud authored Jul 1, 2024
1 parent cec0374 commit d2dff68
Showing 94 changed files with 1,568 additions and 6,699 deletions.
2 changes: 1 addition & 1 deletion .codespellrc
@@ -1,6 +1,6 @@
[codespell]
# local codespell matches `./docs`, pre-commit codespell matches `docs`
skip = *.lock,.direnv,.git,./docs/_freeze,./docs/_output/**,./docs/_inv/**,docs/_freeze/**,*.svg,*.css,*.html,*.js
skip = *.lock,.direnv,.git,./docs/_freeze,./docs/_output/**,./docs/_inv/**,docs/_freeze/**,*.svg,*.css,*.html,*.js,ibis/backends/tests/tpc/queries/duckdb/ds/44.sql
ignore-regex = \b(i[if]f|I[IF]F|AFE)\b
builtin = clear,rare,names
ignore-words-list = tim,notin,ang
20 changes: 8 additions & 12 deletions ci/schema/trino.sql
@@ -1,5 +1,4 @@
DROP TABLE IF EXISTS hive.default.diamonds;
CREATE TABLE hive.default.diamonds (
CREATE TABLE IF NOT EXISTS hive.default.diamonds (
"carat" DOUBLE,
"cut" VARCHAR,
"color" VARCHAR,
@@ -18,8 +17,7 @@ CREATE TABLE hive.default.diamonds (
CREATE OR REPLACE VIEW memory.default.diamonds AS
SELECT * FROM hive.default.diamonds;

DROP TABLE IF EXISTS hive.default.astronauts;
CREATE TABLE hive.default.astronauts (
CREATE TABLE IF NOT EXISTS hive.default.astronauts (
"id" BIGINT,
"number" BIGINT,
"nationwide_number" BIGINT,
@@ -52,8 +50,7 @@ CREATE TABLE hive.default.astronauts (
CREATE OR REPLACE VIEW memory.default.astronauts AS
SELECT * FROM hive.default.astronauts;

DROP TABLE IF EXISTS hive.default.batting;
CREATE TABLE hive.default.batting (
CREATE TABLE IF NOT EXISTS hive.default.batting (
"playerID" VARCHAR,
"yearID" BIGINT,
"stint" BIGINT,
@@ -84,24 +81,22 @@ CREATE TABLE hive.default.batting (
CREATE OR REPLACE VIEW memory.default.batting AS
SELECT * FROM hive.default.batting;

DROP TABLE IF EXISTS hive.default.awards_players;
CREATE TABLE hive.default.awards_players (
CREATE TABLE IF NOT EXISTS hive.default.awards_players (
"playerID" VARCHAR,
"awardID" VARCHAR,
"yearID" BIGINT,
"lgID" VARCHAR,
"tie" VARCHAR,
"notes" VARCHAR
) WITH (
external_location = 's3a://trino/awards-players',
external_location = 's3a://trino/awards_players',
format = 'PARQUET'
);

CREATE OR REPLACE VIEW memory.default.awards_players AS
SELECT * FROM hive.default.awards_players;

DROP TABLE IF EXISTS hive.default.functional_alltypes;
CREATE TABLE hive.default.functional_alltypes (
CREATE TABLE IF NOT EXISTS hive.default.functional_alltypes (
"id" INTEGER,
"bool_col" BOOLEAN,
"tinyint_col" TINYINT,
@@ -116,9 +111,10 @@ CREATE TABLE hive.default.functional_alltypes (
"year" INTEGER,
"month" INTEGER
) WITH (
external_location = 's3a://trino/functional-alltypes',
external_location = 's3a://trino/functional_alltypes',
format = 'PARQUET'
);

CREATE OR REPLACE VIEW memory.default.functional_alltypes AS
SELECT * FROM hive.default.functional_alltypes;

14 changes: 4 additions & 10 deletions ibis/backends/datafusion/tests/conftest.py
@@ -18,6 +18,7 @@ class TestConf(BackendTest):
supports_json = False
supports_arrays = True
supports_tpch = True
supports_tpcds = True
stateful = False
deps = ("datafusion",)
# Query 1 seems to require a bit more room here
@@ -39,20 +40,14 @@ def _load_data(self, **_: Any) -> None:
def connect(*, tmpdir, worker_id, **kw):
return ibis.datafusion.connect(**kw)

def load_tpch(self) -> None:
"""Load TPC-H data."""
self.tpch_tables = frozenset(self._load_tpc(suite="h", scale_factor="0.17"))

def _load_tpc(self, *, suite, scale_factor):
con = self.connection
schema = f"tpc{suite}"
con.create_database(schema)
tables = set()
for path in self.data_dir.joinpath(
schema, f"sf={scale_factor}", "parquet"
).glob("*.parquet"):
table_name = path.with_suffix("").name
tables.add(table_name)
con.con.sql(
# datafusion can't create an external table in a specific schema it seems
# so hack around that by
@@ -68,13 +63,12 @@ def _load_tpc(self, *, suite, scale_factor):
f"CREATE TABLE {schema}.{table_name} AS SELECT * FROM {table_name}"
)
con.con.sql(f"DROP TABLE {table_name}")
return tables

def _transform_tpch_sql(self, parsed):
def _transform_tpc_sql(self, parsed, *, suite, leaves):
def add_catalog_and_schema(node):
if isinstance(node, sg.exp.Table) and node.name in self.tpch_tables:
if isinstance(node, sg.exp.Table) and node.name in leaves:
return node.__class__(
catalog="tpch",
catalog=f"tpc{suite}",
**{k: v for k, v in node.args.items() if k != "catalog"},
)
return node
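
The workaround described in the comment above ("datafusion can't create an external table in a specific schema") can be sketched standalone with the `datafusion` Python package; the path and table name here are illustrative, not the suite's actual code:

    import datafusion

    ctx = datafusion.SessionContext()
    ctx.sql("CREATE SCHEMA tpcds")

    # Register the file as a top-level external table first, since (per the
    # comment above) CREATE EXTERNAL TABLE does not appear to accept a
    # schema-qualified name...
    ctx.sql(
        "CREATE EXTERNAL TABLE store_sales "
        "STORED AS PARQUET LOCATION 'tpcds/sf=0.2/parquet/store_sales.parquet'"
    )
    # ...then copy it into the target schema and drop the temporary name.
    ctx.sql("CREATE TABLE tpcds.store_sales AS SELECT * FROM store_sales")
    ctx.sql("DROP TABLE store_sales")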
28 changes: 3 additions & 25 deletions ibis/backends/duckdb/tests/conftest.py
@@ -115,42 +115,20 @@ def _load_tpc(self, *, suite, scale_factor):
con.con.execute(f"CREATE OR REPLACE SCHEMA {schema}")
parquet_dir = self.data_dir.joinpath(schema, f"sf={scale_factor}", "parquet")
assert parquet_dir.exists(), parquet_dir
tables = set()
for path in parquet_dir.glob("*.parquet"):
table_name = path.with_suffix("").name
tables.add(table_name)
# duckdb automatically infers the sf= as a hive partition so we
# need to disable it
con.con.execute(
f"CREATE OR REPLACE VIEW {schema}.{table_name} AS "
f"FROM read_parquet({str(path)!r}, hive_partitioning=false)"
)
return tables

def load_tpch(self) -> None:
"""Load TPC-H data."""
self.tpch_tables = frozenset(self._load_tpc(suite="h", scale_factor="0.17"))

def load_tpcds(self) -> None:
"""Load TPC-DS data."""
self.tpcds_tables = frozenset(self._load_tpc(suite="ds", scale_factor="0.2"))

def _transform_tpch_sql(self, parsed):
def add_catalog_and_schema(node):
if isinstance(node, sg.exp.Table) and node.name in self.tpch_tables:
return node.__class__(
catalog="tpch",
**{k: v for k, v in node.args.items() if k != "catalog"},
)
return node

return parsed.transform(add_catalog_and_schema)

def _transform_tpcds_sql(self, parsed):
def _transform_tpc_sql(self, parsed, *, suite, leaves):
def add_catalog_and_schema(node):
if isinstance(node, sg.exp.Table) and node.name in self.tpcds_tables:
if isinstance(node, sg.exp.Table) and node.name in leaves:
return node.__class__(
catalog="tpcds",
catalog=f"tpc{suite}",
**{k: v for k, v in node.args.items() if k != "catalog"},
)
return node
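
The hive-partitioning comment above refers to DuckDB behavior that is easy to reproduce directly; a minimal sketch with an illustrative path:

    import duckdb

    con = duckdb.connect()
    # The `sf=0.2` path segment would otherwise be inferred as a hive
    # partition, adding a spurious `sf` column to the view's schema;
    # hive_partitioning=false keeps the schema to just the file's own columns.
    con.execute(
        "CREATE OR REPLACE VIEW store_sales AS "
        "SELECT * FROM read_parquet('tpcds/sf=0.2/parquet/store_sales.parquet', "
        "hive_partitioning=false)"
    )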
49 changes: 34 additions & 15 deletions ibis/backends/snowflake/tests/conftest.py
@@ -90,28 +90,47 @@ class TestConf(BackendTest):
supports_map = True
deps = ("snowflake.connector",)
supports_tpch = True
supports_tpcds = True

def load_tpch(self) -> None:
"""No-op, snowflake already defines these in `SNOWFLAKE_SAMPLE_DATA`."""
def _load_tpc(self, *, suite, scale_factor) -> None:
"""Create views of data in the TPC-H catalog that ships with Trino.
def h(self, name: str):
name = name.upper()
t = self.connection.table(name, database="SNOWFLAKE_SAMPLE_DATA.TPCH_SF1")
return t.rename("snake_case")
This method creates relations whose column names are prefixed with the
first character of the table name (or the first two, in the case of
partsupp -> ps), to match the DuckDB TPC-H query conventions.
"""
con = self.connection
schema = f"tpc{suite}"

con.create_database(schema, force=True)

parquet_dir = self.data_dir.joinpath(schema, f"sf={scale_factor}", "parquet")
assert parquet_dir.exists(), parquet_dir

tables = frozenset(con.list_tables(database=("IBIS_TESTING", schema)))
for path in parquet_dir.glob("*.parquet"):
table_name = path.with_suffix("").name
if table_name not in tables:
con.create_table(
table_name,
con.read_parquet(path),
database=f'IBIS_TESTING."{schema}"',
)

def _transform_tpc_sql(self, parsed, *, suite, leaves):
def quote(node):
if isinstance(node, sg.exp.Identifier):
return sg.to_identifier(node.name, quoted=True)
return node

def _transform_tpch_sql(self, parsed):
def add_catalog_and_schema(node):
if isinstance(node, sg.exp.Table):
return node.__class__(
db="TPCH_SF1",
catalog="SNOWFLAKE_SAMPLE_DATA",
**{
k: v for k, v in node.args.items() if k not in ("db", "catalog")
},
)
if isinstance(node, sg.exp.Table) and node.name in leaves:
node.args["db"] = sg.to_identifier(f"tpc{suite}")
node.args["catalog"] = sg.to_identifier("IBIS_TESTING")
return node

result = parsed.transform(add_catalog_and_schema)
result = result.transform(quote)
return result

def _load_data(self, **_: Any) -> None:
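
The `quote` transform added above matters because Snowflake folds unquoted identifiers to upper case, while the TPC data uses lowercase names. A self-contained sqlglot sketch of the same rewrite (query text illustrative):

    import sqlglot as sg


    def quote_everything(sql: str) -> str:
        def quote(node):
            # Re-emit every identifier quoted so Snowflake preserves its case.
            if isinstance(node, sg.exp.Identifier):
                return sg.to_identifier(node.name, quoted=True)
            return node

        return sg.parse_one(sql, read="duckdb").transform(quote).sql(dialect="snowflake")


    print(quote_everything("SELECT c_custkey FROM tpch.customer"))
    # SELECT "c_custkey" FROM "tpch"."customer"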
16 changes: 15 additions & 1 deletion ibis/backends/tests/base.py
@@ -113,7 +113,7 @@ def name(cls) -> str:
def connect(*, tmpdir, worker_id, **kw: Any):
"""Return a connection with data loaded from `data_dir`."""

def _transform_tpch_sql(self, parsed):
def _transform_tpc_sql(self, parsed, *, suite, leaves):
return parsed

def _load_data(self, **_: Any) -> None:
@@ -140,6 +140,14 @@ def stateful_load(self, fn, **kw):
self.stateless_load(**kw)
fn.touch()

def load_tpch(self) -> None:
"""Load TPC-H data."""
self._load_tpc(suite="h", scale_factor="0.17")

def load_tpcds(self) -> None:
"""Load TPC-DS data."""
self._load_tpc(suite="ds", scale_factor="0.2")

@classmethod
def load_data(
cls, data_dir: Path, tmpdir: Path, worker_id: str, **kw: Any
@@ -314,6 +322,12 @@ def h(self, name: str) -> ir.Table:
def ds(self, name: str) -> ir.Table:
return self._tpc_table(name, "ds")

def list_tpc_tables(self, suite: Literal["h", "ds"]) -> frozenset[str]:
return frozenset(
path.with_suffix("").name
for path in self.data_dir.joinpath(f"tpc{suite}").rglob("*.parquet")
)


class ServiceBackendTest(BackendTest):
"""Parent class to use for backend test configuration if backend requires a
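
The `leaves` argument threaded through `_transform_tpc_sql` is the set of table names returned by `list_tpc_tables`. A standalone sqlglot sketch of what the DuckDB and DataFusion overrides do with it (assumed names, reduced from the diffs above):

    import sqlglot as sg


    def add_catalog(sql: str, *, suite: str, leaves: frozenset[str]) -> str:
        def add_catalog_and_schema(node):
            # Qualify only leaf tables belonging to the TPC suite; CTE names
            # and other relations pass through untouched.
            if isinstance(node, sg.exp.Table) and node.name in leaves:
                return node.__class__(
                    catalog=f"tpc{suite}",
                    **{k: v for k, v in node.args.items() if k != "catalog"},
                )
            return node

        parsed = sg.parse_one(sql, read="duckdb")
        return parsed.transform(add_catalog_and_schema).sql(dialect="duckdb")


    print(add_catalog("SELECT count(*) FROM customer", suite="h", leaves=frozenset({"customer"})))
    # SELECT COUNT(*) FROM tpch.customer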
47 changes: 23 additions & 24 deletions ibis/backends/tests/tpc/conftest.py
@@ -4,7 +4,7 @@
import functools
import re
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal

import pytest
import sqlglot as sg
@@ -20,38 +20,43 @@


def pytest_pyfunc_call(pyfuncitem):
"""Inject `backend` and `snapshot` fixtures to all TPC-DS test functions.
"""Inject `backend` and fixtures to all TPC-DS test functions.
Defining this hook here limits its scope to the TPC-DS tests.
"""
testfunction = pyfuncitem.obj
funcargs = pyfuncitem.funcargs
testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
result = testfunction(
**testargs, backend=funcargs["backend"], snapshot=funcargs["snapshot"]
)
result = testfunction(**testargs, backend=funcargs["backend"])
assert (
result is None
), "test function should not return anything, did you mean to use assert?"
return True


def tpc_test(suite_name):
def inner(test: Callable[..., ir.Table]):
"""Decorator for TPC tests.
def tpc_test(suite_name: Literal["h", "ds"], result_is_empty=False):
"""Decorator for TPC tests.
Parameters
----------
suite_name
The name of the TPC suite. Only `'h'` and `'ds'` are supported right now.
result_is_empty
If the expected result is an empty table.
Automates the process of loading the SQL query from the file system and
asserting that the result of the ibis expression is equal to the expected
result of executing the raw SQL.
"""
Automates the process of loading the SQL query from the file system and
asserting that the result of the ibis expression is equal to the expected
result of executing the raw SQL.
"""

def inner(test: Callable[..., ir.Table]):
name = f"tpc{suite_name}"

@getattr(pytest.mark, name)
@pytest.mark.usefixtures("backend", "snapshot")
@pytest.mark.usefixtures("backend")
@pytest.mark.xdist_group(name)
@functools.wraps(test)
def wrapper(*args, backend, snapshot, **kwargs):
def wrapper(*args, backend, **kwargs):
backend_name = backend.name()
if not getattr(backend, f"supports_{name}"):
pytest.skip(
@@ -70,29 +75,26 @@ def wrapper(*args, backend, snapshot, **kwargs):

sql = sg.parse_one(raw_sql, read="duckdb")

transform_method = getattr(
backend, f"_transform_{name}_sql", lambda sql: sql
sql = backend._transform_tpc_sql(
sql, suite=suite_name, leaves=backend.list_tpc_tables(suite_name)
)
sql = transform_method(sql)

raw_sql = sql.sql(dialect="duckdb", pretty=True)

expected_expr = backend.connection.sql(raw_sql, dialect="duckdb")

result_expr = test(*args, **kwargs)

ibis_sql = ibis.to_sql(result_expr, dialect=backend_name)

assert result_expr._find_backend(use_default=False) is backend.connection
result = backend.connection.to_pandas(result_expr)
assert not result.empty
assert (result_is_empty and result.empty) or not result.empty

expected = expected_expr.to_pandas()
assert list(map(str.lower, expected.columns)) == result.columns.tolist()
expected.columns = result.columns

expected = PandasData.convert_table(expected, result_expr.schema())
assert not expected.empty
assert (result_is_empty and expected.empty) or not expected.empty

assert len(expected) == len(result)
assert result.columns.tolist() == expected.columns.tolist()
Expand All @@ -108,9 +110,6 @@ def wrapper(*args, backend, snapshot, **kwargs):
== right.values.tolist()
)

# only write sql if the execution passes
snapshot.assert_match(ibis_sql, sql_path_name)

return wrapper

return inner