Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): Group arguments in conversion in a Context #18418

Merged
merged 5 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions crates/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(

for lf in &mut inputs[1..] {
// Ensure we enable file caching if any lf has it enabled.
if lf.opt_state.contains(OptState::FILE_CACHING) {
opt_state |= OptState::FILE_CACHING;
if lf.opt_state.contains(OptFlags::FILE_CACHING) {
opt_state |= OptFlags::FILE_CACHING;
}
let lp = std::mem::take(&mut lf.logical_plan);
lps.push(lp)
Expand Down Expand Up @@ -67,8 +67,8 @@ pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(

for lf in &lfs[1..] {
// Ensure we enable file caching if any lf has it enabled.
if lf.opt_state.contains(OptState::FILE_CACHING) {
opt_state |= OptState::FILE_CACHING;
if lf.opt_state.contains(OptFlags::FILE_CACHING) {
opt_state |= OptFlags::FILE_CACHING;
}
}

Expand Down
13 changes: 8 additions & 5 deletions crates/polars-lazy/src/frame/cached_arenas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ impl LazyFrame {
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<SchemaRef> {
let node = to_alp(self.logical_plan.clone(), expr_arena, lp_arena, false, true)?;
let node = to_alp(
self.logical_plan.clone(),
expr_arena,
lp_arena,
&mut OptFlags::schema_only(),
)?;

let schema = lp_arena.get(node).schema(lp_arena).into_owned();
// Cache the logical plan so that next schema call is cheap.
Expand Down Expand Up @@ -48,8 +53,7 @@ impl LazyFrame {
self.logical_plan.clone(),
&mut expr_arena,
&mut lp_arena,
false,
true,
&mut OptFlags::schema_only(),
)?;

let schema = lp_arena.get(node).schema(&lp_arena).into_owned();
Expand Down Expand Up @@ -83,8 +87,7 @@ impl LazyFrame {
self.logical_plan.clone(),
&mut arenas.expr_arena,
&mut arenas.lp_arena,
false,
true,
&mut OptFlags::schema_only(),
)?;

let schema = arenas
Expand Down
73 changes: 36 additions & 37 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use polars_expr::{create_physical_expr, ExpressionConversionState};
use polars_io::RowIndex;
use polars_mem_engine::{create_physical_plan, Executor};
use polars_ops::frame::JoinCoalesce;
pub use polars_plan::frame::{AllowedOptimizations, OptState};
pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
use polars_plan::global::FETCH_ROWS;
use smartstring::alias::String as SmartString;

Expand Down Expand Up @@ -74,15 +74,15 @@ impl IntoLazy for LazyFrame {
#[must_use]
pub struct LazyFrame {
pub logical_plan: DslPlan,
pub(crate) opt_state: OptState,
pub(crate) opt_state: OptFlags,
pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}

impl From<DslPlan> for LazyFrame {
fn from(plan: DslPlan) -> Self {
Self {
logical_plan: plan,
opt_state: OptState::default() | OptState::FILE_CACHING,
opt_state: OptFlags::default() | OptFlags::FILE_CACHING,
cached_arena: Default::default(),
}
}
Expand All @@ -91,7 +91,7 @@ impl From<DslPlan> for LazyFrame {
impl LazyFrame {
pub(crate) fn from_inner(
logical_plan: DslPlan,
opt_state: OptState,
opt_state: OptFlags,
cached_arena: Arc<Mutex<Option<CachedArena>>>,
) -> Self {
Self {
Expand All @@ -105,11 +105,11 @@ impl LazyFrame {
DslBuilder::from(self.logical_plan)
}

fn get_opt_state(&self) -> OptState {
fn get_opt_state(&self) -> OptFlags {
self.opt_state
}

fn from_logical_plan(logical_plan: DslPlan, opt_state: OptState) -> Self {
fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
LazyFrame {
logical_plan,
opt_state,
Expand All @@ -118,91 +118,91 @@ impl LazyFrame {
}

/// Get current optimizations.
pub fn get_current_optimizations(&self) -> OptState {
pub fn get_current_optimizations(&self) -> OptFlags {
self.opt_state
}

/// Set allowed optimizations.
pub fn with_optimizations(mut self, opt_state: OptState) -> Self {
pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
self.opt_state = opt_state;
self
}

/// Turn off all optimizations.
pub fn without_optimizations(self) -> Self {
self.with_optimizations(OptState::from_bits_truncate(0) | OptState::TYPE_COERCION)
self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
}

/// Toggle projection pushdown optimization.
pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::PROJECTION_PUSHDOWN, toggle);
self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
self
}

/// Toggle cluster with columns optimization.
pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::CLUSTER_WITH_COLUMNS, toggle);
self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
self
}

/// Toggle predicate pushdown optimization.
pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::PREDICATE_PUSHDOWN, toggle);
self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
self
}

/// Toggle type coercion optimization.
pub fn with_type_coercion(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::TYPE_COERCION, toggle);
self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
self
}

/// Toggle expression simplification optimization on or off.
pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::SIMPLIFY_EXPR, toggle);
self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
self
}

/// Toggle common subplan elimination optimization on or off
#[cfg(feature = "cse")]
pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::COMM_SUBPLAN_ELIM, toggle);
self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
self
}

/// Toggle common subexpression elimination optimization on or off
#[cfg(feature = "cse")]
pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::COMM_SUBEXPR_ELIM, toggle);
self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
self
}

/// Toggle slice pushdown optimization.
pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::SLICE_PUSHDOWN, toggle);
self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
self
}

/// Run nodes that are capably of doing so on the streaming engine.
pub fn with_streaming(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::STREAMING, toggle);
self.opt_state.set(OptFlags::STREAMING, toggle);
self
}

pub fn with_new_streaming(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::NEW_STREAMING, toggle);
self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
self
}

