diff --git a/ibis/backends/base/__init__.py b/ibis/backends/base/__init__.py
index faa0e6dc963c..36924e63327d 100644
--- a/ibis/backends/base/__init__.py
+++ b/ibis/backends/base/__init__.py
@@ -427,7 +427,7 @@ def to_csv(
             The data source. A string or Path to the CSV file.
         params
             Mapping of scalar parameter expressions to value.
-        **kwargs
+        kwargs
             Additional keyword arguments passed to pyarrow.csv.CSVWriter
 
             https://arrow.apache.org/docs/python/generated/pyarrow.csv.CSVWriter.html
@@ -708,7 +708,7 @@ def compile(
         """Compile an expression."""
         return self.compiler.to_sql(expr, params=params)
 
-    def _to_sql(self, expr: ir.Expr) -> str:
+    def _to_sql(self, expr: ir.Expr, **kwargs) -> str:
         """Convert an expression to a SQL string.
 
         Called by `ibis.to_sql`/`ibis.show_sql`, gives the backend an
diff --git a/ibis/backends/base/sql/__init__.py b/ibis/backends/base/sql/__init__.py
index 197230ff8763..cefaaa7d8598 100644
--- a/ibis/backends/base/sql/__init__.py
+++ b/ibis/backends/base/sql/__init__.py
@@ -330,8 +330,8 @@ def compile(
         """
         return self.compiler.to_ast_ensure_limit(expr, limit, params=params).compile()
 
-    def _to_sql(self, expr: ir.Expr) -> str:
-        return str(self.compile(expr))
+    def _to_sql(self, expr: ir.Expr, **kwargs) -> str:
+        return str(self.compile(expr, **kwargs))
 
     def explain(
         self,
diff --git a/ibis/backends/base/sql/alchemy/__init__.py b/ibis/backends/base/sql/alchemy/__init__.py
index d3ce35ba7c82..4da0ab57145e 100644
--- a/ibis/backends/base/sql/alchemy/__init__.py
+++ b/ibis/backends/base/sql/alchemy/__init__.py
@@ -114,12 +114,12 @@ def inspector(self):
             self._inspector.info_cache.clear()
         return self._inspector
 
-    def _to_sql(self, expr: ir.Expr) -> str:
+    def _to_sql(self, expr: ir.Expr, **kwargs) -> str:
         # For `ibis.to_sql` calls we render with literal binds and qmark params
         dialect_class = sa.dialects.registry.load(
             self.compiler.translator_class._dialect_name
         )
-        sql = self.compile(expr).compile(
+        sql = self.compile(expr, **kwargs).compile(
             dialect=dialect_class(paramstyle="qmark"),
             compile_kwargs=dict(literal_binds=True),
         )
diff --git a/ibis/backends/clickhouse/__init__.py b/ibis/backends/clickhouse/__init__.py
index cace008c8fdd..842f65fc4496 100644
--- a/ibis/backends/clickhouse/__init__.py
+++ b/ibis/backends/clickhouse/__init__.py
@@ -378,8 +378,8 @@ def compile(self, expr: ir.Expr, limit: str | None = None, params=None, **_: Any
         assert not isinstance(sql, sg.exp.Subquery)
         return sql.sql(dialect="clickhouse", pretty=True)
 
-    def _to_sql(self, expr: ir.Expr) -> str:
-        return str(self.compile(expr))
+    def _to_sql(self, expr: ir.Expr, **kwargs) -> str:
+        return str(self.compile(expr, **kwargs))
 
     def table(self, name: str, database: str | None = None) -> ir.Table:
         """Construct a table expression.
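The SQLAlchemy `_to_sql` above renders the compiled statement with literal binds and qmark parameters, so values are inlined into the SQL text that `ibis.to_sql` returns. A minimal sketch of that rendering step in plain SQLAlchemy (assuming SQLAlchemy 1.4+; the table and value are illustrative, not part of this change):

```python
# Plain-SQLAlchemy sketch of rendering with literal binds, as _to_sql does.
# The table `t` and the value 42 are illustrative only.
import sqlalchemy as sa

t = sa.table("t", sa.column("a"))
stmt = sa.select(t.c.a).where(t.c.a == 42)

# compile_kwargs=dict(literal_binds=True) inlines the bound value into the
# rendered SQL instead of leaving a placeholder, e.g. "WHERE t.a = 42".
print(stmt.compile(compile_kwargs=dict(literal_binds=True)))
```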
diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py
index 4e33cca3b6cf..eae2ddca0715 100644
--- a/ibis/backends/duckdb/__init__.py
+++ b/ibis/backends/duckdb/__init__.py
@@ -137,6 +137,8 @@ def do_connect(
             Path(temp_directory).mkdir(parents=True, exist_ok=True)
             config["temp_directory"] = str(temp_directory)
 
+        config.setdefault("experimental_parallel_csv", 1)
+
         engine = sa.create_engine(
             f"duckdb:///{database}",
             connect_args=dict(read_only=read_only, config=config),
@@ -556,11 +558,80 @@ def to_pyarrow(
         else:
             raise ValueError
 
-    def fetch_from_cursor(
+    @util.experimental
+    def to_parquet(
+        self,
+        expr: ir.Table,
+        path: str | Path,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        **kwargs: Any,
+    ) -> None:
+        """Write the results of executing the given expression to a parquet file.
+
+        This method is eager and will execute the associated expression
+        immediately.
+
+        Parameters
+        ----------
+        expr
+            The ibis expression to execute and persist to parquet.
+        path
+            The data sink. A string or Path to the parquet file.
+        params
+            Mapping of scalar parameter expressions to value.
+        kwargs
+            DuckDB Parquet writer arguments. See
+            https://duckdb.org/docs/data/parquet#writing-to-parquet-files for
+            details
+        """
+        query = self._to_sql(expr, params=params)
+        args = ["FORMAT 'parquet'", *(f"{k.upper()} {v!r}" for k, v in kwargs.items())]
+        copy_cmd = f"COPY ({query}) TO {str(path)!r} ({', '.join(args)})"
+        with self.begin() as con:
+            con.exec_driver_sql(copy_cmd)
+
+    @util.experimental
+    def to_csv(
         self,
-        cursor: duckdb.DuckDBPyConnection,
-        schema: sch.Schema,
-    ):
+        expr: ir.Table,
+        path: str | Path,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        header: bool = True,
+        **kwargs: Any,
+    ) -> None:
+        """Write the results of executing the given expression to a CSV file.
+
+        This method is eager and will execute the associated expression
+        immediately.
+
+        Parameters
+        ----------
+        expr
+            The ibis expression to execute and persist to CSV.
+        path
+            The data sink. A string or Path to the CSV file.
+        params
+            Mapping of scalar parameter expressions to value.
+        header
+            Whether to write the column names as the first line of the CSV file.
+        kwargs
+            DuckDB CSV writer arguments. https://duckdb.org/docs/data/csv.html#parameters
+        """
+        query = self._to_sql(expr, params=params)
+        args = [
+            "FORMAT 'csv'",
+            f"HEADER {int(header)}",
+            *(f"{k.upper()} {v!r}" for k, v in kwargs.items()),
+        ]
+        copy_cmd = f"COPY ({query}) TO {str(path)!r} ({', '.join(args)})"
+        with self.begin() as con:
+            con.exec_driver_sql(copy_cmd)
+
+    def fetch_from_cursor(
+        self, cursor: duckdb.DuckDBPyConnection, schema: sch.Schema
+    ) -> pd.DataFrame:
         import pandas as pd
 
         import pyarrow.types as pat
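Both new DuckDB methods compile the expression to SQL and wrap it in a `COPY (...) TO ... (...)` statement, upper-casing each keyword argument into a writer option. A standalone sketch of the string being assembled (the query text, output path, and `compression` option below are illustrative placeholders, not values from this change):

```python
# Standalone sketch of the COPY statement to_parquet assembles; the query,
# path, and compression value are illustrative placeholders.
query = "SELECT * FROM t"          # stands in for self._to_sql(expr, params=params)
path = "out.parquet"
kwargs = {"compression": "zstd"}   # forwarded DuckDB Parquet writer options

args = ["FORMAT 'parquet'", *(f"{k.upper()} {v!r}" for k, v in kwargs.items())]
copy_cmd = f"COPY ({query}) TO {str(path)!r} ({', '.join(args)})"
print(copy_cmd)
# COPY (SELECT * FROM t) TO 'out.parquet' (FORMAT 'parquet', COMPRESSION 'zstd')
```

In user code this would look roughly like `con.to_parquet(expr, "out.parquet", compression="zstd")` on a connected DuckDB backend.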
diff --git a/ibis/expr/sql.py b/ibis/expr/sql.py
index c9184c4e1ca2..69a1b632d984 100644
--- a/ibis/expr/sql.py
+++ b/ibis/expr/sql.py
@@ -346,7 +346,7 @@ def _repr_markdown_(self) -> str:
 
 
 @public
-def to_sql(expr: ir.Expr, dialect: str | None = None) -> SQLString:
+def to_sql(expr: ir.Expr, dialect: str | None = None, **kwargs) -> SQLString:
     """Return the formatted SQL string for an expression.
 
     Parameters
@@ -355,6 +355,8 @@ def to_sql(expr: ir.Expr, dialect: str | None = None) -> SQLString:
         Ibis expression.
     dialect
         SQL dialect to use for compilation.
+    kwargs
+        Scalar parameters passed to the backend's `_to_sql` method.
 
     Returns
     -------
@@ -382,6 +384,6 @@ def to_sql(expr: ir.Expr, dialect: str | None = None) -> SQLString:
     else:
         read = write = getattr(backend, "_sqlglot_dialect", dialect)
 
-    sql = backend._to_sql(expr)
+    sql = backend._to_sql(expr, **kwargs)
     (pretty,) = sg.transpile(sql, read=read, write=write, pretty=True)
     return SQLString(pretty)
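With the `**kwargs` passthrough in place, scalar parameters supplied at the `ibis.to_sql` call site reach the backend's `compile`. A hedged usage sketch, assuming the DuckDB backend is installed (the table schema and the value 42 are illustrative):

```python
# Sketch of passing scalar params through ibis.to_sql; the table, its schema,
# and the value 42 are illustrative, and the duckdb backend is assumed installed.
import ibis

t = ibis.table([("a", "int64")], name="t")
p = ibis.param("int64")
expr = t.filter(t.a == p)

# The params mapping is forwarded to backend._to_sql(...), which hands it to
# compile(); with literal binds the value is inlined in the rendered SQL.
print(ibis.to_sql(expr, dialect="duckdb", params={p: 42}))
```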