Skip to content

Commit

Permalink
refactor(rust): Group arguments in conversion in a Context (#18418)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 28, 2024
1 parent fb38919 commit 20ce0f1
Show file tree
Hide file tree
Showing 16 changed files with 236 additions and 193 deletions.
8 changes: 4 additions & 4 deletions crates/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(

for lf in &mut inputs[1..] {
// Ensure we enable file caching if any lf has it enabled.
if lf.opt_state.contains(OptState::FILE_CACHING) {
opt_state |= OptState::FILE_CACHING;
if lf.opt_state.contains(OptFlags::FILE_CACHING) {
opt_state |= OptFlags::FILE_CACHING;
}
let lp = std::mem::take(&mut lf.logical_plan);
lps.push(lp)
Expand Down Expand Up @@ -67,8 +67,8 @@ pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(

for lf in &lfs[1..] {
// Ensure we enable file caching if any lf has it enabled.
if lf.opt_state.contains(OptState::FILE_CACHING) {
opt_state |= OptState::FILE_CACHING;
if lf.opt_state.contains(OptFlags::FILE_CACHING) {
opt_state |= OptFlags::FILE_CACHING;
}
}

Expand Down
13 changes: 8 additions & 5 deletions crates/polars-lazy/src/frame/cached_arenas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ impl LazyFrame {
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<SchemaRef> {
let node = to_alp(self.logical_plan.clone(), expr_arena, lp_arena, false, true)?;
let node = to_alp(
self.logical_plan.clone(),
expr_arena,
lp_arena,
&mut OptFlags::schema_only(),
)?;

let schema = lp_arena.get(node).schema(lp_arena).into_owned();
// Cache the logical plan so that next schema call is cheap.
Expand Down Expand Up @@ -48,8 +53,7 @@ impl LazyFrame {
self.logical_plan.clone(),
&mut expr_arena,
&mut lp_arena,
false,
true,
&mut OptFlags::schema_only(),
)?;

let schema = lp_arena.get(node).schema(&lp_arena).into_owned();
Expand Down Expand Up @@ -83,8 +87,7 @@ impl LazyFrame {
self.logical_plan.clone(),
&mut arenas.expr_arena,
&mut arenas.lp_arena,
false,
true,
&mut OptFlags::schema_only(),
)?;

let schema = arenas
Expand Down
73 changes: 36 additions & 37 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use polars_expr::{create_physical_expr, ExpressionConversionState};
use polars_io::RowIndex;
use polars_mem_engine::{create_physical_plan, Executor};
use polars_ops::frame::JoinCoalesce;
pub use polars_plan::frame::{AllowedOptimizations, OptState};
pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
use polars_plan::global::FETCH_ROWS;
use smartstring::alias::String as SmartString;

Expand Down Expand Up @@ -74,15 +74,15 @@ impl IntoLazy for LazyFrame {
#[must_use]
pub struct LazyFrame {
pub logical_plan: DslPlan,
pub(crate) opt_state: OptState,
pub(crate) opt_state: OptFlags,
pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}

impl From<DslPlan> for LazyFrame {
fn from(plan: DslPlan) -> Self {
Self {
logical_plan: plan,
opt_state: OptState::default() | OptState::FILE_CACHING,
opt_state: OptFlags::default() | OptFlags::FILE_CACHING,
cached_arena: Default::default(),
}
}
Expand All @@ -91,7 +91,7 @@ impl From<DslPlan> for LazyFrame {
impl LazyFrame {
pub(crate) fn from_inner(
logical_plan: DslPlan,
opt_state: OptState,
opt_state: OptFlags,
cached_arena: Arc<Mutex<Option<CachedArena>>>,
) -> Self {
Self {
Expand All @@ -105,11 +105,11 @@ impl LazyFrame {
DslBuilder::from(self.logical_plan)
}

fn get_opt_state(&self) -> OptState {
fn get_opt_state(&self) -> OptFlags {
self.opt_state
}

fn from_logical_plan(logical_plan: DslPlan, opt_state: OptState) -> Self {
fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
LazyFrame {
logical_plan,
opt_state,
Expand All @@ -118,91 +118,91 @@ impl LazyFrame {
}

/// Get current optimizations.
pub fn get_current_optimizations(&self) -> OptState {
pub fn get_current_optimizations(&self) -> OptFlags {
self.opt_state
}

/// Set allowed optimizations.
pub fn with_optimizations(mut self, opt_state: OptState) -> Self {
pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
self.opt_state = opt_state;
self
}

/// Turn off all optimizations.
pub fn without_optimizations(self) -> Self {
self.with_optimizations(OptState::from_bits_truncate(0) | OptState::TYPE_COERCION)
self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
}

/// Toggle projection pushdown optimization.
pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::PROJECTION_PUSHDOWN, toggle);
self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
self
}

/// Toggle cluster with columns optimization.
pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::CLUSTER_WITH_COLUMNS, toggle);
self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
self
}

/// Toggle predicate pushdown optimization.
pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::PREDICATE_PUSHDOWN, toggle);
self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
self
}

/// Toggle type coercion optimization.
pub fn with_type_coercion(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::TYPE_COERCION, toggle);
self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
self
}

/// Toggle expression simplification optimization on or off.
pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::SIMPLIFY_EXPR, toggle);
self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
self
}

/// Toggle common subplan elimination optimization on or off
#[cfg(feature = "cse")]
pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::COMM_SUBPLAN_ELIM, toggle);
self.opt_state.set(OptFlags::COMM_SUBPLAN_ELIM, toggle);
self
}

/// Toggle common subexpression elimination optimization on or off
#[cfg(feature = "cse")]
pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::COMM_SUBEXPR_ELIM, toggle);
self.opt_state.set(OptFlags::COMM_SUBEXPR_ELIM, toggle);
self
}

/// Toggle slice pushdown optimization.
pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::SLICE_PUSHDOWN, toggle);
self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
self
}

/// Run nodes that are capably of doing so on the streaming engine.
pub fn with_streaming(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::STREAMING, toggle);
self.opt_state.set(OptFlags::STREAMING, toggle);
self
}

pub fn with_new_streaming(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::NEW_STREAMING, toggle);
self.opt_state.set(OptFlags::NEW_STREAMING, toggle);
self
}

/// Try to estimate the number of rows so that joins can determine which side to keep in memory.
pub fn with_row_estimate(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::ROW_ESTIMATE, toggle);
self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
self
}

