Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp committed Aug 19, 2024
1 parent 3611560 commit 9a65d73
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 59 deletions.
4 changes: 3 additions & 1 deletion crates/polars-plan/src/plans/ir/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ impl IR {
| Join { schema, .. }
| HStack { schema, .. }
| ExtContext { schema, .. }
| SimpleProjection { columns: schema, .. } => schema.clone(),
| SimpleProjection {
columns: schema, ..
} => schema.clone(),
MapFunction { input, function } => {
let input_schema = IR::schema_with_cache(*input, arena, cache);
function.schema(&input_schema).unwrap().into_owned()
Expand Down
96 changes: 60 additions & 36 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,52 +28,63 @@ pub fn lower_ir(
IR::SimpleProjection { input, columns } => {
let columns = columns.iter_names().map(|s| s.to_string()).collect();
let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?;
PhysNodeKind::SimpleProjection { input: phys_input, columns }
PhysNodeKind::SimpleProjection {
input: phys_input,
columns,
}
},

// TODO: split partially streamable selections to avoid fallback as much as possible.
IR::Select {
input,
expr,
..
} if expr.iter().all(|e| is_streamable(e.node(), expr_arena)) => {
IR::Select { input, expr, .. }
if expr.iter().all(|e| is_streamable(e.node(), expr_arena)) =>
{
let selectors = expr.clone();
let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?;
PhysNodeKind::Select { input: phys_input, selectors, extend_original: false }
PhysNodeKind::Select {
input: phys_input,
selectors,
extend_original: false,
}
},

// TODO: split partially streamable selections to avoid fallback as much as possible.
IR::HStack {
input,
exprs,
..
} if exprs.iter().all(|e| is_streamable(e.node(), expr_arena)) => {
IR::HStack { input, exprs, .. }
if exprs.iter().all(|e| is_streamable(e.node(), expr_arena)) =>
{
let selectors = exprs.clone();
let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?;
PhysNodeKind::Select { input: phys_input, selectors, extend_original: true }
PhysNodeKind::Select {
input: phys_input,
selectors,
extend_original: true,
}
},

// TODO: split reductions and streamable selections. E.g. sum(a) + sum(b) should be split
// into Select(a + b) -> Reduce(sum(a), sum(b)
IR::Select {
input,
expr,
..
} if expr
.iter()
.all(|e| can_convert_into_reduction(e.node(), expr_arena)) =>
IR::Select { input, expr, .. }
if expr
.iter()
.all(|e| can_convert_into_reduction(e.node(), expr_arena)) =>
{
let exprs = expr.clone();
let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?;
PhysNodeKind::Reduce { input: phys_input, exprs }
PhysNodeKind::Reduce {
input: phys_input,
exprs,
}
},

IR::Slice { input, offset, len } => {
if *offset >= 0 {
let offset = *offset as usize;
let length = *len as usize;
let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?;
PhysNodeKind::StreamingSlice { input: phys_input, offset, length }
PhysNodeKind::StreamingSlice {
input: phys_input,
offset,
length,
}
} else {
todo!()
}
Expand All @@ -82,7 +93,10 @@ pub fn lower_ir(
IR::Filter { input, predicate } if is_streamable(predicate.node(), expr_arena) => {
let predicate = predicate.clone();
let phys_input = lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?;
PhysNodeKind::Filter { input: phys_input, predicate }
PhysNodeKind::Filter {
input: phys_input,
predicate,
}
},

IR::DataFrameScan {
Expand All @@ -99,7 +113,10 @@ pub fn lower_ir(
let phys_input = phys_sm.insert(PhysNode::new(schema, node_kind));
node_kind = PhysNodeKind::SimpleProjection {
input: phys_input,
columns: projection_schema.iter_names().map(|s| s.to_string()).collect(),
columns: projection_schema
.iter_names()
.map(|s| s.to_string())
.collect(),
};
schema = projection_schema.clone();
}
Expand All @@ -110,9 +127,12 @@ pub fn lower_ir(
}

let phys_input = phys_sm.insert(PhysNode::new(schema, node_kind));
node_kind = PhysNodeKind::Filter { input: phys_input, predicate };
node_kind = PhysNodeKind::Filter {
input: phys_input,
predicate,
};
}

node_kind
},

Expand All @@ -131,10 +151,16 @@ pub fn lower_ir(

if function.is_streamable() {
let map = Arc::new(move |df| function.evaluate(df));
PhysNodeKind::Map { input: phys_input, map }
PhysNodeKind::Map {
input: phys_input,
map,
}
} else {
let map = Arc::new(move |df| function.evaluate(df));
PhysNodeKind::InMemoryMap { input: phys_input, map }
PhysNodeKind::InMemoryMap {
input: phys_input,
map,
}
}
},

Expand All @@ -143,13 +169,11 @@ pub fn lower_ir(
by_column,
slice,
sort_options,
} => {
PhysNodeKind::Sort {
by_column: by_column.clone(),
slice: *slice,
sort_options: sort_options.clone(),
input: lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?,
}
} => PhysNodeKind::Sort {
by_column: by_column.clone(),
slice: *slice,
sort_options: sort_options.clone(),
input: lower_ir(*input, ir_arena, expr_arena, phys_sm, schema_cache)?,
},

IR::Union { inputs, options } => {
Expand Down Expand Up @@ -183,7 +207,7 @@ pub fn lower_ir(

_ => todo!(),
};

let output_schema = IR::schema_with_cache(node, ir_arena, schema_cache);
Ok(phys_sm.insert(PhysNode::new(output_schema, node_kind)))
}
5 changes: 4 additions & 1 deletion crates/polars-stream/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ pub struct PhysNode {

impl PhysNode {
pub fn new(output_schema: Arc<Schema>, kind: PhysNodeKind) -> Self {
Self { output_schema, kind }
Self {
output_schema,
kind,
}
}
}

Expand Down
30 changes: 10 additions & 20 deletions crates/polars-stream/src/physical_plan/to_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use polars_utils::itertools::Itertools;
use recursive::recursive;
use slotmap::{SecondaryMap, SlotMap};

use super::{PhysNode, PhysNodeKind, PhysNodeKey};
use super::{PhysNode, PhysNodeKey, PhysNodeKind};
use crate::expression::StreamExpr;
use crate::graph::{Graph, GraphNodeKey};
use crate::nodes;
Expand Down Expand Up @@ -130,10 +130,7 @@ fn to_graph_rec<'a>(
[input_key],
)
},
Reduce {
input,
exprs,
} => {
Reduce { input, exprs } => {
let input_key = to_graph_rec(*input, ctx)?;
let input_schema = &ctx.phys_sm[*input].output_schema;

Expand All @@ -142,8 +139,7 @@ fn to_graph_rec<'a>(

for e in exprs {
let (red, input_node) =
into_reduction(e.node(), ctx.expr_arena, input_schema)?
.expect("invariant");
into_reduction(e.node(), ctx.expr_arena, input_schema)?.expect("invariant");
reductions.push(red);

let input_phys =
Expand All @@ -157,17 +153,11 @@ fn to_graph_rec<'a>(
[input_key],
)
},
SimpleProjection {
input,
columns,
} => {
SimpleProjection { input, columns } => {
let input_schema = ctx.phys_sm[*input].output_schema.clone();
let input_key = to_graph_rec(*input, ctx)?;
ctx.graph.add_node(
nodes::simple_projection::SimpleProjectionNode::new(
columns.clone(),
input_schema,
),
nodes::simple_projection::SimpleProjectionNode::new(columns.clone(), input_schema),
[input_key],
)
},
Expand All @@ -181,10 +171,7 @@ fn to_graph_rec<'a>(
)
},

InMemoryMap {
input,
map,
} => {
InMemoryMap { input, map } => {
let input_schema = ctx.phys_sm[*input].output_schema.clone();
let input_key = to_graph_rec(*input, ctx)?;
ctx.graph.add_node(
Expand Down Expand Up @@ -248,7 +235,10 @@ fn to_graph_rec<'a>(
inputs,
null_extend,
} => {
let input_schemas = inputs.iter().map(|i| ctx.phys_sm[*i].output_schema.clone()).collect_vec();
let input_schemas = inputs
.iter()
.map(|i| ctx.phys_sm[*i].output_schema.clone())
.collect_vec();
let input_keys = inputs
.iter()
.map(|i| to_graph_rec(*i, ctx))
Expand Down
8 changes: 7 additions & 1 deletion crates/polars-stream/src/skeleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ pub fn run_query(
) -> PolarsResult<DataFrame> {
let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
let mut schema_cache = PlHashMap::with_capacity(ir_arena.len());
let root = crate::physical_plan::lower_ir(node, &mut ir_arena, expr_arena, &mut phys_sm, &mut schema_cache)?;
let root = crate::physical_plan::lower_ir(
node,
&mut ir_arena,
expr_arena,
&mut phys_sm,
&mut schema_cache,
)?;
let (mut graph, phys_to_graph) =
crate::physical_plan::physical_plan_to_graph(&phys_sm, expr_arena)?;
let mut results = crate::execute::execute_graph(&mut graph)?;
Expand Down

0 comments on commit 9a65d73

Please sign in to comment.