diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py
index 566e2501d32e..265d3ab76561 100644
--- a/ibis/backends/__init__.py
+++ b/ibis/backends/__init__.py
@@ -473,6 +473,43 @@ def to_parquet(
             for batch in batch_reader:
                 writer.write_batch(batch)
 
+    @util.experimental
+    def to_parquet_dir(
+        self,
+        expr: ir.Table,
+        directory: str | Path,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        **kwargs: Any,
+    ) -> None:
+        """Write the results of executing the given expression to parquet files in a directory.
+
+        This method is eager and will execute the associated expression
+        immediately.
+
+        Parameters
+        ----------
+        expr
+            The ibis expression to execute and persist to parquet.
+        directory
+            The data target. A string or Path to the directory where the parquet files will be written.
+        params
+            Mapping of scalar parameter expressions to values.
+        **kwargs
+            Additional keyword arguments passed to pyarrow.dataset.write_dataset
+
+            https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html
+
+        """
+        self._import_pyarrow()
+        import pyarrow.dataset as ds
+
+        # by default write_dataset creates the directory
+        with expr.to_pyarrow_batches(params=params) as batch_reader:
+            ds.write_dataset(
+                batch_reader, base_dir=directory, format="parquet", **kwargs
+            )
+
     @util.experimental
     def to_csv(
         self,
diff --git a/ibis/backends/pyspark/__init__.py b/ibis/backends/pyspark/__init__.py
index 2c171394c320..11c62f290441 100644
--- a/ibis/backends/pyspark/__init__.py
+++ b/ibis/backends/pyspark/__init__.py
@@ -1286,7 +1286,7 @@ def _to_filesystem_output(
             df = df.write.format(format)
             for k, v in (options or {}).items():
                 df = df.option(k, v)
-            df.save(path)
+            df.save(os.fspath(path))
             return None
         sq = df.writeStream.format(format)
         sq = sq.option("path", os.fspath(path))
diff --git a/ibis/backends/tests/test_export.py b/ibis/backends/tests/test_export.py
index b59e94e5907a..1a0755584767 100644
--- a/ibis/backends/tests/test_export.py
+++ b/ibis/backends/tests/test_export.py
@@ -215,6 +215,31 @@ def test_table_to_parquet(tmp_path, backend, awards_players):
     )
 
 
+def test_table_to_parquet_dir(tmp_path, backend, awards_players):
+    outparquet_dir = tmp_path / "out"
+
+    if backend.name() == "pyspark":
+        # pyspark already writes more than one file
+        awards_players.to_parquet_dir(outparquet_dir)
+    else:
+        # the max_* kwargs force pyarrow to write more than one parquet file
+        awards_players.to_parquet_dir(
+            outparquet_dir, max_rows_per_file=3000, max_rows_per_group=3000
+        )
+
+    parquet_files = sorted(
+        outparquet_dir.glob("*.parquet"),
+        key=lambda path: int(path.with_suffix("").name.split("-")[1]),
+    )
+
+    df_list = [pd.read_parquet(file) for file in parquet_files]
+    df = pd.concat(df_list).reset_index(drop=True)
+
+    backend.assert_frame_equal(
+        awards_players.to_pandas().fillna(pd.NA), df.fillna(pd.NA)
+    )
+
+
 @pytest.mark.notimpl(
     ["duckdb"],
     reason="cannot inline WriteOptions objects",
diff --git a/ibis/expr/types/core.py b/ibis/expr/types/core.py
index fe2dea4e4d0a..cbca4622507a 100644
--- a/ibis/expr/types/core.py
+++ b/ibis/expr/types/core.py
@@ -615,6 +615,33 @@ def to_parquet(
         """
         self._find_backend(use_default=True).to_parquet(self, path, **kwargs)
 
+    @experimental
+    def to_parquet_dir(
+        self,
+        directory: str | Path,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        **kwargs: Any,
+    ) -> None:
+        """Write the results of executing the given expression to parquet files in a directory.
+
+        This method is eager and will execute the associated expression
+        immediately.
+
+        Parameters
+        ----------
+        directory
+            The data target. A string or Path to the directory where the parquet files will be written.
+        params
+            Mapping of scalar parameter expressions to values.
+        **kwargs
+            Additional keyword arguments passed to pyarrow.dataset.write_dataset
+
+            https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html
+
+        """
+        self._find_backend(use_default=True).to_parquet_dir(self, directory, **kwargs)
+
     @experimental
     def to_csv(
         self,
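Usage sketch (not part of the patch): a minimal illustration of how the new to_parquet_dir API could be called once this change lands. It assumes the default DuckDB backend executing an in-memory table and a hypothetical "out_dir" output directory; the max_rows_per_* keywords are forwarded to pyarrow.dataset.write_dataset, exactly as in the test above.

import ibis

# Hypothetical example data; any expression whose backend implements
# to_pyarrow_batches() should behave the same way.
t = ibis.memtable({"year": [2022, 2022, 2023, 2023], "value": [1.0, 2.0, 3.0, 4.0]})

# Eagerly executes the expression and writes a parquet dataset into ./out_dir,
# creating the directory if needed. Extra keyword arguments are passed through
# to pyarrow.dataset.write_dataset, so max_rows_per_file / max_rows_per_group
# can be used to split the result across several parquet files.
t.to_parquet_dir("out_dir", max_rows_per_file=2, max_rows_per_group=2)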