/// Run every node eagerly. This turns off multi-node optimizations.
pub fn _with_eager(mut self, toggle: bool) -> Self {
self.opt_state.set(OptState::EAGER, toggle);
self.opt_state.set(OptFlags::EAGER, toggle);
self
}

Expand Down Expand Up @@ -566,8 +566,7 @@ impl LazyFrame {
self.logical_plan,
&mut expr_arena,
&mut lp_arena,
true,
true,
&mut self.opt_state,
)?;
let plan = IRPlan::new(node, lp_arena, expr_arena);
Ok(plan)
Expand All @@ -582,18 +581,18 @@ impl LazyFrame {
) -> PolarsResult<Node> {
#[allow(unused_mut)]
let mut opt_state = self.opt_state;
let streaming = self.opt_state.contains(OptState::STREAMING);
let new_streaming = self.opt_state.contains(OptState::NEW_STREAMING);
let streaming = self.opt_state.contains(OptFlags::STREAMING);
let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);
#[cfg(feature = "cse")]
if streaming && !new_streaming {
opt_state &= !OptState::COMM_SUBPLAN_ELIM;
opt_state &= !OptFlags::COMM_SUBPLAN_ELIM;
}

// The new streaming engine can't deal with the way the common
// subexpression elimination adds length-incorrect with_columns.
#[cfg(feature = "cse")]
if new_streaming {
opt_state &= !OptState::COMM_SUBEXPR_ELIM;
opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
}

