From 6d2ca13d078de396f1386083525881d191cc0e0c Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 13 Mar 2024 16:28:16 +0100 Subject: [PATCH 01/17] Support multiple paths in IpcExec --- .../src/physical_plan/executors/scan/ipc.rs | 163 +++++++++++++++--- .../src/physical_plan/planner/lp.rs | 24 ++- 2 files changed, 147 insertions(+), 40 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index 123eeb9461e1..886830a8079d 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -1,14 +1,18 @@ use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::RwLock; use polars_core::config::env_force_async; +use polars_core::utils::accumulate_dataframes_vertical; #[cfg(feature = "cloud")] use polars_io::cloud::CloudOptions; use polars_io::is_cloud_url; +use rayon::prelude::*; use super::*; pub struct IpcExec { - pub(crate) path: PathBuf, + pub(crate) paths: Arc<[PathBuf]>, pub(crate) schema: SchemaRef, pub(crate) predicate: Option>, pub(crate) options: IpcScanOptions, @@ -20,7 +24,7 @@ pub struct IpcExec { impl IpcExec { fn read(&mut self, verbose: bool) -> PolarsResult { - let is_cloud = is_cloud_url(&self.path); + let is_cloud = self.paths.iter().any(is_cloud_url); let force_async = env_force_async(); let mut out = if is_cloud || force_async { @@ -50,7 +54,6 @@ impl IpcExec { } fn read_sync(&mut self, verbose: bool) -> PolarsResult { - let file = std::fs::File::open(&self.path)?; let (projection, predicate) = prepare_scan_args( self.predicate.clone(), &mut self.file_options.with_columns, @@ -58,40 +61,104 @@ impl IpcExec { self.file_options.row_index.is_some(), None, ); - IpcReader::new(file) - .with_n_rows(self.file_options.n_rows) - .with_row_index(std::mem::take(&mut self.file_options.row_index)) - .set_rechunk(self.file_options.rechunk) - .with_projection(projection) - .memory_mapped(self.options.memmap) - .finish_with_scan_ops(predicate, verbose) + + let row_limit = self.file_options.n_rows.unwrap_or(usize::MAX); + + // Used to determine the next file to open. This guarantees the order. 
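+        // Each rayon task claims the next unclaimed path index via `fetch_add`,
+        // so every index in 0..paths.len() is read exactly once. The shared
+        // `row_counter` records how many rows each file produced; once the
+        // consecutive prefix of finished files already covers `row_limit`,
+        // the remaining tasks skip reading and return an empty result.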
+ let path_index = AtomicUsize::new(0); + let row_counter = RwLock::new(ConsecutiveCountState::new(self.paths.len())); + + let mut index_and_dfs = (0..self.paths.len()) + .into_par_iter() + .map(|_| -> PolarsResult<(usize, DataFrame)> { + let index = path_index.fetch_add(1, Ordering::SeqCst); + let path = &self.paths[index]; + + if row_counter.read().unwrap().sum() >= row_limit { + return Ok(Default::default()); + } + + let file = std::fs::File::open(path)?; + + let df = IpcReader::new(file) + // .with_n_rows(self.file_options.n_rows) + .with_row_index(self.file_options.row_index.clone()) + // .set_rechunk(self.file_options.rechunk) + .with_projection(projection.clone()) + .memory_mapped(self.options.memmap) + .finish_with_scan_ops(predicate.clone(), verbose)?; + + row_counter.write().unwrap().write(index, df.height()); + + Ok((index, df)) + }) + .collect::>>()?; + + index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); + + let df = accumulate_dataframes_vertical(index_and_dfs.into_iter().map(|(_, df)| df))?; + + Ok(df) } #[cfg(feature = "cloud")] async fn read_async(&mut self, verbose: bool) -> PolarsResult { let predicate = self.predicate.clone().map(phys_expr_to_io_expr); - let reader = - IpcReaderAsync::from_uri(self.path.to_str().unwrap(), self.cloud_options.as_ref()) - .await?; - reader - .data( - self.metadata.as_ref(), - IpcReadOptions::default() - .with_row_limit(self.file_options.n_rows) - .with_row_index(self.file_options.row_index.clone()) - .with_projection(self.file_options.with_columns.as_deref().cloned()) - .with_predicate(predicate), - verbose, - ) - .await + let mut dfs = vec![]; + + for path in self.paths.iter() { + let reader = + IpcReaderAsync::from_uri(path.to_str().unwrap(), self.cloud_options.as_ref()) + .await?; + dfs.push( + reader + .data( + self.metadata.as_ref(), + IpcReadOptions::default() + .with_row_limit(self.file_options.n_rows) + .with_row_index(self.file_options.row_index.clone()) + .with_projection(self.file_options.with_columns.as_deref().cloned()) + .with_predicate(predicate.clone()), + verbose, + ) + .await?, + ); + } + + accumulate_dataframes_vertical(dfs) + + // TODO: WIP + // let paths = self.paths.clone(); + // let cloud_options = self.cloud_options.clone(); + // let metadata + // use futures::{stream::{self, StreamExt}, TryStreamExt}; + // let dfs = stream::iter(&*paths).map( + // move |path| async move { + // let reader = + // IpcReaderAsync::from_uri(path.to_str().unwrap(), cloud_options.as_ref()) + // .await?; + // reader + // .data( + // self.metadata.as_ref(), + // IpcReadOptions::default() + // .with_row_limit(self.file_options.n_rows) + // .with_row_index(self.file_options.row_index.clone()) + // .with_projection(self.file_options.with_columns.as_deref().cloned()) + // .with_predicate(predicate.clone()), + // verbose, + // ) + // .await + // } + // ).buffer_unordered(100).try_collect::>().await?; + // accumulate_dataframes_vertical(dfs) } } impl Executor for IpcExec { fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult { let finger_print = FileFingerPrint { - paths: Arc::new([self.path.clone()]), + paths: Arc::clone(&self.paths), #[allow(clippy::useless_asref)] predicate: self .predicate @@ -101,7 +168,7 @@ impl Executor for IpcExec { }; let profile_name = if state.has_node_timer() { - let mut ids = vec![self.path.to_string_lossy().into()]; + let mut ids = vec![self.paths[0].to_string_lossy().into()]; if self.predicate.is_some() { ids.push("predicate".into()) } @@ -123,3 +190,47 @@ impl Executor for IpcExec { ) } } + 
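+// Usage sketch for the `ConsecutiveCountState` helper defined just below
+// (illustration added in review, not part of the original change):
+//
+//     let mut state = ConsecutiveCountState::new(3);
+//     state.write(2, 10);           // out-of-order write
+//     assert_eq!(state.sum(), 0);   // index 0 not written yet
+//     state.write(0, 5);
+//     assert_eq!(state.sum(), 5);   // consecutive prefix is [5]
+//     state.write(1, 7);
+//     assert_eq!(state.sum(), 22);  // prefix complete: 5 + 7 + 10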
+// Tracks the sum of consecutive values in a dynamically sized array where the values can be written +// in any order. +struct ConsecutiveCountState { + counts: Box<[usize]>, + next_index: usize, + sum: usize, +} + +impl ConsecutiveCountState { + fn new(len: usize) -> Self { + Self { + counts: vec![usize::MAX; len].into_boxed_slice(), + next_index: 0, + sum: 0, + } + } + + /// Sum of all consecutive counts. + fn sum(&self) -> usize { + self.sum + } + + /// Write count at index. + fn write(&mut self, index: usize, count: usize) { + debug_assert!( + self.counts[index] == usize::MAX, + "second write to same index" + ); + debug_assert!(count != usize::MAX, "count can not be usize::MAX"); + + self.counts[index] = count; + + // Update sum and next index. + while self.next_index < self.counts.len() { + let count = self.counts[self.next_index]; + if count == usize::MAX { + break; + } + self.sum += count; + self.next_index += 1; + } + } +} diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index dc6cbc81b255..15db692585d4 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -237,20 +237,16 @@ pub fn create_physical_plan( #[cfg(feature = "cloud")] cloud_options, metadata, - } => { - assert_eq!(paths.len(), 1); - let path = paths[0].clone(); - Ok(Box::new(executors::IpcExec { - path, - schema: file_info.schema, - predicate, - options, - file_options, - #[cfg(feature = "cloud")] - cloud_options, - metadata, - })) - }, + } => Ok(Box::new(executors::IpcExec { + paths, + schema: file_info.schema, + predicate, + options, + file_options, + #[cfg(feature = "cloud")] + cloud_options, + metadata, + })), #[cfg(feature = "parquet")] FileScan::Parquet { options, From 46a3df159b388a40c4ec02906bd651d3a65747b4 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Mon, 18 Mar 2024 13:01:06 +0100 Subject: [PATCH 02/17] WIP --- crates/polars-io/src/lib.rs | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/crates/polars-io/src/lib.rs b/crates/polars-io/src/lib.rs index 8e75c9c74c99..d8dbe0093a97 100644 --- a/crates/polars-io/src/lib.rs +++ b/crates/polars-io/src/lib.rs @@ -87,6 +87,15 @@ pub trait ArrowReader { fn next_record_batch(&mut self) -> PolarsResult>; } +pub struct ReadOutput { + /// The resulting data frame. + pub df: DataFrame, + /// The total number of rows in the file if known. The row count can be + /// unknown if file parsing stops early because of a row limit or for + /// another reason. 
+ pub total_rows: Option, +} + #[cfg(any(feature = "ipc", feature = "avro", feature = "ipc_streaming",))] pub(crate) fn finish_reader( mut reader: R, @@ -95,7 +104,7 @@ pub(crate) fn finish_reader( predicate: Option>, arrow_schema: &ArrowSchema, row_index: Option, -) -> PolarsResult { +) -> PolarsResult { use polars_core::utils::accumulate_dataframes_vertical; let mut num_rows = 0; @@ -149,10 +158,19 @@ pub(crate) fn finish_reader( } }; - match rechunk { - true => Ok(df.agg_chunks()), - false => Ok(df), - } + let is_total_rows_known = match n_rows { + Some(limit) => num_rows < limit, + None => true, + }; + + Ok(ReadOutput { + df: if rechunk { + df.agg_chunks() + } else { + df + }, + total_rows: is_total_rows_known.then_some(num_rows) + }) } static CLOUD_URL: Lazy = From a0a5593d7c077947800987654fa0e939def227fc Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Tue, 19 Mar 2024 11:37:41 +0100 Subject: [PATCH 03/17] ipc sync seems to work --- crates/polars-io/src/lib.rs | 25 +---- .../src/physical_plan/executors/scan/ipc.rs | 95 +++++++++++++++---- 2 files changed, 79 insertions(+), 41 deletions(-) diff --git a/crates/polars-io/src/lib.rs b/crates/polars-io/src/lib.rs index d8dbe0093a97..7d60e5dd64b9 100644 --- a/crates/polars-io/src/lib.rs +++ b/crates/polars-io/src/lib.rs @@ -87,15 +87,6 @@ pub trait ArrowReader { fn next_record_batch(&mut self) -> PolarsResult>; } -pub struct ReadOutput { - /// The resulting data frame. - pub df: DataFrame, - /// The total number of rows in the file if known. The row count can be - /// unknown if file parsing stops early because of a row limit or for - /// another reason. - pub total_rows: Option, -} - #[cfg(any(feature = "ipc", feature = "avro", feature = "ipc_streaming",))] pub(crate) fn finish_reader( mut reader: R, @@ -104,7 +95,7 @@ pub(crate) fn finish_reader( predicate: Option>, arrow_schema: &ArrowSchema, row_index: Option, -) -> PolarsResult { +) -> PolarsResult { use polars_core::utils::accumulate_dataframes_vertical; let mut num_rows = 0; @@ -158,19 +149,7 @@ pub(crate) fn finish_reader( } }; - let is_total_rows_known = match n_rows { - Some(limit) => num_rows < limit, - None => true, - }; - - Ok(ReadOutput { - df: if rechunk { - df.agg_chunks() - } else { - df - }, - total_rows: is_total_rows_known.then_some(num_rows) - }) + Ok(if rechunk { df.agg_chunks() } else { df }) } static CLOUD_URL: Lazy = diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index 886830a8079d..ffb1f78706d0 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -7,6 +7,7 @@ use polars_core::utils::accumulate_dataframes_vertical; #[cfg(feature = "cloud")] use polars_io::cloud::CloudOptions; use polars_io::is_cloud_url; +use polars_io::predicates::apply_predicate; use rayon::prelude::*; use super::*; @@ -22,6 +23,20 @@ pub struct IpcExec { pub(crate) metadata: Option, } +fn prefix_sum_in_place<'a, I: IntoIterator>(values: I) { + let mut values = values.into_iter(); + let Some(first) = values.next() else { + return; + }; + let mut sum = *first; + *first = 0; + for val in values { + let new_sum = sum + *val; + *val = sum; + sum = new_sum; + } +} + impl IpcExec { fn read(&mut self, verbose: bool) -> PolarsResult { let is_cloud = self.paths.iter().any(is_cloud_url); @@ -43,7 +58,7 @@ impl IpcExec { .block_on_potential_spawn(self.read_async(verbose))? } } else { - self.read_sync(verbose)? + self.read_sync()? 
}; if self.file_options.rechunk { @@ -53,7 +68,7 @@ impl IpcExec { Ok(out) } - fn read_sync(&mut self, verbose: bool) -> PolarsResult { + fn read_sync(&mut self) -> PolarsResult { let (projection, predicate) = prepare_scan_args( self.predicate.clone(), &mut self.file_options.with_columns, @@ -62,7 +77,13 @@ impl IpcExec { None, ); - let row_limit = self.file_options.n_rows.unwrap_or(usize::MAX); + // TODO: Make `n_rows: IdxSize`. + let n_rows = self + .file_options + .n_rows + .map(|n| IdxSize::try_from(n).unwrap()); + + let row_limit = n_rows.unwrap_or(IdxSize::MAX); // Used to determine the next file to open. This guarantees the order. let path_index = AtomicUsize::new(0); @@ -74,21 +95,41 @@ impl IpcExec { let index = path_index.fetch_add(1, Ordering::SeqCst); let path = &self.paths[index]; - if row_counter.read().unwrap().sum() >= row_limit { + let already_read_in_sequence = row_counter.read().unwrap().sum(); + if already_read_in_sequence >= row_limit { return Ok(Default::default()); } let file = std::fs::File::open(path)?; let df = IpcReader::new(file) - // .with_n_rows(self.file_options.n_rows) + .with_n_rows( + // NOTE: If there is any file that by itself exceeds the + // row limit, passing the total row limit to each + // individual reader helps. + n_rows.map(|n| { + n.saturating_sub(already_read_in_sequence) + .try_into() + .unwrap() + }), + ) .with_row_index(self.file_options.row_index.clone()) - // .set_rechunk(self.file_options.rechunk) .with_projection(projection.clone()) .memory_mapped(self.options.memmap) - .finish_with_scan_ops(predicate.clone(), verbose)?; - - row_counter.write().unwrap().write(index, df.height()); + .finish()?; + // TODO: We can not supply a filter until the readers return + // how many total rows have been read before applying the + // filter. Without that, we can not correctly compute the + // pre-filter row count. + // .finish_with_scan_ops( + // predicate.clone(), + // verbose, + // )?; + + row_counter + .write() + .unwrap() + .write(index, df.height().try_into().unwrap()); Ok((index, df)) }) @@ -96,7 +137,25 @@ impl IpcExec { index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); - let df = accumulate_dataframes_vertical(index_and_dfs.into_iter().map(|(_, df)| df))?; + if let Some(row_index) = self.file_options.row_index.as_ref() { + let offsets = { + let mut row_counter = row_counter.into_inner().unwrap(); + prefix_sum_in_place(&mut row_counter.counts[..]); + row_counter.counts + }; + + for &mut (index, ref mut df) in index_and_dfs.iter_mut() { + let offset = offsets[index]; + df.apply(&row_index.name, |series| { + series.idx().expect("index column should be of index type") + offset + }) + .expect("index column should exist"); + } + } + + let mut df = accumulate_dataframes_vertical(index_and_dfs.into_iter().map(|(_, df)| df))?; + + apply_predicate(&mut df, predicate.as_deref(), true)?; Ok(df) } @@ -194,39 +253,39 @@ impl Executor for IpcExec { // Tracks the sum of consecutive values in a dynamically sized array where the values can be written // in any order. struct ConsecutiveCountState { - counts: Box<[usize]>, + counts: Box<[IdxSize]>, next_index: usize, - sum: usize, + sum: IdxSize, } impl ConsecutiveCountState { fn new(len: usize) -> Self { Self { - counts: vec![usize::MAX; len].into_boxed_slice(), + counts: vec![IdxSize::MAX; len].into_boxed_slice(), next_index: 0, sum: 0, } } /// Sum of all consecutive counts. - fn sum(&self) -> usize { + fn sum(&self) -> IdxSize { self.sum } /// Write count at index. 
- fn write(&mut self, index: usize, count: usize) { + fn write(&mut self, index: usize, count: IdxSize) { debug_assert!( - self.counts[index] == usize::MAX, + self.counts[index] == IdxSize::MAX, "second write to same index" ); - debug_assert!(count != usize::MAX, "count can not be usize::MAX"); + debug_assert!(count != IdxSize::MAX, "count can not be IdxSize::MAX"); self.counts[index] = count; // Update sum and next index. while self.next_index < self.counts.len() { let count = self.counts[self.next_index]; - if count == usize::MAX { + if count == IdxSize::MAX { break; } self.sum += count; From ce0b352bc2ed938dd6a67a7743ee4cc9f1073579 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Tue, 19 Mar 2024 14:55:07 +0100 Subject: [PATCH 04/17] Use async in scan tests --- crates/polars-core/src/config.rs | 2 +- .../src/physical_plan/executors/scan/ipc.rs | 6 +- .../physical_plan/executors/scan/parquet.rs | 8 +- .../src/executors/sources/parquet.rs | 4 +- py-polars/tests/unit/io/test_scan.py | 169 ++++++++++++++++-- 5 files changed, 163 insertions(+), 26 deletions(-) diff --git a/crates/polars-core/src/config.rs b/crates/polars-core/src/config.rs index dee5e0103e54..a1d06d081e8e 100644 --- a/crates/polars-core/src/config.rs +++ b/crates/polars-core/src/config.rs @@ -55,7 +55,7 @@ pub fn get_rg_prefetch_size() -> usize { .unwrap_or_else(|_| std::cmp::max(get_file_prefetch_size(), 128)) } -pub fn env_force_async() -> bool { +pub fn force_async() -> bool { std::env::var("POLARS_FORCE_ASYNC") .map(|value| value == "1") .unwrap_or_default() diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index ffb1f78706d0..edc5cc74ff2f 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::RwLock; -use polars_core::config::env_force_async; +use polars_core::config; use polars_core::utils::accumulate_dataframes_vertical; #[cfg(feature = "cloud")] use polars_io::cloud::CloudOptions; @@ -40,9 +40,7 @@ fn prefix_sum_in_place<'a, I: IntoIterator>(values: I) { impl IpcExec { fn read(&mut self, verbose: bool) -> PolarsResult { let is_cloud = self.paths.iter().any(is_cloud_url); - let force_async = env_force_async(); - - let mut out = if is_cloud || force_async { + let mut out = if is_cloud || config::force_async() { #[cfg(not(feature = "cloud"))] { panic!("activate cloud feature") diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs index ef5d7f8da914..002025d27afe 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs @@ -1,6 +1,6 @@ use std::path::PathBuf; -use polars_core::config::env_force_async; +use polars_core::config; #[cfg(feature = "cloud")] use polars_core::config::{get_file_prefetch_size, verbose}; use polars_core::utils::accumulate_dataframes_vertical; @@ -346,7 +346,7 @@ impl ParquetExec { )); }, }; - let force_async = env_force_async(); + let force_async = config::force_async(); let out = if is_cloud || force_async { #[cfg(not(feature = "cloud"))] @@ -354,6 +354,10 @@ impl ParquetExec { panic!("activate cloud feature") } + if !is_cloud && config::verbose() { + eprintln!("ASYNC READING FORCED"); + } + #[cfg(feature = "cloud")] { 
polars_io::pl_async::get_runtime().block_on_potential_spawn(self.read_async())? diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index 85e19f4bfc71..40855c9cc3a0 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -4,7 +4,7 @@ use std::path::PathBuf; use std::sync::Arc; use arrow::datatypes::ArrowSchemaRef; -use polars_core::config::{env_force_async, get_file_prefetch_size}; +use polars_core::config::{self, get_file_prefetch_size}; use polars_core::error::*; use polars_core::prelude::Series; use polars_core::POOL; @@ -204,7 +204,7 @@ impl ParquetSource { if verbose { eprintln!("POLARS PREFETCH_SIZE: {}", prefetch_size) } - let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || env_force_async(); + let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || config::force_async(); let mut source = ParquetSource { batched_readers: VecDeque::new(), diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py index 2ea3bfa91a60..2e0ec921fda9 100644 --- a/py-polars/tests/unit/io/test_scan.py +++ b/py-polars/tests/unit/io/test_scan.py @@ -2,7 +2,7 @@ from dataclasses import dataclass from math import ceil -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import pytest @@ -21,6 +21,18 @@ class _RowIndex: offset: int = 0 +def _enable_force_async(monkeypatch: pytest.MonkeyPatch) -> None: + """Modifies the provided monkeypatch context.""" + monkeypatch.setenv("POLARS_VERBOSE", "1") + monkeypatch.setenv("POLARS_FORCE_ASYNC", "1") + + +def _assert_force_async(capfd: Any) -> None: + """Calls `capfd.readouterr`, consuming the captured output so far.""" + captured = capfd.readouterr().err + assert "ASYNC READING FORCED" in captured + + def _scan( file_path: Path, schema: SchemaDict | None = None, @@ -29,27 +41,31 @@ def _scan( suffix = file_path.suffix row_index_name = None if row_index is None else row_index.name row_index_offset = 0 if row_index is None else row_index.offset + if suffix == ".ipc": - return pl.scan_ipc( + result = pl.scan_ipc( file_path, row_index_name=row_index_name, row_index_offset=row_index_offset, ) - if suffix == ".parquet": - return pl.scan_parquet( + elif suffix == ".parquet": + result = pl.scan_parquet( file_path, row_index_name=row_index_name, row_index_offset=row_index_offset, ) - if suffix == ".csv": - return pl.scan_csv( + elif suffix == ".csv": + result = pl.scan_csv( file_path, schema=schema, row_index_name=row_index_name, row_index_offset=row_index_offset, ) - msg = f"Unknown suffix {suffix}" - raise NotImplementedError(msg) + else: + msg = f"Unknown suffix {suffix}" + raise NotImplementedError(msg) + + return result def _write(df: pl.DataFrame, file_path: Path) -> None: @@ -77,6 +93,17 @@ def session_tmp_dir(tmp_path_factory: pytest.TempPathFactory) -> Path: return tmp_path_factory.mktemp("polars-test") +@pytest.fixture( + params=[False, True], + ids=["sync", "async"], +) +def force_async( + request: pytest.FixtureRequest, monkeypatch: pytest.MonkeyPatch +) -> bool: + value: bool = request.param + return value + + @dataclass class _DataFile: path: Path @@ -161,14 +188,38 @@ def data_file( @pytest.mark.write_disk() -def test_scan(data_file: _DataFile) -> None: +def test_scan( + capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool +) -> None: + if data_file.path.suffix == ".csv" and force_async: + pytest.skip(reason="async 
reading of .csv not yet implemented") + + if force_async: + _enable_force_async(monkeypatch) + df = _scan(data_file.path, data_file.df.schema).collect() + + if force_async: + _assert_force_async(capfd) + assert_frame_equal(df, data_file.df) @pytest.mark.write_disk() -def test_scan_with_limit(data_file: _DataFile) -> None: +def test_scan_with_limit( + capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool +) -> None: + if data_file.path.suffix == ".csv" and force_async: + pytest.skip(reason="async reading of .csv not yet implemented") + + if force_async: + _enable_force_async(monkeypatch) + df = _scan(data_file.path, data_file.df.schema).limit(4483).collect() + + if force_async: + _assert_force_async(capfd) + assert_frame_equal( df, pl.DataFrame( @@ -180,12 +231,24 @@ def test_scan_with_limit(data_file: _DataFile) -> None: @pytest.mark.write_disk() -def test_scan_with_filter(data_file: _DataFile) -> None: +def test_scan_with_filter( + capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool +) -> None: + if data_file.path.suffix == ".csv" and force_async: + pytest.skip(reason="async reading of .csv not yet implemented") + + if force_async: + _enable_force_async(monkeypatch) + df = ( _scan(data_file.path, data_file.df.schema) .filter(pl.col("sequence") % 2 == 0) .collect() ) + + if force_async: + _assert_force_async(capfd) + assert_frame_equal( df, pl.DataFrame( @@ -197,13 +260,25 @@ def test_scan_with_filter(data_file: _DataFile) -> None: @pytest.mark.write_disk() -def test_scan_with_filter_and_limit(data_file: _DataFile) -> None: +def test_scan_with_filter_and_limit( + capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool +) -> None: + if data_file.path.suffix == ".csv" and force_async: + pytest.skip(reason="async reading of .csv not yet implemented") + + if force_async: + _enable_force_async(monkeypatch) + df = ( _scan(data_file.path, data_file.df.schema) .filter(pl.col("sequence") % 2 == 0) .limit(4483) .collect() ) + + if force_async: + _assert_force_async(capfd) + assert_frame_equal( df, pl.DataFrame( @@ -215,13 +290,25 @@ def test_scan_with_filter_and_limit(data_file: _DataFile) -> None: @pytest.mark.write_disk() -def test_scan_with_limit_and_filter(data_file: _DataFile) -> None: +def test_scan_with_limit_and_filter( + capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool +) -> None: + if data_file.path.suffix == ".csv" and force_async: + pytest.skip(reason="async reading of .csv not yet implemented") + + if force_async: + _enable_force_async(monkeypatch) + df = ( _scan(data_file.path, data_file.df.schema) .limit(4483) .filter(pl.col("sequence") % 2 == 0) .collect() ) + + if force_async: + _assert_force_async(capfd) + assert_frame_equal( df, pl.DataFrame( @@ -233,12 +320,24 @@ def test_scan_with_limit_and_filter(data_file: _DataFile) -> None: @pytest.mark.write_disk() -def test_scan_with_row_index_and_limit(data_file: _DataFile) -> None: +def test_scan_with_row_index_and_limit( + capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool +) -> None: + if data_file.path.suffix == ".csv" and force_async: + pytest.skip(reason="async reading of .csv not yet implemented") + + if force_async: + _enable_force_async(monkeypatch) + df = ( _scan(data_file.path, data_file.df.schema, row_index=_RowIndex()) .limit(4483) .collect() ) + + if force_async: + _assert_force_async(capfd) + assert_frame_equal( df, pl.DataFrame( @@ -252,12 +351,24 @@ def 
test_scan_with_row_index_and_limit(data_file: _DataFile) -> None: @pytest.mark.write_disk() -def test_scan_with_row_index_and_filter(data_file: _DataFile) -> None: +def test_scan_with_row_index_and_filter( + capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool +) -> None: + if data_file.path.suffix == ".csv" and force_async: + pytest.skip(reason="async reading of .csv not yet implemented") + + if force_async: + _enable_force_async(monkeypatch) + df = ( _scan(data_file.path, data_file.df.schema, row_index=_RowIndex()) .filter(pl.col("sequence") % 2 == 0) .collect() ) + + if force_async: + _assert_force_async(capfd) + assert_frame_equal( df, pl.DataFrame( @@ -271,13 +382,25 @@ def test_scan_with_row_index_and_filter(data_file: _DataFile) -> None: @pytest.mark.write_disk() -def test_scan_with_row_index_limit_and_filter(data_file: _DataFile) -> None: +def test_scan_with_row_index_limit_and_filter( + capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool +) -> None: + if data_file.path.suffix == ".csv" and force_async: + pytest.skip(reason="async reading of .csv not yet implemented") + + if force_async: + _enable_force_async(monkeypatch) + df = ( _scan(data_file.path, data_file.df.schema, row_index=_RowIndex()) .limit(4483) .filter(pl.col("sequence") % 2 == 0) .collect() ) + + if force_async: + _assert_force_async(capfd) + assert_frame_equal( df, pl.DataFrame( @@ -291,13 +414,25 @@ def test_scan_with_row_index_limit_and_filter(data_file: _DataFile) -> None: @pytest.mark.write_disk() -def test_scan_with_row_index_filter_and_limit(data_file: _DataFile) -> None: +def test_scan_with_row_index_filter_and_limit( + capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool +) -> None: + if data_file.path.suffix == ".csv" and force_async: + pytest.skip(reason="async reading of .csv not yet implemented") + + if force_async: + _enable_force_async(monkeypatch) + df = ( _scan(data_file.path, data_file.df.schema, row_index=_RowIndex()) .filter(pl.col("sequence") % 2 == 0) .limit(4483) .collect() ) + + if force_async: + _assert_force_async(capfd) + assert_frame_equal( df, pl.DataFrame( From 241f53601255ebcf16b73fc9cec5d3e8cfec5df0 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Tue, 19 Mar 2024 15:20:58 +0100 Subject: [PATCH 05/17] Expose force async ipc not working --- py-polars/tests/unit/io/test_scan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py index 2e0ec921fda9..9f8a483626f9 100644 --- a/py-polars/tests/unit/io/test_scan.py +++ b/py-polars/tests/unit/io/test_scan.py @@ -30,7 +30,7 @@ def _enable_force_async(monkeypatch: pytest.MonkeyPatch) -> None: def _assert_force_async(capfd: Any) -> None: """Calls `capfd.readouterr`, consuming the captured output so far.""" captured = capfd.readouterr().err - assert "ASYNC READING FORCED" in captured + assert captured.count("ASYNC READING FORCED") == 1 def _scan( From 8111a8a230bc0916b23caabca13bc999449023c4 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Tue, 19 Mar 2024 16:15:56 +0100 Subject: [PATCH 06/17] Fix ipc sync and introduce python process aborted --- .../src/physical_plan/executors/scan/ipc.rs | 49 ++++++++++++++++--- crates/polars-lazy/src/scan/ipc.rs | 30 +++++++++--- .../polars-plan/src/logical_plan/builder.rs | 13 +++-- 3 files changed, 74 insertions(+), 18 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs 
b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index edc5cc74ff2f..2aca41d5edd2 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -67,6 +67,15 @@ impl IpcExec { } fn read_sync(&mut self) -> PolarsResult { + if config::verbose() { + println!("executing ipc read sync with row_index = {:?}, n_rows = {:?}, predicate = {:?} for paths {:?}", + self.file_options.row_index.as_ref(), + self.file_options.n_rows.as_ref(), + self.predicate.is_some(), + self.paths + ); + } + let (projection, predicate) = prepare_scan_args( self.predicate.clone(), &mut self.file_options.with_columns, @@ -135,13 +144,15 @@ impl IpcExec { index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); - if let Some(row_index) = self.file_options.row_index.as_ref() { - let offsets = { - let mut row_counter = row_counter.into_inner().unwrap(); - prefix_sum_in_place(&mut row_counter.counts[..]); - row_counter.counts - }; + let offsets = { + let mut counts = row_counter.into_inner().unwrap().into_counts(); + prefix_sum_in_place(&mut counts[..]); + counts + }; + // Update the row indices now that we know how many rows are in each + // file. + if let Some(row_index) = self.file_options.row_index.as_ref() { for &mut (index, ref mut df) in index_and_dfs.iter_mut() { let offset = offsets[index]; df.apply(&row_index.name, |series| { @@ -151,7 +162,21 @@ impl IpcExec { } } - let mut df = accumulate_dataframes_vertical(index_and_dfs.into_iter().map(|(_, df)| df))?; + // Accumulate dataframes while adjusting for the possibility of having + // read more rows than the limit. + let mut df = + accumulate_dataframes_vertical(index_and_dfs.into_iter().filter_map(|(index, df)| { + let count: IdxSize = df.height().try_into().unwrap(); + if count == 0 { + return None; + } + let remaining = row_limit.checked_sub(offsets[index])?; + Some(if remaining < count { + df.slice(0, remaining.try_into().unwrap()) + } else { + df + }) + }))?; apply_predicate(&mut df, predicate.as_deref(), true)?; @@ -290,4 +315,14 @@ impl ConsecutiveCountState { self.next_index += 1; } } + + fn into_counts(mut self) -> Box<[IdxSize]> { + // Set the counts to which no value has been written to zero. + for i in self.next_index..self.counts.len() { + if self.counts[i] == IdxSize::MAX { + self.counts[i] = 0; + } + } + self.counts + } } diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs index a849bec3d1c8..c3fec4d85689 100644 --- a/crates/polars-lazy/src/scan/ipc.rs +++ b/crates/polars-lazy/src/scan/ipc.rs @@ -49,19 +49,35 @@ impl LazyIpcReader { } impl LazyFileListReader for LazyIpcReader { + fn finish(mut self) -> PolarsResult { + if let Some(paths) = self.iter_paths()? 
{ + let paths = paths + .into_iter() + .collect::>>()?; + self.paths = paths; + } + self.finish_no_glob() + } + fn finish_no_glob(self) -> PolarsResult { let args = self.args; - let path = self.path; + + let paths = if self.paths.is_empty() { + Arc::new([self.path]) as Arc<[PathBuf]> + } else { + self.paths + }; let options = IpcScanOptions { memmap: args.memmap, }; + let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc( - path, + paths, options, args.n_rows, args.cache, - args.row_index.clone(), + args.row_index, args.rechunk, #[cfg(feature = "cloud")] args.cloud_options, @@ -70,10 +86,10 @@ impl LazyFileListReader for LazyIpcReader { .into(); lf.opt_state.file_caching = true; - // it is a bit hacky, but this `with_row_index` function updates the schema - if let Some(row_index) = args.row_index { - lf = lf.with_row_index(&row_index.name, Some(row_index.offset)) - } + // // it is a bit hacky, but this `with_row_index` function updates the schema + // if let Some(row_index) = args.row_index { + // lf = lf.with_row_index(&row_index.name, Some(row_index.offset)) + // } Ok(lf) } diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index b3beecddfead..fc0f5f1956f9 100644 --- a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -230,8 +230,8 @@ impl LogicalPlanBuilder { } #[cfg(feature = "ipc")] - pub fn scan_ipc>( - path: P, + pub fn scan_ipc>>( + paths: P, options: IpcScanOptions, n_rows: Option, cache: bool, @@ -241,7 +241,12 @@ impl LogicalPlanBuilder { ) -> PolarsResult { use polars_io::is_cloud_url; - let path = path.into(); + let paths = paths.into(); + + // Use first path to get schema. + let path = paths + .get(0) + .ok_or_else(|| polars_err!(ComputeError: "expected at least 1 path"))?; let metadata = if is_cloud_url(&path) { #[cfg(not(feature = "cloud"))] @@ -266,7 +271,7 @@ impl LogicalPlanBuilder { }; Ok(LogicalPlan::Scan { - paths: Arc::new([path]), + paths, file_info: FileInfo::new( prepare_schema(metadata.schema.as_ref().into(), row_index.as_ref()), Some(Arc::clone(&metadata.schema)), From eb4b2804379c26937844ff4dfea5d02181223ebe Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 20 Mar 2024 10:00:33 +0100 Subject: [PATCH 07/17] Fixup lints --- crates/polars-plan/src/logical_plan/builder.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index fc0f5f1956f9..be7ed03b2a0f 100644 --- a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -245,10 +245,10 @@ impl LogicalPlanBuilder { // Use first path to get schema. let path = paths - .get(0) + .first() .ok_or_else(|| polars_err!(ComputeError: "expected at least 1 path"))?; - let metadata = if is_cloud_url(&path) { + let metadata = if is_cloud_url(path) { #[cfg(not(feature = "cloud"))] panic!( "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled." @@ -266,7 +266,7 @@ impl LogicalPlanBuilder { } } else { arrow::io::ipc::read::read_file_metadata(&mut std::io::BufReader::new( - polars_utils::open_file(&path)?, + polars_utils::open_file(path)?, ))? 
}; From e1c9870dcb1c0cc729211c4088cde2077478160b Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 20 Mar 2024 09:59:00 +0100 Subject: [PATCH 08/17] Make ipc async work --- .../src/physical_plan/executors/scan/ipc.rs | 56 ++++++++++++++++++- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index 2aca41d5edd2..65d929fc7b80 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -200,15 +200,65 @@ impl IpcExec { IpcReadOptions::default() .with_row_limit(self.file_options.n_rows) .with_row_index(self.file_options.row_index.clone()) - .with_projection(self.file_options.with_columns.as_deref().cloned()) - .with_predicate(predicate.clone()), + .with_projection(self.file_options.with_columns.as_deref().cloned()), + // .with_predicate(predicate.clone()), verbose, ) .await?, ); } - accumulate_dataframes_vertical(dfs) + let offsets = { + // NOTE: Could use `scan` here but I believe this gets optimized + // better, unverified. + let mut counts = dfs + .iter() + .map(|df| IdxSize::try_from(df.height()).unwrap()) + .collect::>(); + prefix_sum_in_place(&mut counts[..]); + counts + }; + + // Update the row indices now that we know how many rows are in each + // file. + if let Some(row_index) = self.file_options.row_index.as_ref() { + let mut offset = 0; + for df in dfs.iter_mut() { + let count = df.height(); + df.apply(&row_index.name, |series| { + series.idx().expect("index column should be of index type") + offset + }) + .expect("index column should exist"); + offset += count; + } + } + + // Accumulate dataframes while adjusting for the possibility of having + // read more rows than the limit. + let row_limit = self + .file_options + .n_rows + .map(|limit| limit.try_into().unwrap()) + .unwrap_or(IdxSize::MAX); + + let mut df = accumulate_dataframes_vertical(dfs.into_iter().enumerate().filter_map( + |(index, df)| { + let count: IdxSize = df.height().try_into().unwrap(); + if count == 0 { + return None; + } + let remaining = row_limit.checked_sub(offsets[index])?; + Some(if remaining < count { + df.slice(0, remaining.try_into().unwrap()) + } else { + df + }) + }, + ))?; + + apply_predicate(&mut df, predicate.as_deref(), true)?; + + Ok(df) // TODO: WIP // let paths = self.paths.clone(); From cd9a71b6ce2dae2545db9ccc1c8b1e232caf5c56 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 20 Mar 2024 10:06:19 +0100 Subject: [PATCH 09/17] Fixup async reading forced --- .../src/physical_plan/executors/scan/parquet.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs index 002025d27afe..083874daaf75 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs @@ -354,12 +354,12 @@ impl ParquetExec { panic!("activate cloud feature") } - if !is_cloud && config::verbose() { - eprintln!("ASYNC READING FORCED"); - } - #[cfg(feature = "cloud")] { + if !is_cloud && config::verbose() { + eprintln!("ASYNC READING FORCED"); + } + polars_io::pl_async::get_runtime().block_on_potential_spawn(self.read_async())? 
} } else { From 732a5ee19384f5be5e5ba5b88dd048d3cdaf7778 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 20 Mar 2024 10:48:07 +0100 Subject: [PATCH 10/17] unordered async --- .../src/physical_plan/executors/scan/ipc.rs | 136 +++++++++--------- 1 file changed, 71 insertions(+), 65 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index 65d929fc7b80..b7df6463271c 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -104,7 +104,7 @@ impl IpcExec { let already_read_in_sequence = row_counter.read().unwrap().sum(); if already_read_in_sequence >= row_limit { - return Ok(Default::default()); + return Ok((index, Default::default())); } let file = std::fs::File::open(path)?; @@ -185,36 +185,75 @@ impl IpcExec { #[cfg(feature = "cloud")] async fn read_async(&mut self, verbose: bool) -> PolarsResult { - let predicate = self.predicate.clone().map(phys_expr_to_io_expr); + use futures::stream::{self, StreamExt}; + use futures::TryStreamExt; + + /// See https://users.rust-lang.org/t/implementation-of-fnonce-is-not-general-enough-with-async-block/83427/3. + trait AssertSend { + fn assert_send(self) -> impl Send + stream::Stream + where + Self: Send + stream::Stream + Sized, + { + self + } + } - let mut dfs = vec![]; + impl AssertSend for T {} - for path in self.paths.iter() { - let reader = - IpcReaderAsync::from_uri(path.to_str().unwrap(), self.cloud_options.as_ref()) - .await?; - dfs.push( - reader - .data( - self.metadata.as_ref(), - IpcReadOptions::default() - .with_row_limit(self.file_options.n_rows) - .with_row_index(self.file_options.row_index.clone()) - .with_projection(self.file_options.with_columns.as_deref().cloned()), - // .with_predicate(predicate.clone()), - verbose, + let row_limit = self + .file_options + .n_rows + .map(|limit| limit.try_into().unwrap()) + .unwrap_or(IdxSize::MAX); + + let row_counter = RwLock::new(ConsecutiveCountState::new(self.paths.len())); + + let mut index_and_dfs = stream::iter(&*self.paths) + .enumerate() + .map(|(index, path)| { + let this = &*self; + let row_counter = &row_counter; + async move { + let already_read_in_sequence = row_counter.read().unwrap().sum(); + if already_read_in_sequence >= row_limit { + return Ok((index, Default::default())); + } + + let reader = IpcReaderAsync::from_uri( + path.to_str().unwrap(), + this.cloud_options.as_ref(), ) - .await?, - ); - } + .await?; + let df = reader + .data( + this.metadata.as_ref(), + IpcReadOptions::default() + .with_row_limit(this.file_options.n_rows) + .with_row_index(this.file_options.row_index.clone()) + .with_projection( + this.file_options.with_columns.as_deref().cloned(), + ), + verbose, + ) + .await?; + + row_counter + .write() + .unwrap() + .write(index, df.height().try_into().unwrap()); + + PolarsResult::Ok((index, df)) + } + }) + .assert_send() + .buffer_unordered(100) + .try_collect::>() + .await?; + + index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); let offsets = { - // NOTE: Could use `scan` here but I believe this gets optimized - // better, unverified. - let mut counts = dfs - .iter() - .map(|df| IdxSize::try_from(df.height()).unwrap()) - .collect::>(); + let mut counts = row_counter.into_inner().unwrap().into_counts(); prefix_sum_in_place(&mut counts[..]); counts }; @@ -222,27 +261,19 @@ impl IpcExec { // Update the row indices now that we know how many rows are in each // file. 
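+        // Every reader assigned row indices starting from the same base offset,
+        // so shift each frame's index column by the summed height of all frames
+        // that precede it (the exclusive prefix sums computed in `offsets` above).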
if let Some(row_index) = self.file_options.row_index.as_ref() { - let mut offset = 0; - for df in dfs.iter_mut() { - let count = df.height(); + for &mut (index, ref mut df) in index_and_dfs.iter_mut() { + let offset = offsets[index]; df.apply(&row_index.name, |series| { series.idx().expect("index column should be of index type") + offset }) .expect("index column should exist"); - offset += count; } } // Accumulate dataframes while adjusting for the possibility of having // read more rows than the limit. - let row_limit = self - .file_options - .n_rows - .map(|limit| limit.try_into().unwrap()) - .unwrap_or(IdxSize::MAX); - - let mut df = accumulate_dataframes_vertical(dfs.into_iter().enumerate().filter_map( - |(index, df)| { + let mut df = + accumulate_dataframes_vertical(index_and_dfs.into_iter().filter_map(|(index, df)| { let count: IdxSize = df.height().try_into().unwrap(); if count == 0 { return None; @@ -253,37 +284,12 @@ impl IpcExec { } else { df }) - }, - ))?; + }))?; + let predicate = self.predicate.clone().map(phys_expr_to_io_expr); apply_predicate(&mut df, predicate.as_deref(), true)?; Ok(df) - - // TODO: WIP - // let paths = self.paths.clone(); - // let cloud_options = self.cloud_options.clone(); - // let metadata - // use futures::{stream::{self, StreamExt}, TryStreamExt}; - // let dfs = stream::iter(&*paths).map( - // move |path| async move { - // let reader = - // IpcReaderAsync::from_uri(path.to_str().unwrap(), cloud_options.as_ref()) - // .await?; - // reader - // .data( - // self.metadata.as_ref(), - // IpcReadOptions::default() - // .with_row_limit(self.file_options.n_rows) - // .with_row_index(self.file_options.row_index.clone()) - // .with_projection(self.file_options.with_columns.as_deref().cloned()) - // .with_predicate(predicate.clone()), - // verbose, - // ) - // .await - // } - // ).buffer_unordered(100).try_collect::>().await?; - // accumulate_dataframes_vertical(dfs) } } From 5493c4d267abce437b3cc01734d6e0b92f15df34 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 20 Mar 2024 11:37:08 +0100 Subject: [PATCH 11/17] Deduplicate and maybe fix impl --- .../src/physical_plan/executors/scan/ipc.rs | 202 ++++++++---------- 1 file changed, 92 insertions(+), 110 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index b7df6463271c..51c5991aaf65 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -6,8 +6,8 @@ use polars_core::config; use polars_core::utils::accumulate_dataframes_vertical; #[cfg(feature = "cloud")] use polars_io::cloud::CloudOptions; -use polars_io::is_cloud_url; use polars_io::predicates::apply_predicate; +use polars_io::{is_cloud_url, RowIndex}; use rayon::prelude::*; use super::*; @@ -23,20 +23,6 @@ pub struct IpcExec { pub(crate) metadata: Option, } -fn prefix_sum_in_place<'a, I: IntoIterator>(values: I) { - let mut values = values.into_iter(); - let Some(first) = values.next() else { - return; - }; - let mut sum = *first; - *first = 0; - for val in values { - let new_sum = sum + *val; - *val = sum; - sum = new_sum; - } -} - impl IpcExec { fn read(&mut self, verbose: bool) -> PolarsResult { let is_cloud = self.paths.iter().any(is_cloud_url); @@ -76,15 +62,16 @@ impl IpcExec { ); } - let (projection, predicate) = prepare_scan_args( - self.predicate.clone(), - &mut self.file_options.with_columns, - &mut self.schema, - self.file_options.row_index.is_some(), + let 
projection = materialize_projection( + self.file_options + .with_columns + .as_deref() + .map(|cols| cols.deref()), + &self.schema, None, + self.file_options.row_index.is_some(), ); - // TODO: Make `n_rows: IdxSize`. let n_rows = self .file_options .n_rows @@ -96,7 +83,7 @@ impl IpcExec { let path_index = AtomicUsize::new(0); let row_counter = RwLock::new(ConsecutiveCountState::new(self.paths.len())); - let mut index_and_dfs = (0..self.paths.len()) + let index_and_dfs = (0..self.paths.len()) .into_par_iter() .map(|_| -> PolarsResult<(usize, DataFrame)> { let index = path_index.fetch_add(1, Ordering::SeqCst); @@ -142,45 +129,13 @@ impl IpcExec { }) .collect::>>()?; - index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); - - let offsets = { - let mut counts = row_counter.into_inner().unwrap().into_counts(); - prefix_sum_in_place(&mut counts[..]); - counts - }; - - // Update the row indices now that we know how many rows are in each - // file. - if let Some(row_index) = self.file_options.row_index.as_ref() { - for &mut (index, ref mut df) in index_and_dfs.iter_mut() { - let offset = offsets[index]; - df.apply(&row_index.name, |series| { - series.idx().expect("index column should be of index type") + offset - }) - .expect("index column should exist"); - } - } - - // Accumulate dataframes while adjusting for the possibility of having - // read more rows than the limit. - let mut df = - accumulate_dataframes_vertical(index_and_dfs.into_iter().filter_map(|(index, df)| { - let count: IdxSize = df.height().try_into().unwrap(); - if count == 0 { - return None; - } - let remaining = row_limit.checked_sub(offsets[index])?; - Some(if remaining < count { - df.slice(0, remaining.try_into().unwrap()) - } else { - df - }) - }))?; - - apply_predicate(&mut df, predicate.as_deref(), true)?; - - Ok(df) + finish_index_and_dfs( + index_and_dfs, + row_counter.into_inner().unwrap(), + self.file_options.row_index.as_ref(), + row_limit, + self.predicate.as_ref(), + ) } #[cfg(feature = "cloud")] @@ -200,15 +155,16 @@ impl IpcExec { impl AssertSend for T {} - let row_limit = self + let n_rows = self .file_options .n_rows - .map(|limit| limit.try_into().unwrap()) - .unwrap_or(IdxSize::MAX); + .map(|limit| limit.try_into().unwrap()); + + let row_limit = n_rows.unwrap_or(IdxSize::MAX); let row_counter = RwLock::new(ConsecutiveCountState::new(self.paths.len())); - let mut index_and_dfs = stream::iter(&*self.paths) + let index_and_dfs = stream::iter(&*self.paths) .enumerate() .map(|(index, path)| { let this = &*self; @@ -228,7 +184,17 @@ impl IpcExec { .data( this.metadata.as_ref(), IpcReadOptions::default() - .with_row_limit(this.file_options.n_rows) + .with_row_limit( + // NOTE: If there is any file that by itself + // exceeds the row limit, passing the total + // row limit to each individual reader + // helps. 
+ n_rows.map(|n| { + n.saturating_sub(already_read_in_sequence) + .try_into() + .unwrap() + }), + ) .with_row_index(this.file_options.row_index.clone()) .with_projection( this.file_options.with_columns.as_deref().cloned(), @@ -250,47 +216,67 @@ impl IpcExec { .try_collect::>() .await?; - index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); + finish_index_and_dfs( + index_and_dfs, + row_counter.into_inner().unwrap(), + self.file_options.row_index.as_ref(), + row_limit, + self.predicate.as_ref(), + ) + } +} - let offsets = { - let mut counts = row_counter.into_inner().unwrap().into_counts(); - prefix_sum_in_place(&mut counts[..]); - counts - }; +fn finish_index_and_dfs( + mut index_and_dfs: Vec<(usize, DataFrame)>, + row_counter: ConsecutiveCountState, + row_index: Option<&RowIndex>, + row_limit: IdxSize, + predicate: Option<&Arc>, +) -> PolarsResult { + index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); + + #[cfg(debug_assertions)] + { + assert!( + index_and_dfs.iter().enumerate().all(|(a, &(b, _))| a == b), + "expected dataframe indices in order from 0 to len" + ); + } - // Update the row indices now that we know how many rows are in each - // file. - if let Some(row_index) = self.file_options.row_index.as_ref() { - for &mut (index, ref mut df) in index_and_dfs.iter_mut() { - let offset = offsets[index]; - df.apply(&row_index.name, |series| { - series.idx().expect("index column should be of index type") + offset - }) - .expect("index column should exist"); - } - } + debug_assert_eq!(index_and_dfs.len(), row_counter.counts.len()); + let mut offset = 0; + let mut df = accumulate_dataframes_vertical( + index_and_dfs + .into_iter() + .zip(row_counter.counts()) + .filter_map(|((_, mut df), count)| { + let count = count?; - // Accumulate dataframes while adjusting for the possibility of having - // read more rows than the limit. - let mut df = - accumulate_dataframes_vertical(index_and_dfs.into_iter().filter_map(|(index, df)| { - let count: IdxSize = df.height().try_into().unwrap(); - if count == 0 { - return None; + let remaining = row_limit.checked_sub(offset)?; + + // If necessary, correct having read too much from a single file. + if remaining < count { + df = df.slice(0, remaining.try_into().unwrap()); } - let remaining = row_limit.checked_sub(offsets[index])?; - Some(if remaining < count { - df.slice(0, remaining.try_into().unwrap()) - } else { - df - }) - }))?; - - let predicate = self.predicate.clone().map(phys_expr_to_io_expr); - apply_predicate(&mut df, predicate.as_deref(), true)?; - - Ok(df) - } + + // If necessary, correct row indices now that we know the offset. + if let Some(row_index) = row_index { + df.apply(&row_index.name, |series| { + series.idx().expect("index column should be of index type") + offset + }) + .expect("index column should exist"); + } + + offset += count; + + Some(df) + }), + )?; + + let predicate = predicate.cloned().map(phys_expr_to_io_expr); + apply_predicate(&mut df, predicate.as_deref(), true)?; + + Ok(df) } impl Executor for IpcExec { @@ -372,13 +358,9 @@ impl ConsecutiveCountState { } } - fn into_counts(mut self) -> Box<[IdxSize]> { - // Set the counts to which no value has been written to zero. 
- for i in self.next_index..self.counts.len() { - if self.counts[i] == IdxSize::MAX { - self.counts[i] = 0; - } - } + fn counts(&self) -> impl Iterator> + '_ { self.counts + .iter() + .map(|&count| (count != IdxSize::MAX).then_some(count)) } } From 3dc79c4074f6187c913b0768a1fba6d20a2bbc65 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 20 Mar 2024 15:13:28 +0100 Subject: [PATCH 12/17] Use config::get_file_prefetch_size() --- crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index 51c5991aaf65..e822080c5661 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -212,7 +212,7 @@ impl IpcExec { } }) .assert_send() - .buffer_unordered(100) + .buffer_unordered(config::get_file_prefetch_size()) .try_collect::>() .await?; From 0ed33849bb8f5e64ca4ee00d6909d233e2ad726e Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 20 Mar 2024 15:13:52 +0100 Subject: [PATCH 13/17] Remove obsolete code --- crates/polars-lazy/src/scan/ipc.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs index c3fec4d85689..6d378708c8f8 100644 --- a/crates/polars-lazy/src/scan/ipc.rs +++ b/crates/polars-lazy/src/scan/ipc.rs @@ -86,11 +86,6 @@ impl LazyFileListReader for LazyIpcReader { .into(); lf.opt_state.file_caching = true; - // // it is a bit hacky, but this `with_row_index` function updates the schema - // if let Some(row_index) = args.row_index { - // lf = lf.with_row_index(&row_index.name, Some(row_index.offset)) - // } - Ok(lf) } From a31460084a4524fb1df4ddbd71b479cf29f2e0da Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 20 Mar 2024 15:14:45 +0100 Subject: [PATCH 14/17] Use eprintln instead of println for verbose debugging output --- crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index e822080c5661..152142c36de9 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -54,7 +54,7 @@ impl IpcExec { fn read_sync(&mut self) -> PolarsResult { if config::verbose() { - println!("executing ipc read sync with row_index = {:?}, n_rows = {:?}, predicate = {:?} for paths {:?}", + eprintln!("executing ipc read sync with row_index = {:?}, n_rows = {:?}, predicate = {:?} for paths {:?}", self.file_options.row_index.as_ref(), self.file_options.n_rows.as_ref(), self.predicate.is_some(), From f5271af36215e527c5d626c608658e4f853c86a4 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 20 Mar 2024 15:22:26 +0100 Subject: [PATCH 15/17] Use Ordering::Relaxed for atomic counter --- crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index 152142c36de9..d314a1577e44 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -86,7 +86,7 @@ impl IpcExec { let index_and_dfs = (0..self.paths.len()) 
.into_par_iter() .map(|_| -> PolarsResult<(usize, DataFrame)> { - let index = path_index.fetch_add(1, Ordering::SeqCst); + let index = path_index.fetch_add(1, Ordering::Relaxed); let path = &self.paths[index]; let already_read_in_sequence = row_counter.read().unwrap().sum(); From 2532241bc693b0fd93265e743cbf3970ab219e8e Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 20 Mar 2024 15:23:02 +0100 Subject: [PATCH 16/17] Remove comment --- .../polars-lazy/src/physical_plan/executors/scan/ipc.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index d314a1577e44..ad93d9f3f2e9 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -111,14 +111,6 @@ impl IpcExec { .with_projection(projection.clone()) .memory_mapped(self.options.memmap) .finish()?; - // TODO: We can not supply a filter until the readers return - // how many total rows have been read before applying the - // filter. Without that, we can not correctly compute the - // pre-filter row count. - // .finish_with_scan_ops( - // predicate.clone(), - // verbose, - // )?; row_counter .write() From a0194c92aacc9f1191e77aeec0cd51d992b65fd4 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 20 Mar 2024 15:25:01 +0100 Subject: [PATCH 17/17] Refactor debug assertion --- .../src/physical_plan/executors/scan/ipc.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index ad93d9f3f2e9..f3759a54c727 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -227,13 +227,10 @@ fn finish_index_and_dfs( ) -> PolarsResult { index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); - #[cfg(debug_assertions)] - { - assert!( - index_and_dfs.iter().enumerate().all(|(a, &(b, _))| a == b), - "expected dataframe indices in order from 0 to len" - ); - } + debug_assert!( + index_and_dfs.iter().enumerate().all(|(a, &(b, _))| a == b), + "expected dataframe indices in order from 0 to len" + ); debug_assert_eq!(index_and_dfs.len(), row_counter.counts.len()); let mut offset = 0;