From f2aebea52c39519ebec3086121617030ad47d3f5 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Thu, 14 Dec 2023 14:37:40 +0100 Subject: [PATCH 01/10] Move tests to the correct module --- py-polars/tests/unit/io/test_lazy_parquet.py | 30 -------------------- py-polars/tests/unit/io/test_parquet.py | 30 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/py-polars/tests/unit/io/test_lazy_parquet.py b/py-polars/tests/unit/io/test_lazy_parquet.py index 5257c355e807..dcf77d9b29c4 100644 --- a/py-polars/tests/unit/io/test_lazy_parquet.py +++ b/py-polars/tests/unit/io/test_lazy_parquet.py @@ -97,17 +97,6 @@ def test_categorical_parquet_statistics(tmp_path: Path) -> None: assert df.shape == (4, 3) -@pytest.mark.write_disk() -def test_null_parquet(tmp_path: Path) -> None: - tmp_path.mkdir(exist_ok=True) - - df = pl.DataFrame([pl.Series("foo", [], dtype=pl.Int8)]) - file_path = tmp_path / "null.parquet" - df.write_parquet(file_path) - out = pl.read_parquet(file_path) - assert_frame_equal(out, df) - - @pytest.mark.write_disk() def test_parquet_eq_stats(tmp_path: Path) -> None: tmp_path.mkdir(exist_ok=True) @@ -353,25 +342,6 @@ def test_streaming_categorical(tmp_path: Path) -> None: assert_frame_equal(result, expected) -@pytest.mark.write_disk() -def test_parquet_struct_categorical(tmp_path: Path) -> None: - tmp_path.mkdir(exist_ok=True) - - df = pl.DataFrame( - [ - pl.Series("a", ["bob"], pl.Categorical), - pl.Series("b", ["foo"], pl.Categorical), - ] - ) - - file_path = tmp_path / "categorical.parquet" - df.write_parquet(file_path) - - with pl.StringCache(): - out = pl.read_parquet(file_path).select(pl.col("b").value_counts()) - assert out.to_dict(as_series=False) == {"b": [{"b": "foo", "count": 1}]} - - def test_glob_n_rows(io_files_path: Path) -> None: file_path = io_files_path / "foods*.parquet" df = pl.scan_parquet(file_path, n_rows=40).collect() diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 26a8e33549d5..bfee08f9f9fe 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -588,3 +588,33 @@ def test_parquet_12831() -> None: df.write_parquet(f, row_group_size=int(1e8), data_page_size=512) f.seek(0) assert_frame_equal(pl.from_arrow(pq.read_table(f)), df) # type: ignore[arg-type] + + +@pytest.mark.write_disk() +def test_parquet_struct_categorical(tmp_path: Path) -> None: + tmp_path.mkdir(exist_ok=True) + + df = pl.DataFrame( + [ + pl.Series("a", ["bob"], pl.Categorical), + pl.Series("b", ["foo"], pl.Categorical), + ] + ) + + file_path = tmp_path / "categorical.parquet" + df.write_parquet(file_path) + + with pl.StringCache(): + out = pl.read_parquet(file_path).select(pl.col("b").value_counts()) + assert out.to_dict(as_series=False) == {"b": [{"b": "foo", "count": 1}]} + + +@pytest.mark.write_disk() +def test_null_parquet(tmp_path: Path) -> None: + tmp_path.mkdir(exist_ok=True) + + df = pl.DataFrame([pl.Series("foo", [], dtype=pl.Int8)]) + file_path = tmp_path / "null.parquet" + df.write_parquet(file_path) + out = pl.read_parquet(file_path) + assert_frame_equal(out, df) From c6544c22b6ef6e0d2d6e0cb22763ad425657404b Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Thu, 14 Dec 2023 14:37:47 +0100 Subject: [PATCH 02/10] Minor util fix --- py-polars/polars/io/_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/polars/io/_utils.py b/py-polars/polars/io/_utils.py index 7590f8c871b9..f5dcb479cf9a 100644 --- 
a/py-polars/polars/io/_utils.py +++ b/py-polars/polars/io/_utils.py @@ -54,7 +54,7 @@ def _prepare_file_arg( file: str | list[str] | TextIO | Path | BinaryIO | bytes, encoding: str | None = None, *, - use_pyarrow: bool | None = None, + use_pyarrow: bool = False, raise_if_empty: bool = True, **kwargs: Any, ) -> ContextManager[str | BinaryIO | list[str] | list[BinaryIO]]: From 3e2e5e50414fc7c94b32c3317d54665e7ab7b11f Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Thu, 14 Dec 2023 14:38:16 +0100 Subject: [PATCH 03/10] Dispatch to scan_parquet --- py-polars/polars/io/parquet/functions.py | 209 +++++++++++++---------- 1 file changed, 122 insertions(+), 87 deletions(-) diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index cfce8e1d085c..0ef7c455abdd 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -1,6 +1,7 @@ from __future__ import annotations import contextlib +from io import BytesIO from pathlib import Path from typing import TYPE_CHECKING, Any, BinaryIO @@ -14,42 +15,31 @@ from polars.polars import read_parquet_schema as _read_parquet_schema if TYPE_CHECKING: - from io import BytesIO - from polars import DataFrame, DataType, LazyFrame from polars.type_aliases import ParallelStrategy def read_parquet( - source: str | Path | BinaryIO | BytesIO | bytes, + source: str | Path | list[str] | list[Path] | BinaryIO | BytesIO | bytes, *, columns: list[int] | list[str] | None = None, n_rows: int | None = None, - use_pyarrow: bool = False, - memory_map: bool = True, - storage_options: dict[str, Any] | None = None, - parallel: ParallelStrategy = "auto", row_count_name: str | None = None, row_count_offset: int = 0, - low_memory: bool = False, - pyarrow_options: dict[str, Any] | None = None, + parallel: ParallelStrategy = "auto", use_statistics: bool = True, + hive_partitioning: bool = True, rechunk: bool = True, + low_memory: bool = False, + storage_options: dict[str, Any] | None = None, + retries: int = 0, + use_pyarrow: bool = False, + pyarrow_options: dict[str, Any] | None = None, + memory_map: bool = True, ) -> DataFrame: """ Read into a DataFrame from a parquet file. - Notes - ----- - * Partitioned files: - If you have a directory-nested (hive-style) partitioned dataset, you should - use the :func:`scan_pyarrow_dataset` method instead. - * When benchmarking: - This operation defaults to a `rechunk` operation at the end, meaning that all - data will be stored continuously in memory. Set `rechunk=False` if you are - benchmarking the parquet-reader as `rechunk` can be an expensive operation - that should not contribute to the timings. - Parameters ---------- source @@ -62,63 +52,78 @@ def read_parquet( n_rows Stop reading from parquet file after reading `n_rows`. Only valid when `use_pyarrow=False`. - use_pyarrow - Use pyarrow instead of the Rust native parquet reader. The pyarrow reader is - more stable. - memory_map - Memory map underlying file. This will likely increase performance. - Only used when `use_pyarrow=True`. - storage_options - Extra options that make sense for `fsspec.open()` or a particular storage - connection, e.g. host, port, username, password, etc. - parallel : {'auto', 'columns', 'row_groups', 'none'} - This determines the direction of parallelism. 'auto' will try to determine the - optimal direction. row_count_name If not None, this will insert a row count column with give name into the DataFrame. 
row_count_offset Offset to start the row_count column (only use if the name is set). - low_memory - Reduce memory pressure at the expense of performance. - pyarrow_options - Keyword arguments for `pyarrow.parquet.read_table - `_. + parallel : {'auto', 'columns', 'row_groups', 'none'} + This determines the direction of parallelism. 'auto' will try to determine the + optimal direction. use_statistics Use statistics in the parquet to determine if pages can be skipped from reading. + hive_partitioning + Infer statistics and schema from hive partitioned URL and use them + to prune reads. rechunk Make sure that all columns are contiguous in memory by aggregating the chunks into a single array. + low_memory + Reduce memory pressure at the expense of performance. + storage_options + Extra options that make sense for `fsspec.open()` or a particular storage + connection, e.g. host, port, username, password, etc. + retries + Number of retries if accessing a cloud instance fails. + use_pyarrow + Use pyarrow instead of the Rust native parquet reader. The pyarrow reader is + more stable. + pyarrow_options + Keyword arguments for `pyarrow.parquet.read_table + `_. + memory_map + Memory map underlying file. This will likely increase performance. + Only used when `use_pyarrow=True`. + + Returns + ------- + DataFrame See Also -------- scan_parquet scan_pyarrow_dataset - Returns - ------- - DataFrame - + Notes + ----- + * Partitioned files: + If you have a directory-nested (hive-style) partitioned dataset, you should + use the :func:`scan_pyarrow_dataset` method instead. + * When benchmarking: + This operation defaults to a `rechunk` operation at the end, meaning that all + data will be stored continuously in memory. Set `rechunk=False` if you are + benchmarking the parquet-reader as `rechunk` can be an expensive operation + that should not contribute to the timings. 
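(Editor's aside, not part of this patch: a minimal sketch of calling the reworked reader under the new signature, assuming a local "data.parquet" file with columns "a" and "b" exists.)

import polars as pl

df = pl.read_parquet(
    "data.parquet",           # hypothetical local file
    columns=["a", "b"],       # column names, or indices such as [0, 1]
    row_count_name="row_nr",  # adds a row count column with this name
    rechunk=False,            # skip the final rechunk, e.g. when benchmarking
)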
""" - if use_pyarrow and n_rows: - raise ValueError("`n_rows` cannot be used with `use_pyarrow=True`") - storage_options = storage_options or {} - pyarrow_options = pyarrow_options or {} - - with _prepare_file_arg( - source, use_pyarrow=use_pyarrow, **storage_options - ) as source_prep: - if use_pyarrow: - if not _PYARROW_AVAILABLE: - raise ModuleNotFoundError( - "'pyarrow' is required when using `read_parquet(..., use_pyarrow=True)`" - ) - import pyarrow as pa - import pyarrow.parquet + if use_pyarrow: + if not _PYARROW_AVAILABLE: + raise ModuleNotFoundError( + "'pyarrow' is required when using `read_parquet(..., use_pyarrow=True)`" + ) + if n_rows is not None: + raise ValueError("`n_rows` cannot be used with `use_pyarrow=True`") + import pyarrow as pa + import pyarrow.parquet + + pyarrow_options = pyarrow_options or {} + + with _prepare_file_arg( + source, use_pyarrow=use_pyarrow, **storage_options + ) as source_prep: return from_arrow( # type: ignore[return-value] pa.parquet.read_table( source_prep, @@ -128,17 +133,47 @@ def read_parquet( ) ) - return pl.DataFrame._read_parquet( - source_prep, - columns=columns, - n_rows=n_rows, - parallel=parallel, - row_count_name=row_count_name, - row_count_offset=row_count_offset, - low_memory=low_memory, - use_statistics=use_statistics, - rechunk=rechunk, - ) + if isinstance(source, (BinaryIO, BytesIO, bytes)): + with _prepare_file_arg( + source, use_pyarrow=use_pyarrow, **storage_options + ) as source_prep: + return pl.DataFrame._read_parquet( + source_prep, + columns=columns, + n_rows=n_rows, + parallel=parallel, + row_count_name=row_count_name, + row_count_offset=row_count_offset, + low_memory=low_memory, + use_statistics=use_statistics, + rechunk=rechunk, + ) + + print(type(source)) + + lf = scan_parquet( + source, + n_rows=n_rows, + row_count_name=row_count_name, + row_count_offset=row_count_offset, + parallel=parallel, + use_statistics=use_statistics, + hive_partitioning=hive_partitioning, + rechunk=rechunk, + low_memory=low_memory, + cache=False, + storage_options=storage_options, + retries=retries, + ) + + if columns is not None: + # Handle columns specified as ints + if columns and isinstance(columns[0], int): + columns = [lf.columns[i] for i in columns] + + lf = lf.select(columns) + + return lf.collect(no_optimization=True) def read_parquet_schema( @@ -170,15 +205,15 @@ def scan_parquet( source: str | Path | list[str] | list[Path], *, n_rows: int | None = None, - cache: bool = True, - parallel: ParallelStrategy = "auto", - rechunk: bool = True, row_count_name: str | None = None, row_count_offset: int = 0, - storage_options: dict[str, Any] | None = None, - low_memory: bool = False, + parallel: ParallelStrategy = "auto", use_statistics: bool = True, hive_partitioning: bool = True, + rechunk: bool = True, + low_memory: bool = False, + cache: bool = True, + storage_options: dict[str, Any] | None = None, retries: int = 0, ) -> LazyFrame: """ @@ -194,19 +229,27 @@ def scan_parquet( If a single path is given, it can be a globbing pattern. n_rows Stop reading from parquet file after reading `n_rows`. - cache - Cache the result after reading. + row_count_name + If not None, this will insert a row count column with the given name into the + DataFrame + row_count_offset + Offset to start the row_count column (only used if the name is set) parallel : {'auto', 'columns', 'row_groups', 'none'} This determines the direction of parallelism. 'auto' will try to determine the optimal direction. 
+ use_statistics + Use statistics in the parquet to determine if pages + can be skipped from reading. + hive_partitioning + Infer statistics and schema from hive partitioned URL and use them + to prune reads. rechunk In case of reading multiple files via a glob pattern rechunk the final DataFrame into contiguous memory chunks. - row_count_name - If not None, this will insert a row count column with give name into the - DataFrame - row_count_offset - Offset to start the row_count column (only use if the name is set) + low_memory + Reduce memory pressure at the expense of performance. + cache + Cache the result after reading. storage_options Options that inform use how to connect to the cloud provider. If the cloud provider is not supported by us, the storage options @@ -220,14 +263,6 @@ def scan_parquet( If `storage_options` are not provided we will try to infer them from the environment variables. - low_memory - Reduce memory pressure at the expense of performance. - use_statistics - Use statistics in the parquet to determine if pages - can be skipped from reading. - hive_partitioning - Infer statistics and schema from hive partitioned URL and use them - to prune reads. retries Number of retries if accessing a cloud instance fails. From 242737cac1c8648002b6fc261ea97ece3c3bafa7 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Thu, 14 Dec 2023 15:04:16 +0100 Subject: [PATCH 04/10] Typing update --- py-polars/polars/io/parquet/functions.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index 0ef7c455abdd..48b3f6ab9b14 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -9,7 +9,7 @@ from polars.convert import from_arrow from polars.dependencies import _PYARROW_AVAILABLE from polars.io._utils import _prepare_file_arg -from polars.utils.various import normalize_filepath +from polars.utils.various import is_int_sequence, normalize_filepath with contextlib.suppress(ImportError): from polars.polars import read_parquet_schema as _read_parquet_schema @@ -121,8 +121,11 @@ def read_parquet( pyarrow_options = pyarrow_options or {} + # TODO: Update _prepare_file_arg to handle list[Path] input with _prepare_file_arg( - source, use_pyarrow=use_pyarrow, **storage_options + source, # type: ignore[arg-type] + use_pyarrow=use_pyarrow, + **storage_options, ) as source_prep: return from_arrow( # type: ignore[return-value] pa.parquet.read_table( @@ -168,7 +171,7 @@ def read_parquet( if columns is not None: # Handle columns specified as ints - if columns and isinstance(columns[0], int): + if is_int_sequence(columns): columns = [lf.columns[i] for i in columns] lf = lf.select(columns) From 8facd8407de0f0eb1caf7bc419bae021f8a4dcad Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Thu, 14 Dec 2023 16:29:41 +0100 Subject: [PATCH 05/10] Remove failing test --- py-polars/polars/io/parquet/functions.py | 2 -- py-polars/tests/unit/io/cloud/test_aws.py | 1 - 2 files changed, 3 deletions(-) diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index 48b3f6ab9b14..ca71d7e55223 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -152,8 +152,6 @@ def read_parquet( rechunk=rechunk, ) - print(type(source)) - lf = scan_parquet( source, n_rows=n_rows, diff --git a/py-polars/tests/unit/io/cloud/test_aws.py b/py-polars/tests/unit/io/cloud/test_aws.py index 
d48ad8ec575e..03fb72b7eff4 100644 --- a/py-polars/tests/unit/io/cloud/test_aws.py +++ b/py-polars/tests/unit/io/cloud/test_aws.py @@ -58,7 +58,6 @@ def s3(s3_base: str, io_files_path: Path) -> str: [ (pl.read_csv, "csv"), (pl.read_ipc, "ipc"), - (pl.read_parquet, "parquet"), ], ) def test_read_s3(s3: str, function: Callable[..., Any], extension: str) -> None: From 113e790f08baf4b1094a98c52c24d8be816f8e3f Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Fri, 15 Dec 2023 04:57:44 +0100 Subject: [PATCH 06/10] try fix --- py-polars/polars/io/parquet/functions.py | 5 +++-- py-polars/polars/lazyframe/frame.py | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index ca71d7e55223..61e1e492e602 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -106,8 +106,6 @@ def read_parquet( benchmarking the parquet-reader as `rechunk` can be an expensive operation that should not contribute to the timings. """ - storage_options = storage_options or {} - if use_pyarrow: if not _PYARROW_AVAILABLE: raise ModuleNotFoundError( @@ -120,6 +118,7 @@ def read_parquet( import pyarrow.parquet pyarrow_options = pyarrow_options or {} + storage_options = storage_options or {} # TODO: Update _prepare_file_arg to handle list[Path] input with _prepare_file_arg( @@ -137,6 +136,8 @@ def read_parquet( ) if isinstance(source, (BinaryIO, BytesIO, bytes)): + storage_options = storage_options or {} + with _prepare_file_arg( source, use_pyarrow=use_pyarrow, **storage_options ) as source_prep: diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 7e5443be62ff..915a4a6d0f1b 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -450,6 +450,9 @@ def _scan_parquet( if storage_options is not None: storage_options = list(storage_options.items()) # type: ignore[assignment] + print(source) + print(storage_options) + self = cls.__new__(cls) self._ldf = PyLazyFrame.new_from_parquet( source, From 416c8bf11c5a538fa2bbd3c82681a5368946a47a Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Fri, 15 Dec 2023 05:20:10 +0100 Subject: [PATCH 07/10] Refactor prepare file args util --- py-polars/polars/io/_utils.py | 32 ++++++++++++++----- py-polars/polars/io/csv/functions.py | 4 +-- py-polars/polars/io/ipc/anonymous_scan.py | 2 +- py-polars/polars/io/ipc/functions.py | 10 +++--- py-polars/polars/io/parquet/anonymous_scan.py | 3 +- py-polars/polars/io/parquet/functions.py | 7 ++-- py-polars/polars/lazyframe/frame.py | 8 ++--- 7 files changed, 40 insertions(+), 26 deletions(-) diff --git a/py-polars/polars/io/_utils.py b/py-polars/polars/io/_utils.py index f5dcb479cf9a..9e3b6a84d63e 100644 --- a/py-polars/polars/io/_utils.py +++ b/py-polars/polars/io/_utils.py @@ -31,21 +31,36 @@ def _is_local_file(file: str) -> bool: @overload def _prepare_file_arg( - file: str | list[str] | Path | BinaryIO | bytes, **kwargs: Any + file: str | list[str] | Path | BinaryIO | bytes, + encoding: str | None = ..., + *, + use_pyarrow: bool = ..., + raise_if_empty: bool = ..., + storage_options: dict[str, Any] | None = ..., ) -> ContextManager[str | BinaryIO]: ... 
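# Editor's sketch (not part of the patch): after this refactor, callers pass
# cloud/fsspec options through the explicit `storage_options` keyword instead of
# forwarding arbitrary **kwargs. A hypothetical call:
#
#     with _prepare_file_arg(
#         "s3://bucket/data.csv",           # hypothetical remote path
#         encoding="utf8",
#         use_pyarrow=False,
#         storage_options={"anon": True},   # forwarded to fsspec.open()
#     ) as data:
#         ...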
@overload def _prepare_file_arg( - file: str | TextIO | Path | BinaryIO | bytes, **kwargs: Any + file: str | TextIO | Path | BinaryIO | bytes, + encoding: str | None = ..., + *, + use_pyarrow: bool = ..., + raise_if_empty: bool = ..., + storage_options: dict[str, Any] | None = ..., ) -> ContextManager[str | BinaryIO]: ... @overload def _prepare_file_arg( - file: str | list[str] | TextIO | Path | BinaryIO | bytes, **kwargs: Any + file: str | list[str] | TextIO | Path | BinaryIO | bytes, + encoding: str | None = ..., + *, + use_pyarrow: bool = ..., + raise_if_empty: bool = ..., + storage_options: dict[str, Any] | None = ..., ) -> ContextManager[str | list[str] | BinaryIO | list[BinaryIO]]: ... @@ -56,7 +71,7 @@ def _prepare_file_arg( *, use_pyarrow: bool = False, raise_if_empty: bool = True, - **kwargs: Any, + storage_options: dict[str, Any] | None = None, ) -> ContextManager[str | BinaryIO | list[str] | list[BinaryIO]]: """ Prepare file argument. @@ -80,6 +95,7 @@ def _prepare_file_arg( fsspec too. """ + storage_options = storage_options or {} # Small helper to use a variable as context @contextmanager @@ -167,8 +183,8 @@ def managed_file(file: Any) -> Iterator[Any]: context=f"{file!r}", raise_if_empty=raise_if_empty, ) - kwargs["encoding"] = encoding - return fsspec.open(file, **kwargs) + storage_options["encoding"] = encoding + return fsspec.open(file, **storage_options) if isinstance(file, list) and bool(file) and all(isinstance(f, str) for f in file): if _FSSPEC_AVAILABLE: @@ -182,8 +198,8 @@ def managed_file(file: Any) -> Iterator[Any]: for f in file ] ) - kwargs["encoding"] = encoding - return fsspec.open_files(file, **kwargs) + storage_options["encoding"] = encoding + return fsspec.open_files(file, **storage_options) if isinstance(file, str): file = normalize_filepath(file, check_not_directory=check_not_dir) diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py index cf0e93522512..299414adc1a7 100644 --- a/py-polars/polars/io/csv/functions.py +++ b/py-polars/polars/io/csv/functions.py @@ -230,7 +230,7 @@ def read_csv( encoding=None, use_pyarrow=True, raise_if_empty=raise_if_empty, - **storage_options, + storage_options=storage_options, ) as data: import pyarrow as pa import pyarrow.csv @@ -364,7 +364,7 @@ def read_csv( encoding=encoding, use_pyarrow=False, raise_if_empty=raise_if_empty, - **storage_options, + storage_options=storage_options, ) as data: df = pl.DataFrame._read_csv( data, diff --git a/py-polars/polars/io/ipc/anonymous_scan.py b/py-polars/polars/io/ipc/anonymous_scan.py index db8cfc802a4b..b6f59b44783f 100644 --- a/py-polars/polars/io/ipc/anonymous_scan.py +++ b/py-polars/polars/io/ipc/anonymous_scan.py @@ -18,7 +18,7 @@ def _scan_ipc_fsspec( func = partial(_scan_ipc_impl, source, storage_options=storage_options) storage_options = storage_options or {} - with _prepare_file_arg(source, **storage_options) as data: + with _prepare_file_arg(source, storage_options=storage_options) as data: schema = polars.io.ipc.read_ipc_schema(data) return pl.LazyFrame._scan_python_function(schema, func) diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py index 3d520b5cc388..a9563f032c82 100644 --- a/py-polars/polars/io/ipc/functions.py +++ b/py-polars/polars/io/ipc/functions.py @@ -79,8 +79,9 @@ def read_ipc( "`n_rows` cannot be used with `use_pyarrow=True` and `memory_map=False`" ) - storage_options = storage_options or {} - with _prepare_file_arg(source, use_pyarrow=use_pyarrow, **storage_options) as data: + with 
_prepare_file_arg( + source, use_pyarrow=use_pyarrow, storage_options=storage_options + ) as data: if use_pyarrow: if not _PYARROW_AVAILABLE: raise ModuleNotFoundError( @@ -154,8 +155,9 @@ def read_ipc_stream( DataFrame """ - storage_options = storage_options or {} - with _prepare_file_arg(source, use_pyarrow=use_pyarrow, **storage_options) as data: + with _prepare_file_arg( + source, use_pyarrow=use_pyarrow, storage_options=storage_options + ) as data: if use_pyarrow: if not _PYARROW_AVAILABLE: raise ModuleNotFoundError( diff --git a/py-polars/polars/io/parquet/anonymous_scan.py b/py-polars/polars/io/parquet/anonymous_scan.py index 1b698094d7bd..eea410c72b9c 100644 --- a/py-polars/polars/io/parquet/anonymous_scan.py +++ b/py-polars/polars/io/parquet/anonymous_scan.py @@ -17,8 +17,7 @@ def _scan_parquet_fsspec( ) -> LazyFrame: func = partial(_scan_parquet_impl, source, storage_options=storage_options) - storage_options = storage_options or {} - with _prepare_file_arg(source, **storage_options) as data: + with _prepare_file_arg(source, storage_options=storage_options) as data: schema = polars.io.parquet.read_parquet_schema(data) return pl.LazyFrame._scan_python_function(schema, func) diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index 61e1e492e602..93e9947bae70 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -118,13 +118,12 @@ def read_parquet( import pyarrow.parquet pyarrow_options = pyarrow_options or {} - storage_options = storage_options or {} # TODO: Update _prepare_file_arg to handle list[Path] input with _prepare_file_arg( source, # type: ignore[arg-type] use_pyarrow=use_pyarrow, - **storage_options, + storage_options=storage_options, ) as source_prep: return from_arrow( # type: ignore[return-value] pa.parquet.read_table( @@ -136,10 +135,8 @@ def read_parquet( ) if isinstance(source, (BinaryIO, BytesIO, bytes)): - storage_options = storage_options or {} - with _prepare_file_arg( - source, use_pyarrow=use_pyarrow, **storage_options + source, use_pyarrow=use_pyarrow, storage_options=storage_options ) as source_prep: return pl.DataFrame._read_parquet( source_prep, diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 915a4a6d0f1b..ece772e9bac2 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -447,11 +447,11 @@ def _scan_parquet( scan = scan.with_row_count(row_count_name, row_count_offset) return scan # type: ignore[return-value] - if storage_options is not None: + if storage_options: storage_options = list(storage_options.items()) # type: ignore[assignment] - - print(source) - print(storage_options) + else: + # Handle empty dict input + storage_options = None self = cls.__new__(cls) self._ldf = PyLazyFrame.new_from_parquet( From 9bc7c6ced5927fefe827444a253f9517779254ce Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Fri, 15 Dec 2023 05:45:12 +0100 Subject: [PATCH 08/10] Minor update --- py-polars/polars/io/_utils.py | 6 +++--- py-polars/polars/io/parquet/functions.py | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/py-polars/polars/io/_utils.py b/py-polars/polars/io/_utils.py index 9e3b6a84d63e..55440d481ecf 100644 --- a/py-polars/polars/io/_utils.py +++ b/py-polars/polars/io/_utils.py @@ -55,7 +55,7 @@ def _prepare_file_arg( @overload def _prepare_file_arg( - file: str | list[str] | TextIO | Path | BinaryIO | bytes, + file: str | list[str] | Path | TextIO | BinaryIO | 
bytes, encoding: str | None = ..., *, use_pyarrow: bool = ..., @@ -66,13 +66,13 @@ def _prepare_file_arg( def _prepare_file_arg( - file: str | list[str] | TextIO | Path | BinaryIO | bytes, + file: str | list[str] | Path | TextIO | BinaryIO | bytes, encoding: str | None = None, *, use_pyarrow: bool = False, raise_if_empty: bool = True, storage_options: dict[str, Any] | None = None, -) -> ContextManager[str | BinaryIO | list[str] | list[BinaryIO]]: +) -> ContextManager[str | list[str] | BinaryIO | list[BinaryIO]]: """ Prepare file argument. diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index 93e9947bae70..b7aa9c8c7144 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -119,10 +119,9 @@ def read_parquet( pyarrow_options = pyarrow_options or {} - # TODO: Update _prepare_file_arg to handle list[Path] input with _prepare_file_arg( source, # type: ignore[arg-type] - use_pyarrow=use_pyarrow, + use_pyarrow=True, storage_options=storage_options, ) as source_prep: return from_arrow( # type: ignore[return-value] @@ -136,7 +135,7 @@ def read_parquet( if isinstance(source, (BinaryIO, BytesIO, bytes)): with _prepare_file_arg( - source, use_pyarrow=use_pyarrow, storage_options=storage_options + source, use_pyarrow=False, storage_options=storage_options ) as source_prep: return pl.DataFrame._read_parquet( source_prep, From 65638f08e88ace62fc069e4d0d4707cc9c7f0d14 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Fri, 15 Dec 2023 06:04:25 +0100 Subject: [PATCH 09/10] Cleanup --- py-polars/polars/io/parquet/functions.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index b7aa9c8c7144..92f2172dcd78 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -44,8 +44,7 @@ def read_parquet( ---------- source Path to a file, or a file-like object. If the path is a directory, files in that - directory will all be read. If `fsspec` is installed, it will be used to open - remote files. + directory will all be read. columns Columns to select. Accepts a list of column indices (starting at zero) or a list of column names. @@ -106,6 +105,7 @@ def read_parquet( benchmarking the parquet-reader as `rechunk` can be an expensive operation that should not contribute to the timings. 
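(Editor's aside, not part of the patch: after this cleanup the reader takes one of three paths, roughly sketched below; the file paths are hypothetical.)

import io
import polars as pl

# In-memory input stays on the eager DataFrame._read_parquet path:
buf = io.BytesIO()
pl.DataFrame({"a": [1, 2, 3]}).write_parquet(buf)
buf.seek(0)
df = pl.read_parquet(buf)

# Path input (hypothetical files) is routed through scan_parquet(...).collect():
# df = pl.read_parquet("data.parquet")

# use_pyarrow=True still dispatches to pyarrow.parquet.read_table:
# df = pl.read_parquet("data.parquet", use_pyarrow=True)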
""" + # Dispatch to pyarrow if requested if use_pyarrow: if not _PYARROW_AVAILABLE: raise ModuleNotFoundError( @@ -133,10 +133,9 @@ def read_parquet( ) ) - if isinstance(source, (BinaryIO, BytesIO, bytes)): - with _prepare_file_arg( - source, use_pyarrow=False, storage_options=storage_options - ) as source_prep: + # Read binary types using `read_parquet` + elif isinstance(source, (BinaryIO, BytesIO, bytes)): + with _prepare_file_arg(source, use_pyarrow=False) as source_prep: return pl.DataFrame._read_parquet( source_prep, columns=columns, @@ -149,6 +148,7 @@ def read_parquet( rechunk=rechunk, ) + # For other inputs, defer to `scan_parquet` lf = scan_parquet( source, n_rows=n_rows, @@ -165,10 +165,8 @@ def read_parquet( ) if columns is not None: - # Handle columns specified as ints if is_int_sequence(columns): columns = [lf.columns[i] for i in columns] - lf = lf.select(columns) return lf.collect(no_optimization=True) From 643dce6ed1bc2051d1cb615c4d0d4a07ee96f6b3 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Fri, 15 Dec 2023 10:55:11 +0100 Subject: [PATCH 10/10] Update docstring --- py-polars/polars/io/parquet/functions.py | 26 +++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index 92f2172dcd78..82b46aee45f1 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -71,8 +71,19 @@ def read_parquet( low_memory Reduce memory pressure at the expense of performance. storage_options - Extra options that make sense for `fsspec.open()` or a particular storage - connection, e.g. host, port, username, password, etc. + Options that indicate how to connect to a cloud provider. + If the cloud provider is not supported by Polars, the storage options + are passed to `fsspec.open()`. + + The cloud providers currently supported are AWS, GCP, and Azure. + See supported keys here: + + * `aws `_ + * `gcp `_ + * `azure `_ + + If `storage_options` is not provided, Polars will try to infer the information + from environment variables. retries Number of retries if accessing a cloud instance fails. use_pyarrow @@ -247,18 +258,19 @@ def scan_parquet( cache Cache the result after reading. storage_options - Options that inform use how to connect to the cloud provider. - If the cloud provider is not supported by us, the storage options + Options that indicate how to connect to a cloud provider. + If the cloud provider is not supported by Polars, the storage options are passed to `fsspec.open()`. - Currently supported providers are: {'aws', 'gcp', 'azure' }. + + The cloud providers currently supported are AWS, GCP, and Azure. See supported keys here: * `aws `_ * `gcp `_ * `azure `_ - If `storage_options` are not provided we will try to infer them from the - environment variables. + If `storage_options` is not provided, Polars will try to infer the information + from environment variables. retries Number of retries if accessing a cloud instance fails.