Skip to content

Commit

Permalink
refactor(tpc): rearrange tpch into tpc
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Jun 27, 2024
1 parent c1e2002 commit a548b03
Show file tree
Hide file tree
Showing 240 changed files with 6,014 additions and 2,275 deletions.
2 changes: 1 addition & 1 deletion ibis/backends/datafusion/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TestConf(BackendTest):
stateful = False
deps = ("datafusion",)
# Query 1 seems to require a bit more room here
tpch_absolute_tolerance = 0.11
tpc_absolute_tolerance = 0.11

def _load_data(self, **_: Any) -> None:
con = self.connection
Expand Down
54 changes: 47 additions & 7 deletions ibis/backends/duckdb/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import TYPE_CHECKING

import pytest
import sqlglot as sg

import ibis
from ibis.backends.conftest import TEST_TABLES
Expand Down Expand Up @@ -48,6 +49,7 @@ class TestConf(BackendTest):
deps = ("duckdb",)
stateful = False
supports_tpch = True
supports_tpcds = True
driver_supports_multiple_statements = True

def preload(self):
Expand Down Expand Up @@ -107,15 +109,53 @@ def connect(*, tmpdir, worker_id, **kw) -> BaseBackend:
kw["extension_directory"] = extension_directory
return ibis.duckdb.connect(**kw)

def load_tpch(self) -> None:
"""Load TPC-H data."""
def _load_tpc(self, *, suite, scale_factor) -> set[str]:
    """Expose one TPC suite's parquet files as views in a `tpc<suite>` schema.

    Parameters
    ----------
    suite
        Suite suffix appended to ``tpc`` to form the schema name and the
        data sub-directory (the callers in this class pass "h" and "ds").
    scale_factor
        Scale factor as it is spelled in the ``sf=<scale_factor>`` directory
        name.

    Returns
    -------
    set[str]
        Names of the views that were created, one per ``*.parquet`` file
        (the file name without its suffix).

    Raises
    ------
    AssertionError
        If the expected parquet directory does not exist.
    """
    con = self.connection
    schema = f"tpc{suite}"
    con.con.execute(f"CREATE OR REPLACE SCHEMA {schema}")

    parquet_dir = self.data_dir.joinpath(schema, f"sf={scale_factor}", "parquet")
    # Fail loudly with the offending path rather than silently loading nothing.
    assert parquet_dir.exists(), parquet_dir

    tables = set()
    for path in parquet_dir.glob("*.parquet"):
        table_name = path.with_suffix("").name
        tables.add(table_name)
        # duckdb automatically infers the sf= as a hive partition so we
        # need to disable it
        con.con.execute(
            f"CREATE OR REPLACE VIEW {schema}.{table_name} AS "
            f"FROM read_parquet({str(path)!r}, hive_partitioning=false)"
        )
    return tables

def load_tpch(self) -> None:
    """Load TPC-H data and record the loaded table names in `tpch_tables`."""
    loaded = self._load_tpc(suite="h", scale_factor="0.17")
    self.tpch_tables = frozenset(loaded)

def load_tpcds(self) -> None:
    """Load TPC-DS data and record the loaded table names in `tpcds_tables`."""
    loaded = self._load_tpc(suite="ds", scale_factor="0.2")
    self.tpcds_tables = frozenset(loaded)

def _transform_tpc_sql(self, parsed, *, suite, tables):
    """Qualify references to loaded TPC tables in a parsed sqlglot expression.

    Every `sg.exp.Table` node whose name is in `tables` is rebuilt with
    `catalog` set to ``tpc<suite>``; all of the node's other arguments are
    carried over unchanged, and every other node is returned as-is.

    Parameters
    ----------
    parsed
        Expression to rewrite; only its `transform` method is used here.
    suite
        Suite suffix appended to ``tpc`` to form the qualifier.
    tables
        Collection of table names that should be qualified.

    Returns
    -------
    Whatever `parsed.transform` returns.
    """
    qualifier = f"tpc{suite}"

    def add_catalog(node):
        if not (isinstance(node, sg.exp.Table) and node.name in tables):
            return node
        # NOTE(review): the qualifier is passed as sqlglot's `catalog`
        # although `_load_tpc` creates it with CREATE SCHEMA -- presumably
        # it renders as a single `tpc<suite>.<table>` prefix; confirm.
        other_args = {k: v for k, v in node.args.items() if k != "catalog"}
        return node.__class__(catalog=qualifier, **other_args)

    return parsed.transform(add_catalog)

def _transform_tpch_sql(self, parsed):
    """Qualify TPC-H table references in `parsed` with the `tpch` catalog."""
    return self._transform_tpc_sql(parsed, suite="h", tables=self.tpch_tables)

