Skip to content

Commit

Permalink
clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp committed Aug 23, 2024
1 parent 7107f53 commit 64c871e
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,38 +703,38 @@ impl LazyFrame {
pub fn collect(self) -> PolarsResult<DataFrame> {
#[cfg(feature = "new_streaming")]
{
if !self.opt_state.contains(OptState::NEW_STREAMING) {
if std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1") {
// Try to run using the new streaming engine, falling back
// if it fails in a todo!() error.
let mut new_stream_lazy = self.clone();
new_stream_lazy.opt_state |= OptState::NEW_STREAMING;
let mut alp_plan = new_stream_lazy.to_alp_optimized()?;
let stream_lp_top = alp_plan.lp_arena.add(IR::Sink {
input: alp_plan.lp_top,
payload: SinkType::Memory,
});

let f = || {
polars_stream::run_query(
stream_lp_top,
alp_plan.lp_arena.clone(),
&alp_plan.expr_arena,
)
};
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
Ok(r) => return r,
Err(e) => {
// Fallback to normal engine if error is due to not being implemented,
// otherwise propagate error.
if e.downcast_ref::<&str>() != Some(&"not yet implemented") {
if polars_core::config::verbose() {
eprintln!("caught unimplemented error in new streaming engine, falling back to normal engine");
}
std::panic::resume_unwind(e);
if !self.opt_state.contains(OptState::NEW_STREAMING)
&& std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1")
{
// Try to run using the new streaming engine, falling back
// if it fails in a todo!() error.
let mut new_stream_lazy = self.clone();
new_stream_lazy.opt_state |= OptState::NEW_STREAMING;
let mut alp_plan = new_stream_lazy.to_alp_optimized()?;
let stream_lp_top = alp_plan.lp_arena.add(IR::Sink {
input: alp_plan.lp_top,
payload: SinkType::Memory,
});

let f = || {
polars_stream::run_query(
stream_lp_top,
alp_plan.lp_arena.clone(),
&alp_plan.expr_arena,
)
};
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
Ok(r) => return r,
Err(e) => {
// Fallback to normal engine if error is due to not being implemented,
// otherwise propagate error.
if e.downcast_ref::<&str>() != Some(&"not yet implemented") {
if polars_core::config::verbose() {
eprintln!("caught unimplemented error in new streaming engine, falling back to normal engine");
}
},
}
std::panic::resume_unwind(e);
}
},
}
}

Expand Down

0 comments on commit 64c871e

Please sign in to comment.