From 671665133ed48e07ed3efb888a518311a97d038f Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 15 May 2024 10:22:06 +0200 Subject: [PATCH] feat: Show warning if expressions are very deep (#16233) --- crates/polars-lazy/src/dsl/eval.rs | 4 +- crates/polars-lazy/src/frame/mod.rs | 6 +- .../polars-lazy/src/physical_plan/exotic.rs | 10 +- .../src/physical_plan/planner/expr.rs | 149 ++++++++++++------ .../src/physical_plan/planner/lp.rs | 87 ++++++---- .../streaming/construct_pipeline.rs | 2 +- py-polars/polars/config.py | 15 ++ 7 files changed, 189 insertions(+), 84 deletions(-) diff --git a/crates/polars-lazy/src/dsl/eval.rs b/crates/polars-lazy/src/dsl/eval.rs index a1caad403f6f..550de2e72ed9 100644 --- a/crates/polars-lazy/src/dsl/eval.rs +++ b/crates/polars-lazy/src/dsl/eval.rs @@ -2,7 +2,7 @@ use polars_core::prelude::*; use rayon::prelude::*; use super::*; -use crate::physical_plan::planner::create_physical_expr; +use crate::physical_plan::planner::{create_physical_expr, ExpressionConversionState}; use crate::physical_plan::state::ExecutionState; use crate::prelude::*; @@ -61,7 +61,7 @@ pub trait ExprEvalExtension: IntoExpr + Sized { Context::Default, &arena, None, - &mut Default::default(), + &mut ExpressionConversionState::new(true, 0), )?; let state = ExecutionState::new(); diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index df3ad968f384..ab4eca5f8fee 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -37,7 +37,9 @@ use polars_plan::global::FETCH_ROWS; use smartstring::alias::String as SmartString; use crate::physical_plan::executors::Executor; -use crate::physical_plan::planner::{create_physical_expr, create_physical_plan}; +use crate::physical_plan::planner::{ + create_physical_expr, create_physical_plan, ExpressionConversionState, +}; use crate::physical_plan::state::ExecutionState; #[cfg(feature = "streaming")] use crate::physical_plan::streaming::insert_streaming_nodes; @@ -571,7 +573,7 @@ impl LazyFrame { Context::Default, expr_arena, None, - &mut Default::default(), + &mut ExpressionConversionState::new(true, 0), ) .ok()?; let io_expr = phys_expr_to_io_expr(phys_expr); diff --git a/crates/polars-lazy/src/physical_plan/exotic.rs b/crates/polars-lazy/src/physical_plan/exotic.rs index 9beded83827f..99976961d2cf 100644 --- a/crates/polars-lazy/src/physical_plan/exotic.rs +++ b/crates/polars-lazy/src/physical_plan/exotic.rs @@ -1,6 +1,6 @@ use polars_core::prelude::*; -use crate::physical_plan::planner::create_physical_expr; +use crate::physical_plan::planner::{create_physical_expr, ExpressionConversionState}; use crate::prelude::*; #[cfg(feature = "pivot")] @@ -34,5 +34,11 @@ pub(crate) fn prepare_expression_for_context( let lp = lp_arena.get(optimized); let aexpr = lp.get_exprs().pop().unwrap(); - create_physical_expr(&aexpr, ctxt, &expr_arena, None, &mut Default::default()) + create_physical_expr( + &aexpr, + ctxt, + &expr_arena, + None, + &mut ExpressionConversionState::new(true, 0), + ) } diff --git a/crates/polars-lazy/src/physical_plan/planner/expr.rs b/crates/polars-lazy/src/physical_plan/planner/expr.rs index 3aaeadfa6546..d157bae926d1 100644 --- a/crates/polars-lazy/src/physical_plan/planner/expr.rs +++ b/crates/polars-lazy/src/physical_plan/planner/expr.rs @@ -8,6 +8,18 @@ use rayon::prelude::*; use super::super::expressions as phys_expr; use crate::prelude::*; +pub(super) fn get_expr_depth_limit() -> PolarsResult { + let depth = if let Ok(d) = std::env::var("POLARS_MAX_EXPR_DEPTH") { + let v = d + .parse::() + .map_err(|_| polars_err!(ComputeError: "could not parse 'max_expr_depth': {}", d))?; + u16::try_from(v).unwrap_or(0) + } else { + 512 + }; + Ok(depth) +} + fn ok_checker(_state: &ExpressionConversionState) -> PolarsResult<()> { Ok(()) } @@ -78,7 +90,7 @@ where .collect() } -#[derive(Copy, Clone, Default)] +#[derive(Copy, Clone)] pub(crate) struct ExpressionConversionState { // settings per context // they remain activate between @@ -89,24 +101,48 @@ pub(crate) struct ExpressionConversionState { // settings per expression // those are reset every expression local: LocalConversionState, + depth_limit: u16, } -#[derive(Copy, Clone, Default)] +#[derive(Copy, Clone)] struct LocalConversionState { has_implode: bool, has_window: bool, has_lit: bool, + // Max depth an expression may have. + // 0 is unlimited. + depth_limit: u16, +} + +impl Default for LocalConversionState { + fn default() -> Self { + Self { + has_lit: false, + has_implode: false, + has_window: false, + depth_limit: 500, + } + } } impl ExpressionConversionState { - pub(crate) fn new(allow_threading: bool) -> Self { + pub(crate) fn new(allow_threading: bool, depth_limit: u16) -> Self { Self { + depth_limit, + has_cache: false, allow_threading, - ..Default::default() + has_windows: false, + local: LocalConversionState { + depth_limit, + ..Default::default() + }, } } fn reset(&mut self) { - self.local = Default::default() + self.local = LocalConversionState { + depth_limit: self.depth_limit, + ..Default::default() + } } fn has_implode(&self) -> bool { @@ -117,6 +153,17 @@ impl ExpressionConversionState { self.has_windows = true; self.local.has_window = true; } + + fn check_depth(&mut self) { + if self.local.depth_limit > 0 { + self.local.depth_limit -= 1; + + if self.local.depth_limit == 0 { + let depth = get_expr_depth_limit().unwrap(); + polars_warn!(format!("encountered expression deeper than {depth} elements; this may overflow the stack, consider refactoring")) + } + } + } } pub(crate) fn create_physical_expr( @@ -148,7 +195,9 @@ fn create_physical_expr_inner( ) -> PolarsResult> { use AExpr::*; - match expr_arena.get(expression).clone() { + state.check_depth(); + + match expr_arena.get(expression) { Len => Ok(Arc::new(phys_expr::CountExpr::new())), Window { mut function, @@ -178,7 +227,7 @@ fn create_physical_expr_inner( WindowType::Over(mapping) => { // TODO! Order by let group_by = create_physical_expressions_from_nodes( - &partition_by, + partition_by, Context::Default, expr_arena, schema, @@ -210,7 +259,7 @@ fn create_physical_expr_inner( out_name, function: function_expr, phys_function, - mapping, + mapping: *mapping, expr, })) }, @@ -219,7 +268,7 @@ fn create_physical_expr_inner( function: function_expr, phys_function, out_name, - options, + options: options.clone(), expr, })), } @@ -227,31 +276,31 @@ fn create_physical_expr_inner( Literal(value) => { state.local.has_lit = true; Ok(Arc::new(LiteralExpr::new( - value, + value.clone(), node_to_expr(expression, expr_arena), ))) }, BinaryExpr { left, op, right } => { - let lhs = create_physical_expr_inner(left, ctxt, expr_arena, schema, state)?; - let rhs = create_physical_expr_inner(right, ctxt, expr_arena, schema, state)?; + let lhs = create_physical_expr_inner(*left, ctxt, expr_arena, schema, state)?; + let rhs = create_physical_expr_inner(*right, ctxt, expr_arena, schema, state)?; Ok(Arc::new(phys_expr::BinaryExpr::new( lhs, - op, + *op, rhs, node_to_expr(expression, expr_arena), state.local.has_lit, ))) }, Column(column) => Ok(Arc::new(ColumnExpr::new( - column, + column.clone(), node_to_expr(expression, expr_arena), schema.cloned(), ))), Sort { expr, options } => { - let phys_expr = create_physical_expr_inner(expr, ctxt, expr_arena, schema, state)?; + let phys_expr = create_physical_expr_inner(*expr, ctxt, expr_arena, schema, state)?; Ok(Arc::new(SortExpr::new( phys_expr, - options, + *options, node_to_expr(expression, expr_arena), ))) }, @@ -260,13 +309,13 @@ fn create_physical_expr_inner( idx, returns_scalar, } => { - let phys_expr = create_physical_expr_inner(expr, ctxt, expr_arena, schema, state)?; - let phys_idx = create_physical_expr_inner(idx, ctxt, expr_arena, schema, state)?; + let phys_expr = create_physical_expr_inner(*expr, ctxt, expr_arena, schema, state)?; + let phys_idx = create_physical_expr_inner(*idx, ctxt, expr_arena, schema, state)?; Ok(Arc::new(GatherExpr { phys_expr, idx: phys_idx, expr: node_to_expr(expression, expr_arena), - returns_scalar, + returns_scalar: *returns_scalar, })) }, SortBy { @@ -275,19 +324,19 @@ fn create_physical_expr_inner( sort_options, } => { polars_ensure!(!by.is_empty(), InvalidOperation: "'sort_by' got an empty set"); - let phys_expr = create_physical_expr_inner(expr, ctxt, expr_arena, schema, state)?; + let phys_expr = create_physical_expr_inner(*expr, ctxt, expr_arena, schema, state)?; let phys_by = - create_physical_expressions_from_nodes(&by, ctxt, expr_arena, schema, state)?; + create_physical_expressions_from_nodes(by, ctxt, expr_arena, schema, state)?; Ok(Arc::new(SortByExpr::new( phys_expr, phys_by, node_to_expr(expression, expr_arena), - sort_options, + sort_options.clone(), ))) }, Filter { input, by } => { - let phys_input = create_physical_expr_inner(input, ctxt, expr_arena, schema, state)?; - let phys_by = create_physical_expr_inner(by, ctxt, expr_arena, schema, state)?; + let phys_input = create_physical_expr_inner(*input, ctxt, expr_arena, schema, state)?; + let phys_by = create_physical_expr_inner(*by, ctxt, expr_arena, schema, state)?; Ok(Arc::new(FilterExpr::new( phys_input, phys_by, @@ -306,6 +355,7 @@ fn create_physical_expr_inner( Context::Default if !matches!(agg, AAggExpr::Quantile { .. }) => { let function = match agg { AAggExpr::Min { propagate_nans, .. } => { + let propagate_nans = *propagate_nans; let state = *state; SpecialEq::new(Arc::new(move |s: &mut [Series]| { let s = std::mem::take(&mut s[0]); @@ -339,6 +389,7 @@ fn create_physical_expr_inner( }) as Arc) }, AAggExpr::Max { propagate_nans, .. } => { + let propagate_nans = *propagate_nans; let state = *state; SpecialEq::new(Arc::new(move |s: &mut [Series]| { let s = std::mem::take(&mut s[0]); @@ -429,6 +480,7 @@ fn create_physical_expr_inner( }) as Arc) }, AAggExpr::Count(_, include_nulls) => { + let include_nulls = *include_nulls; SpecialEq::new(Arc::new(move |s: &mut [Series]| { let s = std::mem::take(&mut s[0]); let count = s.len() - s.null_count() * !include_nulls as usize; @@ -438,12 +490,14 @@ fn create_physical_expr_inner( }) as Arc) }, AAggExpr::Std(_, ddof) => { + let ddof = *ddof; SpecialEq::new(Arc::new(move |s: &mut [Series]| { let s = std::mem::take(&mut s[0]); s.std_as_series(ddof).map(Some) }) as Arc) }, AAggExpr::Var(_, ddof) => { + let ddof = *ddof; SpecialEq::new(Arc::new(move |s: &mut [Series]| { let s = std::mem::take(&mut s[0]); s.var_as_series(ddof).map(Some) @@ -468,10 +522,10 @@ fn create_physical_expr_inner( } = agg { let input = - create_physical_expr_inner(expr, ctxt, expr_arena, schema, state)?; + create_physical_expr_inner(*expr, ctxt, expr_arena, schema, state)?; let quantile = - create_physical_expr_inner(quantile, ctxt, expr_arena, schema, state)?; - return Ok(Arc::new(AggQuantileExpr::new(input, quantile, interpol))); + create_physical_expr_inner(*quantile, ctxt, expr_arena, schema, state)?; + return Ok(Arc::new(AggQuantileExpr::new(input, quantile, *interpol))); } let field = schema .map(|schema| { @@ -482,7 +536,7 @@ fn create_physical_expr_inner( ) }) .transpose()?; - let agg_method: GroupByMethod = agg.into(); + let agg_method: GroupByMethod = agg.clone().into(); Ok(Arc::new(AggregationExpr::new(input, agg_method, field))) }, } @@ -492,12 +546,12 @@ fn create_physical_expr_inner( data_type, strict, } => { - let phys_expr = create_physical_expr_inner(expr, ctxt, expr_arena, schema, state)?; + let phys_expr = create_physical_expr_inner(*expr, ctxt, expr_arena, schema, state)?; Ok(Arc::new(CastExpr { input: phys_expr, - data_type, + data_type: data_type.clone(), expr: node_to_expr(expression, expr_arena), - strict, + strict: *strict, })) }, Ternary { @@ -507,13 +561,14 @@ fn create_physical_expr_inner( } => { let mut lit_count = 0u8; state.reset(); - let predicate = create_physical_expr_inner(predicate, ctxt, expr_arena, schema, state)?; + let predicate = + create_physical_expr_inner(*predicate, ctxt, expr_arena, schema, state)?; lit_count += state.local.has_lit as u8; state.reset(); - let truthy = create_physical_expr_inner(truthy, ctxt, expr_arena, schema, state)?; + let truthy = create_physical_expr_inner(*truthy, ctxt, expr_arena, schema, state)?; lit_count += state.local.has_lit as u8; state.reset(); - let falsy = create_physical_expr_inner(falsy, ctxt, expr_arena, schema, state)?; + let falsy = create_physical_expr_inner(*falsy, ctxt, expr_arena, schema, state)?; lit_count += state.local.has_lit as u8; Ok(Arc::new(TernaryExpr::new( predicate, @@ -541,7 +596,7 @@ fn create_physical_expr_inner( // Will be reset in the function so get that here. let has_window = state.local.has_window; let input = create_physical_expressions_check_state( - &input, + input, ctxt, expr_arena, schema, @@ -554,9 +609,9 @@ fn create_physical_expr_inner( Ok(Arc::new(ApplyExpr::new( input, - function, + function.clone(), node_to_expr(expression, expr_arena), - options, + *options, !state.has_cache, schema.cloned(), output_dtype, @@ -579,7 +634,7 @@ fn create_physical_expr_inner( // Will be reset in the function so get that here. let has_window = state.local.has_window; let input = create_physical_expressions_check_state( - &input, + input, ctxt, expr_arena, schema, @@ -592,9 +647,9 @@ fn create_physical_expr_inner( Ok(Arc::new(ApplyExpr::new( input, - function.into(), + function.clone().into(), node_to_expr(expression, expr_arena), - options, + *options, !state.has_cache, schema.cloned(), output_dtype, @@ -605,9 +660,9 @@ fn create_physical_expr_inner( offset, length, } => { - let input = create_physical_expr_inner(input, ctxt, expr_arena, schema, state)?; - let offset = create_physical_expr_inner(offset, ctxt, expr_arena, schema, state)?; - let length = create_physical_expr_inner(length, ctxt, expr_arena, schema, state)?; + let input = create_physical_expr_inner(*input, ctxt, expr_arena, schema, state)?; + let offset = create_physical_expr_inner(*offset, ctxt, expr_arena, schema, state)?; + let length = create_physical_expr_inner(*length, ctxt, expr_arena, schema, state)?; polars_ensure!(!(state.has_implode() && matches!(ctxt, Context::Aggregation)), InvalidOperation: "'implode' followed by a slice during aggregation is not allowed"); Ok(Arc::new(SliceExpr { input, @@ -617,7 +672,7 @@ fn create_physical_expr_inner( })) }, Explode(expr) => { - let input = create_physical_expr_inner(expr, ctxt, expr_arena, schema, state)?; + let input = create_physical_expr_inner(*expr, ctxt, expr_arena, schema, state)?; let function = SpecialEq::new(Arc::new(move |s: &mut [Series]| s[0].explode().map(Some)) as Arc); @@ -629,11 +684,11 @@ fn create_physical_expr_inner( ))) }, Alias(input, name) => { - let phys_expr = create_physical_expr_inner(input, ctxt, expr_arena, schema, state)?; + let phys_expr = create_physical_expr_inner(*input, ctxt, expr_arena, schema, state)?; Ok(Arc::new(AliasExpr::new( phys_expr, - name, - node_to_expr(input, expr_arena), + name.clone(), + node_to_expr(*input, expr_arena), ))) }, Wildcard => { diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index b0f68694b5e9..45acfaa1bb4e 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -126,10 +126,32 @@ fn partitionable_gb( partitionable } +struct ConversionState { + expr_depth: u16, +} + +impl ConversionState { + fn new() -> PolarsResult { + Ok(ConversionState { + expr_depth: get_expr_depth_limit()?, + }) + } +} + pub fn create_physical_plan( root: Node, lp_arena: &mut Arena, expr_arena: &mut Arena, +) -> PolarsResult> { + let state = ConversionState::new()?; + create_physical_plan_impl(root, lp_arena, expr_arena, &state) +} + +fn create_physical_plan_impl( + root: Node, + lp_arena: &mut Arena, + expr_arena: &mut Arena, + state: &ConversionState, ) -> PolarsResult> { use IR::*; @@ -154,7 +176,7 @@ pub fn create_physical_plan( Union { inputs, options } => { let inputs = inputs .into_iter() - .map(|node| create_physical_plan(node, lp_arena, expr_arena)) + .map(|node| create_physical_plan_impl(node, lp_arena, expr_arena, state)) .collect::>>()?; Ok(Box::new(executors::UnionExec { inputs, options })) }, @@ -163,12 +185,12 @@ pub fn create_physical_plan( } => { let inputs = inputs .into_iter() - .map(|node| create_physical_plan(node, lp_arena, expr_arena)) + .map(|node| create_physical_plan_impl(node, lp_arena, expr_arena, state)) .collect::>>()?; Ok(Box::new(executors::HConcatExec { inputs, options })) }, Slice { input, offset, len } => { - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; Ok(Box::new(executors::SliceExec { input, offset, len })) }, Filter { input, predicate } => { @@ -191,8 +213,8 @@ pub fn create_physical_plan( } } } - let input = create_physical_plan(input, lp_arena, expr_arena)?; - let mut state = ExpressionConversionState::default(); + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; + let mut state = ExpressionConversionState::new(true, state.expr_depth); let predicate = create_physical_expr( &predicate, Context::Default, @@ -217,7 +239,7 @@ pub fn create_physical_plan( mut file_options, } => { file_options.n_rows = _set_n_rows_for_scan(file_options.n_rows); - let mut state = ExpressionConversionState::default(); + let mut state = ExpressionConversionState::new(true, state.expr_depth); let predicate = predicate .map(|pred| { create_physical_expr( @@ -289,8 +311,11 @@ pub fn create_physical_plan( .. } => { let input_schema = lp_arena.get(input).schema(lp_arena).into_owned(); - let input = create_physical_plan(input, lp_arena, expr_arena)?; - let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len()); + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; + let mut state = ExpressionConversionState::new( + POOL.current_num_threads() > expr.len(), + state.expr_depth, + ); let streamable = if expr.has_sub_exprs() { false @@ -330,7 +355,7 @@ pub fn create_physical_plan( schema, .. } => { - let mut state = ExpressionConversionState::default(); + let mut state = ExpressionConversionState::new(true, state.expr_depth); let selection = predicate .map(|pred| { create_physical_expr( @@ -361,9 +386,9 @@ pub fn create_physical_plan( Context::Default, expr_arena, Some(input_schema.as_ref()), - &mut Default::default(), + &mut ExpressionConversionState::new(true, state.expr_depth), )?; - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; Ok(Box::new(executors::SortExec { input, by_column, @@ -376,7 +401,7 @@ pub fn create_physical_plan( id, cache_hits, } => { - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; Ok(Box::new(executors::CacheExec { id, input, @@ -384,7 +409,7 @@ pub fn create_physical_plan( })) }, Distinct { input, options } => { - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; Ok(Box::new(executors::UniqueExec { input, options })) }, GroupBy { @@ -403,20 +428,20 @@ pub fn create_physical_plan( Context::Default, expr_arena, Some(&input_schema), - &mut Default::default(), + &mut ExpressionConversionState::new(true, state.expr_depth), )?; let phys_aggs = create_physical_expressions_from_irs( &aggs, Context::Aggregation, expr_arena, Some(&input_schema), - &mut Default::default(), + &mut ExpressionConversionState::new(true, state.expr_depth), )?; let _slice = options.slice; #[cfg(feature = "dynamic_group_by")] if let Some(options) = options.dynamic { - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; return Ok(Box::new(executors::GroupByDynamicExec { input, keys: phys_keys, @@ -430,7 +455,7 @@ pub fn create_physical_plan( #[cfg(feature = "dynamic_group_by")] if let Some(options) = options.rolling { - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; return Ok(Box::new(executors::GroupByRollingExec { input, keys: phys_keys, @@ -452,7 +477,7 @@ pub fn create_physical_plan( false } }); - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; let keys = keys .iter() .map(|e| e.to_expr(expr_arena)) @@ -474,7 +499,7 @@ pub fn create_physical_plan( aggs, ))) } else { - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; Ok(Box::new(executors::GroupByExec::new( input, phys_keys, @@ -509,21 +534,21 @@ pub fn create_physical_plan( false }; - let input_left = create_physical_plan(input_left, lp_arena, expr_arena)?; - let input_right = create_physical_plan(input_right, lp_arena, expr_arena)?; + let input_left = create_physical_plan_impl(input_left, lp_arena, expr_arena, state)?; + let input_right = create_physical_plan_impl(input_right, lp_arena, expr_arena, state)?; let left_on = create_physical_expressions_from_irs( &left_on, Context::Default, expr_arena, None, - &mut Default::default(), + &mut ExpressionConversionState::new(true, state.expr_depth), )?; let right_on = create_physical_expressions_from_irs( &right_on, Context::Default, expr_arena, None, - &mut Default::default(), + &mut ExpressionConversionState::new(true, state.expr_depth), )?; let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone()); Ok(Box::new(executors::JoinExec::new( @@ -542,7 +567,7 @@ pub fn create_physical_plan( options, } => { let input_schema = lp_arena.get(input).schema(lp_arena).into_owned(); - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; let streamable = if exprs.has_sub_exprs() { false @@ -550,8 +575,10 @@ pub fn create_physical_plan( all_streamable(&exprs, expr_arena, Context::Default) }; - let mut state = - ExpressionConversionState::new(POOL.current_num_threads() > exprs.len()); + let mut state = ExpressionConversionState::new( + POOL.current_num_threads() > exprs.len(), + state.expr_depth, + ); let cse_exprs = create_physical_expressions_from_irs( exprs.cse_exprs(), @@ -581,21 +608,21 @@ pub fn create_physical_plan( MapFunction { input, function, .. } => { - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; Ok(Box::new(executors::UdfExec { input, function })) }, ExtContext { input, contexts, .. } => { - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; let contexts = contexts .into_iter() - .map(|node| create_physical_plan(node, lp_arena, expr_arena)) + .map(|node| create_physical_plan_impl(node, lp_arena, expr_arena, state)) .collect::>()?; Ok(Box::new(executors::ExternalContext { input, contexts })) }, SimpleProjection { input, columns } => { - let input = create_physical_plan(input, lp_arena, expr_arena)?; + let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; let exec = executors::ProjectionSimple { input, columns }; Ok(Box::new(exec)) }, diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index 93e6a55aa6f3..f0d364d64d24 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -57,7 +57,7 @@ fn to_physical_piped_expr( Context::Default, expr_arena, schema, - &mut ExpressionConversionState::new(false), + &mut ExpressionConversionState::new(false, 0), ) .map(|e| Arc::new(Wrap(e)) as Arc) } diff --git a/py-polars/polars/config.py b/py-polars/polars/config.py index 0c09f3a3fcbf..28bbea40181b 100644 --- a/py-polars/polars/config.py +++ b/py-polars/polars/config.py @@ -66,6 +66,7 @@ "POLARS_STREAMING_CHUNK_SIZE", "POLARS_TABLE_WIDTH", "POLARS_VERBOSE", + "POLARS_MAX_EXPR_DEPTH", } # vars that set the rust env directly should declare themselves here as the Config @@ -1317,3 +1318,17 @@ def warn_unstable(cls, active: bool | None = True) -> type[Config]: else: os.environ["POLARS_WARN_UNSTABLE"] = str(int(active)) return cls + + @classmethod + def set_expr_depth_warning(cls, limit: int) -> type[Config]: + """ + Set the the expression depth that Polars will accept without triggering a warning. + + Having too deep expressions (several 1000s) can lead to overflowing the stack and might be worth a refactor. + """ # noqa: W505 + if limit < 0: + msg = "limit should be positive" + raise ValueError(msg) + + os.environ["POLARS_MAX_EXPR_DEPTH"] = str(limit) + return cls