Skip to content

Commit

Permalink
fix(rust, python): dedicated rename implementation. (pola-rs#6688)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored and vincent committed Feb 9, 2023
1 parent 2b068d3 commit 685a130
Show file tree
Hide file tree
Showing 18 changed files with 507 additions and 174 deletions.
87 changes: 83 additions & 4 deletions polars/polars-lazy/polars-plan/src/logical_plan/aexpr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use polars_utils::arena::{Arena, Node};

use crate::dsl::function_expr::FunctionExpr;
use crate::logical_plan::Context;
use crate::prelude::aexpr::NodeInputs::Single;
use crate::prelude::names::COUNT;
use crate::prelude::*;

Expand Down Expand Up @@ -173,12 +174,90 @@ impl AExpr {
}
}

pub(crate) fn get_input(&self) -> Node {
pub(crate) fn get_input(&self) -> NodeInputs {
use AExpr::*;
use NodeInputs::*;
match self {
Alias(input, _) => *input,
Cast { expr, .. } => *expr,
_ => todo!(),
Alias(input, _) => Single(*input),
Cast { expr, .. } => Single(*expr),
Explode(input) => Single(*input),
Column(_) => Leaf,
Literal(_) => Leaf,
BinaryExpr { left, right, .. } => Many(vec![*left, *right]),
Sort { expr, .. } => Single(*expr),
Take { expr, .. } => Single(*expr),
SortBy { expr, by, .. } => {
let mut many = by.clone();
many.push(*expr);
Many(many)
}
Filter { input, .. } => Single(*input),
Agg(a) => a.get_input(),
Ternary {
truthy,
falsy,
predicate,
} => Many(vec![*truthy, *falsy, *predicate]),
AnonymousFunction { input, .. } | Function { input, .. } => match input.len() {
1 => Single(input[0]),
_ => Many(input.clone()),
},
Window {
function,
order_by,
partition_by,
..
} => {
let mut out = Vec::with_capacity(partition_by.len() + 2);
out.push(*function);
if let Some(a) = order_by {
out.push(*a);
}
out.extend(partition_by);
Many(out)
}
Wildcard => panic!("no wildcard expected"),
Slice { input, .. } => Single(*input),
Count => Leaf,
Nth(_) => Leaf,
}
}
}

impl AAggExpr {
pub(crate) fn get_input(&self) -> NodeInputs {
use AAggExpr::*;
match self {
Min { input, .. } => Single(*input),
Max { input, .. } => Single(*input),
Median(input) => Single(*input),
NUnique(input) => Single(*input),
First(input) => Single(*input),
Last(input) => Single(*input),
Mean(input) => Single(*input),
List(input) => Single(*input),
Quantile { expr, .. } => Single(*expr),
Sum(input) => Single(*input),
Count(input) => Single(*input),
Std(input, _) => Single(*input),
Var(input, _) => Single(*input),
AggGroups(input) => Single(*input),
}
}
}

pub(crate) enum NodeInputs {
Leaf,
Single(Node),
Many(Vec<Node>),
}

