From 432b6488633f83f898a7eb383a6fb93f29d25a30 Mon Sep 17 00:00:00 2001 From: ritchie Date: Wed, 17 Apr 2024 17:56:25 +0200 Subject: [PATCH] wrap up --- crates/polars-plan/src/dot.rs | 9 --- .../src/logical_plan/builder_dsl.rs | 22 ------ .../conversion/dsl_plan_to_ir_plan.rs | 70 +++++++++++------- crates/polars-plan/src/logical_plan/format.rs | 1 - .../src/logical_plan/functions/dsl.rs | 10 +-- crates/polars-plan/src/logical_plan/mod.rs | 71 +------------------ .../src/logical_plan/tree_format.rs | 1 - py-polars/src/lazyframe/mod.rs | 9 +-- py-polars/tests/unit/datatypes/test_float.py | 2 +- py-polars/tests/unit/test_errors.py | 10 --- 10 files changed, 53 insertions(+), 152 deletions(-) diff --git a/crates/polars-plan/src/dot.rs b/crates/polars-plan/src/dot.rs index b46de4bfd4630..e8be059bf9056 100644 --- a/crates/polars-plan/src/dot.rs +++ b/crates/polars-plan/src/dot.rs @@ -407,15 +407,6 @@ impl DslPlan { self.write_dot(acc_str, prev_node, current_node, id_map)?; input.dot(acc_str, (branch, id + 1), current_node, id_map) }, - Error { err, .. } => { - let fmt = format!("{:?}", &err.0); - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - self.write_dot(acc_str, prev_node, current_node, id_map) - }, } } diff --git a/crates/polars-plan/src/logical_plan/builder_dsl.rs b/crates/polars-plan/src/logical_plan/builder_dsl.rs index 623e306e3c6db..712a7dfab8fb2 100644 --- a/crates/polars-plan/src/logical_plan/builder_dsl.rs +++ b/crates/polars-plan/src/logical_plan/builder_dsl.rs @@ -451,14 +451,6 @@ impl DslBuilder { .into() } - pub fn add_err(self, err: PolarsError) -> Self { - DslPlan::Error { - input: Arc::new(self.0), - err: err.into(), - } - .into() - } - pub fn with_context(self, contexts: Vec) -> Self { DslPlan::ExtContext { input: Arc::new(self.0), @@ -582,20 +574,6 @@ impl DslBuilder { right_on: Vec, options: Arc, ) -> Self { - for e in left_on.iter().chain(right_on.iter()) { - if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) { - return DslPlan::Error { - input: Arc::new(self.0), - err: polars_err!( - ComputeError: - "'alias' is not allowed in a join key, use 'with_columns' first", - ) - .into(), - } - .into(); - } - } - DslPlan::Join { input_left: Arc::new(self.0), input_right: Arc::new(other), diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs index 54d950feb9cb4..f7d1dc270cc91 100644 --- a/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs +++ b/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs @@ -13,6 +13,16 @@ fn expand_expressions( Ok(to_expr_irs(exprs, expr_arena)) } +fn empty_df() -> IR { + IR::DataFrameScan { + df: Arc::new(Default::default()), + schema: Arc::new(Default::default()), + output_schema: None, + projection: None, + selection: None, + } +} + /// converts LogicalPlan to IR /// it adds expressions & lps to the respective arenas as it traverses the plan /// finally it returns the top node of the logical plan @@ -33,7 +43,7 @@ pub fn to_alp( } => { if let Some(row_index) = &file_options.row_index { let schema = Arc::make_mut(&mut file_info.schema); - schema + *schema = schema .new_inserting_at_index(0, row_index.name.as_str().into(), IDX_DTYPE) .unwrap(); } @@ -107,8 +117,7 @@ pub fn to_alp( let (exprs, schema) = prepare_projection(expr, &schema)?; if exprs.is_empty() { - // Ensure that input will be the `Default` impl which is an empty DF. - let _ = lp_arena.take(input); + lp_arena.replace(input, empty_df()); } let schema = Arc::new(schema); @@ -184,6 +193,15 @@ pub fn to_alp( right_on, options, } => { + for e in left_on.iter().chain(right_on.iter()) { + if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) { + polars_bail!( + ComputeError: + "'alias' is not allowed in a join key, use 'with_columns' first", + ) + } + } + let input_left = to_alp(owned(input_left), expr_arena, lp_arena)?; let input_right = to_alp(owned(input_right), expr_arena, lp_arena)?; @@ -225,11 +243,11 @@ pub fn to_alp( }, DslPlan::MapFunction { input, function } => { let input = to_alp(owned(input), expr_arena, lp_arena)?; - let schema = lp_arena.get(input).schema(lp_arena); + let input_schema = lp_arena.get(input).schema(lp_arena); match function { DslFunction::FillNan(fill_value) => { - let exprs = schema + let exprs = input_schema .iter() .filter_map(|(name, dtype)| match dtype { DataType::Float32 | DataType::Float64 => { @@ -260,22 +278,24 @@ pub fn to_alp( .collect::>(), ), }?; + let predicate = rewrite_projections(vec![predicate], &input_schema, &[])? + .pop() + .unwrap(); let predicate = to_expr_ir(predicate, expr_arena); IR::Filter { predicate, input } }, DslFunction::Drop(to_drop) => { let mut output_schema = - Schema::with_capacity(schema.len().saturating_sub(to_drop.len())); + Schema::with_capacity(input_schema.len().saturating_sub(to_drop.len())); - for (col_name, dtype) in schema.iter() { + for (col_name, dtype) in input_schema.iter() { if !to_drop.contains(col_name.as_str()) { output_schema.with_column(col_name.clone(), dtype.clone()); } } if output_schema.is_empty() { - // Ensure that input will be the `Default` impl which is an empty DF. - let _ = lp_arena.take(input); + lp_arena.replace(input, empty_df()); } IR::SimpleProjection { @@ -289,17 +309,17 @@ pub fn to_alp( StatsFunction::Var { ddof } => stats_helper( |dt| dt.is_numeric() || dt.is_bool(), |name| col(name).var(ddof), - &schema, + &input_schema, ), StatsFunction::Std { ddof } => stats_helper( |dt| dt.is_numeric() || dt.is_bool(), |name| col(name).std(ddof), - &schema, + &input_schema, ), StatsFunction::Quantile { quantile, interpol } => stats_helper( |dt| dt.is_numeric(), |name| col(name).quantile(quantile.clone(), interpol), - &schema, + &input_schema, ), StatsFunction::Mean => stats_helper( |dt| { @@ -313,7 +333,7 @@ pub fn to_alp( ) }, |name| col(name).mean(), - &schema, + &input_schema, ), StatsFunction::Sum => stats_helper( |dt| { @@ -322,13 +342,13 @@ pub fn to_alp( || matches!(dt, DataType::Boolean | DataType::Duration(_)) }, |name| col(name).sum(), - &schema, + &input_schema, ), StatsFunction::Min => { - stats_helper(|dt| dt.is_ord(), |name| col(name).min(), &schema) + stats_helper(|dt| dt.is_ord(), |name| col(name).min(), &input_schema) }, StatsFunction::Max => { - stats_helper(|dt| dt.is_ord(), |name| col(name).max(), &schema) + stats_helper(|dt| dt.is_ord(), |name| col(name).max(), &input_schema) }, StatsFunction::Median => stats_helper( |dt| { @@ -342,11 +362,14 @@ pub fn to_alp( ) }, |name| col(name).median(), - &schema, + &input_schema, ), }; - let schema = - Arc::new(expressions_to_schema(&exprs, &schema, Context::Default)?); + let schema = Arc::new(expressions_to_schema( + &exprs, + &input_schema, + Context::Default, + )?); let eirs = to_expr_irs(exprs, expr_arena); let expr = eirs.into(); IR::Select { @@ -360,16 +383,11 @@ pub fn to_alp( } }, _ => { - let function = function.into_function_node(&schema)?; + let function = function.into_function_node(&input_schema)?; IR::MapFunction { input, function } }, } }, - DslPlan::Error { err, .. } => { - // We just take the error. The LogicalPlan should not be used anymore once this - // is taken. - return Err(err.take()); - }, DslPlan::ExtContext { input, contexts } => { let input = to_alp(owned(input), expr_arena, lp_arena)?; let contexts = contexts @@ -502,7 +520,7 @@ fn resolve_group_by( let current_schema = lp_arena.get(input).schema(lp_arena); let current_schema = current_schema.as_ref(); let keys = rewrite_projections(keys, current_schema, &[])?; - let aggs = rewrite_projections(aggs, current_schema, &[])?; + let aggs = rewrite_projections(aggs, current_schema, &keys)?; // Initialize schema from keys let mut schema = expressions_to_schema(&keys, current_schema, Context::Default)?; diff --git a/crates/polars-plan/src/logical_plan/format.rs b/crates/polars-plan/src/logical_plan/format.rs index f63609661de47..7f343f4714580 100644 --- a/crates/polars-plan/src/logical_plan/format.rs +++ b/crates/polars-plan/src/logical_plan/format.rs @@ -228,7 +228,6 @@ impl DslPlan { write!(f, "{:indent$}{function_fmt}", "")?; input._format(f, sub_indent) }, - Error { err, .. } => write!(f, "{err:?}"), ExtContext { input, .. } => { write!(f, "{:indent$}EXTERNAL_CONTEXT", "")?; input._format(f, sub_indent) diff --git a/crates/polars-plan/src/logical_plan/functions/dsl.rs b/crates/polars-plan/src/logical_plan/functions/dsl.rs index 43293ae37e809..737fdeef3a857 100644 --- a/crates/polars-plan/src/logical_plan/functions/dsl.rs +++ b/crates/polars-plan/src/logical_plan/functions/dsl.rs @@ -48,10 +48,10 @@ pub enum StatsFunction { } impl DslFunction { - pub(crate) fn into_function_node(self, schema: &Schema) -> PolarsResult { + pub(crate) fn into_function_node(self, input_schema: &Schema) -> PolarsResult { let function = match self { DslFunction::Explode { columns } => { - let columns = rewrite_projections(columns, schema, &[])?; + let columns = rewrite_projections(columns, input_schema, &[])?; // columns to string let columns = columns .iter() @@ -79,11 +79,11 @@ impl DslFunction { schema: Default::default(), }, DslFunction::Rename { existing, new } => { - let swapping = new.iter().any(|name| schema.get(name).is_some()); + let swapping = new.iter().any(|name| input_schema.get(name).is_some()); // Check if the name exists. - for name in new.iter() { - let _ = schema.try_get(name)?; + for name in existing.iter() { + let _ = input_schema.try_get(name)?; } FunctionNode::Rename { diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index ea2800ec8ff2f..84ce1b812e4fc 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use polars_core::prelude::*; use recursive::recursive; @@ -66,68 +66,6 @@ pub enum Context { Default, } -#[derive(Debug)] -pub(crate) struct ErrorStateUnsync { - n_times: usize, - err: PolarsError, -} - -#[derive(Clone)] -pub struct ErrorState(pub(crate) Arc>); - -impl std::fmt::Debug for ErrorState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let this = self.0.lock().unwrap(); - // Skip over the Arc> and just print the fields we care - // about. Technically this is misleading, but the insides of ErrorState are not - // public, so this only affects authors of polars, not users (and the odds that - // this affects authors is slim) - f.debug_struct("ErrorState") - .field("n_times", &this.n_times) - .field("err", &this.err) - .finish() - } -} - -impl From for ErrorState { - fn from(err: PolarsError) -> Self { - Self(Arc::new(Mutex::new(ErrorStateUnsync { n_times: 0, err }))) - } -} - -impl ErrorState { - fn take(&self) -> PolarsError { - let mut this = self.0.lock().unwrap(); - - let ret_err = if this.n_times == 0 { - this.err.wrap_msg(&|msg| msg.to_owned()) - } else { - this.err.wrap_msg(&|msg| { - let n_times = this.n_times; - - let plural_s; - let was_were; - - if n_times == 1 { - plural_s = ""; - was_were = "was" - } else { - plural_s = "s"; - was_were = "were"; - }; - format!( - "{msg}\n\nLogicalPlan had already failed with the above error; \ - after failure, {n_times} additional operation{plural_s} \ - {was_were} attempted on the LazyFrame", - ) - }) - }; - this.n_times += 1; - - ret_err - } -} - // https://stackoverflow.com/questions/1031076/what-are-projection-and-selection #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum DslPlan { @@ -224,12 +162,6 @@ pub enum DslPlan { schema: SchemaRef, options: HConcatOptions, }, - /// Catches errors and throws them later - #[cfg_attr(feature = "serde", serde(skip))] - Error { - input: Arc, - err: ErrorState, - }, /// This allows expressions to access other tables ExtContext { input: Arc, @@ -265,7 +197,6 @@ impl Clone for DslPlan { Self::MapFunction { input, function } => Self::MapFunction { input: input.clone(), function: function.clone() }, Self::Union { inputs, options } => Self::Union { inputs: inputs.clone(), options: options.clone() }, Self::HConcat { inputs, schema, options } => Self::HConcat { inputs: inputs.clone(), schema: schema.clone(), options: options.clone() }, - Self::Error { input, err } => Self::Error { input: input.clone(), err: err.clone() }, Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() }, Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() }, } diff --git a/crates/polars-plan/src/logical_plan/tree_format.rs b/crates/polars-plan/src/logical_plan/tree_format.rs index 4d7cc456aab60..353c6481b2201 100644 --- a/crates/polars-plan/src/logical_plan/tree_format.rs +++ b/crates/polars-plan/src/logical_plan/tree_format.rs @@ -292,7 +292,6 @@ impl<'a> TreeFmtNode<'a> { NL(h, MapFunction { input, function }) => { ND(wh(h, &format!("{function}")), vec![NL(None, input)]) }, - NL(h, Error { input, err }) => ND(wh(h, &format!("{err:?}")), vec![NL(None, input)]), NL(h, ExtContext { input, .. }) => ND(wh(h, "EXTERNAL_CONTEXT"), vec![NL(None, input)]), NL(h, Sink { input, payload }) => ND( wh( diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index b2d7a51c1f6c1..e01882bc26576 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -980,14 +980,9 @@ impl PyLazyFrame { out.into() } - fn quantile( - &self, - quantile: PyExpr, - interpolation: Wrap, - ) -> Self { + fn quantile(&self, quantile: PyExpr, interpolation: Wrap) -> Self { let ldf = self.ldf.clone(); - let out = ldf - .quantile(quantile.inner, interpolation.0); + let out = ldf.quantile(quantile.inner, interpolation.0); out.into() } diff --git a/py-polars/tests/unit/datatypes/test_float.py b/py-polars/tests/unit/datatypes/test_float.py index 5a318fd0015cc..eac6c70f92adc 100644 --- a/py-polars/tests/unit/datatypes/test_float.py +++ b/py-polars/tests/unit/datatypes/test_float.py @@ -142,7 +142,7 @@ def test_hash() -> None: assert s.item(2) == s.item(3) # hash(float('-nan')) == hash(float('nan')) -def test_group_by() -> None: +def test_group_by_float() -> None: # Test num_groups_proxy # * -0.0 and 0.0 in same groups # * -nan and nan in same groups diff --git a/py-polars/tests/unit/test_errors.py b/py-polars/tests/unit/test_errors.py index 43b2f6e646ea9..721cef2ff6f63 100644 --- a/py-polars/tests/unit/test_errors.py +++ b/py-polars/tests/unit/test_errors.py @@ -692,16 +692,6 @@ def test_error_list_to_array() -> None: ).with_columns(array=pl.col("a").list.to_array(2)) -# https://github.com/pola-rs/polars/issues/8079 -def test_error_lazyframe_not_repeating() -> None: - lf = pl.LazyFrame({"a": 1, "b": range(2)}) - with pytest.raises(pl.ColumnNotFoundError) as exc_info: - lf.select("c").select("d").select("e").collect() - - match = "Error originated just after this operation:" - assert str(exc_info).count(match) == 1 - - def test_raise_not_found_in_simplify_14974() -> None: df = pl.DataFrame() with pytest.raises(pl.ColumnNotFoundError):