diff --git a/polars/polars-core/src/chunked_array/logical/categorical/ops/unique.rs b/polars/polars-core/src/chunked_array/logical/categorical/ops/unique.rs index 7730e6954476..f80a40f6e6df 100644 --- a/polars/polars-core/src/chunked_array/logical/categorical/ops/unique.rs +++ b/polars/polars-core/src/chunked_array/logical/categorical/ops/unique.rs @@ -1,5 +1,5 @@ use super::*; -use crate::prelude::groupby::IntoGroupsProxy; +use crate::frame::groupby::IntoGroupsProxy; impl CategoricalChunked { pub fn unique(&self) -> PolarsResult { diff --git a/polars/polars-core/src/frame/asof_join/mod.rs b/polars/polars-core/src/frame/asof_join/mod.rs index 9add902266f8..6c6deab35f8f 100644 --- a/polars/polars-core/src/frame/asof_join/mod.rs +++ b/polars/polars-core/src/frame/asof_join/mod.rs @@ -186,7 +186,7 @@ impl DataFrame { ) }; - self.finish_join(left, right_df, suffix) + self.finish_join(left, right_df, suffix.as_deref()) } /// This is similar to a left-join except that we match on nearest key rather than equal keys. diff --git a/polars/polars-core/src/frame/cross_join.rs b/polars/polars-core/src/frame/cross_join.rs index 3059a3435182..f8a4cf691b72 100644 --- a/polars/polars-core/src/frame/cross_join.rs +++ b/polars/polars-core/src/frame/cross_join.rs @@ -40,13 +40,12 @@ fn take_right(total_rows: IdxSize, n_rows_right: IdxSize, slice: Option<(i64, us } impl DataFrame { - /// Creates the cartesian product from both frames, preserves the order of the left keys. - pub(crate) fn cross_join( + fn cross_join_dfs( &self, other: &DataFrame, - suffix: Option, slice: Option<(i64, usize)>, - ) -> PolarsResult { + parallel: bool, + ) -> PolarsResult<(DataFrame, DataFrame)> { let n_rows_left = self.height() as IdxSize; let n_rows_right = other.height() as IdxSize; let total_rows = n_rows_right * n_rows_left; @@ -78,8 +77,43 @@ impl DataFrame { concat_df_unchecked(iter) } }; + let (l_df, r_df) = if parallel { + POOL.install(|| rayon::join(create_left_df, create_right_df)) + } else { + (create_left_df(), create_right_df()) + }; + Ok((l_df, r_df)) + } + + #[doc(hidden)] + /// used by streaming + pub fn _cross_join_with_names( + &self, + other: &DataFrame, + names: &[String], + ) -> PolarsResult { + let (mut l_df, r_df) = self.cross_join_dfs(other, None, false)?; + l_df.get_columns_mut().extend_from_slice(&r_df.columns); - let (l_df, r_df) = POOL.install(|| rayon::join(create_left_df, create_right_df)); + l_df.get_columns_mut() + .iter_mut() + .zip(names) + .for_each(|(s, name)| { + if s.name() != name { + s.rename(name); + } + }); + Ok(l_df) + } + + /// Creates the cartesian product from both frames, preserves the order of the left keys. 
+ pub fn cross_join( + &self, + other: &DataFrame, + suffix: Option<&str>, + slice: Option<(i64, usize)>, + ) -> PolarsResult { + let (l_df, r_df) = self.cross_join_dfs(other, slice, true)?; self.finish_join(l_df, r_df, suffix) } diff --git a/polars/polars-core/src/frame/hash_join/mod.rs b/polars/polars-core/src/frame/hash_join/mod.rs index 4425a02c3200..57c88f2df83b 100644 --- a/polars/polars-core/src/frame/hash_join/mod.rs +++ b/polars/polars-core/src/frame/hash_join/mod.rs @@ -274,7 +274,7 @@ impl DataFrame { &self, mut df_left: DataFrame, mut df_right: DataFrame, - suffix: Option, + suffix: Option<&str>, ) -> PolarsResult { let mut left_names = PlHashSet::with_capacity(df_left.width()); @@ -289,7 +289,7 @@ impl DataFrame { rename_strs.push(series.name().to_owned()) } }); - let suffix = suffix.as_deref().unwrap_or("_right"); + let suffix = suffix.unwrap_or("_right"); for name in rename_strs { df_right.rename(&name, &format!("{}{}", name, suffix))?; @@ -354,7 +354,7 @@ impl DataFrame { ) -> PolarsResult { #[cfg(feature = "cross_join")] if let JoinType::Cross = how { - return self.cross_join(other, suffix, slice); + return self.cross_join(other, suffix.as_deref(), slice); } #[cfg(feature = "chunked_ids")] @@ -517,7 +517,7 @@ impl DataFrame { ._take_unchecked_slice(join_idx_right, true) }, ); - self.finish_join(df_left, df_right, suffix) + self.finish_join(df_left, df_right, suffix.as_deref()) } JoinType::Left => { let mut left = DataFrame::new_no_checks(selected_left_physical); @@ -565,7 +565,7 @@ impl DataFrame { } keys.extend_from_slice(df_left.get_columns()); let df_left = DataFrame::new_no_checks(keys); - self.finish_join(df_left, df_right, suffix) + self.finish_join(df_left, df_right, suffix.as_deref()) } #[cfg(feature = "asof_join")] JoinType::AsOf(_) => Err(PolarsError::ComputeError( @@ -638,7 +638,7 @@ impl DataFrame { { #[cfg(feature = "cross_join")] if let JoinType::Cross = how { - return self.cross_join(other, suffix, None); + return self.cross_join(other, suffix.as_deref(), None); } let selected_left = self.select_series(left_on)?; let selected_right = other.select_series(right_on)?; @@ -709,7 +709,7 @@ impl DataFrame { ._take_unchecked_slice(join_tuples_right, true) }, ); - self.finish_join(df_left, df_right, suffix) + self.finish_join(df_left, df_right, suffix.as_deref()) } /// Perform a left join on two DataFrames @@ -789,7 +789,7 @@ impl DataFrame { }; let (df_left, df_right) = POOL.join(materialize_left, materialize_right); - self.finish_join(df_left, df_right, suffix) + self.finish_join(df_left, df_right, suffix.as_deref()) } #[cfg(feature = "chunked_ids")] @@ -840,7 +840,7 @@ impl DataFrame { }; let (df_left, df_right) = POOL.join(materialize_left, materialize_right); - self.finish_join(df_left, df_right, suffix) + self.finish_join(df_left, df_right, suffix.as_deref()) } pub(crate) fn left_join_from_series( @@ -976,7 +976,7 @@ impl DataFrame { }; df_left.get_columns_mut().insert(join_column_index, s); - self.finish_join(df_left, df_right, suffix) + self.finish_join(df_left, df_right, suffix.as_deref()) } } diff --git a/polars/polars-core/src/prelude.rs b/polars/polars-core/src/prelude.rs index 2d302e2b4224..04078093e548 100644 --- a/polars/polars-core/src/prelude.rs +++ b/polars/polars-core/src/prelude.rs @@ -37,10 +37,10 @@ pub use crate::error::{PolarsError, PolarsResult}; pub use crate::frame::asof_join::*; pub use crate::frame::explode::MeltArgs; pub(crate) use crate::frame::groupby::aggregations::*; -pub use crate::frame::groupby::{GroupsIdx, GroupsProxy, 
GroupsSlice}; +pub use crate::frame::groupby::{GroupsIdx, GroupsProxy, GroupsSlice, IntoGroupsProxy}; pub use crate::frame::hash_join::JoinType; pub(crate) use crate::frame::hash_join::*; -pub use crate::frame::*; +pub use crate::frame::{DataFrame, UniqueKeepStrategy}; pub use crate::named_from::{NamedFrom, NamedFromOwned}; pub use crate::schema::*; #[cfg(feature = "checked_arithmetic")] diff --git a/polars/polars-core/src/utils/mod.rs b/polars/polars-core/src/utils/mod.rs index 953e9ba08299..c39d55f2f665 100644 --- a/polars/polars-core/src/utils/mod.rs +++ b/polars/polars-core/src/utils/mod.rs @@ -141,8 +141,8 @@ pub fn split_series(s: &Series, n: usize) -> PolarsResult> { } fn flatten_df(df: &DataFrame) -> impl Iterator + '_ { - df.iter_chunks().map(|chunk| { - DataFrame::new_no_checks( + df.iter_chunks().flat_map(|chunk| { + let df = DataFrame::new_no_checks( df.iter() .zip(chunk.into_arrays()) .map(|(s, arr)| { @@ -153,19 +153,15 @@ fn flatten_df(df: &DataFrame) -> impl Iterator + '_ { } }) .collect(), - ) + ); + if df.height() == 0 { + None + } else { + Some(df) + } }) } - -#[cfg(feature = "private")] -#[doc(hidden)] -/// Split a [`DataFrame`] into `n` parts. We take a `&mut` to be able to repartition/align chunks. -pub fn split_df(df: &mut DataFrame, n: usize) -> PolarsResult> { - if n == 0 { - return Ok(vec![df.clone()]); - } - // make sure that chunks are aligned. - df.rechunk(); +pub fn split_df_as_ref(df: &DataFrame, n: usize) -> PolarsResult> { let total_len = df.height(); let chunk_size = total_len / n; @@ -199,6 +195,18 @@ pub fn split_df(df: &mut DataFrame, n: usize) -> PolarsResult> { Ok(out) } +#[cfg(feature = "private")] +#[doc(hidden)] +/// Split a [`DataFrame`] into `n` parts. We take a `&mut` to be able to repartition/align chunks. +pub fn split_df(df: &mut DataFrame, n: usize) -> PolarsResult> { + if n == 0 { + return Ok(vec![df.clone()]); + } + // make sure that chunks are aligned. 
+ df.rechunk(); + split_df_as_ref(df, n) +} + pub fn slice_slice(vals: &[T], offset: i64, len: usize) -> &[T] { let (raw_offset, slice_len) = slice_offsets(offset, len, vals.len()); &vals[raw_offset..raw_offset + slice_len] diff --git a/polars/polars-lazy/Cargo.toml b/polars/polars-lazy/Cargo.toml index e967c3e1cccf..2781eca388a3 100644 --- a/polars/polars-lazy/Cargo.toml +++ b/polars/polars-lazy/Cargo.toml @@ -61,7 +61,7 @@ is_in = ["polars-plan/is_in"] repeat_by = ["polars-plan/repeat_by"] round_series = ["polars-plan/round_series"] is_first = ["polars-plan/is_first"] -cross_join = ["polars-plan/cross_join"] +cross_join = ["polars-plan/cross_join", "polars-pipe/cross_join"] asof_join = ["polars-plan/asof_join", "polars-time"] dot_product = ["polars-plan/dot_product"] concat_str = ["polars-plan/concat_str"] diff --git a/polars/polars-lazy/polars-pipe/Cargo.toml b/polars/polars-lazy/polars-pipe/Cargo.toml index 432beeb1a4f4..e08ea4f53860 100644 --- a/polars/polars-lazy/polars-pipe/Cargo.toml +++ b/polars/polars-lazy/polars-pipe/Cargo.toml @@ -22,3 +22,4 @@ rayon.workspace = true csv-file = ["polars-plan/csv-file", "polars-io/csv-file"] parquet = ["polars-plan/parquet", "polars-io/parquet"] nightly = ["polars-core/nightly", "polars-utils/nightly", "hashbrown/nightly"] +cross_join = ["polars-core/cross_join"] diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs new file mode 100644 index 000000000000..290010ea8585 --- /dev/null +++ b/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs @@ -0,0 +1,20 @@ +use polars_core::error::PolarsResult; + +use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext}; + +#[derive(Default)] +pub struct Dummy {} + +impl Operator for Dummy { + fn execute( + &mut self, + _context: &PExecutionContext, + _chunk: &DataChunk, + ) -> PolarsResult { + panic!("dummy should be replaced") + } + + fn split(&self, _thread_no: usize) -> Box { + Box::new(Self {}) + } +} diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs index 8905cad380a4..baf3c1cfc3b2 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs @@ -5,13 +5,14 @@ use polars_core::error::{PolarsError, PolarsResult}; use crate::expressions::PhysicalPipedExpr; use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext}; +#[derive(Clone)] pub(crate) struct FilterOperator { pub(crate) predicate: Arc, } impl Operator for FilterOperator { fn execute( - &self, + &mut self, context: &PExecutionContext, chunk: &DataChunk, ) -> PolarsResult { @@ -29,4 +30,7 @@ impl Operator for FilterOperator { Ok(OperatorResult::Finished(chunk.with_data(df))) } + fn split(&self, _thread_no: usize) -> Box { + Box::new(self.clone()) + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/mod.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/mod.rs index 99a085325d3a..7c2f5108f45c 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/operators/mod.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/operators/mod.rs @@ -1,5 +1,7 @@ +mod dummy; mod filter; mod projection; +pub(crate) use dummy::Dummy; pub(crate) use filter::*; pub(crate) use projection::*; diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs 
b/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs index 38de1b642fc8..8464f047aa73 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs @@ -6,28 +6,33 @@ use polars_core::frame::DataFrame; use crate::expressions::PhysicalPipedExpr; use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext}; +#[derive(Clone)] pub(crate) struct FastProjectionOperator { pub(crate) columns: Arc>>, } impl Operator for FastProjectionOperator { fn execute( - &self, + &mut self, _context: &PExecutionContext, chunk: &DataChunk, ) -> PolarsResult { let chunk = chunk.with_data(chunk.data.select(self.columns.as_slice())?); Ok(OperatorResult::Finished(chunk)) } + fn split(&self, _thread_no: usize) -> Box { + Box::new(self.clone()) + } } +#[derive(Clone)] pub(crate) struct ProjectionOperator { pub(crate) exprs: Vec>, } impl Operator for ProjectionOperator { fn execute( - &self, + &mut self, context: &PExecutionContext, chunk: &DataChunk, ) -> PolarsResult { @@ -40,4 +45,34 @@ impl Operator for ProjectionOperator { let chunk = chunk.with_data(DataFrame::new_no_checks(projected)); Ok(OperatorResult::Finished(chunk)) } + fn split(&self, _thread_no: usize) -> Box { + Box::new(self.clone()) + } +} + +#[derive(Clone)] +pub(crate) struct HstackOperator { + pub(crate) exprs: Vec>, +} + +impl Operator for HstackOperator { + fn execute( + &mut self, + context: &PExecutionContext, + chunk: &DataChunk, + ) -> PolarsResult { + let projected = self + .exprs + .iter() + .map(|e| e.evaluate(chunk, context.execution_state.as_ref())) + .collect::>>()?; + + let df = chunk.data.hstack(&projected)?; + + let chunk = chunk.with_data(df); + Ok(OperatorResult::Finished(chunk)) + } + fn split(&self, _thread_no: usize) -> Box { + Box::new(self.clone()) + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs index 3f7609c5ef75..dfaff4514238 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs @@ -17,7 +17,7 @@ use super::HASHMAP_INIT_SIZE; use crate::executors::sinks::groupby::aggregates::AggregateFunction; use crate::executors::sinks::groupby::utils::compute_slices; use crate::expressions::PhysicalPipedExpr; -use crate::operators::{DataChunk, PExecutionContext, Sink, SinkResult}; +use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult}; // This is the hash and the Index offset in the linear buffer type Key = (u64, IdxSize); @@ -108,6 +108,86 @@ impl GenericGroupbySink { fn number_of_keys(&self) -> usize { self.key_columns.len() } + + fn pre_finalize(&mut self) -> PolarsResult> { + let mut aggregators = std::mem::take(&mut self.aggregators); + let n_keys = self.number_of_keys(); + let slices = compute_slices(&self.pre_agg_partitions, self.slice); + + POOL.install(|| { + let dfs = + self.pre_agg_partitions + .par_iter() + .zip(aggregators.par_iter_mut()) + .zip(self.keys.par_iter()) + .zip(slices.par_iter()) + .filter_map(|(((agg_map, agg_fns), current_keys), slice)| { + let (offset, slice_len) = (*slice)?; + if agg_map.is_empty() { + return None; + } + let mut key_builders = self + .output_schema + .iter_dtypes() + .take(n_keys) + .map(|dtype| AnyValueBuffer::new(dtype, agg_map.len())) + .collect::>(); + let dtypes = agg_fns + .iter() + .take(self.number_of_aggs()) + 
.map(|func| func.dtype()) + .collect::>(); + + let mut buffers = dtypes + .iter() + .map(|dtype| AnyValueBuffer::new(dtype, slice_len)) + .collect::>(); + + agg_map.into_iter().skip(offset).take(slice_len).for_each( + |(k, &offset)| { + let keys_offset = k.1 as usize; + let keys = unsafe { + current_keys + .get_unchecked_release(keys_offset..keys_offset + n_keys) + }; + + for (key, key_builder) in keys.iter().zip(key_builders.iter_mut()) { + key_builder.add(key.as_borrowed()); + } + + for (i, buffer) in (offset as usize + ..offset as usize + self.aggregation_columns.len()) + .zip(buffers.iter_mut()) + { + unsafe { + let agg_fn = agg_fns.get_unchecked_release_mut(i); + let av = agg_fn.finalize(); + buffer.add(av); + } + } + }, + ); + + let mut cols = Vec::with_capacity(n_keys + self.number_of_aggs()); + for key_builder in key_builders { + cols.push(key_builder.into_series()); + } + cols.extend(buffers.into_iter().map(|buf| buf.into_series())); + for (s, (name, dtype)) in cols.iter_mut().zip(self.output_schema.iter()) { + if s.name() != name { + s.rename(name); + } + if s.dtype() != dtype { + *s = s.cast(dtype).unwrap() + } + } + Some(DataFrame::new_no_checks(cols)) + }) + .collect::>(); + + Ok(dfs) + }) + } } impl Sink for GenericGroupbySink { @@ -223,7 +303,7 @@ impl Sink for GenericGroupbySink { Ok(SinkResult::CanHaveMoreInput) } - fn combine(&mut self, other: Box) { + fn combine(&mut self, mut other: Box) { // don't parallel this as this is already done in parallel. let other = other.as_any().downcast_ref::().unwrap(); @@ -326,86 +406,13 @@ impl Sink for GenericGroupbySink { Box::new(new) } - fn finalize(&mut self) -> PolarsResult { - let mut aggregators = std::mem::take(&mut self.aggregators); - let n_keys = self.number_of_keys(); - let slices = compute_slices(&self.pre_agg_partitions, self.slice); - - POOL.install(|| { - let dfs = - self.pre_agg_partitions - .par_iter() - .zip(aggregators.par_iter_mut()) - .zip(self.keys.par_iter()) - .zip(slices.par_iter()) - .filter_map(|(((agg_map, agg_fns), current_keys), slice)| { - let (offset, slice_len) = (*slice)?; - if agg_map.is_empty() { - return None; - } - let mut key_builders = self - .output_schema - .iter_dtypes() - .take(n_keys) - .map(|dtype| AnyValueBuffer::new(dtype, agg_map.len())) - .collect::>(); - let dtypes = agg_fns - .iter() - .take(self.number_of_aggs()) - .map(|func| func.dtype()) - .collect::>(); - - let mut buffers = dtypes - .iter() - .map(|dtype| AnyValueBuffer::new(dtype, slice_len)) - .collect::>(); - - agg_map.into_iter().skip(offset).take(slice_len).for_each( - |(k, &offset)| { - let keys_offset = k.1 as usize; - let keys = unsafe { - current_keys - .get_unchecked_release(keys_offset..keys_offset + n_keys) - }; - - for (key, key_builder) in keys.iter().zip(key_builders.iter_mut()) { - key_builder.add(key.as_borrowed()); - } - - for (i, buffer) in (offset as usize - ..offset as usize + self.aggregation_columns.len()) - .zip(buffers.iter_mut()) - { - unsafe { - let agg_fn = agg_fns.get_unchecked_release_mut(i); - let av = agg_fn.finalize(); - buffer.add(av); - } - } - }, - ); - - let mut cols = Vec::with_capacity(n_keys + self.number_of_aggs()); - for key_builder in key_builders { - cols.push(key_builder.into_series()); - } - cols.extend(buffers.into_iter().map(|buf| buf.into_series())); - for (s, (name, dtype)) in cols.iter_mut().zip(self.output_schema.iter()) { - if s.name() != name { - s.rename(name); - } - if s.dtype() != dtype { - *s = s.cast(dtype).unwrap() - } - } - Some(DataFrame::new_no_checks(cols)) - }) - 
.collect::>(); - Ok(accumulate_dataframes_vertical_unchecked(dfs)) - }) + fn finalize(&mut self) -> PolarsResult { + let dfs = self.pre_finalize()?; + let mut df = accumulate_dataframes_vertical_unchecked(dfs); + DataFrame::new(std::mem::take(df.get_columns_mut())).map(FinalizedSink::Finished) } - fn as_any(&self) -> &dyn Any { + fn as_any(&mut self) -> &mut dyn Any { self } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs index bc43a9fe22b6..f80fdba7edf2 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs @@ -19,7 +19,7 @@ use super::HASHMAP_INIT_SIZE; use crate::executors::sinks::groupby::aggregates::AggregateFunction; use crate::executors::sinks::groupby::utils::compute_slices; use crate::expressions::PhysicalPipedExpr; -use crate::operators::{DataChunk, PExecutionContext, Sink, SinkResult}; +use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult}; // hash + value #[derive(Eq, Copy, Clone)] @@ -66,7 +66,10 @@ pub struct PrimitiveGroupbySink { slice: Option<(i64, usize)>, } -impl PrimitiveGroupbySink { +impl PrimitiveGroupbySink +where + ChunkedArray: IntoSeries, +{ pub fn new( key: Arc, aggregation_columns: Arc>>, @@ -109,6 +112,72 @@ impl PrimitiveGroupbySink { fn number_of_aggs(&self) -> usize { self.aggregation_columns.len() } + + fn pre_finalize(&mut self) -> PolarsResult> { + // TODO! parallel + let mut aggregators = std::mem::take(&mut self.aggregators); + let slices = compute_slices(&self.pre_agg_partitions, self.slice); + + POOL.install(|| { + let dfs = + self.pre_agg_partitions + .par_iter() + .zip(aggregators.par_iter_mut()) + .zip(slices.par_iter()) + .filter_map(|((agg_map, agg_fns), slice)| { + let (offset, slice_len) = (*slice)?; + if agg_map.is_empty() { + return None; + } + let mut key_builder = PrimitiveChunkedBuilder::::new( + self.output_schema.get_index(0).unwrap().0, + agg_map.len(), + ); + let dtypes = agg_fns + .iter() + .take(self.number_of_aggs()) + .map(|func| func.dtype()) + .collect::>(); + + let mut buffers = dtypes + .iter() + .map(|dtype| AnyValueBuffer::new(dtype, slice_len)) + .collect::>(); + + agg_map.into_iter().skip(offset).take(slice_len).for_each( + |(k, &offset)| { + key_builder.append_option(k.value); + + for (i, buffer) in (offset as usize + ..offset as usize + self.aggregation_columns.len()) + .zip(buffers.iter_mut()) + { + unsafe { + let agg_fn = agg_fns.get_unchecked_release_mut(i); + let av = agg_fn.finalize(); + buffer.add(av); + } + } + }, + ); + + let mut cols = Vec::with_capacity(1 + self.number_of_aggs()); + cols.push(key_builder.finish().into_series()); + cols.extend(buffers.into_iter().map(|buf| buf.into_series())); + for (s, (name, dtype)) in cols.iter_mut().zip(self.output_schema.iter()) { + if s.name() != name { + s.rename(name); + } + if s.dtype() != dtype { + *s = s.cast(dtype).unwrap() + } + } + Some(DataFrame::new_no_checks(cols)) + }) + .collect::>(); + Ok(dfs) + }) + } } impl Sink for PrimitiveGroupbySink @@ -182,7 +251,7 @@ where Ok(SinkResult::CanHaveMoreInput) } - fn combine(&mut self, other: Box) { + fn combine(&mut self, mut other: Box) { // don't parallel this as this is already done in parallel. 
let other = other.as_any().downcast_ref::().unwrap(); @@ -222,6 +291,12 @@ where ); } + fn finalize(&mut self) -> PolarsResult { + let dfs = self.pre_finalize()?; + let mut df = accumulate_dataframes_vertical_unchecked(dfs); + DataFrame::new(std::mem::take(df.get_columns_mut())).map(FinalizedSink::Finished) + } + fn split(&self, thread_no: usize) -> Box { let mut new = Self::new( self.key.clone(), @@ -235,74 +310,7 @@ where Box::new(new) } - fn finalize(&mut self) -> PolarsResult { - // TODO! parallel - let mut aggregators = std::mem::take(&mut self.aggregators); - let slices = compute_slices(&self.pre_agg_partitions, self.slice); - - POOL.install(|| { - let dfs = - self.pre_agg_partitions - .par_iter() - .zip(aggregators.par_iter_mut()) - .zip(slices.par_iter()) - .filter_map(|((agg_map, agg_fns), slice)| { - let (offset, slice_len) = (*slice)?; - if agg_map.is_empty() { - return None; - } - let mut key_builder = PrimitiveChunkedBuilder::::new( - self.output_schema.get_index(0).unwrap().0, - agg_map.len(), - ); - let dtypes = agg_fns - .iter() - .take(self.number_of_aggs()) - .map(|func| func.dtype()) - .collect::>(); - - let mut buffers = dtypes - .iter() - .map(|dtype| AnyValueBuffer::new(dtype, slice_len)) - .collect::>(); - - agg_map.into_iter().skip(offset).take(slice_len).for_each( - |(k, &offset)| { - key_builder.append_option(k.value); - - for (i, buffer) in (offset as usize - ..offset as usize + self.aggregation_columns.len()) - .zip(buffers.iter_mut()) - { - unsafe { - let agg_fn = agg_fns.get_unchecked_release_mut(i); - let av = agg_fn.finalize(); - buffer.add(av); - } - } - }, - ); - - let mut cols = Vec::with_capacity(1 + self.number_of_aggs()); - cols.push(key_builder.finish().into_series()); - cols.extend(buffers.into_iter().map(|buf| buf.into_series())); - for (s, (name, dtype)) in cols.iter_mut().zip(self.output_schema.iter()) { - if s.name() != name { - s.rename(name); - } - if s.dtype() != dtype { - *s = s.cast(dtype).unwrap() - } - } - Some(DataFrame::new_no_checks(cols)) - }) - .collect::>(); - let mut df = accumulate_dataframes_vertical_unchecked(dfs); - DataFrame::new(std::mem::take(df.get_columns_mut())) - }) - } - - fn as_any(&self) -> &dyn Any { + fn as_any(&mut self) -> &mut dyn Any { self } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs new file mode 100644 index 000000000000..44679cc4804d --- /dev/null +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs @@ -0,0 +1,158 @@ +use std::any::Any; +use std::borrow::Cow; +use std::iter::StepBy; +use std::ops::Range; +use std::sync::{Arc, Mutex}; +use std::vec; + +use polars_core::error::PolarsResult; +use polars_core::frame::DataFrame; + +use crate::operators::{ + chunks_to_df_unchecked, DataChunk, FinalizedSink, Operator, OperatorResult, PExecutionContext, + Sink, SinkResult, +}; + +#[derive(Default)] +pub struct CrossJoin { + chunks: Vec, + suffix: Cow<'static, str>, + shared: Arc>, +} + +impl CrossJoin { + pub(crate) fn new(suffix: Cow<'static, str>) -> Self { + CrossJoin { + chunks: vec![], + suffix, + shared: Default::default(), + } + } +} + +impl Sink for CrossJoin { + fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult { + self.chunks.push(chunk); + Ok(SinkResult::CanHaveMoreInput) + } + + fn combine(&mut self, mut other: Box) { + let other = other.as_any().downcast_mut::().unwrap(); + let other_chunks = std::mem::take(&mut other.chunks); + 
self.chunks.extend(other_chunks.into_iter()); + } + + fn split(&self, _thread_no: usize) -> Box { + Box::new(Self { + suffix: self.suffix.clone(), + shared: self.shared.clone(), + ..Default::default() + }) + } + + fn finalize(&mut self) -> PolarsResult { + // todo! share sink + Ok(FinalizedSink::Operator(Box::new(CrossJoinPhase2 { + df: Arc::new(chunks_to_df_unchecked(std::mem::take(&mut self.chunks))), + suffix: Arc::from(self.suffix.as_ref()), + in_process_left: None, + in_process_right: None, + in_process_left_df: Default::default(), + output_names: None, + }))) + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } +} + +#[derive(Clone)] +pub struct CrossJoinPhase2 { + df: Arc, + suffix: Arc, + in_process_left: Option>>, + in_process_right: Option>>, + in_process_left_df: DataFrame, + output_names: Option>, +} + +impl Operator for CrossJoinPhase2 { + fn execute( + &mut self, + _context: &PExecutionContext, + chunk: &DataChunk, + ) -> PolarsResult { + // expected output size = size**2 + // so this is a small number + let size = 250; + + if self.in_process_left.is_none() { + let mut iter_left = (0..self.df.height()).step_by(size); + let offset = iter_left.next().unwrap(); + self.in_process_left_df = self.df.slice(offset as i64, size); + self.in_process_left = Some(iter_left); + } + if self.in_process_right.is_none() { + self.in_process_right = Some((0..chunk.data.height()).step_by(size)); + } + // output size is large we process in chunks + let iter_left = self.in_process_left.as_mut().unwrap(); + let iter_right = self.in_process_right.as_mut().unwrap(); + + match iter_right.next() { + None => { + self.in_process_right = None; + + // if right is depleted take the next left chunk + match iter_left.next() { + None => { + self.in_process_left = None; + Ok(OperatorResult::NeedsNewData) + } + Some(offset) => { + self.in_process_left_df = self.df.slice(offset as i64, size); + self.in_process_right = Some((0..chunk.data.height()).step_by(size)); + let iter_right = self.in_process_right.as_mut().unwrap(); + let offset = iter_right.next().unwrap(); + let right_df = chunk.data.slice(offset as i64, size); + let df = self.in_process_left_df.cross_join( + &right_df, + Some(self.suffix.as_ref()), + None, + )?; + Ok(OperatorResult::HaveMoreOutPut(chunk.with_data(df))) + } + } + } + // deplete the right chunks over the current left chunk + Some(offset) => { + // this will be the branch of the first call + + let right_df = chunk.data.slice(offset as i64, size); + + // we use the first join to determine the output names + // this we can amortize the name allocations. 
+ let df = match &self.output_names { + None => { + let df = self.in_process_left_df.cross_join( + &right_df, + Some(self.suffix.as_ref()), + None, + )?; + self.output_names = Some(df.get_column_names_owned()); + df + } + Some(names) => self + .in_process_left_df + ._cross_join_with_names(&right_df, names)?, + }; + + Ok(OperatorResult::HaveMoreOutPut(chunk.with_data(df))) + } + } + } + fn split(&self, _thread_no: usize) -> Box { + Box::new(self.clone()) + } +} diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/mod.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/mod.rs new file mode 100644 index 000000000000..736fa70efd11 --- /dev/null +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/mod.rs @@ -0,0 +1,5 @@ +#[cfg(feature = "cross_join")] +mod cross; + +#[cfg(feature = "cross_join")] +pub(crate) use cross::*; diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/mod.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/mod.rs index b38d956fecc3..af3d325c586f 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/mod.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/mod.rs @@ -1,4 +1,7 @@ pub(crate) mod groupby; +mod joins; mod ordered; +#[cfg(feature = "cross_join")] +pub(crate) use joins::*; pub(crate) use ordered::*; diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs index 183242cb1d80..ec949ae72f26 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs @@ -1,10 +1,10 @@ use std::any::Any; use polars_core::error::PolarsResult; -use polars_core::frame::DataFrame; -use polars_core::utils::accumulate_dataframes_vertical_unchecked; -use crate::operators::{DataChunk, PExecutionContext, Sink, SinkResult}; +use crate::operators::{ + chunks_to_df_unchecked, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult, +}; // Ensure the data is return in the order it was streamed #[derive(Clone)] @@ -28,7 +28,7 @@ impl Sink for OrderedSink { Ok(SinkResult::CanHaveMoreInput) } - fn combine(&mut self, other: Box) { + fn combine(&mut self, mut other: Box) { let other = other.as_any().downcast_ref::().unwrap(); self.chunks.extend_from_slice(&other.chunks); self.sort(); @@ -37,15 +37,12 @@ impl Sink for OrderedSink { fn split(&self, _thread_no: usize) -> Box { Box::new(self.clone()) } - fn finalize(&mut self) -> PolarsResult { + fn finalize(&mut self) -> PolarsResult { self.sort(); - Ok(accumulate_dataframes_vertical_unchecked( - std::mem::take(&mut self.chunks) - .into_iter() - .map(|chunk| chunk.data), - )) + let chunks = std::mem::take(&mut self.chunks); + Ok(FinalizedSink::Finished(chunks_to_df_unchecked(chunks))) } - fn as_any(&self) -> &dyn Any { + fn as_any(&mut self) -> &mut dyn Any { self } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs index 3948250ffafe..25c591c21532 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs @@ -8,6 +8,7 @@ use polars_plan::global::_set_n_rows_for_scan; use polars_plan::prelude::CsvParserOptions; use super::*; +use crate::CHUNK_SIZE; pub(crate) struct CsvSource { #[allow(dead_code)] @@ -58,7 +59,7 @@ impl CsvSource { .with_end_of_line_char(options.eol_char) .with_encoding(options.encoding) .with_rechunk(options.rechunk) - 
.with_chunk_size(50_000) + .with_chunk_size(CHUNK_SIZE) .with_row_count(options.row_count) .with_parse_dates(options.parse_dates); diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs new file mode 100644 index 000000000000..6628922a9d76 --- /dev/null +++ b/polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs @@ -0,0 +1,42 @@ +use std::iter::Enumerate; +use std::vec::IntoIter; + +use polars_core::error::PolarsResult; +use polars_core::frame::DataFrame; +use polars_core::utils::split_df; +use polars_core::POOL; +use polars_utils::IdxSize; + +use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult}; + +pub struct DataFrameSource { + dfs: Enumerate>, + n_threads: usize, +} + +impl DataFrameSource { + pub(crate) fn from_df(mut df: DataFrame) -> Self { + let n_threads = POOL.current_num_threads(); + let dfs = split_df(&mut df, n_threads).unwrap(); + let dfs = dfs.into_iter().enumerate(); + Self { dfs, n_threads } + } +} + +impl Source for DataFrameSource { + fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult { + let chunks = (&mut self.dfs) + .map(|(chunk_index, data)| DataChunk { + chunk_index: chunk_index as IdxSize, + data, + }) + .take(self.n_threads) + .collect::>(); + + if chunks.is_empty() { + Ok(SourceResult::Finished) + } else { + Ok(SourceResult::GotMoreData(chunks)) + } + } +} diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/mod.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/mod.rs index f4e7a7eee6f4..73b96f94da72 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sources/mod.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sources/mod.rs @@ -1,11 +1,13 @@ #[cfg(feature = "csv-file")] mod csv; +mod frame; #[cfg(feature = "parquet")] mod parquet; mod union; #[cfg(feature = "csv-file")] pub(crate) use csv::CsvSource; +pub(crate) use frame::*; #[cfg(feature = "parquet")] pub(crate) use parquet::*; pub(crate) use union::*; diff --git a/polars/polars-lazy/polars-pipe/src/lib.rs b/polars/polars-lazy/polars-pipe/src/lib.rs index 31a5d4e75a0a..c46ee7227064 100644 --- a/polars/polars-lazy/polars-pipe/src/lib.rs +++ b/polars/polars-lazy/polars-pipe/src/lib.rs @@ -2,3 +2,7 @@ mod executors; pub mod expressions; pub mod operators; pub mod pipeline; + +// ideal chunk size we strive to +#[cfg(any(feature = "cross_join", feature = "csv-file"))] +pub(crate) const CHUNK_SIZE: usize = 50_000; diff --git a/polars/polars-lazy/polars-pipe/src/operators/chunks.rs b/polars/polars-lazy/polars-pipe/src/operators/chunks.rs index 40883be0ded6..d398440d3160 100644 --- a/polars/polars-lazy/polars-pipe/src/operators/chunks.rs +++ b/polars/polars-lazy/polars-pipe/src/operators/chunks.rs @@ -1,3 +1,5 @@ +use polars_core::utils::accumulate_dataframes_vertical_unchecked; + use super::*; #[derive(Clone, Debug)] @@ -14,3 +16,7 @@ impl DataChunk { } } } + +pub(crate) fn chunks_to_df_unchecked(chunks: Vec) -> DataFrame { + accumulate_dataframes_vertical_unchecked(chunks.into_iter().map(|c| c.data)) +} diff --git a/polars/polars-lazy/polars-pipe/src/operators/mod.rs b/polars/polars-lazy/polars-pipe/src/operators/mod.rs index aa324e07885b..c4c99745640e 100644 --- a/polars/polars-lazy/polars-pipe/src/operators/mod.rs +++ b/polars/polars-lazy/polars-pipe/src/operators/mod.rs @@ -8,5 +8,5 @@ pub(crate) use chunks::*; pub(crate) use context::*; pub(crate) use operator::*; pub(crate) use polars_core::prelude::*; -pub(crate) use sink::*; +pub use sink::*; 
pub(crate) use source::*; diff --git a/polars/polars-lazy/polars-pipe/src/operators/operator.rs b/polars/polars-lazy/polars-pipe/src/operators/operator.rs index b462e58b32c5..d834e28d408c 100644 --- a/polars/polars-lazy/polars-pipe/src/operators/operator.rs +++ b/polars/polars-lazy/polars-pipe/src/operators/operator.rs @@ -1,15 +1,18 @@ use super::*; pub enum OperatorResult { - NeedMoreInput, + NeedsNewData, + // needs to be called again with same chunk. HaveMoreOutPut(DataChunk), Finished(DataChunk), } pub trait Operator: Send + Sync { fn execute( - &self, + &mut self, context: &PExecutionContext, chunk: &DataChunk, ) -> PolarsResult; + + fn split(&self, thread_no: usize) -> Box; } diff --git a/polars/polars-lazy/polars-pipe/src/operators/sink.rs b/polars/polars-lazy/polars-pipe/src/operators/sink.rs index e3f229d0943d..871aa0221810 100644 --- a/polars/polars-lazy/polars-pipe/src/operators/sink.rs +++ b/polars/polars-lazy/polars-pipe/src/operators/sink.rs @@ -8,6 +8,11 @@ pub enum SinkResult { CanHaveMoreInput, } +pub enum FinalizedSink { + Finished(DataFrame), + Operator(Box), +} + pub trait Sink: Send + Sync { fn sink(&mut self, context: &PExecutionContext, chunk: DataChunk) -> PolarsResult; @@ -15,7 +20,7 @@ pub trait Sink: Send + Sync { fn split(&self, thread_no: usize) -> Box; - fn finalize(&mut self) -> PolarsResult; + fn finalize(&mut self) -> PolarsResult; - fn as_any(&self) -> &dyn Any; + fn as_any(&mut self) -> &mut dyn Any; } diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs index f48173c86f38..876fc5aec9fc 100644 --- a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs +++ b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs @@ -1,15 +1,15 @@ use std::sync::Arc; -use polars_core::prelude::PolarsResult; +use polars_core::prelude::*; use polars_core::with_match_physical_integer_polars_type; use polars_plan::prelude::*; use crate::executors::sinks::groupby::aggregates::convert_to_hash_agg; -use crate::executors::sinks::{groupby, OrderedSink}; +use crate::executors::sinks::*; use crate::executors::{operators, sources}; use crate::expressions::PhysicalPipedExpr; use crate::operators::{Operator, Sink, Source}; -use crate::pipeline::Pipeline; +use crate::pipeline::PipeLine; fn exprs_to_physical( exprs: &[Node], @@ -27,7 +27,7 @@ where fn get_source( source: ALogicalPlan, - operator_objects: &mut Vec>, + operator_objects: &mut Vec>, expr_arena: &Arena, to_physical: &F, push_predicate: bool, @@ -37,6 +37,27 @@ where { use ALogicalPlan::*; match source { + DataFrameScan { + df, + projection, + selection, + .. 
+ } => { + let mut df = (*df).clone(); + if push_predicate { + if let Some(predicate) = selection { + let predicate = to_physical(predicate, expr_arena)?; + let op = operators::FilterOperator { predicate }; + let op = Box::new(op) as Box; + operator_objects.push(op) + } + // projection is free + if let Some(projection) = projection { + df = df.select(projection.as_slice())?; + } + } + Ok(Box::new(sources::DataFrameSource::from_df(df)) as Box) + } #[cfg(feature = "csv-file")] CsvScan { path, @@ -49,7 +70,7 @@ where if let (true, Some(predicate)) = (push_predicate, predicate) { let predicate = to_physical(predicate, expr_arena)?; let op = operators::FilterOperator { predicate }; - let op = Arc::new(op) as Arc; + let op = Box::new(op) as Box; operator_objects.push(op) } let src = sources::CsvSource::new(path, schema, options)?; @@ -67,7 +88,7 @@ where if let (true, Some(predicate)) = (push_predicate, predicate) { let predicate = to_physical(predicate, expr_arena)?; let op = operators::FilterOperator { predicate }; - let op = Arc::new(op) as Arc; + let op = Box::new(op) as Box; operator_objects.push(op) } let src = sources::ParquetSource::new(path, options, &schema)?; @@ -77,100 +98,184 @@ where } } -pub fn create_pipeline( - sources: &[Node], - operators: &[Node], - sink: Option, +pub fn get_sink( + node: Node, lp_arena: &mut Arena, expr_arena: &mut Arena, - to_physical: F, -) -> PolarsResult + to_physical: &F, +) -> Box where F: Fn(Node, &Arena) -> PolarsResult>, { use ALogicalPlan::*; + match lp_arena.get(node) { + #[cfg(feature = "cross_join")] + Join { options, .. } => match options.how { + JoinType::Cross => Box::new(CrossJoin::new(options.suffix.clone())) as Box, + _ => unimplemented!(), + }, + Aggregate { + input, + keys, + aggs, + schema: output_schema, + options, + .. + } => { + let key_columns = Arc::new( + keys.iter() + .map(|node| to_physical(*node, expr_arena).unwrap()) + .collect::>(), + ); - let sink = match sink { - None => Box::new(OrderedSink::new()) as Box, - Some(node) => match lp_arena.get(node) { - Aggregate { - input, - keys, - aggs, - schema: output_schema, - options, - .. 
- } => { - let key_columns = Arc::new( - keys.iter() - .map(|node| to_physical(*node, expr_arena).unwrap()) - .collect::>(), - ); - - let mut aggregation_columns = Vec::with_capacity(aggs.len()); - let mut agg_fns = Vec::with_capacity(aggs.len()); - - let input_schema = lp_arena.get(*input).schema(lp_arena); - - for node in aggs { - let (index, agg_fn) = - convert_to_hash_agg(*node, expr_arena, &input_schema, &to_physical); - aggregation_columns.push(index); - agg_fns.push(agg_fn) - } - let aggregation_columns = Arc::new(aggregation_columns); - - match ( - output_schema.get_index(0).unwrap().1.to_physical(), - keys.len(), - ) { - (dt, 1) if dt.is_integer() => { - with_match_physical_integer_polars_type!(dt, |$T| { - Box::new(groupby::PrimitiveGroupbySink::<$T>::new( - key_columns[0].clone(), - aggregation_columns, - agg_fns, - output_schema.clone(), - options.slice - )) as Box - }) - } - _ => Box::new(groupby::GenericGroupbySink::new( - key_columns, - aggregation_columns, - agg_fns, - output_schema.clone(), - options.slice, - )) as Box, - } + let mut aggregation_columns = Vec::with_capacity(aggs.len()); + let mut agg_fns = Vec::with_capacity(aggs.len()); + + let input_schema = lp_arena.get(*input).schema(lp_arena); + + for node in aggs { + let (index, agg_fn) = + convert_to_hash_agg(*node, expr_arena, &input_schema, &to_physical); + aggregation_columns.push(index); + agg_fns.push(agg_fn) } - _ => { - todo!() + let aggregation_columns = Arc::new(aggregation_columns); + + match ( + output_schema.get_index(0).unwrap().1.to_physical(), + keys.len(), + ) { + (dt, 1) if dt.is_integer() => { + with_match_physical_integer_polars_type!(dt, |$T| { + Box::new(groupby::PrimitiveGroupbySink::<$T>::new( + key_columns[0].clone(), + aggregation_columns, + agg_fns, + output_schema.clone(), + options.slice + )) as Box + }) + } + _ => Box::new(groupby::GenericGroupbySink::new( + key_columns, + aggregation_columns, + agg_fns, + output_schema.clone(), + options.slice, + )) as Box, } - }, + } + _ => { + todo!() + } + } +} + +pub fn get_dummy_operator() -> Box { + Box::new(operators::Dummy {}) +} + +pub fn get_operator( + node: Node, + lp_arena: &mut Arena, + expr_arena: &mut Arena, + to_physical: &F, +) -> PolarsResult> +where + F: Fn(Node, &Arena) -> PolarsResult>, +{ + use ALogicalPlan::*; + let op = match lp_arena.get(node) { + Projection { expr, .. } => { + let op = operators::ProjectionOperator { + exprs: exprs_to_physical(expr, expr_arena, &to_physical)?, + }; + Box::new(op) as Box + } + HStack { exprs, .. } => { + let op = operators::HstackOperator { + exprs: exprs_to_physical(exprs, expr_arena, &to_physical)?, + }; + Box::new(op) as Box + } + Selection { predicate, .. } => { + let predicate = to_physical(*predicate, expr_arena)?; + let op = operators::FilterOperator { predicate }; + Box::new(op) as Box + } + MapFunction { + function: FunctionNode::FastProjection { columns }, + .. 
+ } => { + let op = operators::FastProjectionOperator { + columns: columns.clone(), + }; + Box::new(op) as Box + } + + lp => { + panic!("operator {:?} not (yet) supported", lp) + } }; + Ok(op) +} + +pub fn create_pipeline( + sources: &[Node], + operators: Vec>, + operator_nodes: Vec, + sink_node: Option, + lp_arena: &mut Arena, + expr_arena: &mut Arena, + to_physical: F, +) -> PolarsResult +where + F: Fn(Node, &Arena) -> PolarsResult>, +{ + use ALogicalPlan::*; let mut source_objects = Vec::with_capacity(sources.len()); let mut operator_objects = Vec::with_capacity(operators.len() + 1); for node in sources { - let src = match lp_arena.take(*node) { + let src = match lp_arena.get(*node) { + lp @ DataFrameScan { .. } => get_source( + lp.clone(), + &mut operator_objects, + expr_arena, + &to_physical, + true, + )?, #[cfg(feature = "csv-file")] - lp @ CsvScan { .. } => { - get_source(lp, &mut operator_objects, expr_arena, &to_physical, true)? - } + lp @ CsvScan { .. } => get_source( + lp.clone(), + &mut operator_objects, + expr_arena, + &to_physical, + true, + )?, #[cfg(feature = "parquet")] - lp @ ParquetScan { .. } => { - get_source(lp, &mut operator_objects, expr_arena, &to_physical, true)? - } + lp @ ParquetScan { .. } => get_source( + lp.clone(), + &mut operator_objects, + expr_arena, + &to_physical, + true, + )?, Union { inputs, .. } => { let sources = inputs .iter() .enumerate() .map(|(i, node)| { - let lp = lp_arena.take(*node); + let lp = lp_arena.get(*node); // only push predicate of first source - get_source(lp, &mut operator_objects, expr_arena, &to_physical, i == 0) + get_source( + lp.clone(), + &mut operator_objects, + expr_arena, + &to_physical, + i == 0, + ) }) .collect::>>()?; Box::new(sources::UnionSource::new(sources)) as Box @@ -181,34 +286,20 @@ where }; source_objects.push(src) } + let sink = sink_node + .map(|node| get_sink(node, lp_arena, expr_arena, &to_physical)) + .unwrap_or_else(|| Box::new(OrderedSink::new())); - for node in operators.iter() { - let op = match lp_arena.take(*node) { - Projection { expr, .. } => { - let op = operators::ProjectionOperator { - exprs: exprs_to_physical(&expr, expr_arena, &to_physical)?, - }; - Arc::new(op) as Arc - } - Selection { predicate, .. } => { - let predicate = to_physical(predicate, expr_arena)?; - let op = operators::FilterOperator { predicate }; - Arc::new(op) as Arc - } - MapFunction { - function: FunctionNode::FastProjection { columns }, - .. 
- } => { - let op = operators::FastProjectionOperator { columns }; - Arc::new(op) as Arc - } - - lp => { - panic!("operator {:?} not (yet) supported", lp) - } - }; - operator_objects.push(op) - } + // this offset is because the source might have inserted operators + let operator_offset = operator_objects.len(); + operator_objects.extend(operators); - Ok(Pipeline::new(source_objects, operator_objects, sink)) + Ok(PipeLine::new( + source_objects, + operator_objects, + operator_nodes, + sink, + sink_node, + operator_offset, + )) } diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs index 6076e31691d9..47ebb72b0d46 100644 --- a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs +++ b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs @@ -1,55 +1,100 @@ use std::any::Any; -use std::sync::Arc; use polars_core::error::PolarsResult; use polars_core::frame::DataFrame; +use polars_core::utils::concat_df_unchecked; use polars_core::POOL; +use polars_utils::arena::Node; use rayon::prelude::*; use crate::operators::{ - DataChunk, Operator, OperatorResult, PExecutionContext, Sink, SinkResult, Source, SourceResult, + DataChunk, FinalizedSink, Operator, OperatorResult, PExecutionContext, Sink, SinkResult, + Source, SourceResult, }; -pub struct Pipeline { +pub struct PipeLine { sources: Vec>, - operators: Vec>, - // a sink for every thread + operators: Vec>>, + operator_nodes: Vec, sink: Vec>, + sink_node: Option, + rh_sides: Vec, + operator_offset: usize, } -impl Pipeline { +impl PipeLine { pub fn new( sources: Vec>, - operators: Vec>, + operators: Vec>, + operator_nodes: Vec, sink: Box, - ) -> Pipeline { + sink_node: Option, + operator_offset: usize, + ) -> PipeLine { + debug_assert_eq!(operators.len(), operator_nodes.len() + operator_offset); let n_threads = POOL.current_num_threads(); + // We split so that every thread get's an operator let sink = (0..n_threads).map(|i| sink.split(i)).collect(); - Pipeline { + // every index maps to a chain of operators than can be pushed as a pipeline for one thread + let operators = (0..n_threads) + .map(|i| operators.iter().map(|op| op.split(i)).collect()) + .collect(); + + PipeLine { sources, operators, + operator_nodes, sink, + sink_node, + rh_sides: vec![], + operator_offset, + } + } + + /// Add a parent + /// This should be in the right order + pub fn with_rhs(mut self, rhs: PipeLine) -> Self { + self.rh_sides.push(rhs); + self + } + + fn replace_operator(&mut self, op: &dyn Operator, node: Node) { + if let Some(pos) = self.operator_nodes.iter().position(|n| *n == node) { + let pos = pos + self.operator_offset; + for (i, operator_pipe) in &mut self.operators.iter_mut().enumerate() { + operator_pipe[pos] = op.split(i) + } } } fn par_process_chunks( - &self, + &mut self, chunks: Vec, sink: &mut [Box], ec: &PExecutionContext, ) -> PolarsResult> { debug_assert!(chunks.len() <= sink.len()); - POOL.install(|| { + let mut operators = std::mem::take(&mut self.operators); + let out = POOL.install(|| { chunks .into_par_iter() .zip(sink.par_iter_mut()) - .map(|(chunk, sink)| { - let chunk = match self.push_operators(chunk, ec)? { + .zip(operators.par_iter_mut()) + .map(|((chunk, sink), operator_pipe)| { + // operators don't like empty + if chunk.data.height() == 0 { + return Ok(SinkResult::Finished); + } + let chunk = match self.push_operators(chunk, ec, operator_pipe)? 
{ OperatorResult::Finished(chunk) => chunk, _ => todo!(), }; + // sinks don't need to store empty + if chunk.data.height() == 0 { + return Ok(SinkResult::Finished); + } sink.sink(ec, chunk) }) // only collect failed and finished messages as there should be acted upon those @@ -60,55 +105,69 @@ impl Pipeline { Err(_) => true, }) .collect() - }) + }); + self.operators = operators; + out } fn push_operators( &self, chunk: DataChunk, ec: &PExecutionContext, + operators: &mut [Box], ) -> PolarsResult { let mut in_process = vec![]; - let mut op_iter = self.operators.iter(); - - if let Some(op) = op_iter.next() { - in_process.push((op, chunk)); - - while let Some((op, chunk)) = in_process.pop() { - match op.execute(ec, &chunk)? { - OperatorResult::Finished(chunk) => { - if let Some(op) = op_iter.next() { - in_process.push((op, chunk)) - } else { - return Ok(OperatorResult::Finished(chunk)); - } + let mut out = vec![]; + + let operator_offset = 0usize; + in_process.push((operator_offset, chunk)); + + while let Some((op_i, chunk)) = in_process.pop() { + // if chunk.data.height() == 0 { + // continue; + // } + match operators.get_mut(op_i) { + None => { + if chunk.data.height() > 0 || out.is_empty() { + // final chunk of the pipeline + out.push(chunk) } - OperatorResult::HaveMoreOutPut(output_chunk) => { - if let Some(op) = op_iter.next() { - in_process.push((op, output_chunk)) + } + Some(op) => { + match op.execute(ec, &chunk)? { + OperatorResult::Finished(chunk) => in_process.push((op_i + 1, chunk)), + OperatorResult::HaveMoreOutPut(output_chunk) => { + // first on the stack the next operator call + in_process.push((op_i, chunk)); + + // but first push the output in the next operator + // is a join can produce many rows, we want the filter to + // be executed in between. + in_process.push((op_i + 1, output_chunk)); + } + OperatorResult::NeedsNewData => { + // done, take another chunk from the stack } - // this operator first at the top of the stack - in_process.push((op, chunk)) - } - OperatorResult::NeedMoreInput => { - // current chunk will be used again - in_process.push((op, chunk)) } } } - unreachable!() - } else { - Ok(OperatorResult::Finished(chunk)) } + let out = match out.len() { + 1 => OperatorResult::Finished(out.pop().unwrap()), + _ => { + let data = concat_df_unchecked(out.iter().map(|chunk| &chunk.data)); + OperatorResult::Finished(out[out.len() - 1].with_data(data)) + } + }; + Ok(out) } - pub fn execute(&mut self, state: Box) -> PolarsResult { - let ec = PExecutionContext::new(state); + pub fn run_pipeline(&mut self, ec: &PExecutionContext) -> PolarsResult { let mut sink = std::mem::take(&mut self.sink); for src in &mut std::mem::take(&mut self.sources) { - while let SourceResult::GotMoreData(chunks) = src.get_batches(&ec)? { - let results = self.par_process_chunks(chunks, &mut sink, &ec)?; + while let SourceResult::GotMoreData(chunks) = src.get_batches(ec)? 
{ + let results = self.par_process_chunks(chunks, &mut sink, ec)?; if results .iter() @@ -128,4 +187,35 @@ impl Pipeline { .unwrap(); reduced_sink.finalize() } + + pub fn execute(&mut self, state: Box) -> PolarsResult { + let ec = PExecutionContext::new(state); + let mut sink_out = self.run_pipeline(&ec)?; + let mut pipelines = self.rh_sides.iter_mut(); + let mut sink_node = self.sink_node; + + loop { + match &mut sink_out { + FinalizedSink::Finished(df) => return Ok(std::mem::take(df)), + + // + // 1/\ + // 2/\ + // 3\ + // the left hand side of the join has finished and now is an operator + // we replace the dummy node in the right hand side pipeline with this + // operator and then we run the pipeline rinse and repeat + // until the final right hand side pipeline ran + FinalizedSink::Operator(op) => { + // we unwrap, because the latest pipeline should not return an Operator + let pipeline = pipelines.next().unwrap(); + if let Some(sink_node) = sink_node { + pipeline.replace_operator(op.as_ref(), sink_node); + sink_out = pipeline.run_pipeline(&ec)?; + } + sink_node = pipeline.sink_node; + } + } + } + } } diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/mod.rs b/polars/polars-lazy/polars-pipe/src/pipeline/mod.rs index e4fcfc20b314..e248a048d194 100644 --- a/polars/polars-lazy/polars-pipe/src/pipeline/mod.rs +++ b/polars/polars-lazy/polars-pipe/src/pipeline/mod.rs @@ -1,7 +1,7 @@ mod convert; mod dispatcher; -pub use convert::create_pipeline; -pub use dispatcher::Pipeline; +pub use convert::{create_pipeline, get_dummy_operator, get_operator, get_sink}; +pub use dispatcher::PipeLine; pub use crate::executors::sinks::groupby::aggregates::can_convert_to_hash_agg; diff --git a/polars/polars-lazy/src/frame/mod.rs b/polars/polars-lazy/src/frame/mod.rs index 3e4e8ccfe4b2..56c75656754b 100644 --- a/polars/polars-lazy/src/frame/mod.rs +++ b/polars/polars-lazy/src/frame/mod.rs @@ -522,9 +522,16 @@ impl LazyFrame { ) } - fn prepare_collect(self) -> PolarsResult<(ExecutionState, Box)> { + #[allow(unused_mut)] + fn prepare_collect(mut self) -> PolarsResult<(ExecutionState, Box)> { let file_caching = self.opt_state.file_caching; let streaming = self.opt_state.streaming; + + #[cfg(feature = "cse")] + if streaming && self.opt_state.common_subplan_elimination { + eprintln!("Cannot combine 'streaming' with 'common_subplan_elimination'. 
CSE will be turned off."); + self.opt_state.common_subplan_elimination = false; + } let mut expr_arena = Arena::with_capacity(256); let mut lp_arena = Arena::with_capacity(128); let mut scratch = vec![]; diff --git a/polars/polars-lazy/src/physical_plan/streaming/convert.rs b/polars/polars-lazy/src/physical_plan/streaming/convert.rs index ef2d65e3e936..842db853ea2b 100644 --- a/polars/polars-lazy/src/physical_plan/streaming/convert.rs +++ b/polars/polars-lazy/src/physical_plan/streaming/convert.rs @@ -1,13 +1,14 @@ use std::any::Any; +use std::collections::VecDeque; use std::sync::Arc; use polars_core::error::PolarsResult; use polars_core::frame::DataFrame; -use polars_core::prelude::{Field, SchemaRef, Series}; +use polars_core::prelude::*; use polars_core::schema::Schema; use polars_pipe::expressions::PhysicalPipedExpr; use polars_pipe::operators::chunks::DataChunk; -use polars_pipe::pipeline::{create_pipeline, Pipeline}; +use polars_pipe::pipeline::{create_pipeline, get_dummy_operator, get_operator, PipeLine}; use polars_plan::prelude::*; use crate::physical_plan::planner::create_physical_expr; @@ -40,7 +41,11 @@ fn is_streamable(node: Node, expr_arena: &Arena) -> bool { AExpr::Function { options, .. } | AExpr::AnonymousFunction { options, .. } => { matches!(options.collect_groups, ApplyOptions::ApplyFlat) } - AExpr::Cast { .. } | AExpr::Column(_) | AExpr::Literal(_) => true, + AExpr::Cast { .. } + | AExpr::Column(_) + | AExpr::Literal(_) + | AExpr::BinaryExpr { .. } + | AExpr::Alias(_, _) => true, _ => false, }) } @@ -60,6 +65,20 @@ pub(crate) fn insert_streaming_nodes( let mut stack = Vec::with_capacity(lp_arena.len() / 3 + 1); stack.push((root, State::default())); + + // A state holds a full pipeline until the breaker + // 1/\ + // 2/\ + // 3\ + // + // so 1 and 2 are short pipelines and 3 goes all the way to the root. + // but 3 can only run if 1 and 2 have finished and set the join as operator in 3 + // and state are filled with pipeline 1, 2, 3 in that order + // + // / \ + // /\ 3/\ + // 1 2 4\ + // or in this case 1, 2, 3, 4 let mut states = vec![]; use ALogicalPlan::*; @@ -67,7 +86,12 @@ pub(crate) fn insert_streaming_nodes( match lp_arena.get(root) { Selection { input, predicate } if is_streamable(*predicate, expr_arena) => { state.streamable = true; - state.operators.push(root); + state.operators_sinks.push((false, false, root)); + stack.push((*input, state)) + } + HStack { input, exprs, .. } if all_streamable(exprs, expr_arena) => { + state.streamable = true; + state.operators_sinks.push((false, false, root)); stack.push((*input, state)) } MapFunction { @@ -75,12 +99,12 @@ pub(crate) fn insert_streaming_nodes( function: FunctionNode::FastProjection { .. }, } => { state.streamable = true; - state.operators.push(root); + state.operators_sinks.push((false, false, root)); stack.push((*input, state)) } Projection { input, expr, .. } if all_streamable(expr, expr_arena) => { state.streamable = true; - state.operators.push(root); + state.operators_sinks.push((false, false, root)); stack.push((*input, state)) } MapFunction { @@ -105,6 +129,34 @@ pub(crate) fn insert_streaming_nodes( states.push(state) } } + DataFrameScan { .. } => { + if state.streamable { + state.sources.push(root); + states.push(state) + } + } + #[cfg(feature = "cross_join")] + Join { + input_left, + input_right, + options, + .. 
+ } if matches!(options.how, JoinType::Cross) => { + state.streamable = true; + // rhs + let mut state_right = state; + state_right.operators_sinks.push((true, true, root)); + stack.push((*input_right, state_right)); + + // we want to traverse lhs first, so push it latest on the stack + // lhs is a new pipeline + let mut state_left = State { + streamable: true, + ..Default::default() + }; + state_left.operators_sinks.push((true, false, root)); + stack.push((*input_left, state_left)); + } // add globbing patterns #[cfg(all(feature = "csv-file", feature = "parquet"))] Union { inputs, .. } => { @@ -122,7 +174,7 @@ pub(crate) fn insert_streaming_nodes( }) { state.sources.push(root); - states.push(state) + states.push(state); } } Aggregate { @@ -137,7 +189,7 @@ pub(crate) fn insert_streaming_nodes( .all(|node| polars_pipe::pipeline::can_convert_to_hash_agg(*node, expr_arena)) { state.streamable = true; - state.sink = Some(root); + state.operators_sinks.push((true, false, root)); stack.push((*input, state)) } else { stack.push((*input, State::default())) @@ -153,45 +205,85 @@ pub(crate) fn insert_streaming_nodes( } } + let mut pipelines = VecDeque::with_capacity(states.len()); + + // if join is + // 1 + // /\2 + // /\3 + // + // we are iterating from 3 to 1. + + // the most far right branch will be the latest that sets this + // variable and thus will point to root + let mut latest = Default::default(); + for state in states { - let latest = match state.sink { - Some(node) => node, - None => { - if state.operators.is_empty() { - continue; + // should be reset for every branch + let mut sink_node = None; + + let mut operators = Vec::with_capacity(state.operators_sinks.len()); + let mut operator_nodes = Vec::with_capacity(state.operators_sinks.len()); + let mut iter = state.operators_sinks.into_iter().rev(); + + for (is_sink, is_rhs_join, node) in &mut iter { + latest = node; + if is_sink && !is_rhs_join { + sink_node = Some(node); + } else { + operator_nodes.push(node); + let op = if is_rhs_join { + get_dummy_operator() } else { - state.operators[state.operators.len() - 1] - } + get_operator(node, lp_arena, expr_arena, &to_physical_piped_expr)? + }; + operators.push(op) } - }; - let schema = lp_arena.get(latest).schema(lp_arena).into_owned(); + } + let pipeline = create_pipeline( &state.sources, - &state.operators, - state.sink, + operators, + operator_nodes, + sink_node, lp_arena, expr_arena, to_physical_piped_expr, )?; + pipelines.push_back(pipeline); + } + // the most right latest node should be the root of the pipeline + let schema = lp_arena.get(latest).schema(lp_arena).into_owned(); + if let Some(mut most_left) = pipelines.pop_front() { + while let Some(rhs) = pipelines.pop_front() { + most_left = most_left.with_rhs(rhs) + } // replace the part of the logical plan with a `MapFunction` that will execute the pipeline. 
- let pipeline_node = get_pipeline_node(lp_arena, pipeline, schema); + let pipeline_node = get_pipeline_node(lp_arena, most_left, schema); lp_arena.replace(latest, pipeline_node) + } else { + panic!() } + Ok(()) } -#[derive(Default, Debug)] +type IsSink = bool; +// a rhs of a join will be replaced later +type IsRhsJoin = bool; + +#[derive(Default, Debug, Clone)] struct State { streamable: bool, sources: Vec, - operators: Vec, - sink: Option, + // node is operator/sink + operators_sinks: Vec<(IsSink, IsRhsJoin, Node)>, } fn get_pipeline_node( lp_arena: &mut Arena, - mut pipeline: Pipeline, + mut pipeline: PipeLine, schema: SchemaRef, ) -> ALogicalPlan { // create a dummy input as the map function will call the input diff --git a/polars/polars-lazy/src/tests/streaming.rs b/polars/polars-lazy/src/tests/streaming.rs index a136db26672d..ec4881be017f 100644 --- a/polars/polars-lazy/src/tests/streaming.rs +++ b/polars/polars-lazy/src/tests/streaming.rs @@ -130,3 +130,39 @@ fn test_streaming_slice() -> PolarsResult<()> { assert_eq!(out_streaming.shape(), (3, 2)); Ok(()) } + +#[test] +fn test_streaming_cross_join() -> PolarsResult<()> { + let df = df![ + "a" => [1 ,2, 3] + ]?; + let q = df.lazy(); + let out = q.clone().cross_join(q).with_streaming(true).collect()?; + assert_eq!(out.shape(), (9, 2)); + + let q = get_parquet_file().with_projection_pushdown(false); // ;.slice(3, 3); + let q1 = q + .clone() + .select([col("calories")]) + .clone() + .cross_join(q.clone()) + .filter(col("calories").gt(col("calories_right"))); + let q2 = q1 + .select([all().suffix("_second")]) + .cross_join(q) + .filter(col("calories_right_second").lt(col("calories"))) + .select([ + col("calories"), + col("calories_right_second").alias("calories_right"), + ]); + + let q2 = q2.clone().with_streaming(true); + let out_streaming = q2.collect()?; + + assert_eq!( + out_streaming.get_column_names(), + &["calories", "calories_right"] + ); + assert_eq!(out_streaming.shape(), (5753, 2)); + Ok(()) +} diff --git a/py-polars/polars/internals/lazyframe/frame.py b/py-polars/polars/internals/lazyframe/frame.py index 577ab678664d..ea174d7e81c1 100644 --- a/py-polars/polars/internals/lazyframe/frame.py +++ b/py-polars/polars/internals/lazyframe/frame.py @@ -908,6 +908,9 @@ def collect( slice_pushdown = False common_subplan_elimination = False + if allow_streaming: + common_subplan_elimination = False + ldf = self._ldf.optimization_toggle( type_coercion, predicate_pushdown, diff --git a/py-polars/src/dataframe.rs b/py-polars/src/dataframe.rs index 13906e10ca4d..fc23fc187b12 100644 --- a/py-polars/src/dataframe.rs +++ b/py-polars/src/dataframe.rs @@ -13,7 +13,7 @@ use polars::io::RowCount; use polars::prelude::*; use polars_core::export::arrow::datatypes::IntegerType; use polars_core::frame::explode::MeltArgs; -use polars_core::frame::ArrowChunk; +use polars_core::frame::*; use polars_core::prelude::QuantileInterpolOptions; use polars_core::utils::arrow::compute::cast::CastOptions; use polars_core::utils::try_get_supertype;
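Note on the new operator/sink interface in polars-pipe: `Operator::execute` now takes `&mut self` so an operator may carry state between calls, every operator and sink gains a `split(thread_no)` method that hands each worker thread its own copy, and `Sink::finalize` returns a `FinalizedSink` that is either a finished `DataFrame` or a new `Operator`. Below is a minimal sketch of that shape using simplified stand-in types (`Chunk` as `Vec<i64>`); the names mirror the patch but this is illustrative, not the real polars-pipe API.

    type Chunk = Vec<i64>;

    // `execute` takes `&mut self` so an operator may keep per-call state
    // (the phase-2 cross join tracks its in-progress left/right windows);
    // `split` hands every worker thread its own copy of the operator.
    trait Operator: Send + Sync {
        fn execute(&mut self, chunk: &Chunk) -> Chunk;
        fn split(&self, thread_no: usize) -> Box<dyn Operator>;
    }

    // A sink either produces the final frame or turns into an operator that a
    // follow-up pipeline will run (the cross-join left side does the latter).
    #[allow(dead_code)]
    enum FinalizedSink {
        Finished(Chunk),
        Operator(Box<dyn Operator>),
    }

    #[derive(Clone)]
    struct Filter {
        min: i64,
    }

    impl Operator for Filter {
        fn execute(&mut self, chunk: &Chunk) -> Chunk {
            chunk.iter().copied().filter(|v| *v >= self.min).collect()
        }
        // stateless between chunks, so a per-thread split is just a clone
        fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
            Box::new(self.clone())
        }
    }

    fn main() {
        // one copy of the operator chain per thread, as PipeLine::new does
        let mut per_thread: Vec<Box<dyn Operator>> =
            (0..4).map(|t| Filter { min: 3 }.split(t)).collect();
        let data: Chunk = vec![1, 2, 3, 4, 5];
        let out = per_thread[0].execute(&data);
        assert_eq!(out, vec![3, 4, 5]);

        let finalized = FinalizedSink::Operator(Box::new(Filter { min: 0 }));
        match finalized {
            FinalizedSink::Finished(rows) => println!("finished: {} rows", rows.len()),
            FinalizedSink::Operator(_op) => println!("sink became an operator"),
        }
    }

The cross join relies on the `Operator` variant: its left-hand pipeline finalizes into `CrossJoinPhase2`, and `PipeLine::replace_operator` swaps that operator in for the `Dummy` placeholder before the right-hand pipeline runs.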
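Note on the phase-2 cross join: because each pair of left/right windows alone yields `size * size` output rows, the operator walks the materialized left frame and every incoming right chunk in fixed windows of 250 rows rather than building the full cartesian product at once. A standalone illustration of that windowing with plain `Vec`s follows; the helper name and the window size passed in `main` are assumptions for illustration, not the crate's code.

    fn cross_join_chunked(left: &[i64], right: &[i64], size: usize) -> Vec<Vec<(i64, i64)>> {
        let mut blocks = Vec::new();
        // outer windows over the materialized left side
        for l_off in (0..left.len()).step_by(size) {
            let l_slice = &left[l_off..(l_off + size).min(left.len())];
            // inner windows over the incoming right chunk
            for r_off in (0..right.len()).step_by(size) {
                let r_slice = &right[r_off..(r_off + size).min(right.len())];
                // one output block: |l_slice| * |r_slice| pairs, with the order
                // of the left keys preserved inside the block
                let block = l_slice
                    .iter()
                    .flat_map(|l| r_slice.iter().map(move |r| (*l, *r)))
                    .collect();
                blocks.push(block);
            }
        }
        blocks
    }

    fn main() {
        let blocks = cross_join_chunked(&[1, 2, 3], &[10, 20], 2);
        // two left windows x one right window -> 2 blocks, 3 * 2 = 6 pairs total
        assert_eq!(blocks.iter().map(|b| b.len()).sum::<usize>(), 6);
        println!("{:?}", blocks);
    }

The real operator additionally caches the output column names after the first join (`_cross_join_with_names`) so later blocks avoid re-resolving the suffixed names.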
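Note on `DataFrameSource`: the input frame is split into one part per thread via `split_df`, which rechunks and then delegates to the new `split_df_as_ref`. The offset arithmetic is roughly `n - 1` chunks of `total_len / n` rows plus a final chunk that absorbs the remainder; a sketch of that calculation (the real function also re-partitions the underlying chunks and special-cases `n == 0`):

    fn split_offsets(total_len: usize, n: usize) -> Vec<(usize, usize)> {
        let chunk_size = total_len / n;
        (0..n)
            .map(|i| {
                let offset = i * chunk_size;
                let len = if i == n - 1 {
                    // last part takes whatever rows remain
                    total_len - offset
                } else {
                    chunk_size
                };
                (offset, len)
            })
            .collect()
    }

    fn main() {
        // 10 rows over 4 parts: three chunks of 2 rows and a final chunk of 4
        assert_eq!(split_offsets(10, 4), vec![(0, 2), (2, 2), (4, 2), (6, 4)]);
    }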