Skip to content

Commit

Permalink
refactor(rust): Disable common sub-expr elim for new streaming engine
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp committed Aug 23, 2024
1 parent cdc741d commit 7107f53
Showing 1 changed file with 42 additions and 39 deletions.
81 changes: 42 additions & 39 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,10 +582,19 @@ impl LazyFrame {
#[allow(unused_mut)]
let mut opt_state = self.opt_state;
let streaming = self.opt_state.contains(OptState::STREAMING);
let new_streaming = self.opt_state.contains(OptState::NEW_STREAMING);
#[cfg(feature = "cse")]
if streaming && self.opt_state.contains(OptState::COMM_SUBPLAN_ELIM) {
if streaming && !new_streaming {
opt_state &= !OptState::COMM_SUBPLAN_ELIM;
}

// The new streaming engine can't deal with the way the common
// subexpression elimination adds length-incorrect with_columns.
#[cfg(feature = "cse")]
if new_streaming {
opt_state &= !OptState::COMM_SUBEXPR_ELIM;
}

let lp_top = optimize(
self.logical_plan,
opt_state,
Expand Down Expand Up @@ -694,48 +703,42 @@ impl LazyFrame {
pub fn collect(self) -> PolarsResult<DataFrame> {
#[cfg(feature = "new_streaming")]
{
let force_new_streaming = self.opt_state.contains(OptState::NEW_STREAMING);
let mut alp_plan = self.to_alp_optimized()?;
let stream_lp_top = alp_plan.lp_arena.add(IR::Sink {
input: alp_plan.lp_top,
payload: SinkType::Memory,
});

if force_new_streaming {
return polars_stream::run_query(
stream_lp_top,
alp_plan.lp_arena,
&alp_plan.expr_arena,
);
}

if std::env::var("POLARS_AUTO_NEW_STREAMING")
.as_deref()
.unwrap_or("")
== "1"
{
let f = || {
polars_stream::run_query(
stream_lp_top,
alp_plan.lp_arena.clone(),
&alp_plan.expr_arena,
)
};
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
Ok(r) => return r,
Err(e) => {
// Fallback to normal engine if error is due to not being implemented,
// otherwise propagate error.
if e.downcast_ref::<&str>() != Some(&"not yet implemented") {
if polars_core::config::verbose() {
eprintln!("caught unimplemented error in new streaming engine, falling back to normal engine");
if !self.opt_state.contains(OptState::NEW_STREAMING) {
if std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1") {
// Try to run using the new streaming engine, falling back to the
// normal engine if it panics with a todo!() ("not yet implemented") error.
let mut new_stream_lazy = self.clone();
new_stream_lazy.opt_state |= OptState::NEW_STREAMING;
let mut alp_plan = new_stream_lazy.to_alp_optimized()?;
let stream_lp_top = alp_plan.lp_arena.add(IR::Sink {
input: alp_plan.lp_top,
payload: SinkType::Memory,
});

let f = || {
polars_stream::run_query(
stream_lp_top,
alp_plan.lp_arena.clone(),
&alp_plan.expr_arena,
)
};
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
Ok(r) => return r,
Err(e) => {
// Fallback to normal engine if error is due to not being implemented,
// otherwise propagate error.
if e.downcast_ref::<&str>() != Some(&"not yet implemented") {
if polars_core::config::verbose() {
eprintln!("caught unimplemented error in new streaming engine, falling back to normal engine");
}
std::panic::resume_unwind(e);
}
std::panic::resume_unwind(e);
}
},
},
}
}
}

let mut alp_plan = self.to_alp_optimized()?;
let mut physical_plan = create_physical_plan(
alp_plan.lp_top,
&mut alp_plan.lp_arena,
Expand Down

0 comments on commit 7107f53

Please sign in to comment.