diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index a2825696c6916..4122bacc65e3a 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -703,38 +703,38 @@ impl LazyFrame { pub fn collect(self) -> PolarsResult { #[cfg(feature = "new_streaming")] { - if !self.opt_state.contains(OptState::NEW_STREAMING) { - if std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1") { - // Try to run using the new streaming engine, falling back - // if it fails in a todo!() error. - let mut new_stream_lazy = self.clone(); - new_stream_lazy.opt_state |= OptState::NEW_STREAMING; - let mut alp_plan = new_stream_lazy.to_alp_optimized()?; - let stream_lp_top = alp_plan.lp_arena.add(IR::Sink { - input: alp_plan.lp_top, - payload: SinkType::Memory, - }); - - let f = || { - polars_stream::run_query( - stream_lp_top, - alp_plan.lp_arena.clone(), - &alp_plan.expr_arena, - ) - }; - match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) { - Ok(r) => return r, - Err(e) => { - // Fallback to normal engine if error is due to not being implemented, - // otherwise propagate error. - if e.downcast_ref::<&str>() != Some(&"not yet implemented") { - if polars_core::config::verbose() { - eprintln!("caught unimplemented error in new streaming engine, falling back to normal engine"); - } - std::panic::resume_unwind(e); + if !self.opt_state.contains(OptState::NEW_STREAMING) + && std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1") + { + // Try to run using the new streaming engine, falling back + // if it fails in a todo!() error. + let mut new_stream_lazy = self.clone(); + new_stream_lazy.opt_state |= OptState::NEW_STREAMING; + let mut alp_plan = new_stream_lazy.to_alp_optimized()?; + let stream_lp_top = alp_plan.lp_arena.add(IR::Sink { + input: alp_plan.lp_top, + payload: SinkType::Memory, + }); + + let f = || { + polars_stream::run_query( + stream_lp_top, + alp_plan.lp_arena.clone(), + &alp_plan.expr_arena, + ) + }; + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) { + Ok(r) => return r, + Err(e) => { + // Fallback to normal engine if error is due to not being implemented, + // otherwise propagate error. + if e.downcast_ref::<&str>() != Some(&"not yet implemented") { + if polars_core::config::verbose() { + eprintln!("caught unimplemented error in new streaming engine, falling back to normal engine"); } - }, - } + std::panic::resume_unwind(e); + } + }, } }