From f42759228ae1e212514eec3a70212ac2db6b1186 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Tue, 23 Jan 2024 15:35:11 +0100 Subject: [PATCH] fix: recurse less in streaming shared sinks (#13930) --- crates/polars-pipe/src/pipeline/convert.rs | 13 ++- crates/polars-pipe/src/pipeline/dispatcher.rs | 103 +++++++++++------- 2 files changed, 74 insertions(+), 42 deletions(-) diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index b888640b11cc..d8566f46f23f 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -17,6 +17,7 @@ use crate::executors::sinks::*; use crate::executors::{operators, sources}; use crate::expressions::PhysicalPipedExpr; use crate::operators::{Operator, Sink as SinkTrait, Source}; +use crate::pipeline::dispatcher::SinkNode; use crate::pipeline::PipeLine; fn exprs_to_physical( @@ -668,7 +669,7 @@ where let operator_offset = operator_objects.len(); operator_objects.extend(operators); - let sink_nodes = sink_nodes + let sinks = sink_nodes .into_iter() .map(|(offset, node, shared_count)| { // ensure that shared sinks are really shared @@ -685,8 +686,12 @@ where Entry::Occupied(entry) => entry.get().split(0), } }; - - Ok((offset + operator_offset, node, sink, shared_count)) + Ok(SinkNode::new( + sink, + shared_count, + offset + operator_offset, + node, + )) }) .collect::>>()?; @@ -694,7 +699,7 @@ where source_objects, operator_objects, operator_nodes, - sink_nodes, + sinks, operator_offset, verbose, )) diff --git a/crates/polars-pipe/src/pipeline/dispatcher.rs b/crates/polars-pipe/src/pipeline/dispatcher.rs index 64ce1a6687ec..7fc7cdb9a35b 100644 --- a/crates/polars-pipe/src/pipeline/dispatcher.rs +++ b/crates/polars-pipe/src/pipeline/dispatcher.rs @@ -19,6 +19,44 @@ use crate::operators::{ }; use crate::pipeline::morsels_per_sink; +pub(super) struct SinkNode { + pub sinks: Vec>, + /// when that hits 0, the sink will finalize + pub shared_count: Rc>, + initial_shared_count: u32, + /// - offset in the operators vec + /// at that point the sink should be called. + /// the pipeline will first call the operators on that point and then + /// push the result in the sink. + pub operator_end: usize, + pub node: Node, +} + +impl SinkNode { + pub fn new( + sink: Box, + shared_count: Rc>, + operator_end: usize, + node: Node, + ) -> Self { + let n_threads = morsels_per_sink(); + let sinks = (0..n_threads).map(|i| sink.split(i)).collect(); + let initial_shared_count = *shared_count.borrow(); + SinkNode { + sinks, + initial_shared_count, + shared_count, + operator_end, + node, + } + } + + // Only the first node of a shared sink should recurse. The others should return. + fn allow_recursion(&self) -> bool { + self.initial_shared_count == *self.shared_count.borrow() + } +} + /// A pipeline consists of: /// /// - 1. One or more sources. @@ -69,8 +107,7 @@ pub struct PipeLine { /// - shared_count /// when that hits 0, the sink will finalize /// - node of the sink - #[allow(clippy::type_complexity)] - sinks: Vec<(usize, Rc>, Vec>)>, + sinks: Vec, /// are used to identify the sink shared with other pipeline branches sink_nodes: Vec, /// Other branch of the pipeline/tree that must be executed @@ -86,12 +123,11 @@ pub struct PipeLine { impl PipeLine { #[allow(clippy::type_complexity)] - pub fn new( + pub(super) fn new( sources: Vec>, operators: Vec>, operator_nodes: Vec, - // (offset, node (for identification), sink, shared_counter) - sink_and_nodes: Vec<(usize, Node, Box, Rc>)>, + sinks: Vec, operator_offset: usize, verbose: bool, ) -> PipeLine { @@ -100,19 +136,8 @@ impl PipeLine { // we only do that in the sinks itself. let n_threads = morsels_per_sink(); + let sink_nodes = sinks.iter().map(|s| s.node).collect(); // We split so that every thread gets an operator - let sink_nodes = sink_and_nodes.iter().map(|(_, node, _, _)| *node).collect(); - let sinks = sink_and_nodes - .into_iter() - .map(|(offset, _, sink, shared_count)| { - ( - offset, - shared_count, - (0..n_threads).map(|i| sink.split(i)).collect(), - ) - }) - .collect(); - // every index maps to a chain of operators than can be pushed as a pipeline for one thread let operators = (0..n_threads) .map(|i| operators.iter().map(|op| op.split(i)).collect()) @@ -142,11 +167,11 @@ impl PipeLine { sources, operators, vec![], - vec![( - operators_len, - Node::default(), + vec![SinkNode::new( sink, Rc::new(RefCell::new(1)), + operators_len, + Node::default(), )], 0, verbose, @@ -345,9 +370,7 @@ impl PipeLine { // we don't want to run the rest of the pipelines and we finalize early let mut sink_finished = false; - for (i, (operator_end, shared_count, mut sink)) in - std::mem::take(&mut self.sinks).into_iter().enumerate() - { + for (i, mut sink) in std::mem::take(&mut self.sinks).into_iter().enumerate() { for src in &mut std::mem::take(&mut self.sources) { let mut next_batches = src.get_batches(ec)?; @@ -357,10 +380,10 @@ impl PipeLine { let (sink_result, next_batches2) = self.par_process_chunks( chunks, - &mut sink, + &mut sink.sinks, ec, operator_start, - operator_end, + sink.operator_end, src, )?; next_batches = next_batches2; @@ -374,31 +397,35 @@ impl PipeLine { // Before we reduce we also check if we should continue. ec.execution_state.should_stop()?; + let allow_recursion = sink.allow_recursion(); // The sinks have taken all chunks thread locally, now we reduce them into a single // result sink. let mut reduced_sink = POOL .install(|| { - sink.into_par_iter().reduce_with(|mut a, mut b| { + sink.sinks.into_par_iter().reduce_with(|mut a, mut b| { a.combine(&mut *b); a }) }) .unwrap(); - operator_start = operator_end; + operator_start = sink.operator_end; let mut shared_sink_count = { - let mut shared_sink_count = shared_count.borrow_mut(); + let mut shared_sink_count = sink.shared_count.borrow_mut(); *shared_sink_count -= 1; *shared_sink_count }; - while shared_sink_count > 0 && !sink_finished { - let mut pipeline = pipeline_q.borrow_mut().pop_front().unwrap(); - let (count, mut sink) = - pipeline.run_pipeline_no_finalize(ec, pipeline_q.clone())?; - reduced_sink.combine(sink.as_mut()); - shared_sink_count = count; + // Prevent very deep recursion. Only the outer callee can pop and run. + if allow_recursion { + while shared_sink_count > 0 && !sink_finished { + let mut pipeline = pipeline_q.borrow_mut().pop_front().unwrap(); + let (count, mut sink) = + pipeline.run_pipeline_no_finalize(ec, pipeline_q.clone())?; + reduced_sink.combine(sink.as_mut()); + shared_sink_count = count; + } } if i != last_i { @@ -491,18 +518,18 @@ impl Debug for PipeLine { let mut fmt = String::new(); let mut start = 0usize; fmt.push_str(self.sources[0].fmt()); - for (offset_end, _, sink) in &self.sinks { + for sink in &self.sinks { fmt.push_str(" -> "); // take operators of a single thread let ops = &self.operators[0]; // slice the pipeline - let ops = &ops[start..*offset_end]; + let ops = &ops[start..sink.operator_end]; for op in ops { fmt.push_str(op.fmt()); fmt.push_str(" -> ") } - start = *offset_end; - fmt.push_str(sink[0].fmt()) + start = sink.operator_end; + fmt.push_str(sink.sinks[0].fmt()) } write!(f, "{fmt}") }