diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/function.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/function.rs
new file mode 100644
index 0000000000000..54897f790b10c
--- /dev/null
+++ b/polars/polars-lazy/polars-pipe/src/executors/operators/function.rs
@@ -0,0 +1,29 @@
+use polars_core::error::PolarsResult;
+use polars_plan::prelude::*;
+
+use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext};
+
+#[derive(Clone)]
+pub struct FunctionOperator {
+    pub(crate) function: FunctionNode,
+}
+
+impl Operator for FunctionOperator {
+    fn execute(
+        &mut self,
+        _context: &PExecutionContext,
+        chunk: &DataChunk,
+    ) -> PolarsResult<OperatorResult> {
+        Ok(OperatorResult::Finished(
+            chunk.with_data(self.function.evaluate(chunk.data.clone())?),
+        ))
+    }
+
+    fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
+        Box::new(self.clone())
+    }
+
+    fn fmt(&self) -> &str {
+        "function"
+    }
+}
diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/mod.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/mod.rs
index bb76528174a8b..3a4bae8f069b9 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/operators/mod.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/operators/mod.rs
@@ -1,7 +1,9 @@
-mod dummy;
 mod filter;
+mod function;
+mod placeholder;
 mod projection;
 
-pub(crate) use dummy::PlaceHolder;
 pub(crate) use filter::*;
+pub(crate) use function::*;
+pub(crate) use placeholder::PlaceHolder;
 pub(crate) use projection::*;
diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/placeholder.rs
similarity index 100%
rename from polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs
rename to polars/polars-lazy/polars-pipe/src/executors/operators/placeholder.rs
diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
index 4069957b963b2..ca8c31a670ed2 100644
--- a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
+++ b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
@@ -320,6 +320,12 @@ where
            };
            Box::new(op) as Box<dyn Operator>
        }
+        MapFunction { function, .. } => {
+            let op = operators::FunctionOperator {
+                function: function.clone(),
+            };
+            Box::new(op) as Box<dyn Operator>
+        }
        lp => {
            panic!("operator {lp:?} not (yet) supported")
diff --git a/polars/polars-lazy/polars-plan/src/dsl/function_expr/strings.rs b/polars/polars-lazy/polars-plan/src/dsl/function_expr/strings.rs
index 1bce8d768ba51..ca93cacdeac6f 100644
--- a/polars/polars-lazy/polars-plan/src/dsl/function_expr/strings.rs
+++ b/polars/polars-lazy/polars-plan/src/dsl/function_expr/strings.rs
@@ -327,7 +327,7 @@ pub(super) fn strptime(s: &Series, options: &StrpTimeOptions) -> PolarsResult<Series>
        (false, Some(fmt)) => TZ_AWARE_RE.is_match(fmt),
        (false, _) => false,
    };
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/aexpr.rs b/polars/polars-lazy/polars-plan/src/logical_plan/aexpr.rs
index 3e103ded3a65c..9f5e01df428b6 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/aexpr.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/aexpr.rs
@@ -118,7 +118,7 @@ impl AExpr {
        use AExpr::*;
        match self {
            Function { options, .. } | AnonymousFunction { options, .. } => {
-                options.collect_groups == ApplyOptions::ApplyGroups
+                options.is_groups_sensitive()
            }
            Sort { .. }
            | SortBy { .. }
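The new `FunctionOperator` can stay this small because the streaming engine's `Operator` contract does the heavy lifting: `execute` maps one `DataChunk` to its output, and `split` hands every pipeline thread its own clone. Below is a self-contained sketch of that contract; the types are simplified stand-ins invented for illustration, not the actual `polars-pipe` definitions.

```rust
// Simplified stand-ins for the polars-pipe types, to show the operator contract.
#[derive(Clone, Debug)]
struct DataChunk {
    chunk_index: usize,
    data: Vec<i64>, // stands in for a DataFrame
}

enum OperatorResult {
    // the operator fully processed this chunk and produced its output
    Finished(DataChunk),
}

trait Operator: Send {
    // called once per incoming batch of data
    fn execute(&mut self, chunk: &DataChunk) -> OperatorResult;
    // every pipeline thread gets its own copy of the operator
    fn split(&self, thread_no: usize) -> Box<dyn Operator>;
}

// An elementwise "function" operator, mirroring how FunctionOperator
// forwards each chunk to FunctionNode::evaluate.
#[derive(Clone)]
struct FunctionOperator<F>
where
    F: Fn(i64) -> i64 + Clone + Send + 'static,
{
    function: F,
}

impl<F> Operator for FunctionOperator<F>
where
    F: Fn(i64) -> i64 + Clone + Send + 'static,
{
    fn execute(&mut self, chunk: &DataChunk) -> OperatorResult {
        let data = chunk.data.iter().map(|v| (self.function)(*v)).collect();
        OperatorResult::Finished(DataChunk {
            chunk_index: chunk.chunk_index,
            data,
        })
    }

    fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
        // stateless, so a clone is all a new pipeline thread needs
        Box::new(self.clone())
    }
}

fn main() {
    let mut op = FunctionOperator { function: |x: i64| x + 1 };
    let chunk = DataChunk { chunk_index: 0, data: vec![1, 2, 3] };
    let OperatorResult::Finished(out) = op.execute(&chunk);
    assert_eq!(out.data, vec![2, 3, 4]);
}
```

Because the operator holds no state across chunks, `split` can simply clone it, which is what lets a streamable `FunctionNode` run on all pipeline threads concurrently.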
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs
index 2ba16dfddb483..068f824fa97f6 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs
@@ -687,6 +687,7 @@ impl LogicalPlanBuilder {
                schema,
                predicate_pd: optimizations.predicate_pushdown,
                projection_pd: optimizations.projection_pushdown,
+                streamable: optimizations.streaming,
                fmt_str: name,
            },
        }
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/functions/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/functions/mod.rs
index 5024b722692f4..f3d5cd7866b60 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/functions/mod.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/functions/mod.rs
@@ -26,6 +26,7 @@ pub enum FunctionNode {
        predicate_pd: bool,
        /// allow projection pushdown optimizations
        projection_pd: bool,
+        streamable: bool,
        // used for formatting
        #[cfg_attr(feature = "serde", serde(skip))]
        fmt_str: &'static str,
@@ -70,6 +71,18 @@ impl PartialEq for FunctionNode {
 }
 
 impl FunctionNode {
+    /// Whether this function can run on batches of data at a time.
+    pub fn is_streamable(&self) -> bool {
+        use FunctionNode::*;
+        match self {
+            Rechunk | Pipeline { .. } => false,
+            #[cfg(feature = "merge_sorted")]
+            MergeSorted { .. } => false,
+            DropNulls { .. } | FastProjection { .. } | Unnest { .. } => true,
+            Opaque { streamable, .. } => *streamable,
+        }
+    }
+
    pub(crate) fn schema<'a>(
        &self,
        input_schema: &'a SchemaRef,
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/options.rs b/polars/polars-lazy/polars-plan/src/logical_plan/options.rs
index aa2f642eb9483..c45e619e0e1a3 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/options.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/options.rs
@@ -217,9 +217,12 @@ pub struct FunctionOptions {
 }
 
 impl FunctionOptions {
-    /// Whether this can simply applied elementwise
-    pub fn is_mappable(&self) -> bool {
-        !matches!(self.collect_groups, ApplyOptions::ApplyGroups)
+    /// Any function that is sensitive to the number of elements in a group
+    /// - Aggregations
+    /// - Sorts
+    /// - Counts
+    pub fn is_groups_sensitive(&self) -> bool {
+        matches!(self.collect_groups, ApplyOptions::ApplyGroups)
    }
 }
diff --git a/polars/polars-lazy/src/frame/mod.rs b/polars/polars-lazy/src/frame/mod.rs
index 8e81642782f75..33698f8371784 100644
--- a/polars/polars-lazy/src/frame/mod.rs
+++ b/polars/polars-lazy/src/frame/mod.rs
@@ -302,6 +302,7 @@ impl LazyFrame {
            AllowedOptimizations {
                projection_pushdown: false,
                predicate_pushdown: false,
+                streaming: true,
                ..Default::default()
            },
            Some(Arc::new(udf_schema)),
@@ -1198,6 +1199,7 @@ impl LazyFrame {
            AllowedOptimizations {
                slice_pushdown: false,
                predicate_pushdown: false,
+                streaming: false,
                ..Default::default()
            }
        } else {
diff --git a/polars/polars-lazy/src/physical_plan/expressions/ternary.rs b/polars/polars-lazy/src/physical_plan/expressions/ternary.rs
index 3c05fd7abb596..cabb458d8487c 100644
--- a/polars/polars-lazy/src/physical_plan/expressions/ternary.rs
+++ b/polars/polars-lazy/src/physical_plan/expressions/ternary.rs
@@ -286,7 +286,7 @@ impl PhysicalExpr for TernaryExpr {
                Expr::Agg(_) => has_agg = true,
                Expr::Function { options, .. } | Expr::AnonymousFunction { options, .. }
-                    if !options.is_mappable() =>
+                    if options.is_groups_sensitive() =>
                {
                    has_agg = true
                }
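Note the polarity flip that comes with the `is_mappable` → `is_groups_sensitive` rename in `options.rs`: the old predicate asked "can this run elementwise?", the new one asks "does the result depend on how many rows are seen at once?". Aggregations, sorts and counts answer yes, and that is exactly what disqualifies an expression from running batch-by-batch in the streaming engine. A minimal sketch of the classification, with an enum modeled on (but not identical to) the real `ApplyOptions`:

```rust
// Modeled on polars' ApplyOptions: how a function wants its input collected.
#[derive(Clone, Copy)]
enum ApplyOptions {
    // runs value-by-value; safe to execute per batch
    ApplyFlat,
    // needs every element of the group at once (aggregations, sorts, counts)
    ApplyGroups,
}

#[derive(Clone, Copy)]
struct FunctionOptions {
    collect_groups: ApplyOptions,
}

impl FunctionOptions {
    /// Sensitive to the number of elements seen at once:
    /// running it batch-by-batch would change the result.
    fn is_groups_sensitive(&self) -> bool {
        matches!(self.collect_groups, ApplyOptions::ApplyGroups)
    }
}

fn main() {
    // an elementwise function (e.g. `abs`) is a valid streaming operator...
    let elementwise = FunctionOptions { collect_groups: ApplyOptions::ApplyFlat };
    assert!(!elementwise.is_groups_sensitive());

    // ...while an aggregation (e.g. `sum`) must not run per batch
    let aggregation = FunctionOptions { collect_groups: ApplyOptions::ApplyGroups };
    assert!(aggregation.is_groups_sensitive());
}
```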
diff --git a/polars/polars-lazy/src/physical_plan/streaming/convert.rs b/polars/polars-lazy/src/physical_plan/streaming/convert.rs
index 06eaf34facb93..380f7324b44a6 100644
--- a/polars/polars-lazy/src/physical_plan/streaming/convert.rs
+++ b/polars/polars-lazy/src/physical_plan/streaming/convert.rs
@@ -20,6 +20,13 @@ use crate::physical_plan::PhysicalExpr;
 
 pub struct Wrap(Arc<dyn PhysicalExpr>);
 
+type IsSink = bool;
+// a rhs of a join will be replaced later
+type IsRhsJoin = bool;
+
+const IS_SINK: bool = true;
+const IS_RHS_JOIN: bool = true;
+
 impl PhysicalPipedExpr for Wrap {
    fn evaluate(&self, chunk: &DataChunk, state: &dyn Any) -> PolarsResult<Series> {
        let state = state.downcast_ref::<ExecutionState>().unwrap();
@@ -105,6 +112,30 @@ fn streamable_join(join_type: &JoinType) -> bool {
    }
 }
 
+// The index of the pipeline tree we are building at this moment
+// if we have a node we cannot do streaming, we have finished that pipeline tree
+// and start a new one.
+type CurrentIdx = usize;
+
+fn process_non_streamable_node(
+    current_idx: &mut CurrentIdx,
+    state: &mut Branch,
+    stack: &mut Vec<(Node, Branch, CurrentIdx)>,
+    scratch: &mut Vec<Node>,
+    pipeline_trees: &mut Vec<Vec<Branch>>,
+    lp: &ALogicalPlan,
+) {
+    if state.streamable {
+        *current_idx += 1;
+        pipeline_trees.push(vec![]);
+    }
+    state.streamable = false;
+    lp.copy_inputs(scratch);
+    while let Some(input) = scratch.pop() {
+        stack.push((input, Branch::default(), *current_idx))
+    }
+}
+
 pub(crate) fn insert_streaming_nodes(
    root: Node,
    lp_arena: &mut Arena<ALogicalPlan>,
@@ -120,10 +151,6 @@ pub(crate) fn insert_streaming_nodes(
 
    let mut stack = Vec::with_capacity(16);
 
-    // The index of the pipeline tree we are building at this moment
-    // if we have a node we cannot do streaming, we have finished that pipeline tree
-    // and start a new one.
-    type CurrentIdx = usize;
    stack.push((root, Branch::default(), 0 as CurrentIdx));
 
    // A state holds a full pipeline until the breaker
@@ -148,17 +175,17 @@ pub(crate) fn insert_streaming_nodes(
        match lp_arena.get(root) {
            Selection { input, predicate } if is_streamable(*predicate, expr_arena) => {
                state.streamable = true;
-                state.operators_sinks.push((false, false, root));
+                state.operators_sinks.push((!IS_SINK, !IS_RHS_JOIN, root));
                stack.push((*input, state, current_idx))
            }
            HStack { input, exprs, .. } if all_streamable(exprs, expr_arena) => {
                state.streamable = true;
-                state.operators_sinks.push((false, false, root));
+                state.operators_sinks.push((!IS_SINK, !IS_RHS_JOIN, root));
                stack.push((*input, state, current_idx))
            }
            Slice { input, offset, .. } if *offset >= 0 => {
                state.streamable = true;
-                state.operators_sinks.push((true, false, root));
+                state.operators_sinks.push((IS_SINK, !IS_RHS_JOIN, root));
                stack.push((*input, state, current_idx))
            }
            FileSink { input, .. } => {
@@ -175,30 +202,39 @@ pub(crate) fn insert_streaming_nodes(
                && all_column(by_column, expr_arena) =>
            {
                state.streamable = true;
-                state.operators_sinks.push((true, false, root));
-                stack.push((*input, state, current_idx))
-            }
-            MapFunction {
-                input,
-                function: FunctionNode::FastProjection { .. },
-            } => {
-                state.streamable = true;
-                state.operators_sinks.push((false, false, root));
+                state.operators_sinks.push((IS_SINK, !IS_RHS_JOIN, root));
                stack.push((*input, state, current_idx))
            }
            Projection { input, expr, .. } if all_streamable(expr, expr_arena) => {
                state.streamable = true;
-                state.operators_sinks.push((false, false, root));
+                state.operators_sinks.push((!IS_SINK, !IS_RHS_JOIN, root));
                stack.push((*input, state, current_idx))
            }
+            // Rechunks are ignored
            MapFunction {
                input,
                function: FunctionNode::Rechunk,
            } => {
-                // we ignore a rechunk
                state.streamable = true;
                stack.push((*input, state, current_idx))
            }
+            // Streamable functions will be converted
+            lp @ MapFunction { input, function } => {
+                if function.is_streamable() {
+                    state.streamable = true;
+                    state.operators_sinks.push((!IS_SINK, !IS_RHS_JOIN, root));
+                    stack.push((*input, state, current_idx))
+                } else {
+                    process_non_streamable_node(
+                        &mut current_idx,
+                        &mut state,
+                        &mut stack,
+                        scratch,
+                        &mut pipeline_trees,
+                        lp,
+                    )
+                }
+            }
            #[cfg(feature = "csv-file")]
            CsvScan { .. } => {
                if state.streamable {
@@ -245,7 +281,9 @@ pub(crate) fn insert_streaming_nodes(
                // rhs is second, so that is first on the stack
                let mut state_right = state;
                state_right.join_count = 0;
-                state_right.operators_sinks.push((true, true, root));
+                state_right
+                    .operators_sinks
+                    .push((IS_SINK, IS_RHS_JOIN, root));
                stack.push((input_right, state_right, current_idx));
 
                // we want to traverse lhs first, so push it latest on the stack
@@ -255,7 +293,9 @@ pub(crate) fn insert_streaming_nodes(
                let mut state_left = Branch {
                    streamable: true,
                    join_count,
                    ..Default::default()
                };
-                state_left.operators_sinks.push((true, false, root));
+                state_left
+                    .operators_sinks
+                    .push((IS_SINK, !IS_RHS_JOIN, root));
                stack.push((input_left, state_left, current_idx));
            }
            // add globbing patterns
@@ -309,23 +349,20 @@ pub(crate) fn insert_streaming_nodes(
                    .all(|dt| allowed_dtype(dt, string_cache))
                {
                    state.streamable = true;
-                    state.operators_sinks.push((true, false, root));
+                    state.operators_sinks.push((IS_SINK, !IS_RHS_JOIN, root));
                    stack.push((*input, state, current_idx))
                } else {
                    stack.push((*input, Branch::default(), current_idx))
                }
            }
-            lp => {
-                if state.streamable {
-                    current_idx += 1;
-                    pipeline_trees.push(vec![]);
-                }
-                state.streamable = false;
-                lp.copy_inputs(scratch);
-                while let Some(input) = scratch.pop() {
-                    stack.push((input, Branch::default(), current_idx))
-                }
-            }
+            lp => process_non_streamable_node(
+                &mut current_idx,
+                &mut state,
+                &mut stack,
+                scratch,
+                &mut pipeline_trees,
+                lp,
+            ),
        }
    }
    let mut inserted = false;
@@ -448,10 +485,6 @@ pub(crate) fn insert_streaming_nodes(
    Ok(inserted)
 }
 
-type IsSink = bool;
-// a rhs of a join will be replaced later
-type IsRhsJoin = bool;
-
 #[derive(Default, Debug, Clone)]
 struct Branch {
    streamable: bool,
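The refactor above keeps the densest part of `convert.rs` in one place: the traversal walks the plan from the sink upward with an explicit stack, extends the current pipeline tree while nodes are streamable, and `process_non_streamable_node` closes that tree and opens a fresh one for the offending node's inputs. The sketch below reproduces only this partitioning idea on a toy plan; the types and bookkeeping are invented for illustration, whereas the real `Branch` also records operators, sinks and join counts.

```rust
// Toy stand-in for the logical plan: each node is streamable or not,
// and has zero or more inputs.
struct PlanNode {
    name: &'static str,
    streamable: bool,
    inputs: Vec<usize>, // indices into the arena
}

/// Walk the plan from the root, grouping maximal streamable runs into
/// "pipeline trees", in the spirit of insert_streaming_nodes' stack loop.
fn partition_into_pipelines(arena: &[PlanNode], root: usize) -> Vec<Vec<&'static str>> {
    // (node, index of the pipeline tree we are currently building)
    let mut stack: Vec<(usize, usize)> = vec![(root, 0)];
    let mut pipeline_trees: Vec<Vec<&'static str>> = vec![vec![]];

    while let Some((node, mut current_idx)) = stack.pop() {
        let lp = &arena[node];
        if lp.streamable {
            // extend the current pipeline tree
            pipeline_trees[current_idx].push(lp.name);
        } else if !pipeline_trees[current_idx].is_empty() {
            // a breaker: close this tree and start a new one for the inputs;
            // the breaker itself stays in the default (non-streaming) engine
            current_idx = pipeline_trees.len();
            pipeline_trees.push(vec![]);
        }
        for &input in &lp.inputs {
            stack.push((input, current_idx));
        }
    }
    pipeline_trees.retain(|tree| !tree.is_empty());
    pipeline_trees
}

fn main() {
    // scan -> filter -> sort (not streamable) -> projection
    let arena = vec![
        PlanNode { name: "scan", streamable: true, inputs: vec![] },
        PlanNode { name: "filter", streamable: true, inputs: vec![0] },
        PlanNode { name: "sort", streamable: false, inputs: vec![1] },
        PlanNode { name: "projection", streamable: true, inputs: vec![2] },
    ];
    let trees = partition_into_pipelines(&arena, 3);
    // the sort splits the plan into two separate pipelines
    assert_eq!(trees, vec![vec!["projection"], vec!["filter", "scan"]]);
}
```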
diff --git a/py-polars/polars/internals/lazyframe/frame.py b/py-polars/polars/internals/lazyframe/frame.py
index b03323a128a80..94f3801f45253 100644
--- a/py-polars/polars/internals/lazyframe/frame.py
+++ b/py-polars/polars/internals/lazyframe/frame.py
@@ -3794,6 +3794,7 @@ def map(
        no_optimizations: bool = False,
        schema: None | SchemaDict = None,
        validate_output_schema: bool = True,
+        streamable: bool = False,
    ) -> LDF:
        """
        Apply a custom function.
@@ -3820,6 +3821,10 @@ def map(
            the output schema of this function will be checked with the expected
            schema. Setting this to ``False`` will not do this check, but may lead to
            hard to debug bugs.
+        streamable
+            Whether the function that is given is eligible to run in the streaming
+            engine. That means that the function must produce the same result if it
+            is executed on batches as it would when executed on the full dataset.
 
        Warnings
        --------
@@ -3856,8 +3861,9 @@ def map(
                predicate_pushdown,
                projection_pushdown,
                slice_pushdown,
-                schema,
-                validate_output_schema,
+                streamable=streamable,
+                schema=schema,
+                validate_output=validate_output_schema,
            )
        )
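On the Python side the new `streamable` flag is pure plumbing: `LazyFrame.map` forwards it to the Rust binding, which stores it as `AllowedOptimizations { streaming }`, and the logical-plan builder stamps it onto the `Opaque` node that `FunctionNode::is_streamable` later inspects. A condensed sketch of that flow, with names simplified from the real structs:

```rust
// Condensed view of the flag's journey from the user-facing API to the planner.
// Names are simplified from the real AllowedOptimizations / FunctionNode;
// the other pushdown flags are elided.
#[derive(Default)]
struct AllowedOptimizations {
    streaming: bool,
}

enum FunctionNode {
    // a user-defined function the engine cannot inspect
    Opaque { streamable: bool },
}

impl FunctionNode {
    fn is_streamable(&self) -> bool {
        match self {
            // an opaque UDF is only streamable if the caller promised that
            // running it batch-by-batch gives the same result
            FunctionNode::Opaque { streamable } => *streamable,
        }
    }
}

// What `map(..., streamable=...)` effectively does: store the flag in the
// allowed optimizations and stamp it onto the plan node.
fn build_map_node(streamable: bool) -> FunctionNode {
    let optimizations = AllowedOptimizations {
        streaming: streamable,
        ..Default::default()
    };
    FunctionNode::Opaque {
        streamable: optimizations.streaming,
    }
}

fn main() {
    assert!(build_map_node(true).is_streamable());
    assert!(!build_map_node(false).is_streamable());
}
```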
diff --git a/py-polars/src/lazy/dataframe.rs b/py-polars/src/lazy/dataframe.rs
index 1f5cda40b3370..176624f9964f9 100644
--- a/py-polars/src/lazy/dataframe.rs
+++ b/py-polars/src/lazy/dataframe.rs
@@ -90,10 +90,11 @@ impl PyLazyGroupBy {
            pypolars.getattr("wrap_df").unwrap().call1((pydf,)).unwrap();
 
        // call the lambda and get a python side DataFrame wrapper
-        let result_df_wrapper = match lambda.call1(py, (python_df_wrapper,)) {
-            Ok(pyobj) => pyobj,
-            Err(e) => panic!("UDF failed: {}", e.value(py)),
-        };
+        let result_df_wrapper = lambda.call1(py, (python_df_wrapper,)).map_err(|e| {
+            PolarsError::ComputeError(
+                format!("User provided python function failed: {e}").into(),
+            )
+        })?;
        // unpack the wrapper in a PyDataFrame
        let py_pydf = result_df_wrapper.getattr(py, "_df").expect(
            "Could not get DataFrame attribute '_df'. Make sure that you return a DataFrame object.",
@@ -813,13 +814,15 @@ impl PyLazyFrame {
        ldf.with_row_count(name, offset).into()
    }
 
-    #[pyo3(signature = (lambda, predicate_pushdown, projection_pushdown, slice_pushdown, schema, validate_output))]
+    #[pyo3(signature = (lambda, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
+    #[allow(clippy::too_many_arguments)]
    pub fn map(
        &self,
        lambda: PyObject,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
+        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
@@ -827,6 +830,7 @@ impl PyLazyFrame {
            predicate_pushdown,
            projection_pushdown,
            slice_pushdown,
+            streaming: streamable,
            ..Default::default()
        };
        let schema = schema.map(|schema| Arc::new(schema.0));
@@ -857,23 +861,22 @@ impl PyLazyFrame {
                .call1(py, (pydf,))
                .unwrap();
            // call the lambda and get a python side Series wrapper
-            let result_df_wrapper = match lambda.call1(py, (python_df_wrapper,)) {
-                Ok(pyobj) => pyobj,
-                Err(e) => panic!("UDF failed: {}", e.value(py)),
-            };
+
+            let result_df_wrapper = lambda.call1(py, (python_df_wrapper,)).map_err(|e| {
+                PolarsError::ComputeError(
+                    format!("User provided python function failed: {e}").into(),
+                )
+            })?;
            // unpack the wrapper in a PyDataFrame
-            let py_pydf = match result_df_wrapper.getattr(py, "_df") {
-                Ok(df) => df,
-                Err(_) => {
-                    let pytype = result_df_wrapper.as_ref(py).get_type();
-                    return Err(PolarsError::ComputeError(
-                        format!(
-                            "Expected 'LazyFrame.map' to return a 'DataFrame', got a {pytype}",
-                        )
-                        .into(),
-                    ));
-                }
-            };
+            let py_pydf = result_df_wrapper.getattr(py, "_df").map_err(|_| {
+                let pytype = result_df_wrapper.as_ref(py).get_type();
+                PolarsError::ComputeError(
+                    format!(
+                        "Expected 'LazyFrame.map' to return a 'DataFrame', got a '{pytype}'",
+                    )
+                    .into(),
+                )
+            })?;
 
            // Downcast to Rust
            let pydf = py_pydf.extract::<PyDataFrame>(py).unwrap();
diff --git a/py-polars/tests/unit/test_cfg.py b/py-polars/tests/unit/test_cfg.py
index 2f23d03ca77bf..908cbf7cb8f89 100644
--- a/py-polars/tests/unit/test_cfg.py
+++ b/py-polars/tests/unit/test_cfg.py
@@ -414,6 +414,7 @@ def test_string_cache() -> None:
    assert_frame_equal(out, expected)
 
 
+@pytest.mark.xdist_group(name="group1")
 def test_config_load_save() -> None:
    # set some config options...
    pl.Config.set_tbl_cols(12)
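Also worth calling out in the `dataframe.rs` hunks above: a failing Python UDF used to `panic!` inside the engine, whereas it now surfaces as a `PolarsError::ComputeError` via `map_err` and `?`. A minimal illustration of that pattern, with the error type and the callback simplified to stand-ins (no PyO3 involved):

```rust
// Minimal illustration of the panic-to-Result change; the error type and
// user callback are simplified stand-ins, not the real polars/pyo3 types.
#[derive(Debug)]
enum PolarsError {
    ComputeError(String),
}

type PolarsResult<T> = Result<T, PolarsError>;

// stands in for `lambda.call1(...)`: a fallible user-provided callback
fn call_udf(input: i64) -> Result<i64, String> {
    if input < 0 {
        Err("negative input".to_string())
    } else {
        Ok(input * 2)
    }
}

fn run(input: i64) -> PolarsResult<i64> {
    // before: Err(e) => panic!("UDF failed: {e}")
    // after: surface the failure as an error the engine can report
    call_udf(input)
        .map_err(|e| PolarsError::ComputeError(format!("User provided function failed: {e}")))
}

fn main() {
    assert_eq!(run(2).unwrap(), 4);
    assert!(matches!(run(-1), Err(PolarsError::ComputeError(_))));
}
```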
diff --git a/py-polars/tests/unit/test_lazy.py b/py-polars/tests/unit/test_lazy.py
index 53ef62e183139..65022e1efb241 100644
--- a/py-polars/tests/unit/test_lazy.py
+++ b/py-polars/tests/unit/test_lazy.py
@@ -1542,6 +1542,7 @@ def test_lazy_cache_same_key() -> None:
    ).collect().to_dict(False) == {"a": [-1, 2, 7], "c": ["x", "y", "z"]}
 
 
+@pytest.mark.xdist_group(name="group1")
 def test_lazy_cache_hit(capfd: Any) -> None:
    os.environ["POLARS_VERBOSE"] = "1"
    df = pl.DataFrame({"a": [1, 2, 3], "b": [3, 4, 5], "c": ["x", "y", "z"]}).lazy()
@@ -1551,6 +1552,7 @@ def test_lazy_cache_hit(capfd: Any) -> None:
    ).collect().to_dict(False) == {"a": [0, 0, 0], "c": ["x", "y", "z"]}
    (out, _) = capfd.readouterr()
    assert "CACHE HIT" in out
+    os.unsetenv("POLARS_VERBOSE")
 
 
 def test_quadratic_behavior_4736() -> None:
diff --git a/py-polars/tests/unit/test_streaming.py b/py-polars/tests/unit/test_streaming.py
index 85dcd4675fb61..707a2176ed6ad 100644
--- a/py-polars/tests/unit/test_streaming.py
+++ b/py-polars/tests/unit/test_streaming.py
@@ -1,4 +1,6 @@
+import os
 from datetime import date
+from typing import Any
 
 import numpy as np
 import pytest
@@ -198,3 +200,21 @@ def test_streaming_block_on_literals_6054() -> None:
    assert df.lazy().with_columns(s).groupby("col_1").agg(pl.all().first()).collect(
        streaming=True
    ).sort("col_1").to_dict(False) == {"col_1": [0, 1], "col_2": [0, 5]}
+
+
+@pytest.mark.xdist_group(name="group1")
+def test_streaming_streamable_functions(capfd: Any) -> None:
+    os.environ["POLARS_VERBOSE"] = "1"
+    assert (
+        pl.DataFrame({"a": [1, 2, 3]})
+        .lazy()
+        .map(
+            f=lambda df: df.with_columns(pl.col("a").alias("b")),
+            schema={"a": pl.Int64, "b": pl.Int64},
+            streamable=True,
+        )
+    ).collect(streaming=True).to_dict(False) == {"a": [1, 2, 3], "b": [1, 2, 3]}
+
+    (_, err) = capfd.readouterr()
+    assert "df -> function -> ordered_sink" in err
+    os.unsetenv("POLARS_VERBOSE")