From 8f7dd3a053303117ab479e5addc58b25858043e6 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 23 Jul 2024 15:07:02 +1000 Subject: [PATCH 1/8] c --- crates/polars-io/src/path_utils/mod.rs | 12 +- crates/polars-io/src/utils/other.rs | 5 +- crates/polars-lazy/src/scan/ndjson.rs | 10 ++ .../polars-plan/src/plans/conversion/scans.rs | 7 +- crates/polars-plan/src/plans/options.rs | 1 + crates/polars-utils/src/io.rs | 6 +- py-polars/polars/io/csv/functions.py | 85 ++++++++++-- py-polars/polars/io/ipc/functions.py | 38 +++++- py-polars/polars/io/ndjson.py | 128 ++++++++++++++---- py-polars/src/lazyframe/mod.rs | 4 +- py-polars/tests/unit/io/test_csv.py | 42 ++++-- 11 files changed, 265 insertions(+), 73 deletions(-) diff --git a/crates/polars-io/src/path_utils/mod.rs b/crates/polars-io/src/path_utils/mod.rs index be1f0c947df7..62f77c7c2655 100644 --- a/crates/polars-io/src/path_utils/mod.rs +++ b/crates/polars-io/src/path_utils/mod.rs @@ -146,6 +146,8 @@ pub fn expand_paths_hive( if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } { #[cfg(feature = "cloud")] { + use polars_utils::_limit_path_len_io_err; + use crate::cloud::object_path_from_string; if first_path.starts_with("hf://") { @@ -199,14 +201,8 @@ pub fn expand_paths_hive( // indistinguishable from an empty directory. let path = PathBuf::from(path); if !path.is_dir() { - path.metadata().map_err(|err| { - let msg = - Some(format!("{}: {}", err, path.to_str().unwrap()).into()); - PolarsError::IO { - error: err.into(), - msg, - } - })?; + path.metadata() + .map_err(|err| _limit_path_len_io_err(&path, err))?; } } diff --git a/crates/polars-io/src/utils/other.rs b/crates/polars-io/src/utils/other.rs index a573c3b89f9b..648c678bf814 100644 --- a/crates/polars-io/src/utils/other.rs +++ b/crates/polars-io/src/utils/other.rs @@ -168,10 +168,7 @@ pub(crate) fn update_row_counts3(dfs: &mut [DataFrame], heights: &[IdxSize], off } #[cfg(feature = "json")] -pub(crate) fn overwrite_schema( - schema: &mut Schema, - overwriting_schema: &Schema, -) -> PolarsResult<()> { +pub fn overwrite_schema(schema: &mut Schema, overwriting_schema: &Schema) -> PolarsResult<()> { for (k, value) in overwriting_schema.iter() { *schema.try_get_mut(k)? 
= value.clone();
     }
diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs
index 029c5253cbb4..fcc110248607 100644
--- a/crates/polars-lazy/src/scan/ndjson.rs
+++ b/crates/polars-lazy/src/scan/ndjson.rs
@@ -18,6 +18,7 @@ pub struct LazyJsonLineReader {
     pub(crate) low_memory: bool,
     pub(crate) rechunk: bool,
     pub(crate) schema: Option<SchemaRef>,
+    pub(crate) schema_overwrite: Option<SchemaRef>,
     pub(crate) row_index: Option<RowIndex>,
     pub(crate) infer_schema_length: Option<NonZeroUsize>,
     pub(crate) n_rows: Option<usize>,
@@ -38,6 +39,7 @@ impl LazyJsonLineReader {
             low_memory: false,
             rechunk: false,
             schema: None,
+            schema_overwrite: None,
             row_index: None,
             infer_schema_length: NonZeroUsize::new(100),
             ignore_errors: false,
@@ -82,6 +84,13 @@ impl LazyJsonLineReader {
         self
     }
 
+    /// Overwrite parts of the inferred schema.
+    #[must_use]
+    pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
+        self.schema_overwrite = schema_overwrite;
+        self
+    }
+
     /// Reduce memory usage at the expense of performance
     #[must_use]
     pub fn low_memory(mut self, toggle: bool) -> Self {
@@ -129,6 +138,7 @@ impl LazyFileListReader for LazyJsonLineReader {
             low_memory: self.low_memory,
             ignore_errors: self.ignore_errors,
             schema: self.schema,
+            schema_overwrite: self.schema_overwrite,
         };
 
         let scan_type = FileScan::NDJson {
diff --git a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs
index 8a8404e29622..9ecfa85382a2 100644
--- a/crates/polars-plan/src/plans/conversion/scans.rs
+++ b/crates/polars-plan/src/plans/conversion/scans.rs
@@ -325,7 +325,7 @@ pub(super) fn ndjson_file_info(
     };
     let mut reader = std::io::BufReader::new(f);
 
-    let (reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
+    let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
         if file_options.row_index.is_none() {
             (schema.clone(), schema.clone())
         } else {
@@ -340,6 +340,11 @@
         prepare_schemas(schema, file_options.row_index.as_ref())
     };
 
+    if let Some(overwriting_schema) = &ndjson_options.schema_overwrite {
+        let schema = Arc::make_mut(&mut reader_schema);
+        overwrite_schema(schema, overwriting_schema)?;
+    }
+
     Ok(FileInfo::new(
         schema,
         Some(Either::Right(reader_schema)),
diff --git a/crates/polars-plan/src/plans/options.rs b/crates/polars-plan/src/plans/options.rs
index 75b581d2e6b8..dfae2c5917ec 100644
--- a/crates/polars-plan/src/plans/options.rs
+++ b/crates/polars-plan/src/plans/options.rs
@@ -359,4 +359,5 @@ pub struct NDJsonReadOptions {
     pub low_memory: bool,
     pub ignore_errors: bool,
     pub schema: Option<SchemaRef>,
+    pub schema_overwrite: Option<SchemaRef>,
 }
diff --git a/crates/polars-utils/src/io.rs b/crates/polars-utils/src/io.rs
index a943f9e5cbf5..5fb273e25d16 100644
--- a/crates/polars-utils/src/io.rs
+++ b/crates/polars-utils/src/io.rs
@@ -4,7 +4,7 @@ use std::path::Path;
 
 use polars_error::*;
 
-fn map_err(path: &Path, err: io::Error) -> PolarsError {
+pub fn _limit_path_len_io_err(path: &Path, err: io::Error) -> PolarsError {
     let path = path.to_string_lossy();
     let msg = if path.len() > 88 {
         let truncated_path: String = path.chars().skip(path.len() - 88).collect();
@@ -19,12 +19,12 @@ pub fn open_file<P>(path: P) -> PolarsResult<File>
 where
     P: AsRef<Path>,
 {
-    File::open(&path).map_err(|err| map_err(path.as_ref(), err))
+    File::open(&path).map_err(|err| _limit_path_len_io_err(path.as_ref(), err))
 }
 
 pub fn create_file<P>(path: P) -> PolarsResult<File>
 where
     P: AsRef<Path>,
 {
-    File::create(&path).map_err(|err| map_err(path.as_ref(), err))
+    File::create(&path).map_err(|err| _limit_path_len_io_err(path.as_ref(), err))
 }
diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py
index 3b0fa4cf808a..2a0e7c3de865 100644
--- a/py-polars/polars/io/csv/functions.py
+++ b/py-polars/polars/io/csv/functions.py
@@ -1,11 +1,13 @@
 from __future__ import annotations
 
 import contextlib
+import os
 from io import BytesIO, StringIO
 from pathlib import Path
 from typing import IO, TYPE_CHECKING, Any, Callable, Mapping, Sequence
 
 import polars._reexport as pl
+import polars.functions as F
 from polars._utils.deprecation import deprecate_renamed_parameter
 from polars._utils.various import (
     _process_null_values,
@@ -419,17 +421,30 @@ def read_csv(
     if not infer_schema:
         infer_schema_length = 0
 
-    with prepare_file_arg(
-        source,
-        encoding=encoding,
-        use_pyarrow=False,
-        raise_if_empty=raise_if_empty,
-        storage_options=storage_options,
-    ) as data:
-        df = _read_csv_impl(
-            data,
+    if (
+        # TODO: scan_csv doesn't support a "dtype slice" (i.e. list[DataType])
+        schema_overrides is None or isinstance(schema_overrides, dict)
+    ) and (
+        (
+            # Check that it is not a BytesIO object
+            isinstance(v := source, (str, Path))
+        )
+        and (
+            # HuggingFace only for now ⊂( ◜◒◝ )⊃
+            str(v).startswith("hf://")
+            # Also dispatch on FORCE_ASYNC, so that this codepath gets run
+            # through by our test suite during CI.
+            or os.getenv("POLARS_FORCE_ASYNC") == "1"
+            # TODO:
+            # We can't dispatch this for all paths due to a few reasons:
+            # * `scan_csv` does not support compressed files
+            # * The `storage_options` configuration keys are different between
+            #   fsspec and object_store (would require a breaking change)
+        )
+    ):
+        lf = _scan_csv_impl(
+            source,  # type: ignore[arg-type]
             has_header=has_header,
-            columns=columns if columns else projection,
             separator=separator,
             comment_prefix=comment_prefix,
             quote_char=quote_char,
             skip_rows=skip_rows,
             schema_overrides=schema_overrides,
             schema=schema,
             null_values=null_values,
             missing_utf8_is_empty_string=missing_utf8_is_empty_string,
             ignore_errors=ignore_errors,
             try_parse_dates=try_parse_dates,
-            n_threads=n_threads,
             infer_schema_length=infer_schema_length,
-            batch_size=batch_size,
             n_rows=n_rows,
             encoding=encoding if encoding == "utf8-lossy" else "utf8",
             low_memory=low_memory,
             rechunk=rechunk,
             skip_rows_after_header=skip_rows_after_header,
             row_index_name=row_index_name,
             row_index_offset=row_index_offset,
-            sample_size=sample_size,
             eol_char=eol_char,
             raise_if_empty=raise_if_empty,
             truncate_ragged_lines=truncate_ragged_lines,
             decimal_comma=decimal_comma,
             glob=glob,
         )
 
+        if columns:
+            lf = lf.select(columns)
+        elif projection:
+            lf = lf.select(F.nth(projection))
+
+        df = lf.collect()
+
+    else:
+        with prepare_file_arg(
+            source,
+            encoding=encoding,
+            use_pyarrow=False,
+            raise_if_empty=raise_if_empty,
+            storage_options=storage_options,
+        ) as data:
+            df = _read_csv_impl(
+                data,
+                has_header=has_header,
+                columns=columns if columns else projection,
+                separator=separator,
+                comment_prefix=comment_prefix,
+                quote_char=quote_char,
+                skip_rows=skip_rows,
+                schema_overrides=schema_overrides,
+                schema=schema,
+                null_values=null_values,
+                missing_utf8_is_empty_string=missing_utf8_is_empty_string,
+                ignore_errors=ignore_errors,
+                try_parse_dates=try_parse_dates,
+                n_threads=n_threads,
+                infer_schema_length=infer_schema_length,
+                batch_size=batch_size,
+                n_rows=n_rows,
+                encoding=encoding if encoding == "utf8-lossy" else "utf8",
low_memory=low_memory, + rechunk=rechunk, + skip_rows_after_header=skip_rows_after_header, + row_index_name=row_index_name, + row_index_offset=row_index_offset, + sample_size=sample_size, + eol_char=eol_char, + raise_if_empty=raise_if_empty, + truncate_ragged_lines=truncate_ragged_lines, + decimal_comma=decimal_comma, + glob=glob, + ) + if new_columns: return _update_columns(df, new_columns) return df diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py index c07e32790a7d..e2f03f5575db 100644 --- a/py-polars/polars/io/ipc/functions.py +++ b/py-polars/polars/io/ipc/functions.py @@ -1,10 +1,12 @@ from __future__ import annotations import contextlib +import os from pathlib import Path from typing import IO, TYPE_CHECKING, Any, Sequence import polars._reexport as pl +import polars.functions as F from polars._utils.deprecation import deprecate_renamed_parameter from polars._utils.various import ( is_str_sequence, @@ -29,8 +31,6 @@ from polars._typing import SchemaDict -@deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4") -@deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4") def read_ipc( source: str | Path | IO[bytes] | bytes, *, @@ -92,6 +92,38 @@ def read_ipc( That means that you cannot write to the same filename. E.g. `pl.read_ipc("my_file.arrow").write_ipc("my_file.arrow")` will fail. """ + if ( + # Check that it is not a BytesIO object + isinstance(v := source, (str, Path)) + ) and ( + # HuggingFace only for now ⊂( ◜◒◝ )⊃ + str(v).startswith("hf://") + # Also dispatch on FORCE_ASYNC, so that this codepath gets run + # through by our test suite during CI. + or os.getenv("POLARS_FORCE_ASYNC") == "1" + # TODO: Dispatch all paths to `scan_ipc` - this will need a breaking + # change to the `storage_options` parameter. 
+    ):
+        lf = scan_ipc(
+            source,  # type: ignore[arg-type]
+            n_rows=n_rows,
+            memory_map=memory_map,
+            storage_options=storage_options,
+            row_index_name=row_index_name,
+            row_index_offset=row_index_offset,
+            rechunk=rechunk,
+        )
+
+        if columns:
+            if isinstance(columns[0], int):
+                lf = lf.select(F.nth(columns))  # type: ignore[arg-type]
+            else:
+                lf = lf.select(columns)
+
+        df = lf.collect()
+
+        return df
+
     if use_pyarrow and n_rows and not memory_map:
         msg = "`n_rows` cannot be used with `use_pyarrow=True` and `memory_map=False`"
         raise ValueError(msg)
@@ -305,8 +337,6 @@ def read_ipc_schema(source: str | Path | IO[bytes] | bytes) -> dict[str, DataTyp
     return _read_ipc_schema(source)
 
 
-@deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4")
-@deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4")
 def scan_ipc(
     source: str | Path | list[str] | list[Path],
     *,
diff --git a/py-polars/polars/io/ndjson.py b/py-polars/polars/io/ndjson.py
index 373d0de5248f..739daa7d0c36 100644
--- a/py-polars/polars/io/ndjson.py
+++ b/py-polars/polars/io/ndjson.py
@@ -3,9 +3,8 @@
 import contextlib
 from io import BytesIO, StringIO
 from pathlib import Path
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Sequence
 
-from polars._utils.deprecation import deprecate_renamed_parameter
 from polars._utils.various import normalize_filepath
 from polars._utils.wrap import wrap_df, wrap_ldf
 from polars.datatypes import N_INFER_DEFAULT
@@ -22,11 +21,22 @@
 
 
 def read_ndjson(
-    source: str | Path | IOBase | bytes,
+    source: str | Path | list[str] | list[Path] | IOBase | bytes,
     *,
     schema: SchemaDefinition | None = None,
     schema_overrides: SchemaDefinition | None = None,
+    infer_schema_length: int | None = N_INFER_DEFAULT,
+    batch_size: int | None = 1024,
+    n_rows: int | None = None,
+    low_memory: bool = False,
+    rechunk: bool = False,
+    row_index_name: str | None = None,
+    row_index_offset: int = 0,
     ignore_errors: bool = False,
+    storage_options: dict[str, Any] | None = None,
+    retries: int = 2,
+    file_cache_ttl: int | None = None,
+    include_file_paths: str | None = None,
 ) -> DataFrame:
     r"""
     Read into a DataFrame from a newline delimited JSON file.
@@ -52,8 +62,46 @@
     schema_overrides : dict, default None
         Support type specification or override of one or more columns; note that
         any dtypes inferred from the schema param will be overridden.
+    infer_schema_length
+        The maximum number of rows to scan for schema inference.
+        If set to `None`, the full data may be scanned *(this is slow)*.
+    batch_size
+        Number of rows to read in each batch.
+    n_rows
+        Stop reading from JSON file after reading `n_rows`.
+    low_memory
+        Reduce memory pressure at the expense of performance.
+    rechunk
+        Reallocate to contiguous memory when all chunks/files are parsed.
+    row_index_name
+        If not None, this will insert a row index column with the given name into
+        the DataFrame.
+    row_index_offset
+        Offset to start the row index column (only used if the name is set).
     ignore_errors
         Return `Null` if parsing fails because of schema mismatches.
+    storage_options
+        Options that indicate how to connect to a cloud provider.
+
+        The cloud providers currently supported are AWS, GCP, and Azure.
+        See supported keys here:
+
+        * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
+        * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
+        * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
+        * Hugging Face (`hf://`): Accepts an API key under the `token` parameter: \
+          `{'token': '...'}`, or by setting the `HF_TOKEN` environment variable.
+ + If `storage_options` is not provided, Polars will try to infer the information + from environment variables. + retries + Number of retries if accessing a cloud instance fails. + file_cache_ttl + Amount of time to keep downloaded cloud files since their last access time, + in seconds. Uses the `POLARS_FILE_CACHE_TTL` environment variable + (which defaults to 1 hour) if not given. + include_file_paths + Include the path of the source file(s) as a column with this name. Examples -------- @@ -71,26 +119,54 @@ def read_ndjson( │ 3 ┆ 8 │ └─────┴─────┘ """ - if isinstance(source, StringIO): - source = BytesIO(source.getvalue().encode()) - elif isinstance(source, (str, Path)): - source = normalize_filepath(source) + if not ( + isinstance(source, (str, Path)) + or isinstance(source, Sequence) + and source + and isinstance(source[0], (str, Path)) + ): + # TODO: A lot of the parameters aren't applied for BytesIO + if isinstance(source, StringIO): + source = BytesIO(source.getvalue().encode()) - pydf = PyDataFrame.read_ndjson( - source, - ignore_errors=ignore_errors, + pydf = PyDataFrame.read_ndjson( + source, + ignore_errors=ignore_errors, + schema=schema, + schema_overrides=schema_overrides, + ) + + df = wrap_df(pydf) + + if n_rows: + df = df.head(n_rows) + + return df + + return scan_ndjson( + source, # type: ignore[arg-type] schema=schema, schema_overrides=schema_overrides, - ) - return wrap_df(pydf) + infer_schema_length=infer_schema_length, + batch_size=batch_size, + n_rows=n_rows, + low_memory=low_memory, + rechunk=rechunk, + row_index_name=row_index_name, + row_index_offset=row_index_offset, + ignore_errors=ignore_errors, + include_file_paths=include_file_paths, + retries=retries, + storage_options=storage_options, + file_cache_ttl=file_cache_ttl, + ).collect() -@deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4") -@deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4") def scan_ndjson( source: str | Path | list[str] | list[Path], *, schema: SchemaDefinition | None = None, + schema_overrides: SchemaDefinition | None = None, infer_schema_length: int | None = N_INFER_DEFAULT, batch_size: int | None = 1024, n_rows: int | None = None, @@ -124,6 +200,9 @@ def scan_ndjson( If you supply a list of column names that does not match the names in the underlying data, the names given here will overwrite them. The number of names given in the schema should match the underlying data dimensions. + schema_overrides : dict, default None + Support type specification or override of one or more columns; note that + any dtypes inferred from the schema param will be overridden. infer_schema_length The maximum number of rows to scan for schema inference. If set to `None`, the full data may be scanned *(this is slow)*. 
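# ---------------------------------------------------------------------------
# Illustrative aside (editor's note, not part of the patch): with the
# `schema_overrides` parameter added above, individual column dtypes can be
# pinned while the rest of the schema is still inferred. A minimal sketch,
# assuming a placeholder file name:
#
#     import polars as pl
#
#     lf = pl.scan_ndjson(
#         "data.ndjson",                      # placeholder path
#         infer_schema_length=100,            # infer most columns
#         schema_overrides={"id": pl.Int64},  # but force `id` to Int64
#     )
#     df = lf.collect()
#
# Under the hood this flows through `with_schema_overwrite` on the Rust side,
# which patches the inferred reader schema via `overwrite_schema`.
# ---------------------------------------------------------------------------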
@@ -184,16 +263,17 @@
         storage_options = None
 
     pylf = PyLazyFrame.new_from_ndjson(
-        source,
-        sources,
-        infer_schema_length,
-        schema,
-        batch_size,
-        n_rows,
-        low_memory,
-        rechunk,
-        parse_row_index_args(row_index_name, row_index_offset),
-        ignore_errors,
+        path=source,
+        paths=sources,
+        infer_schema_length=infer_schema_length,
+        schema=schema,
+        schema_overrides=schema_overrides,
+        batch_size=batch_size,
+        n_rows=n_rows,
+        low_memory=low_memory,
+        rechunk=rechunk,
+        row_index=parse_row_index_args(row_index_name, row_index_offset),
+        ignore_errors=ignore_errors,
         include_file_paths=include_file_paths,
         retries=retries,
         cloud_options=storage_options,
diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs
index a60839f2f3e7..3aedd69087a9 100644
--- a/py-polars/src/lazyframe/mod.rs
+++ b/py-polars/src/lazyframe/mod.rs
@@ -45,7 +45,7 @@ impl PyLazyFrame {
     #[cfg(feature = "json")]
     #[allow(clippy::too_many_arguments)]
     #[pyo3(signature = (
-        path, paths, infer_schema_length, schema, batch_size, n_rows, low_memory, rechunk,
+        path, paths, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
         row_index, ignore_errors, include_file_paths, cloud_options, retries, file_cache_ttl
     ))]
     fn new_from_ndjson(
@@ -53,6 +53,7 @@ impl PyLazyFrame {
         paths: Vec<PathBuf>,
         infer_schema_length: Option<usize>,
         schema: Option<Wrap<Schema>>,
+        schema_overrides: Option<Wrap<Schema>>,
         batch_size: Option<NonZeroUsize>,
         n_rows: Option<usize>,
         low_memory: bool,
@@ -109,6 +110,7 @@ impl PyLazyFrame {
             .low_memory(low_memory)
             .with_rechunk(rechunk)
             .with_schema(schema.map(|schema| Arc::new(schema.0)))
+            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
             .with_row_index(row_index)
             .with_ignore_errors(ignore_errors)
             .with_include_file_paths(include_file_paths.map(Arc::from))
diff --git a/py-polars/tests/unit/io/test_csv.py b/py-polars/tests/unit/io/test_csv.py
index 772a1ea74d7f..74f7402cf2ec 100644
--- a/py-polars/tests/unit/io/test_csv.py
+++ b/py-polars/tests/unit/io/test_csv.py
@@ -527,7 +527,9 @@ def test_column_rename_and_dtype_overwrite() -> None:
     assert df.dtypes == [pl.String, pl.Int64, pl.Float32]
 
 
-def test_compressed_csv(io_files_path: Path) -> None:
+def test_compressed_csv(io_files_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("POLARS_FORCE_ASYNC", "0")
+
     # gzip compression
     csv = textwrap.dedent(
         """\
@@ -868,18 +870,24 @@ def test_csv_globbing(io_files_path: Path) -> None:
     df = pl.read_csv(path)
     assert df.shape == (135, 4)
 
-    with pytest.raises(ValueError):
-        _ = pl.read_csv(path, columns=[0, 1])
+    with pytest.MonkeyPatch.context() as mp:
+        mp.setenv("POLARS_FORCE_ASYNC", "0")
+
+        with pytest.raises(ValueError):
+            _ = pl.read_csv(path, columns=[0, 1])
 
     df = pl.read_csv(path, columns=["category", "sugars_g"])
     assert df.shape == (135, 2)
     assert df.row(-1) == ("seafood", 1)
     assert df.row(0) == ("vegetables", 2)
 
-    with pytest.raises(ValueError):
-        _ = pl.read_csv(
-            path, schema_overrides=[pl.String, pl.Int64, pl.Int64, pl.Int64]
-        )
+    with pytest.MonkeyPatch.context() as mp:
+        mp.setenv("POLARS_FORCE_ASYNC", "0")
+
+        with pytest.raises(ValueError):
+            _ = pl.read_csv(
+                path, schema_overrides=[pl.String, pl.Int64, pl.Int64, pl.Int64]
+            )
 
     dtypes = {
         "category": pl.String,
@@ -2170,14 +2178,18 @@ def test_csv_float_decimal() -> None:
         pl.read_csv(floats, decimal_comma=True)
 
 
-def test_fsspec_not_available(monkeypatch: pytest.MonkeyPatch) -> None:
-    monkeypatch.setattr("polars.io._utils._FSSPEC_AVAILABLE", False)
-    with pytest.raises(
-        ImportError, match=r"`fsspec` is
required for `storage_options` argument" - ): - pl.read_csv( - "s3://foods/cabbage.csv", storage_options={"key": "key", "secret": "secret"} - ) +def test_fsspec_not_available() -> None: + with pytest.MonkeyPatch.context() as mp: + mp.setenv("POLARS_FORCE_ASYNC", "0") + mp.setattr("polars.io._utils._FSSPEC_AVAILABLE", False) + + with pytest.raises( + ImportError, match=r"`fsspec` is required for `storage_options` argument" + ): + pl.read_csv( + "s3://foods/cabbage.csv", + storage_options={"key": "key", "secret": "secret"}, + ) @pytest.mark.write_disk() From b7de6ed7aafbb4b837b1cfca9852d8405d1152f3 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 23 Jul 2024 15:08:09 +1000 Subject: [PATCH 2/8] c --- py-polars/polars/io/csv/functions.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py index 2a0e7c3de865..299b4b0a3b48 100644 --- a/py-polars/polars/io/csv/functions.py +++ b/py-polars/polars/io/csv/functions.py @@ -435,8 +435,7 @@ def read_csv( # Also dispatch on FORCE_ASYNC, so that this codepath gets run # through by our test suite during CI. or os.getenv("POLARS_FORCE_ASYNC") == "1" - # TODO: - # We can't dispatch this for all paths due to a few reasons: + # TODO: We can't dispatch this for all paths due to a few reasons: # * `scan_csv` does not support compressed files # * The `storage_options` configuration keys are different between # fsspec and object_store (would require a breaking change) From eb0152ca247cb453d58e7b03199afdcad9b2826d Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 23 Jul 2024 15:11:03 +1000 Subject: [PATCH 3/8] c --- py-polars/polars/io/ipc/functions.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py index e2f03f5575db..0617bafd0961 100644 --- a/py-polars/polars/io/ipc/functions.py +++ b/py-polars/polars/io/ipc/functions.py @@ -97,13 +97,17 @@ def read_ipc( isinstance(v := source, (str, Path)) ) and ( # HuggingFace only for now ⊂( ◜◒◝ )⊃ - str(v).startswith("hf://") + (is_hf := str(v).startswith("hf://")) # Also dispatch on FORCE_ASYNC, so that this codepath gets run # through by our test suite during CI. or os.getenv("POLARS_FORCE_ASYNC") == "1" # TODO: Dispatch all paths to `scan_ipc` - this will need a breaking # change to the `storage_options` parameter. ): + if is_hf and use_pyarrow: + msg = f"`use_pyarrow=True` is not supported for Hugging Face" + raise ValueError(msg) + lf = scan_ipc( source, # type: ignore[arg-type] n_rows=n_rows, From d2d70d816379d2a60fca75d3a9ad06f7e5cd728e Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 23 Jul 2024 15:13:32 +1000 Subject: [PATCH 4/8] c --- py-polars/polars/io/ipc/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py index 0617bafd0961..ce05d19eaf0c 100644 --- a/py-polars/polars/io/ipc/functions.py +++ b/py-polars/polars/io/ipc/functions.py @@ -105,7 +105,7 @@ def read_ipc( # change to the `storage_options` parameter. 
): if is_hf and use_pyarrow: - msg = f"`use_pyarrow=True` is not supported for Hugging Face" + msg = "`use_pyarrow=True` is not supported for Hugging Face" raise ValueError(msg) lf = scan_ipc( From 419797add8fb4285a3b95157c48eba1ab52c58f4 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 23 Jul 2024 15:27:26 +1000 Subject: [PATCH 5/8] c --- py-polars/polars/io/csv/functions.py | 35 ++++++++++++++-------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py index 299b4b0a3b48..2205759b7de3 100644 --- a/py-polars/polars/io/csv/functions.py +++ b/py-polars/polars/io/csv/functions.py @@ -421,26 +421,27 @@ def read_csv( if not infer_schema: infer_schema_length = 0 + # TODO: scan_csv doesn't support a "dtype slice" (i.e. list[DataType]) + schema_overrides_is_list = isinstance(schema_overrides, Sequence) + if ( - # TODO: scan_csv doesn't support a "dtype slice" (i.e. list[DataType]) - schema_overrides is None or isinstance(schema_overrides, dict) + # Check that it is not a BytesIO object + isinstance(v := source, (str, Path)) ) and ( - ( - # Check that it is not a BytesIO object - isinstance(v := source, (str, Path)) - ) - and ( - # HuggingFace only for now ⊂( ◜◒◝ )⊃ - str(v).startswith("hf://") - # Also dispatch on FORCE_ASYNC, so that this codepath gets run - # through by our test suite during CI. - or os.getenv("POLARS_FORCE_ASYNC") == "1" - # TODO: We can't dispatch this for all paths due to a few reasons: - # * `scan_csv` does not support compressed files - # * The `storage_options` configuration keys are different between - # fsspec and object_store (would require a breaking change) - ) + # HuggingFace only for now ⊂( ◜◒◝ )⊃ + str(v).startswith("hf://") + # Also dispatch on FORCE_ASYNC, so that this codepath gets run + # through by our test suite during CI. + or (os.getenv("POLARS_FORCE_ASYNC") == "1" and not schema_overrides_is_list) + # TODO: We can't dispatch this for all paths due to a few reasons: + # * `scan_csv` does not support compressed files + # * The `storage_options` configuration keys are different between + # fsspec and object_store (would require a breaking change) ): + if schema_overrides_is_list: + msg = "passing a list to `schema_overrides` is unsupported for hf:// paths" + raise ValueError(msg) + lf = _scan_csv_impl( source, # type: ignore[arg-type] has_header=has_header, From f6f24a5c6364f4734bfb1e820bc65f35f5c8883a Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 23 Jul 2024 15:33:26 +1000 Subject: [PATCH 6/8] c --- py-polars/polars/io/csv/functions.py | 12 ++++++++++-- py-polars/tests/unit/io/test_csv.py | 6 +++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py index 2205759b7de3..24acff94bf66 100644 --- a/py-polars/polars/io/csv/functions.py +++ b/py-polars/polars/io/csv/functions.py @@ -423,6 +423,7 @@ def read_csv( # TODO: scan_csv doesn't support a "dtype slice" (i.e. list[DataType]) schema_overrides_is_list = isinstance(schema_overrides, Sequence) + encoding_supported_in_lazy = encoding in {"utf8", "utf8-lossy"} if ( # Check that it is not a BytesIO object @@ -432,7 +433,11 @@ def read_csv( str(v).startswith("hf://") # Also dispatch on FORCE_ASYNC, so that this codepath gets run # through by our test suite during CI. 
- or (os.getenv("POLARS_FORCE_ASYNC") == "1" and not schema_overrides_is_list) + or ( + os.getenv("POLARS_FORCE_ASYNC") == "1" + and not schema_overrides_is_list + and encoding_supported_in_lazy + ) # TODO: We can't dispatch this for all paths due to a few reasons: # * `scan_csv` does not support compressed files # * The `storage_options` configuration keys are different between @@ -441,6 +446,9 @@ def read_csv( if schema_overrides_is_list: msg = "passing a list to `schema_overrides` is unsupported for hf:// paths" raise ValueError(msg) + if not encoding_supported_in_lazy: + msg = f"unsupported encoding {encoding} for hf:// paths" + raise ValueError(msg) lf = _scan_csv_impl( source, # type: ignore[arg-type] @@ -449,7 +457,7 @@ def read_csv( comment_prefix=comment_prefix, quote_char=quote_char, skip_rows=skip_rows, - schema_overrides=schema_overrides, + schema_overrides=schema_overrides, # type: ignore[arg-type] schema=schema, null_values=null_values, missing_utf8_is_empty_string=missing_utf8_is_empty_string, diff --git a/py-polars/tests/unit/io/test_csv.py b/py-polars/tests/unit/io/test_csv.py index 74f7402cf2ec..294ed6073824 100644 --- a/py-polars/tests/unit/io/test_csv.py +++ b/py-polars/tests/unit/io/test_csv.py @@ -2013,7 +2013,11 @@ def test_invalid_csv_raise() -> None: @pytest.mark.write_disk() -def test_partial_read_compressed_file(tmp_path: Path) -> None: +def test_partial_read_compressed_file( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setenv("POLARS_FORCE_ASYNC", "0") + df = pl.DataFrame( {"idx": range(1_000), "dt": date(2025, 12, 31), "txt": "hello world"} ) From 1e79ce6173aa196768d7a1bf1dddb273b8c2341b Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 23 Jul 2024 15:34:47 +1000 Subject: [PATCH 7/8] c --- py-polars/polars/io/csv/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py index 24acff94bf66..b53c902c6078 100644 --- a/py-polars/polars/io/csv/functions.py +++ b/py-polars/polars/io/csv/functions.py @@ -465,7 +465,7 @@ def read_csv( try_parse_dates=try_parse_dates, infer_schema_length=infer_schema_length, n_rows=n_rows, - encoding=encoding if encoding == "utf8-lossy" else "utf8", + encoding=encoding, low_memory=low_memory, rechunk=rechunk, skip_rows_after_header=skip_rows_after_header, From 4054d66e75f56d80d8187eb06e2017cc390b91ae Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 23 Jul 2024 15:37:20 +1000 Subject: [PATCH 8/8] c --- py-polars/polars/io/csv/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py index b53c902c6078..7b3d3a91dbf3 100644 --- a/py-polars/polars/io/csv/functions.py +++ b/py-polars/polars/io/csv/functions.py @@ -465,7 +465,7 @@ def read_csv( try_parse_dates=try_parse_dates, infer_schema_length=infer_schema_length, n_rows=n_rows, - encoding=encoding, + encoding=encoding, # type: ignore[arg-type] low_memory=low_memory, rechunk=rechunk, skip_rows_after_header=skip_rows_after_header,
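
Usage sketch (editor's illustration, not part of the patch series): after these
changes, `read_csv`, `read_ipc`, and `read_ndjson` dispatch path-like `hf://`
sources to their lazy `scan_*` counterparts instead of fsspec. A minimal
example under stated assumptions: `hf://datasets/user/repo/data.csv` below is a
placeholder, not a real dataset.

    import polars as pl

    # Dispatched through scan_csv + collect() under the hood; an HF token can
    # be passed via storage_options or the HF_TOKEN environment variable.
    df = pl.read_csv(
        "hf://datasets/user/repo/data.csv",  # placeholder path
        storage_options={"token": "..."},    # or set HF_TOKEN
        columns=["category", "sugars_g"],    # applied as a lazy select
    )

Note the deliberate limits encoded in the dispatch condition by patches 3-7:
list-valued `schema_overrides`, encodings other than utf8/utf8-lossy, and
`use_pyarrow=True` (for IPC) raise a ValueError on `hf://` paths rather than
silently falling back to the fsspec code path.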