impl NodeInputs {
pub(crate) fn first(&self) -> Node {
match self {
Single(node) => *node,
NodeInputs::Many(nodes) => nodes[0],
NodeInputs::Leaf => panic!(),
}
}
}
1 change: 1 addition & 0 deletions polars/polars-lazy/polars-plan/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ impl ALogicalPlan {
Melt { schema, .. } => schema,
MapFunction { input, function } => {
let input_schema = arena.get(*input).schema(arena);

return match input_schema {
Cow::Owned(schema) => {
Cow::Owned(function.schema(&schema).unwrap().into_owned())
Expand Down
8 changes: 8 additions & 0 deletions polars/polars-lazy/polars-plan/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,14 @@ impl LogicalPlanBuilder {
.into()
}

pub fn add_err(self, err: PolarsError) -> Self {
LogicalPlan::Error {
input: Box::new(self.0),
err: err.into(),
}
.into()
}

pub fn with_context(self, contexts: Vec<LogicalPlan>) -> Self {
let mut schema = try_delayed!(self.0.schema(), &self.0, into)
.as_ref()
Expand Down
32 changes: 29 additions & 3 deletions polars/polars-lazy/polars-plan/src/logical_plan/functions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[cfg(feature = "merge_sorted")]
mod merge_sorted;
mod rename;

use std::borrow::Cow;
use std::fmt::{Debug, Display, Formatter};
Expand Down Expand Up @@ -56,6 +57,12 @@ pub enum FunctionNode {
// sorted column that serves as the key
column: Arc<str>,
},
Rename {
existing: Arc<Vec<String>>,
new: Arc<Vec<String>>,
// A column name gets swapped with an existing column
swapping: bool,
},
}

impl PartialEq for FunctionNode {
Expand All @@ -65,6 +72,18 @@ impl PartialEq for FunctionNode {
(FastProjection { columns: l }, FastProjection { columns: r }) => l == r,
(DropNulls { subset: l }, DropNulls { subset: r }) => l == r,
(Rechunk, Rechunk) => true,
(
Rename {
existing: existing_l,
new: new_l,
..
},
Rename {
existing: existing_r,
new: new_r,
..
},
) => existing_l == existing_r && new_l == new_r,
_ => false,
}
}
Expand All @@ -78,7 +97,7 @@ impl FunctionNode {
Rechunk | Pipeline { .. } => false,
#[cfg(feature = "merge_sorted")]
MergeSorted { .. } => false,
DropNulls { .. } | FastProjection { .. } | Unnest { .. } => true,
DropNulls { .. } | FastProjection { .. } | Unnest { .. } | Rename { .. } => true,
Opaque { streamable, .. } => *streamable,
}
}
Expand Down Expand Up @@ -141,14 +160,17 @@ impl FunctionNode {
}
#[cfg(feature = "merge_sorted")]
MergeSorted { .. } => Ok(Cow::Borrowed(input_schema)),
Rename { existing, new, .. } => rename::rename_schema(input_schema, existing, new),
}
}

pub(crate) fn allow_predicate_pd(&self) -> bool {
use FunctionNode::*;
match self {
Opaque { predicate_pd, .. } => *predicate_pd,
FastProjection { .. } | DropNulls { .. } | Rechunk | Unnest { .. } => true,
FastProjection { .. } | DropNulls { .. } | Rechunk | Unnest { .. } | Rename { .. } => {
true
}
#[cfg(feature = "merge_sorted")]
MergeSorted { .. } => true,
Pipeline { .. } => unimplemented!(),
Expand All @@ -159,7 +181,9 @@ impl FunctionNode {
use FunctionNode::*;
match self {
Opaque { projection_pd, .. } => *projection_pd,
FastProjection { .. } | DropNulls { .. } | Rechunk | Unnest { .. } => true,
FastProjection { .. } | DropNulls { .. } | Rechunk | Unnest { .. } | Rename { .. } => {
true
}
#[cfg(feature = "merge_sorted")]
MergeSorted { .. } => true,
Pipeline { .. } => unimplemented!(),
Expand Down Expand Up @@ -209,6 +233,7 @@ impl FunctionNode {
Arc::get_mut(function).unwrap().call_udf(df)
}
}
Rename { existing, new, .. } => rename::rename_impl(df, existing, new),
}
}
}
Expand Down Expand Up @@ -252,6 +277,7 @@ impl Display for FunctionNode {
writeln!(f, "PIPELINE")
}
}
Rename { .. } => write!(f, "RENAME"),
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use super::*;

pub(super) fn rename_impl(
mut df: DataFrame,
existing: &[String],
new: &[String],
) -> PolarsResult<DataFrame> {
let positions = existing
.iter()
.map(|old| df.find_idx_by_name(old))
.collect::<Vec<_>>();

for (pos, name) in positions.iter().zip(new.iter()) {
// the column might be removed due to projection pushdown
// so we only update if we can find it.
if let Some(pos) = pos {
df.get_columns_mut()[*pos].rename(name);
}
}
// recreate dataframe so we check duplicates
let columns = std::mem::take(df.get_columns_mut());
DataFrame::new(columns)
}

