From 47cefd8de2baf45e4dd186ba0f42673208bdad41 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 1 Aug 2024 15:51:02 +1000 Subject: [PATCH 1/4] c --- .../polars-io/src/parquet/read/async_impl.rs | 19 +- .../polars-io/src/parquet/read/read_impl.rs | 194 +++++++++------- crates/polars-io/src/parquet/read/reader.rs | 32 +-- crates/polars-io/src/utils/mod.rs | 1 + crates/polars-io/src/utils/other.rs | 24 -- crates/polars-io/src/utils/slice.rs | 33 +++ .../physical_plan/streaming/convert_alp.rs | 6 +- crates/polars-lazy/src/scan/ndjson.rs | 2 +- crates/polars-lazy/src/tests/io.rs | 2 +- .../src/tests/optimization_checks.rs | 2 +- .../src/executors/scan/csv.rs | 5 +- .../src/executors/scan/ipc.rs | 10 +- .../src/executors/scan/mod.rs | 5 +- .../src/executors/scan/ndjson.rs | 5 +- .../src/executors/scan/parquet.rs | 207 ++++++++++++------ crates/polars-mem-engine/src/planner/lp.rs | 7 +- .../polars-pipe/src/executors/sources/csv.rs | 13 +- .../src/executors/sources/parquet.rs | 105 +++++---- crates/polars-plan/src/plans/builder_dsl.rs | 8 +- crates/polars-plan/src/plans/ir/format.rs | 10 +- .../plans/optimizer/predicate_pushdown/mod.rs | 2 +- .../src/plans/optimizer/slice_pushdown_lp.rs | 29 ++- crates/polars-plan/src/plans/options.rs | 2 +- py-polars/src/dataframe/io.rs | 4 +- py-polars/src/lazyframe/visitor/nodes.rs | 2 +- py-polars/tests/unit/io/test_lazy_parquet.py | 37 ++++ 26 files changed, 503 insertions(+), 263 deletions(-) create mode 100644 crates/polars-io/src/utils/slice.rs diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs index fd96b68697ec..97e4829581bc 100644 --- a/crates/polars-io/src/parquet/read/async_impl.rs +++ b/crates/polars-io/src/parquet/read/async_impl.rs @@ -14,7 +14,6 @@ use tokio::sync::Mutex; use super::mmap::ColumnStore; use super::predicates::read_this_row_group; -use super::read_impl::compute_row_group_range; use crate::cloud::{ build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore, }; @@ -262,8 +261,8 @@ impl FetchRowGroupsFromObjectStore { schema: ArrowSchemaRef, projection: Option<&[usize]>, predicate: Option>, + row_group_range: Range, row_groups: &[RowGroupMetaData], - limit: usize, ) -> PolarsResult { let projected_fields: Option> = projection.map(|projection| { projection @@ -272,26 +271,22 @@ impl FetchRowGroupsFromObjectStore { .collect() }); - let row_groups_end = compute_row_group_range(0, row_groups.len(), limit, row_groups); - let row_groups = &row_groups[0..row_groups_end]; - let mut prefetched: PlHashMap = PlHashMap::new(); let mut row_groups = if let Some(pred) = predicate.as_deref() { - row_groups - .iter() - .enumerate() - .filter(|(i, rg)| { + row_group_range + .filter_map(|i| { + let rg = &row_groups[i]; let should_be_read = matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true)); // Already add the row groups that will be skipped to the prefetched data. 
if !should_be_read { - prefetched.insert(*i, Default::default()); + prefetched.insert(i, Default::default()); } - should_be_read + + should_be_read.then(|| (i, rg.clone())) }) - .map(|(i, rg)| (i, rg.clone())) .collect::>() } else { row_groups.iter().cloned().enumerate().collect() diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 9a0fed9e246c..e160559586b3 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -23,6 +23,7 @@ use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::parquet::metadata::FileMetaDataRef; use crate::predicates::{apply_predicate, PhysicalIoExpr}; use crate::utils::get_reader_bytes; +use crate::utils::slice::split_slice_at_file; use crate::RowIndex; #[cfg(debug_assertions)] @@ -156,7 +157,7 @@ fn rg_to_dfs( previous_row_count: &mut IdxSize, row_group_start: usize, row_group_end: usize, - remaining_rows: &mut usize, + slice: (usize, usize), file_metadata: &FileMetaData, schema: &ArrowSchemaRef, predicate: Option<&dyn PhysicalIoExpr>, @@ -172,7 +173,7 @@ fn rg_to_dfs( previous_row_count, row_group_start, row_group_end, - remaining_rows, + slice, file_metadata, schema, predicate, @@ -188,7 +189,7 @@ fn rg_to_dfs( row_group_start, row_group_end, previous_row_count, - remaining_rows, + slice, file_metadata, schema, predicate, @@ -207,7 +208,7 @@ fn rg_to_dfs_optionally_par_over_columns( previous_row_count: &mut IdxSize, row_group_start: usize, row_group_end: usize, - remaining_rows: &mut usize, + slice: (usize, usize), file_metadata: &FileMetaData, schema: &ArrowSchemaRef, predicate: Option<&dyn PhysicalIoExpr>, @@ -219,14 +220,21 @@ fn rg_to_dfs_optionally_par_over_columns( ) -> PolarsResult> { let mut dfs = Vec::with_capacity(row_group_end - row_group_start); + let mut n_rows_processed: usize = (0..row_group_start) + .map(|i| file_metadata.row_groups[i].num_rows()) + .sum(); + let slice_end = slice.0 + slice.1; + for rg_idx in row_group_start..row_group_end { let md = &file_metadata.row_groups[rg_idx]; + let rg_slice = + split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end); let current_row_count = md.num_rows() as IdxSize; if use_statistics && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)? { - *previous_row_count += current_row_count; + *previous_row_count += rg_slice.1 as IdxSize; continue; } // test we don't read the parquet file if this env var is set @@ -235,13 +243,20 @@ fn rg_to_dfs_optionally_par_over_columns( assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err()) } - let projection_height = (*remaining_rows).min(md.num_rows()); + let idx_to_series_projection_height = rg_slice.0 + rg_slice.1; let columns = if let ParallelStrategy::Columns = parallel { POOL.install(|| { projection .par_iter() .map(|column_i| { - column_idx_to_series(*column_i, md, projection_height, schema, store) + column_idx_to_series( + *column_i, + md, + idx_to_series_projection_height, + schema, + store, + ) + .map(|s| s.slice(rg_slice.0 as i64, rg_slice.1)) }) .collect::>>() })? @@ -249,33 +264,34 @@ fn rg_to_dfs_optionally_par_over_columns( projection .iter() .map(|column_i| { - column_idx_to_series(*column_i, md, projection_height, schema, store) + column_idx_to_series( + *column_i, + md, + idx_to_series_projection_height, + schema, + store, + ) + .map(|s| s.slice(rg_slice.0 as i64, rg_slice.1)) }) .collect::>>()? 
}; - *remaining_rows -= projection_height; - let mut df = unsafe { DataFrame::new_no_checks(columns) }; if let Some(rc) = &row_index { df.with_row_index_mut(&rc.name, Some(*previous_row_count + rc.offset)); } - materialize_hive_partitions( - &mut df, - schema.as_ref(), - hive_partition_columns, - projection_height, - ); + materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1); apply_predicate(&mut df, predicate, true)?; *previous_row_count += current_row_count; dfs.push(df); - if *remaining_rows == 0 { + if *previous_row_count as usize >= slice_end { break; } } + Ok(dfs) } @@ -286,7 +302,7 @@ fn rg_to_dfs_par_over_rg( row_group_start: usize, row_group_end: usize, previous_row_count: &mut IdxSize, - remaining_rows: &mut usize, + slice: (usize, usize), file_metadata: &FileMetaData, schema: &ArrowSchemaRef, predicate: Option<&dyn PhysicalIoExpr>, @@ -296,28 +312,32 @@ fn rg_to_dfs_par_over_rg( hive_partition_columns: Option<&[Series]>, ) -> PolarsResult> { // compute the limits per row group and the row count offsets - let row_groups = file_metadata - .row_groups - .iter() - .enumerate() - .skip(row_group_start) - .take(row_group_end - row_group_start) - .map(|(rg_idx, rg_md)| { - let row_count_start = *previous_row_count; - let num_rows = rg_md.num_rows(); - *previous_row_count += num_rows as IdxSize; - let projection_height = (*remaining_rows).min(num_rows); - *remaining_rows -= projection_height; - - (rg_idx, rg_md, projection_height, row_count_start) - }) - .collect::>(); + let mut row_groups = Vec::with_capacity(row_group_end - row_group_start); + + let mut n_rows_processed: usize = (0..row_group_start) + .map(|i| file_metadata.row_groups[i].num_rows()) + .sum(); + let slice_end = slice.0 + slice.1; + + for i in row_group_start..row_group_end { + let row_count_start = *previous_row_count; + let rg_md = &file_metadata.row_groups[i]; + let rg_slice = + split_slice_at_file(&mut n_rows_processed, rg_md.num_rows(), slice.0, slice_end); + *previous_row_count += rg_slice.1 as IdxSize; + + if rg_slice.1 == 0 { + continue; + } + + row_groups.push((i, rg_md, rg_slice, row_count_start)); + } let dfs = POOL.install(|| { row_groups .into_par_iter() - .map(|(rg_idx, md, projection_height, row_count_start)| { - if projection_height == 0 + .map(|(rg_idx, md, slice, row_count_start)| { + if slice.1 == 0 || use_statistics && !read_this_row_group( predicate, @@ -336,7 +356,8 @@ fn rg_to_dfs_par_over_rg( let columns = projection .iter() .map(|column_i| { - column_idx_to_series(*column_i, md, projection_height, schema, store) + column_idx_to_series(*column_i, md, slice.0 + slice.1, schema, store) + .map(|x| x.slice(slice.0 as i64, slice.1)) }) .collect::>>()?; @@ -350,7 +371,7 @@ fn rg_to_dfs_par_over_rg( &mut df, schema.as_ref(), hive_partition_columns, - projection_height, + slice.1, ); apply_predicate(&mut df, predicate, false)?; @@ -364,7 +385,7 @@ fn rg_to_dfs_par_over_rg( #[allow(clippy::too_many_arguments)] pub fn read_parquet( mut reader: R, - mut limit: usize, + slice: (usize, usize), projection: Option<&[usize]>, reader_schema: &ArrowSchemaRef, metadata: Option, @@ -375,7 +396,7 @@ pub fn read_parquet( hive_partition_columns: Option<&[Series]>, ) -> PolarsResult { // Fast path. 
- if limit == 0 { + if slice.1 == 0 { return Ok(materialize_empty_df( projection, reader_schema, @@ -430,7 +451,7 @@ pub fn read_parquet( &mut 0, 0, n_row_groups, - &mut limit, + slice, &file_metadata, reader_schema, predicate, @@ -511,28 +532,51 @@ impl RowGroupFetcher { pub(super) fn compute_row_group_range( row_group_start: usize, row_group_end: usize, - limit: usize, + slice: (usize, usize), row_groups: &[RowGroupMetaData], -) -> usize { - let mut row_group_end_truncated = row_group_start; - let mut acc_row_count = 0; +) -> std::ops::Range { + let mut start = row_group_start; + let mut cum_rows: usize = (0..row_group_start).map(|i| row_groups[i].num_rows()).sum(); + let row_group_end = row_groups.len().min(row_group_end); - #[allow(clippy::needless_range_loop)] - for rg_i in row_group_start..(std::cmp::min(row_group_end, row_groups.len())) { - if acc_row_count >= limit { + loop { + if start == row_group_end { break; } - row_group_end_truncated = rg_i + 1; - acc_row_count += row_groups[rg_i].num_rows(); + + cum_rows += row_groups[start].num_rows(); + + if cum_rows >= slice.0 { + break; + } + + start += 1; + } + + let slice_end = slice.0 + slice.1; + let mut end = (1 + start).min(row_group_end); + + loop { + if end == row_group_end { + break; + } + + if cum_rows >= slice_end { + break; + } + + cum_rows += row_groups[end].num_rows(); + end += 1; } - row_group_end_truncated + + start..end } pub struct BatchedParquetReader { // use to keep ownership #[allow(dead_code)] row_group_fetcher: RowGroupFetcher, - limit: usize, + slice: (usize, usize), projection: Arc<[usize]>, schema: ArrowSchemaRef, metadata: FileMetaDataRef, @@ -557,7 +601,7 @@ impl BatchedParquetReader { row_group_fetcher: RowGroupFetcher, metadata: FileMetaDataRef, schema: ArrowSchemaRef, - limit: usize, + slice: (usize, usize), projection: Option>, predicate: Option>, row_index: Option, @@ -589,7 +633,7 @@ impl BatchedParquetReader { Ok(BatchedParquetReader { row_group_fetcher, - limit, + slice, projection, schema, metadata, @@ -609,10 +653,6 @@ impl BatchedParquetReader { }) } - pub fn limit_reached(&self) -> bool { - self.limit == 0 - } - pub fn schema(&self) -> &ArrowSchemaRef { &self.schema } @@ -626,7 +666,7 @@ impl BatchedParquetReader { } pub async fn next_batches(&mut self, n: usize) -> PolarsResult>> { - if self.limit == 0 && self.has_returned { + if self.rows_read as usize == self.slice.0 + self.slice.1 && self.has_returned { return if self.chunks_fifo.is_empty() { Ok(None) } else { @@ -638,28 +678,30 @@ impl BatchedParquetReader { let mut skipped_all_rgs = false; // fill up fifo stack - if self.row_group_offset < self.n_row_groups && self.chunks_fifo.len() < n { + if (self.rows_read as usize) < self.slice.0 + self.slice.1 + && self.row_group_offset < self.n_row_groups + && self.chunks_fifo.len() < n + { // Ensure we apply the limit on the metadata, before we download the row-groups. 
- let row_group_start = self.row_group_offset; - let row_group_end = compute_row_group_range( - row_group_start, - row_group_start + n, - self.limit, + let row_group_range = compute_row_group_range( + self.row_group_offset, + self.row_group_offset + n, + self.slice, &self.metadata.row_groups, ); let store = self .row_group_fetcher - .fetch_row_groups(row_group_start..row_group_end) + .fetch_row_groups(row_group_range.clone()) .await?; let mut dfs = match store { ColumnStore::Local(_) => rg_to_dfs( &store, &mut self.rows_read, - row_group_start, - row_group_end, - &mut self.limit, + row_group_range.start, + row_group_range.end, + self.slice, &self.metadata, &self.schema, self.predicate.as_deref(), @@ -680,7 +722,6 @@ impl BatchedParquetReader { // Make everything 'static. let mut rows_read = self.rows_read; - let mut limit = self.limit; let row_index = self.row_index.clone(); let predicate = self.predicate.clone(); let schema = self.schema.clone(); @@ -689,14 +730,15 @@ impl BatchedParquetReader { let projection = self.projection.clone(); let use_statistics = self.use_statistics; let hive_partition_columns = self.hive_partition_columns.clone(); + let slice = self.slice; let f = move || { let dfs = rg_to_dfs( &store, &mut rows_read, - row_group_start, - row_group_end, - &mut limit, + row_group_range.start, + row_group_range.end, + slice, &metadata, &schema, predicate.as_deref(), @@ -708,7 +750,7 @@ impl BatchedParquetReader { ); // Don't unwrap send attempt - async task could be cancelled. - let _ = tx.send((dfs, rows_read, limit)); + let _ = tx.send((dfs, rows_read)); }; // Spawn the task and wait on it asynchronously. @@ -721,10 +763,8 @@ impl BatchedParquetReader { POOL.spawn(f); }; - let (dfs, rows_read, limit) = rx.await.unwrap(); - + let (dfs, rows_read) = rx.await.unwrap(); self.rows_read = rows_read; - self.limit = limit; dfs }, }?; diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs index 8c810c868e0f..965012997014 100644 --- a/crates/polars-io/src/parquet/read/reader.rs +++ b/crates/polars-io/src/parquet/read/reader.rs @@ -12,7 +12,7 @@ use super::async_impl::FetchRowGroupsFromObjectStore; #[cfg(feature = "cloud")] use super::async_impl::ParquetObjectStore; pub use super::read_impl::BatchedParquetReader; -use super::read_impl::{read_parquet, FetchRowGroupsFromMmapReader}; +use super::read_impl::{compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader}; #[cfg(feature = "cloud")] use super::utils::materialize_empty_df; #[cfg(feature = "cloud")] @@ -28,7 +28,7 @@ use crate::RowIndex; pub struct ParquetReader { reader: R, rechunk: bool, - n_rows: Option, + slice: (usize, usize), columns: Option>, projection: Option>, parallel: ParallelStrategy, @@ -56,9 +56,8 @@ impl ParquetReader { self } - /// Stop reading at `num_rows` rows. 
- pub fn with_n_rows(mut self, num_rows: Option) -> Self { - self.n_rows = num_rows; + pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self { + self.slice = slice.unwrap_or((0, usize::MAX)); self } @@ -166,7 +165,7 @@ impl ParquetReader { row_group_fetcher, metadata, schema, - self.n_rows.unwrap_or(usize::MAX), + self.slice, self.projection, self.predicate.clone(), self.row_index, @@ -185,7 +184,7 @@ impl SerReader for ParquetReader { ParquetReader { reader, rechunk: false, - n_rows: None, + slice: (0, usize::MAX), columns: None, projection: None, parallel: Default::default(), @@ -216,7 +215,7 @@ impl SerReader for ParquetReader { let mut df = read_parquet( self.reader, - self.n_rows.unwrap_or(usize::MAX), + self.slice, self.projection.as_deref(), &schema, Some(metadata), @@ -253,7 +252,7 @@ impl SerReader for ParquetReader { #[cfg(feature = "cloud")] pub struct ParquetAsyncReader { reader: ParquetObjectStore, - n_rows: Option, + slice: (usize, usize), rechunk: bool, projection: Option>, predicate: Option>, @@ -275,7 +274,7 @@ impl ParquetAsyncReader { Ok(ParquetAsyncReader { reader: ParquetObjectStore::from_uri(uri, cloud_options, metadata).await?, rechunk: false, - n_rows: None, + slice: (0, usize::MAX), projection: None, row_index: None, predicate: None, @@ -322,8 +321,8 @@ impl ParquetAsyncReader { self.reader.num_rows().await } - pub fn with_n_rows(mut self, n_rows: Option) -> Self { - self.n_rows = n_rows; + pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self { + self.slice = slice.unwrap_or((0, usize::MAX)); self } @@ -384,15 +383,20 @@ impl ParquetAsyncReader { schema.clone(), self.projection.as_deref(), self.predicate.clone(), + compute_row_group_range( + 0, + metadata.row_groups.len(), + self.slice, + &metadata.row_groups, + ), &metadata.row_groups, - self.n_rows.unwrap_or(usize::MAX), )? .into(); BatchedParquetReader::new( row_group_fetcher, metadata, schema, - self.n_rows.unwrap_or(usize::MAX), + self.slice, self.projection, self.predicate.clone(), self.row_index, diff --git a/crates/polars-io/src/utils/mod.rs b/crates/polars-io/src/utils/mod.rs index a940870c9270..5ed22c76561c 100644 --- a/crates/polars-io/src/utils/mod.rs +++ b/crates/polars-io/src/utils/mod.rs @@ -3,6 +3,7 @@ mod other; pub use compression::is_compressed; pub use other::*; +pub mod slice; pub const URL_ENCODE_CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS .add(b'/') diff --git a/crates/polars-io/src/utils/other.rs b/crates/polars-io/src/utils/other.rs index 9ab9772d955e..4df6a6910997 100644 --- a/crates/polars-io/src/utils/other.rs +++ b/crates/polars-io/src/utils/other.rs @@ -88,30 +88,6 @@ pub unsafe fn maybe_decompress_bytes<'a>( } } -/// Compute `remaining_rows_to_read` to be taken per file up front, so we can actually read -/// concurrently/parallel -/// -/// This takes an iterator over the number of rows per file. 
-pub fn get_sequential_row_statistics( - iter: I, - mut total_rows_to_read: usize, -) -> Vec<(usize, usize)> -where - I: Iterator, -{ - let mut cumulative_read = 0; - iter.map(|rows_this_file| { - let remaining_rows_to_read = total_rows_to_read; - total_rows_to_read = total_rows_to_read.saturating_sub(rows_this_file); - - let current_cumulative_read = cumulative_read; - cumulative_read += rows_this_file; - - (remaining_rows_to_read, current_cumulative_read) - }) - .collect() -} - #[cfg(any( feature = "ipc", feature = "ipc_streaming", diff --git a/crates/polars-io/src/utils/slice.rs b/crates/polars-io/src/utils/slice.rs new file mode 100644 index 000000000000..78ff29cf1b29 --- /dev/null +++ b/crates/polars-io/src/utils/slice.rs @@ -0,0 +1,33 @@ +/// Given a `slice` that is relative to the start of a list of files, calculate the slice to apply +/// at a file with a row offset of `current_row_offset`. +pub fn split_slice_at_file( + current_row_offset: &mut usize, + n_rows_this_file: usize, + global_slice_start: usize, + global_slice_end: usize, +) -> (usize, usize) { + let next_file_offset = *current_row_offset + n_rows_this_file; + // e.g. + // slice: (start: 1, end: 2) + // files: + // 0: (1 row): current_offset: 0, next_file_offset: 1 + // 1: (1 row): current_offset: 1, next_file_offset: 2 + // 2: (1 row): current_offset: 2, next_file_offset: 3 + // in this example we want to include only file 1. + let has_overlap_with_slice = + *current_row_offset < global_slice_end && next_file_offset > global_slice_start; + + let (rel_start, slice_len) = if !has_overlap_with_slice { + (0, 0) + } else { + let n_rows_to_skip = global_slice_start.saturating_sub(*current_row_offset); + let n_excess_rows = next_file_offset.saturating_sub(global_slice_end); + ( + n_rows_to_skip, + n_rows_this_file - n_rows_to_skip - n_excess_rows, + ) + }; + + *current_row_offset = next_file_offset; + (rel_start, slice_len) +} diff --git a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs index 7dbce5e69ee6..a014d5f70e5c 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs @@ -234,7 +234,11 @@ pub(crate) fn insert_streaming_nodes( ) } }, - Scan { scan_type, .. } if scan_type.streamable() => { + Scan { + scan_type, + file_options: FileScanOptions { slice, .. }, + .. + } if scan_type.streamable() && slice.map(|slice| slice.0 >= 0).unwrap_or(true) => { if state.streamable { state.sources.push(root); pipeline_trees[current_idx].push(state) diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs index fcc110248607..ac5dd21b0988 100644 --- a/crates/polars-lazy/src/scan/ndjson.rs +++ b/crates/polars-lazy/src/scan/ndjson.rs @@ -120,7 +120,7 @@ impl LazyFileListReader for LazyJsonLineReader { let paths = Arc::new(Mutex::new((self.paths, false))); let file_options = FileScanOptions { - n_rows: self.n_rows, + slice: self.n_rows.map(|x| (0, x)), with_columns: None, cache: false, row_index: self.row_index, diff --git a/crates/polars-lazy/src/tests/io.rs b/crates/polars-lazy/src/tests/io.rs index ff8a93f0112b..8c3f6e5334b2 100644 --- a/crates/polars-lazy/src/tests/io.rs +++ b/crates/polars-lazy/src/tests/io.rs @@ -399,7 +399,7 @@ fn test_scan_parquet_limit_9001() { let sliced = options.slice.unwrap(); sliced.1 == 3 }, - IR::Scan { file_options, .. } => file_options.n_rows == Some(3), + IR::Scan { file_options, .. 
} => file_options.slice == Some((0, 3)), _ => true, }); } diff --git a/crates/polars-lazy/src/tests/optimization_checks.rs b/crates/polars-lazy/src/tests/optimization_checks.rs index 58918cb75624..2ed1205241bc 100644 --- a/crates/polars-lazy/src/tests/optimization_checks.rs +++ b/crates/polars-lazy/src/tests/optimization_checks.rs @@ -93,7 +93,7 @@ fn slice_at_scan(q: LazyFrame) -> bool { (&lp_arena).iter(lp).any(|(_, lp)| { use IR::*; match lp { - Scan { file_options, .. } => file_options.n_rows.is_some(), + Scan { file_options, .. } => file_options.slice.is_some(), _ => false, } }) diff --git a/crates/polars-mem-engine/src/executors/scan/csv.rs b/crates/polars-mem-engine/src/executors/scan/csv.rs index 4f9dd90c3376..0f587cb0b322 100644 --- a/crates/polars-mem-engine/src/executors/scan/csv.rs +++ b/crates/polars-mem-engine/src/executors/scan/csv.rs @@ -25,7 +25,10 @@ impl CsvExec { // Interpret selecting no columns as selecting all columns. .filter(|columns| !columns.is_empty()); - let n_rows = _set_n_rows_for_scan(self.file_options.n_rows); + let n_rows = _set_n_rows_for_scan(self.file_options.slice.map(|x| { + assert_eq!(x.0, 0); + x.1 + })); let predicate = self.predicate.clone().map(phys_expr_to_io_expr); let options_base = self .options diff --git a/crates/polars-mem-engine/src/executors/scan/ipc.rs b/crates/polars-mem-engine/src/executors/scan/ipc.rs index 66cde754e948..c3f25b1cbd62 100644 --- a/crates/polars-mem-engine/src/executors/scan/ipc.rs +++ b/crates/polars-mem-engine/src/executors/scan/ipc.rs @@ -57,7 +57,10 @@ impl IpcExec { if config::verbose() { eprintln!("executing ipc read sync with row_index = {:?}, n_rows = {:?}, predicate = {:?} for paths {:?}", self.file_options.row_index.as_ref(), - self.file_options.n_rows.as_ref(), + self.file_options.slice.map(|x| { + assert_eq!(x.0, 0); + x.1 + }).as_ref(), self.predicate.is_some(), self.paths ); @@ -94,7 +97,10 @@ impl IpcExec { .finish() }; - let mut dfs = if let Some(mut n_rows) = self.file_options.n_rows { + let mut dfs = if let Some(mut n_rows) = self.file_options.slice.map(|x| { + assert_eq!(x.0, 0); + x.1 + }) { let mut out = Vec::with_capacity(self.paths.len()); for i in 0..self.paths.len() { diff --git a/crates/polars-mem-engine/src/executors/scan/mod.rs b/crates/polars-mem-engine/src/executors/scan/mod.rs index 8dbb9750de7f..ddc1f1b4e6e1 100644 --- a/crates/polars-mem-engine/src/executors/scan/mod.rs +++ b/crates/polars-mem-engine/src/executors/scan/mod.rs @@ -110,7 +110,10 @@ pub(crate) struct AnonymousScanExec { impl Executor for AnonymousScanExec { fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult { let mut args = AnonymousScanArgs { - n_rows: self.file_options.n_rows, + n_rows: self.file_options.slice.map(|x| { + assert_eq!(x.0, 0); + x.1 + }), with_columns: self.file_options.with_columns.clone(), schema: self.file_info.schema.clone(), output_schema: self.output_schema.clone(), diff --git a/crates/polars-mem-engine/src/executors/scan/ndjson.rs b/crates/polars-mem-engine/src/executors/scan/ndjson.rs index 23fb6511ad8a..ba4aed8c7af6 100644 --- a/crates/polars-mem-engine/src/executors/scan/ndjson.rs +++ b/crates/polars-mem-engine/src/executors/scan/ndjson.rs @@ -47,7 +47,10 @@ impl JsonExec { eprintln!("ASYNC READING FORCED"); } - let mut n_rows = self.file_scan_options.n_rows; + let mut n_rows = self.file_scan_options.slice.map(|x| { + assert_eq!(x.0, 0); + x.1 + }); // Avoid panicking if n_rows == Some(0) { diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs 
b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 7465f297fbd5..78dc23a9016d 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -8,6 +8,7 @@ use polars_core::utils::accumulate_dataframes_vertical; use polars_io::cloud::CloudOptions; use polars_io::parquet::metadata::FileMetaDataRef; use polars_io::path_utils::is_cloud_url; +use polars_io::utils::slice::split_slice_at_file; use polars_io::RowIndex; use super::*; @@ -59,17 +60,60 @@ impl ParquetExec { let mut result = vec![]; - let mut remaining_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX); - let mut base_row_index = self.file_options.row_index.take(); - // Limit no. of files at a time to prevent open file limits. let step = std::cmp::min(POOL.current_num_threads(), 128); + // Modified if we have a negative slice + let mut first_file = 0; + + // (offset, end) + let (slice_offset, slice_end) = if let Some(slice) = self.file_options.slice { + if slice.0 >= 0 { + (slice.0 as usize, slice.1.saturating_add(slice.0 as usize)) + } else { + let n_from_end = -slice.0 as usize; + let mut cum_rows = 0; + POOL.install(|| { + for path_indexes in (0..self.paths.len()).rev().collect::>().chunks(8) { + let row_counts = path_indexes + .into_par_iter() + .map(|i| { + ParquetReader::new(std::fs::File::open(&self.paths[*i])?).num_rows() + }) + .collect::>>()?; + + for (path_idx, rc) in path_indexes.iter().zip(row_counts) { + cum_rows += rc; + + if cum_rows >= n_from_end { + first_file = *path_idx; + break; + } + } + + if first_file > 0 { + break; + } + } - for i in (0..self.paths.len()).step_by(step) { + PolarsResult::Ok(()) + })?; + + let start = cum_rows - n_from_end; + (start, start + slice.1) + } + } else { + (0, usize::MAX) + }; + + let mut current_offset = 0; + let base_row_index = self.file_options.row_index.take(); + // Limit no. of files at a time to prevent open file limits. + + for i in (first_file..self.paths.len()).step_by(step) { let end = std::cmp::min(i.saturating_add(step), self.paths.len()); let paths = &self.paths[i..end]; let hive_parts = self.hive_parts.as_ref().map(|x| &x[i..end]); - if remaining_rows_to_read == 0 && !result.is_empty() { + if current_offset >= slice_end && !result.is_empty() { return Ok(result); } @@ -110,38 +154,32 @@ impl ParquetExec { // We do this in parallel because wide tables can take a long time deserializing metadata. 
let readers_and_metadata = POOL.install(|| iter.collect::>>())?; - let iter = readers_and_metadata + let current_offset_ref = &mut current_offset; + let row_statistics = readers_and_metadata .iter() - .map(|(_, num_rows, _, _)| *num_rows); - - let rows_statistics = get_sequential_row_statistics(iter, remaining_rows_to_read); + .map(|(_, num_rows, _, _)| { + let cum_rows = *current_offset_ref; + ( + cum_rows, + split_slice_at_file(current_offset_ref, *num_rows, slice_offset, slice_end), + ) + }) + .collect::>(); let out = POOL.install(|| { readers_and_metadata .into_par_iter() - .zip(rows_statistics.into_par_iter()) + .zip(row_statistics.into_par_iter()) .enumerate() .map( - |( - i, - ( - (reader, num_rows_this_file, predicate, projection), - (remaining_rows_to_read, cumulative_read), - ), - )| { - let remaining_rows_to_read = - if num_rows_this_file < remaining_rows_to_read { - None - } else { - Some(remaining_rows_to_read) - }; + |(i, ((reader, _, predicate, projection), (cumulative_read, slice)))| { let row_index = base_row_index.as_ref().map(|rc| RowIndex { name: rc.name.clone(), offset: rc.offset + cumulative_read as IdxSize, }); let mut df = reader - .with_n_rows(remaining_rows_to_read) + .with_slice(Some(slice)) .with_row_index(row_index) .with_predicate(predicate.clone()) .with_projection(projection.clone()) @@ -162,11 +200,7 @@ impl ParquetExec { StringChunked::full( col, path, - std::cmp::max( - df.height(), - remaining_rows_to_read - .unwrap_or(num_rows_this_file), - ), + std::cmp::max(df.height(), slice.1), ) .into_series(), ) @@ -179,11 +213,6 @@ impl ParquetExec { .collect::>>() })?; - let n_read = out.iter().map(|df| df.height()).sum(); - remaining_rows_to_read = remaining_rows_to_read.saturating_sub(n_read); - if let Some(rc) = &mut base_row_index { - rc.offset += n_read as IdxSize; - } if result.is_empty() { result = out; } else { @@ -195,6 +224,10 @@ impl ParquetExec { #[cfg(feature = "cloud")] async fn read_async(&mut self) -> PolarsResult> { + use futures::{stream, StreamExt}; + use polars_io::pl_async; + use polars_io::utils::slice::split_slice_at_file; + let verbose = verbose(); let first_metadata = &self.metadata; let cloud_options = self.cloud_options.as_ref(); @@ -206,16 +239,71 @@ impl ParquetExec { eprintln!("POLARS PREFETCH_SIZE: {}", batch_size) } - let mut remaining_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX); - let mut base_row_index = self.file_options.row_index.take(); + // Modified if we have a negative slice + let mut first_file_idx = 0; + + // (offset, end) + let (slice_offset, slice_end) = if let Some(slice) = self.file_options.slice { + if slice.0 >= 0 { + (slice.0 as usize, slice.1.saturating_add(slice.0 as usize)) + } else { + let n_from_end = -slice.0 as usize; + let mut cum_rows = 0; + + let paths = &self.paths; + let cloud_options = Arc::new(self.cloud_options.clone()); + + let paths = paths.clone(); + let cloud_options = cloud_options.clone(); + + let mut iter = stream::iter((0..self.paths.len()).rev().map(|i| { + let paths = paths.clone(); + let cloud_options = cloud_options.clone(); + + pl_async::get_runtime().spawn(async move { + PolarsResult::Ok(( + i, + ParquetAsyncReader::from_uri( + paths[i].to_str().unwrap(), + cloud_options.as_ref().as_ref(), + None, + ) + .await? 
+ .num_rows() + .await?, + )) + }) + })) + .buffered(8); + + while let Some(v) = iter.next().await { + let (path_idx, num_rows) = v.unwrap()?; + + cum_rows += num_rows; + + if cum_rows >= n_from_end { + first_file_idx = path_idx; + break; + } + } + + let start = cum_rows - n_from_end; + (start, start + slice.1) + } + } else { + (0, usize::MAX) + }; + + let mut current_offset = 0; + let base_row_index = self.file_options.row_index.take(); let mut processed = 0; - for batch_start in (0..self.paths.len()).step_by(batch_size) { + for batch_start in (first_file_idx..self.paths.len()).step_by(batch_size) { let end = std::cmp::min(batch_start.saturating_add(batch_size), self.paths.len()); let paths = &self.paths[batch_start..end]; let hive_parts = self.hive_parts.as_ref().map(|x| &x[batch_start..end]); - if remaining_rows_to_read == 0 && !result.is_empty() { + if current_offset >= slice_end && !result.is_empty() { return Ok(result); } processed += paths.len(); @@ -245,14 +333,20 @@ impl ParquetExec { }); let readers_and_metadata = futures::future::try_join_all(iter).await?; + let current_offset_ref = &mut current_offset; + // Then compute `n_rows` to be taken per file up front, so we can actually read concurrently // after this. - let iter = readers_and_metadata + let row_statistics = readers_and_metadata .iter() - .map(|(num_rows, _)| num_rows) - .copied(); - - let rows_statistics = get_sequential_row_statistics(iter, remaining_rows_to_read); + .map(|(num_rows, _)| { + let cum_rows = *current_offset_ref; + ( + cum_rows, + split_slice_at_file(current_offset_ref, *num_rows, slice_offset, slice_end), + ) + }) + .collect::>(); // Now read the actual data. let file_info = &self.file_info; @@ -266,9 +360,11 @@ impl ParquetExec { eprintln!("reading of {}/{} file...", processed, self.paths.len()); } - let iter = readers_and_metadata.into_iter().enumerate().map( - |(i, (num_rows_this_file, reader))| { - let (remaining_rows_to_read, cumulative_read) = &rows_statistics[i]; + let iter = readers_and_metadata + .into_iter() + .enumerate() + .map(|(i, (_, reader))| { + let (cumulative_read, slice) = row_statistics[i]; let hive_partitions = hive_parts .as_ref() .map(|x| x[i].materialize_partition_columns()); @@ -284,16 +380,9 @@ impl ParquetExec { async move { let file_info = file_info.clone(); - let remaining_rows_to_read = *remaining_rows_to_read; - let remaining_rows_to_read = if num_rows_this_file < remaining_rows_to_read - { - None - } else { - Some(remaining_rows_to_read) - }; let row_index = base_row_index_ref.as_ref().map(|rc| RowIndex { name: rc.name.clone(), - offset: rc.offset + *cumulative_read as IdxSize, + offset: rc.offset + cumulative_read as IdxSize, }); let (projection, predicate) = prepare_scan_args( @@ -305,7 +394,7 @@ impl ParquetExec { ); let df = reader - .with_n_rows(remaining_rows_to_read) + .with_slice(Some(slice)) .with_row_index(row_index) .with_projection(projection) .check_schema(schema.as_ref()) @@ -323,15 +412,9 @@ impl ParquetExec { PolarsResult::Ok(df) } - }, - ); + }); let dfs = futures::future::try_join_all(iter).await?; - let n_read = dfs.iter().map(|df| df.height()).sum(); - remaining_rows_to_read = remaining_rows_to_read.saturating_sub(n_read); - if let Some(rc) = &mut base_row_index { - rc.offset += n_read as IdxSize; - } result.extend(dfs.into_iter()) } diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index 643aa2f4c995..bc52c17a796d 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ 
b/crates/polars-mem-engine/src/planner/lp.rs @@ -284,7 +284,12 @@ fn create_physical_plan_impl( predicate, mut file_options, } => { - file_options.n_rows = _set_n_rows_for_scan(file_options.n_rows); + file_options.slice = if let Some((off, len)) = file_options.slice { + Some((off, _set_n_rows_for_scan(Some(len)).unwrap())) + } else { + _set_n_rows_for_scan(None).map(|x| (0, x)) + }; + let mut state = ExpressionConversionState::new(true, state.expr_depth); let predicate = predicate .map(|pred| { diff --git a/crates/polars-pipe/src/executors/sources/csv.rs b/crates/polars-pipe/src/executors/sources/csv.rs index 70eb702f7b12..92e994066b8d 100644 --- a/crates/polars-pipe/src/executors/sources/csv.rs +++ b/crates/polars-pipe/src/executors/sources/csv.rs @@ -38,8 +38,13 @@ impl CsvSource { fn init_next_reader(&mut self) -> PolarsResult<()> { let file_options = self.file_options.clone(); + let n_rows = file_options.slice.map(|x| { + assert_eq!(x.0, 0); + x.1 + }); + if self.current_path_idx == self.paths.len() - || (file_options.n_rows.is_some() && file_options.n_rows.unwrap() <= self.n_rows_read) + || (n_rows.is_some() && n_rows.unwrap() <= self.n_rows_read) { return Ok(()); } @@ -72,7 +77,11 @@ impl CsvSource { }; let n_rows = _set_n_rows_for_scan( file_options - .n_rows + .slice + .map(|x| { + assert_eq!(x.0, 0); + x.1 + }) .map(|n| n.saturating_sub(self.n_rows_read)), ); let row_index = file_options.row_index.map(|mut ri| { diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index 267542e82826..bec92827bfed 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -16,6 +16,7 @@ use polars_io::predicates::PhysicalIoExpr; use polars_io::prelude::materialize_projection; #[cfg(feature = "async")] use polars_io::prelude::ParquetAsyncReader; +use polars_io::utils::slice::split_slice_at_file; use polars_io::SerReader; use polars_plan::plans::FileInfo; use polars_plan::prelude::hive::HivePartitions; @@ -31,6 +32,7 @@ pub struct ParquetSource { batched_readers: VecDeque, n_threads: usize, processed_paths: usize, + processed_rows: usize, iter: Range, paths: Arc<[PathBuf]>, options: ParquetOptions, @@ -111,13 +113,19 @@ impl ParquetSource { let Some(index) = self.iter.next() else { return Ok(()); }; + if let Some(slice) = self.file_options.slice { + if self.processed_rows >= slice.0 as usize + slice.1 { + return Ok(()); + } + } + let predicate = self.predicate.clone(); let (path, options, file_options, projection, chunk_size, hive_partitions) = self.prepare_init_reader(index)?; let batched_reader = { let file = std::fs::File::open(path).unwrap(); - ParquetReader::new(file) + let mut reader = ParquetReader::new(file) .with_projection(projection) .check_schema( self.file_info @@ -127,7 +135,6 @@ impl ParquetSource { .as_ref() .unwrap_left(), )? - .with_n_rows(file_options.n_rows) .with_row_index(file_options.row_index) .with_predicate(predicate.clone()) .use_statistics(options.use_statistics) @@ -137,8 +144,25 @@ impl ParquetSource { .include_file_paths .as_ref() .map(|x| (x.clone(), Arc::from(path.to_str().unwrap()))), + ); + + let n_rows_this_file = reader.num_rows().unwrap(); + + let slice = file_options.slice.map(|slice| { + assert!(slice.0 >= 0); + let slice_start = slice.0 as usize; + let slice_end = slice_start + slice.1; + split_slice_at_file( + &mut self.processed_rows.clone(), + n_rows_this_file, + slice_start, + slice_end, ) - .batched(chunk_size)? 
+ }); + + self.processed_rows += n_rows_this_file; + reader = reader.with_slice(slice); + reader.batched(chunk_size)? }; self.finish_init_reader(batched_reader)?; Ok(()) @@ -158,11 +182,12 @@ impl ParquetSource { let (path, options, file_options, projection, chunk_size, hive_partitions) = self.prepare_init_reader(index)?; + assert_eq!(file_options.slice, None); + let batched_reader = { let uri = path.to_string_lossy(); ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata) .await? - .with_n_rows(file_options.n_rows) .with_row_index(file_options.row_index) .with_projection(projection) .check_schema( @@ -216,6 +241,7 @@ impl ParquetSource { batched_readers: VecDeque::new(), n_threads, processed_paths: 0, + processed_rows: 0, options, file_options, iter, @@ -243,37 +269,35 @@ impl ParquetSource { // // It is important we do this for a reasonable batch size, that's why we start this when we // have just 2 readers left. - if self.batched_readers.len() <= 2 && self.file_options.n_rows.is_none() - || self.batched_readers.is_empty() + if self.file_options.slice.is_none() + && self.run_async + && (self.batched_readers.len() <= 2 || self.batched_readers.is_empty()) { - let range = 0..self.prefetch_size - self.batched_readers.len(); - - if self.run_async { - #[cfg(not(feature = "async"))] - panic!("activate 'async' feature"); - - #[cfg(feature = "async")] - { - let range = range - .zip(&mut self.iter) - .map(|(_, index)| index) - .collect::>(); - let init_iter = range.into_iter().map(|index| self.init_reader_async(index)); - - let batched_readers = polars_io::pl_async::get_runtime() - .block_on_potential_spawn(async { - futures::future::try_join_all(init_iter).await - })?; - - for r in batched_readers { - self.finish_init_reader(r)?; - } - } - } else { - for _ in 0..self.prefetch_size - self.batched_readers.len() { - self.init_next_reader()? + #[cfg(not(feature = "async"))] + panic!("activate 'async' feature"); + + #[cfg(feature = "async")] + { + let range = 0..self.prefetch_size - self.batched_readers.len(); + let range = range + .zip(&mut self.iter) + .map(|(_, index)| index) + .collect::>(); + let init_iter = range.into_iter().map(|index| self.init_reader_async(index)); + + let batched_readers = + polars_io::pl_async::get_runtime().block_on_potential_spawn(async { + futures::future::try_join_all(init_iter).await + })?; + + for r in batched_readers { + self.finish_init_reader(r)?; } } + } else { + for _ in 0..self.prefetch_size - self.batched_readers.len() { + self.init_next_reader()? + } } Ok(()) } @@ -293,10 +317,6 @@ impl Source for ParquetSource { Ok(match batches { None => { - if reader.limit_reached() { - return Ok(SourceResult::Finished); - } - // reset the reader self.init_next_reader()?; return self.get_batches(_context); @@ -306,16 +326,9 @@ impl Source for ParquetSource { let out = batches .into_iter() .enumerate_u32() - .map(|(i, data)| { - // Keep the row limit updated so the next reader will have a correct limit. 
- if let Some(n_rows) = &mut self.file_options.n_rows { - *n_rows = n_rows.saturating_sub(data.height()) - } - - DataChunk { - chunk_index: (idx_offset + i) as IdxSize, - data, - } + .map(|(i, data)| DataChunk { + chunk_index: (idx_offset + i) as IdxSize, + data, }) .collect::>(); get_source_index(out.len() as u32); diff --git a/crates/polars-plan/src/plans/builder_dsl.rs b/crates/polars-plan/src/plans/builder_dsl.rs index 9981d021148a..4bb111c77652 100644 --- a/crates/polars-plan/src/plans/builder_dsl.rs +++ b/crates/polars-plan/src/plans/builder_dsl.rs @@ -44,7 +44,7 @@ impl DslBuilder { let file_info = FileInfo::new(schema.clone(), None, (n_rows, n_rows.unwrap_or(usize::MAX))); let file_options = FileScanOptions { - n_rows, + slice: n_rows.map(|x| (0, x)), with_columns: None, cache: false, row_index: None, @@ -97,7 +97,7 @@ impl DslBuilder { let options = FileScanOptions { with_columns: None, cache, - n_rows, + slice: n_rows.map(|x| (0, x)), rechunk, row_index, file_counter: Default::default(), @@ -146,7 +146,7 @@ impl DslBuilder { file_options: FileScanOptions { with_columns: None, cache, - n_rows, + slice: n_rows.map(|x| (0, x)), rechunk, row_index, file_counter: Default::default(), @@ -182,7 +182,7 @@ impl DslBuilder { let options = FileScanOptions { with_columns: None, cache, - n_rows: read_options_clone.n_rows, + slice: read_options_clone.n_rows.map(|x| (0, x)), rechunk: read_options_clone.rechunk, row_index: read_options_clone.row_index, file_counter: Default::default(), diff --git a/crates/polars-plan/src/plans/ir/format.rs b/crates/polars-plan/src/plans/ir/format.rs index 2ad3050c3d0a..5c685284e858 100644 --- a/crates/polars-plan/src/plans/ir/format.rs +++ b/crates/polars-plan/src/plans/ir/format.rs @@ -61,7 +61,7 @@ fn write_scan( n_columns: i64, total_columns: usize, predicate: &Option>, - n_rows: Option, + slice: Option<(i64, usize)>, row_index: Option<&RowIndex>, ) -> fmt::Result { write!(f, "{:indent$}{name} SCAN {}", "", PathsDisplay(path))?; @@ -79,8 +79,8 @@ fn write_scan( if let Some(predicate) = predicate { write!(f, "\n{:indent$}SELECTION: {predicate}", "")?; } - if let Some(n_rows) = n_rows { - write!(f, "\n{:indent$}N_ROWS: {n_rows}", "")?; + if let Some(slice) = slice { + write!(f, "\n{:indent$}SLICE: {slice:?}", "")?; } if let Some(row_index) = row_index { write!(f, "\n{:indent$}ROW_INDEX: {}", "", row_index.name)?; @@ -176,7 +176,7 @@ impl<'a> IRDisplay<'a> { n_columns, total_columns, &predicate, - options.n_rows, + options.n_rows.map(|x| (0, x)), None, ) }, @@ -244,7 +244,7 @@ impl<'a> IRDisplay<'a> { n_columns, file_info.schema.len(), &predicate, - file_options.n_rows, + file_options.slice, file_options.row_index.as_ref(), ) }, diff --git a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs index b410954fe551..c065b7e3a7cf 100644 --- a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs @@ -409,7 +409,7 @@ impl<'a> PredicatePushDown<'a> { let mut do_optimization = match &scan_type { #[cfg(feature = "csv")] - FileScan::Csv { .. } => options.n_rows.is_none(), + FileScan::Csv { .. } => options.slice.is_none(), FileScan::Anonymous { function, .. } => function.allows_predicate_pushdown(), #[cfg(feature = "json")] FileScan::NDJson { .. 
} => true, diff --git a/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs b/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs index 949b4288af08..34ab66e9499c 100644 --- a/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs +++ b/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs @@ -173,7 +173,7 @@ impl SlicePushDown { predicate, scan_type: FileScan::Csv { options, cloud_options }, }, Some(state)) if predicate.is_none() && state.offset >= 0 => { - file_options.n_rows = Some(state.offset as usize + state.len as usize); + file_options.slice = Some((0, state.offset as usize + state.len as usize)); let lp = Scan { paths, @@ -187,6 +187,30 @@ impl SlicePushDown { self.no_pushdown_finish_opt(lp, Some(state), lp_arena) }, + #[cfg(feature = "parquet")] + (Scan { + paths, + file_info, + hive_parts, + output_schema, + mut file_options, + predicate, + scan_type: scan_type @ FileScan::Parquet { .. }, + }, Some(state)) if predicate.is_none() => { + file_options.slice = Some((state.offset, state.len as usize)); + + let lp = Scan { + paths, + file_info, + hive_parts, + output_schema, + scan_type, + file_options, + predicate, + }; + + Ok(lp) + }, // TODO! we currently skip slice pushdown if there is a predicate. (Scan { paths, @@ -197,7 +221,8 @@ impl SlicePushDown { predicate, scan_type }, Some(state)) if state.offset == 0 && predicate.is_none() => { - options.n_rows = Some(state.len as usize); + options.slice = Some((0, state.len as usize)); + let lp = Scan { paths, file_info, diff --git a/crates/polars-plan/src/plans/options.rs b/crates/polars-plan/src/plans/options.rs index e1232020711c..3528387c06f0 100644 --- a/crates/polars-plan/src/plans/options.rs +++ b/crates/polars-plan/src/plans/options.rs @@ -29,7 +29,7 @@ pub type FileCount = u32; #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] /// Generic options for all file types. 
pub struct FileScanOptions { - pub n_rows: Option, + pub slice: Option<(i64, usize)>, pub with_columns: Option>, pub cache: bool, pub row_index: Option, diff --git a/py-polars/src/dataframe/io.rs b/py-polars/src/dataframe/io.rs index 6c4453a9594e..e258abe036fa 100644 --- a/py-polars/src/dataframe/io.rs +++ b/py-polars/src/dataframe/io.rs @@ -161,7 +161,7 @@ impl PyDataFrame { .with_projection(projection) .with_columns(columns) .read_parallel(parallel.0) - .with_n_rows(n_rows) + .with_slice(n_rows.map(|x| (0, x))) .with_row_index(row_index) .set_low_memory(low_memory) .use_statistics(use_statistics) @@ -174,7 +174,7 @@ impl PyDataFrame { .with_projection(projection) .with_columns(columns) .read_parallel(parallel.0) - .with_n_rows(n_rows) + .with_slice(n_rows.map(|x| (0, x))) .with_row_index(row_index) .use_statistics(use_statistics) .set_rechunk(rechunk) diff --git a/py-polars/src/lazyframe/visitor/nodes.rs b/py-polars/src/lazyframe/visitor/nodes.rs index 833cdf76b239..e08b3bfb37a1 100644 --- a/py-polars/src/lazyframe/visitor/nodes.rs +++ b/py-polars/src/lazyframe/visitor/nodes.rs @@ -50,7 +50,7 @@ impl PyFileOptions { fn n_rows(&self, py: Python<'_>) -> PyResult { Ok(self .inner - .n_rows + .slice .map_or_else(|| py.None(), |n| n.into_py(py))) } #[getter] diff --git a/py-polars/tests/unit/io/test_lazy_parquet.py b/py-polars/tests/unit/io/test_lazy_parquet.py index 7b7350bdf496..797714614ec4 100644 --- a/py-polars/tests/unit/io/test_lazy_parquet.py +++ b/py-polars/tests/unit/io/test_lazy_parquet.py @@ -9,6 +9,7 @@ import pytest import polars as pl +from polars.exceptions import ComputeError from polars.testing import assert_frame_equal if TYPE_CHECKING: @@ -475,3 +476,39 @@ def test_predicate_push_down_categorical_17744(tmp_path: Path) -> None: .collect(), expect, ) + + +@pytest.mark.parametrize("streaming", [True, False]) +def test_parquet_slice_pushdown_nonzero_offset(tmp_path: Path, streaming: bool) -> None: + paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"] + dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))] + + for df, p in zip(dfs, paths): + df.write_parquet(p) + + # Parquet files containing only the metadata - i.e. the data parts are removed. + # Used to test that a reader doesn't try to read any data. 
+ def trim_to_metadata(path: str | Path) -> None: + path = Path(path) + v = path.read_bytes() + metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little") + path.write_bytes(v[-metadata_and_footer_len:]) + + trim_to_metadata(paths[0]) + trim_to_metadata(paths[2]) + + # Check baseline: + # * Metadata can be read without error + assert pl.read_parquet_schema(paths[0]) == dfs[0].schema + # * Attempting to read any data will error + with pytest.raises(ComputeError): + pl.scan_parquet(paths[0]).collect() + + df = dfs[1] + assert_frame_equal(pl.scan_parquet(paths).slice(1, 1).collect(), df) + assert_frame_equal(pl.scan_parquet(paths[1:]).head(1).collect(), df) + + # Negative slice unsupported in streaming + if not streaming: + assert_frame_equal(pl.scan_parquet(paths).slice(-2, 1).collect(), df) + assert_frame_equal(pl.scan_parquet(paths[:2]).tail(1).collect(), df) From f43d9c3185614ab313126c5f4417f06b8a92ea78 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 1 Aug 2024 16:47:49 +1000 Subject: [PATCH 2/4] c --- .../src/executors/sources/parquet.rs | 2 +- py-polars/tests/unit/io/test_hive.py | 30 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index bec92827bfed..7125a2c9a9d7 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -296,7 +296,7 @@ impl ParquetSource { } } else { for _ in 0..self.prefetch_size - self.batched_readers.len() { - self.init_next_reader()? + self.init_next_reader_sync()? } } Ok(()) diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 7a7217e9cfba..618cef391b51 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -137,7 +137,10 @@ def test_hive_partitioned_predicate_pushdown_skips_correct_number_of_files( @pytest.mark.xdist_group("streaming") @pytest.mark.write_disk() -def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) -> None: +@pytest.mark.parametrize("streaming", [True, False]) +def test_hive_partitioned_slice_pushdown( + io_files_path: Path, tmp_path: Path, streaming: bool +) -> None: df = pl.read_ipc(io_files_path / "*.ipc") root = tmp_path / "partitioned_data" @@ -152,21 +155,18 @@ def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) -> ) q = pl.scan_parquet(root / "**/*.parquet", hive_partitioning=True) + schema = q.collect_schema() + expect_count = pl.select(pl.lit(1, dtype=pl.UInt32).alias(x) for x in schema) - # tests: 11682 - for streaming in [True, False]: - assert ( - q.head(1) - .collect(streaming=streaming) - .select(pl.all_horizontal(pl.all().count() == 1)) - .item() - ) - assert q.head(0).collect(streaming=streaming).columns == [ - "calories", - "sugars_g", - "category", - "fats_g", - ] + assert_frame_equal( + q.head(1).collect(streaming=streaming).select(pl.all().len()), expect_count + ) + assert q.head(0).collect(streaming=streaming).columns == [ + "calories", + "sugars_g", + "category", + "fats_g", + ] @pytest.mark.xdist_group("streaming") From 2ed49c9c63b0576e7ec7e9480f2eaa5d1bc2a7cc Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 1 Aug 2024 17:01:06 +1000 Subject: [PATCH 3/4] c --- crates/polars-io/src/parquet/read/reader.rs | 2 ++ .../polars-mem-engine/src/executors/scan/parquet.rs | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git 
a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs index 965012997014..30eb593191eb 100644 --- a/crates/polars-io/src/parquet/read/reader.rs +++ b/crates/polars-io/src/parquet/read/reader.rs @@ -321,6 +321,8 @@ impl ParquetAsyncReader { self.reader.num_rows().await } + /// Only positive offsets are supported for simplicity - the caller should + /// translate negative offsets into the positive equivalent. pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self { self.slice = slice.unwrap_or((0, usize::MAX)); self diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 78dc23a9016d..57e8d3baaa6e 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -69,10 +69,17 @@ impl ParquetExec { if slice.0 >= 0 { (slice.0 as usize, slice.1.saturating_add(slice.0 as usize)) } else { + // Walk the files in reverse until we find the first file, and then translate the + // slice into a positive-offset equivalent. let n_from_end = -slice.0 as usize; let mut cum_rows = 0; + let chunk_size = 8; POOL.install(|| { - for path_indexes in (0..self.paths.len()).rev().collect::>().chunks(8) { + for path_indexes in (0..self.paths.len()) + .rev() + .collect::>() + .chunks(chunk_size) + { let row_counts = path_indexes .into_par_iter() .map(|i| { @@ -247,6 +254,8 @@ impl ParquetExec { if slice.0 >= 0 { (slice.0 as usize, slice.1.saturating_add(slice.0 as usize)) } else { + // Walk the files in reverse until we find the first file, and then translate the + // slice into a positive-offset equivalent. let n_from_end = -slice.0 as usize; let mut cum_rows = 0; From 5adcf88e51106c063bc703a087ea467682f9147e Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 1 Aug 2024 18:21:19 +1000 Subject: [PATCH 4/4] c --- crates/polars-mem-engine/src/planner/lp.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index bc52c17a796d..31be11ae93b6 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -284,8 +284,8 @@ fn create_physical_plan_impl( predicate, mut file_options, } => { - file_options.slice = if let Some((off, len)) = file_options.slice { - Some((off, _set_n_rows_for_scan(Some(len)).unwrap())) + file_options.slice = if let Some((offset, len)) = file_options.slice { + Some((offset, _set_n_rows_for_scan(Some(len)).unwrap())) } else { _set_n_rows_for_scan(None).map(|x| (0, x)) };
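To illustrate how the slicing helper introduced by this series behaves across multiple files, the sketch below copies `split_slice_at_file` (added in `crates/polars-io/src/utils/slice.rs` in patch 1/4) and walks it over a hypothetical scan of three files; the row counts (3, 5, 4) and the global slice (offset 4, length 6) are invented for the example and are not taken from the patch.

    /// Copied from patch 1/4: given a slice expressed against the concatenation of all
    /// files, compute the (offset, len) to apply within a single file whose rows start
    /// at `current_row_offset`, and advance the offset past this file.
    pub fn split_slice_at_file(
        current_row_offset: &mut usize,
        n_rows_this_file: usize,
        global_slice_start: usize,
        global_slice_end: usize,
    ) -> (usize, usize) {
        let next_file_offset = *current_row_offset + n_rows_this_file;
        let has_overlap_with_slice =
            *current_row_offset < global_slice_end && next_file_offset > global_slice_start;

        let (rel_start, slice_len) = if !has_overlap_with_slice {
            (0, 0)
        } else {
            let n_rows_to_skip = global_slice_start.saturating_sub(*current_row_offset);
            let n_excess_rows = next_file_offset.saturating_sub(global_slice_end);
            (
                n_rows_to_skip,
                n_rows_this_file - n_rows_to_skip - n_excess_rows,
            )
        };

        *current_row_offset = next_file_offset;
        (rel_start, slice_len)
    }

    fn main() {
        // Hypothetical scan: three files with these row counts.
        let row_counts = [3usize, 5, 4];
        // Global slice: skip 4 rows of the concatenation, then take 6 (rows 4..10).
        let (slice_start, slice_len) = (4usize, 6usize);
        let slice_end = slice_start + slice_len;

        let mut current_row_offset = 0;
        for (i, &n_rows) in row_counts.iter().enumerate() {
            let (offset, len) =
                split_slice_at_file(&mut current_row_offset, n_rows, slice_start, slice_end);
            println!("file {i}: read rows {offset}..{} of {n_rows}", offset + len);
        }
        // file 0: read rows 0..0 of 3   (no overlap, skipped entirely)
        // file 1: read rows 1..5 of 5
        // file 2: read rows 0..2 of 4
    }

Resolving the global slice into a per-file (offset, len) up front is what allows the executors to read files concurrently, per the "so we can actually read concurrently" comment retained in `read_async`, and it replaces the per-file bookkeeping that the removed `get_sequential_row_statistics` helper used to provide.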