diff --git a/crates/polars-io/src/path_utils/mod.rs b/crates/polars-io/src/path_utils/mod.rs
index be1f0c947df7..62f77c7c2655 100644
--- a/crates/polars-io/src/path_utils/mod.rs
+++ b/crates/polars-io/src/path_utils/mod.rs
@@ -146,6 +146,8 @@ pub fn expand_paths_hive(
     if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
         #[cfg(feature = "cloud")]
         {
+            use polars_utils::_limit_path_len_io_err;
+
             use crate::cloud::object_path_from_string;
 
             if first_path.starts_with("hf://") {
@@ -199,14 +201,8 @@ pub fn expand_paths_hive(
                     // indistinguishable from an empty directory.
                     let path = PathBuf::from(path);
                     if !path.is_dir() {
-                        path.metadata().map_err(|err| {
-                            let msg =
-                                Some(format!("{}: {}", err, path.to_str().unwrap()).into());
-                            PolarsError::IO {
-                                error: err.into(),
-                                msg,
-                            }
-                        })?;
+                        path.metadata()
+                            .map_err(|err| _limit_path_len_io_err(&path, err))?;
                     }
                 }
diff --git a/crates/polars-io/src/utils/other.rs b/crates/polars-io/src/utils/other.rs
index a573c3b89f9b..648c678bf814 100644
--- a/crates/polars-io/src/utils/other.rs
+++ b/crates/polars-io/src/utils/other.rs
@@ -168,10 +168,7 @@ pub(crate) fn update_row_counts3(dfs: &mut [DataFrame], heights: &[IdxSize], offset: IdxSize)
 }
 
 #[cfg(feature = "json")]
-pub(crate) fn overwrite_schema(
-    schema: &mut Schema,
-    overwriting_schema: &Schema,
-) -> PolarsResult<()> {
+pub fn overwrite_schema(schema: &mut Schema, overwriting_schema: &Schema) -> PolarsResult<()> {
     for (k, value) in overwriting_schema.iter() {
         *schema.try_get_mut(k)? = value.clone();
     }
diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs
index 029c5253cbb4..fcc110248607 100644
--- a/crates/polars-lazy/src/scan/ndjson.rs
+++ b/crates/polars-lazy/src/scan/ndjson.rs
@@ -18,6 +18,7 @@ pub struct LazyJsonLineReader {
     pub(crate) low_memory: bool,
     pub(crate) rechunk: bool,
     pub(crate) schema: Option<SchemaRef>,
+    pub(crate) schema_overwrite: Option<SchemaRef>,
     pub(crate) row_index: Option<RowIndex>,
     pub(crate) infer_schema_length: Option<NonZeroUsize>,
     pub(crate) n_rows: Option<usize>,
@@ -38,6 +39,7 @@ impl LazyJsonLineReader {
             low_memory: false,
             rechunk: false,
             schema: None,
+            schema_overwrite: None,
             row_index: None,
             infer_schema_length: NonZeroUsize::new(100),
             ignore_errors: false,
@@ -82,6 +84,13 @@ impl LazyJsonLineReader {
         self
     }
 
+    /// Overwrite the inferred schema for one or more columns
+    #[must_use]
+    pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
+        self.schema_overwrite = schema_overwrite;
+        self
+    }
+
     /// Reduce memory usage at the expense of performance
     #[must_use]
    pub fn low_memory(mut self, toggle: bool) -> Self {
@@ -129,6 +138,7 @@ impl LazyFileListReader for LazyJsonLineReader {
             low_memory: self.low_memory,
             ignore_errors: self.ignore_errors,
             schema: self.schema,
+            schema_overwrite: self.schema_overwrite,
         };
 
         let scan_type = FileScan::NDJson {
diff --git a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs
index 8a8404e29622..9ecfa85382a2 100644
--- a/crates/polars-plan/src/plans/conversion/scans.rs
+++ b/crates/polars-plan/src/plans/conversion/scans.rs
@@ -325,7 +325,7 @@ pub(super) fn ndjson_file_info(
     };
     let mut reader = std::io::BufReader::new(f);
 
-    let (reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
+    let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
         if file_options.row_index.is_none() {
             (schema.clone(), schema.clone())
         } else {
@@ -340,6 +340,11 @@ pub(super) fn ndjson_file_info(
         prepare_schemas(schema, file_options.row_index.as_ref())
     };
 
+    if let Some(overwriting_schema) = &ndjson_options.schema_overwrite {
+        let schema = Arc::make_mut(&mut reader_schema);
+        overwrite_schema(schema, overwriting_schema)?;
+    }
+
     Ok(FileInfo::new(
         schema,
         Some(Either::Right(reader_schema)),
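
Taken together, the Rust changes above add a `schema_overwrite` to `LazyJsonLineReader` and apply it to the inferred NDJSON reader schema via `Arc::make_mut` plus the now-public `overwrite_schema`. A minimal sketch of the Python-level call that ultimately exercises this path, using the `schema_overrides` parameter added later in this diff; the file name is a placeholder, not part of the change:

    import polars as pl

    # Infer most dtypes, but force `id` to UInt64; "data.ndjson" is hypothetical.
    lf = pl.scan_ndjson("data.ndjson", schema_overrides={"id": pl.UInt64})
    df = lf.collect()
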
diff --git a/crates/polars-plan/src/plans/options.rs b/crates/polars-plan/src/plans/options.rs
index 75b581d2e6b8..dfae2c5917ec 100644
--- a/crates/polars-plan/src/plans/options.rs
+++ b/crates/polars-plan/src/plans/options.rs
@@ -359,4 +359,5 @@ pub struct NDJsonReadOptions {
     pub low_memory: bool,
     pub ignore_errors: bool,
     pub schema: Option<SchemaRef>,
+    pub schema_overwrite: Option<SchemaRef>,
 }
diff --git a/crates/polars-utils/src/io.rs b/crates/polars-utils/src/io.rs
index a943f9e5cbf5..5fb273e25d16 100644
--- a/crates/polars-utils/src/io.rs
+++ b/crates/polars-utils/src/io.rs
@@ -4,7 +4,7 @@ use std::path::Path;
 
 use polars_error::*;
 
-fn map_err(path: &Path, err: io::Error) -> PolarsError {
+pub fn _limit_path_len_io_err(path: &Path, err: io::Error) -> PolarsError {
     let path = path.to_string_lossy();
     let msg = if path.len() > 88 {
         let truncated_path: String = path.chars().skip(path.len() - 88).collect();
@@ -19,12 +19,12 @@ pub fn open_file<P>(path: P) -> PolarsResult<File>
 where
     P: AsRef<Path>,
 {
-    File::open(&path).map_err(|err| map_err(path.as_ref(), err))
+    File::open(&path).map_err(|err| _limit_path_len_io_err(path.as_ref(), err))
 }
 
 pub fn create_file<P>(path: P) -> PolarsResult<File>
 where
     P: AsRef<Path>,
 {
-    File::create(&path).map_err(|err| map_err(path.as_ref(), err))
+    File::create(&path).map_err(|err| _limit_path_len_io_err(path.as_ref(), err))
 }
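
For context on the rename above: `_limit_path_len_io_err` builds the IO error message and, per the hunk above, trims paths longer than 88 characters to their trailing characters so the message stays readable. A rough, hedged Python equivalent of that message shortening (the helper name and the `...` prefix are illustrative, not taken from the diff):

    def limit_path_len_io_err(path: str, err: OSError, max_len: int = 88) -> str:
        # Keep only the tail of an overlong path when formatting the error.
        shown = path if len(path) <= max_len else "..." + path[-max_len:]
        return f"{err}: {shown}"
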
diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py
index 3b0fa4cf808a..7b3d3a91dbf3 100644
--- a/py-polars/polars/io/csv/functions.py
+++ b/py-polars/polars/io/csv/functions.py
@@ -1,11 +1,13 @@
 from __future__ import annotations
 
 import contextlib
+import os
 from io import BytesIO, StringIO
 from pathlib import Path
 from typing import IO, TYPE_CHECKING, Any, Callable, Mapping, Sequence
 
 import polars._reexport as pl
+import polars.functions as F
 from polars._utils.deprecation import deprecate_renamed_parameter
 from polars._utils.various import (
     _process_null_values,
@@ -419,38 +421,56 @@ def read_csv(
     if not infer_schema:
         infer_schema_length = 0
 
-    with prepare_file_arg(
-        source,
-        encoding=encoding,
-        use_pyarrow=False,
-        raise_if_empty=raise_if_empty,
-        storage_options=storage_options,
-    ) as data:
-        df = _read_csv_impl(
-            data,
+    # TODO: scan_csv doesn't support a "dtype slice" (i.e. list[DataType])
+    schema_overrides_is_list = isinstance(schema_overrides, Sequence)
+    encoding_supported_in_lazy = encoding in {"utf8", "utf8-lossy"}
+
+    if (
+        # Check that it is not a BytesIO object
+        isinstance(v := source, (str, Path))
+    ) and (
+        # HuggingFace only for now ⊂( ◜◒◝ )⊃
+        str(v).startswith("hf://")
+        # Also dispatch on FORCE_ASYNC, so that this codepath gets run
+        # through by our test suite during CI.
+        or (
+            os.getenv("POLARS_FORCE_ASYNC") == "1"
+            and not schema_overrides_is_list
+            and encoding_supported_in_lazy
+        )
+        # TODO: We can't dispatch this for all paths due to a few reasons:
+        # * `scan_csv` does not support compressed files
+        # * The `storage_options` configuration keys are different between
+        #   fsspec and object_store (would require a breaking change)
+    ):
+        if schema_overrides_is_list:
+            msg = "passing a list to `schema_overrides` is unsupported for hf:// paths"
+            raise ValueError(msg)
+        if not encoding_supported_in_lazy:
+            msg = f"unsupported encoding {encoding} for hf:// paths"
+            raise ValueError(msg)
+
+        lf = _scan_csv_impl(
+            source,  # type: ignore[arg-type]
             has_header=has_header,
-            columns=columns if columns else projection,
             separator=separator,
             comment_prefix=comment_prefix,
             quote_char=quote_char,
             skip_rows=skip_rows,
-            schema_overrides=schema_overrides,
+            schema_overrides=schema_overrides,  # type: ignore[arg-type]
             schema=schema,
             null_values=null_values,
             missing_utf8_is_empty_string=missing_utf8_is_empty_string,
             ignore_errors=ignore_errors,
             try_parse_dates=try_parse_dates,
-            n_threads=n_threads,
             infer_schema_length=infer_schema_length,
-            batch_size=batch_size,
             n_rows=n_rows,
-            encoding=encoding if encoding == "utf8-lossy" else "utf8",
+            encoding=encoding,  # type: ignore[arg-type]
             low_memory=low_memory,
             rechunk=rechunk,
             skip_rows_after_header=skip_rows_after_header,
             row_index_name=row_index_name,
             row_index_offset=row_index_offset,
-            sample_size=sample_size,
             eol_char=eol_char,
             raise_if_empty=raise_if_empty,
             truncate_ragged_lines=truncate_ragged_lines,
@@ -458,6 +478,53 @@ def read_csv(
             glob=glob,
         )
 
+        if columns:
+            lf = lf.select(columns)
+        elif projection:
+            lf = lf.select(F.nth(projection))
+
+        df = lf.collect()
+
+    else:
+        with prepare_file_arg(
+            source,
+            encoding=encoding,
+            use_pyarrow=False,
+            raise_if_empty=raise_if_empty,
+            storage_options=storage_options,
+        ) as data:
+            df = _read_csv_impl(
+                data,
+                has_header=has_header,
+                columns=columns if columns else projection,
+                separator=separator,
+                comment_prefix=comment_prefix,
+                quote_char=quote_char,
+                skip_rows=skip_rows,
+                schema_overrides=schema_overrides,
+                schema=schema,
+                null_values=null_values,
+                missing_utf8_is_empty_string=missing_utf8_is_empty_string,
+                ignore_errors=ignore_errors,
+                try_parse_dates=try_parse_dates,
+                n_threads=n_threads,
+                infer_schema_length=infer_schema_length,
+                batch_size=batch_size,
+                n_rows=n_rows,
+                encoding=encoding if encoding == "utf8-lossy" else "utf8",
+                low_memory=low_memory,
+                rechunk=rechunk,
+                skip_rows_after_header=skip_rows_after_header,
+                row_index_name=row_index_name,
+                row_index_offset=row_index_offset,
+                sample_size=sample_size,
+                eol_char=eol_char,
+                raise_if_empty=raise_if_empty,
+                truncate_ragged_lines=truncate_ragged_lines,
+                decimal_comma=decimal_comma,
+                glob=glob,
+            )
+
     if new_columns:
         return _update_columns(df, new_columns)
     return df
diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py
index c07e32790a7d..ce05d19eaf0c 100644
--- a/py-polars/polars/io/ipc/functions.py
+++ b/py-polars/polars/io/ipc/functions.py
@@ -1,10 +1,12 @@
 from __future__ import annotations
 
 import contextlib
+import os
 from pathlib import Path
 from typing import IO, TYPE_CHECKING, Any, Sequence
 
 import polars._reexport as pl
+import polars.functions as F
 from polars._utils.deprecation import deprecate_renamed_parameter
 from polars._utils.various import (
     is_str_sequence,
@@ -29,8 +31,6 @@
     from polars._typing import SchemaDict
 
 
-@deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4")
-@deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4")
 def read_ipc(
     source: str | Path | IO[bytes] | bytes,
     *,
@@ -92,6 +92,42 @@ def read_ipc(
     That means that you cannot write to the same filename.
     E.g. `pl.read_ipc("my_file.arrow").write_ipc("my_file.arrow")` will fail.
     """
+    if (
+        # Check that it is not a BytesIO object
+        isinstance(v := source, (str, Path))
+    ) and (
+        # HuggingFace only for now ⊂( ◜◒◝ )⊃
+        (is_hf := str(v).startswith("hf://"))
+        # Also dispatch on FORCE_ASYNC, so that this codepath gets run
+        # through by our test suite during CI.
+        or os.getenv("POLARS_FORCE_ASYNC") == "1"
+        # TODO: Dispatch all paths to `scan_ipc` - this will need a breaking
+        # change to the `storage_options` parameter.
+    ):
+        if is_hf and use_pyarrow:
+            msg = "`use_pyarrow=True` is not supported for Hugging Face"
+            raise ValueError(msg)
+
+        lf = scan_ipc(
+            source,  # type: ignore[arg-type]
+            n_rows=n_rows,
+            memory_map=memory_map,
+            storage_options=storage_options,
+            row_index_name=row_index_name,
+            row_index_offset=row_index_offset,
+            rechunk=rechunk,
+        )
+
+        if columns:
+            if isinstance(columns[0], int):
+                lf = lf.select(F.nth(columns))  # type: ignore[arg-type]
+            else:
+                lf = lf.select(columns)
+
+        df = lf.collect()
+
+        return df
+
     if use_pyarrow and n_rows and not memory_map:
         msg = "`n_rows` cannot be used with `use_pyarrow=True` and `memory_map=False`"
         raise ValueError(msg)
@@ -305,8 +341,6 @@ def read_ipc_schema(source: str | Path | IO[bytes] | bytes) -> dict[str, DataType]:
     return _read_ipc_schema(source)
 
 
-@deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4")
-@deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4")
 def scan_ipc(
     source: str | Path | list[str] | list[Path],
     *,
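
With the two dispatch branches above, `read_csv` and `read_ipc` route `hf://` sources (and, when `POLARS_FORCE_ASYNC=1`, ordinary paths in CI) through `scan_csv`/`scan_ipc` and collect eagerly. A hedged usage sketch; the dataset path is hypothetical:

    import polars as pl

    # hf:// paths now go through the lazy scan machinery under the hood.
    df = pl.read_csv(
        "hf://datasets/some-org/some-dataset/data.csv",
        storage_options={"token": "..."},  # or set the HF_TOKEN environment variable
    )
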
diff --git a/py-polars/polars/io/ndjson.py b/py-polars/polars/io/ndjson.py
index 373d0de5248f..739daa7d0c36 100644
--- a/py-polars/polars/io/ndjson.py
+++ b/py-polars/polars/io/ndjson.py
@@ -3,9 +3,8 @@
 import contextlib
 from io import BytesIO, StringIO
 from pathlib import Path
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Sequence
 
-from polars._utils.deprecation import deprecate_renamed_parameter
 from polars._utils.various import normalize_filepath
 from polars._utils.wrap import wrap_df, wrap_ldf
 from polars.datatypes import N_INFER_DEFAULT
@@ -22,11 +21,22 @@
 
 
 def read_ndjson(
-    source: str | Path | IOBase | bytes,
+    source: str | Path | list[str] | list[Path] | IOBase | bytes,
     *,
     schema: SchemaDefinition | None = None,
     schema_overrides: SchemaDefinition | None = None,
+    infer_schema_length: int | None = N_INFER_DEFAULT,
+    batch_size: int | None = 1024,
+    n_rows: int | None = None,
+    low_memory: bool = False,
+    rechunk: bool = False,
+    row_index_name: str | None = None,
+    row_index_offset: int = 0,
     ignore_errors: bool = False,
+    storage_options: dict[str, Any] | None = None,
+    retries: int = 2,
+    file_cache_ttl: int | None = None,
+    include_file_paths: str | None = None,
 ) -> DataFrame:
     r"""
     Read into a DataFrame from a newline delimited JSON file.
@@ -52,8 +62,46 @@ def read_ndjson(
     schema_overrides : dict, default None
         Support type specification or override of one or more columns; note that
         any dtypes inferred from the schema param will be overridden.
+    infer_schema_length
+        The maximum number of rows to scan for schema inference.
+        If set to `None`, the full data may be scanned *(this is slow)*.
+    batch_size
+        Number of rows to read in each batch.
+    n_rows
+        Stop reading from JSON file after reading `n_rows`.
+    low_memory
+        Reduce memory pressure at the expense of performance.
+    rechunk
+        Reallocate to contiguous memory when all chunks/files are parsed.
+    row_index_name
+        If not None, this will insert a row index column with the given name into the
+        DataFrame.
+    row_index_offset
+        Offset to start the row index column (only used if the name is set).
     ignore_errors
         Return `Null` if parsing fails because of schema mismatches.
+    storage_options
+        Options that indicate how to connect to a cloud provider.
+
+        The cloud providers currently supported are AWS, GCP, and Azure.
+        See supported keys here:
+
+        * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
+        * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
+        * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
+        * Hugging Face (`hf://`): Accepts an API key under the `token` parameter: \
+          `{'token': '...'}`, or by setting the `HF_TOKEN` environment variable.
+
+        If `storage_options` is not provided, Polars will try to infer the information
+        from environment variables.
+    retries
+        Number of retries if accessing a cloud instance fails.
+    file_cache_ttl
+        Amount of time to keep downloaded cloud files since their last access time,
+        in seconds. Uses the `POLARS_FILE_CACHE_TTL` environment variable
+        (which defaults to 1 hour) if not given.
+    include_file_paths
+        Include the path of the source file(s) as a column with this name.
 
     Examples
     --------
@@ -71,26 +119,54 @@ def read_ndjson(
     │ 3   ┆ 8   │
     └─────┴─────┘
     """
-    if isinstance(source, StringIO):
-        source = BytesIO(source.getvalue().encode())
-    elif isinstance(source, (str, Path)):
-        source = normalize_filepath(source)
+    if not (
+        isinstance(source, (str, Path))
+        or isinstance(source, Sequence)
+        and source
+        and isinstance(source[0], (str, Path))
+    ):
+        # TODO: A lot of the parameters aren't applied for BytesIO
+        if isinstance(source, StringIO):
+            source = BytesIO(source.getvalue().encode())
 
-    pydf = PyDataFrame.read_ndjson(
-        source,
-        ignore_errors=ignore_errors,
+        pydf = PyDataFrame.read_ndjson(
+            source,
+            ignore_errors=ignore_errors,
+            schema=schema,
+            schema_overrides=schema_overrides,
+        )
+
+        df = wrap_df(pydf)
+
+        if n_rows:
+            df = df.head(n_rows)
+
+        return df
+
+    return scan_ndjson(
+        source,  # type: ignore[arg-type]
         schema=schema,
         schema_overrides=schema_overrides,
-    )
-    return wrap_df(pydf)
+        infer_schema_length=infer_schema_length,
+        batch_size=batch_size,
+        n_rows=n_rows,
+        low_memory=low_memory,
+        rechunk=rechunk,
+        row_index_name=row_index_name,
+        row_index_offset=row_index_offset,
+        ignore_errors=ignore_errors,
+        include_file_paths=include_file_paths,
+        retries=retries,
+        storage_options=storage_options,
+        file_cache_ttl=file_cache_ttl,
+    ).collect()
 
 
-@deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4")
-@deprecate_renamed_parameter("row_count_offset", "row_index_offset", version="0.20.4")
 def scan_ndjson(
     source: str | Path | list[str] | list[Path],
     *,
     schema: SchemaDefinition | None = None,
+    schema_overrides: SchemaDefinition | None = None,
     infer_schema_length: int | None = N_INFER_DEFAULT,
     batch_size: int | None = 1024,
     n_rows: int | None = None,
@@ -124,6 +200,9 @@ def scan_ndjson(
         If you supply a list of column names that does not match the names in the
         underlying data, the names given here will overwrite them. The number of
         names given in the schema should match the underlying data dimensions.
+    schema_overrides : dict, default None
+        Support type specification or override of one or more columns; note that
+        any dtypes inferred from the schema param will be overridden.
     infer_schema_length
         The maximum number of rows to scan for schema inference.
         If set to `None`, the full data may be scanned *(this is slow)*.
@@ -184,16 +263,17 @@ def scan_ndjson(
         storage_options = None
 
     pylf = PyLazyFrame.new_from_ndjson(
-        source,
-        sources,
-        infer_schema_length,
-        schema,
-        batch_size,
-        n_rows,
-        low_memory,
-        rechunk,
-        parse_row_index_args(row_index_name, row_index_offset),
-        ignore_errors,
+        path=source,
+        paths=sources,
+        infer_schema_length=infer_schema_length,
+        schema=schema,
+        schema_overrides=schema_overrides,
+        batch_size=batch_size,
+        n_rows=n_rows,
+        low_memory=low_memory,
+        rechunk=rechunk,
+        row_index=parse_row_index_args(row_index_name, row_index_offset),
+        ignore_errors=ignore_errors,
         include_file_paths=include_file_paths,
         retries=retries,
         cloud_options=storage_options,
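
After this change, `read_ndjson` accepts lists of paths and cloud sources and forwards the scan-style options to `scan_ndjson`, while the new `schema_overrides` parameter patches individual inferred dtypes. A small sketch (the `hf://` path is hypothetical):

    import polars as pl

    df = pl.read_ndjson(
        "hf://datasets/some-org/some-dataset/data.jsonl",
        schema_overrides={"id": pl.UInt64},
        n_rows=100,  # applied through the underlying lazy scan
    )
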
diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs
index a60839f2f3e7..3aedd69087a9 100644
--- a/py-polars/src/lazyframe/mod.rs
+++ b/py-polars/src/lazyframe/mod.rs
@@ -45,7 +45,7 @@ impl PyLazyFrame {
     #[cfg(feature = "json")]
     #[allow(clippy::too_many_arguments)]
     #[pyo3(signature = (
-        path, paths, infer_schema_length, schema, batch_size, n_rows, low_memory, rechunk,
+        path, paths, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
         row_index, ignore_errors, include_file_paths, cloud_options, retries, file_cache_ttl
     ))]
     fn new_from_ndjson(
@@ -53,6 +53,7 @@ impl PyLazyFrame {
         paths: Vec<PathBuf>,
         infer_schema_length: Option<usize>,
         schema: Option<Wrap<Schema>>,
+        schema_overrides: Option<Wrap<Schema>>,
         batch_size: Option<NonZeroUsize>,
         n_rows: Option<usize>,
         low_memory: bool,
@@ -109,6 +110,7 @@ impl PyLazyFrame {
             .low_memory(low_memory)
             .with_rechunk(rechunk)
             .with_schema(schema.map(|schema| Arc::new(schema.0)))
+            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
             .with_row_index(row_index)
             .with_ignore_errors(ignore_errors)
             .with_include_file_paths(include_file_paths.map(Arc::from))
diff --git a/py-polars/tests/unit/io/test_csv.py b/py-polars/tests/unit/io/test_csv.py
index 772a1ea74d7f..294ed6073824 100644
--- a/py-polars/tests/unit/io/test_csv.py
+++ b/py-polars/tests/unit/io/test_csv.py
@@ -527,7 +527,9 @@ def test_column_rename_and_dtype_overwrite() -> None:
     assert df.dtypes == [pl.String, pl.Int64, pl.Float32]
 
 
-def test_compressed_csv(io_files_path: Path) -> None:
+def test_compressed_csv(io_files_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setenv("POLARS_FORCE_ASYNC", "0")
+
     # gzip compression
     csv = textwrap.dedent(
         """\
@@ -868,18 +870,24 @@ def test_csv_globbing(io_files_path: Path) -> None:
     df = pl.read_csv(path)
     assert df.shape == (135, 4)
 
-    with pytest.raises(ValueError):
-        _ = pl.read_csv(path, columns=[0, 1])
+    with pytest.MonkeyPatch.context() as mp:
+        mp.setenv("POLARS_FORCE_ASYNC", "0")
+
+        with pytest.raises(ValueError):
+            _ = pl.read_csv(path, columns=[0, 1])
 
     df = pl.read_csv(path, columns=["category", "sugars_g"])
     assert df.shape == (135, 2)
     assert df.row(-1) == ("seafood", 1)
     assert df.row(0) == ("vegetables", 2)
 
-    with pytest.raises(ValueError):
-        _ = pl.read_csv(
-            path, schema_overrides=[pl.String, pl.Int64, pl.Int64, pl.Int64]
-        )
+    with pytest.MonkeyPatch.context() as mp:
+        mp.setenv("POLARS_FORCE_ASYNC", "0")
+
+        with pytest.raises(ValueError):
+            _ = pl.read_csv(
+                path, schema_overrides=[pl.String, pl.Int64, pl.Int64, pl.Int64]
+            )
 
     dtypes = {
         "category": pl.String,
@@ -2005,7 +2013,11 @@ def test_invalid_csv_raise() -> None:
 
 
 @pytest.mark.write_disk()
-def test_partial_read_compressed_file(tmp_path: Path) -> None:
+def test_partial_read_compressed_file(
+    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
+) -> None:
+    monkeypatch.setenv("POLARS_FORCE_ASYNC", "0")
+
     df = pl.DataFrame(
         {"idx": range(1_000), "dt": date(2025, 12, 31), "txt": "hello world"}
     )
@@ -2170,14 +2182,18 @@ def test_csv_float_decimal() -> None:
     pl.read_csv(floats, decimal_comma=True)
 
 
-def test_fsspec_not_available(monkeypatch: pytest.MonkeyPatch) -> None:
-    monkeypatch.setattr("polars.io._utils._FSSPEC_AVAILABLE", False)
-    with pytest.raises(
-        ImportError, match=r"`fsspec` is required for `storage_options` argument"
-    ):
-        pl.read_csv(
-            "s3://foods/cabbage.csv", storage_options={"key": "key", "secret": "secret"}
-        )
+def test_fsspec_not_available() -> None:
+    with pytest.MonkeyPatch.context() as mp:
+        mp.setenv("POLARS_FORCE_ASYNC", "0")
+        mp.setattr("polars.io._utils._FSSPEC_AVAILABLE", False)
+
+        with pytest.raises(
+            ImportError, match=r"`fsspec` is required for `storage_options` argument"
+        ):
+            pl.read_csv(
+                "s3://foods/cabbage.csv",
+                storage_options={"key": "key", "secret": "secret"},
+            )
 
 
 @pytest.mark.write_disk()
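
The test updates all follow one pattern: cases that rely on eager-only behaviour (compressed CSV, list-style `schema_overrides`, the fsspec error path) pin `POLARS_FORCE_ASYNC=0` so the new dispatch-to-scan branch stays out of the way. The same pattern as a standalone sketch, mirroring `test_csv_globbing` (the test name and glob path are illustrative):

    import pytest
    import polars as pl

    def test_eager_only_behaviour(monkeypatch: pytest.MonkeyPatch) -> None:
        # Keep read_csv on the eager code path for this assertion.
        monkeypatch.setenv("POLARS_FORCE_ASYNC", "0")
        with pytest.raises(ValueError):
            pl.read_csv("my_data/foods*.csv", columns=[0, 1])
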