diff --git a/crates/polars-expr/src/reduce/convert.rs b/crates/polars-expr/src/reduce/convert.rs index f5a33aca1a0b..cfbecdfb2014 100644 --- a/crates/polars-expr/src/reduce/convert.rs +++ b/crates/polars-expr/src/reduce/convert.rs @@ -5,26 +5,15 @@ use polars_utils::arena::{Arena, Node}; use super::extrema::*; use super::sum::SumReduce; use super::*; +use crate::reduce::len::LenReduce; use crate::reduce::mean::MeanReduce; -pub fn can_convert_into_reduction(node: Node, expr_arena: &Arena) -> bool { - match expr_arena.get(node) { - AExpr::Agg(agg) => matches!( - agg, - IRAggExpr::Min { .. } - | IRAggExpr::Max { .. } - | IRAggExpr::Mean { .. } - | IRAggExpr::Sum(_) - ), - _ => false, - } -} - +/// Converts a node into a reduction + its associated selector expression. pub fn into_reduction( node: Node, - expr_arena: &Arena, + expr_arena: &mut Arena, schema: &Schema, -) -> PolarsResult, Node)>> { +) -> PolarsResult<(Box, Node)> { let e = expr_arena.get(node); let field = e.to_field(schema, Context::Default, expr_arena)?; let out = match expr_arena.get(node) { @@ -74,9 +63,20 @@ pub fn into_reduction( let out: Box = Box::new(MeanReduce::new(field.dtype.clone())); (out, *input) }, - _ => return Ok(None), + _ => unreachable!(), + }, + AExpr::Len => { + // Compute length on the first column, or if none exist we'll never + // be called and correctly return 0 as length anyway. + let out: Box = Box::new(LenReduce::new()); + let expr = if let Some(first_column) = schema.iter_names().next() { + expr_arena.add(AExpr::Column(first_column.as_str().into())) + } else { + expr_arena.add(AExpr::Literal(LiteralValue::Null)) + }; + (out, expr) }, - _ => return Ok(None), + _ => unreachable!(), }; - Ok(Some(out)) + Ok(out) } diff --git a/crates/polars-expr/src/reduce/len.rs b/crates/polars-expr/src/reduce/len.rs new file mode 100644 index 000000000000..bf9391e8fd33 --- /dev/null +++ b/crates/polars-expr/src/reduce/len.rs @@ -0,0 +1,45 @@ +use polars_core::error::constants::LENGTH_LIMIT_MSG; + +use super::*; + +#[derive(Clone)] +pub struct LenReduce { + len: u64, +} + +impl LenReduce { + pub(crate) fn new() -> Self { + Self { len: 0 } + } +} + +impl Reduction for LenReduce { + fn init_dyn(&self) -> Box { + Box::new(Self::new()) + } + + fn reset(&mut self) { + self.len = 0; + } + + fn update(&mut self, batch: &Series) -> PolarsResult<()> { + self.len += batch.len() as u64; + Ok(()) + } + + fn combine(&mut self, other: &dyn Reduction) -> PolarsResult<()> { + let other = other.as_any().downcast_ref::().unwrap(); + self.len += other.len; + Ok(()) + } + + fn finalize(&mut self) -> PolarsResult { + #[allow(clippy::useless_conversion)] + let as_idx: IdxSize = self.len.try_into().expect(LENGTH_LIMIT_MSG); + Ok(Scalar::new(IDX_DTYPE, as_idx.into())) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/crates/polars-expr/src/reduce/mean.rs b/crates/polars-expr/src/reduce/mean.rs index 0d06974d956b..4c5fe635a442 100644 --- a/crates/polars-expr/src/reduce/mean.rs +++ b/crates/polars-expr/src/reduce/mean.rs @@ -1,5 +1,3 @@ -use polars_core::utils::Container; - use super::*; #[derive(Clone)] diff --git a/crates/polars-expr/src/reduce/mod.rs b/crates/polars-expr/src/reduce/mod.rs index bb51ba5c8a8d..9c7bb4f6f6bc 100644 --- a/crates/polars-expr/src/reduce/mod.rs +++ b/crates/polars-expr/src/reduce/mod.rs @@ -1,11 +1,12 @@ mod convert; mod extrema; +mod len; mod mean; mod sum; use std::any::Any; -pub use convert::{can_convert_into_reduction, into_reduction}; +pub use convert::into_reduction; use polars_core::prelude::*; #[allow(dead_code)] diff --git a/crates/polars-stream/src/physical_plan/lower_expr.rs b/crates/polars-stream/src/physical_plan/lower_expr.rs index fe98cc8efe4c..60e5142d914f 100644 --- a/crates/polars-stream/src/physical_plan/lower_expr.rs +++ b/crates/polars-stream/src/physical_plan/lower_expr.rs @@ -410,7 +410,8 @@ fn lower_exprs_with_ctx( let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &[inner], ctx)?; let exploded_name = unique_column_name(); let trans_inner = ctx.expr_arena.add(AExpr::Explode(trans_exprs[0])); - let explode_expr = ExprIR::new(trans_inner, OutputName::Alias(exploded_name.clone())); + let explode_expr = + ExprIR::new(trans_inner, OutputName::Alias(exploded_name.clone())); let output_schema = schema_for_select(trans_input, &[explode_expr.clone()], ctx)?; let node_kind = PhysNodeKind::Select { input: trans_input, @@ -596,19 +597,26 @@ fn lower_exprs_with_ctx( transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name))); }, }, - AExpr::AnonymousFunction { - .. - } - | AExpr::Function { - .. - } - | AExpr::Len // TODO: this one makes me really sad, make this streaming ASAP. + AExpr::Len => { + let out_name = unique_column_name(); + let expr_ir = ExprIR::new(expr, OutputName::Alias(out_name.clone())); + let output_schema = schema_for_select(input, &[expr_ir.clone()], ctx)?; + let kind = PhysNodeKind::Reduce { + input, + exprs: vec![expr_ir], + }; + let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind)); + input_nodes.insert(reduce_node_key); + transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name))); + }, + AExpr::AnonymousFunction { .. } + | AExpr::Function { .. } | AExpr::Slice { .. } | AExpr::Window { .. } => { let out_name = unique_column_name(); fallback_subset.push(ExprIR::new(expr, OutputName::Alias(out_name.clone()))); transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name))); - } + }, } } diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index 44e32e6fc348..d0bd342b0f65 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -47,7 +47,7 @@ fn create_stream_expr( struct GraphConversionContext<'a> { phys_sm: &'a SlotMap, - expr_arena: &'a Arena, + expr_arena: &'a mut Arena, graph: Graph, phys_to_graph: SecondaryMap, expr_conversion_state: ExpressionConversionState, @@ -56,7 +56,7 @@ struct GraphConversionContext<'a> { pub fn physical_plan_to_graph( root: PhysNodeKey, phys_sm: &SlotMap, - expr_arena: &Arena, + expr_arena: &mut Arena, ) -> PolarsResult<(Graph, SecondaryMap)> { let expr_depth_limit = get_expr_depth_limit()?; let mut ctx = GraphConversionContext { @@ -138,8 +138,7 @@ fn to_graph_rec<'a>( let mut inputs = Vec::with_capacity(reductions.len()); for e in exprs { - let (red, input_node) = - into_reduction(e.node(), ctx.expr_arena, input_schema)?.expect("invariant"); + let (red, input_node) = into_reduction(e.node(), ctx.expr_arena, input_schema)?; reductions.push(red); let input_phys = diff --git a/py-polars/tests/unit/constructors/test_constructors.py b/py-polars/tests/unit/constructors/test_constructors.py index ffda370aa538..fda072930525 100644 --- a/py-polars/tests/unit/constructors/test_constructors.py +++ b/py-polars/tests/unit/constructors/test_constructors.py @@ -1677,6 +1677,7 @@ def __arrow_c_array__(self, requested_schema: object = None) -> object: def test_pycapsule_interface(df: pl.DataFrame) -> None: + df = df.rechunk() pyarrow_table = df.to_arrow() # Array via C data interface diff --git a/py-polars/tests/unit/test_projections.py b/py-polars/tests/unit/test_projections.py index 9586bfb0a2ae..700100ced4c4 100644 --- a/py-polars/tests/unit/test_projections.py +++ b/py-polars/tests/unit/test_projections.py @@ -78,11 +78,19 @@ def test_unnest_projection_pushdown() -> None: pl.col("field_2").cast(pl.Categorical).alias("col"), pl.col("value"), ) - out = mlf.collect().to_dict(as_series=False) + + out = ( + mlf.sort( + [pl.col.row.cast(pl.String), pl.col.col.cast(pl.String)], + maintain_order=True, + ) + .collect() + .to_dict(as_series=False) + ) assert out == { - "row": ["y", "y", "b", "b"], - "col": ["z", "z", "c", "c"], - "value": [1, 2, 2, 3], + "row": ["b", "b", "y", "y"], + "col": ["c", "c", "z", "z"], + "value": [2, 3, 1, 2], }