fix(rust, python): fix streaming joins where the join order has been …
ritchie46 committed Jan 9, 2023
1 parent 3de901c commit 45ad9e3
Showing 28 changed files with 214 additions and 22 deletions.
@@ -17,4 +17,8 @@ impl Operator for Dummy {
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(Self {})
}

fn fmt(&self) -> &str {
"dummy"
}
}
@@ -33,4 +33,7 @@ impl Operator for FilterOperator {
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(self.clone())
}
fn fmt(&self) -> &str {
"filter"
}
}
@@ -24,6 +24,9 @@ impl Operator for FastProjectionOperator {
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(self.clone())
}
fn fmt(&self) -> &str {
"fast_join_projection"
}
}

#[derive(Clone)]
@@ -49,6 +52,9 @@ impl Operator for ProjectionOperator {
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(self.clone())
}
fn fmt(&self) -> &str {
"projection"
}
}

#[derive(Clone)]
@@ -79,4 +85,7 @@ impl Operator for HstackOperator {
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(self.clone())
}
fn fmt(&self) -> &str {
"hstack"
}
}
@@ -5,6 +5,7 @@ use polars_core::datatypes::Field;
use polars_core::error::PolarsResult;
use polars_core::prelude::{DataType, SchemaRef, Series, IDX_DTYPE};
use polars_core::schema::Schema;
use polars_plan::dsl::Expr;
use polars_plan::logical_plan::{ArenaExprIter, Context};
use polars_plan::prelude::{AAggExpr, AExpr};
use polars_utils::arena::{Arena, Node};
@@ -30,6 +31,10 @@ impl PhysicalPipedExpr for Count {
fn field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
todo!()
}

fn expression(&self) -> Expr {
Expr::Count
}
}

pub fn can_convert_to_hash_agg(
@@ -428,4 +428,7 @@ impl Sink for GenericGroupbySink {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn fmt(&self) -> &str {
"generic_groupby"
}
}
@@ -360,6 +360,9 @@ where
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn fmt(&self) -> &str {
"primitive_groupby"
}
}

fn insert_and_get<T>(
@@ -358,6 +358,9 @@ impl Sink for Utf8GroupbySink {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn fmt(&self) -> &str {
"utf8_groupby"
}
}

// write agg_idx to the hashes buffer.
@@ -62,6 +62,10 @@ impl Sink for CrossJoin {
fn as_any(&mut self) -> &mut dyn Any {
self
}

fn fmt(&self) -> &str {
"cross_join_sink"
}
}

#[derive(Clone)]
@@ -152,4 +156,8 @@ impl Operator for CrossJoinProbe {
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(self.clone())
}

fn fmt(&self) -> &str {
"cross_join_probe"
}
}
@@ -151,6 +151,7 @@ impl GenericBuild {
chunk: &DataChunk,
) -> PolarsResult<&[Series]> {
self.join_series.clear();

for phys_e in self.join_columns_left.iter() {
let s = phys_e.evaluate(chunk, context.execution_state.as_any())?;
let s = s.to_physical_repr();
@@ -337,7 +338,7 @@ impl Sink for GenericBuild {
let suffix = self.suffix.clone();
let hb = self.hb.clone();
let hash_tables = Arc::new(std::mem::take(&mut self.hash_tables));
let join_columns_left = self.join_columns_right.clone();
let join_columns_left = self.join_columns_left.clone();
let join_columns_right = self.join_columns_right.clone();

// take the buffers, this saves one allocation
@@ -369,6 +370,9 @@ impl Sink for GenericBuild {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn fmt(&self) -> &str {
"generic_join_build"
}
}

pub(super) struct KeysIter<'a> {
@@ -66,15 +66,12 @@ impl GenericJoinProbe {
hash_tables: Arc<Vec<PlIdHashMap<Key, Vec<ChunkId>>>>,
join_columns_left: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
join_columns_right: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
mut swapped_or_left: bool,
swapped_or_left: bool,
join_series: Vec<Series>,
hashes: Vec<u64>,
context: &PExecutionContext,
how: JoinType,
) -> Self {
if matches!(how, JoinType::Left) {
swapped_or_left = true
}
if swapped_or_left {
let tmp = DataChunk {
data: df_a.slice(0, 1),
@@ -329,4 +326,7 @@ impl Operator for GenericJoinProbe {
let new = self.clone();
Box::new(new)
}
fn fmt(&self) -> &str {
"generic_join_probe"
}
}
3 changes: 3 additions & 0 deletions polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs
@@ -48,4 +48,7 @@ impl Sink for OrderedSink {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn fmt(&self) -> &str {
"ordered_sink"
}
}
@@ -108,4 +108,7 @@ impl Sink for ParquetSink {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn fmt(&self) -> &str {
"parquet_sink"
}
}
3 changes: 3 additions & 0 deletions polars/polars-lazy/polars-pipe/src/executors/sinks/slice.rs
@@ -102,4 +102,7 @@ impl Sink for SliceSink {
fn as_any(&mut self) -> &mut dyn Any {
self
}
fn fmt(&self) -> &str {
"slice_sink"
}
}
3 changes: 3 additions & 0 deletions polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs
@@ -110,4 +110,7 @@ impl Source for CsvSource {
),
})
}
fn fmt(&self) -> &str {
"csv"
}
}
3 changes: 3 additions & 0 deletions polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs
@@ -39,4 +39,7 @@ impl Source for DataFrameSource {
Ok(SourceResult::GotMoreData(chunks))
}
}
fn fmt(&self) -> &str {
"df"
}
}
@@ -84,4 +84,7 @@ impl Source for ParquetSource {
),
})
}
fn fmt(&self) -> &str {
"parquet"
}
}
3 changes: 3 additions & 0 deletions polars/polars-lazy/polars-pipe/src/executors/sources/union.rs
@@ -29,4 +29,7 @@ impl Source for UnionSource {
}
Ok(SourceResult::Finished)
}
fn fmt(&self) -> &str {
"union"
}
}
3 changes: 3 additions & 0 deletions polars/polars-lazy/polars-pipe/src/expressions.rs
@@ -1,6 +1,7 @@
use std::any::Any;

use polars_core::prelude::*;
use polars_plan::dsl::Expr;

use crate::operators::DataChunk;

@@ -10,4 +11,6 @@ pub trait PhysicalPipedExpr: Send + Sync {
fn evaluate(&self, chunk: &DataChunk, lazy_state: &dyn Any) -> PolarsResult<Series>;

fn field(&self, input_schema: &Schema) -> PolarsResult<Field>;

fn expression(&self) -> Expr;
}
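The expressions.rs change adds an `expression()` method so a physical streaming expression can hand back the logical `Expr` it was built from (implemented above for `Count` as `Expr::Count`). A minimal sketch of that pattern, using invented stand-in types (`LogicalExpr`, `PipedExpr`, `CountExpr`) rather than the real Polars traits:

```rust
// Hypothetical miniature of the pattern (none of these are Polars types):
// a physical, streaming expression that can also report the logical
// expression it was compiled from, as the new `expression()` method does.

#[derive(Debug)]
enum LogicalExpr {
    Count, // stand-in for polars_plan::dsl::Expr::Count
}

trait PipedExpr {
    /// Evaluate against a batch of values (stand-in for a DataChunk).
    fn evaluate(&self, chunk: &[i64]) -> i64;
    /// Report the logical expression this physical expression was built from.
    fn expression(&self) -> LogicalExpr;
}

struct CountExpr;

impl PipedExpr for CountExpr {
    fn evaluate(&self, chunk: &[i64]) -> i64 {
        chunk.len() as i64
    }
    fn expression(&self) -> LogicalExpr {
        LogicalExpr::Count
    }
}

fn main() {
    let e = CountExpr;
    // prints: Count -> 3
    println!("{:?} -> {}", e.expression(), e.evaluate(&[1, 2, 3]));
}
```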
2 changes: 2 additions & 0 deletions polars/polars-lazy/polars-pipe/src/operators/operator.rs
@@ -15,4 +15,6 @@ pub trait Operator: Send + Sync {
) -> PolarsResult<OperatorResult>;

fn split(&self, thread_no: usize) -> Box<dyn Operator>;

fn fmt(&self) -> &str;
}
2 changes: 2 additions & 0 deletions polars/polars-lazy/polars-pipe/src/operators/sink.rs
@@ -23,4 +23,6 @@ pub trait Sink: Send + Sync {
fn finalize(&mut self, context: &PExecutionContext) -> PolarsResult<FinalizedSink>;

fn as_any(&mut self) -> &mut dyn Any;

fn fmt(&self) -> &str;
}
2 changes: 2 additions & 0 deletions polars/polars-lazy/polars-pipe/src/operators/source.rs
@@ -7,4 +7,6 @@ pub enum SourceResult {

pub trait Source: Send + Sync {
fn get_batches(&mut self, context: &PExecutionContext) -> PolarsResult<SourceResult>;

fn fmt(&self) -> &str;
}
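The `Source`, `Operator`, and `Sink` traits all gain a `fmt(&self) -> &str` name, which the dispatcher's test-only `show()` (further down) concatenates into a `source -> op -> ... -> sink` string per branch. A self-contained sketch of that rendering, with an invented `Stage` trait and `render_branch` helper standing in for the three real traits:

```rust
// Sketch of how the fmt() names compose into a pipeline display string,
// in the spirit of PipeLine::show(). `Stage` and `render_branch` are
// illustrative stand-ins, not part of the Polars codebase.

trait Stage {
    fn fmt(&self) -> &str;
}

struct Csv;
struct Filter;
struct GenericJoinBuild;

impl Stage for Csv {
    fn fmt(&self) -> &str { "csv" }
}
impl Stage for Filter {
    fn fmt(&self) -> &str { "filter" }
}
impl Stage for GenericJoinBuild {
    fn fmt(&self) -> &str { "generic_join_build" }
}

fn render_branch(source: &dyn Stage, ops: &[Box<dyn Stage>], sink: &dyn Stage) -> String {
    let mut out = String::from(source.fmt());
    for op in ops {
        out.push_str(" -> ");
        out.push_str(op.fmt());
    }
    out.push_str(" -> ");
    out.push_str(sink.fmt());
    out
}

fn main() {
    let ops: Vec<Box<dyn Stage>> = vec![Box::new(Filter)];
    // prints: csv -> filter -> generic_join_build
    println!("{}", render_branch(&Csv, &ops, &GenericJoinBuild));
}
```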
9 changes: 5 additions & 4 deletions polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
@@ -407,8 +407,9 @@
}

pub fn swap_join_order(options: &JoinOptions) -> bool {
match (options.rows_left, options.rows_right) {
((Some(left), _), (Some(right), _)) => left > right,
((_, left), (_, right)) => left > right,
}
matches!(options.how, JoinType::Left)
|| match (options.rows_left, options.rows_right) {
((Some(left), _), (Some(right), _)) => left > right,
((_, left), (_, right)) => left > right,
}
}
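The rewritten `swap_join_order` forces a swap for left joins and otherwise compares the row-count proxies, preferring exact sizes when both are known. A standalone sketch of that decision under invented names (`RowsProxy`, `should_swap`); only the match arms mirror the function above:

```rust
// Standalone sketch of the swap decision. `RowsProxy` and `should_swap`
// are illustrative names; each proxy holds (known row count, estimated
// row count), mirroring the JoinOptions fields documented further down.

#[derive(Clone, Copy)]
enum JoinType {
    Inner,
    Left,
}

/// (Some(exact_len), estimate) when the size is known, (None, estimate) otherwise.
type RowsProxy = (Option<usize>, usize);

fn should_swap(how: JoinType, rows_left: RowsProxy, rows_right: RowsProxy) -> bool {
    // after this commit a left join always reports a swapped build/probe order
    matches!(how, JoinType::Left)
        || match (rows_left, rows_right) {
            // both exact sizes known: compare them directly
            ((Some(left), _), (Some(right), _)) => left > right,
            // otherwise fall back to the estimates
            ((_, left), (_, right)) => left > right,
        }
}

fn main() {
    // exact sizes known and the left side is smaller: keep the order
    assert!(!should_swap(JoinType::Inner, (Some(10), 10), (Some(1_000), 1_000)));
    // only estimates available and the left estimate is larger: swap
    assert!(should_swap(JoinType::Inner, (None, 5_000), (None, 100)));
    // left join: swapped regardless of the row counts
    assert!(should_swap(JoinType::Left, (Some(1), 1), (Some(2), 2)));
}
```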
68 changes: 66 additions & 2 deletions polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs
@@ -1,9 +1,12 @@
use std::collections::VecDeque;

use polars_core::error::PolarsResult;
use polars_core::frame::DataFrame;
use polars_core::POOL;
use polars_utils::arena::Node;
use rayon::prelude::*;

use crate::executors::operators::Dummy;
use crate::executors::sources::DataFrameSource;
use crate::operators::{
DataChunk, FinalizedSink, Operator, OperatorResult, PExecutionContext, SExecutionContext, Sink,
@@ -66,12 +69,16 @@ impl PipeLine {
self
}

fn replace_operator(&mut self, op: &dyn Operator, node: Node) {
// returns if operator was successfully replaced
fn replace_operator(&mut self, op: &dyn Operator, node: Node) -> bool {
if let Some(pos) = self.operator_nodes.iter().position(|n| *n == node) {
let pos = pos + self.operator_offset;
for (i, operator_pipe) in &mut self.operators.iter_mut().enumerate() {
operator_pipe[pos] = op.split(i)
}
true
} else {
false
}
}

@@ -214,12 +221,46 @@ impl PipeLine {
Ok(out.unwrap())
}

/// print the branches of the pipeline
/// in the order they run.
#[cfg(test)]
fn show(&self) {
let mut fmt = String::new();
let mut start = 0usize;
fmt.push_str(self.sources[0].fmt());
fmt.push_str(" -> ");
for (offset_end, sink) in &self.sinks {
// take operators of a single thread
let ops = &self.operators[0];
// slice the pipeline
let ops = &ops[start..*offset_end];
for op in ops {
fmt.push_str(op.fmt());
fmt.push_str(" -> ")
}
start = *offset_end;
fmt.push_str(sink[0].fmt())
}
println!("{fmt}");
for pl in &self.rh_sides {
pl.show()
}
}

pub fn execute(&mut self, state: Box<dyn SExecutionContext>) -> PolarsResult<DataFrame> {
let ec = PExecutionContext::new(state);
let mut sink_out = self.run_pipeline(&ec)?;
let mut pipelines = self.rh_sides.iter_mut();
let mut sink_nodes = std::mem::take(&mut self.sink_nodes);

// This is a stack of operators that should replace the sinks of join nodes
// If we don't reorder joins, the order in which we run the pipelines coincides with the
// order in which the sinks need to be replaced; however, this is not always the case
// if we reorder joins.
// This stack ensures we still replace the dummy operators even if they are all in
// the right-most branch.
let mut operators_to_replace: VecDeque<(Box<dyn Operator>, Node)> = VecDeque::new();

loop {
match &mut sink_out {
FinalizedSink::Finished(df) => return Ok(std::mem::take(df)),
@@ -236,10 +277,33 @@ impl PipeLine {
// we unwrap, because the latest pipeline should not return an Operator
let pipeline = pipelines.next().unwrap();

// First check the operators.
// Keep a counter as we also push to the front of the deque,
// otherwise we would keep iterating.
let mut remaining = operators_to_replace.len();
while let Some((op, sink_node)) = operators_to_replace.pop_back() {
if !pipeline.replace_operator(op.as_ref(), sink_node) {
operators_to_replace.push_front((op, sink_node))
} else {
}
if remaining == 0 {
break;
}
remaining -= 1;
}

// latest sink_node will be the operator, as the left side of the join
// always finishes that branch.
if let Some(sink_node) = sink_nodes.pop() {
pipeline.replace_operator(op.as_ref(), sink_node);
// if the dummy that should be replaced is not found in this branch,
// we push it to the stack of operators that should be replaced.
// On the next branch of the pipeline we first check this stack.
// This only happens if we reorder joins.
if !pipeline.replace_operator(op.as_ref(), sink_node) {
let mut swap = Box::<Dummy>::default() as Box<dyn Operator>;
std::mem::swap(op, &mut swap);
operators_to_replace.push_back((swap, sink_node));
}
}
sink_out = pipeline.run_pipeline(&ec)?;
sink_nodes = std::mem::take(&mut pipeline.sink_nodes);
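The new `operators_to_replace` deque defers join-probe operators whose placeholder `Dummy` sits in a branch that has not run yet: if the current branch has no matching node, the entry goes back to the front of the deque and is retried on the next branch. A toy simulation of that retry behaviour, with invented `Branch` and `drain_replacements` types and a simplified loop bound (not a line-for-line copy of `PipeLine::execute`):

```rust
use std::collections::VecDeque;

// Toy model: each branch owns a set of node ids whose dummy operator it can
// replace. Operators whose target node lives in a later branch are pushed
// back onto the deque and retried there.
struct Branch {
    nodes: Vec<usize>,
    replaced: Vec<(usize, &'static str)>,
}

impl Branch {
    fn replace_operator(&mut self, op: &'static str, node: usize) -> bool {
        if self.nodes.contains(&node) {
            self.replaced.push((node, op));
            true
        } else {
            false
        }
    }
}

fn drain_replacements(branch: &mut Branch, queue: &mut VecDeque<(&'static str, usize)>) {
    // bound the loop: unmatched entries go back to the front,
    // so look at each queued entry only once per branch
    let mut remaining = queue.len();
    while remaining > 0 {
        let (op, node) = queue.pop_back().unwrap();
        if !branch.replace_operator(op, node) {
            queue.push_front((op, node));
        }
        remaining -= 1;
    }
}

fn main() {
    let mut queue: VecDeque<(&'static str, usize)> = VecDeque::new();
    // the probe for node 7 was produced while running an earlier branch,
    // but its dummy sits in the second branch
    queue.push_back(("generic_join_probe", 7));

    let mut first = Branch { nodes: vec![1, 2], replaced: vec![] };
    let mut second = Branch { nodes: vec![7], replaced: vec![] };

    drain_replacements(&mut first, &mut queue);
    assert!(first.replaced.is_empty()); // node 7 not in this branch: deferred

    drain_replacements(&mut second, &mut queue);
    assert_eq!(second.replaced, vec![(7, "generic_join_probe")]); // replaced here
}
```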
2 changes: 2 additions & 0 deletions polars/polars-lazy/polars-plan/src/dsl/options.rs
@@ -44,6 +44,8 @@ pub struct JoinOptions {
pub how: JoinType,
pub suffix: Cow<'static, str>,
pub slice: Option<(i64, usize)>,
/// Proxy of the number of rows in both sides of the joins
/// Holds `(Option<known_size>, estimated_size)`
pub rows_left: (Option<usize>, usize),
pub rows_right: (Option<usize>, usize),
}