Skip to content

Commit

Permalink
fix: recurse less in streaming shared sinks (#13930)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 23, 2024
1 parent af19d13 commit f427592
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 42 deletions.
13 changes: 9 additions & 4 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::executors::sinks::*;
use crate::executors::{operators, sources};
use crate::expressions::PhysicalPipedExpr;
use crate::operators::{Operator, Sink as SinkTrait, Source};
use crate::pipeline::dispatcher::SinkNode;
use crate::pipeline::PipeLine;

fn exprs_to_physical<F>(
Expand Down Expand Up @@ -668,7 +669,7 @@ where
let operator_offset = operator_objects.len();
operator_objects.extend(operators);

let sink_nodes = sink_nodes
let sinks = sink_nodes
.into_iter()
.map(|(offset, node, shared_count)| {
// ensure that shared sinks are really shared
Expand All @@ -685,16 +686,20 @@ where
Entry::Occupied(entry) => entry.get().split(0),
}
};

Ok((offset + operator_offset, node, sink, shared_count))
Ok(SinkNode::new(
sink,
shared_count,
offset + operator_offset,
node,
))
})
.collect::<PolarsResult<Vec<_>>>()?;

Ok(PipeLine::new(
source_objects,
operator_objects,
operator_nodes,
sink_nodes,
sinks,
operator_offset,
verbose,
))
Expand Down
103 changes: 65 additions & 38 deletions crates/polars-pipe/src/pipeline/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,44 @@ use crate::operators::{
};
use crate::pipeline::morsels_per_sink;

/// A sink of a pipeline bundled with the bookkeeping the dispatcher needs to
/// drive it: the per-thread sink copies, the shared finalization counter, the
/// operator offset at which the sink is fed, and the node identifying it.
pub(super) struct SinkNode {
/// Thread-local sinks. `SinkNode::new` fills this with one `split(i)` of the
/// original sink for every `i` in `0..morsels_per_sink()`.
pub sinks: Vec<Box<dyn Sink>>,
/// Reference-counted counter for this sink;
/// when that hits 0, the sink will finalize
pub shared_count: Rc<RefCell<u32>>,
/// Value of `shared_count` as read when this node was constructed.
/// `allow_recursion` compares the live counter against this snapshot.
initial_shared_count: u32,
/// - offset in the operators vec
/// at that point the sink should be called.
/// the pipeline will first call the operators on that point and then
/// push the result in the sink.
pub operator_end: usize,
/// Node of the sink; `PipeLine::new` collects these to identify sinks that
/// are shared with other pipeline branches.
pub node: Node,
}

impl SinkNode {
    /// Creates a sink node from a single `sink`.
    ///
    /// The sink is split once per index in `0..morsels_per_sink()`, and the
    /// current value of `shared_count` is recorded as the initial count.
    ///
    /// * `sink` - the sink the thread-local copies are split from.
    /// * `shared_count` - counter that, once it hits 0, lets the sink finalize.
    /// * `operator_end` - offset in the operators vec at which the sink is called.
    /// * `node` - node that identifies this sink.
    pub fn new(
        sink: Box<dyn Sink>,
        shared_count: Rc<RefCell<u32>>,
        operator_end: usize,
        node: Node,
    ) -> Self {
        let split_count = morsels_per_sink();

        // One split of the sink per index, in index order.
        let mut per_thread: Vec<Box<dyn Sink>> = Vec::with_capacity(split_count);
        for thread_no in 0..split_count {
            per_thread.push(sink.split(thread_no));
        }

        // Snapshot the counter so later decrements can be detected.
        let count_at_creation = *shared_count.borrow();

        Self {
            sinks: per_thread,
            initial_shared_count: count_at_creation,
            shared_count,
            operator_end,
            node,
        }
    }

    /// Returns `true` while `shared_count` still holds the value it had when
    /// this node was created.
    ///
    /// Only the first node of a shared sink should recurse. The others should return.
    fn allow_recursion(&self) -> bool {
        let current = *self.shared_count.borrow();
        current == self.initial_shared_count
    }
}

