From ea0826d829253f3aa23fb2aee52e3a2ea1193b7f Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Wed, 11 Oct 2023 05:59:30 -0400 Subject: [PATCH] refactor(clickhouse): use sqlglot for `create_table` implementation --- ibis/backends/clickhouse/__init__.py | 117 +++++++++++++++++---------- 1 file changed, 74 insertions(+), 43 deletions(-) diff --git a/ibis/backends/clickhouse/__init__.py b/ibis/backends/clickhouse/__init__.py index 6077f58e52c4..69a49b21ee30 100644 --- a/ibis/backends/clickhouse/__init__.py +++ b/ibis/backends/clickhouse/__init__.py @@ -486,21 +486,10 @@ def raw_sql( self._log(query) return self.con.query(query, external_data=external_data, **kwargs) - def fetch_from_cursor(self, cursor, schema): - import pandas as pd - - from ibis.formats.pandas import PandasData - - df = pd.DataFrame.from_records(iter(cursor), columns=schema.names) - return PandasData.convert_table(df, schema) - def close(self) -> None: """Close ClickHouse connection.""" self.con.close() - def _fully_qualified_name(self, name: str, database: str | None) -> str: - return sg.table(name, db=database).sql(dialect="clickhouse") - def get_schema(self, table_name: str, database: str | None = None) -> sch.Schema: """Return a Schema object for the indicated table and database. @@ -673,21 +662,13 @@ def create_table( Table The new table """ - tmp = "TEMPORARY " * temp - replace = "OR REPLACE " * overwrite - if temp and overwrite: - raise com.IbisInputError("Cannot specify both temp and overwrite") - - if not temp: - table = self._fully_qualified_name(name, database) - else: - table = name - database = None - code = f"CREATE {replace}{tmp}TABLE {table}" + raise com.IbisInputError( + "Cannot specify both `temp=True` and `overwrite=True` for ClickHouse" + ) if obj is None and schema is None: - raise com.IbisError("The schema or obj parameter is required") + raise com.IbisError("The `schema` or `obj` parameter is required") if obj is not None and not isinstance(obj, ir.Expr): obj = ibis.memtable(obj, schema=schema) @@ -695,39 +676,89 @@ def create_table( if schema is None: schema = obj.schema() - serialized_schema = ", ".join( - f"`{name}` {ClickhouseType.to_string(typ)}" for name, typ in schema.items() + this = sg.exp.Schema( + this=sg.table(name, db=database), + expressions=[ + sg.exp.ColumnDef( + this=sg.to_identifier(name), kind=ClickhouseType.from_ibis(typ) + ) + for name, typ in schema.items() + ], ) - - code += f" ({serialized_schema}) ENGINE = {engine}" - - if order_by is not None: - code += f" ORDER BY {', '.join(util.promote_list(order_by))}" - elif engine == "MergeTree": - # empty tuple to indicate no specific order when engine is - # MergeTree - code += " ORDER BY tuple()" + properties = [ + # the engine cannot be quoted, since clickhouse won't allow e.g., + # "File(Native)" + sg.exp.EngineProperty(this=sg.to_identifier(engine, quoted=False)) + ] + + if temp: + properties.append(sg.exp.TemporaryProperty()) + + if order_by is not None or engine == "MergeTree": + # engine == "MergeTree" requires an order by clause, which is the + # empty tuple if order_by is False-y + properties.append( + sg.exp.Order( + expressions=[ + sg.exp.Ordered( + this=sg.exp.Tuple( + expressions=list(map(sg.column, order_by or ())) + ) + ) + ] + ) + ) if partition_by is not None: - code += f" PARTITION BY {', '.join(util.promote_list(partition_by))}" + properties.append( + sg.exp.PartitionedByProperty( + this=sg.exp.Schema( + expressions=list(map(sg.to_identifier, partition_by)) + ) + ) + ) if sample_by is not None: - code += f" SAMPLE BY {sample_by}" + properties.append( + sg.exp.SampleProperty( + this=sg.exp.Tuple(expressions=list(map(sg.column, sample_by))) + ) + ) if settings: - kvs = ", ".join(f"{name}={value!r}" for name, value in settings.items()) - code += f" SETTINGS {kvs}" + properties.append( + sg.exp.SettingsProperty( + expressions=[ + sg.exp.SetItem( + this=sg.exp.EQ( + this=sg.to_identifier(name), expression=lit(value) + ) + ) + for name, value in settings.items() + ] + ) + ) + + external_tables = {} + expression = None if obj is not None: - code += f" AS {self.compile(obj)}" - external_tables = self._collect_in_memory_tables(obj) - else: - external_tables = {} + expression = self._to_sqlglot(obj) + external_tables.update(self._collect_in_memory_tables(obj)) + + code = sg.exp.Create( + this=this, + kind="TABLE", + replace=overwrite, + expression=expression, + properties=sg.exp.Properties(expressions=properties), + ) external_data = self._normalize_external_tables(external_tables) # create the table - self.con.raw_query(code, external_data=external_data) + sql = code.sql(self.name, pretty=True) + self.con.raw_query(sql, external_data=external_data) return self.table(name, database=database)