diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs index 290010ea8585..069b1d2dfd61 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs @@ -17,4 +17,8 @@ impl Operator for Dummy { fn split(&self, _thread_no: usize) -> Box { Box::new(Self {}) } + + fn fmt(&self) -> &str { + "dummy" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs index bbdf36f7d9f9..f4a409521deb 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs @@ -33,4 +33,7 @@ impl Operator for FilterOperator { fn split(&self, _thread_no: usize) -> Box { Box::new(self.clone()) } + fn fmt(&self) -> &str { + "filter" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs index 2a0fa6d4036f..2486e5a70605 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs @@ -24,6 +24,9 @@ impl Operator for FastProjectionOperator { fn split(&self, _thread_no: usize) -> Box { Box::new(self.clone()) } + fn fmt(&self) -> &str { + "fast_join_projection" + } } #[derive(Clone)] @@ -49,6 +52,9 @@ impl Operator for ProjectionOperator { fn split(&self, _thread_no: usize) -> Box { Box::new(self.clone()) } + fn fmt(&self) -> &str { + "projection" + } } #[derive(Clone)] @@ -79,4 +85,7 @@ impl Operator for HstackOperator { fn split(&self, _thread_no: usize) -> Box { Box::new(self.clone()) } + fn fmt(&self) -> &str { + "hstack" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs index 5976d9baf418..0684921a02f1 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs @@ -5,6 +5,7 @@ use polars_core::datatypes::Field; use polars_core::error::PolarsResult; use polars_core::prelude::{DataType, SchemaRef, Series, IDX_DTYPE}; use polars_core::schema::Schema; +use polars_plan::dsl::Expr; use polars_plan::logical_plan::{ArenaExprIter, Context}; use polars_plan::prelude::{AAggExpr, AExpr}; use polars_utils::arena::{Arena, Node}; @@ -30,6 +31,10 @@ impl PhysicalPipedExpr for Count { fn field(&self, _input_schema: &Schema) -> PolarsResult { todo!() } + + fn expression(&self) -> Expr { + Expr::Count + } } pub fn can_convert_to_hash_agg( diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs index dabe8c4edc24..94c31dc77fb2 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs @@ -428,4 +428,7 @@ impl Sink for GenericGroupbySink { fn as_any(&mut self) -> &mut dyn Any { self } + fn fmt(&self) -> &str { + "generic_groupby" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs index a7c3bed68388..ab629b69428f 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs @@ -360,6 +360,9 @@ where fn as_any(&mut self) -> &mut dyn Any { self } + fn fmt(&self) -> &str { + "primitive_groupby" + } } fn insert_and_get( diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs index 0a93f8764302..4e9f0bd0caf1 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs @@ -358,6 +358,9 @@ impl Sink for Utf8GroupbySink { fn as_any(&mut self) -> &mut dyn Any { self } + fn fmt(&self) -> &str { + "utf8_groupby" + } } // write agg_idx to the hashes buffer. diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs index 016a06804f03..3cb68da5946e 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs @@ -62,6 +62,10 @@ impl Sink for CrossJoin { fn as_any(&mut self) -> &mut dyn Any { self } + + fn fmt(&self) -> &str { + "cross_join_sink" + } } #[derive(Clone)] @@ -152,4 +156,8 @@ impl Operator for CrossJoinProbe { fn split(&self, _thread_no: usize) -> Box { Box::new(self.clone()) } + + fn fmt(&self) -> &str { + "cross_join_probe" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/generic_build.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/generic_build.rs index db3ba812409a..e62de3438ed0 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/generic_build.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/generic_build.rs @@ -151,6 +151,7 @@ impl GenericBuild { chunk: &DataChunk, ) -> PolarsResult<&[Series]> { self.join_series.clear(); + for phys_e in self.join_columns_left.iter() { let s = phys_e.evaluate(chunk, context.execution_state.as_any())?; let s = s.to_physical_repr(); @@ -337,7 +338,7 @@ impl Sink for GenericBuild { let suffix = self.suffix.clone(); let hb = self.hb.clone(); let hash_tables = Arc::new(std::mem::take(&mut self.hash_tables)); - let join_columns_left = self.join_columns_right.clone(); + let join_columns_left = self.join_columns_left.clone(); let join_columns_right = self.join_columns_right.clone(); // take the buffers, this saves one allocation @@ -369,6 +370,9 @@ impl Sink for GenericBuild { fn as_any(&mut self) -> &mut dyn Any { self } + fn fmt(&self) -> &str { + "generic_join_build" + } } pub(super) struct KeysIter<'a> { diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/inner_left.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/inner_left.rs index 9d2cc049f97d..767360dbe957 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/inner_left.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/inner_left.rs @@ -66,15 +66,12 @@ impl GenericJoinProbe { hash_tables: Arc>>>, join_columns_left: Arc>>, join_columns_right: Arc>>, - mut swapped_or_left: bool, + swapped_or_left: bool, join_series: Vec, hashes: Vec, context: &PExecutionContext, how: JoinType, ) -> Self { - if matches!(how, JoinType::Left) { - swapped_or_left = true - } if swapped_or_left { let tmp = DataChunk { data: df_a.slice(0, 1), @@ -329,4 +326,7 @@ impl Operator for GenericJoinProbe { let new = self.clone(); Box::new(new) } + fn fmt(&self) -> &str { + "generic_join_probe" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs index fa3aad8cb084..211d84e79a37 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs @@ -48,4 +48,7 @@ impl Sink for OrderedSink { fn as_any(&mut self) -> &mut dyn Any { self } + fn fmt(&self) -> &str { + "ordered_sink" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/parquet_sink.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/parquet_sink.rs index ef05b547a7ff..bdae5fcb75dc 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/parquet_sink.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/parquet_sink.rs @@ -108,4 +108,7 @@ impl Sink for ParquetSink { fn as_any(&mut self) -> &mut dyn Any { self } + fn fmt(&self) -> &str { + "parquet_sink" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/slice.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/slice.rs index 862f484250d0..0153ea222831 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/slice.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/slice.rs @@ -102,4 +102,7 @@ impl Sink for SliceSink { fn as_any(&mut self) -> &mut dyn Any { self } + fn fmt(&self) -> &str { + "slice_sink" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs index 2776181b5ff2..c45473d97dfb 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs @@ -110,4 +110,7 @@ impl Source for CsvSource { ), }) } + fn fmt(&self) -> &str { + "csv" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs index 6628922a9d76..b8a4ff4bae20 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs @@ -39,4 +39,7 @@ impl Source for DataFrameSource { Ok(SourceResult::GotMoreData(chunks)) } } + fn fmt(&self) -> &str { + "df" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs index 665d6f667ac4..bb3b38adcc91 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs @@ -84,4 +84,7 @@ impl Source for ParquetSource { ), }) } + fn fmt(&self) -> &str { + "parquet" + } } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/union.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/union.rs index 929deb92f95d..771c21f27c04 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sources/union.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sources/union.rs @@ -29,4 +29,7 @@ impl Source for UnionSource { } Ok(SourceResult::Finished) } + fn fmt(&self) -> &str { + "union" + } } diff --git a/polars/polars-lazy/polars-pipe/src/expressions.rs b/polars/polars-lazy/polars-pipe/src/expressions.rs index c30e2d477481..5c4a78cc896f 100644 --- a/polars/polars-lazy/polars-pipe/src/expressions.rs +++ b/polars/polars-lazy/polars-pipe/src/expressions.rs @@ -1,6 +1,7 @@ use std::any::Any; use polars_core::prelude::*; +use polars_plan::dsl::Expr; use crate::operators::DataChunk; @@ -10,4 +11,6 @@ pub trait PhysicalPipedExpr: Send + Sync { fn evaluate(&self, chunk: &DataChunk, lazy_state: &dyn Any) -> PolarsResult; fn field(&self, input_schema: &Schema) -> PolarsResult; + + fn expression(&self) -> Expr; } diff --git a/polars/polars-lazy/polars-pipe/src/operators/operator.rs b/polars/polars-lazy/polars-pipe/src/operators/operator.rs index d834e28d408c..66e5fe009a31 100644 --- a/polars/polars-lazy/polars-pipe/src/operators/operator.rs +++ b/polars/polars-lazy/polars-pipe/src/operators/operator.rs @@ -15,4 +15,6 @@ pub trait Operator: Send + Sync { ) -> PolarsResult; fn split(&self, thread_no: usize) -> Box; + + fn fmt(&self) -> &str; } diff --git a/polars/polars-lazy/polars-pipe/src/operators/sink.rs b/polars/polars-lazy/polars-pipe/src/operators/sink.rs index f2337e9a3c0f..80dd48a3ce5c 100644 --- a/polars/polars-lazy/polars-pipe/src/operators/sink.rs +++ b/polars/polars-lazy/polars-pipe/src/operators/sink.rs @@ -23,4 +23,6 @@ pub trait Sink: Send + Sync { fn finalize(&mut self, context: &PExecutionContext) -> PolarsResult; fn as_any(&mut self) -> &mut dyn Any; + + fn fmt(&self) -> &str; } diff --git a/polars/polars-lazy/polars-pipe/src/operators/source.rs b/polars/polars-lazy/polars-pipe/src/operators/source.rs index a4280ec71bfb..80006347b65b 100644 --- a/polars/polars-lazy/polars-pipe/src/operators/source.rs +++ b/polars/polars-lazy/polars-pipe/src/operators/source.rs @@ -7,4 +7,6 @@ pub enum SourceResult { pub trait Source: Send + Sync { fn get_batches(&mut self, context: &PExecutionContext) -> PolarsResult; + + fn fmt(&self) -> &str; } diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs index 04c5010afcc5..8035afcf3df0 100644 --- a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs +++ b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs @@ -407,8 +407,9 @@ where } pub fn swap_join_order(options: &JoinOptions) -> bool { - match (options.rows_left, options.rows_right) { - ((Some(left), _), (Some(right), _)) => left > right, - ((_, left), (_, right)) => left > right, - } + matches!(options.how, JoinType::Left) + || match (options.rows_left, options.rows_right) { + ((Some(left), _), (Some(right), _)) => left > right, + ((_, left), (_, right)) => left > right, + } } diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs index 108268f882a2..deaa3c88a81c 100644 --- a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs +++ b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs @@ -1,9 +1,12 @@ +use std::collections::VecDeque; + use polars_core::error::PolarsResult; use polars_core::frame::DataFrame; use polars_core::POOL; use polars_utils::arena::Node; use rayon::prelude::*; +use crate::executors::operators::Dummy; use crate::executors::sources::DataFrameSource; use crate::operators::{ DataChunk, FinalizedSink, Operator, OperatorResult, PExecutionContext, SExecutionContext, Sink, @@ -66,12 +69,16 @@ impl PipeLine { self } - fn replace_operator(&mut self, op: &dyn Operator, node: Node) { + // returns if operator was successfully replaced + fn replace_operator(&mut self, op: &dyn Operator, node: Node) -> bool { if let Some(pos) = self.operator_nodes.iter().position(|n| *n == node) { let pos = pos + self.operator_offset; for (i, operator_pipe) in &mut self.operators.iter_mut().enumerate() { operator_pipe[pos] = op.split(i) } + true + } else { + false } } @@ -214,12 +221,46 @@ impl PipeLine { Ok(out.unwrap()) } + /// print the branches of the pipeline + /// in the order they run. + #[cfg(test)] + fn show(&self) { + let mut fmt = String::new(); + let mut start = 0usize; + fmt.push_str(self.sources[0].fmt()); + fmt.push_str(" -> "); + for (offset_end, sink) in &self.sinks { + // take operators of a single thread + let ops = &self.operators[0]; + // slice the pipeline + let ops = &ops[start..*offset_end]; + for op in ops { + fmt.push_str(op.fmt()); + fmt.push_str(" -> ") + } + start = *offset_end; + fmt.push_str(sink[0].fmt()) + } + println!("{fmt}"); + for pl in &self.rh_sides { + pl.show() + } + } + pub fn execute(&mut self, state: Box) -> PolarsResult { let ec = PExecutionContext::new(state); let mut sink_out = self.run_pipeline(&ec)?; let mut pipelines = self.rh_sides.iter_mut(); let mut sink_nodes = std::mem::take(&mut self.sink_nodes); + // This is a stack of operators that should replace the sinks of join nodes + // If we don't reorder joins, the order we run the pipelines coincide with the + // order the sinks need to be replaced, however this is not always the case + // if we reorder joins. + // This stack ensures we still replace the dummy operators even if they are all in + // the most right branch + let mut operators_to_replace: VecDeque<(Box, Node)> = VecDeque::new(); + loop { match &mut sink_out { FinalizedSink::Finished(df) => return Ok(std::mem::take(df)), @@ -236,10 +277,33 @@ impl PipeLine { // we unwrap, because the latest pipeline should not return an Operator let pipeline = pipelines.next().unwrap(); + // First check the operators + // keep a counter as we also push to the front of deque + // otherwise we keep iterating + let mut remaining = operators_to_replace.len(); + while let Some((op, sink_node)) = operators_to_replace.pop_back() { + if !pipeline.replace_operator(op.as_ref(), sink_node) { + operators_to_replace.push_front((op, sink_node)) + } else { + } + if remaining == 0 { + break; + } + remaining -= 1; + } + // latest sink_node will be the operator, as the left side of the join // always finishes that branch. if let Some(sink_node) = sink_nodes.pop() { - pipeline.replace_operator(op.as_ref(), sink_node); + // if dummy that should be replaces is not found in this branch + // we push it to the operators stack that should be replaced + // on the next branch of the pipeline we first check this stack. + // this only happens if we reorder joins + if !pipeline.replace_operator(op.as_ref(), sink_node) { + let mut swap = Box::::default() as Box; + std::mem::swap(op, &mut swap); + operators_to_replace.push_back((swap, sink_node)); + } } sink_out = pipeline.run_pipeline(&ec)?; sink_nodes = std::mem::take(&mut pipeline.sink_nodes); diff --git a/polars/polars-lazy/polars-plan/src/dsl/options.rs b/polars/polars-lazy/polars-plan/src/dsl/options.rs index f744b62705e2..07b44e3e966c 100644 --- a/polars/polars-lazy/polars-plan/src/dsl/options.rs +++ b/polars/polars-lazy/polars-plan/src/dsl/options.rs @@ -44,6 +44,8 @@ pub struct JoinOptions { pub how: JoinType, pub suffix: Cow<'static, str>, pub slice: Option<(i64, usize)>, + /// Proxy of the number of rows in both sides of the joins + /// Holds `(Option, estimated_size)` pub rows_left: (Option, usize), pub rows_right: (Option, usize), } diff --git a/polars/polars-lazy/src/physical_plan/expressions/column.rs b/polars/polars-lazy/src/physical_plan/expressions/column.rs index de9908367eaa..39fc7cab6c33 100644 --- a/polars/polars-lazy/src/physical_plan/expressions/column.rs +++ b/polars/polars-lazy/src/physical_plan/expressions/column.rs @@ -61,7 +61,9 @@ impl ColumnExpr { // this path should not happen #[cfg(feature = "panic_on_schema")] { - if _state.ext_contexts.is_empty() { + if _state.ext_contexts.is_empty() + && std::env::var("POLARS_NO_SCHEMA_CHECK").is_err() + { panic!( "got {} expected: {} from schema: {:?} and DataFrame: {:?}", out.name(), @@ -88,7 +90,10 @@ impl ColumnExpr { ) -> PolarsResult { #[cfg(feature = "panic_on_schema")] { - if _panic_during_test && _state.ext_contexts.is_empty() { + if _panic_during_test + && _state.ext_contexts.is_empty() + && std::env::var("POLARS_NO_SCHEMA_CHECK").is_err() + { panic!("invalid schema") } } diff --git a/polars/polars-lazy/src/physical_plan/streaming/convert.rs b/polars/polars-lazy/src/physical_plan/streaming/convert.rs index 8da154dc0515..77d52c40c593 100644 --- a/polars/polars-lazy/src/physical_plan/streaming/convert.rs +++ b/polars/polars-lazy/src/physical_plan/streaming/convert.rs @@ -28,6 +28,10 @@ impl PhysicalPipedExpr for Wrap { fn field(&self, input_schema: &Schema) -> PolarsResult { self.0.to_field(input_schema) } + + fn expression(&self) -> Expr { + self.0.as_expression().unwrap().clone() + } } fn to_physical_piped_expr( @@ -206,12 +210,11 @@ pub(crate) fn insert_streaming_nodes( // *except* for a left join. In a left join we use the right // table as build table and we stream the left table. This way // we maintain order in the left join. - let (input_left, input_right) = - if swap_join_order(options) || matches!(options.how, JoinType::Left) { - (input_right, input_left) - } else { - (input_left, input_right) - }; + let (input_left, input_right) = if swap_join_order(options) { + (input_right, input_left) + } else { + (input_left, input_right) + }; // rhs is second, so that is first on the stack let mut state_right = state; @@ -357,7 +360,7 @@ pub(crate) fn insert_streaming_nodes( .. } = lp_arena.get(node) { - let slice_node = lp_arena.add(ALogicalPlan::Slice { + let slice_node = lp_arena.add(Slice { input: Node::default(), offset: *offset, len: *len as IdxSize, diff --git a/polars/polars-lazy/src/tests/streaming.rs b/polars/polars-lazy/src/tests/streaming.rs index ee5a4df41c4d..192c21996075 100644 --- a/polars/polars-lazy/src/tests/streaming.rs +++ b/polars/polars-lazy/src/tests/streaming.rs @@ -230,7 +230,7 @@ fn test_streaming_left_join() -> PolarsResult<()> { let q = lf_left.left_join(lf_right, col("a"), col("a")); - let out1 = q.clone().with_streaming(false).collect()?; + let out1 = q.clone().with_streaming(true).collect()?; let out2 = q.clone().with_streaming(false).collect()?; assert!(out1.frame_equal_missing(&out2)); @@ -308,3 +308,48 @@ fn test_streaming_aggregate_join() -> PolarsResult<()> { assert_eq!(out_streaming.shape(), (3, 3)); Ok(()) } + +#[test] +fn test_streaming_double_left_join() -> PolarsResult<()> { + // A left join swaps the tables, so that checks the swapping of the branches + let q1 = df![ + "id" => ["1a"], + "p_id" => ["1b"], + "m_id" => ["1c"], + ]? + .lazy(); + + let q2 = df![ + "p_id2" => ["2a"], + "p_code" => ["2b"], + ]? + .lazy(); + + let q3 = df![ + "m_id3" => ["3a"], + "m_code" => ["3b"], + ]? + .lazy(); + + let q = q1 + .clone() + .left_join(q2, col("p_id"), col("p_id2")) + .left_join(q3.clone(), col("m_id"), col("m_id3")); + + let out = q.clone().with_streaming(true).collect()?; + let expected = q.clone().with_streaming(false).collect()?; + + assert_eq!(out, expected); + + // more joins + let q = q + .left_join(q1, col("p_id"), col("p_id")) + .left_join(q3, col("m_id"), col("m_id3")); + + let out = q.clone().with_streaming(true).collect()?; + let expected = q.clone().with_streaming(false).collect()?; + + assert_eq!(out, expected); + + Ok(()) +} diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock index 07b777ae473d..a4fc0e626045 100644 --- a/py-polars/Cargo.lock +++ b/py-polars/Cargo.lock @@ -1718,7 +1718,7 @@ dependencies = [ [[package]] name = "py-polars" -version = "0.15.13" +version = "0.15.14" dependencies = [ "ahash", "bincode",