/// Try to estimate the number of rows so that joins can determine which side to keep in memory.
pub fn with_row_estimate(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::ROW_ESTIMATE, toggle);
self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
self
}

/// Run every node eagerly. This turns off multi-node optimizations.
pub fn _with_eager(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::EAGER, toggle);
self.opt_state.set(OptFlags::EAGER, toggle);
self
}

Expand Down Expand Up @@ -566,8 +566,7 @@ impl LazyFrame {
self.logical_plan,
&mut expr_arena,
&mut lp_arena,
true,
true,
&mut self.opt_state,
)?;
let plan = IRPlan::new(node, lp_arena, expr_arena);
Ok(plan)
Expand All @@ -582,18 +581,18 @@ impl LazyFrame {
) -> PolarsResult<Node> {
#[allow(unused_mut)]
let mut opt_state = self.opt_state;
let streaming = self.opt_state.contains(OptState::STREAMING);
let new_streaming = self.opt_state.contains(OptState::NEW_STREAMING);
let streaming = self.opt_state.contains(OptFlags::STREAMING);
let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
#[cfg(feature = "cse")]
if streaming && !new_streaming {
opt_state &= !OptState::COMM_SUBPLAN_ELIM;
opt_state &= !OptFlags::COMM_SUBPLAN_ELIM;
}

// The new streaming engine can't deal with the way the common
// subexpression elimination adds length-incorrect with_columns.
#[cfg(feature = "cse")]
if new_streaming {
opt_state &= !OptState::COMM_SUBEXPR_ELIM;
opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
}

let lp_top = optimize(
Expand Down Expand Up @@ -626,7 +625,7 @@ impl LazyFrame {
scratch,
enable_fmt,
true,
opt_state.contains(OptState::ROW_ESTIMATE),
opt_state.contains(OptFlags::ROW_ESTIMATE),
)?;
}
#[cfg(not(feature = "streaming"))]
Expand Down Expand Up @@ -706,11 +705,11 @@ impl LazyFrame {
{
let auto_new_streaming =
std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
if self.opt_state.contains(OptState::NEW_STREAMING) || auto_new_streaming {
if self.opt_state.contains(OptFlags::NEW_STREAMING) || auto_new_streaming {
// Try to run using the new streaming engine, falling back
// if it fails in a todo!() error if auto_new_streaming is set.
let mut new_stream_lazy = self.clone();
new_stream_lazy.opt_state |= OptState::NEW_STREAMING;
new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
let mut alp_plan = new_stream_lazy.to_alp_optimized()?;
let stream_lp_top = alp_plan.lp_arena.add(IR::Sink {
input: alp_plan.lp_top,
Expand Down Expand Up @@ -835,7 +834,7 @@ impl LazyFrame {
cloud_options: Option<polars_io::cloud::CloudOptions>,
ipc_options: IpcWriterOptions,
) -> PolarsResult<()> {
self.opt_state |= OptState::STREAMING;
self.opt_state |= OptFlags::STREAMING;
self.logical_plan = DslPlan::Sink {
input: Arc::new(self.logical_plan),
payload: SinkType::Cloud {
Expand Down Expand Up @@ -890,7 +889,7 @@ impl LazyFrame {
feature = "json",
))]
fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> {
self.opt_state |= OptState::STREAMING;
self.opt_state |= OptFlags::STREAMING;
self.logical_plan = DslPlan::Sink {
input: Arc::new(self.logical_plan),
payload,
Expand Down Expand Up @@ -1315,8 +1314,8 @@ impl LazyFrame {
args: JoinArgs,
) -> LazyFrame {
// if any of the nodes reads from files we must activate this this plan as well.
if other.opt_state.contains(OptState::FILE_CACHING) {
self.opt_state |= OptState::FILE_CACHING;
if other.opt_state.contains(OptFlags::FILE_CACHING) {
self.opt_state |= OptFlags::FILE_CACHING;
}

let left_on = left_on.as_ref().to_vec();
Expand Down Expand Up @@ -1825,7 +1824,7 @@ impl LazyFrame {
#[derive(Clone)]
pub struct LazyGroupBy {
pub logical_plan: DslPlan,
opt_state: OptState,
opt_state: OptFlags,
keys: Vec<Expr>,
maintain_order: bool,
#[cfg(feature = "dynamic_group_by")]
Expand Down Expand Up @@ -2053,8 +2052,8 @@ impl JoinBuilder {
let other = self.other.expect("with not set");

// If any of the nodes reads from files we must activate this this plan as well.
if other.opt_state.contains(OptState::FILE_CACHING) {
opt_state |= OptState::FILE_CACHING;
if other.opt_state.contains(OptFlags::FILE_CACHING) {
opt_state |= OptFlags::FILE_CACHING;
}

let args = JoinArgs {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl LazyFileListReader for LazyCsvReader {
)?
.build()
.into();
lf.opt_state |= OptState::FILE_CACHING;
lf.opt_state |= OptFlags::FILE_CACHING;
Ok(lf)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl LazyFileListReader for LazyIpcReader {
)?
.build()
.into();
lf.opt_state |= OptState::FILE_CACHING;
lf.opt_state |= OptFlags::FILE_CACHING;

Ok(lf)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl LazyFileListReader for LazyParquetReader {
lf = lf.with_row_index(&row_index.name, Some(row_index.offset))
}

lf.opt_state |= OptState::FILE_CACHING;
lf.opt_state |= OptFlags::FILE_CACHING;
Ok(lf)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/predicate_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ fn filter_added_column_issue_2470() -> PolarsResult<()> {
fn filter_blocked_by_map() -> PolarsResult<()> {
let df = fruits_cars();

let allowed = OptState::default() & !OptState::PREDICATE_PUSHDOWN;
let allowed = OptFlags::default() & !OptFlags::PREDICATE_PUSHDOWN;
let q = df
.lazy()
.map(Ok, allowed, None, None)
Expand Down
12 changes: 10 additions & 2 deletions crates/polars-lazy/src/tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,15 @@ fn test_simplify_expr() {

let mut expr_arena = Arena::new();
let mut lp_arena = Arena::new();
let lp_top = to_alp(plan, &mut expr_arena, &mut lp_arena, true, false).unwrap();

#[allow(const_item_mutation)]
let lp_top = to_alp(
plan,
&mut expr_arena,
&mut lp_arena,
&mut OptFlags::SIMPLIFY_EXPR,
)
.unwrap();
let plan = node_to_lp(lp_top, &expr_arena, &mut lp_arena);
assert!(
matches!(plan, DslPlan::Select{ expr, ..} if matches!(&expr[0], Expr::BinaryExpr{left, ..} if **left == Expr::Literal(LiteralValue::Float(2.0))))
Expand Down Expand Up @@ -637,7 +645,7 @@ fn test_type_coercion() {

let mut expr_arena = Arena::new();
let mut lp_arena = Arena::new();
let lp_top = to_alp(lp, &mut expr_arena, &mut lp_arena, true, true).unwrap();
let lp_top = to_alp(lp, &mut expr_arena, &mut lp_arena, &mut OptFlags::default()).unwrap();
let lp = node_to_lp(lp_top, &expr_arena, &mut lp_arena);

if let DslPlan::Select { expr, .. } = lp {
Expand Down
Loading