pub(super) fn rename_schema<'a>(
input_schema: &'a SchemaRef,
existing: &[String],
new: &[String],
) -> PolarsResult<Cow<'a, SchemaRef>> {
let mut new_schema = (**input_schema).clone();
for (old, new) in existing.iter().zip(new.iter()) {
// the column might be removed due to projection pushdown
// so we only update if we can find it.
if let Some(dtype) = input_schema.get(old) {
if new_schema.with_column(new.clone(), dtype.clone()).is_none() {
new_schema.remove(old);
}
}
}
Ok(Cow::Owned(Arc::new(new_schema)))
}
6 changes: 3 additions & 3 deletions polars/polars-lazy/polars-plan/src/logical_plan/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ macro_rules! push_expr {
impl Expr {
/// Expr::mutate().apply(fn())
pub fn mutate(&mut self) -> ExprMut {
let mut stack = Vec::with_capacity(8);
let mut stack = Vec::with_capacity(4);
stack.push(self);
ExprMut { stack }
}
Expand Down Expand Up @@ -151,7 +151,7 @@ impl<'a> IntoIterator for &'a Expr {
type IntoIter = ExprIter<'a>;

fn into_iter(self) -> Self::IntoIter {
let mut stack = Vec::with_capacity(8);
let mut stack = Vec::with_capacity(4);
stack.push(self);
ExprIter { stack }
}
Expand Down Expand Up @@ -282,7 +282,7 @@ pub trait ArenaExprIter<'a> {

impl<'a> ArenaExprIter<'a> for &'a Arena<AExpr> {
fn iter(&self, root: Node) -> AExprIter<'a> {
let mut stack = Vec::with_capacity(8);
let mut stack = Vec::with_capacity(4);
stack.push(root);
AExprIter {
stack,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//! Keys in the `acc_predicates` hashmap.
use super::*;

// an invisible ascii token we use as delimiter
const HIDDEN_DELIMITER: char = '\u{1D17A}';

/// Determine the hashmap key by combining all the leaf column names of a predicate
pub(super) fn predicate_to_key(predicate: Node, expr_arena: &Arena<AExpr>) -> Arc<str> {
let mut iter = aexpr_to_leaf_names_iter(predicate, expr_arena);
if let Some(first) = iter.next() {
if let Some(second) = iter.next() {
let mut new = String::with_capacity(32 * iter.size_hint().0);
new.push_str(&first);
new.push(HIDDEN_DELIMITER);
new.push_str(&second);

for name in iter {
new.push(HIDDEN_DELIMITER);
new.push_str(&name);
}
return Arc::from(new);
}
first
} else {
let mut s = String::new();
s.push(HIDDEN_DELIMITER);
Arc::from(s)
}
}

pub(super) fn key_has_name(key: &str, name: &str) -> bool {
if key.contains(HIDDEN_DELIMITER) {
for root_name in key.split(HIDDEN_DELIMITER) {
if root_name == name {
return true;
}
}
}
key == name
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod keys;
mod rename;
mod utils;

use polars_core::datatypes::PlHashMap;
Expand All @@ -7,6 +9,7 @@ use utils::*;
use super::*;
use crate::dsl::function_expr::FunctionExpr;
use crate::logical_plan::{optimizer, Context};
use crate::prelude::optimizer::predicate_pushdown::rename::process_rename;
use crate::utils::{aexprs_to_schema, check_input_node, has_aexpr};

#[derive(Default)]
Expand Down Expand Up @@ -516,7 +519,25 @@ impl PredicatePushDown {
MapFunction { ref function, .. } => {
if function.allow_predicate_pd()
{
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
match function {
FunctionNode::Rename {
existing,
new,
..
} => {
let local_predicates = process_rename(&mut acc_predicates,
expr_arena,
existing,
new,
)?;
let lp = self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)?;
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}, _ => {
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
}
}


} else {
self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
}
Expand Down
Loading

0 comments on commit 685a130

Please sign in to comment.