Skip to content

Commit

Permalink
perf(duckdb): faster to_parquet/to_csv implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored and kszucs committed Mar 1, 2023
1 parent 764d9c3 commit 6071bb5
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 14 deletions.
4 changes: 2 additions & 2 deletions ibis/backends/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ def to_csv(
The data source. A string or Path to the CSV file.
params
Mapping of scalar parameter expressions to value.
**kwargs
kwargs
Additional keyword arguments passed to pyarrow.csv.CSVWriter
https://arrow.apache.org/docs/python/generated/pyarrow.csv.CSVWriter.html
Expand Down Expand Up @@ -708,7 +708,7 @@ def compile(
"""Compile an expression."""
return self.compiler.to_sql(expr, params=params)

def _to_sql(self, expr: ir.Expr) -> str:
def _to_sql(self, expr: ir.Expr, **kwargs) -> str:
"""Convert an expression to a SQL string.
Called by `ibis.to_sql`/`ibis.show_sql`, gives the backend an
Expand Down
4 changes: 2 additions & 2 deletions ibis/backends/base/sql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,8 @@ def compile(
"""
return self.compiler.to_ast_ensure_limit(expr, limit, params=params).compile()

def _to_sql(self, expr: ir.Expr) -> str:
return str(self.compile(expr))
def _to_sql(self, expr: ir.Expr, **kwargs) -> str:
return str(self.compile(expr, **kwargs))

def explain(
self,
Expand Down
4 changes: 2 additions & 2 deletions ibis/backends/base/sql/alchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ def inspector(self):
self._inspector.info_cache.clear()
return self._inspector

def _to_sql(self, expr: ir.Expr) -> str:
def _to_sql(self, expr: ir.Expr, **kwargs) -> str:
# For `ibis.to_sql` calls we render with literal binds and qmark params
dialect_class = sa.dialects.registry.load(
self.compiler.translator_class._dialect_name
)
sql = self.compile(expr).compile(
sql = self.compile(expr, **kwargs).compile(
dialect=dialect_class(paramstyle="qmark"),
compile_kwargs=dict(literal_binds=True),
)
Expand Down
4 changes: 2 additions & 2 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,8 @@ def compile(self, expr: ir.Expr, limit: str | None = None, params=None, **_: Any
assert not isinstance(sql, sg.exp.Subquery)
return sql.sql(dialect="clickhouse", pretty=True)

def _to_sql(self, expr: ir.Expr) -> str:
return str(self.compile(expr))
def _to_sql(self, expr: ir.Expr, **kwargs) -> str:
return str(self.compile(expr, **kwargs))

def table(self, name: str, database: str | None = None) -> ir.Table:
"""Construct a table expression.
Expand Down
79 changes: 75 additions & 4 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ def do_connect(
Path(temp_directory).mkdir(parents=True, exist_ok=True)
config["temp_directory"] = str(temp_directory)

config.setdefault("experimental_parallel_csv", 1)

engine = sa.create_engine(
f"duckdb:///{database}",
connect_args=dict(read_only=read_only, config=config),
Expand Down Expand Up @@ -556,11 +558,80 @@ def to_pyarrow(
else:
raise ValueError

def fetch_from_cursor(
@util.experimental
def to_parquet(
self,
expr: ir.Table,
path: str | Path,
*,
params: Mapping[ir.Scalar, Any] | None = None,
**kwargs: Any,
) -> None:
"""Write the results of executing the given expression to a parquet file.
This method is eager and will execute the associated expression
immediately.
Parameters
----------
expr
The ibis expression to execute and persist to parquet.
path
The data source. A string or Path to the parquet file.
params
Mapping of scalar parameter expressions to value.
kwargs
DuckDB Parquet writer arguments. See
https://duckdb.org/docs/data/parquet#writing-to-parquet-files for
details
"""
query = self._to_sql(expr, params=params)
args = ["FORMAT 'parquet'", *(f"{k.upper()} {v!r}" for k, v in kwargs.items())]
copy_cmd = f"COPY ({query}) TO {str(path)!r} ({', '.join(args)})"
with self.begin() as con:
con.exec_driver_sql(copy_cmd)

@util.experimental
def to_csv(
self,
cursor: duckdb.DuckDBPyConnection,
schema: sch.Schema,
):
expr: ir.Table,
path: str | Path,
*,
params: Mapping[ir.Scalar, Any] | None = None,
header: bool = True,
**kwargs: Any,
) -> None:
"""Write the results of executing the given expression to a CSV file.
This method is eager and will execute the associated expression
immediately.
Parameters
----------
expr
The ibis expression to execute and persist to CSV.
path
The data source. A string or Path to the CSV file.
params
Mapping of scalar parameter expressions to value.
header
Whether to write the column names as the first line of the CSV file.
kwargs
DuckDB CSV writer arguments. https://duckdb.org/docs/data/csv.html#parameters
"""
query = self._to_sql(expr, params=params)
args = [
"FORMAT 'csv'",
f"HEADER {int(header)}",
*(f"{k.upper()} {v!r}" for k, v in kwargs.items()),
]
copy_cmd = f"COPY ({query}) TO {str(path)!r} ({', '.join(args)})"
with self.begin() as con:
con.exec_driver_sql(copy_cmd)

def fetch_from_cursor(
self, cursor: duckdb.DuckDBPyConnection, schema: sch.Schema
) -> pd.DataFrame:
import pandas as pd
import pyarrow.types as pat

Expand Down
6 changes: 4 additions & 2 deletions ibis/expr/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def _repr_markdown_(self) -> str:


@public
def to_sql(expr: ir.Expr, dialect: str | None = None) -> SQLString:
def to_sql(expr: ir.Expr, dialect: str | None = None, **kwargs) -> SQLString:
"""Return the formatted SQL string for an expression.
Parameters
Expand All @@ -355,6 +355,8 @@ def to_sql(expr: ir.Expr, dialect: str | None = None) -> SQLString:
Ibis expression.
dialect
SQL dialect to use for compilation.
kwargs
Scalar parameters
Returns
-------
Expand Down Expand Up @@ -382,6 +384,6 @@ def to_sql(expr: ir.Expr, dialect: str | None = None) -> SQLString:
else:
read = write = getattr(backend, "_sqlglot_dialect", dialect)

sql = backend._to_sql(expr)
sql = backend._to_sql(expr, **kwargs)
(pretty,) = sg.transpile(sql, read=read, write=write, pretty=True)
return SQLString(pretty)

0 comments on commit 6071bb5

Please sign in to comment.