diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/aexpr/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/aexpr/mod.rs index 260e354287e29..3c6cb98f6c315 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/aexpr/mod.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/aexpr/mod.rs @@ -9,6 +9,7 @@ use polars_utils::arena::{Arena, Node}; use crate::dsl::function_expr::FunctionExpr; use crate::logical_plan::Context; +use crate::prelude::aexpr::NodeInputs::Single; use crate::prelude::names::COUNT; use crate::prelude::*; @@ -173,12 +174,90 @@ impl AExpr { } } - pub(crate) fn get_input(&self) -> Node { + pub(crate) fn get_input(&self) -> NodeInputs { use AExpr::*; + use NodeInputs::*; match self { - Alias(input, _) => *input, - Cast { expr, .. } => *expr, - _ => todo!(), + Alias(input, _) => Single(*input), + Cast { expr, .. } => Single(*expr), + Explode(input) => Single(*input), + Column(_) => Leaf, + Literal(_) => Leaf, + BinaryExpr { left, right, .. } => Many(vec![*left, *right]), + Sort { expr, .. } => Single(*expr), + Take { expr, .. } => Single(*expr), + SortBy { expr, by, .. } => { + let mut many = by.clone(); + many.push(*expr); + Many(many) + } + Filter { input, .. } => Single(*input), + Agg(a) => a.get_input(), + Ternary { + truthy, + falsy, + predicate, + } => Many(vec![*truthy, *falsy, *predicate]), + AnonymousFunction { input, .. } | Function { input, .. } => match input.len() { + 1 => Single(input[0]), + _ => Many(input.clone()), + }, + Window { + function, + order_by, + partition_by, + .. + } => { + let mut out = Vec::with_capacity(partition_by.len() + 2); + out.push(*function); + if let Some(a) = order_by { + out.push(*a); + } + out.extend(partition_by); + Many(out) + } + Wildcard => panic!("no wildcard expected"), + Slice { input, .. } => Single(*input), + Count => Leaf, + Nth(_) => Leaf, + } + } +} + +impl AAggExpr { + pub(crate) fn get_input(&self) -> NodeInputs { + use AAggExpr::*; + match self { + Min { input, .. 
} => Single(*input), + Max { input, .. } => Single(*input), + Median(input) => Single(*input), + NUnique(input) => Single(*input), + First(input) => Single(*input), + Last(input) => Single(*input), + Mean(input) => Single(*input), + List(input) => Single(*input), + Quantile { expr, .. } => Single(*expr), + Sum(input) => Single(*input), + Count(input) => Single(*input), + Std(input, _) => Single(*input), + Var(input, _) => Single(*input), + AggGroups(input) => Single(*input), + } + } +} + +pub(crate) enum NodeInputs { + Leaf, + Single(Node), + Many(Vec), +} + +impl NodeInputs { + pub(crate) fn first(&self) -> Node { + match self { + Single(node) => *node, + NodeInputs::Many(nodes) => nodes[0], + NodeInputs::Leaf => panic!(), } } } diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/alp.rs b/polars/polars-lazy/polars-plan/src/logical_plan/alp.rs index 3e7a26abf7ff6..433c7d9a94693 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/alp.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/alp.rs @@ -237,6 +237,7 @@ impl ALogicalPlan { Melt { schema, .. 
} => schema, MapFunction { input, function } => { let input_schema = arena.get(*input).schema(arena); + return match input_schema { Cow::Owned(schema) => { Cow::Owned(function.schema(&schema).unwrap().into_owned()) diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs index ba6916906e790..c8792c5f2fa42 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs @@ -385,6 +385,14 @@ impl LogicalPlanBuilder { .into() } + pub fn add_err(self, err: PolarsError) -> Self { + LogicalPlan::Error { + input: Box::new(self.0), + err: err.into(), + } + .into() + } + pub fn with_context(self, contexts: Vec) -> Self { let mut schema = try_delayed!(self.0.schema(), &self.0, into) .as_ref() diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/functions/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/functions/mod.rs index 82e1a33db5768..f8f6d05c3ac09 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/functions/mod.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/functions/mod.rs @@ -1,5 +1,6 @@ #[cfg(feature = "merge_sorted")] mod merge_sorted; +mod rename; use std::borrow::Cow; use std::fmt::{Debug, Display, Formatter}; @@ -56,6 +57,12 @@ pub enum FunctionNode { // sorted column that serves as the key column: Arc, }, + Rename { + existing: Arc>, + new: Arc>, + // A column name gets swapped with an existing column + swapping: bool, + }, } impl PartialEq for FunctionNode { @@ -65,6 +72,18 @@ impl PartialEq for FunctionNode { (FastProjection { columns: l }, FastProjection { columns: r }) => l == r, (DropNulls { subset: l }, DropNulls { subset: r }) => l == r, (Rechunk, Rechunk) => true, + ( + Rename { + existing: existing_l, + new: new_l, + .. + }, + Rename { + existing: existing_r, + new: new_r, + .. 
+ }, + ) => existing_l == existing_r && new_l == new_r, _ => false, } } @@ -78,7 +97,7 @@ impl FunctionNode { Rechunk | Pipeline { .. } => false, #[cfg(feature = "merge_sorted")] MergeSorted { .. } => false, - DropNulls { .. } | FastProjection { .. } | Unnest { .. } => true, + DropNulls { .. } | FastProjection { .. } | Unnest { .. } | Rename { .. } => true, Opaque { streamable, .. } => *streamable, } } @@ -141,6 +160,7 @@ impl FunctionNode { } #[cfg(feature = "merge_sorted")] MergeSorted { .. } => Ok(Cow::Borrowed(input_schema)), + Rename { existing, new, .. } => rename::rename_schema(input_schema, existing, new), } } @@ -148,7 +168,9 @@ impl FunctionNode { use FunctionNode::*; match self { Opaque { predicate_pd, .. } => *predicate_pd, - FastProjection { .. } | DropNulls { .. } | Rechunk | Unnest { .. } => true, + FastProjection { .. } | DropNulls { .. } | Rechunk | Unnest { .. } | Rename { .. } => { + true + } #[cfg(feature = "merge_sorted")] MergeSorted { .. } => true, Pipeline { .. } => unimplemented!(), @@ -159,7 +181,9 @@ impl FunctionNode { use FunctionNode::*; match self { Opaque { projection_pd, .. } => *projection_pd, - FastProjection { .. } | DropNulls { .. } | Rechunk | Unnest { .. } => true, + FastProjection { .. } | DropNulls { .. } | Rechunk | Unnest { .. } | Rename { .. } => { + true + } #[cfg(feature = "merge_sorted")] MergeSorted { .. } => true, Pipeline { .. } => unimplemented!(), @@ -209,6 +233,7 @@ impl FunctionNode { Arc::get_mut(function).unwrap().call_udf(df) } } + Rename { existing, new, .. } => rename::rename_impl(df, existing, new), } } } @@ -252,6 +277,7 @@ impl Display for FunctionNode { writeln!(f, "PIPELINE") } } + Rename { .. 
} => write!(f, "RENAME"), } } } diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/functions/rename.rs b/polars/polars-lazy/polars-plan/src/logical_plan/functions/rename.rs new file mode 100644 index 0000000000000..9c6ba2e1ec154 --- /dev/null +++ b/polars/polars-lazy/polars-plan/src/logical_plan/functions/rename.rs @@ -0,0 +1,41 @@ +use super::*; + +pub(super) fn rename_impl( + mut df: DataFrame, + existing: &[String], + new: &[String], +) -> PolarsResult { + let positions = existing + .iter() + .map(|old| df.find_idx_by_name(old)) + .collect::>(); + + for (pos, name) in positions.iter().zip(new.iter()) { + // the column might be removed due to projection pushdown + // so we only update if we can find it. + if let Some(pos) = pos { + df.get_columns_mut()[*pos].rename(name); + } + } + // recreate dataframe so we check duplicates + let columns = std::mem::take(df.get_columns_mut()); + DataFrame::new(columns) +} + +pub(super) fn rename_schema<'a>( + input_schema: &'a SchemaRef, + existing: &[String], + new: &[String], +) -> PolarsResult> { + let mut new_schema = (**input_schema).clone(); + for (old, new) in existing.iter().zip(new.iter()) { + // the column might be removed due to projection pushdown + // so we only update if we can find it. + if let Some(dtype) = input_schema.get(old) { + if new_schema.with_column(new.clone(), dtype.clone()).is_none() { + new_schema.remove(old); + } + } + } + Ok(Cow::Owned(Arc::new(new_schema))) +} diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/iterator.rs b/polars/polars-lazy/polars-plan/src/logical_plan/iterator.rs index ec27a1de5c43f..40580e6a451ba 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/iterator.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/iterator.rs @@ -97,7 +97,7 @@ macro_rules! 
push_expr { impl Expr { /// Expr::mutate().apply(fn()) pub fn mutate(&mut self) -> ExprMut { - let mut stack = Vec::with_capacity(8); + let mut stack = Vec::with_capacity(4); stack.push(self); ExprMut { stack } } @@ -151,7 +151,7 @@ impl<'a> IntoIterator for &'a Expr { type IntoIter = ExprIter<'a>; fn into_iter(self) -> Self::IntoIter { - let mut stack = Vec::with_capacity(8); + let mut stack = Vec::with_capacity(4); stack.push(self); ExprIter { stack } } @@ -282,7 +282,7 @@ pub trait ArenaExprIter<'a> { impl<'a> ArenaExprIter<'a> for &'a Arena { fn iter(&self, root: Node) -> AExprIter<'a> { - let mut stack = Vec::with_capacity(8); + let mut stack = Vec::with_capacity(4); stack.push(root); AExprIter { stack, diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/keys.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/keys.rs new file mode 100644 index 0000000000000..a11e2a8f00933 --- /dev/null +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/keys.rs @@ -0,0 +1,40 @@ +//! Keys in the `acc_predicates` hashmap. 
+use super::*; + +// an invisible unicode token we use as delimiter +const HIDDEN_DELIMITER: char = '\u{1D17A}'; + +/// Determine the hashmap key by combining all the leaf column names of a predicate +pub(super) fn predicate_to_key(predicate: Node, expr_arena: &Arena) -> Arc { + let mut iter = aexpr_to_leaf_names_iter(predicate, expr_arena); + if let Some(first) = iter.next() { + if let Some(second) = iter.next() { + let mut new = String::with_capacity(32 * iter.size_hint().0); + new.push_str(&first); + new.push(HIDDEN_DELIMITER); + new.push_str(&second); + + for name in iter { + new.push(HIDDEN_DELIMITER); + new.push_str(&name); + } + return Arc::from(new); + } + first + } else { + let mut s = String::new(); + s.push(HIDDEN_DELIMITER); + Arc::from(s) + } +} + +pub(super) fn key_has_name(key: &str, name: &str) -> bool { + if key.contains(HIDDEN_DELIMITER) { + for root_name in key.split(HIDDEN_DELIMITER) { + if root_name == name { + return true; + } + } + } + key == name +} diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs index f77ae71e53e58..2fe6ff728a25c 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs @@ -1,3 +1,5 @@ +mod keys; +mod rename; mod utils; use polars_core::datatypes::PlHashMap; @@ -7,6 +9,7 @@ use utils::*; use super::*; use crate::dsl::function_expr::FunctionExpr; use crate::logical_plan::{optimizer, Context}; +use crate::prelude::optimizer::predicate_pushdown::rename::process_rename; use crate::utils::{aexprs_to_schema, check_input_node, has_aexpr}; #[derive(Default)] @@ -516,7 +519,25 @@ impl PredicatePushDown { MapFunction { ref function, .. 
} => { if function.allow_predicate_pd() { - self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false) + match function { + FunctionNode::Rename { + existing, + new, + .. + } => { + let local_predicates = process_rename(&mut acc_predicates, + expr_arena, + existing, + new, + )?; + let lp = self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)?; + Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena)) + }, _ => { + self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false) + } + } + + } else { self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena) } diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/rename.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/rename.rs new file mode 100644 index 0000000000000..afc6728d85cfb --- /dev/null +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/rename.rs @@ -0,0 +1,62 @@ +use super::*; +use crate::prelude::optimizer::predicate_pushdown::keys::{key_has_name, predicate_to_key}; + +fn remove_any_key_referencing_renamed( + new: &str, + acc_predicates: &mut PlHashMap, Node>, + local_predicates: &mut Vec, +) { + let mut move_to_local = vec![]; + for key in acc_predicates.keys() { + if key_has_name(key, new) { + move_to_local.push(key.clone()) + } + } + + for key in move_to_local { + local_predicates.push(acc_predicates.remove(&key).unwrap()) + } +} + +pub(super) fn process_rename( + acc_predicates: &mut PlHashMap, Node>, + expr_arena: &mut Arena, + existing: &[String], + new: &[String], +) -> PolarsResult> { + let mut local_predicates = vec![]; + for (existing, new) in existing.iter().zip(new.iter()) { + let has_existing = acc_predicates.contains_key(existing.as_str()); + let has_new = acc_predicates.contains_key(new.as_str()); + let has_both = has_existing && has_new; + + // swapping path add to local for now + if has_both { + // Search 
for the key and add it to local because swapping is more complicated + if let Some(to_local) = acc_predicates.remove(new.as_str()) { + local_predicates.push(to_local); + } else { + // The keys can be combined e.g. `a AND b AND c`; in this case replacing/finding + // the key that should be renamed is more complicated, so for now + // we just move it to local. + remove_any_key_referencing_renamed(new, acc_predicates, &mut local_predicates) + } + continue; + } + // simple new name path + else { + // Find the key and update the predicate as well as the key + // This ensures the optimization is pushed down. + if let Some(node) = acc_predicates.remove(new.as_str()) { + let new_node = rename_matching_aexpr_leaf_names(node, expr_arena, new, existing); + acc_predicates.insert(predicate_to_key(new_node, expr_arena), new_node); + } else { + // The keys can be combined e.g. `a AND b AND c`; in this case replacing/finding + // the key that should be renamed is more complicated, so for now + // we just move it to local. 
+ remove_any_key_referencing_renamed(new, acc_predicates, &mut local_predicates) + } + } + } + Ok(local_predicates) +} diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/utils.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/utils.rs index 6c11b2d4b310f..f3c7e974579df 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/utils.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/utils.rs @@ -1,6 +1,7 @@ use polars_core::datatypes::PlHashMap; use polars_core::prelude::*; +use super::keys::*; use crate::logical_plan::Context; use crate::prelude::*; use crate::utils::{aexpr_to_leaf_names, check_input_node, has_aexpr, rename_aexpr_leaf_names}; @@ -75,44 +76,6 @@ pub(super) fn predicate_at_scan( } } -// an invisible ascii token we use as delimiter -const HIDDEN_DELIMITER: char = '\u{1D17A}'; - -/// Determine the hashmap key by combining all the leaf column names of a predicate -pub(super) fn predicate_to_key(predicate: Node, expr_arena: &Arena) -> Arc { - let mut iter = aexpr_to_leaf_names_iter(predicate, expr_arena); - if let Some(first) = iter.next() { - if let Some(second) = iter.next() { - let mut new = String::with_capacity(32 * iter.size_hint().0); - new.push_str(&first); - new.push(HIDDEN_DELIMITER); - new.push_str(&second); - - for name in iter { - new.push(HIDDEN_DELIMITER); - new.push_str(&name); - } - return Arc::from(new); - } - first - } else { - let mut s = String::new(); - s.push(HIDDEN_DELIMITER); - Arc::from(s) - } -} - -fn key_has_name(key: &str, name: &str) -> bool { - if key.contains(HIDDEN_DELIMITER) { - for root_name in key.split(HIDDEN_DELIMITER) { - if root_name == name { - return true; - } - } - } - key == name -} - // this checks if a predicate from a node upstream can pass // the predicate in this filter // Cases where this cannot be the case: diff --git 
a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs index 1e0018d0b4c6d..5efce9cd1b78c 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs @@ -254,7 +254,7 @@ pub(super) fn process_join( if name.contains(suffix.as_ref()) && schema_after_join.get(&name).is_none() { let new_name = &name.as_ref()[..name.len() - suffix.len()]; - let renamed = aexpr_assign_renamed_root(*proj, expr_arena, &name, new_name); + let renamed = aexpr_assign_renamed_leaf(*proj, expr_arena, &name, new_name); let aliased = expr_arena.add(AExpr::Alias(renamed, name)); *proj = aliased; diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs index d1be6da19e23d..b2e1a9a8ab9d4 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs @@ -4,6 +4,7 @@ mod hstack; mod joins; mod melt; mod projection; +mod rename; #[cfg(feature = "semi_anti_join")] mod semi_anti_join; @@ -20,9 +21,10 @@ use crate::prelude::optimizer::projection_pushdown::hstack::process_hstack; use crate::prelude::optimizer::projection_pushdown::joins::process_join; use crate::prelude::optimizer::projection_pushdown::melt::process_melt; use crate::prelude::optimizer::projection_pushdown::projection::process_projection; +use crate::prelude::optimizer::projection_pushdown::rename::process_rename; use crate::prelude::*; use crate::utils::{ - aexpr_assign_renamed_root, aexpr_to_leaf_names, aexpr_to_leaf_nodes, check_input_node, + aexpr_assign_renamed_leaf, aexpr_to_leaf_names, aexpr_to_leaf_nodes, check_input_node, 
expr_is_projected_upstream, }; @@ -758,6 +760,34 @@ impl ProjectionPushDown { input, function: function.clone(), }; + + if let FunctionNode::Rename { + existing, + new, + swapping, + } = function + { + process_rename( + &mut acc_projections, + &mut projected_names, + expr_arena, + existing, + new, + *swapping, + )?; + self.pushdown_and_assign( + input, + acc_projections, + projected_names, + projections_seen, + lp_arena, + expr_arena, + )?; + return Ok(lp); + } + + let MapFunction {ref function, ..} = lp else { unreachable!() }; + if function.allow_projection_pd() && !acc_projections.is_empty() { let original_acc_projection_len = acc_projections.len(); diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/rename.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/rename.rs new file mode 100644 index 0000000000000..f934ea009dcb5 --- /dev/null +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/rename.rs @@ -0,0 +1,78 @@ +use std::collections::BTreeSet; + +use super::*; + +fn iter_and_update_nodes( + existing: &str, + new: &str, + acc_projections: &mut [Node], + expr_arena: &mut Arena, + processed: &mut BTreeSet, +) { + for node in acc_projections.iter_mut() { + if !processed.contains(&node.0) { + let new_node = rename_matching_aexpr_leaf_names(*node, expr_arena, new, existing); + if new_node != *node { + *node = new_node; + processed.insert(node.0); + } + } + } +} + +#[allow(clippy::too_many_arguments)] +pub(super) fn process_rename( + acc_projections: &mut [Node], + projected_names: &mut PlHashSet>, + expr_arena: &mut Arena, + existing: &[String], + new: &[String], + swapping: bool, +) -> PolarsResult<()> { + let mut processed = BTreeSet::new(); + if swapping { + for (existing, new) in existing.iter().zip(new.iter()) { + let has_existing = projected_names.contains(existing.as_str()); + let has_new = projected_names.contains(new.as_str()); + let has_both = 
has_existing && has_new; + let has_any = has_existing || has_new; + + if has_any { + // swapping path + // this must leave projected names intact, as we only swap + if has_both { + iter_and_update_nodes( + existing, + new, + acc_projections, + expr_arena, + &mut processed, + ); + } + // simple new name path + // this must add and remove names + else { + projected_names.remove(new.as_str()); + let name: Arc = Arc::from(existing.as_str()); + projected_names.insert(name); + iter_and_update_nodes( + existing, + new, + acc_projections, + expr_arena, + &mut processed, + ); + } + } + } + } else { + for (existing, new) in existing.iter().zip(new.iter()) { + if projected_names.remove(new.as_str()) { + let name: Arc = Arc::from(existing.as_str()); + projected_names.insert(name); + iter_and_update_nodes(existing, new, acc_projections, expr_arena, &mut processed); + } + } + } + Ok(()) +} diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/slice_pushdown_expr.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/slice_pushdown_expr.rs index b2ac68a322788..f3d59dab4eafd 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/slice_pushdown_expr.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/slice_pushdown_expr.rs @@ -29,7 +29,7 @@ impl OptimizationRule for SlicePushDown { let out = match expr_arena.get(*input) { m @ Alias(..) | m @ Cast { .. 
} => { let m = m.clone(); - let input = m.get_input(); + let input = m.get_input().first(); let new_input = pushdown(input, offset, length, expr_arena); Some(m.replace_input(new_input)) } diff --git a/polars/polars-lazy/polars-plan/src/utils.rs b/polars/polars-lazy/polars-plan/src/utils.rs index 7f714d6c1570b..34fab32eb9f7c 100644 --- a/polars/polars-lazy/polars-plan/src/utils.rs +++ b/polars/polars-lazy/polars-plan/src/utils.rs @@ -258,17 +258,43 @@ pub(crate) fn rename_aexpr_leaf_names( to_aexpr(new_expr, arena) } +/// If the leaf names match `current`, the node will be replaced +/// with a renamed expression. +pub(crate) fn rename_matching_aexpr_leaf_names( + node: Node, + arena: &mut Arena, + current: &str, + new_name: &str, +) -> Node { + let mut leaves = aexpr_to_leaf_nodes_iter(node, arena); + + if leaves.any(|node| matches!(arena.get(node), AExpr::Column(name) if &**name == current)) { + // we convert to expression as we cannot easily copy the aexpr. + let mut new_expr = node_to_expr(node, arena); + new_expr.mutate().apply(|e| match e { + Expr::Column(name) if &**name == current => { + *name = Arc::from(new_name); + true + } + _ => true, + }); + to_aexpr(new_expr, arena) + } else { + node + } +} + /// Rename the root of the expression from `current` to `new` and assign to new node in arena. /// Returns `Node` on first successful rename. 
-pub(crate) fn aexpr_assign_renamed_root( +pub(crate) fn aexpr_assign_renamed_leaf( node: Node, arena: &mut Arena, current: &str, new_name: &str, ) -> Node { - let roots = aexpr_to_leaf_nodes(node, arena); + let leafs = aexpr_to_leaf_nodes_iter(node, arena); - for node in roots { + for node in leafs { match arena.get(node) { AExpr::Column(name) if &**name == current => { return arena.add(AExpr::Column(Arc::from(new_name))) diff --git a/polars/polars-lazy/src/frame/mod.rs b/polars/polars-lazy/src/frame/mod.rs index 33698f8371784..32172b28ffd33 100644 --- a/polars/polars-lazy/src/frame/mod.rs +++ b/polars/polars-lazy/src/frame/mod.rs @@ -266,96 +266,6 @@ impl LazyFrame { self.select_local(vec![col("*").reverse()]) } - fn rename_impl_swapping(self, existing: Vec, new: Vec) -> Self { - assert_eq!(new.len(), existing.len()); - - let existing2 = existing.clone(); - let new2 = new.clone(); - let udf_schema = move |old_schema: &Schema| { - let mut new_schema = old_schema.clone(); - - // schema after renaming - for (old, new) in existing2.iter().zip(new2.iter()) { - let dtype = old_schema.try_get(old)?; - if new_schema.with_column(new.clone(), dtype.clone()).is_none() { - new_schema.remove(old); - } - } - Ok(Arc::new(new_schema)) - }; - - self.map( - move |mut df: DataFrame| { - let positions = existing - .iter() - .map(|old| df.try_find_idx_by_name(old)) - .collect::>>()?; - - for (pos, name) in positions.iter().zip(new.iter()) { - df.get_columns_mut()[*pos].rename(name); - } - // recreate dataframe so we check duplicates - let columns = std::mem::take(df.get_columns_mut()); - DataFrame::new(columns) - }, - // Don't allow optimizations. 
Swapping names are opaque to the optimizer - AllowedOptimizations { - projection_pushdown: false, - predicate_pushdown: false, - streaming: true, - ..Default::default() - }, - Some(Arc::new(udf_schema)), - Some("RENAME_SWAPPING"), - ) - } - - fn rename_impl(self, existing: Vec, new: Vec) -> Self { - let existing2 = existing.clone(); - let new2 = new.clone(); - let udf_schema = move |s: &Schema| { - let mut new_schema = s.clone(); - for (old, new) in existing2.iter().zip(&new2) { - let _ = new_schema.rename(old, new.clone()); - } - Ok(Arc::new(new_schema)) - }; - - self.with_columns( - existing - .iter() - .zip(&new) - .map(|(old, new)| col(old).alias(new.as_ref())) - .collect::>(), - ) - .map( - move |mut df: DataFrame| { - let cols = df.get_columns_mut(); - let mut removed_count = 0; - for (existing, new) in existing.iter().zip(new.iter()) { - let idx_a = cols.iter().position(|s| s.name() == existing.as_str()); - let idx_b = cols.iter().position(|s| s.name() == new.as_str()); - - match (idx_a, idx_b) { - (Some(idx_a), Some(idx_b)) => { - cols.swap(idx_a, idx_b); - } - // renamed columns are removed by predicate pushdown - _ => { - removed_count += 1; - continue; - } - } - } - cols.truncate(cols.len() - (existing.len() - removed_count)); - Ok(df) - }, - Default::default(), - Some(Arc::new(udf_schema)), - Some("RENAME"), - ) - } - /// Rename columns in the DataFrame. pub fn rename(self, existing: I, new: J) -> Self where @@ -364,38 +274,48 @@ impl LazyFrame { T: AsRef, S: AsRef, { - // We dispatch to 2 implementations. - // 1 is swapping eg. rename a -> b and b -> a - // 2 is non-swapping eg. rename a -> new_name - // the latter allows predicate pushdown. - let existing = existing - .into_iter() - .map(|a| a.as_ref().to_string()) - .collect::>(); - let new = new - .into_iter() - .map(|a| a.as_ref().to_string()) - .collect::>(); - - fn inner(lf: LazyFrame, existing: Vec, new: Vec) -> LazyFrame { - // remove mappings that map to themselves. 
- let (existing, new): (Vec<_>, Vec<_>) = existing - .into_iter() - .zip(new) - .flat_map(|(a, b)| if a == b { None } else { Some((a, b)) }) - .unzip(); - - // todo! make delayed - let schema = &*lf.schema().unwrap(); - // a column gets swapped - if new.iter().any(|name| schema.get(name).is_some()) { - lf.rename_impl_swapping(existing, new) - } else { - lf.rename_impl(existing, new) + let iter = existing.into_iter(); + let cap = iter.size_hint().0; + let mut existing_vec = Vec::with_capacity(cap); + let mut new_vec = Vec::with_capacity(cap); + + for (existing, new) in iter.zip(new.into_iter()) { + let existing = existing.as_ref(); + let new = new.as_ref(); + + if new != existing { + existing_vec.push(existing.to_string()); + new_vec.push(new.to_string()); } } - inner(self, existing, new) + // a column gets swapped + let schema = &*self.schema().unwrap(); + let swapping = new_vec.iter().any(|name| schema.get(name).is_some()); + + let mut opt_not_found = None; + existing_vec.iter().for_each(|name| { + let invalid = schema.get(name).is_none(); + + if invalid && opt_not_found.is_none() { + opt_not_found = Some(name) + } + }); + + if let Some(name) = opt_not_found { + let lp = self + .clone() + .get_plan_builder() + .add_err(PolarsError::SchemaFieldNotFound(name.to_string().into())) + .build(); + Self::from_logical_plan(lp, self.opt_state) + } else { + self.map_private(FunctionNode::Rename { + existing: existing_vec.into(), + new: new_vec.into(), + swapping, + }) + } } /// Removes columns from the DataFrame. 
diff --git a/polars/polars-lazy/src/tests/mod.rs b/polars/polars-lazy/src/tests/mod.rs index d2862a82ddb09..053002c597a9f 100644 --- a/polars/polars-lazy/src/tests/mod.rs +++ b/polars/polars-lazy/src/tests/mod.rs @@ -150,3 +150,21 @@ pub(crate) fn get_df() -> DataFrame { .unwrap(); df } + +#[test] +fn test_foo() -> PolarsResult<()> { + let df: DataFrame = df!["a" => [1], + "b" => [2], + "c" => [2] + ]?; + + let out = df + .lazy() + .rename(["a", "b"], ["a", "b"]) + .select([col("a"), col("b")]) + .collect()?; + + dbg!(out); + + Ok(()) +} diff --git a/py-polars/tests/unit/test_schema.py b/py-polars/tests/unit/test_schema.py index 671da7b33d9ce..800b13f2294e3 100644 --- a/py-polars/tests/unit/test_schema.py +++ b/py-polars/tests/unit/test_schema.py @@ -313,3 +313,23 @@ def test_schema_true_divide_6643() -> None: df = pl.DataFrame({"a": [1]}) a = pl.col("a") assert df.lazy().select(a / 2).select(pl.col(pl.Int64)).collect().shape == (0, 0) + + +def test_rename_schema_order_6660() -> None: + df = pl.DataFrame( + { + "a": [], + "b": [], + "c": [], + "d": [], + } + ) + + mapper = {"a": "1", "b": "2", "c": "3", "d": "4"} + + renamed = df.lazy().rename(mapper) + + computed = renamed.select([pl.all(), pl.col("4").alias("computed")]) + + assert renamed.schema == renamed.collect().schema + assert computed.schema == computed.collect().schema