Skip to content

Commit

Permalink
feat(rust, python): Streaming joins architecture and Cross join imple…
Browse files Browse the repository at this point in the history
…mentation. (#5339)
  • Loading branch information
ritchie46 committed Oct 31, 2022
1 parent 4dd178b commit e1c25e7
Show file tree
Hide file tree
Showing 34 changed files with 1,044 additions and 380 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::*;
use crate::prelude::groupby::IntoGroupsProxy;
use crate::frame::groupby::IntoGroupsProxy;

impl CategoricalChunked {
pub fn unique(&self) -> PolarsResult<Self> {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/frame/asof_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl DataFrame {
)
};

self.finish_join(left, right_df, suffix)
self.finish_join(left, right_df, suffix.as_deref())
}

/// This is similar to a left-join except that we match on nearest key rather than equal keys.
Expand Down
44 changes: 39 additions & 5 deletions polars/polars-core/src/frame/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ fn take_right(total_rows: IdxSize, n_rows_right: IdxSize, slice: Option<(i64, us
}

impl DataFrame {
/// Creates the cartesian product from both frames, preserves the order of the left keys.
pub(crate) fn cross_join(
fn cross_join_dfs(
&self,
other: &DataFrame,
suffix: Option<String>,
slice: Option<(i64, usize)>,
) -> PolarsResult<DataFrame> {
parallel: bool,
) -> PolarsResult<(DataFrame, DataFrame)> {
let n_rows_left = self.height() as IdxSize;
let n_rows_right = other.height() as IdxSize;
let total_rows = n_rows_right * n_rows_left;
Expand Down Expand Up @@ -78,8 +77,43 @@ impl DataFrame {
concat_df_unchecked(iter)
}
};
let (l_df, r_df) = if parallel {
POOL.install(|| rayon::join(create_left_df, create_right_df))
} else {
(create_left_df(), create_right_df())
};
Ok((l_df, r_df))
}

#[doc(hidden)]
/// used by streaming
pub fn _cross_join_with_names(
&self,
other: &DataFrame,
names: &[String],
) -> PolarsResult<DataFrame> {
let (mut l_df, r_df) = self.cross_join_dfs(other, None, false)?;
l_df.get_columns_mut().extend_from_slice(&r_df.columns);

let (l_df, r_df) = POOL.install(|| rayon::join(create_left_df, create_right_df));
l_df.get_columns_mut()
.iter_mut()
.zip(names)
.for_each(|(s, name)| {
if s.name() != name {
s.rename(name);
}
});
Ok(l_df)
}

/// Creates the cartesian product from both frames, preserves the order of the left keys.
pub fn cross_join(
&self,
other: &DataFrame,
suffix: Option<&str>,
slice: Option<(i64, usize)>,
) -> PolarsResult<DataFrame> {
let (l_df, r_df) = self.cross_join_dfs(other, slice, true)?;

self.finish_join(l_df, r_df, suffix)
}
Expand Down
20 changes: 10 additions & 10 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl DataFrame {
&self,
mut df_left: DataFrame,
mut df_right: DataFrame,
suffix: Option<String>,
suffix: Option<&str>,
) -> PolarsResult<DataFrame> {
let mut left_names = PlHashSet::with_capacity(df_left.width());

Expand All @@ -289,7 +289,7 @@ impl DataFrame {
rename_strs.push(series.name().to_owned())
}
});
let suffix = suffix.as_deref().unwrap_or("_right");
let suffix = suffix.unwrap_or("_right");

for name in rename_strs {
df_right.rename(&name, &format!("{}{}", name, suffix))?;
Expand Down Expand Up @@ -354,7 +354,7 @@ impl DataFrame {
) -> PolarsResult<DataFrame> {
#[cfg(feature = "cross_join")]
if let JoinType::Cross = how {
return self.cross_join(other, suffix, slice);
return self.cross_join(other, suffix.as_deref(), slice);
}

#[cfg(feature = "chunked_ids")]
Expand Down Expand Up @@ -517,7 +517,7 @@ impl DataFrame {
._take_unchecked_slice(join_idx_right, true)
},
);
self.finish_join(df_left, df_right, suffix)
self.finish_join(df_left, df_right, suffix.as_deref())
}
JoinType::Left => {
let mut left = DataFrame::new_no_checks(selected_left_physical);
Expand Down Expand Up @@ -565,7 +565,7 @@ impl DataFrame {
}
keys.extend_from_slice(df_left.get_columns());
let df_left = DataFrame::new_no_checks(keys);
self.finish_join(df_left, df_right, suffix)
self.finish_join(df_left, df_right, suffix.as_deref())
}
#[cfg(feature = "asof_join")]
JoinType::AsOf(_) => Err(PolarsError::ComputeError(
Expand Down Expand Up @@ -638,7 +638,7 @@ impl DataFrame {
{
#[cfg(feature = "cross_join")]
if let JoinType::Cross = how {
return self.cross_join(other, suffix, None);
return self.cross_join(other, suffix.as_deref(), None);
}
let selected_left = self.select_series(left_on)?;
let selected_right = other.select_series(right_on)?;
Expand Down Expand Up @@ -709,7 +709,7 @@ impl DataFrame {
._take_unchecked_slice(join_tuples_right, true)
},
);
self.finish_join(df_left, df_right, suffix)
self.finish_join(df_left, df_right, suffix.as_deref())
}

/// Perform a left join on two DataFrames
Expand Down Expand Up @@ -789,7 +789,7 @@ impl DataFrame {
};
let (df_left, df_right) = POOL.join(materialize_left, materialize_right);

self.finish_join(df_left, df_right, suffix)
self.finish_join(df_left, df_right, suffix.as_deref())
}

#[cfg(feature = "chunked_ids")]
Expand Down Expand Up @@ -840,7 +840,7 @@ impl DataFrame {
};
let (df_left, df_right) = POOL.join(materialize_left, materialize_right);

self.finish_join(df_left, df_right, suffix)
self.finish_join(df_left, df_right, suffix.as_deref())
}

pub(crate) fn left_join_from_series(
Expand Down Expand Up @@ -976,7 +976,7 @@ impl DataFrame {
};

df_left.get_columns_mut().insert(join_column_index, s);
self.finish_join(df_left, df_right, suffix)
self.finish_join(df_left, df_right, suffix.as_deref())
}
}

Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ pub use crate::error::{PolarsError, PolarsResult};
pub use crate::frame::asof_join::*;
pub use crate::frame::explode::MeltArgs;
pub(crate) use crate::frame::groupby::aggregations::*;
pub use crate::frame::groupby::{GroupsIdx, GroupsProxy, GroupsSlice};
pub use crate::frame::groupby::{GroupsIdx, GroupsProxy, GroupsSlice, IntoGroupsProxy};
pub use crate::frame::hash_join::JoinType;
pub(crate) use crate::frame::hash_join::*;
pub use crate::frame::*;
pub use crate::frame::{DataFrame, UniqueKeepStrategy};
pub use crate::named_from::{NamedFrom, NamedFromOwned};
pub use crate::schema::*;
#[cfg(feature = "checked_arithmetic")]
Expand Down
34 changes: 21 additions & 13 deletions polars/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ pub fn split_series(s: &Series, n: usize) -> PolarsResult<Vec<Series>> {
}

fn flatten_df(df: &DataFrame) -> impl Iterator<Item = DataFrame> + '_ {
df.iter_chunks().map(|chunk| {
DataFrame::new_no_checks(
df.iter_chunks().flat_map(|chunk| {
let df = DataFrame::new_no_checks(
df.iter()
.zip(chunk.into_arrays())
.map(|(s, arr)| {
Expand All @@ -153,19 +153,15 @@ fn flatten_df(df: &DataFrame) -> impl Iterator<Item = DataFrame> + '_ {
}
})
.collect(),
)
);
if df.height() == 0 {
None
} else {
Some(df)
}
})
}

#[cfg(feature = "private")]
#[doc(hidden)]
/// Split a [`DataFrame`] into `n` parts. We take a `&mut` to be able to repartition/align chunks.
pub fn split_df(df: &mut DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>> {
if n == 0 {
return Ok(vec![df.clone()]);
}
// make sure that chunks are aligned.
df.rechunk();
pub fn split_df_as_ref(df: &DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>> {
let total_len = df.height();
let chunk_size = total_len / n;

Expand Down Expand Up @@ -199,6 +195,18 @@ pub fn split_df(df: &mut DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>> {
Ok(out)
}

#[cfg(feature = "private")]
#[doc(hidden)]
/// Split a [`DataFrame`] into `n` parts. We take a `&mut` to be able to repartition/align chunks.
pub fn split_df(df: &mut DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>> {
if n == 0 {
return Ok(vec![df.clone()]);
}
// make sure that chunks are aligned.
df.rechunk();
split_df_as_ref(df, n)
}

pub fn slice_slice<T>(vals: &[T], offset: i64, len: usize) -> &[T] {
let (raw_offset, slice_len) = slice_offsets(offset, len, vals.len());
&vals[raw_offset..raw_offset + slice_len]
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ is_in = ["polars-plan/is_in"]
repeat_by = ["polars-plan/repeat_by"]
round_series = ["polars-plan/round_series"]
is_first = ["polars-plan/is_first"]
cross_join = ["polars-plan/cross_join"]
cross_join = ["polars-plan/cross_join", "polars-pipe/cross_join"]
asof_join = ["polars-plan/asof_join", "polars-time"]
dot_product = ["polars-plan/dot_product"]
concat_str = ["polars-plan/concat_str"]
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/polars-pipe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ rayon.workspace = true
csv-file = ["polars-plan/csv-file", "polars-io/csv-file"]
parquet = ["polars-plan/parquet", "polars-io/parquet"]
nightly = ["polars-core/nightly", "polars-utils/nightly", "hashbrown/nightly"]
cross_join = ["polars-core/cross_join"]
20 changes: 20 additions & 0 deletions polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use polars_core::error::PolarsResult;

use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext};

#[derive(Default)]
pub struct Dummy {}

impl Operator for Dummy {
fn execute(
&mut self,
_context: &PExecutionContext,
_chunk: &DataChunk,
) -> PolarsResult<OperatorResult> {
panic!("dummy should be replaced")
}

fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(Self {})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use polars_core::error::{PolarsError, PolarsResult};
use crate::expressions::PhysicalPipedExpr;
use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext};

#[derive(Clone)]
pub(crate) struct FilterOperator {
pub(crate) predicate: Arc<dyn PhysicalPipedExpr>,
}

impl Operator for FilterOperator {
fn execute(
&self,
&mut self,
context: &PExecutionContext,
chunk: &DataChunk,
) -> PolarsResult<OperatorResult> {
Expand All @@ -29,4 +30,7 @@ impl Operator for FilterOperator {

Ok(OperatorResult::Finished(chunk.with_data(df)))
}
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(self.clone())
}
}
2 changes: 2 additions & 0 deletions polars/polars-lazy/polars-pipe/src/executors/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod dummy;
mod filter;
mod projection;

pub(crate) use dummy::Dummy;
pub(crate) use filter::*;
pub(crate) use projection::*;
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,33 @@ use polars_core::frame::DataFrame;
use crate::expressions::PhysicalPipedExpr;
use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext};

#[derive(Clone)]
pub(crate) struct FastProjectionOperator {
pub(crate) columns: Arc<Vec<Arc<str>>>,
}

impl Operator for FastProjectionOperator {
fn execute(
&self,
&mut self,
_context: &PExecutionContext,
chunk: &DataChunk,
) -> PolarsResult<OperatorResult> {
let chunk = chunk.with_data(chunk.data.select(self.columns.as_slice())?);
Ok(OperatorResult::Finished(chunk))
}
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(self.clone())
}
}

#[derive(Clone)]
pub(crate) struct ProjectionOperator {
pub(crate) exprs: Vec<Arc<dyn PhysicalPipedExpr>>,
}

impl Operator for ProjectionOperator {
fn execute(
&self,
&mut self,
context: &PExecutionContext,
chunk: &DataChunk,
) -> PolarsResult<OperatorResult> {
Expand All @@ -40,4 +45,34 @@ impl Operator for ProjectionOperator {
let chunk = chunk.with_data(DataFrame::new_no_checks(projected));
Ok(OperatorResult::Finished(chunk))
}
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(self.clone())
}
}

#[derive(Clone)]
pub(crate) struct HstackOperator {
pub(crate) exprs: Vec<Arc<dyn PhysicalPipedExpr>>,
}

impl Operator for HstackOperator {
fn execute(
&mut self,
context: &PExecutionContext,
chunk: &DataChunk,
) -> PolarsResult<OperatorResult> {
let projected = self
.exprs
.iter()
.map(|e| e.evaluate(chunk, context.execution_state.as_ref()))
.collect::<PolarsResult<Vec<_>>>()?;

let df = chunk.data.hstack(&projected)?;

let chunk = chunk.with_data(df);
Ok(OperatorResult::Finished(chunk))
}
fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(self.clone())
}
}
Loading

0 comments on commit e1c25e7

Please sign in to comment.