Skip to content

Commit

Permalink
feat(python)!: Use Object Store instead of fsspec for read_parquet (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego committed Dec 15, 2023
1 parent 345e741 commit fc03c4a
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 148 deletions.
38 changes: 27 additions & 11 deletions py-polars/polars/io/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,48 @@ def _is_local_file(file: str) -> bool:

@overload
def _prepare_file_arg(
file: str | list[str] | Path | BinaryIO | bytes, **kwargs: Any
file: str | list[str] | Path | BinaryIO | bytes,
encoding: str | None = ...,
*,
use_pyarrow: bool = ...,
raise_if_empty: bool = ...,
storage_options: dict[str, Any] | None = ...,
) -> ContextManager[str | BinaryIO]:
...


@overload
def _prepare_file_arg(
file: str | TextIO | Path | BinaryIO | bytes, **kwargs: Any
file: str | TextIO | Path | BinaryIO | bytes,
encoding: str | None = ...,
*,
use_pyarrow: bool = ...,
raise_if_empty: bool = ...,
storage_options: dict[str, Any] | None = ...,
) -> ContextManager[str | BinaryIO]:
...


@overload
def _prepare_file_arg(
file: str | list[str] | TextIO | Path | BinaryIO | bytes, **kwargs: Any
file: str | list[str] | Path | TextIO | BinaryIO | bytes,
encoding: str | None = ...,
*,
use_pyarrow: bool = ...,
raise_if_empty: bool = ...,
storage_options: dict[str, Any] | None = ...,
) -> ContextManager[str | list[str] | BinaryIO | list[BinaryIO]]:
...


def _prepare_file_arg(
file: str | list[str] | TextIO | Path | BinaryIO | bytes,
file: str | list[str] | Path | TextIO | BinaryIO | bytes,
encoding: str | None = None,
*,
use_pyarrow: bool | None = None,
use_pyarrow: bool = False,
raise_if_empty: bool = True,
**kwargs: Any,
) -> ContextManager[str | BinaryIO | list[str] | list[BinaryIO]]:
storage_options: dict[str, Any] | None = None,
) -> ContextManager[str | list[str] | BinaryIO | list[BinaryIO]]:
"""
Prepare file argument.
Expand All @@ -80,6 +95,7 @@ def _prepare_file_arg(
fsspec too.
"""
storage_options = storage_options or {}

# Small helper to use a variable as context
@contextmanager
Expand Down Expand Up @@ -167,8 +183,8 @@ def managed_file(file: Any) -> Iterator[Any]:
context=f"{file!r}",
raise_if_empty=raise_if_empty,
)
kwargs["encoding"] = encoding
return fsspec.open(file, **kwargs)
storage_options["encoding"] = encoding
return fsspec.open(file, **storage_options)

if isinstance(file, list) and bool(file) and all(isinstance(f, str) for f in file):
if _FSSPEC_AVAILABLE:
Expand All @@ -182,8 +198,8 @@ def managed_file(file: Any) -> Iterator[Any]:
for f in file
]
)
kwargs["encoding"] = encoding
return fsspec.open_files(file, **kwargs)
storage_options["encoding"] = encoding
return fsspec.open_files(file, **storage_options)

if isinstance(file, str):
file = normalize_filepath(file, check_not_directory=check_not_dir)
Expand Down
4 changes: 2 additions & 2 deletions py-polars/polars/io/csv/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def read_csv(
encoding=None,
use_pyarrow=True,
raise_if_empty=raise_if_empty,
**storage_options,
storage_options=storage_options,
) as data:
import pyarrow as pa
import pyarrow.csv
Expand Down Expand Up @@ -364,7 +364,7 @@ def read_csv(
encoding=encoding,
use_pyarrow=False,
raise_if_empty=raise_if_empty,
**storage_options,
storage_options=storage_options,
) as data:
df = pl.DataFrame._read_csv(
data,
Expand Down
2 changes: 1 addition & 1 deletion py-polars/polars/io/ipc/anonymous_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def _scan_ipc_fsspec(
func = partial(_scan_ipc_impl, source, storage_options=storage_options)

storage_options = storage_options or {}
with _prepare_file_arg(source, **storage_options) as data:
with _prepare_file_arg(source, storage_options=storage_options) as data:
schema = polars.io.ipc.read_ipc_schema(data)

return pl.LazyFrame._scan_python_function(schema, func)
Expand Down
10 changes: 6 additions & 4 deletions py-polars/polars/io/ipc/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ def read_ipc(
"`n_rows` cannot be used with `use_pyarrow=True` and `memory_map=False`"
)

storage_options = storage_options or {}
with _prepare_file_arg(source, use_pyarrow=use_pyarrow, **storage_options) as data:
with _prepare_file_arg(
source, use_pyarrow=use_pyarrow, storage_options=storage_options
) as data:
if use_pyarrow:
if not _PYARROW_AVAILABLE:
raise ModuleNotFoundError(
Expand Down Expand Up @@ -154,8 +155,9 @@ def read_ipc_stream(
DataFrame
"""
storage_options = storage_options or {}
with _prepare_file_arg(source, use_pyarrow=use_pyarrow, **storage_options) as data:
with _prepare_file_arg(
source, use_pyarrow=use_pyarrow, storage_options=storage_options
) as data:
if use_pyarrow:
if not _PYARROW_AVAILABLE:
raise ModuleNotFoundError(
Expand Down
3 changes: 1 addition & 2 deletions py-polars/polars/io/parquet/anonymous_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ def _scan_parquet_fsspec(
) -> LazyFrame:
func = partial(_scan_parquet_impl, source, storage_options=storage_options)

storage_options = storage_options or {}
with _prepare_file_arg(source, **storage_options) as data:
with _prepare_file_arg(source, storage_options=storage_options) as data:
schema = polars.io.parquet.read_parquet_schema(data)

return pl.LazyFrame._scan_python_function(schema, func)
Expand Down
Loading

0 comments on commit fc03c4a

Please sign in to comment.