diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index b37d76f0c762f..96ec54896e837 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -232,18 +232,30 @@ impl LazyFrame { Ok(self.clone().to_alp()?.describe_tree_format()) } + // @NOTE: this is used because we want to set the `enable_fmt` flag of `optimize_with_scratch` + // to `true` for describe. + fn _describe_to_alp_optimized(mut self) -> PolarsResult { + let (mut lp_arena, mut expr_arena) = self.get_arenas(); + let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], true)?; + + Ok(IRPlan::new(node, lp_arena, expr_arena)) + } + /// Return a String describing the optimized logical plan. /// /// Returns `Err` if optimizing the logical plan fails. pub fn describe_optimized_plan(&self) -> PolarsResult { - Ok(self.clone().to_alp_optimized()?.describe()) + Ok(self.clone()._describe_to_alp_optimized()?.describe()) } /// Return a String describing the optimized logical plan in tree format. /// /// Returns `Err` if optimizing the logical plan fails. pub fn describe_optimized_plan_tree(&self) -> PolarsResult { - Ok(self.clone().to_alp_optimized()?.describe_tree_format()) + Ok(self + .clone() + ._describe_to_alp_optimized()? + .describe_tree_format()) } /// Return a String describing the logical plan. @@ -551,7 +563,7 @@ impl LazyFrame { lp_arena: &mut Arena, expr_arena: &mut Arena, scratch: &mut Vec, - _fmt: bool, + enable_fmt: bool, ) -> PolarsResult { #[allow(unused_mut)] let mut opt_state = self.opt_state; @@ -591,16 +603,18 @@ impl LazyFrame { lp_arena, expr_arena, scratch, - _fmt, + enable_fmt, true, opt_state.row_estimate, )?; } #[cfg(not(feature = "streaming"))] { + _ = enable_fmt; panic!("activate feature 'streaming'") } } + Ok(lp_top) } diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index 6b906126aeeb3..9b63808129ab8 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -212,7 +212,7 @@ pub(super) fn construct( }; // keep the original around for formatting purposes let original_lp = if fmt { - let original_lp = node_to_lp_cloned(insertion_location, expr_arena, lp_arena); + let original_lp = IRPlan::new(insertion_location, lp_arena.clone(), expr_arena.clone()); Some(original_lp) } else { None @@ -233,7 +233,7 @@ fn get_pipeline_node( lp_arena: &mut Arena, mut pipelines: Vec, schema: SchemaRef, - original_lp: Option, + original_lp: Option, ) -> IR { // create a dummy input as the map function will call the input // so we just create a scan that returns an empty df diff --git a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs index 03d8a5c6617e8..7dbce5e69ee62 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs @@ -429,6 +429,7 @@ pub(crate) fn insert_streaming_nodes( }, } } + let mut inserted = false; for tree in pipeline_trees { if is_valid_tree(&tree) diff --git a/crates/polars-plan/src/logical_plan/alp/dot.rs b/crates/polars-plan/src/logical_plan/alp/dot.rs index a0692b7ef9d6f..c50df1c0b9820 100644 --- a/crates/polars-plan/src/logical_plan/alp/dot.rs +++ b/crates/polars-plan/src/logical_plan/alp/dot.rs @@ -6,7 +6,10 @@ use crate::constants::UNLIMITED_CACHE; use crate::prelude::alp::format::ColumnsDisplay; use crate::prelude::*; -pub struct IRDotDisplay<'a>(pub(crate) IRPlanRef<'a>); +pub struct IRDotDisplay<'a> { + is_streaming: bool, + lp: IRPlanRef<'a>, +} const INDENT: &str = " "; @@ -43,18 +46,39 @@ fn write_label<'a, 'b>( } impl<'a> IRDotDisplay<'a> { + pub fn new(lp: IRPlanRef<'a>) -> Self { + if let Some(streaming_lp) = lp.extract_streaming_plan() { + return Self::new_streaming(streaming_lp); + } + + Self { + is_streaming: false, + lp, + } + } + + fn new_streaming(lp: IRPlanRef<'a>) -> Self { + Self { + is_streaming: true, + lp, + } + } + fn with_root(&self, root: Node) -> Self { - Self(self.0.with_root(root)) + Self { + is_streaming: false, + lp: self.lp.with_root(root), + } } fn display_expr(&self, expr: &'a ExprIR) -> ExprIRDisplay<'a> { - expr.display(self.0.expr_arena) + expr.display(self.lp.expr_arena) } fn display_exprs(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> { ExprIRSliceDisplay { exprs, - expr_arena: self.0.expr_arena, + expr_arena: self.lp.expr_arena, } } @@ -66,7 +90,21 @@ impl<'a> IRDotDisplay<'a> { ) -> std::fmt::Result { use fmt::Write; - let root = self.0.root(); + let root = self.lp.root(); + + let mut parent = parent; + if self.is_streaming { + *last += 1; + let streaming_node = DotNode::Plain(*last); + + if let Some(parent) = parent { + writeln!(f, "{INDENT}{parent} -- {streaming_node}")?; + write_label(f, streaming_node, |f| f.write_str("STREAMING"))?; + } + + parent = Some(streaming_node); + } + let parent = parent; let id = if let IR::Cache { id, .. } = root { DotNode::Cache(*id) @@ -250,8 +288,12 @@ impl<'a> IRDotDisplay<'a> { MapFunction { input, function, .. } => { - self.with_root(*input)._format(f, Some(id), last)?; - write_label(f, id, |f| write!(f, "{function}"))?; + if let Some(streaming_lp) = function.to_streaming_lp() { + Self::new_streaming(streaming_lp)._format(f, Some(id), last)?; + } else { + self.with_root(*input)._format(f, Some(id), last)?; + write_label(f, id, |f| write!(f, "{function}"))?; + } }, ExtContext { input, .. } => { self.with_root(*input)._format(f, Some(id), last)?; @@ -271,7 +313,7 @@ impl<'a> IRDotDisplay<'a> { }, SimpleProjection { input, columns } => { let num_columns = columns.as_ref().len(); - let total_columns = self.0.lp_arena.get(*input).schema(self.0.lp_arena).len(); + let total_columns = self.lp.lp_arena.get(*input).schema(self.lp.lp_arena).len(); let columns = ColumnsDisplay(columns.as_ref()); self.with_root(*input)._format(f, Some(id), last)?; diff --git a/crates/polars-plan/src/logical_plan/alp/format.rs b/crates/polars-plan/src/logical_plan/alp/format.rs index 5de2006ed602b..672a42dfd2e1a 100644 --- a/crates/polars-plan/src/logical_plan/alp/format.rs +++ b/crates/polars-plan/src/logical_plan/alp/format.rs @@ -9,7 +9,10 @@ use recursive::recursive; use crate::prelude::*; -pub struct IRDisplay<'a>(pub(crate) IRPlanRef<'a>); +pub struct IRDisplay<'a> { + is_streaming: bool, + lp: IRPlanRef<'a>, +} #[derive(Clone, Copy)] pub struct ExprIRDisplay<'a> { @@ -58,9 +61,6 @@ fn write_scan( predicate: &Option>, n_rows: Option, ) -> fmt::Result { - if indent != 0 { - writeln!(f)?; - } let path_fmt = match path.len() { 1 => path[0].to_string_lossy(), 0 => "".into(), @@ -91,13 +91,66 @@ fn write_scan( } impl<'a> IRDisplay<'a> { + pub fn new(lp: IRPlanRef<'a>) -> Self { + if let Some(streaming_lp) = lp.extract_streaming_plan() { + return Self::new_streaming(streaming_lp); + } + + Self { + is_streaming: false, + lp, + } + } + + fn new_streaming(lp: IRPlanRef<'a>) -> Self { + Self { + is_streaming: true, + lp, + } + } + + fn root(&self) -> &IR { + self.lp.root() + } + + fn with_root(&self, root: Node) -> Self { + Self { + is_streaming: false, + lp: self.lp.with_root(root), + } + } + + fn display_expr(&self, root: &'a ExprIR) -> ExprIRDisplay<'a> { + ExprIRDisplay { + node: root.node(), + output_name: root.output_name_inner(), + expr_arena: self.lp.expr_arena, + } + } + + fn display_expr_slice(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> { + ExprIRSliceDisplay { + exprs, + expr_arena: self.lp.expr_arena, + } + } + #[recursive] fn _format(&self, f: &mut Formatter, indent: usize) -> fmt::Result { - if indent != 0 { - writeln!(f)?; - } + let indent = if self.is_streaming { + writeln!(f, "{:indent$}STREAMING:", "")?; + indent + 2 + } else { + if indent != 0 { + writeln!(f)?; + } + + indent + }; + let sub_indent = indent + 2; use IR::*; + match self.root() { #[cfg(feature = "python")] PythonScan { options, predicate } => { @@ -293,9 +346,12 @@ impl<'a> IRDisplay<'a> { MapFunction { input, function, .. } => { - let function_fmt = format!("{function}"); - write!(f, "{:indent$}{function_fmt}", "")?; - self.with_root(*input)._format(f, sub_indent) + if let Some(streaming_lp) = function.to_streaming_lp() { + IRDisplay::new_streaming(streaming_lp)._format(f, indent) + } else { + write!(f, "{:indent$}{function}", "")?; + self.with_root(*input)._format(f, sub_indent) + } }, ExtContext { input, .. } => { write!(f, "{:indent$}EXTERNAL_CONTEXT", "")?; @@ -313,7 +369,7 @@ impl<'a> IRDisplay<'a> { }, SimpleProjection { input, columns } => { let num_columns = columns.as_ref().len(); - let total_columns = self.0.lp_arena.get(*input).schema(self.0.lp_arena).len(); + let total_columns = self.lp.lp_arena.get(*input).schema(self.lp.lp_arena).len(); let columns = ColumnsDisplay(columns.as_ref()); write!( @@ -329,31 +385,6 @@ impl<'a> IRDisplay<'a> { } } -impl<'a> IRDisplay<'a> { - fn root(&self) -> &IR { - self.0.root() - } - - fn with_root(&self, root: Node) -> Self { - Self(self.0.with_root(root)) - } - - fn display_expr(&self, root: &'a ExprIR) -> ExprIRDisplay<'a> { - ExprIRDisplay { - node: root.node(), - output_name: root.output_name_inner(), - expr_arena: self.0.expr_arena, - } - } - - fn display_expr_slice(&self, exprs: &'a [ExprIR]) -> ExprIRSliceDisplay<'a, ExprIR> { - ExprIRSliceDisplay { - exprs, - expr_arena: self.0.expr_arena, - } - } -} - impl<'a> ExprIRDisplay<'a> { fn with_slice(&self, exprs: &'a [T]) -> ExprIRSliceDisplay<'a, T> { ExprIRSliceDisplay { diff --git a/crates/polars-plan/src/logical_plan/alp/mod.rs b/crates/polars-plan/src/logical_plan/alp/mod.rs index 34838498de13b..d05ad1199a666 100644 --- a/crates/polars-plan/src/logical_plan/alp/mod.rs +++ b/crates/polars-plan/src/logical_plan/alp/mod.rs @@ -171,6 +171,11 @@ impl IRPlan { } } + /// Extract the original logical plan if the plan is for the Streaming Engine + pub fn extract_streaming_plan(&self) -> Option { + self.as_ref().extract_streaming_plan() + } + pub fn describe(&self) -> String { self.as_ref().describe() } @@ -180,11 +185,11 @@ impl IRPlan { } pub fn display(&self) -> format::IRDisplay { - format::IRDisplay(self.as_ref()) + self.as_ref().display() } pub fn display_dot(&self) -> dot::IRDotDisplay { - dot::IRDotDisplay(self.as_ref()) + self.as_ref().display_dot() } } @@ -201,12 +206,28 @@ impl<'a> IRPlanRef<'a> { } } + /// Extract the original logical plan if the plan is for the Streaming Engine + pub fn extract_streaming_plan(self) -> Option> { + // @NOTE: the streaming engine replaces the whole tree with a MapFunction { Pipeline, .. } + // and puts the original plan somewhere in there. This is how we extract it. Disgusting, I + // know. + let IR::MapFunction { input: _, function } = self.root() else { + return None; + }; + + let FunctionNode::Pipeline { original, .. } = function else { + return None; + }; + + Some(original.as_ref()?.as_ref().as_ref()) + } + pub fn display(self) -> format::IRDisplay<'a> { - format::IRDisplay(self) + format::IRDisplay::new(self) } pub fn display_dot(self) -> dot::IRDotDisplay<'a> { - dot::IRDotDisplay(self) + dot::IRDotDisplay::new(self) } pub fn describe(self) -> String { diff --git a/crates/polars-plan/src/logical_plan/alp/tree_format.rs b/crates/polars-plan/src/logical_plan/alp/tree_format.rs index 0c1761fc95c05..7e69dca163a30 100644 --- a/crates/polars-plan/src/logical_plan/alp/tree_format.rs +++ b/crates/polars-plan/src/logical_plan/alp/tree_format.rs @@ -102,6 +102,10 @@ fn multiline_expression(expr: &str) -> std::borrow::Cow<'_, str> { impl<'a> TreeFmtNode<'a> { pub fn root_logical_plan(lp: IRPlanRef<'a>) -> Self { + if let Some(streaming_lp) = lp.extract_streaming_plan() { + return Self::streaming_root_logical_plan(streaming_lp); + } + Self { h: None, content: TreeFmtNodeContent::LogicalPlan(lp.lp_top), @@ -110,6 +114,15 @@ impl<'a> TreeFmtNode<'a> { } } + fn streaming_root_logical_plan(lp: IRPlanRef<'a>) -> Self { + Self { + h: Some("Streaming".to_string()), + content: TreeFmtNodeContent::LogicalPlan(lp.lp_top), + + lp, + } + } + pub fn lp_node(&self, h: Option, root: Node) -> Self { Self { h, @@ -325,10 +338,19 @@ impl<'a> TreeFmtNode<'a> { wh(h, &format!("SLICE[offset: {offset}, len: {len}]")), vec![self.lp_node(None, *input)], ), - MapFunction { input, function } => ND( - wh(h, &format!("{function}")), - vec![self.lp_node(None, *input)], - ), + MapFunction { input, function } => { + if let Some(streaming_lp) = function.to_streaming_lp() { + ND( + String::default(), + vec![TreeFmtNode::streaming_root_logical_plan(streaming_lp)], + ) + } else { + ND( + wh(h, &format!("{function}")), + vec![self.lp_node(None, *input)], + ) + } + }, ExtContext { input, .. } => { ND(wh(h, "EXTERNAL_CONTEXT"), vec![self.lp_node(None, *input)]) }, diff --git a/crates/polars-plan/src/logical_plan/functions/mod.rs b/crates/polars-plan/src/logical_plan/functions/mod.rs index 8c12cf8eae8bb..bec1dd78c0738 100644 --- a/crates/polars-plan/src/logical_plan/functions/mod.rs +++ b/crates/polars-plan/src/logical_plan/functions/mod.rs @@ -58,10 +58,11 @@ pub enum FunctionNode { alias: Option>, }, #[cfg_attr(feature = "serde", serde(skip))] + /// Streaming engine pipeline Pipeline { function: Arc, schema: SchemaRef, - original: Option>, + original: Option>, }, Unnest { columns: Arc<[Arc]>, @@ -301,6 +302,19 @@ impl FunctionNode { RowIndex { name, offset, .. } => df.with_row_index(name.as_ref(), *offset), } } + + pub fn to_streaming_lp(&self) -> Option { + let Self::Pipeline { + function: _, + schema: _, + original, + } = self + else { + return None; + }; + + Some(original.as_ref()?.as_ref().as_ref()) + } } impl Debug for FunctionNode { @@ -327,8 +341,7 @@ impl Display for FunctionNode { MergeSorted { .. } => write!(f, "MERGE SORTED"), Pipeline { original, .. } => { if let Some(original) = original { - let ir_plan = original.as_ref().clone().to_alp().unwrap(); - let ir_display = ir_plan.display(); + let ir_display = original.as_ref().display(); writeln!(f, "--- STREAMING")?; write!(f, "{ir_display}")?; diff --git a/crates/polars-utils/src/arena.rs b/crates/polars-utils/src/arena.rs index 4a0021c2a9e8c..06741ff454fe6 100644 --- a/crates/polars-utils/src/arena.rs +++ b/crates/polars-utils/src/arena.rs @@ -31,7 +31,7 @@ impl Default for Node { static ARENA_VERSION: AtomicU32 = AtomicU32::new(0); -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct Arena { version: u32, items: Vec,