From dc2ea252efab5deb8b318897f455e8ed7af98c2f Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Sun, 13 Aug 2023 10:22:16 -0400
Subject: [PATCH] feat(clickhouse): add `read_parquet` and `read_csv`

---
 ibis/backends/clickhouse/__init__.py | 47 +++++++++++++++++++++++++++-
 1 file changed, 46 insertions(+), 1 deletion(-)

diff --git a/ibis/backends/clickhouse/__init__.py b/ibis/backends/clickhouse/__init__.py
index 7e4de6405d90..875f524f591f 100644
--- a/ibis/backends/clickhouse/__init__.py
+++ b/ibis/backends/clickhouse/__init__.py
@@ -25,6 +25,8 @@
 from ibis.backends.clickhouse.datatypes import parse, serialize
 
 if TYPE_CHECKING:
+    from pathlib import Path
+
     import pandas as pd
 
     from ibis.common.typing import SupportsSchema
@@ -243,7 +245,7 @@ def _normalize_external_tables(self, external_tables=None) -> ExternalData | Non
         return external_data
 
     def _collect_in_memory_tables(
-        self, expr: ir.TableExpr | None, external_tables: Mapping | None = None
+        self, expr: ir.Table | None, external_tables: Mapping | None = None
     ):
         memtables = {op.name: op for op in expr.op().find(ops.InMemoryTable)}
         externals = toolz.valmap(_to_memtable, external_tables or {})
@@ -533,6 +535,49 @@ def drop_table(
         with closing(self.raw_sql(f"DROP TABLE {'IF EXISTS ' * force}{ident}")):
             pass
 
+    def read_parquet(
+        self,
+        path: str | Path,
+        table_name: str | None = None,
+        engine: str = "File(Parquet)",
+        **kwargs: Any,
+    ) -> ir.Table:
+        import pyarrow.parquet as pq
+        from clickhouse_connect.driver.tools import insert_file
+
+        from ibis.formats.pyarrow import PyArrowSchema
+
+        schema = PyArrowSchema.to_ibis(pq.read_metadata(path).schema.to_arrow_schema())
+
+        name = table_name or util.gen_name("clickhouse_read_parquet")
+        table = self.create_table(name, engine=engine, schema=schema, temp=True)
+
+        insert_file(
+            client=self.con, table=name, file_path=str(path), fmt="Parquet", **kwargs
+        )
+        return table
+
+    def read_csv(
+        self,
+        path: str | Path,
+        table_name: str | None = None,
+        engine: str = "File(Native)",
+        **kwargs: Any,
+    ) -> ir.Table:
+        import pyarrow.csv as pac
+        from clickhouse_connect.driver.tools import insert_file
+
+        from ibis.formats.pyarrow import PyArrowSchema
+
+        with pac.open_csv(path) as f:
+            schema = PyArrowSchema.to_ibis(f.schema)
+
+        name = table_name or util.gen_name("clickhouse_read_csv")
+        table = self.create_table(name, engine=engine, schema=schema, temp=True)
+
+        insert_file(client=self.con, table=name, file_path=str(path), **kwargs)
+        return table
+
     def create_table(
         self,
         name: str,
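
Usage sketch (not part of the patch): a minimal example of how the new methods might be called once this change lands, assuming a reachable ClickHouse server. The connection parameters, file names, and column names below are illustrative, not taken from the patch.

    import ibis

    # Connect to a ClickHouse server (parameters are illustrative).
    con = ibis.clickhouse.connect(host="localhost", port=8123, database="default")

    # Load a Parquet file into a temporary File(Parquet)-engine table; the schema
    # is read from the Parquet metadata via pyarrow before the rows are inserted
    # with clickhouse_connect's insert_file.
    events = con.read_parquet("events.parquet", table_name="events")

    # Load a CSV file; the schema is inferred with pyarrow.csv.
    people = con.read_csv("people.csv")

    # Both return ordinary ibis table expressions.
    expr = events.group_by("user_id").aggregate(n=events.count())
    print(expr.execute())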