From 75eb0a76c8af679ef7fb457a9de53734a1297f5a Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 13 Mar 2024 16:28:16 +0100 Subject: [PATCH] Support multiple paths in IpcExec --- .../src/physical_plan/executors/scan/ipc.rs | 163 +++++++++++++++--- .../src/physical_plan/planner/lp.rs | 24 ++- 2 files changed, 147 insertions(+), 40 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index 123eeb9461e16..886830a8079de 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -1,14 +1,18 @@ use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::RwLock; use polars_core::config::env_force_async; +use polars_core::utils::accumulate_dataframes_vertical; #[cfg(feature = "cloud")] use polars_io::cloud::CloudOptions; use polars_io::is_cloud_url; +use rayon::prelude::*; use super::*; pub struct IpcExec { - pub(crate) path: PathBuf, + pub(crate) paths: Arc<[PathBuf]>, pub(crate) schema: SchemaRef, pub(crate) predicate: Option>, pub(crate) options: IpcScanOptions, @@ -20,7 +24,7 @@ pub struct IpcExec { impl IpcExec { fn read(&mut self, verbose: bool) -> PolarsResult { - let is_cloud = is_cloud_url(&self.path); + let is_cloud = self.paths.iter().any(is_cloud_url); let force_async = env_force_async(); let mut out = if is_cloud || force_async { @@ -50,7 +54,6 @@ impl IpcExec { } fn read_sync(&mut self, verbose: bool) -> PolarsResult { - let file = std::fs::File::open(&self.path)?; let (projection, predicate) = prepare_scan_args( self.predicate.clone(), &mut self.file_options.with_columns, @@ -58,40 +61,104 @@ impl IpcExec { self.file_options.row_index.is_some(), None, ); - IpcReader::new(file) - .with_n_rows(self.file_options.n_rows) - .with_row_index(std::mem::take(&mut self.file_options.row_index)) - .set_rechunk(self.file_options.rechunk) - .with_projection(projection) - .memory_mapped(self.options.memmap) - .finish_with_scan_ops(predicate, verbose) + + let row_limit = self.file_options.n_rows.unwrap_or(usize::MAX); + + // Used to determine the next file to open. This guarantees the order. + let path_index = AtomicUsize::new(0); + let row_counter = RwLock::new(ConsecutiveCountState::new(self.paths.len())); + + let mut index_and_dfs = (0..self.paths.len()) + .into_par_iter() + .map(|_| -> PolarsResult<(usize, DataFrame)> { + let index = path_index.fetch_add(1, Ordering::SeqCst); + let path = &self.paths[index]; + + if row_counter.read().unwrap().sum() >= row_limit { + return Ok(Default::default()); + } + + let file = std::fs::File::open(path)?; + + let df = IpcReader::new(file) + // .with_n_rows(self.file_options.n_rows) + .with_row_index(self.file_options.row_index.clone()) + // .set_rechunk(self.file_options.rechunk) + .with_projection(projection.clone()) + .memory_mapped(self.options.memmap) + .finish_with_scan_ops(predicate.clone(), verbose)?; + + row_counter.write().unwrap().write(index, df.height()); + + Ok((index, df)) + }) + .collect::>>()?; + + index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); + + let df = accumulate_dataframes_vertical(index_and_dfs.into_iter().map(|(_, df)| df))?; + + Ok(df) } #[cfg(feature = "cloud")] async fn read_async(&mut self, verbose: bool) -> PolarsResult { let predicate = self.predicate.clone().map(phys_expr_to_io_expr); - let reader = - IpcReaderAsync::from_uri(self.path.to_str().unwrap(), self.cloud_options.as_ref()) - .await?; - reader - .data( - self.metadata.as_ref(), - IpcReadOptions::default() - .with_row_limit(self.file_options.n_rows) - .with_row_index(self.file_options.row_index.clone()) - .with_projection(self.file_options.with_columns.as_deref().cloned()) - .with_predicate(predicate), - verbose, - ) - .await + let mut dfs = vec![]; + + for path in self.paths.iter() { + let reader = + IpcReaderAsync::from_uri(path.to_str().unwrap(), self.cloud_options.as_ref()) + .await?; + dfs.push( + reader + .data( + self.metadata.as_ref(), + IpcReadOptions::default() + .with_row_limit(self.file_options.n_rows) + .with_row_index(self.file_options.row_index.clone()) + .with_projection(self.file_options.with_columns.as_deref().cloned()) + .with_predicate(predicate.clone()), + verbose, + ) + .await?, + ); + } + + accumulate_dataframes_vertical(dfs) + + // TODO: WIP + // let paths = self.paths.clone(); + // let cloud_options = self.cloud_options.clone(); + // let metadata + // use futures::{stream::{self, StreamExt}, TryStreamExt}; + // let dfs = stream::iter(&*paths).map( + // move |path| async move { + // let reader = + // IpcReaderAsync::from_uri(path.to_str().unwrap(), cloud_options.as_ref()) + // .await?; + // reader + // .data( + // self.metadata.as_ref(), + // IpcReadOptions::default() + // .with_row_limit(self.file_options.n_rows) + // .with_row_index(self.file_options.row_index.clone()) + // .with_projection(self.file_options.with_columns.as_deref().cloned()) + // .with_predicate(predicate.clone()), + // verbose, + // ) + // .await + // } + // ).buffer_unordered(100).try_collect::>().await?; + // accumulate_dataframes_vertical(dfs) } } impl Executor for IpcExec { fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult { let finger_print = FileFingerPrint { - paths: Arc::new([self.path.clone()]), + paths: Arc::clone(&self.paths), #[allow(clippy::useless_asref)] predicate: self .predicate @@ -101,7 +168,7 @@ impl Executor for IpcExec { }; let profile_name = if state.has_node_timer() { - let mut ids = vec![self.path.to_string_lossy().into()]; + let mut ids = vec![self.paths[0].to_string_lossy().into()]; if self.predicate.is_some() { ids.push("predicate".into()) } @@ -123,3 +190,47 @@ impl Executor for IpcExec { ) } } + +// Tracks the sum of consecutive values in a dynamically sized array where the values can be written +// in any order. +struct ConsecutiveCountState { + counts: Box<[usize]>, + next_index: usize, + sum: usize, +} + +impl ConsecutiveCountState { + fn new(len: usize) -> Self { + Self { + counts: vec![usize::MAX; len].into_boxed_slice(), + next_index: 0, + sum: 0, + } + } + + /// Sum of all consecutive counts. + fn sum(&self) -> usize { + self.sum + } + + /// Write count at index. + fn write(&mut self, index: usize, count: usize) { + debug_assert!( + self.counts[index] == usize::MAX, + "second write to same index" + ); + debug_assert!(count != usize::MAX, "count can not be usize::MAX"); + + self.counts[index] = count; + + // Update sum and next index. + while self.next_index < self.counts.len() { + let count = self.counts[self.next_index]; + if count == usize::MAX { + break; + } + self.sum += count; + self.next_index += 1; + } + } +} diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index dc6cbc81b255e..15db692585d4b 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -237,20 +237,16 @@ pub fn create_physical_plan( #[cfg(feature = "cloud")] cloud_options, metadata, - } => { - assert_eq!(paths.len(), 1); - let path = paths[0].clone(); - Ok(Box::new(executors::IpcExec { - path, - schema: file_info.schema, - predicate, - options, - file_options, - #[cfg(feature = "cloud")] - cloud_options, - metadata, - })) - }, + } => Ok(Box::new(executors::IpcExec { + paths, + schema: file_info.schema, + predicate, + options, + file_options, + #[cfg(feature = "cloud")] + cloud_options, + metadata, + })), #[cfg(feature = "parquet")] FileScan::Parquet { options,