def _transform_tpcds_sql(self, parsed):
    """Qualify TPC-DS table references in `parsed` with the `tpcds` catalog."""
    return self._transform_tpc_sql(parsed, suite="ds", tables=self.tpcds_tables)


@pytest.fixture(scope="session")
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/snowflake/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class TestConf(BackendTest):
def load_tpch(self) -> None:
    """No-op: Snowflake already defines the TPC-H tables in `SNOWFLAKE_SAMPLE_DATA`."""

def _tpch_table(self, name: str):
def h(self, name: str):
    """Fetch a TPC-H table from `SNOWFLAKE_SAMPLE_DATA.TPCH_SF1`.

    The name is upper-cased before the lookup, and the table is returned
    after `rename("snake_case")` has been applied to it.
    """
    sample_table = self.connection.table(
        name.upper(), database="SNOWFLAKE_SAMPLE_DATA.TPCH_SF1"
    )
    return sample_table.rename("snake_case")
Expand Down
52 changes: 16 additions & 36 deletions ibis/backends/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ class BackendTest(abc.ABC):
"Whether special handling is needed for running a multi-process pytest run."
supports_tpch: bool = False
"Child class defines a `load_tpch` method that loads the required TPC-H tables into a connection."
supports_tpcds: bool = False
"Child class defines a `load_tpcds` method that loads the required TPC-DS tables into a connection."
force_sort = False
"Sort results before comparing against reference computation."
rounding_method: Literal["away_from_zero", "half_to_even"] = "away_from_zero"
"Name of round method to use for rounding test comparisons."
driver_supports_multiple_statements: bool = False
"Whether the driver supports executing multiple statements in a single call."
tpch_absolute_tolerance: float | None = None
"Absolute tolerance for floating point comparisons with pytest.approx in TPC-H correctness tests."
tpc_absolute_tolerance: float | None = None
"Absolute tolerance for floating point comparisons with pytest.approx in TPC correctness tests."

@property
@abc.abstractmethod
Expand Down Expand Up @@ -130,6 +132,8 @@ def stateless_load(self, **kw):

if self.supports_tpch:
self.load_tpch()
if self.supports_tpcds:
self.load_tpcds()

def stateful_load(self, fn, **kw):
if not fn.exists():
Expand Down Expand Up @@ -297,42 +301,18 @@ def api(self):
def make_context(self, params: Mapping[ir.Value, Any] | None = None):
    """Create a context via `self.api.compiler.make_context`, forwarding `params`."""
    compiler = self.api.compiler
    return compiler.make_context(params=params)

@property
def customer(self):
return self._tpch_table("customer")

@property
def lineitem(self):
return self._tpch_table("lineitem")

@property
def nation(self):
return self._tpch_table("nation")

@property
def orders(self):
return self._tpch_table("orders")

@property
def part(self):
return self._tpch_table("part")

@property
def partsupp(self):
return self._tpch_table("partsupp")

@property
def region(self):
return self._tpch_table("region")
def _tpc_table(self, name: str, benchmark: Literal["h", "ds"]):
    """Return table `name` from the `tpc<benchmark>` database.

    When the backend's `supports_tpc<benchmark>` attribute is falsy,
    `pytest.skip` is called with a message naming the backend and the
    benchmark instead.
    """
    support_flag = f"supports_tpc{benchmark}"
    if not getattr(self, support_flag):
        reason = (
            f"{self.name()} backend does not support testing TPC-{benchmark.upper()}"
        )
        pytest.skip(reason)
    database = f"tpc{benchmark}"
    return self.connection.table(name, database=database)

@property
def supplier(self):
return self._tpch_table("supplier")
def h(self, name: str) -> ir.Table:
    """Return the TPC-H table called `name`.

    Delegates to `_tpc_table` with benchmark ``"h"``, so the lookup and the
    skip-if-unsupported handling are the ones implemented there.
    """
    # `_tpc_table` is the helper that accepts a benchmark argument; the
    # older `_tpch_table` only takes `name`.
    return self._tpc_table(name, "h")

def _tpch_table(self, name: str):
if not self.supports_tpch:
pytest.skip(f"{self.name()} backend does not support testing TPC-H")
return self.connection.table(name)
def ds(self, name: str) -> ir.Table:
    """Return the TPC-DS table called `name` by delegating to `_tpc_table`."""
    table = self._tpc_table(name, "ds")
    return table


class ServiceBackendTest(BackendTest):
Expand Down
File renamed without changes.
Loading

0 comments on commit a548b03

Please sign in to comment.