fix: Defer path expansion until collect in file scan methods (#17532)
stinodego committed Jul 11, 2024
1 parent aa2c8f1 commit 64b45a8
Showing 15 changed files with 167 additions and 84 deletions.
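The user-visible effect: lazy scan constructors no longer resolve globs or directories up front; expansion now happens when the query plan is converted to IR at collect time. A minimal behavioral sketch, assuming the public polars lazy CSV API (the file pattern is hypothetical):

    use polars::prelude::*;

    fn main() -> PolarsResult<()> {
        // After this commit, `finish()` stores the raw pattern in the DSL
        // instead of expanding it eagerly (see the csv.rs hunk below).
        let lf = LazyCsvReader::new("data/*.csv").finish()?;

        // The glob is resolved during DSL -> IR conversion, at collect time.
        let df = lf.collect()?;
        println!("{df}");
        Ok(())
    }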
13 changes: 11 additions & 2 deletions crates/polars-io/src/utils/path.rs
@@ -81,12 +81,21 @@ pub fn expanded_from_single_directory<P: AsRef<std::path::Path>>(
}

/// Recursively traverses directories and expands globs if `glob` is `true`.
/// Returns the expanded paths and the index at which to start parsing hive
/// partitions from the path.
pub fn expand_paths(
paths: &[PathBuf],
glob: bool,
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
) -> PolarsResult<Arc<[PathBuf]>> {
expand_paths_hive(paths, glob, cloud_options, false).map(|x| x.0)
}

/// Recursively traverses directories and expands globs if `glob` is `true`.
/// Returns the expanded paths and the index at which to start parsing hive
/// partitions from the path.
pub fn expand_paths_hive(
paths: &[PathBuf],
glob: bool,
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
check_directory_level: bool,
) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
let Some(first_path) = paths.first() else {
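A short sketch of the two entry points after this hunk, assuming the polars_io::utils exports shown in this diff; the glob pattern is hypothetical:

    use std::path::PathBuf;
    use std::sync::Arc;

    use polars_core::prelude::PolarsResult;
    use polars_io::utils::{expand_paths, expand_paths_hive};

    fn demo() -> PolarsResult<()> {
        let paths = vec![PathBuf::from("data/*.parquet")]; // hypothetical input

        // Plain expansion: resolves globs and directories, discards the hive index.
        let expanded: Arc<[PathBuf]> = expand_paths(&paths, true, None)?;

        // Hive-aware variant: also returns the index at which to start parsing
        // hive partitions from the path.
        let (expanded_hive, hive_start_idx) = expand_paths_hive(&paths, true, None, false)?;

        // With `check_directory_level == false`, both calls agree.
        assert_eq!(expanded.as_ref(), expanded_hive.as_ref());
        println!("{} paths, hive_start_idx = {}", expanded.len(), hive_start_idx);
        Ok(())
    }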
22 changes: 13 additions & 9 deletions crates/polars-lazy/src/scan/csv.rs
@@ -5,7 +5,7 @@ use polars_io::cloud::CloudOptions;
use polars_io::csv::read::{
infer_file_schema, CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues,
};
- use polars_io::utils::get_reader_bytes;
+ use polars_io::utils::{expand_paths, get_reader_bytes};
use polars_io::RowIndex;

use crate::prelude::*;
@@ -216,7 +216,9 @@ impl LazyCsvReader {
where
F: Fn(Schema) -> PolarsResult<Schema>,
{
- let paths = self.expand_paths_default()?;
+ // TODO: This should be done when converting to the IR
+ let paths = expand_paths(self.paths(), self.glob(), self.cloud_options())?;
+
let Some(path) = paths.first() else {
polars_bail!(ComputeError: "no paths specified for this reader");
};
@@ -261,13 +263,15 @@ impl LazyCsvReader {
impl LazyFileListReader for LazyCsvReader {
/// Get the final [LazyFrame].
fn finish(self) -> PolarsResult<LazyFrame> {
- // `expand_paths` respects globs
- let paths = self.expand_paths_default()?;
-
- let mut lf: LazyFrame =
- DslBuilder::scan_csv(paths, self.read_options, self.cache, self.cloud_options)?
- .build()
- .into();
+ let mut lf: LazyFrame = DslBuilder::scan_csv(
+ self.paths,
+ self.read_options,
+ self.cache,
+ self.cloud_options,
+ self.glob,
+ )?
+ .build()
+ .into();
lf.opt_state.file_caching = true;
Ok(lf)
}
23 changes: 2 additions & 21 deletions crates/polars-lazy/src/scan/file_list_reader.rs
@@ -2,7 +2,6 @@ use std::path::PathBuf;

use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
- use polars_io::utils::expand_paths;
use polars_io::RowIndex;
use polars_plan::prelude::UnionArgs;

@@ -19,9 +18,8 @@ pub trait LazyFileListReader: Clone {
return self.finish_no_glob();
}

- let paths = self.expand_paths_default()?;
-
- let lfs = paths
+ let lfs = self
+ .paths()
.iter()
.map(|path| {
self.clone()
@@ -111,21 +109,4 @@
fn cloud_options(&self) -> Option<&CloudOptions> {
None
}

- /// Returns a list of paths after resolving globs and directories, as well as
- /// the string index at which to start parsing hive partitions.
- fn expand_paths(&self, check_directory_level: bool) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
- expand_paths(
- self.paths(),
- self.cloud_options(),
- self.glob(),
- check_directory_level,
- )
- }
-
- /// Expand paths without performing any directory level or file extension
- /// checks.
- fn expand_paths_default(&self) -> PolarsResult<Arc<[PathBuf]>> {
- self.expand_paths(false).map(|x| x.0)
- }
}
11 changes: 2 additions & 9 deletions crates/polars-lazy/src/scan/ipc.rs
@@ -3,7 +3,6 @@ use std::path::{Path, PathBuf};
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::ipc::IpcScanOptions;
- use polars_io::utils::expanded_from_single_directory;
use polars_io::{HiveOptions, RowIndex};

use crate::prelude::*;
@@ -49,14 +48,8 @@ impl LazyIpcReader {
}

impl LazyFileListReader for LazyIpcReader {
- fn finish(mut self) -> PolarsResult<LazyFrame> {
- let (paths, hive_start_idx) =
- self.expand_paths(self.args.hive_options.enabled.unwrap_or(false))?;
- self.args.hive_options.enabled =
- Some(self.args.hive_options.enabled.unwrap_or_else(|| {
- expanded_from_single_directory(self.paths.as_ref(), paths.as_ref())
- }));
- self.args.hive_options.hive_start_idx = hive_start_idx;
+ fn finish(self) -> PolarsResult<LazyFrame> {
+ let paths = self.paths;
let args = self.args;

let options = IpcScanOptions {
4 changes: 3 additions & 1 deletion crates/polars-lazy/src/scan/ndjson.rs
@@ -96,7 +96,7 @@ impl LazyFileListReader for LazyJsonLineReader {
return self.finish_no_glob();
}

- let paths = self.expand_paths_default()?;
+ let paths = self.paths;

let file_options = FileScanOptions {
n_rows: self.n_rows,
@@ -106,6 +106,7 @@
rechunk: self.rechunk,
file_counter: 0,
hive_options: Default::default(),
+ glob: true,
};

let options = NDJsonReadOptions {
@@ -138,6 +139,7 @@
rechunk: self.rechunk,
file_counter: 0,
hive_options: Default::default(),
+ glob: false,
};

let options = NDJsonReadOptions {
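Both NDJSON paths now record the globbing decision instead of acting on it: `finish()` stores `glob: true` and `finish_no_glob()` stores `glob: false` in FileScanOptions, and the deferred expansion in dsl_to_ir.rs below honors the flag. A sketch, assuming the LazyJsonLineReader constructor from polars-lazy and a hypothetical path:

    use polars::prelude::*;

    fn demo() -> PolarsResult<()> {
        let lf = LazyJsonLineReader::new("logs/*.ndjson").finish()?;
        let df = lf.collect()?; // pattern resolved here, since `glob` was recorded as true
        println!("{df}");
        Ok(())
    }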
16 changes: 4 additions & 12 deletions crates/polars-lazy/src/scan/parquet.rs
@@ -3,7 +3,6 @@ use std::path::{Path, PathBuf};
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::read::ParallelStrategy;
- use polars_io::utils::expanded_from_single_directory;
use polars_io::{HiveOptions, RowIndex};

use crate::prelude::*;
@@ -57,19 +56,11 @@ impl LazyParquetReader {

impl LazyFileListReader for LazyParquetReader {
/// Get the final [LazyFrame].
- fn finish(mut self) -> PolarsResult<LazyFrame> {
- let (paths, hive_start_idx) =
- self.expand_paths(self.args.hive_options.enabled.unwrap_or(false))?;
- self.args.hive_options.enabled =
- Some(self.args.hive_options.enabled.unwrap_or_else(|| {
- expanded_from_single_directory(self.paths.as_ref(), paths.as_ref())
- }));
- self.args.hive_options.hive_start_idx = hive_start_idx;
-
+ fn finish(self) -> PolarsResult<LazyFrame> {
let row_index = self.args.row_index;

let mut lf: LazyFrame = DslBuilder::scan_parquet(
- paths,
+ self.paths,
self.args.n_rows,
self.args.cache,
self.args.parallel,
@@ -79,11 +70,12 @@
self.args.cloud_options,
self.args.use_statistics,
self.args.hive_options,
+ self.args.glob,
)?
.build()
.into();

- // it is a bit hacky, but this row_index function updates the schema
+ // It's a bit hacky, but this row_index function updates the schema.
if let Some(row_index) = row_index {
lf = lf.with_row_index(&row_index.name, Some(row_index.offset))
}
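Hive-partition inference moves along with path expansion: `finish()` no longer fills in `hive_options`; that now happens in `expand_paths_with_hive_update` during IR conversion (see dsl_to_ir.rs below). A sketch, assuming the public `scan_parquet` API and a hypothetical dataset directory:

    use polars::prelude::*;

    fn demo() -> PolarsResult<()> {
        // Default args; whether hive partitioning applies is assumed to be
        // inferred at collect time when `hive_options.enabled` is unset.
        let args = ScanArgsParquet::default();
        let lf = LazyFrame::scan_parquet("dataset/", args)?;
        let df = lf.collect()?;
        println!("{df}");
        Ok(())
    }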
6 changes: 6 additions & 0 deletions crates/polars-plan/src/plans/builder_dsl.rs
@@ -51,6 +51,7 @@ impl DslBuilder {
enabled: Some(false),
..Default::default()
},
+ glob: false,
};

Ok(DslPlan::Scan {
@@ -83,6 +84,7 @@
cloud_options: Option<CloudOptions>,
use_statistics: bool,
hive_options: HiveOptions,
+ glob: bool,
) -> PolarsResult<Self> {
let paths = paths.into();

@@ -94,6 +96,7 @@
row_index,
file_counter: Default::default(),
hive_options,
+ glob,
};
Ok(DslPlan::Scan {
paths,
@@ -140,6 +143,7 @@
row_index,
file_counter: Default::default(),
hive_options,
+ glob: true,
},
predicate: None,
scan_type: FileScan::Ipc {
@@ -158,6 +162,7 @@
read_options: CsvReadOptions,
cache: bool,
cloud_options: Option<CloudOptions>,
+ glob: bool,
) -> PolarsResult<Self> {
let paths = paths.into();

@@ -176,6 +181,7 @@
enabled: Some(false),
..Default::default()
},
+ glob,
};
Ok(DslPlan::Scan {
paths,
49 changes: 49 additions & 0 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -1,7 +1,16 @@
+ #[cfg(any(feature = "ipc", feature = "parquet"))]
+ use std::path::PathBuf;
+
use arrow::datatypes::ArrowSchemaRef;
use either::Either;
use expr_expansion::{is_regex_projection, rewrite_projections};
use hive::{hive_partitions_from_paths, HivePartitions};
+ #[cfg(any(feature = "ipc", feature = "parquet"))]
+ use polars_io::cloud::CloudOptions;
+ #[cfg(any(feature = "csv", feature = "json"))]
+ use polars_io::utils::expand_paths;
+ #[cfg(any(feature = "ipc", feature = "parquet"))]
+ use polars_io::utils::{expand_paths_hive, expanded_from_single_directory};

use super::stack_opt::ConversionOptimizer;
use super::*;
@@ -90,6 +99,24 @@ pub fn to_alp_impl(
mut file_options,
mut scan_type,
} => {
+ let paths = match &scan_type {
+ #[cfg(feature = "parquet")]
+ FileScan::Parquet {
+ ref cloud_options, ..
+ } => expand_paths_with_hive_update(paths, &mut file_options, cloud_options)?,
+ #[cfg(feature = "ipc")]
+ FileScan::Ipc {
+ ref cloud_options, ..
+ } => expand_paths_with_hive_update(paths, &mut file_options, cloud_options)?,
+ #[cfg(feature = "csv")]
+ FileScan::Csv {
+ ref cloud_options, ..
+ } => expand_paths(&paths, file_options.glob, cloud_options.as_ref())?,
+ #[cfg(feature = "json")]
+ FileScan::NDJson { .. } => expand_paths(&paths, file_options.glob, None)?,
+ FileScan::Anonymous { .. } => paths,
+ };
+
let mut file_info = if let Some(file_info) = file_info {
file_info
} else {
@@ -662,6 +689,28 @@ pub fn to_alp_impl(
Ok(lp_arena.add(v))
}

+ #[cfg(any(feature = "ipc", feature = "parquet"))]
+ fn expand_paths_with_hive_update(
+ paths: Arc<[PathBuf]>,
+ file_options: &mut FileScanOptions,
+ cloud_options: &Option<CloudOptions>,
+ ) -> PolarsResult<Arc<[PathBuf]>> {
+ let hive_enabled = file_options.hive_options.enabled;
+ let (expanded_paths, hive_start_idx) = expand_paths_hive(
+ &paths,
+ file_options.glob,
+ cloud_options.as_ref(),
+ hive_enabled.unwrap_or(false),
+ )?;
+ let inferred_hive_enabled = hive_enabled
+ .unwrap_or_else(|| expanded_from_single_directory(paths.as_ref(), expanded_paths.as_ref()));
+
+ file_options.hive_options.enabled = Some(inferred_hive_enabled);
+ file_options.hive_options.hive_start_idx = hive_start_idx;
+
+ Ok(expanded_paths)
+ }
+
fn expand_filter(predicate: Expr, input: Node, lp_arena: &Arena<IR>) -> PolarsResult<Expr> {
let schema = lp_arena.get(input).schema(lp_arena);
let predicate = if has_expr(&predicate, |e| match e {
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/options.rs
@@ -34,6 +34,7 @@ pub struct FileScanOptions {
pub rechunk: bool,
pub file_counter: FileCount,
pub hive_options: HiveOptions,
+ pub glob: bool,
}

#[derive(Clone, Debug, Copy, Default, Eq, PartialEq, Hash)]
46 changes: 23 additions & 23 deletions py-polars/polars/io/csv/functions.py
@@ -1233,35 +1233,35 @@ def _scan_csv_impl(
storage_options = None

pylf = PyLazyFrame.new_from_csv(
- source,
- sources,
- separator,
- has_header,
- ignore_errors,
- skip_rows,
- n_rows,
- cache,
- dtype_list,
- low_memory,
- comment_prefix,
- quote_char,
- processed_null_values,
- missing_utf8_is_empty_string,
- infer_schema_length,
- with_column_names,
- rechunk,
- skip_rows_after_header,
- encoding,
- parse_row_index_args(row_index_name, row_index_offset),
- try_parse_dates,
+ path=source,
+ paths=sources,
+ separator=separator,
+ has_header=has_header,
+ ignore_errors=ignore_errors,
+ skip_rows=skip_rows,
+ n_rows=n_rows,
+ cache=cache,
+ overwrite_dtype=dtype_list,
+ low_memory=low_memory,
+ comment_prefix=comment_prefix,
+ quote_char=quote_char,
+ null_values=processed_null_values,
+ missing_utf8_is_empty_string=missing_utf8_is_empty_string,
+ infer_schema_length=infer_schema_length,
+ with_schema_modify=with_column_names,
+ rechunk=rechunk,
+ skip_rows_after_header=skip_rows_after_header,
+ encoding=encoding,
+ row_index=parse_row_index_args(row_index_name, row_index_offset),
+ try_parse_dates=try_parse_dates,
eol_char=eol_char,
raise_if_empty=raise_if_empty,
truncate_ragged_lines=truncate_ragged_lines,
decimal_comma=decimal_comma,
- schema=schema,
glob=glob,
- retries=retries,
+ schema=schema,
cloud_options=storage_options,
+ retries=retries,
file_cache_ttl=file_cache_ttl,
)
return wrap_ldf(pylf)
