Skip to content

Commit

Permalink
refactor(rust): Rename ALogicalPlan to FullAccessIR (pola-rs#15553)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Apr 9, 2024
1 parent 33eedfb commit 86d3aa0
Show file tree
Hide file tree
Showing 63 changed files with 523 additions and 524 deletions.
12 changes: 6 additions & 6 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,27 +552,27 @@ impl LazyFrame {

pub fn optimize(
self,
lp_arena: &mut Arena<ALogicalPlan>,
lp_arena: &mut Arena<FullAccessIR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<Node> {
self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false)
}

pub fn to_alp_optimized(self) -> PolarsResult<(Node, Arena<ALogicalPlan>, Arena<AExpr>)> {
pub fn to_alp_optimized(self) -> PolarsResult<(Node, Arena<FullAccessIR>, Arena<AExpr>)> {
let mut lp_arena = Arena::with_capacity(16);
let mut expr_arena = Arena::with_capacity(16);
let node =
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?;
Ok((node, lp_arena, expr_arena))
}

pub fn to_alp(self) -> PolarsResult<(Node, Arena<ALogicalPlan>, Arena<AExpr>)> {
pub fn to_alp(self) -> PolarsResult<(Node, Arena<FullAccessIR>, Arena<AExpr>)> {
self.logical_plan.to_alp()
}

pub(crate) fn optimize_with_scratch(
self,
lp_arena: &mut Arena<ALogicalPlan>,
lp_arena: &mut Arena<FullAccessIR>,
expr_arena: &mut Arena<AExpr>,
scratch: &mut Vec<Node>,
_fmt: bool,
Expand Down Expand Up @@ -641,7 +641,7 @@ impl LazyFrame {

// sink should be replaced
let no_file_sink = if check_sink {
!matches!(lp_arena.get(lp_top), ALogicalPlan::Sink { .. })
!matches!(lp_arena.get(lp_top), FullAccessIR::Sink { .. })
} else {
true
};
Expand Down Expand Up @@ -1845,7 +1845,7 @@ impl LazyGroupBy {
#[cfg(not(feature = "dynamic_group_by"))]
let options = GroupbyOptions { slice: None };

let lp = LogicalPlan::Aggregate {
let lp = LogicalPlan::GroupBy {
input: Arc::new(self.logical_plan),
keys: Arc::new(self.keys),
aggs: vec![],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl PartitionGroupByExec {
..Default::default()
}
.into();
let lp = LogicalPlan::Aggregate {
let lp = LogicalPlan::GroupBy {
input: Arc::new(original_df.lazy().logical_plan),
keys: Arc::new(std::mem::take(&mut self.keys)),
aggs: std::mem::take(&mut self.aggs),
Expand Down
10 changes: 5 additions & 5 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ fn partitionable_gb(

pub fn create_physical_plan(
root: Node,
lp_arena: &mut Arena<ALogicalPlan>,
lp_arena: &mut Arena<FullAccessIR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<Box<dyn Executor>> {
use ALogicalPlan::*;
use FullAccessIR::*;

let logical_plan = lp_arena.take(root);
match logical_plan {
Expand Down Expand Up @@ -171,7 +171,7 @@ pub fn create_physical_plan(
let input = create_physical_plan(input, lp_arena, expr_arena)?;
Ok(Box::new(executors::SliceExec { input, offset, len }))
},
Selection { input, predicate } => {
Filter { input, predicate } => {
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
let input = create_physical_plan(input, lp_arena, expr_arena)?;
let mut state = ExpressionConversionState::default();
Expand Down Expand Up @@ -266,7 +266,7 @@ pub fn create_physical_plan(
},
}
},
Projection {
Select {
expr,
input,
schema: _schema,
Expand Down Expand Up @@ -363,7 +363,7 @@ pub fn create_physical_plan(
let input = create_physical_plan(input, lp_arena, expr_arena)?;
Ok(Box::new(executors::UniqueExec { input, options }))
},
Aggregate {
GroupBy {
input,
keys,
aggs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ fn to_physical_piped_expr(

fn jit_insert_slice(
node: Node,
lp_arena: &mut Arena<ALogicalPlan>,
lp_arena: &mut Arena<FullAccessIR>,
sink_nodes: &mut Vec<(usize, Node, Rc<RefCell<u32>>)>,
operator_offset: usize,
) {
// if the join/union has a slice, we add a new slice node
// note that we take the offset + 1, because we want to
// slice AFTER the join has happened and the join will be an
// operator
use ALogicalPlan::*;
use FullAccessIR::*;
let (offset, len) = match lp_arena.get(node) {
Join { options, .. } if options.args.slice.is_some() => {
let Some((offset, len)) = options.args.slice else {
Expand Down Expand Up @@ -101,11 +101,11 @@ fn jit_insert_slice(

pub(super) fn construct(
tree: Tree,
lp_arena: &mut Arena<ALogicalPlan>,
lp_arena: &mut Arena<FullAccessIR>,
expr_arena: &mut Arena<AExpr>,
fmt: bool,
) -> PolarsResult<Option<Node>> {
use ALogicalPlan::*;
use FullAccessIR::*;

let mut pipelines = Vec::with_capacity(tree.len());
let mut callbacks = CallBacks::new();
Expand Down Expand Up @@ -146,7 +146,7 @@ pub(super) fn construct(
// The file sink is always at the top of the tree.
// Not every branch has a final sink; for instance, rhs join branches do not.
if let Some(node) = branch.get_final_sink() {
if matches!(lp_arena.get(node), ALogicalPlan::Sink { .. }) {
if matches!(lp_arena.get(node), FullAccessIR::Sink { .. }) {
final_sink = Some(node)
}
}
Expand Down Expand Up @@ -251,22 +251,22 @@ impl SExecutionContext for ExecutionState {
}

fn get_pipeline_node(
lp_arena: &mut Arena<ALogicalPlan>,
lp_arena: &mut Arena<FullAccessIR>,
mut pipelines: Vec<PipeLine>,
schema: SchemaRef,
original_lp: Option<LogicalPlan>,
) -> ALogicalPlan {
) -> FullAccessIR {
// create a dummy input as the map function will call the input
// so we just create a scan that returns an empty df
let dummy = lp_arena.add(ALogicalPlan::DataFrameScan {
let dummy = lp_arena.add(FullAccessIR::DataFrameScan {
df: Arc::new(DataFrame::empty()),
schema: Arc::new(Schema::new()),
output_schema: None,
projection: None,
selection: None,
});

ALogicalPlan::MapFunction {
FullAccessIR::MapFunction {
function: FunctionNode::Pipeline {
function: Arc::new(move |_df: DataFrame| {
let mut state = ExecutionState::new();
Expand Down
24 changes: 11 additions & 13 deletions crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn process_non_streamable_node(
stack: &mut Vec<StackFrame>,
scratch: &mut Vec<Node>,
pipeline_trees: &mut Vec<Vec<Branch>>,
lp: &ALogicalPlan,
lp: &FullAccessIR,
) {
lp.copy_inputs(scratch);
while let Some(input) = scratch.pop() {
Expand All @@ -69,11 +69,11 @@ fn process_non_streamable_node(
state.streamable = false;
}

fn insert_file_sink(mut root: Node, lp_arena: &mut Arena<ALogicalPlan>) -> Node {
fn insert_file_sink(mut root: Node, lp_arena: &mut Arena<FullAccessIR>) -> Node {
// The pipelines need a final sink, we insert that here.
// this allows us to split at joins/unions and share a sink
if !matches!(lp_arena.get(root), ALogicalPlan::Sink { .. }) {
root = lp_arena.add(ALogicalPlan::Sink {
if !matches!(lp_arena.get(root), FullAccessIR::Sink { .. }) {
root = lp_arena.add(FullAccessIR::Sink {
input: root,
payload: SinkType::Memory,
})
Expand All @@ -85,10 +85,10 @@ fn insert_slice(
root: Node,
offset: i64,
len: IdxSize,
lp_arena: &mut Arena<ALogicalPlan>,
lp_arena: &mut Arena<FullAccessIR>,
state: &mut Branch,
) {
let node = lp_arena.add(ALogicalPlan::Slice {
let node = lp_arena.add(FullAccessIR::Slice {
input: root,
offset,
len: len as IdxSize,
Expand All @@ -98,7 +98,7 @@ fn insert_slice(

pub(crate) fn insert_streaming_nodes(
root: Node,
lp_arena: &mut Arena<ALogicalPlan>,
lp_arena: &mut Arena<FullAccessIR>,
expr_arena: &mut Arena<AExpr>,
scratch: &mut Vec<Node>,
fmt: bool,
Expand Down Expand Up @@ -163,7 +163,7 @@ pub(crate) fn insert_streaming_nodes(
// keep the counter global so that the order will match traversal order
let mut execution_id = 0;

use ALogicalPlan::*;
use FullAccessIR::*;
while let Some(StackFrame {
node: mut root,
mut state,
Expand All @@ -177,7 +177,7 @@ pub(crate) fn insert_streaming_nodes(
state.execution_id = execution_id;
execution_id += 1;
match lp_arena.get(root) {
Selection { input, predicate }
Filter { input, predicate }
if is_streamable(predicate.node(), expr_arena, Context::Default) =>
{
state.streamable = true;
Expand Down Expand Up @@ -208,9 +208,7 @@ pub(crate) fn insert_streaming_nodes(
state.operators_sinks.push(PipelineNode::Sink(root));
stack.push(StackFrame::new(*input, state, current_idx))
},
Projection { input, expr, .. }
if all_streamable(expr, expr_arena, Context::Default) =>
{
Select { input, expr, .. } if all_streamable(expr, expr_arena, Context::Default) => {
state.streamable = true;
state.operators_sinks.push(PipelineNode::Operator(root));
stack.push(StackFrame::new(*input, state, current_idx))
Expand Down Expand Up @@ -380,7 +378,7 @@ pub(crate) fn insert_streaming_nodes(
stack.push(StackFrame::new(*input, state, current_idx))
},
#[allow(unused_variables)]
lp @ Aggregate {
lp @ GroupBy {
input,
keys,
aggs,
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/streaming/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub(super) fn is_valid_tree(tree: TreeRef) -> bool {

#[cfg(debug_assertions)]
#[allow(unused)]
pub(super) fn dbg_branch(b: &Branch, lp_arena: &Arena<ALogicalPlan>) {
pub(super) fn dbg_branch(b: &Branch, lp_arena: &Arena<FullAccessIR>) {
// streamable: bool,
// sources: Vec<Node>,
// // joins seen in whole branch (we count a union as joins with multiple counts)
Expand Down Expand Up @@ -158,7 +158,7 @@ pub(super) fn dbg_branch(b: &Branch, lp_arena: &Arena<ALogicalPlan>) {

#[cfg(debug_assertions)]
#[allow(unused)]
pub(super) fn dbg_tree(tree: Tree, lp_arena: &Arena<ALogicalPlan>, expr_arena: &Arena<AExpr>) {
pub(super) fn dbg_tree(tree: Tree, lp_arena: &Arena<FullAccessIR>, expr_arena: &Arena<AExpr>) {
if tree.is_empty() {
println!("EMPTY TREE");
return;
Expand Down
16 changes: 8 additions & 8 deletions crates/polars-lazy/src/tests/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ fn cached_before_root(q: LazyFrame) {
let (mut expr_arena, mut lp_arena) = get_arenas();
let lp = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();
for input in lp_arena.get(lp).get_inputs() {
assert!(matches!(lp_arena.get(input), ALogicalPlan::Cache { .. }));
assert!(matches!(lp_arena.get(input), FullAccessIR::Cache { .. }));
}
}

fn count_caches(q: LazyFrame) -> usize {
let (node, lp_arena, _) = q.to_alp_optimized().unwrap();
(&lp_arena)
.iter(node)
.filter(|(_node, lp)| matches!(lp, ALogicalPlan::Cache { .. }))
.filter(|(_node, lp)| matches!(lp, FullAccessIR::Cache { .. }))
.count()
}

Expand Down Expand Up @@ -55,7 +55,7 @@ fn test_cse_unions() -> PolarsResult<()> {
let lp = lf.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();
let mut cache_count = 0;
assert!((&lp_arena).iter(lp).all(|(_, lp)| {
use ALogicalPlan::*;
use FullAccessIR::*;
match lp {
Cache { .. } => {
cache_count += 1;
Expand Down Expand Up @@ -98,7 +98,7 @@ fn test_cse_cache_union_projection_pd() -> PolarsResult<()> {
let lp = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();
let mut cache_count = 0;
assert!((&lp_arena).iter(lp).all(|(_, lp)| {
use ALogicalPlan::*;
use FullAccessIR::*;
match lp {
Cache { .. } => {
cache_count += 1;
Expand Down Expand Up @@ -154,7 +154,7 @@ fn test_cse_union2_4925() -> PolarsResult<()> {
let cache_ids = (&lp_arena)
.iter(lp)
.flat_map(|(_, lp)| {
use ALogicalPlan::*;
use FullAccessIR::*;
match lp {
Cache { id, cache_hits, .. } => {
assert_eq!(*cache_hits, 1);
Expand Down Expand Up @@ -207,7 +207,7 @@ fn test_cse_joins_4954() -> PolarsResult<()> {
let cache_ids = (&lp_arena)
.iter(lp)
.flat_map(|(_, lp)| {
use ALogicalPlan::*;
use FullAccessIR::*;
match lp {
Cache {
id,
Expand All @@ -218,7 +218,7 @@ fn test_cse_joins_4954() -> PolarsResult<()> {
assert_eq!(*cache_hits, 1);
assert!(matches!(
lp_arena.get(*input),
ALogicalPlan::DataFrameScan { .. }
FullAccessIR::DataFrameScan { .. }
));

Some(*id)
Expand Down Expand Up @@ -279,7 +279,7 @@ fn test_cache_with_partial_projection() -> PolarsResult<()> {
let cache_ids = (&lp_arena)
.iter(lp)
.flat_map(|(_, lp)| {
use ALogicalPlan::*;
use FullAccessIR::*;
match lp {
Cache { id, .. } => Some(*id),
_ => None,
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,11 @@ fn test_scan_parquet_limit_9001() {
let q = LazyFrame::scan_parquet(path, args).unwrap().limit(3);
let (node, lp_arena, _) = q.to_alp_optimized().unwrap();
(&lp_arena).iter(node).all(|(_, lp)| match lp {
ALogicalPlan::Union { options, .. } => {
FullAccessIR::Union { options, .. } => {
let sliced = options.slice.unwrap();
sliced.1 == 3
},
ALogicalPlan::Scan { file_options, .. } => file_options.n_rows == Some(3),
FullAccessIR::Scan { file_options, .. } => file_options.n_rows == Some(3),
_ => true,
});
}
Expand Down Expand Up @@ -428,9 +428,9 @@ fn test_ipc_globbing() -> PolarsResult<()> {
Ok(())
}

fn slice_at_union(lp_arena: &Arena<ALogicalPlan>, lp: Node) -> bool {
fn slice_at_union(lp_arena: &Arena<FullAccessIR>, lp: Node) -> bool {
(&lp_arena).iter(lp).all(|(_, lp)| {
if let ALogicalPlan::Union { options, .. } = lp {
if let FullAccessIR::Union { options, .. } = lp {
options.slice.is_some()
} else {
true
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod streaming;
#[cfg(all(feature = "strings", feature = "cse"))]
mod tpch;

fn get_arenas() -> (Arena<AExpr>, Arena<ALogicalPlan>) {
fn get_arenas() -> (Arena<AExpr>, Arena<FullAccessIR>) {
let expr_arena = Arena::with_capacity(16);
let lp_arena = Arena::with_capacity(8);
(expr_arena, lp_arena)
Expand Down
Loading

0 comments on commit 86d3aa0

Please sign in to comment.