diff --git a/crates/polars-plan/src/plans/ir/schema.rs b/crates/polars-plan/src/plans/ir/schema.rs index bad0ba918b3a..1586463a8c0f 100644 --- a/crates/polars-plan/src/plans/ir/schema.rs +++ b/crates/polars-plan/src/plans/ir/schema.rs @@ -151,7 +151,9 @@ impl IR { | Join { schema, .. } | HStack { schema, .. } | ExtContext { schema, .. } - | SimpleProjection { columns: schema, .. } => schema.clone(), + | SimpleProjection { + columns: schema, .. + } => schema.clone(), MapFunction { input, function } => { let input_schema = IR::schema_with_cache(*input, arena, cache); function.schema(&input_schema).unwrap().into_owned() diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index 8500f0145906..7bc1ae21da87 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -28,44 +28,51 @@ pub fn lower_ir( IR::SimpleProjection { input, columns } => { let columns = columns.iter_names().map(|s| s.to_string()).collect(); let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?; - PhysNodeKind::SimpleProjection { input: phys_input, columns } + PhysNodeKind::SimpleProjection { + input: phys_input, + columns, + } }, // TODO: split partially streamable selections to avoid fallback as much as possible. - IR::Select { - input, - expr, - .. - } if expr.iter().all(|e| is_streamable(e.node(), expr_arena)) => { + IR::Select { input, expr, .. } + if expr.iter().all(|e| is_streamable(e.node(), expr_arena)) => + { let selectors = expr.clone(); let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?; - PhysNodeKind::Select { input: phys_input, selectors, extend_original: false } + PhysNodeKind::Select { + input: phys_input, + selectors, + extend_original: false, + } }, // TODO: split partially streamable selections to avoid fallback as much as possible. - IR::HStack { - input, - exprs, - .. - } if exprs.iter().all(|e| is_streamable(e.node(), expr_arena)) => { + IR::HStack { input, exprs, .. } + if exprs.iter().all(|e| is_streamable(e.node(), expr_arena)) => + { let selectors = exprs.clone(); let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?; - PhysNodeKind::Select { input: phys_input, selectors, extend_original: true } + PhysNodeKind::Select { + input: phys_input, + selectors, + extend_original: true, + } }, // TODO: split reductions and streamable selections. E.g. sum(a) + sum(b) should be split // into Select(a + b) -> Reduce(sum(a), sum(b) - IR::Select { - input, - expr, - .. - } if expr - .iter() - .all(|e| can_convert_into_reduction(e.node(), expr_arena)) => + IR::Select { input, expr, .. } + if expr + .iter() + .all(|e| can_convert_into_reduction(e.node(), expr_arena)) => { let exprs = expr.clone(); let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?; - PhysNodeKind::Reduce { input: phys_input, exprs } + PhysNodeKind::Reduce { + input: phys_input, + exprs, + } }, IR::Slice { input, offset, len } => { @@ -73,7 +80,11 @@ pub fn lower_ir( let offset = *offset as usize; let length = *len as usize; let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?; - PhysNodeKind::StreamingSlice { input: phys_input, offset, length } + PhysNodeKind::StreamingSlice { + input: phys_input, + offset, + length, + } } else { todo!() } @@ -82,7 +93,10 @@ pub fn lower_ir( IR::Filter { input, predicate } if is_streamable(predicate.node(), expr_arena) => { let predicate = predicate.clone(); let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?; - PhysNodeKind::Filter { input: phys_input, predicate } + PhysNodeKind::Filter { + input: phys_input, + predicate, + } }, IR::DataFrameScan { @@ -99,7 +113,10 @@ pub fn lower_ir( let phys_input = phys_sm.insert(PhysNode::new(schema, node_kind)); node_kind = PhysNodeKind::SimpleProjection { input: phys_input, - columns: projection_schema.iter_names().map(|s| s.to_string()).collect(), + columns: projection_schema + .iter_names() + .map(|s| s.to_string()) + .collect(), }; schema = projection_schema.clone(); } @@ -110,9 +127,12 @@ pub fn lower_ir( } let phys_input = phys_sm.insert(PhysNode::new(schema, node_kind)); - node_kind = PhysNodeKind::Filter { input: phys_input, predicate }; + node_kind = PhysNodeKind::Filter { + input: phys_input, + predicate, + }; } - + node_kind }, @@ -131,10 +151,16 @@ pub fn lower_ir( if function.is_streamable() { let map = Arc::new(move |df| function.evaluate(df)); - PhysNodeKind::Map { input: phys_input, map } + PhysNodeKind::Map { + input: phys_input, + map, + } } else { let map = Arc::new(move |df| function.evaluate(df)); - PhysNodeKind::InMemoryMap { input: phys_input, map } + PhysNodeKind::InMemoryMap { + input: phys_input, + map, + } } }, @@ -143,13 +169,11 @@ pub fn lower_ir( by_column, slice, sort_options, - } => { - PhysNodeKind::Sort { - by_column: by_column.clone(), - slice: *slice, - sort_options: sort_options.clone(), - input: lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?, - } + } => PhysNodeKind::Sort { + by_column: by_column.clone(), + slice: *slice, + sort_options: sort_options.clone(), + input: lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?, }, IR::Union { inputs, options } => { @@ -183,7 +207,7 @@ pub fn lower_ir( _ => todo!(), }; - + let output_schema = IR::schema_with_cache(node, ir_arena, schema_cache); Ok(phys_sm.insert(PhysNode::new(output_schema, node_kind))) } diff --git a/crates/polars-stream/src/physical_plan/mod.rs b/crates/polars-stream/src/physical_plan/mod.rs index 3017f7be1e2e..bed1f70467cb 100644 --- a/crates/polars-stream/src/physical_plan/mod.rs +++ b/crates/polars-stream/src/physical_plan/mod.rs @@ -29,7 +29,10 @@ pub struct PhysNode { impl PhysNode { pub fn new(output_schema: Arc, kind: PhysNodeKind) -> Self { - Self { output_schema, kind } + Self { + output_schema, + kind, + } } } diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index 94561da9d2f7..b35fd371ec0f 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -14,7 +14,7 @@ use polars_utils::itertools::Itertools; use recursive::recursive; use slotmap::{SecondaryMap, SlotMap}; -use super::{PhysNode, PhysNodeKind, PhysNodeKey}; +use super::{PhysNode, PhysNodeKey, PhysNodeKind}; use crate::expression::StreamExpr; use crate::graph::{Graph, GraphNodeKey}; use crate::nodes; @@ -130,10 +130,7 @@ fn to_graph_rec<'a>( [input_key], ) }, - Reduce { - input, - exprs, - } => { + Reduce { input, exprs } => { let input_key = to_graph_rec(*input, ctx)?; let input_schema = &ctx.phys_sm[*input].output_schema; @@ -142,8 +139,7 @@ fn to_graph_rec<'a>( for e in exprs { let (red, input_node) = - into_reduction(e.node(), ctx.expr_arena, input_schema)? - .expect("invariant"); + into_reduction(e.node(), ctx.expr_arena, input_schema)?.expect("invariant"); reductions.push(red); let input_phys = @@ -157,17 +153,11 @@ fn to_graph_rec<'a>( [input_key], ) }, - SimpleProjection { - input, - columns, - } => { + SimpleProjection { input, columns } => { let input_schema = ctx.phys_sm[*input].output_schema.clone(); let input_key = to_graph_rec(*input, ctx)?; ctx.graph.add_node( - nodes::simple_projection::SimpleProjectionNode::new( - columns.clone(), - input_schema, - ), + nodes::simple_projection::SimpleProjectionNode::new(columns.clone(), input_schema), [input_key], ) }, @@ -181,10 +171,7 @@ fn to_graph_rec<'a>( ) }, - InMemoryMap { - input, - map, - } => { + InMemoryMap { input, map } => { let input_schema = ctx.phys_sm[*input].output_schema.clone(); let input_key = to_graph_rec(*input, ctx)?; ctx.graph.add_node( @@ -248,7 +235,10 @@ fn to_graph_rec<'a>( inputs, null_extend, } => { - let input_schemas = inputs.iter().map(|i| ctx.phys_sm[*i].output_schema.clone()).collect_vec(); + let input_schemas = inputs + .iter() + .map(|i| ctx.phys_sm[*i].output_schema.clone()) + .collect_vec(); let input_keys = inputs .iter() .map(|i| to_graph_rec(*i, ctx)) diff --git a/crates/polars-stream/src/skeleton.rs b/crates/polars-stream/src/skeleton.rs index cdbc3e8a0c2d..392f132609c1 100644 --- a/crates/polars-stream/src/skeleton.rs +++ b/crates/polars-stream/src/skeleton.rs @@ -19,7 +19,13 @@ pub fn run_query( ) -> PolarsResult { let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len()); let mut schema_cache = PlHashMap::with_capacity(ir_arena.len()); - let root = crate::physical_plan::lower_ir(node, &mut ir_arena, expr_arena, &mut phys_sm, &mut schema_cache)?; + let root = crate::physical_plan::lower_ir( + node, + &mut ir_arena, + expr_arena, + &mut phys_sm, + &mut schema_cache, + )?; let (mut graph, phys_to_graph) = crate::physical_plan::physical_plan_to_graph(&phys_sm, expr_arena)?; let mut results = crate::execute::execute_graph(&mut graph)?;