Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Raise more informative error message for directories containing files with mixed extensions #17480

Merged
merged 2 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl LazyCsvReader {
where
F: Fn(Schema) -> PolarsResult<Schema>,
{
let paths = self.expand_paths(false)?.0;
let paths = self.expand_paths_default()?;
let Some(path) = paths.first() else {
polars_bail!(ComputeError: "no paths specified for this reader");
};
Expand Down Expand Up @@ -262,7 +262,7 @@ impl LazyFileListReader for LazyCsvReader {
/// Get the final [LazyFrame].
fn finish(self) -> PolarsResult<LazyFrame> {
// `expand_paths` respects globs
let paths = self.expand_paths(false)?.0;
let paths = self.expand_paths_default()?;

let mut lf: LazyFrame =
DslBuilder::scan_csv(paths, self.read_options, self.cache, self.cloud_options)?
Expand Down
58 changes: 53 additions & 5 deletions crates/polars-lazy/src/scan/file_list_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,27 @@ pub(super) fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
memchr::memchr3(b'*', b'?', b'[', path)
}

/// Checks if `expanded_paths` were expanded from a single directory.
///
/// Returns `true` only when all of the following hold:
/// * `paths` contains exactly one entry,
/// * that entry contains none of the glob characters searched for by
///   `get_glob_start_idx` (`*`, `?`, `[`),
/// * that entry is a directory rather than a file: either it is a local path for
///   which `is_dir` is true, or the first expanded path differs from it.
///
/// Unlike a `to_str().unwrap()` based check, this does not panic when the input
/// path is not valid UTF-8.
pub(super) fn expanded_from_single_directory<P: AsRef<std::path::Path>>(
    paths: &[P],
    expanded_paths: &[P],
) -> bool {
    // Exactly one input path is required.
    let [input] = paths else {
        return false;
    };
    let input: &std::path::Path = input.as_ref();

    // The glob characters are all ASCII. A lossy conversion only replaces invalid
    // sequences with U+FFFD, so it can neither hide nor introduce a glob character,
    // and it avoids panicking on non-UTF-8 paths.
    if get_glob_start_idx(input.to_string_lossy().as_bytes()).is_some() {
        return false;
    }

    // For local paths, we can just use `is_dir`.
    if !is_cloud_url(input) && input.is_dir() {
        return true;
    }

    // Otherwise we check the output path is different from the input path, so that we also
    // handle the case of a directory containing a single file.
    expanded_paths
        .first()
        .map_or(false, |first| input != first.as_ref())
}

/// Recursively traverses directories and expands globs if `glob` is `true`.
/// Returns the expanded paths and the index at which to start parsing hive
/// partitions from the path.
Expand Down Expand Up @@ -228,10 +249,31 @@ fn expand_paths(
}
}

Ok((
out_paths.into_iter().collect::<Arc<[_]>>(),
*expand_start_idx,
))
let out_paths = if expanded_from_single_directory(paths, out_paths.as_ref()) {
// Require all file extensions to be the same when expanding a single directory.
let ext = out_paths[0].extension();

(0..out_paths.len())
.map(|i| {
let path = out_paths[i].clone();

if path.extension() != ext {
polars_bail!(
InvalidOperation: r#"directory contained paths with different file extensions: \
first path: {}, second path: {}. Please use a glob pattern to explicitly specify
which files to read (e.g. "dir/**/*", "dir/**/*.parquet")"#,
out_paths[i - 1].to_str().unwrap(), path.to_str().unwrap()
);
};

Ok(path)
})
.collect::<PolarsResult<Arc<[_]>>>()?
} else {
Arc::<[_]>::from(out_paths)
};

Ok((out_paths, *expand_start_idx))
}

/// Reads [LazyFrame] from a filesystem or a cloud storage.
Expand All @@ -245,7 +287,7 @@ pub trait LazyFileListReader: Clone {
return self.finish_no_glob();
}

let paths = self.expand_paths(false)?.0;
let paths = self.expand_paths_default()?;

let lfs = paths
.iter()
Expand Down Expand Up @@ -348,4 +390,10 @@ pub trait LazyFileListReader: Clone {
check_directory_level,
)
}

/// Convenience wrapper that calls `expand_paths(false)` and returns only the
/// expanded paths, discarding the second element of the returned tuple.
///
/// NOTE(review): the `false` argument appears to disable the directory level
/// check; whether file extension checks are also skipped depends on the
/// implementation of `expand_paths` -- confirm before relying on it.
fn expand_paths_default(&self) -> PolarsResult<Arc<[PathBuf]>> {
    let (expanded, _) = self.expand_paths(false)?;
    Ok(expanded)
}
}
11 changes: 2 additions & 9 deletions crates/polars-lazy/src/scan/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::path::{Path, PathBuf};

