Skip to content

Commit

Permalink
fix(clickhouse): fix create_table implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored and kszucs committed Apr 7, 2023
1 parent ed08e44 commit 5a54489
Show file tree
Hide file tree
Showing 24 changed files with 145 additions and 44 deletions.
71 changes: 58 additions & 13 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ def table(self, name: str, database: str | None = None) -> ir.Table:
Table expression
"""
schema = self.get_schema(name, database=database)
return ClickhouseTable(ops.DatabaseTable(name, schema, self))
qname = self._fully_qualified_name(name, database)
return ClickhouseTable(ops.DatabaseTable(qname, schema, self))

def raw_sql(
self,
Expand Down Expand Up @@ -446,7 +447,9 @@ def close(self):
self.con.close()

def _fully_qualified_name(self, name, database):
return sg.table(name, db=database or self.current_database).sql()
return sg.table(name, db=database or self.current_database or None).sql(
dialect="clickhouse"
)

def get_schema(
self,
Expand Down Expand Up @@ -507,13 +510,13 @@ def drop_database(self, name: str, *, force: bool = False) -> None:
self.raw_sql(f"DROP DATABASE {'IF EXISTS ' * force}{name}")

def truncate_table(self, name: str, database: str | None = None) -> None:
ident = ".".join(filter(None, (database, name)))
ident = self._fully_qualified_name(name, database)
self.raw_sql(f"DELETE FROM {ident}")

def drop_table(
self, name: str, database: str | None = None, force: bool = False
) -> None:
ident = ".".join(filter(None, (database, name)))
ident = self._fully_qualified_name(name, database)
self.raw_sql(f"DROP TABLE {'IF EXISTS ' * force}{ident}")

def create_table(
Expand All @@ -526,30 +529,67 @@ def create_table(
temp: bool = False,
overwrite: bool = False,
# backend specific arguments
engine: str | None,
engine: str,
order_by: Iterable[str] | None = None,
partition_by: Iterable[str] | None = None,
sample_by: str | None = None,
settings: Mapping[str, Any] | None = None,
) -> ir.Table:
"""Create a table in a ClickHouse database.
Parameters
----------
name
Name of the table to create
obj
Optional data to create the table with
schema
Optional names and types of the table
database
Database to create the table in
temp
Create a temporary table. This is not yet supported, and exists for
API compatibility.
overwrite
Whether to overwrite the table
engine
The table engine to use. See [ClickHouse's `CREATE TABLE` documentation](https://clickhouse.com/docs/en/sql-reference/statements/create/table)
for specifics.
order_by
String column names to order by. Required for some table engines like `MergeTree`.
partition_by
String column names to partition by
sample_by
String column names to sample by
settings
Key-value pairs of settings for table creation
Returns
-------
Table
The new table
"""
if temp:
raise com.IbisError("ClickHouse temporary tables are not yet supported")

tmp = "TEMPORARY " * temp
replace = "OR REPLACE " * overwrite
code = f"CREATE {replace}{tmp}TABLE {name}"
code = (
f"CREATE {replace}{tmp}TABLE {self._fully_qualified_name(name, database)}"
)

if obj is None and schema is None:
raise com.IbisError("The schema or obj parameter is required")

if schema is not None:
code += f" ({schema})"
serialized_schema = ", ".join(
f"`{name}` {serialize(typ)}" for name, typ in schema.items()
)
code += f" ({serialized_schema})"

if isinstance(obj, pd.DataFrame):
if obj is not None and not isinstance(obj, ir.Expr):
obj = ibis.memtable(obj, schema=schema)

if obj is not None:
self._register_in_memory_tables(obj)
query = self.compile(obj)
code += f" AS {query}"

code += f" ENGINE = {engine}"

if order_by is not None:
Expand All @@ -565,6 +605,11 @@ def create_table(
kvs = ", ".join(f"{name}={value!r}" for name, value in settings.items())
code += f" SETTINGS {kvs}"

if obj is not None:
self._register_in_memory_tables(obj)
query = self.compile(obj)
code += f" AS {query}"

self.raw_sql(code)
return self.table(name, database=database)

Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/clickhouse/compiler/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def _dummy(op: ops.DummyTable, **kw):

@translate_rel.register(ops.PhysicalTable)
def _physical_table(op: ops.PhysicalTable, **_):
return sg.table(op.name)
return sg.parse_one(op.name, into=sg.exp.Table)


@translate_rel.register(ops.Selection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ FROM (
SELECT
t0.string_col,
COUNT(*) AS count
FROM functional_alltypes AS t0
FROM ibis_testing.functional_alltypes AS t0
GROUP BY
1
) AS t1
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
*
FROM functional_alltypes AS t0
FROM ibis_testing.functional_alltypes AS t0
WHERE
t0.string_col IN ('foo', 'bar')
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
*
FROM functional_alltypes AS t0
FROM ibis_testing.functional_alltypes AS t0
WHERE
NOT t0.string_col IN ('foo', 'bar')
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
SELECT
SUM(CASE WHEN isNull(t0.string_col) THEN 1 ELSE 0 END) AS "Sum(Where(IsNull(string_col), 1, 0))"
FROM functional_alltypes AS t0
FROM ibis_testing.functional_alltypes AS t0
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
t0.*
FROM functional_alltypes AS t0
INNER JOIN functional_alltypes AS t1
FROM ibis_testing.functional_alltypes AS t0
INNER JOIN ibis_testing.functional_alltypes AS t1
ON t0.id = t1.id
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
SELECT
*
FROM functional_alltypes
FROM ibis_testing.functional_alltypes
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
SELECT
*
FROM functional_alltypes AS t0
FROM ibis_testing.functional_alltypes AS t0
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
t0.*
FROM batting AS t0
ANY JOIN awards_players AS t1
FROM ibis_testing.batting AS t0
ANY JOIN ibis_testing.awards_players AS t1
ON t0.playerID = t1.awardID
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
t0.*
FROM batting AS t0
LEFT ANY JOIN awards_players AS t1
FROM ibis_testing.batting AS t0
LEFT ANY JOIN ibis_testing.awards_players AS t1
ON t0.playerID = t1.awardID
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
t0.*
FROM batting AS t0
INNER JOIN awards_players AS t1
FROM ibis_testing.batting AS t0
INNER JOIN ibis_testing.awards_players AS t1
ON t0.playerID = t1.awardID
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
t0.*
FROM batting AS t0
LEFT OUTER JOIN awards_players AS t1
FROM ibis_testing.batting AS t0
LEFT OUTER JOIN ibis_testing.awards_players AS t1
ON t0.playerID = t1.awardID
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
t0.*
FROM batting AS t0
ANY JOIN awards_players AS t1
FROM ibis_testing.batting AS t0
ANY JOIN ibis_testing.awards_players AS t1
ON t0.playerID = t1.playerID
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
t0.*
FROM batting AS t0
LEFT ANY JOIN awards_players AS t1
FROM ibis_testing.batting AS t0
LEFT ANY JOIN ibis_testing.awards_players AS t1
ON t0.playerID = t1.playerID
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
t0.*
FROM batting AS t0
INNER JOIN awards_players AS t1
FROM ibis_testing.batting AS t0
INNER JOIN ibis_testing.awards_players AS t1
ON t0.playerID = t1.playerID
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
t0.*
FROM batting AS t0
LEFT OUTER JOIN awards_players AS t1
FROM ibis_testing.batting AS t0
LEFT OUTER JOIN ibis_testing.awards_players AS t1
ON t0.playerID = t1.playerID
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
SUM(t0.float_col) AS "Sum(float_col)"
FROM functional_alltypes AS t0
FROM ibis_testing.functional_alltypes AS t0
WHERE
t0.int_col > 0
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ FROM (
SELECT
t0.string_col,
SUM(t0.float_col) AS total
FROM functional_alltypes AS t0
FROM ibis_testing.functional_alltypes AS t0
WHERE
t0.int_col > 0
GROUP BY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ SELECT
toHour(t0.timestamp_col) AS hour,
toMinute(t0.timestamp_col) AS minute,
toSecond(t0.timestamp_col) AS second
FROM functional_alltypes AS t0
FROM ibis_testing.functional_alltypes AS t0
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
SELECT
*
FROM functional_alltypes AS t0
FROM ibis_testing.functional_alltypes AS t0
WHERE
t0.float_col > 0 AND t0.int_col < (
t0.float_col * 2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
*
FROM functional_alltypes AS t0
FROM ibis_testing.functional_alltypes AS t0
WHERE
t0.int_col > 0 AND t0.float_col BETWEEN 0 AND 1
56 changes: 56 additions & 0 deletions ibis/backends/clickhouse/tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pandas as pd
import pandas.testing as tm
import pytest
from pytest import param

import ibis
import ibis.expr.datatypes as dt
Expand All @@ -13,6 +14,8 @@
CLICKHOUSE_USER,
IBIS_TEST_CLICKHOUSE_DB,
)
from ibis.common.exceptions import IbisError
from ibis.util import gen_name

pytest.importorskip("clickhouse_driver")

Expand Down Expand Up @@ -204,3 +207,56 @@ def test_list_tables_empty_database(con, worker_id):
assert not con.list_tables(database=dbname)
finally:
con.raw_sql(f"DROP DATABASE IF EXISTS {dbname}")


@pytest.mark.parametrize(
"temp",
[
param(
True,
marks=pytest.mark.xfail(
reason="Ibis is likely making incorrect assumptions about object lifetime and cursors",
raises=IbisError,
),
),
False,
],
ids=["temp", "no_temp"],
)
def test_create_table_no_data(con, temp):
name = gen_name("clickhouse_create_table_no_data")
schema = ibis.schema(dict(a="!int", b="string"))
t = con.create_table(
name, schema=schema, temp=temp, engine="Memory", database="tmptables"
)
try:
assert t.execute().empty
finally:
con.drop_table(name, force=True, database="tmptables")
assert name not in con.list_tables(database="tmptables")


@pytest.mark.parametrize(
"data",
[
{"a": [1, 2, 3], "b": [None, "b", "c"]},
pd.DataFrame({"a": [1, 2, 3], "b": [None, "b", "c"]}),
],
ids=["dict", "dataframe"],
)
@pytest.mark.parametrize(
"engine",
["File(Native)", "File(Parquet)", "Memory"],
ids=["native", "mem", "parquet"],
)
def test_create_table_data(con, data, engine):
name = gen_name("clickhouse_create_table_data")
schema = ibis.schema(dict(a="!int", b="string"))
t = con.create_table(
name, obj=data, schema=schema, engine=engine, database="tmptables"
)
try:
assert len(t.execute()) == 3
finally:
con.drop_table(name, force=True, database="tmptables")
assert name not in con.list_tables(database="tmptables")
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SELECT
*
FROM functional_alltypes AS t0
FROM ibis_testing.functional_alltypes AS t0
WHERE
multiMatchAny(t0.string_col, [ '0'])

0 comments on commit 5a54489

Please sign in to comment.