let lp_top = optimize(
Expand Down Expand Up @@ -626,7 +625,7 @@ impl LazyFrame {
scratch,
enable_fmt,
true,
opt_state.contains(OptState::ROW_ESTIMATE),
opt_state.contains(OptFlags::ROW_ESTIMATE),
)?;
}
#[cfg(not(feature = "streaming"))]
Expand Down Expand Up @@ -706,11 +705,11 @@ impl LazyFrame {
{
let auto_new_streaming =
std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
if self.opt_state.contains(OptState::NEW_STREAMING) || auto_new_streaming {
if self.opt_state.contains(OptFlags::NEW_STREAMING) || auto_new_streaming {
// Try to run using the new streaming engine, falling back
// if it fails in a todo!() error if auto_new_streaming is set.
let mut new_stream_lazy = self.clone();
new_stream_lazy.opt_state |= OptState::NEW_STREAMING;
new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
let mut alp_plan = new_stream_lazy.to_alp_optimized()?;
let stream_lp_top = alp_plan.lp_arena.add(IR::Sink {
input: alp_plan.lp_top,
Expand Down Expand Up @@ -835,7 +834,7 @@ impl LazyFrame {
cloud_options: Option<polars_io::cloud::CloudOptions>,
ipc_options: IpcWriterOptions,
) -> PolarsResult<()> {
self.opt_state |= OptState::STREAMING;
self.opt_state |= OptFlags::STREAMING;
self.logical_plan = DslPlan::Sink {
input: Arc::new(self.logical_plan),
payload: SinkType::Cloud {
Expand Down Expand Up @@ -890,7 +889,7 @@ impl LazyFrame {
feature = "json",
))]
fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> {
self.opt_state |= OptState::STREAMING;
self.opt_state |= OptFlags::STREAMING;
self.logical_plan = DslPlan::Sink {
input: Arc::new(self.logical_plan),
payload,
Expand Down Expand Up @@ -1315,8 +1314,8 @@ impl LazyFrame {
args: JoinArgs,
) -> LazyFrame {
// if any of the nodes reads from files we must activate this this plan as well.
if other.opt_state.contains(OptState::FILE_CACHING) {
self.opt_state |= OptState::FILE_CACHING;
if other.opt_state.contains(OptFlags::FILE_CACHING) {
self.opt_state |= OptFlags::FILE_CACHING;
}

let left_on = left_on.as_ref().to_vec();
Expand Down Expand Up @@ -1825,7 +1824,7 @@ impl LazyFrame {
#[derive(Clone)]
pub struct LazyGroupBy {
pub logical_plan: DslPlan,
opt_state: OptState,
opt_state: OptFlags,
keys: Vec<Expr>,
maintain_order: bool,
#[cfg(feature = "dynamic_group_by")]
Expand Down Expand Up @@ -2053,8 +2052,8 @@ impl JoinBuilder {
let other = self.other.expect("with not set");

// If any of the nodes reads from files we must activate this this plan as well.
if other.opt_state.contains(OptState::FILE_CACHING) {
opt_state |= OptState::FILE_CACHING;
if other.opt_state.contains(OptFlags::FILE_CACHING) {
opt_state |= OptFlags::FILE_CACHING;
}

let args = JoinArgs {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl LazyFileListReader for LazyCsvReader {
)?
.build()
.into();
lf.opt_state |= OptState::FILE_CACHING;
lf.opt_state |= OptFlags::FILE_CACHING;
Ok(lf)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl LazyFileListReader for LazyIpcReader {
)?
.build()
.into();
lf.opt_state |= OptState::FILE_CACHING;
lf.opt_state |= OptFlags::FILE_CACHING;

Ok(lf)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl LazyFileListReader for LazyParquetReader {
lf = lf.with_row_index(&row_index.name, Some(row_index.offset))
}

lf.opt_state |= OptState::FILE_CACHING;
lf.opt_state |= OptFlags::FILE_CACHING;
Ok(lf)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/predicate_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ fn filter_added_column_issue_2470() -> PolarsResult<()> {
fn filter_blocked_by_map() -> PolarsResult<()> {
let df = fruits_cars();

let allowed = OptState::default() & !OptState::PREDICATE_PUSHDOWN;
let allowed = OptFlags::default() & !OptFlags::PREDICATE_PUSHDOWN;
let q = df
.lazy()
.map(Ok, allowed, None, None)
Expand Down
12 changes: 10 additions & 2 deletions crates/polars-lazy/src/tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,15 @@ fn test_simplify_expr() {

let mut expr_arena = Arena::new();
let mut lp_arena = Arena::new();
let lp_top = to_alp(plan, &mut expr_arena, &mut lp_arena, true, false).unwrap();

#[allow(const_item_mutation)]
let lp_top = to_alp(
plan,
&mut expr_arena,
&mut lp_arena,
&mut OptFlags::SIMPLIFY_EXPR,
)
.unwrap();
let plan = node_to_lp(lp_top, &expr_arena, &mut lp_arena);
assert!(
matches!(plan, DslPlan::Select{ expr, ..} if matches!(&expr[0], Expr::BinaryExpr{left, ..} if **left == Expr::Literal(LiteralValue::Float(2.0))))
Expand Down Expand Up @@ -637,7 +645,7 @@ fn test_type_coercion() {

let mut expr_arena = Arena::new();
let mut lp_arena = Arena::new();
let lp_top = to_alp(lp, &mut expr_arena, &mut lp_arena, true, true).unwrap();
let lp_top = to_alp(lp, &mut expr_arena, &mut lp_arena, &mut OptFlags::default()).unwrap();
let lp = node_to_lp(lp_top, &expr_arena, &mut lp_arena);

if let DslPlan::Select { expr, .. } = lp {
Expand Down
Loading

0 comments on commit 20ce0f1

Please sign in to comment.