use file_list_reader::get_glob_start_idx;
use file_list_reader::expanded_from_single_directory;
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::ipc::IpcScanOptions;
use polars_io::utils::is_cloud_url;
use polars_io::{HiveOptions, RowIndex};

use crate::prelude::*;
Expand Down Expand Up @@ -55,13 +54,7 @@ impl LazyFileListReader for LazyIpcReader {
self.expand_paths(self.args.hive_options.enabled.unwrap_or(false))?;
self.args.hive_options.enabled =
Some(self.args.hive_options.enabled.unwrap_or_else(|| {
self.paths.len() == 1
&& get_glob_start_idx(self.paths[0].to_str().unwrap().as_bytes()).is_none()
&& !paths.is_empty()
&& {
(!is_cloud_url(&paths[0]) && paths[0].is_dir())
|| (paths[0] != self.paths[0])
}
expanded_from_single_directory(self.paths.as_ref(), paths.as_ref())
}));
self.args.hive_options.hive_start_idx = hive_start_idx;
let args = self.args;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl LazyFileListReader for LazyJsonLineReader {
return self.finish_no_glob();
}

let paths = self.expand_paths(false)?.0;
let paths = self.expand_paths_default()?;

let file_options = FileScanOptions {
n_rows: self.n_rows,
Expand Down
11 changes: 2 additions & 9 deletions crates/polars-lazy/src/scan/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::path::{Path, PathBuf};

use file_list_reader::expanded_from_single_directory;
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::read::ParallelStrategy;
use polars_io::utils::is_cloud_url;
use polars_io::{HiveOptions, RowIndex};

use crate::prelude::*;
use crate::scan::file_list_reader::get_glob_start_idx;

#[derive(Clone)]
pub struct ScanArgsParquet {
Expand Down Expand Up @@ -63,13 +62,7 @@ impl LazyFileListReader for LazyParquetReader {
self.expand_paths(self.args.hive_options.enabled.unwrap_or(false))?;
self.args.hive_options.enabled =
Some(self.args.hive_options.enabled.unwrap_or_else(|| {
self.paths.len() == 1
&& get_glob_start_idx(self.paths[0].to_str().unwrap().as_bytes()).is_none()
&& !paths.is_empty()
&& {
(!is_cloud_url(&paths[0]) && paths[0].is_dir())
|| (paths[0] != self.paths[0])
}
expanded_from_single_directory(self.paths.as_ref(), paths.as_ref())
}));
self.args.hive_options.hive_start_idx = hive_start_idx;

Expand Down
27 changes: 27 additions & 0 deletions py-polars/tests/unit/io/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,3 +543,30 @@ def test_path_expansion_excludes_empty_files_17362(tmp_path: Path) -> None:

assert_frame_equal(pl.scan_parquet(tmp_path).collect(), df)
assert_frame_equal(pl.scan_parquet(tmp_path / "*").collect(), df)


@pytest.mark.write_disk()
def test_scan_single_dir_differing_file_extensions_raises_17436(tmp_path: Path) -> None:
    """
    Scanning a directory that holds files with mixed extensions must raise.

    Explicit glob patterns are expected to bypass the extension check.
    """
    tmp_path.mkdir(exist_ok=True)

    parquet_file = tmp_path / "data.parquet"
    ipc_file = tmp_path / "data.ipc"

    df = pl.DataFrame({"x": 1})
    df.write_parquet(parquet_file)
    df.write_ipc(ipc_file)

    # Passing the bare directory triggers the file extension check.
    with pytest.raises(
        pl.exceptions.InvalidOperationError, match="different file extensions"
    ):
        pl.scan_parquet(tmp_path).collect()

    # Globs restricted to a single extension read only the matching file.
    lazy_frames = [
        pl.scan_parquet(tmp_path / "*.parquet"),
        pl.scan_ipc(tmp_path / "*.ipc"),
    ]
    for lf in lazy_frames:
        assert_frame_equal(lf.collect(), df)

    # Ensure passing a glob doesn't trigger file extension checking: the error
    # raised here comes from the parquet reader rather than the extension check.
    with pytest.raises(
        pl.exceptions.ComputeError,
        match="parquet: File out of specification: The file must end with PAR1",
    ):
        pl.scan_parquet(tmp_path / "*").collect()