Skip to content

Commit

Permalink
add params and limit to the new to_* methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Chloe He committed Jun 27, 2024
1 parent 1e0bcf9 commit 194d1b6
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1169,9 +1169,11 @@ def _to_filesystem_output(
expr: ir.Expr,
format: str,
path: str | Path,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
options: Mapping[str, str] | None = None,
) -> StreamingQuery | None:
df = self._session.sql(expr.compile())
df = self._session.sql(expr.compile(params=params, limit=limit))
if self.mode == "batch":
df = df.write.format(format)

Check warning on line 1178 in ibis/backends/pyspark/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/pyspark/__init__.py#L1178

Added line #L1178 was not covered by tests
for k, v in (options or {}).items():
Expand All @@ -1190,6 +1192,8 @@ def to_parquet_dir(
self,
expr: ir.Expr,
path: str | Path,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
options: Mapping[str, str] | None = None,
) -> StreamingQuery | None:
"""Write the results of executing the given expression to a parquet directory.
Expand All @@ -1200,6 +1204,11 @@ def to_parquet_dir(
The ibis expression to execute and persist to parquet.
path
The output destination. A string or Path to the parquet directory to write to.
params
Mapping of scalar parameter expressions to their values.
limit
An integer to effect a specific row limit. A value of `None` means
"no limit". The default is in `ibis/config.py`.
options
Additional keyword arguments passed to pyspark.sql.streaming.DataStreamWriter
Expand All @@ -1209,13 +1218,15 @@ def to_parquet_dir(
Returns a Pyspark StreamingQuery object if in streaming mode, otherwise None
"""
self._run_pre_execute_hooks(expr)
return self._to_filesystem_output(expr, "parquet", path, options)
return self._to_filesystem_output(expr, "parquet", path, params, limit, options)

@util.experimental
def to_csv_dir(
self,
expr: ir.Expr,
path: str | Path,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
options: Mapping[str, str] | None = None,
) -> StreamingQuery | None:
"""Write the results of executing the given expression to a CSV directory.
Expand All @@ -1226,6 +1237,11 @@ def to_csv_dir(
The ibis expression to execute and persist to CSV.
path
The output destination. A string or Path to the CSV directory to write to.
params
Mapping of scalar parameter expressions to value.
limit
An integer to effect a specific row limit. A value of `None` means
"no limit". The default is in `ibis/config.py`.
options
Additional keyword arguments passed to pyspark.sql.streaming.DataStreamWriter
Expand All @@ -1235,4 +1251,4 @@ def to_csv_dir(
Returns a Pyspark StreamingQuery object if in streaming mode, otherwise None
"""
self._run_pre_execute_hooks(expr)
return self._to_filesystem_output(expr, "csv", path, options)
return self._to_filesystem_output(expr, "csv", path, params, limit, options)

0 comments on commit 194d1b6

Please sign in to comment.