feat(output-formats): add support for to_parquet_dir (#9781)
Co-authored-by: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Co-authored-by: Gil Forsyth <gforsyth@users.noreply.github.com>
3 people authored Aug 8, 2024
1 parent 0532a31 commit 80dfbe2
Showing 4 changed files with 90 additions and 1 deletion.
37 changes: 37 additions & 0 deletions ibis/backends/__init__.py
@@ -473,6 +473,43 @@ def to_parquet(
            for batch in batch_reader:
                writer.write_batch(batch)

    @util.experimental
    def to_parquet_dir(
        self,
        expr: ir.Table,
        directory: str | Path,
        *,
        params: Mapping[ir.Scalar, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        """Write the results of executing the given expression to parquet files in a directory.

        This method is eager and will execute the associated expression
        immediately.

        Parameters
        ----------
        expr
            The ibis expression to execute and persist to parquet.
        directory
            The data target. A string or Path to the directory where the parquet files will be written.
        params
            Mapping of scalar parameter expressions to values.
        **kwargs
            Additional keyword arguments passed to pyarrow.dataset.write_dataset

            https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html
        """
        self._import_pyarrow()
        import pyarrow.dataset as ds

        # by default, write_dataset creates the target directory if needed
        with expr.to_pyarrow_batches(params=params) as batch_reader:
            ds.write_dataset(
                batch_reader, base_dir=directory, format="parquet", **kwargs
            )

    @util.experimental
    def to_csv(
        self,
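A minimal usage sketch of the new backend method, assuming the DuckDB backend and a hypothetical penguins.parquet input file; max_rows_per_file and max_rows_per_group are forwarded to pyarrow.dataset.write_dataset and split the output across several files:

import ibis

con = ibis.duckdb.connect()  # hypothetical backend choice
t = con.read_parquet("penguins.parquet")  # hypothetical source file

# eager: executes the expression and writes part-0.parquet, part-1.parquet, ...
con.to_parquet_dir(t, "penguins_dir", max_rows_per_file=10_000, max_rows_per_group=10_000)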
2 changes: 1 addition & 1 deletion ibis/backends/pyspark/__init__.py
@@ -1286,7 +1286,7 @@ def _to_filesystem_output(
            df = df.write.format(format)
            for k, v in (options or {}).items():
                df = df.option(k, v)
-           df.save(path)
+           df.save(os.fspath(path))
            return None
        sq = df.writeStream.format(format)
        sq = sq.option("path", os.fspath(path))
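The os.fspath call is the one-line fix this commit makes to the PySpark backend: Spark's DataFrameWriter.save expects a string path, while to_parquet_dir may hand it a pathlib.Path. A standard-library-only illustration:

import os
from pathlib import Path

# os.fspath returns str arguments unchanged and converts os.PathLike objects to str
assert os.fspath("/tmp/out") == "/tmp/out"
assert os.fspath(Path("/tmp/out")) == "/tmp/out"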
25 changes: 25 additions & 0 deletions ibis/backends/tests/test_export.py
@@ -215,6 +215,31 @@ def test_table_to_parquet(tmp_path, backend, awards_players):
    )


def test_table_to_parquet_dir(tmp_path, backend, awards_players):
    outparquet_dir = tmp_path / "out"

    if backend.name() == "pyspark":
        # pyspark already writes more than one file
        awards_players.to_parquet_dir(outparquet_dir)
    else:
        # max_rows_per_file forces pyarrow to write more than one parquet file
        awards_players.to_parquet_dir(
            outparquet_dir, max_rows_per_file=3000, max_rows_per_group=3000
        )

    parquet_files = sorted(
        outparquet_dir.glob("*.parquet"),
        key=lambda path: int(path.with_suffix("").name.split("-")[1]),
    )

    df_list = [pd.read_parquet(file) for file in parquet_files]
    df = pd.concat(df_list).reset_index(drop=True)

    backend.assert_frame_equal(
        awards_players.to_pandas().fillna(pd.NA), df.fillna(pd.NA)
    )
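The sort key in the test assumes pyarrow's default basename_template, which (unless overridden via **kwargs) names output files part-{i}.parquet; the key extracts the integer index so the files concatenate in write order. A small sketch of that parse:

from pathlib import Path

def file_index(path: Path) -> int:
    # "part-12.parquet" -> "part-12" -> ["part", "12"] -> 12
    return int(path.with_suffix("").name.split("-")[1])

assert file_index(Path("part-12.parquet")) == 12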


@pytest.mark.notimpl(
    ["duckdb"],
    reason="cannot inline WriteOptions objects",
27 changes: 27 additions & 0 deletions ibis/expr/types/core.py
@@ -615,6 +615,33 @@ def to_parquet(
        """
        self._find_backend(use_default=True).to_parquet(self, path, **kwargs)

    @experimental
    def to_parquet_dir(
        self,
        directory: str | Path,
        *,
        params: Mapping[ir.Scalar, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        """Write the results of executing the given expression to parquet files in a directory.

        This method is eager and will execute the associated expression
        immediately.

        Parameters
        ----------
        directory
            The data target. A string or Path to the directory where the parquet files will be written.
        params
            Mapping of scalar parameter expressions to values.
        **kwargs
            Additional keyword arguments passed to pyarrow.dataset.write_dataset

            https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html
        """
        # forward params explicitly; it is keyword-only and not captured by **kwargs
        self._find_backend(use_default=True).to_parquet_dir(
            self, directory, params=params, **kwargs
        )

    @experimental
    def to_csv(
        self,
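At the expression level no backend needs to be named explicitly; _find_backend resolves it. A hedged usage sketch, assuming t is an ibis table bound to a backend that implements to_parquet_dir:

# eager: executes t and writes one or more parquet files into the directory
t.to_parquet_dir("exported_table")

# keyword arguments flow through to pyarrow.dataset.write_dataset
# on pyarrow-based backends
t.to_parquet_dir("exported_table", max_rows_per_file=100_000, max_rows_per_group=100_000)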