/// A pipeline consists of:
///
/// - 1. One or more sources.
Expand Down Expand Up @@ -69,8 +107,7 @@ pub struct PipeLine {
/// - shared_count
/// when that hits 0, the sink will finalize
/// - node of the sink
#[allow(clippy::type_complexity)]
sinks: Vec<(usize, Rc<RefCell<u32>>, Vec<Box<dyn Sink>>)>,
sinks: Vec<SinkNode>,
/// are used to identify the sink shared with other pipeline branches
sink_nodes: Vec<Node>,
/// Other branch of the pipeline/tree that must be executed
Expand All @@ -86,12 +123,11 @@ pub struct PipeLine {

impl PipeLine {
#[allow(clippy::type_complexity)]
pub fn new(
pub(super) fn new(
sources: Vec<Box<dyn Source>>,
operators: Vec<Box<dyn Operator>>,
operator_nodes: Vec<Node>,
// (offset, node (for identification), sink, shared_counter)
sink_and_nodes: Vec<(usize, Node, Box<dyn Sink>, Rc<RefCell<u32>>)>,
sinks: Vec<SinkNode>,
operator_offset: usize,
verbose: bool,
) -> PipeLine {
Expand All @@ -100,19 +136,8 @@ impl PipeLine {
// we only do that in the sinks itself.
let n_threads = morsels_per_sink();

let sink_nodes = sinks.iter().map(|s| s.node).collect();
// We split so that every thread gets an operator
let sink_nodes = sink_and_nodes.iter().map(|(_, node, _, _)| *node).collect();
let sinks = sink_and_nodes
.into_iter()
.map(|(offset, _, sink, shared_count)| {
(
offset,
shared_count,
(0..n_threads).map(|i| sink.split(i)).collect(),
)
})
.collect();

// every index maps to a chain of operators that can be pushed as a pipeline for one thread
let operators = (0..n_threads)
.map(|i| operators.iter().map(|op| op.split(i)).collect())
Expand Down Expand Up @@ -142,11 +167,11 @@ impl PipeLine {
sources,
operators,
vec![],
vec![(
operators_len,
Node::default(),
vec![SinkNode::new(
sink,
Rc::new(RefCell::new(1)),
operators_len,
Node::default(),
)],
0,
verbose,
Expand Down Expand Up @@ -345,9 +370,7 @@ impl PipeLine {
// we don't want to run the rest of the pipelines and we finalize early
let mut sink_finished = false;

for (i, (operator_end, shared_count, mut sink)) in
std::mem::take(&mut self.sinks).into_iter().enumerate()
{
for (i, mut sink) in std::mem::take(&mut self.sinks).into_iter().enumerate() {
for src in &mut std::mem::take(&mut self.sources) {
let mut next_batches = src.get_batches(ec)?;

Expand All @@ -357,10 +380,10 @@ impl PipeLine {

let (sink_result, next_batches2) = self.par_process_chunks(
chunks,
&mut sink,
&mut sink.sinks,
ec,
operator_start,
operator_end,
sink.operator_end,
src,
)?;
next_batches = next_batches2;
Expand All @@ -374,31 +397,35 @@ impl PipeLine {

// Before we reduce we also check if we should continue.
ec.execution_state.should_stop()?;
let allow_recursion = sink.allow_recursion();

// The sinks have taken all chunks thread locally, now we reduce them into a single
// result sink.
let mut reduced_sink = POOL
.install(|| {
sink.into_par_iter().reduce_with(|mut a, mut b| {
sink.sinks.into_par_iter().reduce_with(|mut a, mut b| {
a.combine(&mut *b);
a
})
})
.unwrap();
operator_start = operator_end;
operator_start = sink.operator_end;

let mut shared_sink_count = {
let mut shared_sink_count = shared_count.borrow_mut();
let mut shared_sink_count = sink.shared_count.borrow_mut();
*shared_sink_count -= 1;
*shared_sink_count
};

while shared_sink_count > 0 && !sink_finished {
let mut pipeline = pipeline_q.borrow_mut().pop_front().unwrap();
let (count, mut sink) =
pipeline.run_pipeline_no_finalize(ec, pipeline_q.clone())?;
reduced_sink.combine(sink.as_mut());
shared_sink_count = count;
// Prevent very deep recursion. Only the outer callee can pop and run.
if allow_recursion {
while shared_sink_count > 0 && !sink_finished {
let mut pipeline = pipeline_q.borrow_mut().pop_front().unwrap();
let (count, mut sink) =
pipeline.run_pipeline_no_finalize(ec, pipeline_q.clone())?;
reduced_sink.combine(sink.as_mut());
shared_sink_count = count;
}
}

if i != last_i {
Expand Down Expand Up @@ -491,18 +518,18 @@ impl Debug for PipeLine {
let mut fmt = String::new();
let mut start = 0usize;
fmt.push_str(self.sources[0].fmt());
for (offset_end, _, sink) in &self.sinks {
for sink in &self.sinks {
fmt.push_str(" -> ");
// take operators of a single thread
let ops = &self.operators[0];
// slice the pipeline
let ops = &ops[start..*offset_end];
let ops = &ops[start..sink.operator_end];
for op in ops {
fmt.push_str(op.fmt());
fmt.push_str(" -> ")
}
start = *offset_end;
fmt.push_str(sink[0].fmt())
start = sink.operator_end;
fmt.push_str(sink.sinks[0].fmt())
}
write!(f, "{fmt}")
}
Expand Down

0 comments on commit f427592

Please sign in to comment.