perf: Use bitflags for OptState
ritchie46 committed Jul 22, 2024
1 parent 2eb1ac2 commit 2e322aa
Showing 11 changed files with 110 additions and 144 deletions.
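
The commit replaces OptState's struct of boolean fields with a single bitflags integer, so copying, defaulting, and combining optimization state becomes plain bit arithmetic. Below is a minimal sketch of the pattern being adopted (not part of the diff; it assumes the bitflags 2.x crate, and the flag names here are illustrative):

use bitflags::bitflags;

bitflags! {
    #[derive(Copy, Clone, Debug)]
    pub struct Flags: u32 {
        const PREDICATE_PUSHDOWN  = 1;
        const PROJECTION_PUSHDOWN = 1 << 1;
        const FILE_CACHING        = 1 << 2;
    }
}

fn main() {
    // Before: `state.predicate_pushdown = true;` on a struct of bools.
    // After: set, clear, and test individual bits of one integer.
    let mut state = Flags::PREDICATE_PUSHDOWN | Flags::PROJECTION_PUSHDOWN;
    state.set(Flags::FILE_CACHING, true);   // turn a flag on (or off with `false`)
    state &= !Flags::PROJECTION_PUSHDOWN;   // clear a flag
    assert!(state.contains(Flags::FILE_CACHING));
    assert!(!state.contains(Flags::PROJECTION_PUSHDOWN));
}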
13 changes: 9 additions & 4 deletions crates/polars-lazy/src/dsl/functions.rs
@@ -2,6 +2,7 @@
//!
//! Function on multiple expressions.
//!

use polars_core::prelude::*;
pub use polars_plan::dsl::functions::*;
use polars_plan::prelude::UnionArgs;
Expand All @@ -28,8 +29,10 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
lps.push(lf.logical_plan);

for lf in &mut inputs[1..] {
- // ensure we enable file caching if any lf has it enabled
- opt_state.file_caching |= lf.opt_state.file_caching;
+ // Ensure we enable file caching if any lf has it enabled.
+ if lf.opt_state.contains(OptState::FILE_CACHING) {
+ opt_state |= OptState::FILE_CACHING;
+ }
let lp = std::mem::take(&mut lf.logical_plan);
lps.push(lp)
}
@@ -63,8 +66,10 @@ pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
)?;

for lf in &lfs[1..] {
- // ensure we enable file caching if any lf has it enabled
- opt_state.file_caching |= lf.opt_state.file_caching;
+ // Ensure we enable file caching if any lf has it enabled.
+ if lf.opt_state.contains(OptState::FILE_CACHING) {
+ opt_state |= OptState::FILE_CACHING;
+ }
}

let options = HConcatOptions {
73 changes: 28 additions & 45 deletions crates/polars-lazy/src/frame/mod.rs
@@ -81,10 +81,7 @@ impl From<DslPlan> for LazyFrame {
fn from(plan: DslPlan) -> Self {
Self {
logical_plan: plan,
- opt_state: OptState {
- file_caching: true,
- ..Default::default()
- },
+ opt_state: OptState::default() | OptState::FILE_CACHING,
cached_arena: Default::default(),
}
}
@@ -132,97 +129,79 @@ impl LazyFrame {

/// Turn off all optimizations.
pub fn without_optimizations(self) -> Self {
- self.with_optimizations(OptState {
- projection_pushdown: false,
- predicate_pushdown: false,
- cluster_with_columns: false,
- type_coercion: true,
- simplify_expr: false,
- slice_pushdown: false,
- // will be toggled by a scan operation such as csv scan or parquet scan
- file_caching: false,
- #[cfg(feature = "cse")]
- comm_subplan_elim: false,
- #[cfg(feature = "cse")]
- comm_subexpr_elim: false,
- streaming: false,
- eager: false,
- fast_projection: false,
- row_estimate: false,
- new_streaming: false,
- })
+ self.with_optimizations(OptState::from_bits_truncate(0))
}

/// Toggle projection pushdown optimization.
pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
- self.opt_state.projection_pushdown = toggle;
+ self.opt_state.set(OptState::PROJECTION_PUSHDOWN, toggle);
self
}

/// Toggle cluster with columns optimization.
pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
- self.opt_state.cluster_with_columns = toggle;
+ self.opt_state.set(OptState::CLUSTER_WITH_COLUMNS, toggle);
self
}

/// Toggle predicate pushdown optimization.
pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
- self.opt_state.predicate_pushdown = toggle;
+ self.opt_state.set(OptState::PREDICATE_PUSHDOWN, toggle);
self
}

/// Toggle type coercion optimization.
pub fn with_type_coercion(mut self, toggle: bool) -> Self {
- self.opt_state.type_coercion = toggle;
+ self.opt_state.set(OptState::TYPE_COERCION, toggle);
self
}

/// Toggle expression simplification optimization on or off.
pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
- self.opt_state.simplify_expr = toggle;
+ self.opt_state.set(OptState::SIMPLIFY_EXPR, toggle);
self
}

/// Toggle common subplan elimination optimization on or off
#[cfg(feature = "cse")]
pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
- self.opt_state.comm_subplan_elim = toggle;
+ self.opt_state.set(OptState::COMM_SUBPLAN_ELIM, toggle);
self
}

/// Toggle common subexpression elimination optimization on or off
#[cfg(feature = "cse")]
pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
- self.opt_state.comm_subexpr_elim = toggle;
+ self.opt_state.set(OptState::COMM_SUBEXPR_ELIM, toggle);
self
}

/// Toggle slice pushdown optimization.
pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
- self.opt_state.slice_pushdown = toggle;
+ self.opt_state.set(OptState::SLICE_PUSHDOWN, toggle);
self
}

/// Run nodes that are capably of doing so on the streaming engine.
pub fn with_streaming(mut self, toggle: bool) -> Self {
- self.opt_state.streaming = toggle;
+ self.opt_state.set(OptState::STREAMING, toggle);
self
}

pub fn with_new_streaming(mut self, toggle: bool) -> Self {
- self.opt_state.new_streaming = toggle;
+ self.opt_state.set(OptState::NEW_STREAMING, toggle);
self
}

/// Try to estimate the number of rows so that joins can determine which side to keep in memory.
pub fn with_row_estimate(mut self, toggle: bool) -> Self {
- self.opt_state.row_estimate = toggle;
+ self.opt_state.set(OptState::ROW_ESTIMATE, toggle);
self
}

/// Run every node eagerly. This turns off multi-node optimizations.
pub fn _with_eager(mut self, toggle: bool) -> Self {
- self.opt_state.eager = toggle;
+ self.opt_state.set(OptState::EAGER, toggle);
self
}
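
For context (not part of the diff): the public toggles above keep their signatures, so downstream code is unaffected by the internal switch to bitflags. A hedged usage sketch with hypothetical data:

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Hypothetical frame; only the chaining pattern matters here.
    let df = df!["a" => [1i32, 2, 3], "b" => [4i32, 5, 6]]?;

    // Each with_* method flips one OptState bit and returns Self.
    let out = df
        .lazy()
        .with_predicate_pushdown(false) // clears OptState::PREDICATE_PUSHDOWN
        .with_slice_pushdown(false)     // clears OptState::SLICE_PUSHDOWN
        .filter(col("a").gt(lit(1)))
        .collect()?;

    println!("{out}");
    Ok(())
}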

@@ -607,10 +586,10 @@ impl LazyFrame {
) -> PolarsResult<Node> {
#[allow(unused_mut)]
let mut opt_state = self.opt_state;
- let streaming = self.opt_state.streaming;
+ let streaming = self.opt_state.contains(OptState::STREAMING);
#[cfg(feature = "cse")]
- if streaming && self.opt_state.comm_subplan_elim {
- opt_state.comm_subplan_elim = false;
+ if streaming && self.opt_state.contains(OptState::COMM_SUBPLAN_ELIM) {
+ opt_state &= !OptState::COMM_SUBPLAN_ELIM;
}
let lp_top = optimize(
self.logical_plan,
@@ -642,7 +621,7 @@ impl LazyFrame {
scratch,
enable_fmt,
true,
- opt_state.row_estimate,
+ opt_state.contains(OptState::ROW_ESTIMATE),
)?;
}
#[cfg(not(feature = "streaming"))]
@@ -720,7 +699,7 @@ impl LazyFrame {
pub fn collect(self) -> PolarsResult<DataFrame> {
#[cfg(feature = "new_streaming")]
{
- let force_new_streaming = self.opt_state.new_streaming;
+ let force_new_streaming = self.opt_state.contains(OptState::NEW_STREAMING);
let mut alp_plan = self.to_alp_optimized()?;
let stream_lp_top = alp_plan.lp_arena.add(IR::Sink {
input: alp_plan.lp_top,
@@ -854,7 +833,7 @@ impl LazyFrame {
cloud_options: Option<polars_io::cloud::CloudOptions>,
ipc_options: IpcWriterOptions,
) -> PolarsResult<()> {
- self.opt_state.streaming = true;
+ self.opt_state |= OptState::STREAMING;
self.logical_plan = DslPlan::Sink {
input: Arc::new(self.logical_plan),
payload: SinkType::Cloud {
@@ -909,7 +888,7 @@ impl LazyFrame {
feature = "json",
))]
fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> {
- self.opt_state.streaming = true;
+ self.opt_state |= OptState::STREAMING;
self.logical_plan = DslPlan::Sink {
input: Arc::new(self.logical_plan),
payload,
@@ -1334,7 +1313,9 @@ impl LazyFrame {
args: JoinArgs,
) -> LazyFrame {
// if any of the nodes reads from files we must activate this this plan as well.
- self.opt_state.file_caching |= other.opt_state.file_caching;
+ if other.opt_state.contains(OptState::FILE_CACHING) {
+ self.opt_state |= OptState::FILE_CACHING;
+ }

let left_on = left_on.as_ref().to_vec();
let right_on = right_on.as_ref().to_vec();
@@ -2018,8 +1999,10 @@ impl JoinBuilder {
let mut opt_state = self.lf.opt_state;
let other = self.other.expect("with not set");

- // if any of the nodes reads from files we must activate this this plan as well.
- opt_state.file_caching |= other.opt_state.file_caching;
+ // If any of the nodes reads from files we must activate this this plan as well.
+ if other.opt_state.contains(OptState::FILE_CACHING) {
+ opt_state |= OptState::FILE_CACHING;
+ }

let args = JoinArgs {
how: self.how,
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/csv.rs
@@ -282,7 +282,7 @@ impl LazyFileListReader for LazyCsvReader {
)?
.build()
.into();
- lf.opt_state.file_caching = true;
+ lf.opt_state |= OptState::FILE_CACHING;
Ok(lf)
}

2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/ipc.rs
@@ -71,7 +71,7 @@ impl LazyFileListReader for LazyIpcReader {
)?
.build()
.into();
- lf.opt_state.file_caching = true;
+ lf.opt_state |= OptState::FILE_CACHING;

Ok(lf)
}
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/parquet.rs
@@ -83,7 +83,7 @@ impl LazyFileListReader for LazyParquetReader {
lf = lf.with_row_index(&row_index.name, Some(row_index.offset))
}

- lf.opt_state.file_caching = true;
+ lf.opt_state |= OptState::FILE_CACHING;
Ok(lf)
}

5 changes: 1 addition & 4 deletions crates/polars-lazy/src/tests/predicate_queries.rs
@@ -102,10 +102,7 @@ fn filter_added_column_issue_2470() -> PolarsResult<()> {
fn filter_blocked_by_map() -> PolarsResult<()> {
let df = fruits_cars();

- let allowed = AllowedOptimizations {
- predicate_pushdown: false,
- ..Default::default()
- };
+ let allowed = OptState::PREDICATE_PUSHDOWN | OptState::default();
let q = df
.lazy()
.map(Ok, allowed, None, None)
90 changes: 39 additions & 51 deletions crates/polars-plan/src/frame/opt_state.rs
@@ -1,60 +1,48 @@
+ use bitflags::bitflags;

+ bitflags! {
+ #[derive(Copy, Clone, Debug)]
- /// State of the allowed optimizations
- pub struct OptState {
- /// Only read columns that are used later in the query.
- pub projection_pushdown: bool,
- /// Apply predicates/filters as early as possible.
- pub predicate_pushdown: bool,
- /// Cluster sequential `with_columns` calls to independent calls.
- pub cluster_with_columns: bool,
- /// Run many type coercion optimization rules until fixed point.
- pub type_coercion: bool,
- /// Run many expression optimization rules until fixed point.
- pub simplify_expr: bool,
- /// Cache file reads.
- pub file_caching: bool,
- /// Pushdown slices/limits.
- pub slice_pushdown: bool,
- #[cfg(feature = "cse")]
- /// Run common-subplan-elimination. This elides duplicate plans and caches their
- /// outputs.
- pub comm_subplan_elim: bool,
- #[cfg(feature = "cse")]
- /// Run common-subexpression-elimination. This elides duplicate expressions and caches their
- /// outputs.
- pub comm_subexpr_elim: bool,
- /// Run nodes that are capably of doing so on the streaming engine.
- pub streaming: bool,
- /// Run every node eagerly. This turns off multi-node optimizations.
- pub eager: bool,
- /// Replace simple projections with a faster inlined projection that skips the expression engine.
- pub fast_projection: bool,
- /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
- pub row_estimate: bool,
- pub new_streaming: bool,
+ /// Allowed optimizations.
+ pub struct OptState: u32 {
+ /// Only read columns that are used later in the query.
+ const PROJECTION_PUSHDOWN = 1;
+ /// Apply predicates/filters as early as possible.
+ const PREDICATE_PUSHDOWN = 1 << 2;
+ /// Cluster sequential `with_columns` calls to independent calls.
+ const CLUSTER_WITH_COLUMNS = 1 << 3;
+ /// Run many type coercion optimization rules until fixed point.
+ const TYPE_COERCION = 1 << 4;
+ /// Run many expression optimization rules until fixed point.
+ const SIMPLIFY_EXPR = 1 << 5;
+ /// Cache file reads.
+ const FILE_CACHING = 1 << 6;
+ /// Pushdown slices/limits.
+ const SLICE_PUSHDOWN = 1 << 7;
+ #[cfg(feature = "cse")]
+ /// Run common-subplan-elimination. This elides duplicate plans and caches their
+ /// outputs.
+ const COMM_SUBPLAN_ELIM = 1 << 8;
+ #[cfg(feature = "cse")]
+ /// Run common-subexpression-elimination. This elides duplicate expressions and caches their
+ /// outputs.
+ const COMM_SUBEXPR_ELIM = 1 << 9;
+ /// Run nodes that are capably of doing so on the streaming engine.
+ const STREAMING = 1 << 10;
+ const NEW_STREAMING = 1 << 11;
+ /// Run every node eagerly. This turns off multi-node optimizations.
+ const EAGER = 1 << 12;
+ /// Try to estimate the number of rows so that joins can determine which side to keep in memory.
+ const ROW_ESTIMATE = 1 << 13;
+ /// Replace simple projections with a faster inlined projection that skips the expression engine.
+ const FAST_PROJECTION = 1 << 14;
+ }
}

impl Default for OptState {
fn default() -> Self {
- OptState {
- projection_pushdown: true,
- predicate_pushdown: true,
- cluster_with_columns: true,
- type_coercion: true,
- simplify_expr: true,
- slice_pushdown: true,
+ Self::from_bits_truncate(u32::MAX) & !Self::NEW_STREAMING & !Self::STREAMING & !Self::EAGER
// will be toggled by a scan operation such as csv scan or parquet scan
- file_caching: false,
- #[cfg(feature = "cse")]
- comm_subplan_elim: true,
- #[cfg(feature = "cse")]
- comm_subexpr_elim: true,
- streaming: false,
- fast_projection: true,
- eager: false,
- row_estimate: true,
- new_streaming: false,
- }
+ & !Self::FILE_CACHING
}
}
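
A quick self-contained check of the Default shape above (not part of the diff; OptState itself lives in polars-plan, so this mirrors it with an illustrative type): every defined flag starts enabled except the ones masked off, and scans later turn FILE_CACHING back on with |=.

use bitflags::bitflags;

bitflags! {
    #[derive(Copy, Clone, Debug)]
    struct Flags: u32 {
        const PREDICATE_PUSHDOWN = 1;
        const FILE_CACHING       = 1 << 1;
        const STREAMING          = 1 << 2;
        const EAGER              = 1 << 3;
    }
}

impl Default for Flags {
    fn default() -> Self {
        // Same shape as the Default impl above: start from "all defined bits set",
        // then mask off the flags that scans or explicit opt-ins enable later.
        Self::from_bits_truncate(u32::MAX) & !Self::STREAMING & !Self::EAGER & !Self::FILE_CACHING
    }
}

fn main() {
    let opts = Flags::default();
    assert!(opts.contains(Flags::PREDICATE_PUSHDOWN));
    assert!(!opts.contains(Flags::FILE_CACHING)); // a scan would do `opts |= Flags::FILE_CACHING`
}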

12 changes: 6 additions & 6 deletions crates/polars-plan/src/plans/builder_dsl.rs
@@ -427,9 +427,9 @@ impl DslBuilder {
function: DslFunction::FunctionNode(FunctionNode::OpaquePython {
function,
schema,
- predicate_pd: optimizations.predicate_pushdown,
- projection_pd: optimizations.projection_pushdown,
- streamable: optimizations.streaming,
+ predicate_pd: optimizations.contains(OptState::PREDICATE_PUSHDOWN),
+ projection_pd: optimizations.contains(OptState::PROJECTION_PUSHDOWN),
+ streamable: optimizations.contains(OptState::STREAMING),
validate_output,
}),
}
@@ -453,9 +453,9 @@ impl DslBuilder {
function: DslFunction::FunctionNode(FunctionNode::Opaque {
function,
schema,
- predicate_pd: optimizations.predicate_pushdown,
- projection_pd: optimizations.projection_pushdown,
- streamable: optimizations.streaming,
+ predicate_pd: optimizations.contains(OptState::PREDICATE_PUSHDOWN),
+ projection_pd: optimizations.contains(OptState::PROJECTION_PUSHDOWN),
+ streamable: optimizations.contains(OptState::STREAMING),
fmt_str: name,
}),
}